Stanislaw PusepPublicado em 01/03/2012
Esse artigo é dedicado aos 20 anos da WWW :)
Para muitos programadores Web, "assíncrono" remete a Ajax (afinal, é acrônimo para asynchronous JavaScript and XML). Para alguns outros, remete a Node.js. Evidentemente, a participação do JavaScript (ou, mais genericamente, ECMAScript) na formação de opinião a respeito de programação orientada a eventos foi significativa.
JavaScript, por sua vez, deve muito para Scheme (assim como Perl deve muito para Lisp). Não que a programação funcional e a orientada a eventos sejam duas faces da mesma moeda, mas ambos os conceitos requerem o mesmo "estado de espírito", por assim dizer. E esse "estado de espírito" se aplicou muito bem no universo da World Wide Web.
A abordagem de event loop é uma "técnica milenar", presente no nosso dia-a-dia nas mais diversas formas. O exemplo mais clássico é User Interface: o programa fica rodando em ciclo contínuo, aguardando input do usuário. Quem nasceu na década de 80, inconscientemente (ou não) explorou a propriedade intrínsica de event loop, desativando o modo "turbo" da CPU nas etapas mais difíceis dos jogos para entrar em modo "câmera lenta" (isso funcionava por que o timer, muitas vezes, era um mero contador dentro do loop principal).
Historicamente, o elo mais lento de um event loop foi a interface entre a cadeira e o teclado. E gastar ciclos da CPU para aguardar I/O é muito custoso em sistemas multitarefa. Então, abstraindo, ao invés de perguntar ao teclado toda hora se tem alguma tecla pressionada, fazemos o teclado disparar um aviso de que "algo aconteceu", e só então CPU se encarrega de verificar. Obviamente, o que vale para o teclado, vale também para o HD. E também para uma placa de rede.
E é aí que chegamos lá: Asynchronous I/O (também conhecido como non-blocking I/O) funciona muito bem para qualquer transação de dados que é significativamente mais lenta do que a CPU. Graças à Internet, isso é particularmente útil para a comunicação entre as CPUs localizadas em máquinas distintas. No caso de um banco de dados, podemos delegar uma query para um servidor e fazer outras coisas que não dependem diretamente do resultado da mesma.
Por muitos anos, sentiu-se a falta de uma linguagem comum entre programas se comunicando (no caso do banco de dados, precisaria ter cliente e servidor "conversando" através do mesmo protocolo; por exemplo, o do MySQL). Mas essa linguagem existiu desde 1991, apesar de só ter entrado na moda quando a apelidaram de REST (Representational state transfer). A visão que Tim Berners-Lee & Cia. tiveram naquela época foi muito... além.
Bom, espero que a minha colocação sobre um elo de casualidade entre Asynchronous I/O e World Wide Web fique um pouco mais clara em vista dessa breve explicação.
Mas, vamos ao Perl, que se destaca em ambas as áreas :D
Asynchronous I/O não é módulo, nem biblioteca, nem framework: é apenas um paradigma. Portanto, pode-se fazer os programas assíncronos em Perl from scratch, assim como pode-se fazer programas orientados a objetos sem usar o Moose. Seguem algumas das possibilidades:
fork()
duplica o código atualmente executado, com todos os handles abertos, buffers, variáveis, etc.
Daí em diante, o processo-pai e o processo-filho seguem independentes.
Agora, o que isso tem a ver com Asynchronous I/O?
Por si só, pouca coisa, mas, acrescentando ao fork()
os sinais, filas, pipes, semáforos e memória compartilhada, dá para ir muito longe.
Um começo é a página Perl interprocess communication. fork()
). A razão de ser do AnyEvent é providenciar uma API compatível, portável e leve para programas assíncronos/orientados a eventos.
Apesar de ter um backend próprio, implementado em Pure Perl, o AnyEvent pode fazer uso de módulos e bibliotecas especializados.
Por exemplo, é possível aproveitar o loop de eventos do próprio Tk.
Todavia, o backend que mais se destaca é o EV: além de apresentar o melhor desempenho, é o único (até onde eu saiba) que suporta fork()
completamente.
Então, para prosseguir com os exemplos desse artigo:
perl -MCPAN -e "install EV"
perl -MCPAN -e "install AnyEvent"
Atenção!
Poupe-se do trabalho de copiar e colar o código picotado! O link para a versão completa está em "Referências"!
Um cliente HTTP é provavelmente a coisa mais simples que faça algo relativamente útil. Também ilustra bem a disparidade existente entre o processamento e o I/O:
/usr/bin/time curl -o /dev/null http://www.cpan.org/src/5.0/perl-5.14.1.tar.gz
...
0.03user 0.14system 1:42.13elapsed 0%CPU (0avgtext+0avgdata 12752maxresident)k
280inputs+0outputs (1major+893minor)pagefaults 0swaps
Traduzindo: o processo que fez download do código-fonte do Perl levou um minuto e quarenta e dois segundos para completar, porém empregou uma quantidade irrisória de ciclos de CPU para isso. O resto do tempo ficou parado, aguardando I/O.
Aqui reescrevi o exemplo da documentação oficial do AnyEvent, só fiz questão em utilizar HTTP::Request para o processamento dos headers.
Começamos com o nosso boilerplate:
#!/usr/bin/env perl
use common::sense;
use AnyEvent;
use Data::Printer;
use HTTP::Request;
use constant CRLF => "\015\012";
Um destaque especial para common::sense, que, assim como o AnyEvent e o Coro, é da autoria do Marc Lehmann. Em suma, é um atalho para:
use feature qw(say state switch);
use strict;
use utf8;
use warnings;
Só que gasta menos memória (!) do que dar um use
em todos eles, um a um.
E outro destaque para o Data::Printer, um excelente visualizador de dados do Breno G. de Oliveira.
Agora, registramos a conditional variable e callback para processar a resposta:
my $cv = AE::cv;
my $cb = sub {
p @_;
$cv->send;
};
Conditional variable é o alicerce do AnyEvent: através dela declaramos ao loop de eventos que estamos esperando alguma coisa.
No caso, quando $cb
será executado, enviará um send()
ao respectivo recv()
(a ser declarado).
Enquanto isso, montamos o nosso HTTP::Request:
my $req = new HTTP::Request(GET => 'http://www.cpan.org/');
$req->header(Host => $req->uri->host_port);
$req->header(User_Agent => "AnyEvent/$AE::VERSION Perl/$] ($^O)");
E um AnyEvent::Handle para a nossa conexão:
my $buf = '';
my $h;
$h = new AnyEvent::Handle
connect => [$req->uri->host => $req->uri->port],
on_eof => sub {
$cb->(HTTP::Response->parse($buf));
$h->destroy;
},
on_error => sub {
$cb->(HTTP::Response->new(500, $!));
$h->destroy;
};
Alguns dos eventos (on_eof
, on_error
) já são declarados na instanciação do handle.
Um cuidado especial deve ser tomado com os escopos léxicos: $h
precisa ser visível nos closures dos eventos, por isso, my $h = new AnyEvent::Handle
não daria certo!
Felizmente, AnyEvent não é 100% assíncrono, e preserva a nossa sanidade através do emprego de filas para dados em streams. Assim, podemos colocar a nossa requisição na fila do handle:
$h->push_write(
$req->method . ' ' . $req->uri->path_query . ' HTTP/1.0' . CRLF .
$req->headers->as_string(CRLF) . CRLF .
$req->content
);
Para receber a resposta, mais um callback, e fechamos com recv()
aguardando o respectivo send()
através do $cv
:
$h->on_read(
sub {
my ($h) = @_;
$buf .= $h->rbuf;
$h->rbuf = '';
}
);
$cv->recv;
Detalhe importante: o buffer do handle precisa ser esvaziado! Então, concatenamos o seu conteúdo no nosso $buf
, que será processado no final da conexão (on_eof
).
Em suma, o nosso fluxo é: cadastrar os callbacks para os eventos e ativar o loop através da conditional variable. Conforme os eventos vão acontecendo, os callbacks são disparados. A sequência dos acontecimentos raramente segue a organização lógica do código-fonte. Tanto que, nesse exemplo, o callback que termina o loop é o primeiro a ser declarado!
Isso pode parecer desmotivante, especialmente para uma aplicação tão simples e que não utilizou nenhuma vantagem da abordagem assíncrona.
Mas calma que estamos chegando lá :D
Tradicionalmente (tradição essa que provém do Apache e antes dele, do NCSA), os servidores HTTP tendem a empregar o velho e bom fork()
para lidar com várias conexões simultâneas.
Ou seja: para cada cliente que conecta, é criado um worker exclusivo, que atende àquele cliente enquanto o servidor está de prontidão para as outras conexões.
Porém o fork()
é relativamente custoso, por mais otimizado que seja (não se copia o processo inteiro; basta replicar apenas as páginas writable da memória, e reaproveitar as read-only em todas as cópias).
Threads (não especificamente os do Perl) são mais eficientes, ainda assim o custo para organizar a bagaça toda é bastante elevado.
Eis que surge o problema C10k: como lidar com uma quantidade alta de conexões simultâneas?
Rodar 10 mil processos é algo perturbador.
O Apache atualmente lida com as conexões de várias formas, inclusive é possível combinar fork()
e threads, entretanto, a sua arquitetura limita bastante o desempenho.
Assim, faz-se necessário combinar o Apache com nginx ou lighttpd.
E como esses últimos conseguem a proeza de atender a C10k?
Asynchronous I/O!
O servidor que desenvolvi para servir como exemplo aqui está longe disso, ainda assim, é evidentemente muito mais veloz do que o Net::Server::HTTP:
Server Software: Net::Server::HTTP/0.99
Server Hostname: 127.0.0.1
Server Port: 8080
...
Requests per second: 96.96 [#/sec] (mean)
Time per request: 103.140 [ms] (mean)
Time per request: 10.314 [ms] (mean, across all concurrent requests)
Transfer rate: 71.86 [Kbytes/sec] received
Server Software: AnyEvent/6.02
Server Hostname: 127.0.0.1
Server Port: 8888
...
Requests per second: 1801.45 [#/sec] (mean)
Time per request: 5.551 [ms] (mean)
Time per request: 0.555 [ms] (mean, across all concurrent requests)
Transfer rate: 2274.68 [Kbytes/sec] received
Então, vamos começar:
#!/usr/bin/env perl
use common::sense;
use AnyEvent;
use AnyEvent::Handle;
use AnyEvent::Log;
use AnyEvent::Socket;
use HTTP::Headers;
use HTTP::Response;
use constant CRLF => "\015\012";
use constant MAXCONN => 100;
use constant TIMEOUT => 10;
$AnyEvent::Log::FILTER->level('debug');
Um feature muito bacana do AnyEvent é que ele tem um sisteminha de log. Apesar do próprio autor o descrever como simplório, para mim, pessoalmente, substitui o Log::Log4perl em muitos casos.
Inicializando o servidor:
our %pool;
my $srv = tcp_server(
'127.0.0.1' => 8888,
sub {
my ($fh, $host, $port) = @_;
# código do servidor
...;
}
);
AE->cv->wait;
Como o objetivo do servidor é ficar permanentemente disponível até ser explicitamente terminado, não há necessidade de tratamento de conditional variable.
É só dar um wait()
, que faz o papel do recv()
.
Adentrando o código do servidor, as coisas começam a ficar interessantes. Assim exercemos o controle de limite de conexões:
if (scalar keys %pool > MAXCONN) {
AE::log error =>
"deny connection from $host:$port (too many connections)\n";
return;
} else {
AE::log warn =>
"new connection from $host:$port\n";
}
E instanciamos o AnyEvent::Handle para cada conexão:
my $h = new AnyEvent::Handle(
fh => $fh,
on_eof => \&cleanup,
on_error => \&cleanup,
timeout => TIMEOUT,
);
$pool{fileno($fh)} = $h;
AE::log debug =>
sprintf "%d connection(s) in pool\n", scalar keys %pool;
O hash %pool
é essencial não apenas para o controle de número de conexões.
Como o handle é local ao escopo léxico do closure, é detonado pelo garbage collector assim que o closure termina, o que impede a execução dos seus callbacks
.
Então, o %pool
serve também para preservar as referências aos handles.
Agora, enfileiramos o tratamento da primeira linha da requisição:
my ($req, $hdr);
$h->push_read(regex => qr{\015?\012}, sub {
my ($h, $data) = @_;
$data =~ s/\s+$//s;
$req = $data;
AE::log debug => "request: [$req]\n";
});
(de novo, $req
e $hdr
estão declarados fora dos closures)
Este evento será disparado quando aparecer o primeiro line terminator no stream.
Em outras palavras: o código acima pega a primeira linha da requisição (GET / HTTP/1.0
).
E para tratar o resto da requisição, que consiste de headers (obrigatoriamente) e content (opcional):
$h->push_read(regex => qr{(\015?\012){2}}, sub {
my ($h, $data) = @_;
$hdr = $data;
AE::log debug => "got headers\n";
if ($hdr =~ m{\bContent-length:\s*(\d+)\b}is) {
AE::log debug => "expecting content\n";
$h->push_read(chunk => int($1), sub {
my ($h, $data) = @_;
reply($h, $req, $hdr, $data);
});
} else {
reply($h, $req, $hdr);
}
});
Nesse caso, temos uma decisão a tomar: se vier header com Content-length
, cadastramos mais um evento que pega a quantidade de octets especificada.
Senão, estamos feitos.
A rotina de limpeza (que vinculamos a on_eof
e on_error
) faz de tudo para terminar a conexão:
sub cleanup {
my ($h, $fatal, $msg) = @_;
AE::log debug => "closing connection\n";
my $id = fileno($h->{fh});
delete $pool{$id} if defined $id;
eval {
no warnings;
shutdown $h->{fh}, 2;
};
$h->destroy;
};
Aliás, no fluxo normal (isso é, mais ou menos de acordo com o protocolo HTTP 1.0 como o conheço), a rotina de reply()
deverá ser chamada antes da cleanup()
.
Normalmente, a requisição termina após uma linha em branco (dois line terminators em seguida), ou, se for POST, após esgotar todos os octets especificados no header.
Formulando a resposta para o cliente, através do HTTP::Response:
sub reply {
my ($h, $req, $hdr, $content) = @_;
my $res = HTTP::Response->new(
200 => 'OK',
HTTP::Headers->new(
Connection => 'close',
Content_Type => 'text/html; charset=utf-8',
Server => "AnyEvent/$AE::VERSION Perl/$] ($^O)",
)
);
$res->date(time);
$res->protocol('HTTP/1.0');
if ($req =~ m{^(GET|HEAD|POST|PUT)\s+(.+)\s+(HTTP/1\.[01])$}i) {
my ($method, $uri, $protocol) = ($1, $2, $3);
AE::log debug => "sending response\n";
$res->content("Hello, World!");
} else {
AE::log error => "bad request\n";
$res->code(400);
$res->message('Bad Request');
$res->content('Bad Request');
}
$h->push_write($res->as_string(CRLF));
cleanup($h);
};
Este servidorzinho só responde Hello, World! para os métodos GET, HEAD, POST e PUT, para qualquer URI (ou erro 400 - Bad Request para requisições desconhecidas).
É trivial combiná-lo com File::Slurp para servir conteúdo estático, mas isso cabe a você, caro leitor ;)
Perl é famoso por ser uma linguagem-cola (glue).
Como bônus, e numa jogada de reaproveitamento via Copy & Paste, juntemos os nossos cliente + servidor HTTP para fazer um proxy.
Para isso, o "Cliente HTTP" foi "encapsulado" numa subrotina, http_req()
.
E no "Servidor HTTP", substituímos a rotina reply()
por esta:
sub reply {
my ($h, $req, $hdr, $content) = @_;
if ($req =~ m{^(GET|HEAD|POST|PUT)\s+(https?://.+)\s+(HTTP/1\.[01])$}i) {
# código para métodos HTTP
...;
} elsif ($req =~ m{^CONNECT\s+([\w\.\-]+):(\d+)\s+(HTTP/1\.[01])$}i) {
# código para túnel
...;
} else {
AE::log error => "bad request\n";
$h->push_write(
HTTP::Response->new(
400 => 'Bad Request',
undef, 'Bad Request'
)->as_string(CRLF)
);
cleanup($h);
}
}
A principal diferença está na adição do método CONNECT, que permite o acesso a túnel encriptado HTTPS via proxy.
Formular a resposta para os métodos /(GET|HEAD|POST|PUT)/
consiste em desmontar os headers, construir uma requisição, enviá-la e retornar a resposta:
my ($method, $uri, $protocol) = ($1, $2, $3);
my $headers = HTTP::Headers->new;
for (split /\015?\012/, $hdr) {
$headers->header($1 => $2) if m{^\s*([\w\-]+)\s*:\s*(.+)$};
}
$headers->remove_header('Proxy-Connection');
$headers->header(Connection => 'close');
AE::log debug => "fetching $uri\n";
http_req
new HTTP::Request($method => $uri, $headers, $content),
sub {
my $n = 2**12; # 4KB
my $buf = $_[0]->as_string(CRLF);
my $len = length $buf;
AE::log debug => "sending $uri ($len bytes)\n";
$h->push_write($_)
for unpack "a$n" x ($len / $n - 1) . 'a*', $buf;
AE::timer 0, 0, sub { cleanup($h); };
}
O problema dessa abordagem é que se faz necessário aguardar o término da requisição encaminhada, para só então retornar a resposta.
Por um lado, isso leva bastante tempo, em que o cliente fica simplesmente esperando o proxy baixar a resposta.
Por outro, a resposta é armazenada inteiramente na memória, então, baixe ISO através desse proxy por conta e risco!
E, para terminar, é necessário fragmentar a resposta para enviá-la através do push_write
(pelo menos, com EV como backend).
Em contrapartida, o tratamento para CONNECT é muito mais elegante:
my ($peer_host, $peer_port, $protocol) = ($1, $2, $3);
AE::log debug => "connecting to $peer_host:$peer_port\n";
my $peer_h;
$peer_h = new AnyEvent::Handle
connect => [$peer_host => $peer_port],
on_eof => sub {
$peer_h->destroy;
cleanup($h);
},
on_error => sub {
$peer_h->destroy;
cleanup($h);
},
on_connect => sub {
AE::log debug => "connected to $peer_host:$peer_port\n";
$h->push_write(
"HTTP/1.0 200 Connection established" .
(CRLF x 2)
);
$h->on_read(
sub {
AE::log debug => "send to $peer_host:$peer_port\n";
$peer_h->push_write($_[0]->rbuf);
$_[0]->rbuf = '';
}
);
$peer_h->on_read(
sub {
AE::log debug => "recv from $peer_host:$peer_port\n";
$h->push_write($_[0]->rbuf);
$_[0]->rbuf = '';
}
);
};
Ao receber um CONNECT, o proxy se conecta a endereço/porta especificados e faz um "curto-circuito", vinculando a saída do cliente à entrada do servidor remoto, e vice-versa.
Quanto à funcionalidade: zerei o cache do meu Chrome, apontei ele para usar o proxy recém-criado, e reiniciei.
Abriu todas as minhas abas, entre elas a interface do Gmail e do Google Reader, e funcionou decentemente por algumas horas.
Mas, é claro, cada caso é um caso, então não me responsabilizo se o seu SSD pifar após o uso de qualquer um dos meus códigos :P
Lembrando: AnyEvent é uma API para frameworks de loops de eventos.
De forma alguma sequer um deles se encarrega em distribuir o processamento entre as várias CPUs que você tem!
Traduzindo: se você tem um sistema com 8 núcleos, não espere que os seus programas que empreguem o paradigma assíncrono automaticamente passem a aproveitar os seus recursos de hardware.
Para delegar o serviço pesado a outras CPUs, você ainda precisa de um fork()
.
Recapitulando: se o seu problema é I/O, a sua solução é (provavelmente) AnyEvent.
E, se o seu problema é processamento, a solução é fork()
(threads não é solução, é $problema ** 2
, sorry).
Agora, se você precisa processar lotes de dados e receber os resultados...
Você precisa de mais um glue: AnyEvent::Util.
Como um (mau) exemplo, segue a implementação do sleep sort:
#!/usr/bin/env perl
use common::sense;
use AnyEvent::Util;
use List::Util qw(shuffle);
my $cv = AE::cv;
$AnyEvent::Util::MAX_FORKS = 10;
for my $i (shuffle(1 .. 10)) {
$cv->begin;
fork_call {
sleep $i;
} sub {
say shift;
$cv->end;
};
}
$cv->wait;
Obviamente, espera-se que o programador utilize algo envolvendo o módulo Graph no lugar do sleep()
, para fazer jus à técnica.
Neste momento, estamos prontos para dar um novo significado à expressão "mal gosto":
#!/usr/bin/env perl
use common::sense;
use AnyEvent::Util;
use List::Util qw(shuffle);
my $cv = AE::cv;
for my $i (shuffle(1 .. 10)) {
$cv->begin;
run_cmd ['/bin/sleep', $i],
'>' => sub {
say $i;
$cv->end;
};
}
$cv->wait;
É exatamente o que parece: sleep sort chamando o processo sleep(1) externo. Mas é claro que você enxergou aqui a possibilidade de interfacear com o xz(1) sem apostar todas as fichas no IO::Compress::Lzma.
Algumas observações para escapar da armadilha do spaghetti code:
my
mal-empregado num closure pode se tornar uma grande ameaça à saúde mental. recv()
é anulado por qualquer send()
.
Para exercer controle sobre eventos do mesmo tipo rodando em paralelo, empregue begin()
/end()
: begin()
incrementa o contador da conditional variables, e end()
o decrementa. ;)
Este texto está licenciado sob os termos da Creative Commons by-sa, http://creativecommons.org/licenses/by-sa/3.0/br/