Como versionar pipelines de dados com git e ferramentas específicas
1. Fundamentos do versionamento de pipelines de dados
1.1. Diferenças entre versionamento de código tradicional e pipelines de dados
O versionamento de pipelines de dados difere significativamente do versionamento de código tradicional. Enquanto no desenvolvimento de software convencional versionamos principalmente código-fonte, em pipelines de dados precisamos versionar também schemas, metadados, configurações de transformação e, crucialmente, os próprios datasets.
# Estrutura de pipeline tradicional vs. versionamento de dados
# Código tradicional versiona apenas:
src/
main.py
utils.py
# Pipeline de dados versiona também:
data/
raw/ # Dados brutos (via DVC)
processed/ # Dados transformados (via DVC)
transformations/
etl_job.py # Código ETL
schema.yaml # Definição de schema
config/
pipeline_config.yaml # Configurações específicas
1.2. Desafios específicos: dependências de dados, schemas mutáveis e reprodutibilidade
Os principais desafios incluem:
- Dependências de dados: pipelines dependem de datasets que mudam constantemente
- Schemas mutáveis: alterações em fontes de dados podem quebrar transformações
- Reprodutibilidade: executar o mesmo pipeline em momentos diferentes deve produzir resultados idênticos
# Exemplo de dependência de dados versionada
# pipeline_config.yaml
version: "2.1.0"
dependencies:
- dataset: "vendas_2024"
version: "2024-03-15"
- model: "previsao_demanda"
version: "v1.2.3"
1.3. Estrutura de repositório ideal para projetos de dados
A escolha entre monorepo e multirepo depende do tamanho da equipe e complexidade:
# Estrutura monorepo recomendada para pipelines de dados
data-pipeline/
├── .dvc/ # Configuração DVC
├── data/
│ ├── raw/ # Dados brutos (DVC)
│ └── processed/ # Dados processados (DVC)
├── src/
│ ├── extract/ # Scripts de extração
│ ├── transform/ # Scripts de transformação
│ └── load/ # Scripts de carga
├── dags/
│ ├── airflow/ # DAGs do Airflow
│ └── prefect/ # Fluxos do Prefect
├── tests/
│ ├── unit/ # Testes unitários
│ └── integration/ # Testes de integração
├── infra/
│ ├── docker/ # Dockerfiles
│ └── k8s/ # Manifests Kubernetes
├── .gitignore
├── dvc.yaml # Pipeline DVC
├── requirements.txt
└── README.md
2. Estratégias de branching para pipelines de dados
2.1. Git Flow adaptado para pipelines
# Estrutura de branches para pipelines de dados
main # Produção estável
├── develop # Integração de features
├── staging # Ambiente de staging
├── feature/* # Novas transformações
├── fix/* # Correções em pipelines
└── data/* # Atualizações de datasets
2.2. Branches para ambientes isolados
# Comandos para gerenciar ambientes
git checkout -b feature/nova_transformacao develop
# Desenvolve e testa localmente
git push origin feature/nova_transformacao
# Cria merge request para develop
git checkout develop
git merge feature/nova_transformacao
# Deploy para staging via CI/CD
git tag v2.1.0-rc1 staging
git push origin v2.1.0-rc1
2.3. Merge requests e code review
# Checklist para code review de pipelines
- [ ] Testes unitários passam
- [ ] Testes de integração com dados de staging
- [ ] Schema validation executado
- [ ] DVC lock file atualizado
- [ ] Dockerfile testado localmente
- [ ] Documentação de alterações no CHANGELOG
3. Versionamento de artefatos e dependências do pipeline
3.1. Gerenciamento de dependências
# requirements.txt versionado
pandas==2.0.3
apache-airflow==2.7.1
dvc==3.0.0
prefect==2.10.0
great-expectations==0.17.0
3.2. Versionamento de containers Docker
# Dockerfile com tags semânticas
FROM python:3.11-slim
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY src/ /app/src
COPY dags/ /app/dags
# Tagging strategy
docker build -t pipeline-etl:v2.1.0 .
docker tag pipeline-etl:v2.1.0 pipeline-etl:commit-abc1234
3.3. Controle de versão de datasets com DVC
# Inicializando DVC no projeto
dvc init
git commit -m "Initialize DVC"
# Adicionando dados ao versionamento
dvc add data/raw/vendas_2024.csv
git add data/raw/vendas_2024.csv.dvc
git commit -m "Add vendas 2024 dataset"
# Versionando diferentes versões do dataset
dvc commit -m "Update vendas 2024 with March data"
git tag data-vendas-2024-v2
4. Ferramentas específicas para versionamento de pipelines
4.1. DVC para versionamento de dados, modelos e métricas
# dvc.yaml - Pipeline versionado
stages:
preprocess:
cmd: python src/preprocess.py
deps:
- data/raw/vendas_2024.csv
outs:
- data/processed/vendas_clean.csv
train:
cmd: python src/train.py
deps:
- data/processed/vendas_clean.csv
outs:
- models/previsao_vendas.pkl
metrics:
- metrics/accuracy.json:
cache: false
# Executando pipeline versionado
dvc repro
dvc metrics diff
dvc push
4.2. Git LFS para arquivos grandes
# Configurando Git LFS
git lfs track "*.parquet"
git lfs track "*.csv"
git lfs track "*.pkl"
git add .gitattributes
git commit -m "Configure Git LFS for data files"
# Uso normal do git
git add data/large_dataset.parquet
git commit -m "Add large dataset"
4.3. CI/CD para automação
# .github/workflows/pipeline-tests.yml
name: Pipeline Tests
on: [push, pull_request]
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
with:
lfs: true
- uses: actions/setup-python@v4
- name: Install dependencies
run: pip install -r requirements.txt
- name: Run unit tests
run: pytest tests/unit/
- name: Validate pipeline
run: dvc repro --dry
- name: Check metrics
run: dvc metrics diff --show-json
5. Integração com ferramentas de orquestração
5.1. Versionamento de DAGs no Airflow
# dags/etl_pipeline.py versionado
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'data-team',
'depends_on_past': False,
'start_date': datetime(2024, 1, 1),
'email_on_failure': True,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
with DAG(
'etl_vendas',
default_args=default_args,
description='Pipeline ETL versionado',
schedule_interval='@daily',
catchup=False,
version='2.1.0',
) as dag:
extract = PythonOperator(
task_id='extract_data',
python_callable=extract_function,
op_kwargs={'version': '{{ dag.dag_version }}'}
)
transform = PythonOperator(
task_id='transform_data',
python_callable=transform_function,
)
load = PythonOperator(
task_id='load_data',
python_callable=load_function,
)
extract >> transform >> load
5.2. Git hooks para validação
# .git/hooks/pre-commit
#!/bin/bash
echo "Running pre-commit checks..."
# Validar schema dos dados
python scripts/validate_schema.py
if [ $? -ne 0 ]; then
echo "Schema validation failed!"
exit 1
fi
# Verificar DVC lock file
dvc status
if [ $? -ne 0 ]; then
echo "DVC lock file out of sync!"
exit 1
fi
# Executar linter específico para pipeline
pylint src/ dags/
if [ $? -ne 0 ]; then
echo "Linting failed!"
exit 1
fi
echo "All checks passed!"
5.3. Estratégias de deploy
# Deploy blue-green com versionamento
# Deploy da versão atual (blue)
kubectl apply -f infra/k8s/pipeline-blue.yaml
# Testar versão blue
curl http://pipeline-blue.internal/health
# Deploy nova versão (green)
kubectl apply -f infra/k8s/pipeline-green.yaml
# Switch para green após validação
kubectl patch service pipeline -p '{"spec":{"selector":{"version":"green"}}}'
# Rollback rápido se necessário
kubectl patch service pipeline -p '{"spec":{"selector":{"version":"blue"}}}'
6. Boas práticas de versionamento para reprodutibilidade
6.1. Uso de tags e releases
# Criando releases semânticas
git tag -a v2.1.0 -m "Release v2.1.0 - New sales prediction model"
git push origin v2.1.0
# Tags para datasets específicos
git tag -a data-vendas-2024-v3 -m "Updated sales data with Q1 2024"
git push origin data-vendas-2024-v3
# Tags para modelos
git tag -a model-previsao-v2 -m "Improved prediction accuracy by 15%"
git push origin model-previsao-v2
6.2. Commits semânticos e CHANGELOG
# Padrão de commits semânticos
feat(pipeline): add new transformation for customer segmentation
fix(extract): handle null values in date columns
data(dataset): update sales data with March 2024 records
docs(readme): add deployment instructions for pipeline v2
chore(deps): update DVC to version 3.0
# CHANGELOG.md automático
# [2.1.0] - 2024-03-15
### Added
- New customer segmentation transformation
- Support for real-time data streaming
### Changed
- Updated sales prediction model (accuracy +15%)
- Optimized data extraction for large datasets
### Fixed
- Null value handling in date columns
- Memory leak in transformation pipeline
6.3. Estratégias de rollback
# Rollback de pipeline com DVC
# 1. Identificar versão estável anterior
git log --oneline --all | grep "Release v2.0.0"
# 2. Reverter código
git revert HEAD --no-commit
# 3. Reverter dados com DVC
dvc checkout data/raw/vendas_2024.csv.dvc
# 4. Reverter modelo
dvc checkout models/previsao_vendas.pkl.dvc
# 5. Commitar rollback
git commit -m "revert(pipeline): rollback to v2.0.0 due to accuracy issues"
# Rollback de deploy com Kubernetes
kubectl rollout undo deployment/pipeline-etl
kubectl rollout status deployment/pipeline-etl
Referências
- DVC Documentation: Data Version Control — Documentação oficial do DVC para versionamento de datasets, modelos e métricas em pipelines de dados
- Git LFS Official Documentation — Guia completo sobre Git Large File Storage para versionamento de arquivos grandes em pipelines
- Apache Airflow: Best Practices for DAG Versioning — Melhores práticas para versionamento de DAGs e pipelines no Airflow
- GitHub Actions: CI/CD for Data Pipelines — Tutorial oficial para configurar CI/CD em pipelines de dados com GitHub Actions
- Prefect Documentation: Versioning and Deployments — Guia de versionamento e deploy de fluxos de dados com Prefect
- Dagster: Pipeline Versioning and Reproducibility — Documentação sobre versionamento de pipelines e reprodutibilidade no Dagster
- Semantic Versioning Specification — Especificação oficial de versionamento semântico para releases de pipelines