Como usar o padrão inbox para garantir entrega de mensagens recebidas

1. Fundamentos do padrão inbox em sistemas distribuídos

Em sistemas distribuídos, a comunicação assíncrona entre serviços é essencial para escalabilidade e desacoplamento. No entanto, essa abordagem introduz um desafio crítico: como garantir que uma mensagem recebida seja processada exatamente uma vez, mesmo diante de falhas de rede, reinicializações de serviços ou concorrência?

O padrão inbox (ou "caixa de entrada") resolve esse problema ao persistir cada mensagem recebida em um armazenamento durável antes de qualquer processamento. Diferente de padrões como outbox (que foca no envio confiável) ou saga (que coordena transações distribuídas), o inbox garante que mensagens não sejam perdidas ou processadas múltiplas vezes.

Comparação com outros padrões:

Padrão Foco Garantia
Inbox Recebimento Entrega exatamente uma vez
Outbox Envio Envio pelo menos uma vez
Saga Consistência Compensação de falhas
Retry Recuperação Repetição com backoff

2. Arquitetura e componentes do padrão inbox

A arquitetura típica consiste em três componentes principais:

  1. Tabela/coleção inbox: Armazena mensagens recebidas com campos como message_id, payload, status, created_at e processed_at
  2. Consumidor: Responsável por realizar polling ou receber push de mensagens, persistindo-as antes do processamento
  3. Processador: Lê da tabela inbox, executa a lógica de negócio e marca como processado

Estrutura da tabela inbox em SQL:

CREATE TABLE inbox (
    message_id UUID PRIMARY KEY,
    payload JSONB NOT NULL,
    status VARCHAR(20) DEFAULT 'pending',
    created_at TIMESTAMP DEFAULT NOW(),
    processed_at TIMESTAMP,
    retry_count INT DEFAULT 0,
    last_error TEXT
);

CREATE INDEX idx_inbox_pending ON inbox (status, created_at)
    WHERE status = 'pending';

3. Implementação prática do processamento de mensagens recebidas

O fluxo completo segue estas etapas:

Recepção → Persistência → Processamento → Remoção/Arquivamento

Exemplo de código: Consumidor com polling e persistência

# Pseudocódigo: Consumidor de fila com padrão inbox

function consume_messages():
    while True:
        # 1. Receber mensagem da fila (ex: RabbitMQ, Kafka)
        message = queue.receive()

        # 2. Verificar duplicidade
        if exists_in_inbox(message.message_id):
            ack_message(message)  # Confirmar recebimento
            continue

        # 3. Persistir na tabela inbox
        db.execute("""
            INSERT INTO inbox (message_id, payload, status)
            VALUES ($1, $2, 'pending')
        """, message.message_id, message.payload)

        # 4. Confirmar para a fila (ack)
        ack_message(message)

        # 5. Processamento assíncrono
        process_pending_messages()

function process_pending_messages():
    messages = db.query("""
        SELECT * FROM inbox
        WHERE status = 'pending'
        ORDER BY created_at
        LIMIT 100
        FOR UPDATE SKIP LOCKED
    """)

    for msg in messages:
        try:
            # Executar lógica de negócio
            process_business_logic(msg.payload)

            # Marcar como processado
            db.execute("""
                UPDATE inbox
                SET status = 'processed', processed_at = NOW()
                WHERE message_id = $1
            """, msg.message_id)
        except Exception as e:
            # Registrar falha
            db.execute("""
                UPDATE inbox
                SET status = 'failed', retry_count = retry_count + 1,
                    last_error = $1
                WHERE message_id = $2
            """, str(e), msg.message_id)

4. Garantia de idempotência e ordenação

A idempotência é alcançada através de identificadores únicos (message_id) e validação antes da inserção.

Estratégias de desduplicação:

  1. Chave única: message_id como PK impede inserções duplicadas
  2. Janela temporal: Verificar se mensagem com mesmo conteúdo foi recebida nos últimos N minutos

Exemplo de lógica de idempotência

function process_message(message):
    # Tentar inserir com chave única
    try:
        db.execute("""
            INSERT INTO inbox (message_id, payload, status)
            VALUES ($1, $2, 'pending')
        """, message.id, message.payload)
    except DuplicateKeyError:
        # Mensagem já processada anteriormente
        log.info("Mensagem duplicada ignorada: " + message.id)
        return

    # Processamento normal
    result = execute_business_logic(message.payload)

    # Garantir que processamento seja idempotente
    if result == "already_processed":
        # Caso a lógica de negócio detecte duplicata
        mark_as_processed(message.id)

Tratamento de mensagens fora de ordem:

function reorder_messages():
    messages = db.query("""
        SELECT * FROM inbox
        WHERE status = 'pending'
        ORDER BY sequence_number ASC
    """)

    for msg in messages:
        if msg.sequence_number == expected_sequence:
            process_message(msg)
            expected_sequence += 1
        else:
            # Armazenar em buffer temporário
            buffer[msg.sequence_number] = msg

5. Tratamento de falhas e recuperação

Mecanismos de retry com backoff exponencial

function retry_failed_messages():
    failed = db.query("""
        SELECT * FROM inbox
        WHERE status = 'failed'
        AND retry_count < 5
        AND (NOW() - updated_at) > 
            INTERVAL '1 minute' * POWER(2, retry_count)
    """)

    for msg in failed:
        try:
            process_business_logic(msg.payload)
            mark_as_processed(msg.message_id)
        except Exception as e:
            if msg.retry_count >= 5:
                move_to_dlq(msg)  # Dead Letter Queue

Transações atômicas para evitar perda de dados:

# Usar transação para garantir atomicidade
BEGIN;
    INSERT INTO inbox (message_id, payload, status)
    VALUES ($1, $2, 'pending');

    COMMIT;  -- ou ROLLBACK em caso de falha

6. Otimizações e boas práticas para produção

Batch processing para alta vazão

function batch_process():
    messages = db.query("""
        UPDATE inbox SET status = 'processing'
        WHERE message_id IN (
            SELECT message_id FROM inbox
            WHERE status = 'pending'
            ORDER BY created_at
            LIMIT 500
            FOR UPDATE SKIP LOCKED
        )
        RETURNING *
    """)

    for msg in messages:
        process_business_logic(msg.payload)
        mark_as_processed(msg.message_id)

Expurgo de registros processados:

-- Job agendado para remover registros antigos
DELETE FROM inbox
WHERE status = 'processed'
AND processed_at < NOW() - INTERVAL '7 days';

Escalabilidade horizontal com particionamento:

-- Particionamento por data
CREATE TABLE inbox (
    message_id UUID,
    payload JSONB,
    status VARCHAR(20),
    created_at DATE
) PARTITION BY RANGE (created_at);

-- Criar partições mensais
CREATE TABLE inbox_2024_01 PARTITION OF inbox
    FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');

7. Relação com outros padrões do ecossistema DDD

O padrão inbox complementa outros padrões de Domain-Driven Design:

  • Outbox: Enquanto o inbox garante recebimento confiável, o outbox assegura que eventos sejam enviados exatamente uma vez
  • Process Manager: Pode usar inbox para coordenar estados de longas transações
  • Saga: O inbox fornece persistência para mensagens de compensação

Exemplo de fluxo completo: Evento de domínio → inbox → processamento

# Serviço A publica evento de domínio
function publish_event(event):
    # Usar outbox para garantir envio
    outbox.insert(event)
    event_bus.publish(event)

# Serviço B consome evento via inbox
function consume_event(event):
    # 1. Inbox persiste antes de processar
    inbox.insert(event.id, event.payload)

    # 2. Processamento do evento
    if event.type == "OrderCreated":
        process_order(event.payload)

    # 3. Marcar como processado
    inbox.mark_processed(event.id)

Referências