ETL moderno com dbt: transformação de dados como código versionado

1. Fundamentos do dbt e a mudança de paradigma no ETL

O dbt (data build tool) representa uma mudança fundamental na forma como as equipes de dados abordam a transformação de dados. Diferentemente das ferramentas ETL tradicionais que focam na extração e carga, o dbt adota a filosofia “transform-first”, onde a transformação acontece diretamente no data warehouse utilizando SQL puro combinado com Jinja (uma engine de templates Python).

Essa abordagem, conhecida como “analytics engineering”, trata a transformação de dados como código versionado. Em vez de depender de interfaces gráficas arrastar-e-soltar ou scripts Python complexos, os analistas e engenheiros de dados escrevem SELECTs SQL que são compilados e executados contra o banco de dados. O resultado é um pipeline mais transparente, testável e auditável.

As vantagens sobre o modelo clássico são significativas: redução de custos operacionais, eliminação de movimentação desnecessária de dados, maior performance ao aproveitar o poder de processamento do data warehouse, e a capacidade de versionar cada transformação como faria com código de software.

2. Arquitetura e componentes principais do dbt

Um projeto dbt é organizado em uma estrutura de diretórios padronizada. Os diretórios principais incluem:

  • models/: Onde ficam os arquivos SQL que definem as transformações
  • snapshots/: Para implementar técnicas de captura de mudanças (SCD)
  • tests/: Testes customizados escritos em SQL
  • macros/: Código Jinja reutilizável
  • seeds/: Arquivos CSV que podem ser carregados como tabelas

O arquivo schema.yml (ou schema.yaml) é central para a governança de dados. Nele, você define metadados, documentação e testes para cada modelo:

# schema.yml
version: 2

models:
  - name: stg_orders
    description: "Tabela staging de pedidos"
    columns:
      - name: order_id
        tests:
          - unique
          - not_null
      - name: customer_id
        tests:
          - not_null
          - relationships:
              to: ref('stg_customers')
              field: customer_id

O dbt se conecta com os principais data warehouses do mercado: BigQuery, Snowflake, Redshift, Postgres, Databricks e Trino. A configuração de conexão é feita no arquivo profiles.yml.

3. Versionamento e controle de qualidade com dbt

A integração com Git é nativa no workflow do dbt. Cada modelo SQL é um arquivo de texto que pode ser versionado, revisado em pull requests e submetido a code review. Isso permite que equipes colaborem com segurança, testem mudanças em branches separados e façam rollback quando necessário.

Os testes de dados automatizados são um dos recursos mais poderosos. Além dos testes genéricos (unique, not_null, accepted_values), você pode criar testes customizados:

-- tests/assert_positive_total.sql
-- Teste customizado: verifica se total de pedidos é sempre positivo
SELECT order_id, total_amount
FROM {{ ref('fct_orders') }}
WHERE total_amount < 0

A documentação e linhagem de dados (data lineage) são geradas automaticamente pelo comando dbt docs generate. O resultado é um site estático interativo que mostra as dependências entre modelos, colunas, testes e descrições, facilitando o entendimento do pipeline completo.

4. Construindo pipelines modulares e reutilizáveis

O uso de macros Jinja permite eliminar repetição de código SQL. Por exemplo, uma macro para calcular recência de clientes:

-- macros/recency.sql
{% macro recency(date_column, as_of_date=none) %}
    {% if as_of_date is none %}
        {% set as_of_date = "CURRENT_DATE" %}
    {% endif %}
    DATE_DIFF({{ as_of_date }}, {{ date_column }}, DAY)
{% endmacro %}

As estratégias de materialização definem como cada modelo é armazenado no banco:

  • table: Recria a tabela a cada execução
  • view: Cria uma view (ideal para dados brutos)
  • incremental: Apenas insere novos registros
  • ephemeral: Materialização temporária (CTE)

A organização em camadas é uma boa prática amplamente adotada:

models/
├── staging/       # Dados brutos com limpeza mínima
│   ├── stg_orders.sql
│   └── stg_customers.sql
├── intermediate/  # Lógicas de negócio intermediárias
│   ├── int_order_items.sql
│   └── int_customer_metrics.sql
└── marts/         # Agregações finais para consumo
    └── fct_sales_summary.sql

5. Estratégias de execução e orquestração

A execução local é simples com dbt run, mas o verdadeiro poder está na seleção granular de modelos. Você pode executar apenas modelos específicos usando tags, caminhos ou seletores:

dbt run --select tag:daily_model
dbt run --select models/staging/
dbt run --select +fct_sales_summary  # executa o modelo e suas dependências

Para ambientes de produção, o dbt se integra com orquestradores como Apache Airflow, Prefect e Dagster. Um exemplo de integração com Airflow:

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

with DAG('dbt_pipeline', start_date=datetime(2024, 1, 1), schedule_interval='@daily') as dag:
    dbt_run = BashOperator(
        task_id='dbt_run',
        bash_command='cd /path/to/project && dbt run --target prod'
    )

O tratamento de dependências é automático: o dbt constrói um DAG (Directed Acyclic Graph) baseado nas referências ref() entre modelos, garantindo que as execuções ocorram na ordem correta.

6. Boas práticas para ambientes de produção

O uso de perfis de conexão com targets separados (dev, prod) isola ambientes e evita que desenvolvedores afetem dados de produção:

# profiles.yml
jaffle_shop:
  target: dev
  outputs:
    dev:
      type: postgres
      schema: dbt_dev
      threads: 4
    prod:
      type: postgres
      schema: analytics
      threads: 8

Snapshots permitem implementar o tipo 2 de dimensão de mudança lenta (SCD), capturando o histórico completo de alterações:

-- snapshots/customers_snapshot.sql
{% snapshot customers_snapshot %}
    {{
        config(
            target_schema='snapshots',
            unique_key='customer_id',
            strategy='timestamp',
            updated_at='updated_at'
        )
    }}
    SELECT * FROM {{ source('raw', 'customers') }}
{% endsnapshot %}

Para monitoramento, dbt debug verifica a conectividade, e a integração com sistemas de alerta (Slack, PagerDuty) pode ser feita através de hooks ou orquestradores.

7. Caso prático: pipeline de transformação de dados de vendas

Vamos construir um pipeline completo de vendas em três camadas.

Modelos staging:

-- models/staging/stg_orders.sql
SELECT
    order_id,
    customer_id,
    order_date,
    total_amount,
    status
FROM {{ source('raw', 'orders') }}
WHERE status != 'cancelled'
-- models/staging/stg_customers.sql
SELECT
    customer_id,
    customer_name,
    city,
    state,
    region
FROM {{ source('raw', 'customers') }}

Modelo intermediário:

-- models/intermediate/int_daily_sales.sql
SELECT
    o.order_date,
    o.customer_id,
    c.region,
    c.state,
    COUNT(DISTINCT o.order_id) AS total_orders,
    SUM(o.total_amount) AS total_revenue
FROM {{ ref('stg_orders') }} o
LEFT JOIN {{ ref('stg_customers') }} c
    ON o.customer_id = c.customer_id
GROUP BY 1, 2, 3, 4

Mart final:

-- models/marts/fct_sales_summary.sql
{{
    config(
        materialized='table',
        unique_key='month_year'
    )
}}
SELECT
    region,
    DATE_TRUNC(order_date, MONTH) AS month_year,
    COUNT(DISTINCT customer_id) AS active_customers,
    SUM(total_orders) AS total_orders,
    SUM(total_revenue) AS total_revenue,
    SAFE_DIVIDE(SUM(total_revenue), SUM(total_orders)) AS avg_order_value
FROM {{ ref('int_daily_sales') }}
GROUP BY 1, 2

Testes e documentação:

-- schema.yml (continuação)
models:
  - name: fct_sales_summary
    description: "Resumo mensal de vendas por região"
    tests:
      - dbt_utils.unique_combination_of_columns:
          combination_of_columns:
            - region
            - month_year
    columns:
      - name: total_revenue
        tests:
          - not_null
      - name: avg_order_value
        tests:
          - dbt_utils.accepted_range:
              min_value: 0

Este pipeline pode ser executado com dbt run e documentado com dbt docs generate, gerando um site interativo com linhagem completa dos dados.

Referências