Orquestração de pipelines com Prefect: alternativa ao Airflow
1. Por que considerar o Prefect como alternativa ao Airflow?
O Apache Airflow foi por anos a referência em orquestração de pipelines de dados, mas sua arquitetura monolítica e complexidade operacional têm motivado a busca por alternativas mais modernas. O Airflow exige infraestrutura dedicada (banco de dados, filas, workers), configuração de operadores específicos para cada ferramenta e manutenção constante de dependências. Para equipes pequenas ou pipelines dinâmicos, essa sobrecarga muitas vezes supera os benefícios.
O Prefect surgiu em 2018 com uma filosofia radicalmente diferente: tratar pipelines como código Python nativo, sem a rigidez de DAGs estáticos. Sua proposta é simplificar a orquestração eliminando a necessidade de operadores especializados — qualquer função Python pode se tornar uma tarefa. A ferramenta foi projetada para cenários onde a flexibilidade e a prototipação rápida são prioridades, como startups, times de dados enxutos e projetos que exigem experimentação constante.
Casos de uso ideais incluem pipelines de dados que mudam frequentemente, fluxos com lógica condicional complexa e equipes que preferem escrever Python puro em vez de aprender DSLs proprietárias. O Prefect brilha especialmente quando você precisa de orquestração sem o peso de um Airflow.
2. Arquitetura e componentes fundamentais do Prefect
A arquitetura do Prefect é enxuta e modular. Seus dois blocos fundamentais são:
- Flow (fluxo): um container lógico que agrupa tarefas e define a ordem de execução. Criado com o decorador
@flow. - Task (tarefa): uma unidade de trabalho atômica, definida com
@task. Pode receber parâmetros, retornar valores e ter dependências implícitas (baseadas em passagem de argumentos) ou explícitas (viawait_for).
O orquestrador pode ser o servidor Prefect open source (executado localmente ou em um container Docker) ou o Prefect Cloud (versão gerenciada). Agentes executam tarefas em workers — mas ao contrário do Airflow, você não precisa manter workers fixos. O Prefect suporta execução serverless com runners que sobem sob demanda.
3. Construindo pipelines com Prefect: conceitos práticos
A definição de tarefas é intuitiva:
from prefect import task, flow
@task
def extrair_dados(url: str) -> list:
import requests
response = requests.get(url)
return response.json()
@task
def transformar(dados: list) -> list:
return [item for item in dados if item["ativo"]]
@flow
def pipeline_principal(url: str):
dados = extrair_dados(url)
resultado = transformar(dados)
return resultado
Dependências são implícitas: a tarefa transformar só executa após extrair_dados porque recebe seu retorno como argumento. Para loops e mapeamento dinâmico, use map:
@task
def processar_item(item: dict) -> dict:
# lógica de processamento
return item
@flow
def pipeline_map(items: list):
resultados = processar_item.map(items) # paralelização automática
return resultados
Tratamento de falhas é nativo: defina retries e retry_delay_seconds no decorador da tarefa, e configure timeouts com timeout_seconds.
4. Gerenciamento de estado e execução
O Prefect implementa uma máquina de estados completa: Pending, Running, Success, Failed, Cached, entre outros. Cada execução de tarefa ou fluxo é registrada com timestamp e metadados.
O caching inteligente permite reutilizar resultados de tarefas quando os inputs são idênticos. Isso acelera reprocessamentos e economiza recursos:
@task(cache_key_fn=lambda url: url, cache_expiration=timedelta(hours=1))
def extrair_dados(url: str):
# resultado será cacheado por 1 hora
Checkpoints salvam o estado intermediário de fluxos longos. Se uma execução falhar, você pode retomá-la a partir do ponto de falha, sem reprocessar tarefas bem-sucedidas.
5. Agendamento, gatilhos e observabilidade
Agendamento é configurado com schedules cron ou intervalos:
from prefect import flow
from datetime import timedelta
@flow
def pipeline_schedule():
pass
# Agendamento via código
pipeline_schedule.serve(
cron="0 8 * * *", # todos os dias às 8h
name="Pipeline Diário"
)
Gatilhos baseados em eventos permitem iniciar fluxos quando arquivos chegam ou APIs retornam dados. O dashboard nativo do servidor Prefect exibe execuções em tempo real, com logs centralizados e gráficos de dependência. Integrações com Slack, Teams e webhooks enviam notificações automáticas em falhas ou conclusões.
6. Comparação direta: Prefect vs Airflow
| Aspecto | Prefect | Airflow |
|---|---|---|
| Definição de pipeline | Fluxo dinâmico em Python | DAG estático (arquivo Python) |
| Curva de aprendizado | Baixa (Python puro) | Média-alta (operadores + DSL) |
| Execução | Serverless ou agente leve | Workers fixos + banco + fila |
| Caching | Nativo e flexível | Limitado (XComs) |
| Retry/tratamento de falhas | Decoradores simples | Configuração complexa |
| Escalabilidade | Escala sob demanda | Requer infraestrutura dedicada |
O modelo de fluxo dinâmico do Prefect permite criar pipelines que se adaptam em tempo de execução — algo impossível com DAGs estáticos do Airflow. Para equipes que valorizam simplicidade e velocidade, a escolha é clara.
7. Exemplo completo: pipeline de ETL com Prefect
Pipeline que extrai dados de uma API, transforma com pandas e carrega em banco SQLite:
from prefect import task, flow
from prefect.logging import get_run_logger
import pandas as pd
import sqlite3
from datetime import datetime
@task(retries=3, retry_delay_seconds=10)
def extrair_api(url: str) -> pd.DataFrame:
import requests
response = requests.get(url)
response.raise_for_status()
return pd.DataFrame(response.json())
@task
def transformar(df: pd.DataFrame) -> pd.DataFrame:
df["data_processamento"] = datetime.now()
df = df.dropna(subset=["valor"])
return df
@task
def carregar_sqlite(df: pd.DataFrame, db_path: str, tabela: str):
conn = sqlite3.connect(db_path)
df.to_sql(tabela, conn, if_exists="replace", index=False)
conn.close()
@flow(name="ETL Vendas", log_prints=True)
def pipeline_etl():
logger = get_run_logger()
url = "https://api.exemplo.com/vendas"
db_path = "vendas.db"
tabela = "vendas_diarias"
df_bruto = extrair_api(url)
logger.info(f"Extraídos {len(df_bruto)} registros")
df_limpo = transformar(df_bruto)
logger.info(f"Transformados {len(df_limpo)} registros")
carregar_sqlite(df_limpo, db_path, tabela)
logger.info("Dados carregados com sucesso")
# Execução local
if __name__ == "__main__":
pipeline_etl()
Para deploy no servidor Prefect, execute:
prefect deploy pipeline_etl.py:etl -n "ETL Produção" --cron "0 */6 * * *"
8. Considerações finais e próximos passos
Escolha o Prefect quando precisar de flexibilidade, baixa complexidade operacional e prototipação rápida. É ideal para equipes pequenas e médias, pipelines dinâmicos e cenários onde o custo de manter um Airflow não se justifica.
Para migrar do Airflow, comece identificando pipelines com lógica condicional complexa — esses são os que mais se beneficiam. Reescreva gradualmente, mantendo ambos sistemas em paralelo até validar a nova implementação.
Explore a documentação oficial, participe da comunidade no Discord e experimente templates prontos no GitHub. O Prefect está em evolução constante, com novas features sendo adicionadas a cada release.
Referências
- Documentação oficial do Prefect — Guia completo de instalação, conceitos e API de flows e tasks
- Prefect vs Airflow: uma comparação técnica — Artigo do time Prefect analisando diferenças arquiteturais e casos de uso
- Tutorial: Primeiro pipeline com Prefect — Passo a passo para iniciantes no Towards Data Science
- Repositório oficial do Prefect no GitHub — Código fonte, issues, exemplos e contribuições da comunidade
- Guia de migração do Airflow para Prefect — Artigo prático com estratégias de migração gradual