Orquestração de pipelines com Prefect: alternativa ao Airflow

1. Por que considerar o Prefect como alternativa ao Airflow?

O Apache Airflow foi por anos a referência em orquestração de pipelines de dados, mas sua arquitetura monolítica e complexidade operacional têm motivado a busca por alternativas mais modernas. O Airflow exige infraestrutura dedicada (banco de dados, filas, workers), configuração de operadores específicos para cada ferramenta e manutenção constante de dependências. Para equipes pequenas ou pipelines dinâmicos, essa sobrecarga muitas vezes supera os benefícios.

O Prefect surgiu em 2018 com uma filosofia radicalmente diferente: tratar pipelines como código Python nativo, sem a rigidez de DAGs estáticos. Sua proposta é simplificar a orquestração eliminando a necessidade de operadores especializados — qualquer função Python pode se tornar uma tarefa. A ferramenta foi projetada para cenários onde a flexibilidade e a prototipação rápida são prioridades, como startups, times de dados enxutos e projetos que exigem experimentação constante.

Casos de uso ideais incluem pipelines de dados que mudam frequentemente, fluxos com lógica condicional complexa e equipes que preferem escrever Python puro em vez de aprender DSLs proprietárias. O Prefect brilha especialmente quando você precisa de orquestração sem o peso de um Airflow.

2. Arquitetura e componentes fundamentais do Prefect

A arquitetura do Prefect é enxuta e modular. Seus dois blocos fundamentais são:

  • Flow (fluxo): um container lógico que agrupa tarefas e define a ordem de execução. Criado com o decorador @flow.
  • Task (tarefa): uma unidade de trabalho atômica, definida com @task. Pode receber parâmetros, retornar valores e ter dependências implícitas (baseadas em passagem de argumentos) ou explícitas (via wait_for).

O orquestrador pode ser o servidor Prefect open source (executado localmente ou em um container Docker) ou o Prefect Cloud (versão gerenciada). Agentes executam tarefas em workers — mas ao contrário do Airflow, você não precisa manter workers fixos. O Prefect suporta execução serverless com runners que sobem sob demanda.

3. Construindo pipelines com Prefect: conceitos práticos

A definição de tarefas é intuitiva:

from prefect import task, flow

@task
def extrair_dados(url: str) -> list:
    import requests
    response = requests.get(url)
    return response.json()

@task
def transformar(dados: list) -> list:
    return [item for item in dados if item["ativo"]]

@flow
def pipeline_principal(url: str):
    dados = extrair_dados(url)
    resultado = transformar(dados)
    return resultado

Dependências são implícitas: a tarefa transformar só executa após extrair_dados porque recebe seu retorno como argumento. Para loops e mapeamento dinâmico, use map:

@task
def processar_item(item: dict) -> dict:
    # lógica de processamento
    return item

@flow
def pipeline_map(items: list):
    resultados = processar_item.map(items)  # paralelização automática
    return resultados

Tratamento de falhas é nativo: defina retries e retry_delay_seconds no decorador da tarefa, e configure timeouts com timeout_seconds.

4. Gerenciamento de estado e execução

O Prefect implementa uma máquina de estados completa: Pending, Running, Success, Failed, Cached, entre outros. Cada execução de tarefa ou fluxo é registrada com timestamp e metadados.

O caching inteligente permite reutilizar resultados de tarefas quando os inputs são idênticos. Isso acelera reprocessamentos e economiza recursos:

@task(cache_key_fn=lambda url: url, cache_expiration=timedelta(hours=1))
def extrair_dados(url: str):
    # resultado será cacheado por 1 hora

Checkpoints salvam o estado intermediário de fluxos longos. Se uma execução falhar, você pode retomá-la a partir do ponto de falha, sem reprocessar tarefas bem-sucedidas.

5. Agendamento, gatilhos e observabilidade

Agendamento é configurado com schedules cron ou intervalos:

from prefect import flow
from datetime import timedelta

@flow
def pipeline_schedule():
    pass

# Agendamento via código
pipeline_schedule.serve(
    cron="0 8 * * *",  # todos os dias às 8h
    name="Pipeline Diário"
)

Gatilhos baseados em eventos permitem iniciar fluxos quando arquivos chegam ou APIs retornam dados. O dashboard nativo do servidor Prefect exibe execuções em tempo real, com logs centralizados e gráficos de dependência. Integrações com Slack, Teams e webhooks enviam notificações automáticas em falhas ou conclusões.

6. Comparação direta: Prefect vs Airflow

Aspecto Prefect Airflow
Definição de pipeline Fluxo dinâmico em Python DAG estático (arquivo Python)
Curva de aprendizado Baixa (Python puro) Média-alta (operadores + DSL)
Execução Serverless ou agente leve Workers fixos + banco + fila
Caching Nativo e flexível Limitado (XComs)
Retry/tratamento de falhas Decoradores simples Configuração complexa
Escalabilidade Escala sob demanda Requer infraestrutura dedicada

O modelo de fluxo dinâmico do Prefect permite criar pipelines que se adaptam em tempo de execução — algo impossível com DAGs estáticos do Airflow. Para equipes que valorizam simplicidade e velocidade, a escolha é clara.

7. Exemplo completo: pipeline de ETL com Prefect

Pipeline que extrai dados de uma API, transforma com pandas e carrega em banco SQLite:

from prefect import task, flow
from prefect.logging import get_run_logger
import pandas as pd
import sqlite3
from datetime import datetime

@task(retries=3, retry_delay_seconds=10)
def extrair_api(url: str) -> pd.DataFrame:
    import requests
    response = requests.get(url)
    response.raise_for_status()
    return pd.DataFrame(response.json())

@task
def transformar(df: pd.DataFrame) -> pd.DataFrame:
    df["data_processamento"] = datetime.now()
    df = df.dropna(subset=["valor"])
    return df

@task
def carregar_sqlite(df: pd.DataFrame, db_path: str, tabela: str):
    conn = sqlite3.connect(db_path)
    df.to_sql(tabela, conn, if_exists="replace", index=False)
    conn.close()

@flow(name="ETL Vendas", log_prints=True)
def pipeline_etl():
    logger = get_run_logger()
    url = "https://api.exemplo.com/vendas"
    db_path = "vendas.db"
    tabela = "vendas_diarias"

    df_bruto = extrair_api(url)
    logger.info(f"Extraídos {len(df_bruto)} registros")

    df_limpo = transformar(df_bruto)
    logger.info(f"Transformados {len(df_limpo)} registros")

    carregar_sqlite(df_limpo, db_path, tabela)
    logger.info("Dados carregados com sucesso")

# Execução local
if __name__ == "__main__":
    pipeline_etl()

Para deploy no servidor Prefect, execute:

prefect deploy pipeline_etl.py:etl -n "ETL Produção" --cron "0 */6 * * *"

8. Considerações finais e próximos passos

Escolha o Prefect quando precisar de flexibilidade, baixa complexidade operacional e prototipação rápida. É ideal para equipes pequenas e médias, pipelines dinâmicos e cenários onde o custo de manter um Airflow não se justifica.

Para migrar do Airflow, comece identificando pipelines com lógica condicional complexa — esses são os que mais se beneficiam. Reescreva gradualmente, mantendo ambos sistemas em paralelo até validar a nova implementação.

Explore a documentação oficial, participe da comunidade no Discord e experimente templates prontos no GitHub. O Prefect está em evolução constante, com novas features sendo adicionadas a cada release.

Referências