Como criar pipelines de dados simples com Python

1. Fundamentos de Pipelines de Dados com Python

Um pipeline de dados é uma sequência de etapas automatizadas que extraem, transformam e carregam dados de uma ou mais fontes para um destino. Python se destaca nesse contexto por sua sintaxe clara, vasto ecossistema de bibliotecas e facilidade de integração com diferentes sistemas.

O modelo clássico é o ETL (Extract, Transform, Load):
- Extração: coleta de dados de arquivos, APIs ou bancos
- Transformação: limpeza, filtragem, agregação e enriquecimento
- Carga: persistência em arquivos ou bancos de dados

As bibliotecas essenciais para começar são:
- pandas: manipulação de dados tabulares
- sqlalchemy: conexão com bancos relacionais
- requests: consumo de APIs HTTP
- csv: leitura/escrita de arquivos CSV nativos

2. Estruturando o Pipeline: Extração de Dados

A extração é o ponto de partida. Vamos ver três cenários comuns.

Lendo arquivos locais com pandas

import pandas as pd

# CSV
df_vendas = pd.read_csv('vendas.csv')

# JSON
df_clientes = pd.read_json('clientes.json')

# Excel
df_produtos = pd.read_excel('produtos.xlsx', sheet_name='ativos')

Coletando dados de APIs REST

import requests

def extrair_dados_api(url, params=None):
    resposta = requests.get(url, params=params)
    resposta.raise_for_status()
    return resposta.json()

dados_api = extrair_dados_api(
    'https://api.exemplo.com/v1/pedidos',
    params={'data_inicio': '2024-01-01'}
)
df_pedidos = pd.DataFrame(dados_api['resultados'])

Conectando a bancos relacionais

from sqlalchemy import create_engine

engine = create_engine('sqlite:///dados.db')
df_estoque = pd.read_sql_query(
    'SELECT * FROM estoque WHERE quantidade > 0',
    engine
)

3. Transformação e Limpeza de Dados

A transformação é o coração do pipeline. Vamos construir funções reutilizáveis.

Limpeza básica

def limpar_dados(df):
    # Remover linhas duplicadas
    df = df.drop_duplicates()

    # Remover linhas com valores nulos em colunas críticas
    df = df.dropna(subset=['id', 'data'])

    # Padronizar tipos de dados
    df['data'] = pd.to_datetime(df['data'], errors='coerce')
    df['valor'] = pd.to_numeric(df['valor'], errors='coerce')

    # Remover outliers (exemplo: valores negativos em preço)
    df = df[df['valor'] >= 0]

    return df

Transformações com DataFrame

def transformar_vendas(df_vendas, df_produtos):
    # Filtrar apenas vendas confirmadas
    df = df_vendas[df_vendas['status'] == 'confirmado']

    # Agregar por produto
    df_agregado = df.groupby('produto_id').agg(
        total_vendas=('valor', 'sum'),
        quantidade=('quantidade', 'sum'),
        media_preco=('valor_unitario', 'mean')
    ).reset_index()

    # Join com dados de produtos
    df_final = df_agregado.merge(
        df_produtos[['id', 'nome', 'categoria']],
        left_on='produto_id',
        right_on='id',
        how='left'
    )

    return df_final

4. Carga e Persistência dos Dados Processados

Após transformar, precisamos salvar os resultados.

Salvando em arquivos

def carregar_arquivo(df, caminho, formato='csv'):
    if formato == 'csv':
        df.to_csv(caminho, index=False)
    elif formato == 'parquet':
        df.to_parquet(caminho, index=False)
    print(f'Dados salvos em {caminho}')

Inserindo em banco de dados

def carregar_banco(df, tabela, engine, estrategia='append'):
    if estrategia == 'replace':
        df.to_sql(tabela, engine, if_exists='replace', index=False)
    elif estrategia == 'append':
        df.to_sql(tabela, engine, if_exists='append', index=False)
    elif estrategia == 'upsert':
        # Para upsert, precisamos de lógica personalizada
        df_existente = pd.read_sql_table(tabela, engine)
        df_combinado = pd.concat([df_existente, df]).drop_duplicates(
            subset=['id'], keep='last'
        )
        df_combinado.to_sql(tabela, engine, if_exists='replace', index=False)

5. Automatização e Orquestração do Pipeline

Para executar o pipeline de forma automatizada, criamos um script principal.

Script executável com argumentos

#!/usr/bin/env python3
import argparse
import sys

def main():
    parser = argparse.ArgumentParser(
        description='Pipeline de dados simples'
    )
    parser.add_argument('--data-inicio', required=True)
    parser.add_argument('--tabela-destino', default='vendas_processadas')
    parser.add_argument('--estrategia', choices=['append', 'replace'], 
                       default='append')

    args = parser.parse_args()

    # Executar pipeline
    engine = create_engine('sqlite:///pipeline.db')

    # Extração
    df_vendas = pd.read_csv('vendas.csv')
    df_produtos = pd.read_csv('produtos.csv')

    # Transformação
    df_limpo = limpar_dados(df_vendas)
    df_final = transformar_vendas(df_limpo, df_produtos)

    # Carga
    carregar_banco(df_final, args.tabela_destino, engine, args.estrategia)

    print('Pipeline executado com sucesso!')

if __name__ == '__main__':
    main()

Agendamento com schedule

import schedule
import time

def executar_pipeline_diario():
    # Importar e executar o pipeline
    from pipeline_vendas import main
    import sys
    sys.argv = ['pipeline.py', '--data-inicio', '2024-01-01']
    main()

schedule.every().day.at("06:00").do(executar_pipeline_diario)

while True:
    schedule.run_pending()
    time.sleep(60)

6. Monitoramento e Tratamento de Erros

Um pipeline robusto precisa de logging e tratamento de exceções.

import logging
from datetime import datetime

# Configurar logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(f'pipeline_{datetime.now():%Y%m%d}.log'),
        logging.StreamHandler()
    ]
)

def executar_com_erro(etapa, funcao, *args, **kwargs):
    try:
        logging.info(f'Iniciando etapa: {etapa}')
        resultado = funcao(*args, **kwargs)
        logging.info(f'Etapa {etapa} concluída com sucesso')
        return resultado
    except Exception as e:
        logging.error(f'Erro na etapa {etapa}: {str(e)}')
        raise

def validar_saida(df, nome_tabela):
    assert len(df) > 0, f'{nome_tabela} vazio após transformação'
    assert not df.isnull().all().any(), f'Colunas totalmente nulas em {nome_tabela}'
    logging.info(f'Validação de {nome_tabela}: {len(df)} registros')

7. Exemplo Prático: Pipeline de Dados Meteorológicos

Vamos construir um pipeline completo que coleta dados da API OpenWeatherMap.

import pandas as pd
import requests
import sqlite3
from sqlalchemy import create_engine
import logging
from datetime import datetime
import schedule
import time

logging.basicConfig(level=logging.INFO)

API_KEY = 'sua_chave_aqui'
CIDADES = ['São Paulo', 'Rio de Janeiro', 'Belo Horizonte', 'Salvador', 'Brasília']

def extrair_clima(cidade, api_key):
    url = 'https://api.openweathermap.org/data/2.5/weather'
    params = {'q': cidade, 'appid': api_key, 'units': 'metric'}

    try:
        resposta = requests.get(url, params=params)
        resposta.raise_for_status()
        dados = resposta.json()

        return {
            'cidade': cidade,
            'temperatura': dados['main']['temp'],
            'sensacao': dados['main']['feels_like'],
            'umidade': dados['main']['humidity'],
            'pressao': dados['main']['pressure'],
            'descricao': dados['weather'][0]['description'],
            'vento_velocidade': dados['wind']['speed'],
            'timestamp': datetime.now().isoformat()
        }
    except Exception as e:
        logging.error(f'Erro ao extrair dados de {cidade}: {e}')
        return None

def transformar_dados(lista_registros):
    df = pd.DataFrame(lista_registros)
    df = df.dropna()

    # Agregar por cidade (média do dia)
    df_agregado = df.groupby('cidade').agg({
        'temperatura': 'mean',
        'umidade': 'mean',
        'pressao': 'mean',
        'vento_velocidade': 'mean'
    }).round(2).reset_index()

    df_agregado['data_coleta'] = datetime.now().date()
    return df_agregado

def carregar_dados(df, engine):
    df.to_sql('clima', engine, if_exists='append', index=False)
    logging.info(f'{len(df)} registros inseridos na tabela clima')

    # Também salvar CSV para backup
    df.to_csv(f'clima_{datetime.now():%Y%m%d_%H%M}.csv', index=False)

def pipeline_clima():
    logging.info('Iniciando pipeline de clima')

    engine = create_engine('sqlite:///clima.db')

    registros = []
    for cidade in CIDADES:
        dados = extrair_clima(cidade, API_KEY)
        if dados:
            registros.append(dados)

    if registros:
        df_transformado = transformar_dados(registros)
        carregar_dados(df_transformado, engine)
        logging.info('Pipeline concluído com sucesso')
    else:
        logging.warning('Nenhum dado foi coletado')

# Agendamento para executar a cada hora
schedule.every().hour.do(pipeline_clima)

if __name__ == '__main__':
    pipeline_clima()  # Execução imediata
    while True:
        schedule.run_pending()
        time.sleep(60)

Este pipeline demonstra o ciclo completo: extração de API externa, transformação com agregações, carga em SQLite e CSV, logging, tratamento de erros e automação com schedule. A partir dessa base, você pode expandir para pipelines mais complexos com Apache Airflow, monitoramento com Prometheus e validações avançadas.

Referências