Como projetar sistemas de scheduling distribuído com liderança eleitoral

1. Fundamentos do Scheduling Distribuído com Liderança Eleitoral

1.1. Conceitos-chave

Sistemas de scheduling distribuído coordenam a execução de tarefas em múltiplos nós de um cluster. Quando introduzimos liderança eleitoral, um nó é eleito líder para coordenar a distribuição e o monitoramento de jobs, enquanto os demais atuam como workers. O líder é responsável por manter o estado global do sistema, tomar decisões de agendamento e garantir que cada tarefa seja executada exatamente uma vez.

Os pilares desse modelo incluem:
- Eleição de líder: processo pelo qual os nós escolhem um coordenador único
- Consenso: acordo entre nós sobre o estado do sistema
- Coordenação: sincronização de ações entre líder e workers

1.2. Problemas comuns

O principal desafio é o split-brain, onde partições de rede fazem com que múltiplos nós se considerem líderes simultaneamente. Isso pode levar a execuções duplicadas de tarefas críticas. Outros problemas incluem:

  • Concorrência em tarefas que exigem exclusividade
  • Inconsistência de estado entre nós após falhas
  • Perda de jobs durante failover do líder

1.3. Cenários de uso

  • Cron jobs em clusters Kubernetes
  • Orquestração de jobs em data pipelines (Apache Airflow)
  • Tarefas periódicas em edge computing
  • Manutenção de índices em sistemas de busca distribuídos

2. Algoritmos de Eleição de Líder para Scheduling

2.1. Algoritmo de Bully e Raft

Bully: Quando um nó detecta ausência do líder, inicia eleição enviando mensagens para nós com IDs maiores. O nó com maior ID vence. Simples, mas gera tráfego O(n²).

Raft: Mais robusto, usa termos de liderança e logs replicados. Garante que apenas um líder exista por termo, mesmo com partições de rede. Ideal para sistemas que exigem consistência forte.

2.2. Implementação com ZooKeeper e etcd

Ambos oferecem ephemeral nodes (nós efêmeros) que desaparecem quando o cliente perde conexão. O líder cria um nó efêmero e renova periodicamente um lease. Se falha, o nó some e outro nó assume.

Exemplo com etcd (API v3):

# Worker tenta criar nó líder
etcdctl put /scheduler/leader worker-1 --lease=60s

# Se falhar (nó já existe), outro worker é líder
# Workers monitoram /scheduler/leader com watch

# Heartbeat do líder
etcdctl lease keep-alive <lease-id>

2.3. Estratégias de failover

  • Timeout de líder: se líder não renova lease em 3 heartbeats, assume-se falha
  • Reeleição automática: workers competem pelo nó efêmero
  • Graça de reconexão: líder antigo, ao reconectar, descobre novo líder e vira worker

3. Arquitetura do Sistema de Scheduling Distribuído

3.1. Componentes principais

+----------------+     +----------------+     +----------------+
|    Líder       |     |   Worker 1     |     |   Worker 2     |
| (Coordenador)  |<--->| (Executor)     |<--->| (Executor)     |
+----------------+     +----------------+     +----------------+
        |                       |                       |
        +-----------------------+-----------------------+
                                |
                      +------------------+
                      | Armazenamento    |
                      | de Jobs (etcd)   |
                      +------------------+
  • Líder: recebe submissões, agenda tarefas, distribui para workers
  • Workers: executam tarefas, reportam status
  • Armazenamento: mantém estado persistente dos jobs

3.2. Fluxo de execução

  1. Cliente submete job ao líder via API
  2. Líder persiste job no armazenamento
  3. Líder atribui job a worker específico
  4. Worker executa e reporta conclusão
  5. Líder atualiza estado e agenda próximas execuções

3.3. Modelos de consistência

  • Eventual consistency: adequado para jobs não críticos, maior throughput
  • Strong consistency: necessário para jobs financeiros, usa Raft/ZooKeeper

4. Mecanismos de Garantia de Execução Única

4.1. Controle de concorrência

Locks distribuídos evitam que dois workers executem o mesmo job:

# Worker tenta adquirir lock para job específico
# Usando Redis Redlock ou etcd
lock_key = /locks/job-123
if etcd.try_lock(lock_key, ttl=30s):
    execute_job()
    etcd.unlock(lock_key)
else:
    # Outro worker já está executando
    skip()

4.2. Tolerância a falhas

  • Retentativas: worker tenta novamente em caso de falha transitória
  • Deduplicação: líder mantém cache de jobs concluídos (últimos 5 minutos)
  • Idempotência: jobs projetados para serem seguros mesmo se executados múltiplas vezes

4.3. Checkpointing e recuperação

# Worker salva progresso periodicamente
checkpoint = {"job_id": "job-123", "progress": 75, "timestamp": 1700000000}
etcd.put(f"/checkpoints/{job_id}", serialize(checkpoint))

# Na reinicialização, worker lê último checkpoint
last = etcd.get(f"/checkpoints/{job_id}")
resume_from(last.progress)

5. Balanceamento de Carga e Escalabilidade

5.1. Distribuição dinâmica de tarefas

  • Round-robin: simples, mas ignora capacidade dos workers
  • Hashing consistente: jobs sempre vão para mesmo worker (bom para cache)
  • Filas prioritárias: jobs urgentes executam primeiro

5.2. Escalonamento horizontal

Workers podem ser adicionados/removidos sem interrupção:

# Novo worker registra-se
etcd.put(f"/workers/{worker_id}", {"status": "idle", "capacity": 5})

# Líder detecta novo worker e redistribui jobs
# Worker removido tem jobs reatribuídos após timeout

5.3. Monitoramento de desempenho

Métricas essenciais:
- Latência média de execução por job
- Throughput (jobs/segundo)
- Utilização de CPU/memória por worker
- Taxa de falhas e retentativas

6. Estratégias de Sincronização e Heartbeat

6.1. Heartbeat entre líder e workers

# Worker envia heartbeat a cada 5 segundos
while running:
    etcd.put(f"/heartbeats/{worker_id}", timestamp, lease=15s)
    sleep(5)

# Líder verifica heartbeats
if time.now() - last_heartbeat(worker_id) > 15s:
    mark_as_failed(worker_id)
    redistribute_jobs(worker_id)

6.2. Sincronização de estado

Metadados de jobs (status, worker atribuído, próxima execução) são replicados via etcd. Workers leem apenas, líder escreve.

6.3. Tratamento de partições de rede

  • Quorum mínimo: líder só age se tiver contato com maioria dos workers
  • Modo degradado: workers continuam executando jobs locais, mas não aceitam novos

7. Casos Práticos e Implementação de Referência

7.1. Exemplo com etcd e workers em Go

// Eleição de líder simplificada
func electLeader() {
    lease, _ := client.Grant(ctx, 10)
    resp, err := client.Put(ctx, "/scheduler/leader", myID,
        clientv3.WithLease(lease.ID))
    if err != nil {
        // Outro nó é líder
        watchLeader()
    } else {
        becomeLeader()
        go keepAlive(lease.ID)
    }
}

// Scheduling de tarefa periódica
func scheduleJob(job Job) {
    // Persiste job
    etcd.Put(f"/jobs/{job.ID}", serialize(job))

    // Agenda próxima execução
    nextTime := time.Now().Add(job.Interval)
    etcd.Put(f"/schedule/{nextTime.Unix()}/{job.ID}", "")
}

7.2. Integração com sistemas de filas

Redis ou Kafka podem servir como buffer de jobs:

# Líder publica job no Redis
redis.LPush("job_queue", jobData)

# Workers consomem da fila
jobData = redis.BRPop("job_queue", timeout=5)
execute(jobData)

7.3. Testes de resiliência

# Simular falha de líder
kill -9 $(pgrep scheduler-leader)
# Workers detectam ausência e elegem novo líder em ~15s

# Simular partição de rede
iptables -A INPUT -s worker-3 -j DROP
# Worker-3 fica isolado, jobs são redistribuídos
# Ao restaurar, worker-3 descobre novo líder

Referências