Thiago Berlitz Rondon

AMQP, RabbitMQ e Perl
Publicado em 01/09/2010

AMQP, RabbitMQ e Perl.

Este artigo é sobre o conjunto: as necessidades de se trabalhar em sistemas orientados a mensagem (MOM), o protocolo AMQP, a implementação RabbitMQ e os "martelos" disponíveis no repositório CPAN.

No final dele, irei explorar o conteúdo abordado no artigo com um módulo que estou desenvolvendo para facilitar a utilização deste conjunto.

Introdução, medindo as necessidades.

Vamos refletir sobre necessidades diversas envolvendo filas e mensagens neste primeiro momento, para discutirmos uma solução posteriormente.

Cenários

Necessidade de expansão e comunicação em redes heterogenias de sistemas, a implementação de intermediários responsáveis pela comunicação é inevitável, como protocolos wire-level que definam o comportamento da conversação.

Padrões

Para que sistemas distintos se comuniquem e se tenha interoperabilidade, a padronização destas mensagens se torna um pré-requisito, e quando a modelagem desta conversação é baseada em um processo assíncrono, a utilização de filas é uma ótima solução.

Fila

A fila é uma maneira de se organizar elementos em uma estrutura linear, com um ponto de espera em comum, onde normalmente o primeiro a chegar é o primeiro a sair (FIFO).

Filas, Onde ?

A utilização de filas normalmente esta para suprir a necessidade de aproveitar melhor um recurso que tem uma limitação para processar simultaneamente N elementos, e quando chega neste limite a exigência de uma fila de espera é uma ótima solução para organizar e planejar este processamento.

Geralmente, é utilizada em casos onde a demanda tem "picos" altos de chegada de elementos, porém em outros momentos não, disponibilizando assim o recursos para trabalhar de forma constante para um melhor aproveitamento.

Arquitetura do intermediário

Conhecido em esquemas de fila, como "Message Broker", é o responsável por gerenciar as mensagens trocadas entre os pontos de interesse, ou seja uma arquitetura padrão para validação, transformação (serialização) e roteamento.

Evento

E quando há mensagens ou filas distintas, começamos a ter a necessidade de trabalhar com ações baseadas em eventos, ou seja tomando decisões dependendo da mensagem ou da fila.

Por exemplo, na vida real existem filas preferenciais para gestantes e idosos no qual há uma necessidade que eles sejam atendidos de forma mais rápida quando há fila.

Vamos exemplificar, você pode ter uma "array" com ações distintas para o seu sistema, supondo que esta array se chame "fila_de_acoes".

    array @fila_de_acoes

    colocar_na_fila (@fila_de_acoes, "Primeira mensagem")
    colocar_na_fila (@fila_de_acoes, "Segunda mensagem")
    colocar_na_fila (@fila_de_acoes, "Terceiro mensagem")

Colocar elementos na fila, é um tipo de evento, que pode ser executado por apenas um "publicador" ou vários.

Agora, você tem uma "array" com vários elementos nesta fila, e precisamos tratar os eventos para os sistemas lerem esta mensagem.

    buscar @fila_de_acoes
        retirar esta mensagem da fila;
        imprimir esta mensagem no arquivo de log;
        (...)

Veja, que quando eu busco uma mensagem, eu posso opcionalmente apagar ela da fila, e quando finalizar a minha ação, ir buscar o próximo elemento da fila. O fluxo de comunicação com um intermediador entre os publicadores (que inserem elementos na fila) ou os consumidores (que buscam por elementos), podem estar dispostos da seguinte forma:

Point-to-point

Um publicador e um consumidor.

Store-and-Forward

Vários publicadores e um consumidor.

Publish-Subscribe

Muitos publicadores e muitos consumidores.

Topic/Content-based

Os consumidores podem analisar, dependendo do conteúdo do elemento.

Provavelmente, você irá conseguir se encaixar em alguma destas características, talvez você precise implementar alguns detalhes, como por exemplo que o envio de mensagem seja "anycast", no qual dever resolver problemas de envio de mensagem unicast (um-para-um), como multicast ou broadcast (um-para-muitos).

Enfim, veja como existem possibilidades de serem efetuadas dentro de uma solução baseada em mensagens com filas, e que provavelmente você irá necessitar de um formato de comunicação único para esta gerencia.

Imagine que você ainda precise de interoperabilidade, facilidades de implementação em sistemas distribuidores, resolução de problemas de redes, definição de comportamentos e etc... vem a pergunta. Existe uma solução madura para estes problemas ?

Sim, o protocolo AMQP, e agora vamos tentar explorar os motivos.

Introdução ao AMQP.

AMQP é o acrónimo para Advanced Queueing Protocol no qual é um protocolo que define o comportamento que deve haver entre aplicativos e serviços sobre filas para que exista interoperabilidade, basicamente isto significa que o protocolo é wire-level, ao invés de definir funções e criar libs, ele apenas define uma sequencia de conversação pela rede, para que as coisas sejam feitas.


Podemos dividir nossa abstração em três papeis, o publicador (publisher), o intermediário (broker) e os consumidores (consumer), como é ilustrado na imagem.

A Analogia "comum".

Iremos discutir algumas questões superficiais sobre o protocolo, e para começar vou utilizar uma analogia que existe na especificação do próprio protocolo que é com o serviço de e-mail, e que é muito utilizado na introdução de outros protocolos do mesmo tipo.

Um aviso: Este é um exemplo muito simples, e não deve ser levado tão a sério.

1. Fila de mensagens

É como o mailbox, no qual é um arquivo contendo os e-mails que são recebidos pelo usuário em um servidor de e-mail.

2. Consumidor

É como se um leitor de e-mail, que pode buscar ou apagar e-mails.

3. Exchanges

São como um MTA, no qual analisa o e-mail e decide, baseado na sua "tabela de roteamento" e então move a mensagem para uma ou mais mailbox.

4. Routing key

Corresponde ao To:, Cc: ou Bcc:, sem a informação de qual servidor é.

5. Um Exchange.

Cada instancia de um exchange, é como se fosse um processo de MTA separada.

6. Binding

Desempenha o mesmo papel de uma entrada na tabela de roteamento do MTA.

Porém existem diferenças, no AMQP ele tem a habilidade de criar filas (mailboxes), exchanges (MTA) e bindings (routing) em tempo de execução. Existem outras diferenças também relacionadas como o servidor armazenar as mensagens, entre outros.

Atenção: O uso desta analogia é só para efetuar uma introdução, não é para se aceitar a ideia crua na comparação, pois existem diferenças praticas significativas.

Modelo.

O modelo é a definição da semântica explicita para que haja na implementação dos servidores a interoperabilidade, no qual o funcionamento é um conjunto de componentes e regras para esta ligação. Os três componentes principais são:

Exchange

É responsável por receber estas mensagens dos publicadores e rotear para as filas de mensagens baseado em critérios.

Message queue (Fila de mensagens)

Armazena as mensagens até que elas sejam consumidas com segurança pelos consumidores.

Bindings

Define a relação entre o Exchange e as filas de mensagens, o que define os critérios de roteamento.

O Protocolo.

Como já foi dito, o protocolo é binário e wire-level, e funciona basicamente organizado por frames, e no qual cada ação trabalha de forma isolada em canais independentes porem podendo compartilhar recursos, e isto gera uma nova carcateristica técnica, no qual é faz do protocolo ser multiplexing.

Um protocolo wire-level, significa que não é definida uma API como libs ou funções, e sim apenas um comportamento, fazendo com que existam implementações diversas e na minha opinião flexibilidade para trabalhar nos campos de performance e escalabilidade.

Fila de mensagens

Uma fila - como no começo do artigo pode estar armazenada em memória - em uma array pode não ser ideal, e você queira que ela seja persistente - por exemplo, gravada em disco -.

Além de exigências mais complexas, como lidar com estrategias de /restart/.

Exchange

Um exchange é uma mensagem roteada por um agente em um virtual host, cada instancia aceita mensagens e roteia baseado em informações e as chaves (routing key) e repassa para as filas de mensagem.

Parâmetros

Eles podem ser criados, compartilhados e encerrados pelos aplicativos, dependendo das limitações de autorização, alem de poderem ter algumas características, tais como:

Durable

Ela irá ser mantida até que que seja solicitada para ser apagada.

Transient

Ela irá ser mantida até que o servidor seja desligado.

Auto-deleted

Ela irá ser mantida até que não seja mais usada.

Especificação do tipo de fila

Cada tipo de exchange, tem um algorítimo de roteamento especifico, no quais podemos resumir, dois deles como:

Direct

No qual é roteado baseado na exata combinação entre o bindind key e o routing key.

Topic

No qual é roteado em uma "pattern match" entre o binding key e o routing key.

Fanout

Distribui a mensagem para todas as filas, qualquer rota será ignorada.

Customizado

E possível criar exchanges customizados, como por exemplo baseado em informações de cabeçalhos.

Routing Key

Aqui você declara para onde será enviado as mensagens, por exemplo em uma exchange do tipo "direct", no qual neste caso a rota (routing key) deve ser exatamente o nome da fila, para que a mensagem seja entregue.


Caso você queira criar um roteamento para uma exchange do tipo "topic", vamos supor que você tem as seguintes filas:


Podemos criar uma rota neste caso, para enviar mensagens para todas as filas que comecem com com 'usa', declarando a rota como "usa.#", como podemos dizer que gostaríamos de criar outra rota para todas as filas sobre tempo com a sintaxe "#.news".

E ainda, tem o tipo de rota "fanout", no qual irá ignorar qualquer routing key e enviar para todas as filas existentes.


A implementação RabbitMQ.

Esta é uma implementação opensource, no qual é desenvolvida sob a licença da Mozilla Public License.

Na minha opinião, o RabbitMQ é uma solução que basicamente funciona muito bem, além de manter uma baixo throughput e uma boa performance relacionada a latência.

As ferramentas da linguagem perl (no CPAN).

Net::AMQP

Este modulo implementa o método de conversação para o protocolo AMQP, e é muito útil em necessidades especificas, ou mesmo quando você esta buscando uma melhor performance para sua necessidade.

Ele foi desenvolvido para a utilização do AMQP 0.8, porém carregando a especificação das novas versões, ele trabalha bem.

Net::RabbitMQ

Esta biblioteca é um wrapper para a librabbitmq, que na minha opinião facilitam e lhe oferecem agilidade para implementar as necessidades exigidas. A diferença para o Net::AMQP é que você tem uma abstração maior para trabalhar.

Net::RabbitMQ::Simple

Eu estava olhando para o Devel::Declare e pensei em escrever um módulo para simplificar mais ainda a sintaxe e a utilização do Net::RabbitMQ.

Então, vamos começar tentando olhar primeiramente para o código e depois a explicação sobre o que estamos realizando.

    use Net::RabbitMQ::Simple;

    mqconnect {
        hostname => 'localhost',
        user => 'guest',
        password => 'guest',
        vhost => '/'
    };

Com o método acima, você irá informar as informações para conexão, lembrando que a RabbitMQ mantém um servidor de testes, caso queira utilizar para começar, veja mais informações em http://dev.rabbitmq.com/.

    exchange {
        name => 'nossa_exchange',
    }

Para declarar uma nova exchange, basta utilizar o método exchange, e adicionar as opções que deseja, tais elas como se as mensagens serão excluídas automaticamente, irão persistir e etc.

    publish {
        exchange => 'nossa_exchange',
        queue => 'fila',
        route => 'rota_da_nossa_exchange_para_fila',
        message => 'Testando'
        options => { content_type => 'text/plain' }
    };

Para publicar uma mensagem utilizamos o método publish, talvez o que foi mais simplificado dentro deste módulo que basta apenas passar como parâmetro a hash contendo informações, do exchange, da rota e da fila para qual você deseja enviar.

Agora, para os consumidores temos basicamente dois métodos para buscar informações das filas.

    my $rv = consume;

Com o método acima, iremos receber a informação da fila, e caso o não haja nenhuma mensagem para ser consumida, ele irá aguardar algum retorno até que haja.

    my $rv = get { options => { routing_key => 'foo.*' }}

A principal diferente, é que com o método get, caso não tenha nenhuma mensagem na fila, ele simplesmente irá retornar "undef" e não ficar aguardando uma mensagem na fila, como o método consume.

Espero que isto seja útil para quem esteja querendo aprender mais sobre AMQP, RabbitMQ e filas. No diretório de testes do módulo há vários testes, simulando situações diferentes, recomendo a leitura deles para o entendimento.

Como um dos objetivos do módulo é criar uma sintaxe que seja de leitura simples, espero que com o entendimento deste artigo, você não tenha dificuldade nenhuma para ler eles.

Acompanhe o desenvolvimento e sugira novos recursos através do github ou do próprio CPAN.

Exemplo com ACK.

Quando você precisa de controles de fluxo, para que o receptor, à medida que receba os dados, envie uma confirmação de que recebeu a mensagem e conseguiu processar com sucesso e então descarte a mensagem da fila enviando uma mensagem ACK (Acknowledgement), ou seja você apenas vai descartar da fila com uma confirmação explícita do seu consumidor.

Um exemplo, prático utilizando o Net::RabbitMQ::Simple seria o publicador enviar a mensagem com o seguinte código:

	#!/usr/bin/env perl
	# publicador.pl

	use Net::RabbitMQ::Simple;

	mqconnect;

	exchange {
		name	=> "equinocio",
		type	=> "direct",
		passive	=> 0,
		durable	=> 1,
		auto_delete => 0,
		exclusive => 0,
	};

	publish {
		exchange => "equinocio",
		queue => "fila",
		route => "fila_rota",
		message => "foobar baz",
		ack => 1
	};

	mqdisconnect;

Agora, o código para consumir a mensagem "foobar baz" na fila do exchange equinocio seria:

	#!/usr/bin/env perl
	# consumidor.pl

	use Net::RabbitMQ::Simple;
	use Data::Dumper;

	mqconnect;

	exchange { name => "equinocio" };

	my $rv = get { queue => "fila", ack => 1};
	ack $rv->{delivery_tag} if $rv;

	print Dumper($rv);
	mqdisconnect;

Basicamente com este exemplo, você deverá enviar o ack no consumidor, caso não envie o próximo consumidor irá buscar novamente a mesma mensagem. Experimente executar este código sem enviar a mensagem de ack (ack $rv->{delivery_tag}) duas vezes.

Espero que a sintaxe neste exemplo, seja auto-explicativo baseado no que o artigo já abordou ate aqui.

Como já foi dito anteriormente, veja o diretório de testes para vários exemplos de implementação do módulo.

CONCLUSÃO

O protocolo AMQP já tem uma herança de desenvolvimento ótima, para criação de um mecanismo para comunicação de mensagens eficiente em sistemas distribuidos onde as tarefas são assíncronas, foi criado inicialmente para oferecer interoperabilidade em sistemas financeiros.

Ele te oferece de modo simples, um modelo utilizado e confiante para a tarefa. Porém como toda solução "flex", você em determinados ambientes pode sofrer com a latência que as implementações adicionam.

O RabbitMQ é uma excelente solução escrita em Erlang (para tentar combater o problema de latência), no qual há suporte comercial e já é utilizado por muitas empresas em suas soluções, e o grande motivo: ela simplesmente funciona muito bem.

A utilização dos módulos disponíveis para Perl são para lhe dar agilidade de desenvolvimento e recomendado para necessidades onde o envio de mensagens ou recebimentos não exigem alta perfomance.

Para necessidades onde há uma alta utilização minha recomendação é a utilização de alguma linguagem funcional, como o erlang, no qual já há muitos exemplos de implementações disponíveis na rede.

VEJA MAIS

CREDITOS

Para Red Hat pelas imagens topic-exchange.png, direct-exchange.png e fanout-exchange.png.

AUTOR

Thiago Berlitz Rondon . <thiago@aware.com.br>

blog comments powered by Disqus