Batch processing no K8s: Kubernetes Batch e KubeFlow

1. Fundamentos do Batch Processing no Kubernetes

1.1 O que são workloads batch e por que rodá-los no K8s

Workloads batch são tarefas computacionais que executam de início ao fim, processam um volume finito de dados e, ao concluir, encerram-se automaticamente. Diferentemente de serviços web que permanecem ativos 24/7, jobs batch consomem recursos apenas durante sua execução.

Rodar batch processing no Kubernetes oferece vantagens estratégicas:
- Infraestrutura unificada: mesmo cluster para microsserviços e jobs batch
- Elasticidade nativa: escalonamento horizontal sob demanda
- Gerenciamento de falhas integrado: restart policies, backoff limits e TTLs
- Isolamento por contêineres: dependências encapsuladas, reprodutibilidade

1.2 Diferenças entre jobs interativos, serviços contínuos e jobs batch

Tipo Comportamento Exemplo
Serviço contínuo (Deployment) Mantém N pods sempre ativos API REST, web server
Job interativo (Pod direto) CLI ou debug temporário kubectl run -it --rm
Job batch Executa até completar, então termina ETL diário, treinamento de ML

1.3 Arquitetura do scheduler do Kubernetes para jobs: limitações e oportunidades

O scheduler padrão do K8s aloca pods individualmente. Para jobs batch, isso gera limitações:

  • Scheduling de gang: jobs distribuídos (ex: MPI, PyTorch DDP) exigem que todos os pods iniciem simultaneamente
  • Priorização: jobs batch podem competir com serviços críticos
  • Escalonamento de recursos: jobs podem exigir alocação explícita de GPUs ou grandes volumes de memória

Oportunidades surgem com projetos como Volcano, Kueue e KubeFlow, que estendem o scheduler nativo.

2. Kubernetes Batch API: Jobs, CronJobs e Work Queues

2.1 Criando e gerenciando Jobs: paralelismo, completions e backoff

Um Job Kubernetes executa um ou mais pods até que um número específico de conclusões bem-sucedidas seja atingido.

apiVersion: batch/v1
kind: Job
metadata:
  name: processador-lotes
spec:
  completions: 10
  parallelism: 3
  backoffLimit: 4
  ttlSecondsAfterFinished: 3600
  template:
    spec:
      containers:
      - name: worker
        image: alpine:3.19
        command: ["sh", "-c", "sleep 10 && echo 'Lote processado'"]
      restartPolicy: Never
  • completions: 10 — total de execuções bem-sucedidas
  • parallelism: 3 — até 3 pods simultâneos
  • backoffLimit: 4 — número máximo de tentativas antes de marcar como Failed
  • ttlSecondsAfterFinished — limpeza automática após conclusão

2.2 CronJobs: agendamento recorrente de tarefas batch

CronJobs criam Jobs em intervalos definidos por expressão cron.

apiVersion: batch/v1
kind: CronJob
metadata:
  name: backup-diario
spec:
  schedule: "0 2 * * *"
  startingDeadlineSeconds: 300
  concurrencyPolicy: Forbid
  jobTemplate:
    spec:
      template:
        spec:
          containers:
          - name: backup
            image: busybox:1.36
            command: ["sh", "-c", "pg_dump -h db -U user mydb > /backup/dump.sql"]
          restartPolicy: Never
  • concurrencyPolicy: Forbid — impede execução simultânea
  • startingDeadlineSeconds — tolerância máxima de atraso

2.3 Padrões avançados: work queues com Redis/RabbitMQ e index jobs

Work Queue com Redis: jobs consomem tarefas de uma fila externa.

apiVersion: batch/v1
kind: Job
metadata:
  name: redis-worker
spec:
  parallelism: 5
  completions: 1
  template:
    spec:
      containers:
      - name: worker
        image: redis:7-alpine
        command: ["redis-cli", "-h", "redis-service", "brpop", "task-queue", "0"]
      restartPolicy: Never

Index Jobs: cada pod recebe um índice único via variável de ambiente JOB_COMPLETION_INDEX.

apiVersion: batch/v1
kind: Job
metadata:
  name: processa-particoes
spec:
  completions: 4
  parallelism: 2
  completionMode: Indexed
  template:
    spec:
      containers:
      - name: worker
        image: alpine:3.19
        env:
        - name: JOB_COMPLETION_INDEX
          valueFrom:
            fieldRef:
              fieldPath: metadata.labels['batch.kubernetes.io/job-completion-index']
        command: ["sh", "-c", "echo 'Processando partição $JOB_COMPLETION_INDEX'"]
      restartPolicy: Never

3. Gerenciamento de Recursos e Escalabilidade para Batch

3.1 Resource Quotas, LimitRanges e prioridades para workloads batch

ResourceQuota por namespace:

apiVersion: v1
kind: ResourceQuota
metadata:
  name: batch-quota
  namespace: batch-ns
spec:
  hard:
    requests.cpu: "10"
    requests.memory: "32Gi"
    limits.cpu: "20"
    limits.memory: "64Gi"
    count/jobs.batch: "50"

PriorityClass para priorizar jobs sobre serviços:

apiVersion: scheduling.k8s.io/v1
kind: PriorityClass
metadata:
  name: batch-high
value: 1000
globalDefault: false
description: "Prioridade alta para jobs batch críticos"

3.2 Escalonamento eficiente: Kueue, Volcano e scheduling de gang

Volcano implementa scheduling de gang para jobs distribuídos:

apiVersion: batch.volcano.sh/v1alpha1
kind: Job
metadata:
  name: treinamento-distribuido
spec:
  minAvailable: 4
  schedulerName: volcano
  tasks:
  - replicas: 4
    name: worker
    template:
      spec:
        containers:
        - name: worker
          image: pytorch/pytorch:2.0.1-cuda11.7
          command: ["python", "train.py"]
        resources:
          requests:
            nvidia.com/gpu: 1
  • minAvailable: 4 — todos os 4 workers devem ser alocados simultaneamente
  • schedulerName: volcano — delega ao scheduler Volcano

3.3 Tratamento de falhas, retry policies e TTLs para jobs

spec:
  backoffLimit: 6
  activeDeadlineSeconds: 3600
  ttlSecondsAfterFinished: 86400
  • backoffLimit: tentativas exponenciais (1s, 2s, 4s, 8s...)
  • activeDeadlineSeconds: timeout total do job
  • ttlSecondsAfterFinished: limpeza automática após sucesso ou falha

4. KubeFlow: Plataforma de ML Pipelines e Batch

4.1 Visão geral do ecossistema KubeFlow

KubeFlow é uma plataforma MLOps que integra:

  • Pipelines: orquestração de DAGs de componentes batch
  • Training Operator: suporte nativo a TensorFlow, PyTorch, MXNet, MPI
  • Katib: AutoML e hyperparameter tuning
  • KFServing: deploy serverless de modelos

4.2 KubeFlow Pipelines: orquestração de DAGs batch

Pipeline declarado em Python com o SDK do KubeFlow:

import kfp
from kfp import dsl, components

@dsl.component
def preprocess(data_path: str) -> str:
    return f"/processed/{data_path.split('/')[-1]}"

@dsl.component
def train(processed_data: str, epochs: int) -> str:
    return f"/models/model_{epochs}e.pkl"

@dsl.pipeline(name="batch-ml-pipeline")
def ml_pipeline(data_path: str = "/data/raw", epochs: int = 10):
    prep = preprocess(data_path=data_path)
    train_model = train(processed_data=prep.output, epochs=epochs)

kfp.compiler.Compiler().compile(ml_pipeline, "pipeline.yaml")

4.3 KubeFlow Training Operator: suporte a frameworks

PyTorchJob para treinamento distribuído:

apiVersion: kubeflow.org/v1
kind: PyTorchJob
metadata:
  name: pytorch-dist-train
spec:
  pytorchReplicaSpecs:
    Master:
      replicas: 1
      template:
        spec:
          containers:
          - name: pytorch
            image: pytorch/pytorch:2.0.1-cuda11.7
            command: ["python", "-m", "torch.distributed.run", "train.py"]
    Worker:
      replicas: 3
      template:
        spec:
          containers:
          - name: pytorch
            image: pytorch/pytorch:2.0.1-cuda11.7
            command: ["python", "-m", "torch.distributed.run", "train.py"]
          resources:
            limits:
              nvidia.com/gpu: 1

5. Exemplos Práticos com Código

5.1 Deploy de um Job Kubernetes simples com paralelismo

# job-paralelo.yaml
apiVersion: batch/v1
kind: Job
metadata:
  name: etl-diario
spec:
  completions: 20
  parallelism: 5
  backoffLimit: 3
  template:
    spec:
      containers:
      - name: etl-worker
        image: python:3.11-slim
        command: ["python", "-c", "
import time, os
idx = os.environ.get('JOB_COMPLETION_INDEX', '0')
print(f'Worker {idx}: iniciando ETL...')
time.sleep(5)
print(f'Worker {idx}: concluído')
"]
      restartPolicy: Never
kubectl apply -f job-paralelo.yaml
kubectl get jobs --watch
kubectl logs job/etl-diario

5.2 Pipeline KubeFlow completo: pré-processamento, treino, avaliação

# pipeline_completo.py
import kfp
from kfp import dsl

@dsl.component(base_image="python:3.11-slim")
def load_data(url: str) -> str:
    return f"/data/raw/{url.split('/')[-1]}"

@dsl.component(base_image="python:3.11-slim", packages_to_install=["pandas", "scikit-learn"])
def preprocess(data_path: str) -> str:
    import pandas as pd
    df = pd.read_csv(data_path)
    df.to_parquet("/data/processed/dataset.parquet")
    return "/data/processed/dataset.parquet"

@dsl.component(base_image="python:3.11-slim", packages_to_install=["scikit-learn", "joblib"])
def train(processed_path: str, model_path: str) -> str:
    from sklearn.ensemble import RandomForestClassifier
    import pandas as pd
    import joblib
    df = pd.read_parquet(processed_path)
    model = RandomForestClassifier().fit(df.drop("target", axis=1), df["target"])
    joblib.dump(model, f"{model_path}/model.pkl")
    return f"{model_path}/model.pkl"

@dsl.pipeline(name="ml-batch-pipeline")
def ml_batch(url: str = "https://example.com/dataset.csv"):
    data = load_data(url=url)
    processed = preprocess(data_path=data.output)
    trained = train(processed_path=processed.output, model_path="/models")

kfp.compiler.Compiler().compile(ml_batch, "ml_batch_pipeline.yaml")

5.3 Uso de Volcano para scheduling de gang em jobs distribuídos

# volcano-gang-job.yaml
apiVersion: batch.volcano.sh/v1alpha1
kind: Job
metadata:
  name: mpi-gang
spec:
  minAvailable: 4
  schedulerName: volcano
  queue: default
  tasks:
  - replicas: 1
    name: launcher
    template:
      spec:
        containers:
        - image: mpioperator/openmpi:latest
          name: mpi-launcher
          command: ["mpirun", "--allow-run-as-root", "-np", "3", "/app/mpi_hello"]
  - replicas: 3
    name: worker
    template:
      spec:
        containers:
        - image: mpioperator/openmpi:latest
          name: mpi-worker
          command: ["sleep", "infinity"]
        resources:
          requests:
            cpu: "2"
            memory: "4Gi"

6. Monitoramento, Logs e Observabilidade em Batch

6.1 Coleta de logs e métricas específicas para jobs batch

Prometheus + kube-state-metrics: métricas como kube_job_status_succeeded, kube_job_status_failed, kube_job_status_active.

Fluentd para logs centralizados:

apiVersion: v1
kind: ConfigMap
metadata:
  name: fluentd-batch-config
data:
  fluent.conf: |
    <source>
      @type tail
      path /var/log/containers/*.log
      tag kubernetes.*
      <parse>
        @type json
      </parse>
    </source>
    <match kubernetes.var.log.containers.**batch**>
      @type elasticsearch
      host elasticsearch-logging
      port 9200
      logstash_format true
    </match>

6.2 Alertas e notificações para falhas e deadlines de jobs

# prometheus-alert.yaml
groups:
- name: batch-alerts
  rules:
  - alert: JobFailing
    expr: kube_job_status_failed > 0
    for: 5m
    annotations:
      summary: "Job {{ $labels.job_name }} falhou"
  - alert: JobStuckActive
    expr: kube_job_status_active > 0 and time() - kube_job_status_start_time > 3600
    annotations:
      summary: "Job {{ $labels.job_name }} ativo por mais de 1 hora"

6.3 Visualização de pipelines KubeFlow no Kubeflow Dashboard

O Kubeflow Dashboard oferece:
- Visualização gráfica de DAGs de pipelines
- Histórico de execuções com artefatos e parâmetros
- Comparação de experimentos e runs

7. Casos de Uso e Considerações de Produção

7.1 Exemplos reais

  • ETL: extração, transformação e carga de dados em data warehouses
  • Treinamento de modelos: jobs distribuídos com GPU para deep learning
  • Processamento de imagens/vídeo: transcodificação, análise frame-a-frame
  • Relatórios financeiros: cálculos batch noturnos com deadlines rígidos

7.2 Estratégias de custo e otimização

Spot instances via priorityClassName:

apiVersion: v1
kind: PriorityClass
metadata:
  name: batch-spot
value: 100
globalDefault: false
description: "Jobs tolerantes a interrupção"

# Node affinity para spot nodes
spec:
  affinity:
    nodeAffinity:
      preferredDuringScheduling:
      - weight: 100
        preference:
          matchExpressions:
          - key: "spot"
            operator: In
            values: ["true"]

7.3 Integração com armazenamento persistente

PVC compartilhado entre jobs:

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: batch-data
spec:
  accessModes:
  - ReadWriteMany
  resources:
    requests:
      storage: 100Gi
  storageClassName: nfs-client

MinIO para dados de entrada/saída:

env:
- name: S3_ENDPOINT
  value: "http://minio-service:9000"
- name: S3_ACCESS_KEY
  valueFrom:
    secretKeyRef:
      name: minio-creds
      key: access-key
- name: S3_SECRET_KEY
  valueFrom:
    secretKeyRef:
      name: minio-creds
      key: secret-key

Referências

ação sobre operadores de treinamento distribuído (TFJob, PyTorchJob, MPIJob)

Conclusão

Batch processing no Kubernetes evoluiu significativamente desde os simples Jobs e CronJobs. Com ferramentas como Kueue, Volcano e KubeFlow, é possível orquestrar desde tarefas ETL simples até pipelines complexos de machine learning com centenas de workers distribuídos.

A escolha entre Kubernetes Batch nativo e KubeFlow depende do cenário:
- Kubernetes Batch é ideal para jobs independentes, cronogramas fixos e workloads que não exigem orquestração complexa de DAGs
- KubeFlow brilha em cenários de ML, com pipelines reutilizáveis, experimentação e integração nativa com frameworks de treinamento

Para produção, considere:
- Utilizar Volcano ou Kueue para evitar starvation e melhorar utilização de recursos
- Implementar monitoramento com Prometheus e alertas para falhas
- Adotar spot instances para reduzir custos em jobs tolerantes a interrupção
- Garantir persistência de dados com PVCs compartilhados ou armazenamento externo (S3/MinIO)

Com essas ferramentas e padrões, sua plataforma Kubernetes estará pronta para executar workloads batch de forma eficiente, escalável e resiliente.