Como usar message deduplication em filas para garantir exatamente uma entrega

1. Fundamentos da Deduplicação de Mensagens em Filas

1.1. Definição de "exatamente uma entrega" vs. "pelo menos uma"

Em sistemas de mensageria, existem três modelos principais de garantia de entrega:

  • At-most-once (no máximo uma): a mensagem é entregue zero ou uma vez. Pode ser perdida, mas nunca duplicada.
  • At-least-once (pelo menos uma): a mensagem é entregue uma ou mais vezes. Pode haver duplicatas, mas nunca perda.
  • Exactly-once (exatamente uma): a mensagem é entregue exatamente uma vez. É o modelo mais difícil de implementar, pois exige coordenação entre produtor, fila e consumidor.

A deduplicação de mensagens é a técnica central para transformar um sistema "at-least-once" em "exactly-once".

1.2. Causas de duplicação

Duplicatas ocorrem por diversas razões:

Causas comuns de duplicação em filas:
- Retries do produtor após timeout de confirmação
- Falhas de rede que ocultam confirmações bem-sucedidas
- Rebalanceamento de consumidores em grupos
- Processamento lento que excede o visibility timeout
- Falhas no consumidor após processamento mas antes do ACK

1.3. Trade-offs

Abordagem Desempenho Latência Garantia
Sem deduplicação Alto Baixa At-least-once
Deduplicação em cache Médio Média Exactly-once (com janela)
Deduplicação em BD Baixo Alta Exactly-once (persistente)

2. Identificação Única de Mensagens (Message ID)

2.1. Geração de IDs únicos

O primeiro passo é criar um identificador único para cada mensagem.

Exemplo de geração de Message ID em Python:

import uuid
import hashlib
import time

# Abordagem 1: UUID aleatório
message_id = str(uuid.uuid4())

# Abordagem 2: Hash baseado no conteúdo
content_hash = hashlib.sha256(payload.encode()).hexdigest()

# Abordagem 3: ID sequencial com origem
message_id = f"order-service-{timestamp}-{sequence_number}"

2.2. Inclusão do ID no cabeçalho ou payload

Estrutura de mensagem com ID de deduplicação:

{
  "messageId": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
  "timestamp": 1700000000,
  "payload": {
    "orderId": 12345,
    "amount": 99.90
  }
}

2.3. Tratamento de mensagens sem ID explícito

Quando o sistema legado não fornece IDs, use chaves compostas:

Chave composta para deduplicação:
{origem_sistema}:{tipo_evento}:{timestamp_unico}:{sequencia}

Exemplo: "payment-gateway:charge:1700000000:42"

3. Armazenamento de Estado de Deduplicação

3.1. Cache distribuído com TTL

Redis é a escolha mais comum para armazenamento temporário:

Configuração de deduplicação no Redis:

# Conectar ao Redis
redis_client = Redis(host='cache-cluster', port=6379)

# Verificar se mensagem já foi processada
def is_duplicate(message_id):
    return redis_client.exists(f"dedup:{message_id}")

# Marcar como processada com TTL de 24 horas
def mark_processed(message_id):
    redis_client.setex(f"dedup:{message_id}", 86400, "1")

3.2. Banco de dados relacional persistente

Para cenários que exigem histórico permanente:

Tabela de mensagens processadas:

CREATE TABLE processed_messages (
    message_id VARCHAR(64) PRIMARY KEY,
    consumer_id VARCHAR(64),
    processed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    status VARCHAR(20),
    payload_hash VARCHAR(64),
    INDEX idx_processed_at (processed_at)
);

3.3. Mecanismos de expiração

Limpeza automática de registros antigos:

-- Remover registros com mais de 7 dias
DELETE FROM processed_messages 
WHERE processed_at < NOW() - INTERVAL 7 DAY;

4. Implementação no Produtor

4.1. Geração e anexação do ID único

Produtor com deduplicação integrada:

import boto3
import uuid
import json

sqs = boto3.client('sqs', region_name='us-east-1')

def send_message_with_dedup(queue_url, payload):
    message_id = str(uuid.uuid4())

    response = sqs.send_message(
        QueueUrl=queue_url,
        MessageBody=json.dumps({
            "messageId": message_id,
            "payload": payload
        }),
        MessageDeduplicationId=message_id  # AWS SQS nativo
    )
    return response

4.2. Lógica de retry com idempotência

Retry idempotente no produtor:

def send_with_retry(queue_url, payload, max_retries=3):
    message_id = str(uuid.uuid4())

    for attempt in range(max_retries):
        try:
            response = sqs.send_message(
                QueueUrl=queue_url,
                MessageBody=json.dumps({
                    "messageId": message_id,
                    "payload": payload
                }),
                MessageDeduplicationId=message_id
            )
            return response
        except Exception as e:
            if attempt == max_retries - 1:
                raise e
            time.sleep(2 ** attempt)  # Exponential backoff

4.3. Uso de filas com suporte nativo

AWS SQS oferece deduplicação nativa com FIFO queues:

Configuração de fila FIFO no AWS SQS:
- Tipo: FIFO
- Content-based deduplication: ativado
- Deduplication scope: queue
- Message retention period: 4 dias
- Visibility timeout: 30 segundos

5. Implementação no Consumidor

5.1. Verificação de duplicação antes do processamento

Consumidor com verificação de duplicação:

def process_message(message):
    message_id = json.loads(message['Body'])['messageId']

    # Verificar duplicação
    if redis_client.exists(f"dedup:{message_id}"):
        print(f"Mensagem {message_id} já processada. Ignorando.")
        return True  # ACK sem processar

    # Processar mensagem
    try:
        process_business_logic(message)
        mark_processed(message_id)
        return True  # ACK bem-sucedido
    except Exception as e:
        return False  # Não marca como processada

5.2. Transação atômica

Transação atômica com banco de dados:

def process_with_transaction(message):
    message_id = json.loads(message['Body'])['messageId']

    with db.transaction():
        # Verificar duplicação dentro da transação
        if db.exists("SELECT 1 FROM processed_messages WHERE message_id = ?", message_id):
            return

        # Executar lógica de negócio
        update_inventory(message['payload'])

        # Marcar como processada na mesma transação
        db.execute(
            "INSERT INTO processed_messages (message_id) VALUES (?)",
            message_id
        )

5.3. Tratamento de falhas

Estratégia de falhas com DLQ:

def consumer_loop(queue_url, dlq_url):
    while True:
        messages = sqs.receive_message(QueueUrl=queue_url, MaxNumberOfMessages=10)

        for msg in messages.get('Messages', []):
            try:
                if not process_message(msg):
                    # Falha de processamento - enviar para DLQ
                    sqs.send_message(
                        QueueUrl=dlq_url,
                        MessageBody=msg['Body']
                    )
            except Exception as e:
                # Erro crítico - enviar para DLQ
                sqs.send_message(
                    QueueUrl=dlq_url,
                    MessageBody=json.dumps({
                        "originalMessage": msg['Body'],
                        "error": str(e)
                    })
                )

6. Padrões de Arquitetura para Alta Disponibilidade

6.1. Combinação com Dead Letter Queue

Arquitetura com DLQ para deduplicação:

[Produtor] -> [Fila Principal] -> [Consumidor]
                    |                    |
                    v                    v
              [Redrive Policy]     [DLQ - Duplicatas]

6.2. Estratégias de particionamento

Particionamento do estado de deduplicação:

# Hash ring para distribuir estado
def get_cache_node(message_id):
    hash_value = hash(message_id)
    return cache_nodes[hash_value % len(cache_nodes)]

6.3. Monitoramento de taxa de duplicação

Métricas importantes para monitorar:

- Taxa de duplicação: (mensagens duplicadas / total) * 100
- Latência de verificação: tempo médio de consulta ao cache
- Taxa de expiração: mensagens que expiraram antes do processamento
- Tamanho do cache de deduplicação

7. Casos de Uso e Exemplos Práticos

7.1. Processamento de pagamentos

Cenário: Processamento de cobrança de cartão de crédito

1. Usuário clica em "Comprar"
2. Sistema gera messageId = "payment:user123:order456:timestamp"
3. Mensagem enviada para fila FIFO do SQS
4. Consumidor verifica se messageId já foi processado
5. Se não, executa cobrança e marca como processado
6. Se sim, ignora (evita cobrança duplicada)

7.2. Atualização de inventário

Cenário: Atualização de estoque em e-commerce

message_id = hash(f"inventory:sku123:qty-5:warehouse-1")

7.3. Envio de notificações

Cenário: Notificação de confirmação de pedido

message_id = f"email:user@example.com:order-789:1700000000"

8. Desafios e Boas Práticas

8.1. Limitações de TTL

Mensagens que chegam após o TTL do cache não serão detectadas como duplicatas. Defina TTL baseado no SLA máximo de processamento.

8.2. Consistência eventual vs. consistência forte

Escolha baseada no caso de uso:

- Pagamentos: consistência forte (transação atômica)
- Notificações: consistência eventual (aceitável perder algumas)
- Logs: consistência eventual (duplicatas toleráveis)

8.3. Testes de idempotência

Teste de cenário de falha:

1. Enviar mensagem A com ID único
2. Consumidor inicia processamento
3. Simular falha antes do ACK
4. Mensagem A é reentregue
5. Verificar se processamento ocorre apenas uma vez

Referências