Como versionar pipelines de dados com git e ferramentas específicas

1. Fundamentos do versionamento de pipelines de dados

1.1. Diferenças entre versionamento de código tradicional e pipelines de dados

O versionamento de pipelines de dados difere significativamente do versionamento de código tradicional. Enquanto no desenvolvimento de software convencional versionamos principalmente código-fonte, em pipelines de dados precisamos versionar também schemas, metadados, configurações de transformação e, crucialmente, os próprios datasets.

# Estrutura de pipeline tradicional vs. versionamento de dados
# Código tradicional versiona apenas:
src/
  main.py
  utils.py

# Pipeline de dados versiona também:
data/
  raw/          # Dados brutos (via DVC)
  processed/    # Dados transformados (via DVC)
transformations/
  etl_job.py    # Código ETL
  schema.yaml   # Definição de schema
config/
  pipeline_config.yaml  # Configurações específicas

1.2. Desafios específicos: dependências de dados, schemas mutáveis e reprodutibilidade

Os principais desafios incluem:

  • Dependências de dados: pipelines dependem de datasets que mudam constantemente
  • Schemas mutáveis: alterações em fontes de dados podem quebrar transformações
  • Reprodutibilidade: executar o mesmo pipeline em momentos diferentes deve produzir resultados idênticos
# Exemplo de dependência de dados versionada
# pipeline_config.yaml
version: "2.1.0"
dependencies:
  - dataset: "vendas_2024"
    version: "2024-03-15"
  - model: "previsao_demanda"
    version: "v1.2.3"

1.3. Estrutura de repositório ideal para projetos de dados

A escolha entre monorepo e multirepo depende do tamanho da equipe e complexidade:

# Estrutura monorepo recomendada para pipelines de dados
data-pipeline/
├── .dvc/                    # Configuração DVC
├── data/
│   ├── raw/                 # Dados brutos (DVC)
│   └── processed/           # Dados processados (DVC)
├── src/
│   ├── extract/             # Scripts de extração
│   ├── transform/           # Scripts de transformação
│   └── load/                # Scripts de carga
├── dags/
│   ├── airflow/             # DAGs do Airflow
│   └── prefect/             # Fluxos do Prefect
├── tests/
│   ├── unit/                # Testes unitários
│   └── integration/         # Testes de integração
├── infra/
│   ├── docker/              # Dockerfiles
│   └── k8s/                 # Manifests Kubernetes
├── .gitignore
├── dvc.yaml                 # Pipeline DVC
├── requirements.txt
└── README.md

2. Estratégias de branching para pipelines de dados

2.1. Git Flow adaptado para pipelines

# Estrutura de branches para pipelines de dados
main          # Produção estável
├── develop   # Integração de features
├── staging   # Ambiente de staging
├── feature/* # Novas transformações
├── fix/*     # Correções em pipelines
└── data/*    # Atualizações de datasets

2.2. Branches para ambientes isolados

# Comandos para gerenciar ambientes
git checkout -b feature/nova_transformacao develop
# Desenvolve e testa localmente
git push origin feature/nova_transformacao
# Cria merge request para develop
git checkout develop
git merge feature/nova_transformacao
# Deploy para staging via CI/CD
git tag v2.1.0-rc1 staging
git push origin v2.1.0-rc1

2.3. Merge requests e code review

# Checklist para code review de pipelines
- [ ] Testes unitários passam
- [ ] Testes de integração com dados de staging
- [ ] Schema validation executado
- [ ] DVC lock file atualizado
- [ ] Dockerfile testado localmente
- [ ] Documentação de alterações no CHANGELOG

3. Versionamento de artefatos e dependências do pipeline

3.1. Gerenciamento de dependências

# requirements.txt versionado
pandas==2.0.3
apache-airflow==2.7.1
dvc==3.0.0
prefect==2.10.0
great-expectations==0.17.0

3.2. Versionamento de containers Docker

# Dockerfile com tags semânticas
FROM python:3.11-slim
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY src/ /app/src
COPY dags/ /app/dags

# Tagging strategy
docker build -t pipeline-etl:v2.1.0 .
docker tag pipeline-etl:v2.1.0 pipeline-etl:commit-abc1234

3.3. Controle de versão de datasets com DVC

# Inicializando DVC no projeto
dvc init
git commit -m "Initialize DVC"

# Adicionando dados ao versionamento
dvc add data/raw/vendas_2024.csv
git add data/raw/vendas_2024.csv.dvc
git commit -m "Add vendas 2024 dataset"

# Versionando diferentes versões do dataset
dvc commit -m "Update vendas 2024 with March data"
git tag data-vendas-2024-v2

4. Ferramentas específicas para versionamento de pipelines

4.1. DVC para versionamento de dados, modelos e métricas

# dvc.yaml - Pipeline versionado
stages:
  preprocess:
    cmd: python src/preprocess.py
    deps:
      - data/raw/vendas_2024.csv
    outs:
      - data/processed/vendas_clean.csv
  train:
    cmd: python src/train.py
    deps:
      - data/processed/vendas_clean.csv
    outs:
      - models/previsao_vendas.pkl
    metrics:
      - metrics/accuracy.json:
          cache: false

# Executando pipeline versionado
dvc repro
dvc metrics diff
dvc push

4.2. Git LFS para arquivos grandes

# Configurando Git LFS
git lfs track "*.parquet"
git lfs track "*.csv"
git lfs track "*.pkl"
git add .gitattributes
git commit -m "Configure Git LFS for data files"

# Uso normal do git
git add data/large_dataset.parquet
git commit -m "Add large dataset"

4.3. CI/CD para automação

# .github/workflows/pipeline-tests.yml
name: Pipeline Tests
on: [push, pull_request]
jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
        with:
          lfs: true
      - uses: actions/setup-python@v4
      - name: Install dependencies
        run: pip install -r requirements.txt
      - name: Run unit tests
        run: pytest tests/unit/
      - name: Validate pipeline
        run: dvc repro --dry
      - name: Check metrics
        run: dvc metrics diff --show-json

5. Integração com ferramentas de orquestração

5.1. Versionamento de DAGs no Airflow

# dags/etl_pipeline.py versionado
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'etl_vendas',
    default_args=default_args,
    description='Pipeline ETL versionado',
    schedule_interval='@daily',
    catchup=False,
    version='2.1.0',
) as dag:

    extract = PythonOperator(
        task_id='extract_data',
        python_callable=extract_function,
        op_kwargs={'version': '{{ dag.dag_version }}'}
    )

    transform = PythonOperator(
        task_id='transform_data',
        python_callable=transform_function,
    )

    load = PythonOperator(
        task_id='load_data',
        python_callable=load_function,
    )

    extract >> transform >> load

5.2. Git hooks para validação

# .git/hooks/pre-commit
#!/bin/bash
echo "Running pre-commit checks..."

# Validar schema dos dados
python scripts/validate_schema.py
if [ $? -ne 0 ]; then
    echo "Schema validation failed!"
    exit 1
fi

# Verificar DVC lock file
dvc status
if [ $? -ne 0 ]; then
    echo "DVC lock file out of sync!"
    exit 1
fi

# Executar linter específico para pipeline
pylint src/ dags/
if [ $? -ne 0 ]; then
    echo "Linting failed!"
    exit 1
fi

echo "All checks passed!"

5.3. Estratégias de deploy

# Deploy blue-green com versionamento
# Deploy da versão atual (blue)
kubectl apply -f infra/k8s/pipeline-blue.yaml

# Testar versão blue
curl http://pipeline-blue.internal/health

# Deploy nova versão (green)
kubectl apply -f infra/k8s/pipeline-green.yaml

# Switch para green após validação
kubectl patch service pipeline -p '{"spec":{"selector":{"version":"green"}}}'

# Rollback rápido se necessário
kubectl patch service pipeline -p '{"spec":{"selector":{"version":"blue"}}}'

6. Boas práticas de versionamento para reprodutibilidade

6.1. Uso de tags e releases

# Criando releases semânticas
git tag -a v2.1.0 -m "Release v2.1.0 - New sales prediction model"
git push origin v2.1.0

# Tags para datasets específicos
git tag -a data-vendas-2024-v3 -m "Updated sales data with Q1 2024"
git push origin data-vendas-2024-v3

# Tags para modelos
git tag -a model-previsao-v2 -m "Improved prediction accuracy by 15%"
git push origin model-previsao-v2

6.2. Commits semânticos e CHANGELOG

# Padrão de commits semânticos
feat(pipeline): add new transformation for customer segmentation
fix(extract): handle null values in date columns
data(dataset): update sales data with March 2024 records
docs(readme): add deployment instructions for pipeline v2
chore(deps): update DVC to version 3.0

# CHANGELOG.md automático
# [2.1.0] - 2024-03-15
### Added
- New customer segmentation transformation
- Support for real-time data streaming
### Changed
- Updated sales prediction model (accuracy +15%)
- Optimized data extraction for large datasets
### Fixed
- Null value handling in date columns
- Memory leak in transformation pipeline

6.3. Estratégias de rollback

# Rollback de pipeline com DVC
# 1. Identificar versão estável anterior
git log --oneline --all | grep "Release v2.0.0"

# 2. Reverter código
git revert HEAD --no-commit

# 3. Reverter dados com DVC
dvc checkout data/raw/vendas_2024.csv.dvc

# 4. Reverter modelo
dvc checkout models/previsao_vendas.pkl.dvc

# 5. Commitar rollback
git commit -m "revert(pipeline): rollback to v2.0.0 due to accuracy issues"

# Rollback de deploy com Kubernetes
kubectl rollout undo deployment/pipeline-etl
kubectl rollout status deployment/pipeline-etl

Referências