DataFlow - Um Framework Para Fluxo de Dados

Alexei Znamensky
Publicado em 01/03/2011

DataFlow - Um Framework Para Fluxo de Dados

OpenData - O Começo e o Fim

Tudo começou com um e-mail na lista da SP-PM, no qual se falou de "hackear" dados públicos. Logo isso evoluiu para a idéia de produzir um framework para buscar e analisar dados públicos. Sem demora, o Thiago Rondon montou um repositório no GitHub e começou a rabiscar umas idéias de código. Em seguida nasceu o site do OpenData-BR.

O projeto ainda tem um status de work in progress, mas as expectativas e as perspectivas são muito animadoras, e espera-se que em breve o DataFlow possa ser utilizado amplamente pelo OpenData-BR para disponibilizar os dados públicos.

Primeiras Idéias

Buscar dados na web (scrape)

A primeira idéia a ser implementada foi a lógica de buscar dados na web. Na lista, ou no canal #saopaulo.pm no irc.perl.org, surgiram as primeiras indicações de sites onde a informação estava disponibilizada. Um dos sites primariamente visados foi o PortalTransparencia, o site do governo federal para promover a transparência nos dados do governo.

Seguindo a métodologia opensource "release early, release often", o projeto começou a disponibilizar código que, efetivamente, conseguia buscar dados. No entanto, esse código inicialmente não fazia muitas provisões para manutenção e extensão no futuro.

ETL - Extract, Transform, Load

Surgiu então uma tentativa de modularizar melhor o OpenData-BR, dividindo-o em 3 tipos de componentes:

Extractors: Extratores de dados. A idéia seria que todos os scrapers, sejam web ou não, na forma que forem, seriam um tipo de Extractor.

Transformers: Robôs assassinos que se transformam em carros ou outras máquinas fantásticas. Ou, componentes que iriam conter toda a lógica de transformação dos dados, dos documentos originais para os dados buscados.

Loaders: Seriam os end-points do processo de extração dos dados, e a principal tarefa de um Loader seria gravar os dados em algum lugar. Os primeiros testes gravavam esses dados em um banco de dados MongoDB, ou imprimiam os dados obtidos usando o Data::Dumper (para debug).

Flow, Baby, Flow

Quanto mais pensávamos no framework, mais ele se parecia com um fluxo de dados, do começo (extractors) ao meio (transformers), ao fim (loaders). Todos os componentes pareciam, de uma forma genérica, ter o mesmo comportamento: recebem algo na entrada, fazem alguma coisa com esses dados, e (eventualmente) disponibilizam esses dados em uma saída. Assim surgiu a idéia de fazer um sub-projeto de fluxo de dados.

Em meados de Dezembro de 2010, a primeira idéia foi codificada em termos de "caixas", que teriam um entrada e uma saída - de uma forma bem genérica: "entra porquinho, sai salsicha".

Em termos de programação em Perl, podemos colocar vários tipos de porquinhos:

E as caixas poderiam ser enfileiradas para que a salsicha de um pudesse virar a feijoada do próximo.

Assim surgiu, dentro do repositório do OpenData, o sub-projeto que foi, inicialmente, denominado de "Box". Depois se tornou o "Flow" e, enquanto este artigo é escrito, tornou-se um projeto independente, o "DataFlow".

Show Me The Node

Um exemplo básico de um nó que transforma os dados para caixa-alta ( upper-case), usando a função uc() padrão do Perl.

    my $uc = DataFlow::Node->new(
        process_item => sub {
            shift; return uc(shift);
        }
    );

O racional de um DataFlow::Node é o mesmo de um comando de Unix ou Linux que atua como um filtro: ele é um pedaço de cano (pipe), que faz uma coisa só, simples, e bem definida:

E como usar esse nó? Seguindo a tradição, de mais de um jeito:

    # em duas etapas
    $uc->input( 'laranja' );
    my $out = $uc->output();  # LARANJA

    # ou em um unica etapa
    my $out = $uc->process('laranja');

    # podemos passar arrays
    my @out = $uc->process( qw/feira da fruta/ ); # qw/FEIRA DA FRUTA/

    # podemos passar referencias
    my $out = $uc->process( [ 'oh', 'my', 'god' ] );

Atenção: nesta última forma, os elementos do ArrayRef não serão transformados, pois você pode ter um node que espera de fato uma reference e a trata de acordo com suas próprias regras. O mesmo se aplica a outros tipos de referência. Se você quer que o node processe a informação dentro da referência, use:

    my $uc = DataFlow::Node->new(
        process_into => 1,
        process_item => sub {
            shift; return uc(shift);
        }
    );

    my $out = $uc->process( [ 'oh', 'my', 'god' ] );
    # resultado: [ 'OH', 'MY', 'GOD' ]

    my $out = $uc->process( { a => 'value', another => 'different value' } );
    # resultado { a => 'VALUE', another => 'DIFFERENT VALUE' } );

Também podemos passar code references para serem processadas (usando o process_into):

    my $code = sub { return 'batatas' };
    my $out = $uc->process( $code );
    print $out->(); # imprime 'BATATAS'

Notem que, quando são passadas referências, sempre são retornadas referências do mesmo tipo. Neste último caso $out contém um code reference que, ao ser invocado, aplica a função uc() ao resultado do code reference passado ao node.

Futuro: Talvez o parâmetro process_into passe a ser habilitado por default. Isso está sendo considerado.

Chains

Um node sozinho não faz um hack, então é preciso "enfileirar" os nós para conseguir obter um resultado significativo. O poder dos comandos pequenos está em juntá-los para fazer algo maior:

Exatamente como se fosse um pipe na linha de comando. No DataFlow, isso se faz com a classe DataFlow::Chain, que pode ser usada assim:

    my $uc = DataFlow::Node->new(
        process_item => sub { shift; return uc(shift) }
    );
    my $rv = DataFlow::Node->new(
        process_item => sub { shift; return scalar reverse $_[0]; }
    );

    my $chain = DataFlow::Chain->new( links => [ $uc, $rv ] );

Um objeto da classe Chain é também um Node, e funciona da mesma forma:

    $chain->input( qw/abc def ghi/ );
    my @result = $chain->output();
    # @result == qw/CBA FED IHG/

Biblioteca de Nós

Algumas operações são mais comuns, e nós especializados podem ser criados para executar processamentos específicos. O pacote DataFlow, contém ainda apenas um pequeno número dessas classes, mas com o tempo isso deve aumentar. Alguns exemplos de classes já disponíveis hoje são:

DataFlow::Chain

Tipo especial de nó, que possui uma lista de nós, e concatena esses nós, como descrito acima.

DataFlow::Node::CSV

Nó que transforma array references em strings no formato CSV.

DataFlow::Node::Dumper

Nó que utiliza o módulo Data::Dumper para imprimir a estrutura de cada item para STDERR.

DataFlow::Node::HTMLFilter

Nó utilizado para filtrar tags de um documento HTML utilizando HTML::TreeBuilder::XPath. Pode produzir nodes (HTML::Element), HTML ou somente os valores de tags e/ou atributos.

DataFlow::Node::LiteralData

Nó que injeta na sua fila de entrada os parâmetros passados ao construtor.

DataFlow::Node::MultiPageURLGenerator

Nó utilizado para gerar uma lista de URLs com os endereços de todas as páginas web de um conjunto de dados.

DataFlow::Node::NOP

Nó "NO-OP". A informação sai do mesmo jeito que entrou. Pode ser utilizado como classe base para outras classes, ou apenas para alterar algum atributo do fluxo de dados.

DataFlow::Node::Null

Nó que sempre retorna undef.

DataFlow::Node::URLRetriever

Nó que recebe URLs na entrada, e retorna o conteúdo das mesmas na saída.

E Como Uso Tudo Isso?

O DataFlow surgiu do projeto OpenData-BR, e um dos usos para o qual ele foi pensado foi justamente a obtenção e manipulação de dados disponíveis em sites na web.

DATAFLOW EM AÇÃO

Por exemplo, o trecho de script abaixo (baseado em df-portaltransparencia.pl), obtém as informações das pessoas (físicas e jurídicas) consideradas inidôneas e/ou que tenham sido sancionadas por algum órgão do governo:

    my $chain = Chain->new(
        links => [
            LiteralData->new($base_url),
            MultiPageURLGenerator->new(
                first_page => -1,
                produce_last_page => sub {
                    my $url = shift;

                    my $get  = DataFlow::Node::URLRetriever::Get->new;
                    my $html = $get->get($url);

                    my $texto =
                      HTML::TreeBuilder::XPath->new_from_content($html)
                      ->findvalue('//p[@class="paginaAtual"]');
                    croak q{NE<#227>o conseguiu determinar a E<#250>ltima pE<#225>gina}
                      unless $texto;
                    return $1 if $texto =~ /\d\/(\d+)/;
                },
                make_page_url => sub {
                    my ( $self, $url, $page ) = @_;

                    my $u = URI->new($url);
                    $u->query_form( $u->query_form, Pagina => $page );
                    return $u->as_string;
                },
            ),
            NOP->new( deref => 1, ),
            URLRetriever->new,
            HTMLFilter->new(
                process_into => 1,
                search_xpath =>
                  '//div[@id="listagemEmpresasSancionadas"]/table/tbody/tr',
            ),
            HTMLFilter->new(
                search_xpath => '//td',
                result_type  => 'VALUE',
                ref_result   => 1,
            ),
            Node->new(
                process_into => 1,
                process_item => sub {
                    shift; local $_ = shift; s/^\s*//; s/\s*$//;
                    return $_;
                }
            ),
            DataFlow::Node::Dumper->new,
        ],
    );

    $chain->flush;

RESULTADO

Esse código imprimirá na tela (STDERR) algo como:

    ...

    $VAR1 = [
              '11.222.333/0001-44',
              'A CALOTEIRA LTDA.',
              'Suspensa',
              '04/06/08',
              '03/06/13',
              'SENADO FEDERAL',
              '**',
              'SENADO FEDERAL',
              '14/04/2009'
            ];
    $VAR1 = [
              '555.666.777-88',
              'JOSE<#201> DO TRAMBIQUE',
              'IniE<#65533>nea',
              '27/10/09',
              '27/10/14',
              '1E<#65533> VARA CIVEL - SE<#65533>O SEBASTIAO DO PARAE<#65533>SO - TJMG',
              '',
              'CONSELHO NACIONAL DE JUSTIE<#65533>A',
              '02/01/2011'
            ];

Como pode ser visto, a parte de codificação de caracteres ainda precisa de algum trabalho. Mas os dados já foram obtidos e "limpos", faltando apenas convertê-los para algum formato que possa ser facilmente manipulável por outros sistemas.

Já estão sendo trabalhados nós para conversão de codificação de caracteres, bem como para a transformação para outros formatos, como CSV e XML.

Passo-a-Passo

Vamos examinar cada nó da cadeia.

    LiteralData->new($base),

Esse nó simplesmente pega o valor de $base_url, uma URL (para este exemplo estamos considerando que é a URL do Cadastro de Empresas Inidôneas ou Sancionadas do site PortalTransparência) e a injeta na fila de entrada do nó. O objeto do tipo DataFlow::Node::LiteralData não modifica o valor, logo essa URL será o único item de saída desse nó.

    MultiPageURLGenerator->new(
        first_page => -2,
        produce_last_page => sub {
            my $url = shift;
            ...
        },
        make_page_url => sub {
            my ( $self, $url, $page ) = @_;
            ...
        },
    ),

A classe DataFlow::Node::MultiPageURLGenerator serve para gerar, a partir de uma URL base, uma lista de URLs que correspondam às várias páginas nas quais aquele conjunto de dados se encontra. É obrigatório prover a sub anônima make_page_url. Esta sub irá receber uma URL $url e um número de página $page, e deverá retornar uma outra URL para a página $page do conjunto de dados.

As páginas inicial e final podem ser indicadas explicitamente, passando os parâmetros first_page e last_page, respectivamente. Caso first_page não seja passado, será usado 1 (um) como valor default. Se last_page não for passado, o parâmetro produce_last_page deverá conter uma sub anônima que irá calcular o número da última página, baseado na $url base.

No exemplo acima, a última página é obtida no próprio site, e a primeira página está como -2, o que significa que irá começar na penúltima página.

A saída desse nó será um ArrayRef que aponta para um array que contém as URLs das últimas duas páginas.

    NOP->new( deref => 1, ),

Esse nó, do tipo DataFlow::NOP não irá transformar a informação em si, mas como passamos o valor 1 para o parâmetro deref, o ArrayRef recebido será dereferenciado, isto é, transformado de volta em um array, e injetado na fila de saída do nó. Assim, cada elemento do array será tratado como um dado independente pelos próximos nós. Poderíamos ter passado deref no nó anterior, mas deixamos aqui para efeito de ilustração.

A saída desse nó serão duas strings contendo as URLs, respectivamente, da penúltima e última páginas do cadastro do site.

    URLRetriever->new,

Este nó acima, como o nome da classe DataFlow::URLRetriever indica, irá acessar as URLs passadas e buscará o conteúdo (no caso, código HTML).

A saída desse nó serão duas strings, cada uma delas correspondendo ao conteúdo HTML completo das URLs das últimas duas páginas.

    HTMLFilter->new(
        search_xpath =>
          '//div[@id="listagemEmpresasSancionadas"]/table/tbody/tr',
    ),

A classe DataFlow::HTMLFilter é, obviamente, utilizada para filtrar conteúdo HTML. Essa filtragem é baseada em XPath. No caso do exemplo acima, o filtro irá buscar uma tag com atributo, <div id="listagemEmpresasSancionadas">, e dentro do bloco delimitado por essa tag, uma <table>, dentro dela um <tbody>, e dentro dele todas as tags <tr>, que naturalmente correspondem às linhas da tabela com os dados que buscamos.

Por default, será retornado o texto HTML resultante da busca. No caso, a saída deste nó será um array de strings, cada uma contendo integralmente o texto HTML de cada linha (<tr>) encontrada nas tabelas de ambas as duas páginas, isto é, uma única seqüência de itens.

    HTMLFilter->new(
        search_xpath => '//td',
        result_type  => 'VALUE',
        ref_result   => 1,
    ),

Mais um filtro HTML, desta vez para obter, de cada linha da tabela os valores de cada célula, ou seja, de cada tag <td> que a linha (<tr>) contiver. No entanto, aqui passamos o parâmetro result_type igual a 'VALUE', isso faz com que, por exemplo, <tr><td>1</td><td>aa</td></tr>, retorne ( 1, 'aa' ). Mas, como cada item de uma linha são atributos de um único item (pessoa inidônea), gostaríamos que eles ficassem agrupados - para isso passamos o parâmetro ref_result, que irá transformar a lista de valores de cada linha em um ArrayRef para essa lista.

A saída deste nó é um array de ArrayRefs, cada um contendo os dados de cada pessoa listada nas últimas duas páginas do cadastro no site.

    Node->new(
        process_into => 1,
        process_item => sub {
            shift; local $_ = shift; s/^\s*//; s/\s*$//;
            return $_;
        }
    ),

Aqui criamos um nó do próprio tipo DataFlow::Node, ao qual fornecemos o código process_item, que irá, neste caso, remover eventuais espaços em branco no início e no final de cada dado, dentro de cada ArrayRef - isso ocorre devido ao uso do parâmetro process_into.

A saída desse nó terá a mesma estrutura de dados do nó anterior, mas o conteúdo terá os espaços iniciais e finais removidos.

    DataFlow::Node::Dumper->new,

Este nó, do tipo DataFlow::Node::Dumper, utiliza o módulo Data::Dumper para imprimir em STDERR o conteúdo de cada item de dado. Neste caso ele listará, para cada pessoa inidônea ou sancionada das últimas duas páginas do cadastro, o conteúdo do ArrayRef contendo os dados da pessoa.

Para colocar a Chain em ação, invocamos o método flush(), que irá consumir itens da saída de um nó, no caso $chain, até que não haja mais itens.

    $chain->flush;

Conclusão

O DataFlow é um projeto que está em um estágio muito novo do seu desenvolvimento, e por enquanto não há promessas ou garantias de que as interfaces serão mantidas. Se houver interesse em usar o DataFlow, sugiro acompanhar de perto o desenvolvimento do mesmo para ficar a par de quaisquer mudanças.

Já existem várias idéias de melhorias, como por exemplo:

* Execução em paralelo (com threads, com forks)
* Nó para leitura/escrita de arquivos (em andamento)
* Nó para encoding de caracteres (em andamento)
* Nó para envio/recebimento de mensagens em filas (RabbitMQ, MQ, etc...)
* Nó para geração de dados em formato RDF
* Nó para decodificar imagens com OCR
* Nó para executar comandos externos (enviar ou receber dados para esses comandos)
* Uso de operadores (concatenação de nós seria uma cadeia)
* Construção de nós (e principalmente cadeias de nós) a partir de especificações em JSON e/ou YAML
* Nós que permitam split e join de fluxos de informação

Estas e outras idéias de melhorias estão no arquivo de TODO no repositório do projeto.

Você pode ajudar! De várias formas: escrevendo código, tanto para os módulos quanto testes. Executando testes e enviando reports - neste momento há, por exemplo, alguns reports de erro em FreeBSD que não estão ocorrendo no sistema dos mantenedores (Linux).

Referências

Portal Transparência

http://www.portaltransparencia.gov.br/

Portal Transparência - CEIS (Cadastro de Empresas Inidôneas ou Sancionadas)

http://goo.gl/UDNaG

OpenData-BR

http://www.opendatabr.org/

Agradecimentos

Nelson Ferraz

Por lançar a sugestão de ter o Equinócio 2011 sobre o tema Hack de Dados Públicos. Nelson, a culpa é toda sua. ;-)

Thiago "Maluco" Rondon

Por tirar o OpenData-BR do plano das idéias e trazê-lo para a realidade. Pelos testes, idéias e discussões, muitas vezes em horários excusos. Muitos dos conceitos do DataFlow germinaram a partir desses momentos.

Blabos de Blebe

Pela revisão deste texto. Valeu, Blabos!!

Autor

Alexei "Russo" Znamensky < russoz no cpan org >

Blog: http://russoz.wordpress.com/

LinkedIn: http://www.linkedin.com/profile?viewProfile=&key=754668&trk=tab_pro

Licença

Este texto está licenciado sob os termos da Creative Commons by-sa, http://creativecommons.org/licenses/by-sa/3.0/br/

Licença Creative Commons
This work is licensed under a Creative Commons Attribution-ShareAlike License.
blog comments powered by Disqus