Como aplicar o padrão competing consumers para processamento paralelo
1. Fundamentos do Padrão Competing Consumers
O padrão competing consumers é uma arquitetura de mensageria onde múltiplos consumidores competem por mensagens em uma mesma fila, garantindo que cada mensagem seja processada por exatamente um consumidor. O objetivo principal é balancear a carga de trabalho entre consumidores concorrentes, permitindo processamento paralelo e escalabilidade horizontal.
Diferentemente do padrão publish-subscribe, onde cada mensagem é entregue a todos os assinantes, no competing consumers cada mensagem é consumida uma única vez. No modelo point-to-point tradicional, um único consumidor processa a fila; no competing consumers, múltiplos consumidores dividem o trabalho.
Cenários típicos incluem:
- Processamento de pedidos em e-commerce
- Ingestão e transformação de logs em tempo real
- Processamento de jobs em lote
- Envio de notificações em massa
2. Arquitetura e Componentes Principais
A arquitetura do padrão competing consumers é composta por três elementos fundamentais:
Fila como ponto central de desacoplamento: A fila (ex: RabbitMQ, Apache Kafka, Amazon SQS) atua como buffer entre produtores e consumidores. Ela armazena mensagens até que sejam processadas, permitindo que produtores e consumidores operem em velocidades diferentes.
Papel do produtor: O produtor enfileira mensagens com payloads estruturados. É crucial implementar idempotência no lado do produtor para evitar duplicatas em caso de falhas de rede.
Papel do consumidor: Cada consumidor processa mensagens de forma independente, utilizando confirmação explícita (ack) para sinalizar sucesso ou rejeição (nack) para indicar falha.
3. Configuração de Concorrência e Escalabilidade
A configuração de concorrência depende do modelo de execução:
# Configuração de thread pool para consumidores
consumidores:
numero_instancias: 3
threads_por_instancia: 10
prefetch_count: 5
batch_size: 100
timeout_segundos: 30
Estratégias de escalonamento horizontal:
- Adicionar consumidores quando a fila ultrapassar um threshold
- Remover consumidores ociosos para economizar recursos
- Utilizar auto-scaling groups em ambientes cloud
Controle de throughput:
- Prefetch count: Número máximo de mensagens não confirmadas por consumidor
- Batch size: Quantidade de mensagens processadas em lote
- Timeouts: Limite de tempo para processamento de cada mensagem
4. Garantia de Processamento e Tolerância a Falhas
Para garantir processamento confiável, implemente:
Mecanismos de retry e dead-letter queues (DLQ):
# Configuração de retry com backoff exponencial
retry:
max_tentativas: 3
backoff_inicial: 1s
backoff_maximo: 30s
dlq: "fila_erros"
Idempotência no consumidor: Utilize identificadores únicos (correlation IDs) para detectar e ignorar mensagens duplicadas durante reprocessamentos.
Tratamento de mensagens lentas: Implemente heartbeats para sinalizar que o consumidor ainda está ativo e configure redelivery automático para mensagens não confirmadas dentro do timeout.
5. Monitoramento e Observabilidade
Métricas essenciais para operação do padrão:
# Métricas de monitoramento
metricas:
- nome: "fila_lag"
descricao: "Número de mensagens não processadas"
alerta: "> 10000"
- nome: "taxa_processamento"
descricao: "Mensagens processadas por segundo"
alerta: "< 50"
- nome: "erros_por_consumidor"
descricao: "Taxa de erros por instância"
alerta: "> 5%"
- nome: "dlq_tamanho"
descricao: "Mensagens na fila de dead letter"
alerta: "> 100"
Logs estruturados com correlation IDs permitem rastrear o fluxo completo de cada mensagem desde a produção até o consumo.
Alertas para degradação: Configure notificações para consumidores parados, fila crescendo rapidamente ou DLQ acumulando mensagens.
6. Exemplo Prático com Código
Estrutura do produtor:
# produtor.py
import json
import uuid
from fila import FilaMensageria
fila = FilaMensageria("fila_pedidos")
def produzir_mensagem(pedido):
mensagem = {
"correlation_id": str(uuid.uuid4()),
"tipo": "pedido_criado",
"payload": {
"pedido_id": pedido["id"],
"cliente": pedido["cliente"],
"itens": pedido["itens"],
"total": pedido["total"]
},
"timestamp": "2024-01-15T10:30:00Z"
}
fila.enfileirar(json.dumps(mensagem))
print(f"Mensagem enfileirada: {mensagem['correlation_id']}")
# Exemplo de uso
pedido_exemplo = {
"id": 12345,
"cliente": "Maria Silva",
"itens": ["produto_a", "produto_b"],
"total": 150.00
}
produzir_mensagem(pedido_exemplo)
Estrutura do consumidor:
# consumidor.py
import json
import time
from fila import FilaMensageria
from threading import Thread
fila = FilaMensageria("fila_pedidos")
def processar_mensagem(mensagem):
try:
dados = json.loads(mensagem)
correlation_id = dados["correlation_id"]
print(f"Processando mensagem {correlation_id}")
# Simula processamento
time.sleep(0.5)
# Confirma processamento bem-sucedido
fila.confirmar(mensagem)
print(f"Mensagem {correlation_id} processada com sucesso")
except Exception as erro:
print(f"Erro ao processar mensagem: {erro}")
fila.rejeitar(mensagem, requeue=False)
def consumir_mensagens():
while True:
mensagem = fila.receber(prefetch_count=5)
if mensagem:
processar_mensagem(mensagem)
else:
time.sleep(1)
# Configuração de concorrência com 10 threads
NUM_THREADS = 10
threads = []
for i in range(NUM_THREADS):
thread = Thread(target=consumir_mensagens, name=f"Consumidor-{i}")
thread.daemon = True
threads.append(thread)
thread.start()
# Aguarda threads (em produção, use graceful shutdown)
for thread in threads:
thread.join()
Configuração de concorrência:
# config.yaml
fila:
nome: "fila_pedidos"
tipo: "rabbitmq"
host: "localhost"
porta: 5672
consumidor:
threads: 10
prefetch_count: 5
batch_size: 100
timeout: 30
retry_max: 3
dlq: "fila_pedidos_dlq"
7. Desafios e Boas Práticas Avançadas
Ordenação parcial de mensagens: Utilize partições por chave (sharding) para garantir que mensagens relacionadas sejam processadas em ordem:
# Configuração de partições por chave
particoes:
- chave: "cliente_id"
- chave: "pedido_id"
- numero_particoes: 10
Evitar starvation: Implemente fairness entre consumidores utilizando prioridades ou pesos:
# Configuração de fairness
fairness:
modo: "round_robin"
pesos:
consumidor_alta_prioridade: 3
consumidor_baixa_prioridade: 1
timeout_starvation: 60
Testes de resiliência: Simule falhas de consumidor e picos de carga:
# Testes de resiliência
testes:
- nome: "falha_consumidor"
descricao: "Simular queda de 2 consumidores"
acao: "matar_processo"
- nome: "pico_carga"
descricao: "Produzir 10000 mensagens em 1 segundo"
acao: "aumentar_producao"
- nome: "lentidao"
descricao: "Consumidor com delay de 10s por mensagem"
acao: "injetar_lentidao"
Boas práticas adicionais:
- Implemente circuit breaker para evitar sobrecarga
- Utilize health checks para detectar consumidores mortos
- Configure graceful shutdown para drenar mensagens em processamento
- Monitore a taxa de rejeição para identificar problemas no consumidor
Referências
-
Padrão Competing Consumers - Microsoft Azure Architecture Center — Documentação oficial da Microsoft sobre o padrão, com exemplos de implementação e considerações de design.
-
RabbitMQ: Consumer Prefetch — Guia oficial do RabbitMQ sobre configuração de prefetch count e balanceamento de carga entre consumidores.
-
Apache Kafka: Consumer Groups — Documentação oficial do Kafka sobre grupos de consumidores e rebalanceamento de partições.
-
Amazon SQS: Competing Consumers Pattern — Tutorial da AWS sobre implementação do padrão com SQS e Lambda para processamento paralelo.
-
Padrões de Mensageria: Competing Consumers vs Publish-Subscribe — Artigo clássico do catálogo Enterprise Integration Patterns explicando as diferenças entre padrões de mensageria.