Perl Assíncrono

Stanislaw Pusep
Publicado em 01/03/2012

Perl Assíncrono

Introdução

Esse artigo é dedicado aos 20 anos da WWW :)

Para muitos programadores Web, "assíncrono" remete a Ajax (afinal, é acrônimo para asynchronous JavaScript and XML). Para alguns outros, remete a Node.js. Evidentemente, a participação do JavaScript (ou, mais genericamente, ECMAScript) na formação de opinião a respeito de programação orientada a eventos foi significativa.

JavaScript, por sua vez, deve muito para Scheme (assim como Perl deve muito para Lisp). Não que a programação funcional e a orientada a eventos sejam duas faces da mesma moeda, mas ambos os conceitos requerem o mesmo "estado de espírito", por assim dizer. E esse "estado de espírito" se aplicou muito bem no universo da World Wide Web.

Um pouco de história

A abordagem de event loop é uma "técnica milenar", presente no nosso dia-a-dia nas mais diversas formas. O exemplo mais clássico é User Interface: o programa fica rodando em ciclo contínuo, aguardando input do usuário. Quem nasceu na década de 80, inconscientemente (ou não) explorou a propriedade intrínsica de event loop, desativando o modo "turbo" da CPU nas etapas mais difíceis dos jogos para entrar em modo "câmera lenta" (isso funcionava por que o timer, muitas vezes, era um mero contador dentro do loop principal).

Historicamente, o elo mais lento de um event loop foi a interface entre a cadeira e o teclado. E gastar ciclos da CPU para aguardar I/O é muito custoso em sistemas multitarefa. Então, abstraindo, ao invés de perguntar ao teclado toda hora se tem alguma tecla pressionada, fazemos o teclado disparar um aviso de que "algo aconteceu", e só então CPU se encarrega de verificar. Obviamente, o que vale para o teclado, vale também para o HD. E também para uma placa de rede.

E é aí que chegamos lá: Asynchronous I/O (também conhecido como non-blocking I/O) funciona muito bem para qualquer transação de dados que é significativamente mais lenta do que a CPU. Graças à Internet, isso é particularmente útil para a comunicação entre as CPUs localizadas em máquinas distintas. No caso de um banco de dados, podemos delegar uma query para um servidor e fazer outras coisas que não dependem diretamente do resultado da mesma.

Por muitos anos, sentiu-se a falta de uma linguagem comum entre programas se comunicando (no caso do banco de dados, precisaria ter cliente e servidor "conversando" através do mesmo protocolo; por exemplo, o do MySQL). Mas essa linguagem existiu desde 1991, apesar de só ter entrado na moda quando a apelidaram de REST (Representational state transfer). A visão que Tim Berners-Lee & Cia. tiveram naquela época foi muito... além.

Bom, espero que a minha colocação sobre um elo de casualidade entre Asynchronous I/O e World Wide Web fique um pouco mais clara em vista dessa breve explicação. Mas, vamos ao Perl, que se destaca em ambas as áreas :D

Perl

Asynchronous I/O não é módulo, nem biblioteca, nem framework: é apenas um paradigma. Portanto, pode-se fazer os programas assíncronos em Perl from scratch, assim como pode-se fazer programas orientados a objetos sem usar o Moose. Seguem algumas das possibilidades:

  • fork() (CORE) - a forma mais tradicional e robusta de se fazer mais de uma coisa ao mesmo tempo, em sistemas UNIX. Também é a única forma decente de se fazer uso do SMP (symmetric multiprocessing). Simplificando, a chamada de fork() duplica o código atualmente executado, com todos os handles abertos, buffers, variáveis, etc. Daí em diante, o processo-pai e o processo-filho seguem independentes. Agora, o que isso tem a ver com Asynchronous I/O? Por si só, pouca coisa, mas, acrescentando ao fork() os sinais, filas, pipes, semáforos e memória compartilhada, dá para ir muito longe. Um começo é a página Perl interprocess communication.
  • threads (CORE) - controverso! Diferentemente de muitas das coisas do Perl, nem sempre se comporta da forma que você espera. Ou seja, requer um conhecimento técnico bastante elaborado. Também não é muito amigável com os frameworks de eventos (o EV ao menos lida bem com o fork()).
  • AnyEvent - conforme a documentação do próprio módulo diz, está para os frameworks de loops de eventos assim como DBI está para os bancos de dados. Por essa razão, será o foco do presente artigo.
  • Coro - implementa coroutines em Perl. Diga-se de passagem, implementa de uma forma muito engenhosa. E o que seriam coroutines? Simplificando, é mais ou menos o mesmo mecanismo que o sistema operacional emprega para rodar vários programas "simultaneamente" na mesma CPU. Isso é, guarda estado do que está rodando, "congela", roda outro, guarda estado, "congela", e assim por diante.
  • IO::Lambda - outra abordagem interessante, implementa I/O assíncrono como um cálculo lambda. Lispers gonna Lisp.
  • EV, Event, IO::Async, POE - diversos frameworks de I/O assíncrono. A parte boa é: todos servem como backend para o AnyEvent!

AnyEvent

A razão de ser do AnyEvent é providenciar uma API compatível, portável e leve para programas assíncronos/orientados a eventos. Apesar de ter um backend próprio, implementado em Pure Perl, o AnyEvent pode fazer uso de módulos e bibliotecas especializados. Por exemplo, é possível aproveitar o loop de eventos do próprio Tk. Todavia, o backend que mais se destaca é o EV: além de apresentar o melhor desempenho, é o único (até onde eu saiba) que suporta fork() completamente. Então, para prosseguir com os exemplos desse artigo:

    perl -MCPAN -e "install EV"
    perl -MCPAN -e "install AnyEvent"

Cliente HTTP

Atenção!

Poupe-se do trabalho de copiar e colar o código picotado! O link para a versão completa está em "Referências"!

Um cliente HTTP é provavelmente a coisa mais simples que faça algo relativamente útil. Também ilustra bem a disparidade existente entre o processamento e o I/O:

    /usr/bin/time curl -o /dev/null http://www.cpan.org/src/5.0/perl-5.14.1.tar.gz
    ...
    0.03user 0.14system 1:42.13elapsed 0%CPU (0avgtext+0avgdata 12752maxresident)k
    280inputs+0outputs (1major+893minor)pagefaults 0swaps

Traduzindo: o processo que fez download do código-fonte do Perl levou um minuto e quarenta e dois segundos para completar, porém empregou uma quantidade irrisória de ciclos de CPU para isso. O resto do tempo ficou parado, aguardando I/O.

Aqui reescrevi o exemplo da documentação oficial do AnyEvent, só fiz questão em utilizar HTTP::Request para o processamento dos headers.

Começamos com o nosso boilerplate:

    #!/usr/bin/env perl
    use common::sense;

    use AnyEvent;
    use Data::Printer;
    use HTTP::Request;

    use constant CRLF => "\015\012";

Um destaque especial para common::sense, que, assim como o AnyEvent e o Coro, é da autoria do Marc Lehmann. Em suma, é um atalho para:

    use feature qw(say state switch);
    use strict;
    use utf8;
    use warnings;

Só que gasta menos memória (!) do que dar um use em todos eles, um a um.

E outro destaque para o Data::Printer, um excelente visualizador de dados do Breno G. de Oliveira.

Agora, registramos a conditional variable e callback para processar a resposta:

    my $cv = AE::cv;

    my $cb = sub {
        p @_;
        $cv->send;
    };

Conditional variable é o alicerce do AnyEvent: através dela declaramos ao loop de eventos que estamos esperando alguma coisa. No caso, quando $cb será executado, enviará um send() ao respectivo recv() (a ser declarado).

Enquanto isso, montamos o nosso HTTP::Request:

    my $req = new HTTP::Request(GET => 'http://www.cpan.org/');
    $req->header(Host           => $req->uri->host_port);
    $req->header(User_Agent     => "AnyEvent/$AE::VERSION Perl/$] ($^O)");

E um AnyEvent::Handle para a nossa conexão:

    my $buf = '';

    my $h;
    $h = new AnyEvent::Handle
        connect     => [$req->uri->host => $req->uri->port],
        on_eof      => sub {
            $cb->(HTTP::Response->parse($buf));
            $h->destroy;
        },
        on_error    => sub {
            $cb->(HTTP::Response->new(500, $!));
            $h->destroy;
        };

Alguns dos eventos (on_eof, on_error) já são declarados na instanciação do handle. Um cuidado especial deve ser tomado com os escopos léxicos: $h precisa ser visível nos closures dos eventos, por isso, my $h = new AnyEvent::Handle não daria certo!

Felizmente, AnyEvent não é 100% assíncrono, e preserva a nossa sanidade através do emprego de filas para dados em streams. Assim, podemos colocar a nossa requisição na fila do handle:

    $h->push_write(
        $req->method . ' ' . $req->uri->path_query . ' HTTP/1.0' . CRLF .
        $req->headers->as_string(CRLF) . CRLF .
        $req->content
    );

Para receber a resposta, mais um callback, e fechamos com recv() aguardando o respectivo send() através do $cv:

    $h->on_read(
        sub {
            my ($h) = @_;
            $buf .= $h->rbuf;
            $h->rbuf = '';
        }
    );

    $cv->recv;

Detalhe importante: o buffer do handle precisa ser esvaziado! Então, concatenamos o seu conteúdo no nosso $buf, que será processado no final da conexão (on_eof).

Em suma, o nosso fluxo é: cadastrar os callbacks para os eventos e ativar o loop através da conditional variable. Conforme os eventos vão acontecendo, os callbacks são disparados. A sequência dos acontecimentos raramente segue a organização lógica do código-fonte. Tanto que, nesse exemplo, o callback que termina o loop é o primeiro a ser declarado!

Isso pode parecer desmotivante, especialmente para uma aplicação tão simples e que não utilizou nenhuma vantagem da abordagem assíncrona. Mas calma que estamos chegando lá :D

Servidor HTTP

Tradicionalmente (tradição essa que provém do Apache e antes dele, do NCSA), os servidores HTTP tendem a empregar o velho e bom fork() para lidar com várias conexões simultâneas. Ou seja: para cada cliente que conecta, é criado um worker exclusivo, que atende àquele cliente enquanto o servidor está de prontidão para as outras conexões.

Porém o fork() é relativamente custoso, por mais otimizado que seja (não se copia o processo inteiro; basta replicar apenas as páginas writable da memória, e reaproveitar as read-only em todas as cópias). Threads (não especificamente os do Perl) são mais eficientes, ainda assim o custo para organizar a bagaça toda é bastante elevado. Eis que surge o problema C10k: como lidar com uma quantidade alta de conexões simultâneas? Rodar 10 mil processos é algo perturbador.

O Apache atualmente lida com as conexões de várias formas, inclusive é possível combinar fork() e threads, entretanto, a sua arquitetura limita bastante o desempenho. Assim, faz-se necessário combinar o Apache com nginx ou lighttpd. E como esses últimos conseguem a proeza de atender a C10k?

Asynchronous I/O!

O servidor que desenvolvi para servir como exemplo aqui está longe disso, ainda assim, é evidentemente muito mais veloz do que o Net::Server::HTTP:

    Server Software:        Net::Server::HTTP/0.99
    Server Hostname:        127.0.0.1
    Server Port:            8080
    ...
    Requests per second:    96.96 [#/sec] (mean)
    Time per request:       103.140 [ms] (mean)
    Time per request:       10.314 [ms] (mean, across all concurrent requests)
    Transfer rate:          71.86 [Kbytes/sec] received

    Server Software:        AnyEvent/6.02
    Server Hostname:        127.0.0.1
    Server Port:            8888
    ...
    Requests per second:    1801.45 [#/sec] (mean)
    Time per request:       5.551 [ms] (mean)
    Time per request:       0.555 [ms] (mean, across all concurrent requests)
    Transfer rate:          2274.68 [Kbytes/sec] received

Então, vamos começar:

    #!/usr/bin/env perl
    use common::sense;

    use AnyEvent;
    use AnyEvent::Handle;
    use AnyEvent::Log;
    use AnyEvent::Socket;
    use HTTP::Headers;
    use HTTP::Response;

    use constant CRLF => "\015\012";
    use constant MAXCONN => 100;
    use constant TIMEOUT => 10;

    $AnyEvent::Log::FILTER->level('debug');

Um feature muito bacana do AnyEvent é que ele tem um sisteminha de log. Apesar do próprio autor o descrever como simplório, para mim, pessoalmente, substitui o Log::Log4perl em muitos casos.

Inicializando o servidor:

    our %pool;
    my $srv = tcp_server(
        '127.0.0.1' => 8888,
        sub {
            my ($fh, $host, $port) = @_;

            # código do servidor
            ...;
        }
    );

    AE->cv->wait;

Como o objetivo do servidor é ficar permanentemente disponível até ser explicitamente terminado, não há necessidade de tratamento de conditional variable. É só dar um wait(), que faz o papel do recv().

Adentrando o código do servidor, as coisas começam a ficar interessantes. Assim exercemos o controle de limite de conexões:

    if (scalar keys %pool > MAXCONN) {
        AE::log error =>
            "deny connection from $host:$port (too many connections)\n";
        return;
    } else {
        AE::log warn =>
            "new connection from $host:$port\n";
    }

E instanciamos o AnyEvent::Handle para cada conexão:

    my $h = new AnyEvent::Handle(
        fh          => $fh,
        on_eof      => \&cleanup,
        on_error    => \&cleanup,
        timeout     => TIMEOUT,
    );

    $pool{fileno($fh)} = $h;
    AE::log debug =>
        sprintf "%d connection(s) in pool\n", scalar keys %pool;

O hash %pool é essencial não apenas para o controle de número de conexões. Como o handle é local ao escopo léxico do closure, é detonado pelo garbage collector assim que o closure termina, o que impede a execução dos seus callbacks. Então, o %pool serve também para preservar as referências aos handles.

Agora, enfileiramos o tratamento da primeira linha da requisição:

    my ($req, $hdr);

    $h->push_read(regex => qr{\015?\012}, sub {
        my ($h, $data) = @_;
        $data =~ s/\s+$//s;
        $req = $data;
        AE::log debug => "request: [$req]\n";
    });

(de novo, $req e $hdr estão declarados fora dos closures)

Este evento será disparado quando aparecer o primeiro line terminator no stream. Em outras palavras: o código acima pega a primeira linha da requisição (GET / HTTP/1.0).

E para tratar o resto da requisição, que consiste de headers (obrigatoriamente) e content (opcional):

    $h->push_read(regex => qr{(\015?\012){2}}, sub {
        my ($h, $data) = @_;
        $hdr = $data;
        AE::log debug => "got headers\n";
        if ($hdr =~ m{\bContent-length:\s*(\d+)\b}is) {
            AE::log debug => "expecting content\n";
            $h->push_read(chunk => int($1), sub {
                my ($h, $data) = @_;
                reply($h, $req, $hdr, $data);
            });
        } else {
            reply($h, $req, $hdr);
        }
    });

Nesse caso, temos uma decisão a tomar: se vier header com Content-length, cadastramos mais um evento que pega a quantidade de octets especificada. Senão, estamos feitos.

A rotina de limpeza (que vinculamos a on_eof e on_error) faz de tudo para terminar a conexão:

    sub cleanup {
        my ($h, $fatal, $msg) = @_;
        AE::log debug => "closing connection\n";
        my $id = fileno($h->{fh});
        delete $pool{$id} if defined $id;
        eval {
            no warnings;
            shutdown $h->{fh}, 2;
        };
        $h->destroy;
    };

Aliás, no fluxo normal (isso é, mais ou menos de acordo com o protocolo HTTP 1.0 como o conheço), a rotina de reply() deverá ser chamada antes da cleanup(). Normalmente, a requisição termina após uma linha em branco (dois line terminators em seguida), ou, se for POST, após esgotar todos os octets especificados no header.

Formulando a resposta para o cliente, através do HTTP::Response:

    sub reply {
        my ($h, $req, $hdr, $content) = @_;

        my $res = HTTP::Response->new(
            200 => 'OK',
            HTTP::Headers->new(
                Connection      => 'close',
                Content_Type    => 'text/html; charset=utf-8',
                Server          => "AnyEvent/$AE::VERSION Perl/$] ($^O)",
            )
        );
        $res->date(time);
        $res->protocol('HTTP/1.0');

        if ($req =~ m{^(GET|HEAD|POST|PUT)\s+(.+)\s+(HTTP/1\.[01])$}i) {
            my ($method, $uri, $protocol) = ($1, $2, $3);
            AE::log debug => "sending response\n";
            $res->content("Hello, World!");
        } else {
            AE::log error => "bad request\n";
            $res->code(400);
            $res->message('Bad Request');
            $res->content('Bad Request');
        }

        $h->push_write($res->as_string(CRLF));
        cleanup($h);
    };

Este servidorzinho só responde Hello, World! para os métodos GET, HEAD, POST e PUT, para qualquer URI (ou erro 400 - Bad Request para requisições desconhecidas). É trivial combiná-lo com File::Slurp para servir conteúdo estático, mas isso cabe a você, caro leitor ;)

Proxy HTTP

Perl é famoso por ser uma linguagem-cola (glue). Como bônus, e numa jogada de reaproveitamento via Copy & Paste, juntemos os nossos cliente + servidor HTTP para fazer um proxy. Para isso, o "Cliente HTTP" foi "encapsulado" numa subrotina, http_req(). E no "Servidor HTTP", substituímos a rotina reply() por esta:

    sub reply {
        my ($h, $req, $hdr, $content) = @_;

        if ($req =~ m{^(GET|HEAD|POST|PUT)\s+(https?://.+)\s+(HTTP/1\.[01])$}i) {
            # código para métodos HTTP
            ...;
        } elsif ($req =~ m{^CONNECT\s+([\w\.\-]+):(\d+)\s+(HTTP/1\.[01])$}i) {
            # código para túnel
            ...;
       } else {
            AE::log error => "bad request\n";
            $h->push_write(
                HTTP::Response->new(
                    400 => 'Bad Request',
                    undef, 'Bad Request'
                )->as_string(CRLF)
            );
            cleanup($h);
        }
    }

A principal diferença está na adição do método CONNECT, que permite o acesso a túnel encriptado HTTPS via proxy.

Formular a resposta para os métodos /(GET|HEAD|POST|PUT)/ consiste em desmontar os headers, construir uma requisição, enviá-la e retornar a resposta:

    my ($method, $uri, $protocol) = ($1, $2, $3);
    my $headers = HTTP::Headers->new;
    for (split /\015?\012/, $hdr) {
        $headers->header($1 => $2) if m{^\s*([\w\-]+)\s*:\s*(.+)$};
    }
    $headers->remove_header('Proxy-Connection');
    $headers->header(Connection => 'close');

    AE::log debug => "fetching $uri\n";
    http_req
        new HTTP::Request($method => $uri, $headers, $content),
        sub {
            my $n = 2**12;  # 4KB
            my $buf = $_[0]->as_string(CRLF);
            my $len = length $buf;

            AE::log debug => "sending $uri ($len bytes)\n";

            $h->push_write($_)
                for unpack "a$n" x ($len / $n - 1) . 'a*', $buf;

            AE::timer 0, 0, sub { cleanup($h); };
        }

O problema dessa abordagem é que se faz necessário aguardar o término da requisição encaminhada, para só então retornar a resposta. Por um lado, isso leva bastante tempo, em que o cliente fica simplesmente esperando o proxy baixar a resposta. Por outro, a resposta é armazenada inteiramente na memória, então, baixe ISO através desse proxy por conta e risco! E, para terminar, é necessário fragmentar a resposta para enviá-la através do push_write (pelo menos, com EV como backend).

Em contrapartida, o tratamento para CONNECT é muito mais elegante:

    my ($peer_host, $peer_port, $protocol) = ($1, $2, $3);
    AE::log debug => "connecting to $peer_host:$peer_port\n";

    my $peer_h;
    $peer_h = new AnyEvent::Handle
        connect     => [$peer_host => $peer_port],
        on_eof      => sub {
            $peer_h->destroy;
            cleanup($h);
        },
        on_error    => sub {
            $peer_h->destroy;
            cleanup($h);
        },
        on_connect  => sub {
            AE::log debug => "connected to $peer_host:$peer_port\n";
            $h->push_write(
                "HTTP/1.0 200 Connection established" .
                (CRLF x 2)
            );

            $h->on_read(
                sub {
                    AE::log debug => "send to $peer_host:$peer_port\n";
                    $peer_h->push_write($_[0]->rbuf);
                    $_[0]->rbuf = '';
                }
            );

            $peer_h->on_read(
                sub {
                    AE::log debug => "recv from $peer_host:$peer_port\n";
                    $h->push_write($_[0]->rbuf);
                    $_[0]->rbuf = '';
                }
            );
        };

Ao receber um CONNECT, o proxy se conecta a endereço/porta especificados e faz um "curto-circuito", vinculando a saída do cliente à entrada do servidor remoto, e vice-versa.

Quanto à funcionalidade: zerei o cache do meu Chrome, apontei ele para usar o proxy recém-criado, e reiniciei. Abriu todas as minhas abas, entre elas a interface do Gmail e do Google Reader, e funcionou decentemente por algumas horas. Mas, é claro, cada caso é um caso, então não me responsabilizo se o seu SSD pifar após o uso de qualquer um dos meus códigos :P

Apêndice: AnyEvent::Util

Lembrando: AnyEvent é uma API para frameworks de loops de eventos. De forma alguma sequer um deles se encarrega em distribuir o processamento entre as várias CPUs que você tem! Traduzindo: se você tem um sistema com 8 núcleos, não espere que os seus programas que empreguem o paradigma assíncrono automaticamente passem a aproveitar os seus recursos de hardware. Para delegar o serviço pesado a outras CPUs, você ainda precisa de um fork(). Recapitulando: se o seu problema é I/O, a sua solução é (provavelmente) AnyEvent. E, se o seu problema é processamento, a solução é fork() (threads não é solução, é $problema ** 2, sorry). Agora, se você precisa processar lotes de dados e receber os resultados... Você precisa de mais um glue: AnyEvent::Util.

Como um (mau) exemplo, segue a implementação do sleep sort:

    #!/usr/bin/env perl
    use common::sense;

    use AnyEvent::Util;
    use List::Util qw(shuffle);

    my $cv = AE::cv;

    $AnyEvent::Util::MAX_FORKS = 10;

    for my $i (shuffle(1 .. 10)) {
        $cv->begin;
        fork_call {
            sleep $i;
        } sub {
            say shift;
            $cv->end;
        };
    }

    $cv->wait;

Obviamente, espera-se que o programador utilize algo envolvendo o módulo Graph no lugar do sleep(), para fazer jus à técnica.

Neste momento, estamos prontos para dar um novo significado à expressão "mal gosto":

    #!/usr/bin/env perl
    use common::sense;

    use AnyEvent::Util;
    use List::Util qw(shuffle);

    my $cv = AE::cv;

    for my $i (shuffle(1 .. 10)) {
        $cv->begin;
        run_cmd ['/bin/sleep', $i],
            '>' => sub {
                say $i;
                $cv->end;
            };
    }

    $cv->wait;

É exatamente o que parece: sleep sort chamando o processo sleep(1) externo. Mas é claro que você enxergou aqui a possibilidade de interfacear com o xz(1) sem apostar todas as fichas no IO::Compress::Lzma.

Considerações

Algumas observações para escapar da armadilha do spaghetti code:

  • Os programas orientados a eventos tendem a crescer "na diagonal". Isso é contornável criando subrotinas a torto e a direito... O que pode causar outro problema.
  • O escopo das variáveis pode causar uma grande confusão! Um my mal-empregado num closure pode se tornar uma grande ameaça à saúde mental.
  • Atenção redobrada para conditional variables: tudo o que sobe tem que descer! Um recv() é anulado por qualquer send(). Para exercer controle sobre eventos do mesmo tipo rodando em paralelo, empregue begin()/end(): begin() incrementa o contador da conditional variables, e end() o decrementa.
  • Debugar programas assíncronos não é divertido. Pare para pensar e trate de acertar na primeira. É sério.
  • O namespace AnyEvent tem muita coisa boa. Pesquise antes de fazer você mesmo. No pior caso (quando não tem exatamente aquilo que você pretende fazer), aproveite o módulo mais similar/simples como boilerplate ;)

Referências

Autor

Licença

Este texto está licenciado sob os termos da Creative Commons by-sa, http://creativecommons.org/licenses/by-sa/3.0/br/

Licença Creative Commons
This work is licensed under a Creative Commons Attribution-ShareAlike License.
blog comments powered by Disqus