Como criar pipelines de dados simples com Python
1. Fundamentos de Pipelines de Dados com Python
Um pipeline de dados é uma sequência de etapas automatizadas que extraem, transformam e carregam dados de uma ou mais fontes para um destino. Python se destaca nesse contexto por sua sintaxe clara, vasto ecossistema de bibliotecas e facilidade de integração com diferentes sistemas.
O modelo clássico é o ETL (Extract, Transform, Load):
- Extração: coleta de dados de arquivos, APIs ou bancos
- Transformação: limpeza, filtragem, agregação e enriquecimento
- Carga: persistência em arquivos ou bancos de dados
As bibliotecas essenciais para começar são:
- pandas: manipulação de dados tabulares
- sqlalchemy: conexão com bancos relacionais
- requests: consumo de APIs HTTP
- csv: leitura/escrita de arquivos CSV nativos
2. Estruturando o Pipeline: Extração de Dados
A extração é o ponto de partida. Vamos ver três cenários comuns.
Lendo arquivos locais com pandas
import pandas as pd
# CSV
df_vendas = pd.read_csv('vendas.csv')
# JSON
df_clientes = pd.read_json('clientes.json')
# Excel
df_produtos = pd.read_excel('produtos.xlsx', sheet_name='ativos')
Coletando dados de APIs REST
import requests
def extrair_dados_api(url, params=None):
resposta = requests.get(url, params=params)
resposta.raise_for_status()
return resposta.json()
dados_api = extrair_dados_api(
'https://api.exemplo.com/v1/pedidos',
params={'data_inicio': '2024-01-01'}
)
df_pedidos = pd.DataFrame(dados_api['resultados'])
Conectando a bancos relacionais
from sqlalchemy import create_engine
engine = create_engine('sqlite:///dados.db')
df_estoque = pd.read_sql_query(
'SELECT * FROM estoque WHERE quantidade > 0',
engine
)
3. Transformação e Limpeza de Dados
A transformação é o coração do pipeline. Vamos construir funções reutilizáveis.
Limpeza básica
def limpar_dados(df):
# Remover linhas duplicadas
df = df.drop_duplicates()
# Remover linhas com valores nulos em colunas críticas
df = df.dropna(subset=['id', 'data'])
# Padronizar tipos de dados
df['data'] = pd.to_datetime(df['data'], errors='coerce')
df['valor'] = pd.to_numeric(df['valor'], errors='coerce')
# Remover outliers (exemplo: valores negativos em preço)
df = df[df['valor'] >= 0]
return df
Transformações com DataFrame
def transformar_vendas(df_vendas, df_produtos):
# Filtrar apenas vendas confirmadas
df = df_vendas[df_vendas['status'] == 'confirmado']
# Agregar por produto
df_agregado = df.groupby('produto_id').agg(
total_vendas=('valor', 'sum'),
quantidade=('quantidade', 'sum'),
media_preco=('valor_unitario', 'mean')
).reset_index()
# Join com dados de produtos
df_final = df_agregado.merge(
df_produtos[['id', 'nome', 'categoria']],
left_on='produto_id',
right_on='id',
how='left'
)
return df_final
4. Carga e Persistência dos Dados Processados
Após transformar, precisamos salvar os resultados.
Salvando em arquivos
def carregar_arquivo(df, caminho, formato='csv'):
if formato == 'csv':
df.to_csv(caminho, index=False)
elif formato == 'parquet':
df.to_parquet(caminho, index=False)
print(f'Dados salvos em {caminho}')
Inserindo em banco de dados
def carregar_banco(df, tabela, engine, estrategia='append'):
if estrategia == 'replace':
df.to_sql(tabela, engine, if_exists='replace', index=False)
elif estrategia == 'append':
df.to_sql(tabela, engine, if_exists='append', index=False)
elif estrategia == 'upsert':
# Para upsert, precisamos de lógica personalizada
df_existente = pd.read_sql_table(tabela, engine)
df_combinado = pd.concat([df_existente, df]).drop_duplicates(
subset=['id'], keep='last'
)
df_combinado.to_sql(tabela, engine, if_exists='replace', index=False)
5. Automatização e Orquestração do Pipeline
Para executar o pipeline de forma automatizada, criamos um script principal.
Script executável com argumentos
#!/usr/bin/env python3
import argparse
import sys
def main():
parser = argparse.ArgumentParser(
description='Pipeline de dados simples'
)
parser.add_argument('--data-inicio', required=True)
parser.add_argument('--tabela-destino', default='vendas_processadas')
parser.add_argument('--estrategia', choices=['append', 'replace'],
default='append')
args = parser.parse_args()
# Executar pipeline
engine = create_engine('sqlite:///pipeline.db')
# Extração
df_vendas = pd.read_csv('vendas.csv')
df_produtos = pd.read_csv('produtos.csv')
# Transformação
df_limpo = limpar_dados(df_vendas)
df_final = transformar_vendas(df_limpo, df_produtos)
# Carga
carregar_banco(df_final, args.tabela_destino, engine, args.estrategia)
print('Pipeline executado com sucesso!')
if __name__ == '__main__':
main()
Agendamento com schedule
import schedule
import time
def executar_pipeline_diario():
# Importar e executar o pipeline
from pipeline_vendas import main
import sys
sys.argv = ['pipeline.py', '--data-inicio', '2024-01-01']
main()
schedule.every().day.at("06:00").do(executar_pipeline_diario)
while True:
schedule.run_pending()
time.sleep(60)
6. Monitoramento e Tratamento de Erros
Um pipeline robusto precisa de logging e tratamento de exceções.
import logging
from datetime import datetime
# Configurar logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(f'pipeline_{datetime.now():%Y%m%d}.log'),
logging.StreamHandler()
]
)
def executar_com_erro(etapa, funcao, *args, **kwargs):
try:
logging.info(f'Iniciando etapa: {etapa}')
resultado = funcao(*args, **kwargs)
logging.info(f'Etapa {etapa} concluída com sucesso')
return resultado
except Exception as e:
logging.error(f'Erro na etapa {etapa}: {str(e)}')
raise
def validar_saida(df, nome_tabela):
assert len(df) > 0, f'{nome_tabela} vazio após transformação'
assert not df.isnull().all().any(), f'Colunas totalmente nulas em {nome_tabela}'
logging.info(f'Validação de {nome_tabela}: {len(df)} registros')
7. Exemplo Prático: Pipeline de Dados Meteorológicos
Vamos construir um pipeline completo que coleta dados da API OpenWeatherMap.
import pandas as pd
import requests
import sqlite3
from sqlalchemy import create_engine
import logging
from datetime import datetime
import schedule
import time
logging.basicConfig(level=logging.INFO)
API_KEY = 'sua_chave_aqui'
CIDADES = ['São Paulo', 'Rio de Janeiro', 'Belo Horizonte', 'Salvador', 'Brasília']
def extrair_clima(cidade, api_key):
url = 'https://api.openweathermap.org/data/2.5/weather'
params = {'q': cidade, 'appid': api_key, 'units': 'metric'}
try:
resposta = requests.get(url, params=params)
resposta.raise_for_status()
dados = resposta.json()
return {
'cidade': cidade,
'temperatura': dados['main']['temp'],
'sensacao': dados['main']['feels_like'],
'umidade': dados['main']['humidity'],
'pressao': dados['main']['pressure'],
'descricao': dados['weather'][0]['description'],
'vento_velocidade': dados['wind']['speed'],
'timestamp': datetime.now().isoformat()
}
except Exception as e:
logging.error(f'Erro ao extrair dados de {cidade}: {e}')
return None
def transformar_dados(lista_registros):
df = pd.DataFrame(lista_registros)
df = df.dropna()
# Agregar por cidade (média do dia)
df_agregado = df.groupby('cidade').agg({
'temperatura': 'mean',
'umidade': 'mean',
'pressao': 'mean',
'vento_velocidade': 'mean'
}).round(2).reset_index()
df_agregado['data_coleta'] = datetime.now().date()
return df_agregado
def carregar_dados(df, engine):
df.to_sql('clima', engine, if_exists='append', index=False)
logging.info(f'{len(df)} registros inseridos na tabela clima')
# Também salvar CSV para backup
df.to_csv(f'clima_{datetime.now():%Y%m%d_%H%M}.csv', index=False)
def pipeline_clima():
logging.info('Iniciando pipeline de clima')
engine = create_engine('sqlite:///clima.db')
registros = []
for cidade in CIDADES:
dados = extrair_clima(cidade, API_KEY)
if dados:
registros.append(dados)
if registros:
df_transformado = transformar_dados(registros)
carregar_dados(df_transformado, engine)
logging.info('Pipeline concluído com sucesso')
else:
logging.warning('Nenhum dado foi coletado')
# Agendamento para executar a cada hora
schedule.every().hour.do(pipeline_clima)
if __name__ == '__main__':
pipeline_clima() # Execução imediata
while True:
schedule.run_pending()
time.sleep(60)
Este pipeline demonstra o ciclo completo: extração de API externa, transformação com agregações, carga em SQLite e CSV, logging, tratamento de erros e automação com schedule. A partir dessa base, você pode expandir para pipelines mais complexos com Apache Airflow, monitoramento com Prometheus e validações avançadas.
Referências
- Documentação oficial do pandas — Guia completo de manipulação de dados tabulares com DataFrame
- SQLAlchemy Documentation — Referência para conexão com bancos de dados relacionais em Python
- Requests: HTTP for Humans — Biblioteca para requisições HTTP, essencial para consumo de APIs
- OpenWeatherMap API — Documentação da API de dados meteorológicos usada no exemplo prático
- Apache Airflow Documentation — Framework profissional para orquestração de pipelines de dados
- Python Schedule Library — Biblioteca leve para agendamento de tarefas em Python
- Real Python: Data Pipelines with Python — Tutorial prático sobre construção de pipelines de dados