Como usar o padrão inbox para garantir entrega de mensagens recebidas
1. Fundamentos do padrão inbox em sistemas distribuídos
Em sistemas distribuídos, a comunicação assíncrona entre serviços é essencial para escalabilidade e desacoplamento. No entanto, essa abordagem introduz um desafio crítico: como garantir que uma mensagem recebida seja processada exatamente uma vez, mesmo diante de falhas de rede, reinicializações de serviços ou concorrência?
O padrão inbox (ou "caixa de entrada") resolve esse problema ao persistir cada mensagem recebida em um armazenamento durável antes de qualquer processamento. Diferente de padrões como outbox (que foca no envio confiável) ou saga (que coordena transações distribuídas), o inbox garante que mensagens não sejam perdidas ou processadas múltiplas vezes.
Comparação com outros padrões:
| Padrão | Foco | Garantia |
|---|---|---|
| Inbox | Recebimento | Entrega exatamente uma vez |
| Outbox | Envio | Envio pelo menos uma vez |
| Saga | Consistência | Compensação de falhas |
| Retry | Recuperação | Repetição com backoff |
2. Arquitetura e componentes do padrão inbox
A arquitetura típica consiste em três componentes principais:
- Tabela/coleção inbox: Armazena mensagens recebidas com campos como
message_id,payload,status,created_ateprocessed_at - Consumidor: Responsável por realizar polling ou receber push de mensagens, persistindo-as antes do processamento
- Processador: Lê da tabela inbox, executa a lógica de negócio e marca como processado
Estrutura da tabela inbox em SQL:
CREATE TABLE inbox (
message_id UUID PRIMARY KEY,
payload JSONB NOT NULL,
status VARCHAR(20) DEFAULT 'pending',
created_at TIMESTAMP DEFAULT NOW(),
processed_at TIMESTAMP,
retry_count INT DEFAULT 0,
last_error TEXT
);
CREATE INDEX idx_inbox_pending ON inbox (status, created_at)
WHERE status = 'pending';
3. Implementação prática do processamento de mensagens recebidas
O fluxo completo segue estas etapas:
Recepção → Persistência → Processamento → Remoção/Arquivamento
Exemplo de código: Consumidor com polling e persistência
# Pseudocódigo: Consumidor de fila com padrão inbox
function consume_messages():
while True:
# 1. Receber mensagem da fila (ex: RabbitMQ, Kafka)
message = queue.receive()
# 2. Verificar duplicidade
if exists_in_inbox(message.message_id):
ack_message(message) # Confirmar recebimento
continue
# 3. Persistir na tabela inbox
db.execute("""
INSERT INTO inbox (message_id, payload, status)
VALUES ($1, $2, 'pending')
""", message.message_id, message.payload)
# 4. Confirmar para a fila (ack)
ack_message(message)
# 5. Processamento assíncrono
process_pending_messages()
function process_pending_messages():
messages = db.query("""
SELECT * FROM inbox
WHERE status = 'pending'
ORDER BY created_at
LIMIT 100
FOR UPDATE SKIP LOCKED
""")
for msg in messages:
try:
# Executar lógica de negócio
process_business_logic(msg.payload)
# Marcar como processado
db.execute("""
UPDATE inbox
SET status = 'processed', processed_at = NOW()
WHERE message_id = $1
""", msg.message_id)
except Exception as e:
# Registrar falha
db.execute("""
UPDATE inbox
SET status = 'failed', retry_count = retry_count + 1,
last_error = $1
WHERE message_id = $2
""", str(e), msg.message_id)
4. Garantia de idempotência e ordenação
A idempotência é alcançada através de identificadores únicos (message_id) e validação antes da inserção.
Estratégias de desduplicação:
- Chave única:
message_idcomo PK impede inserções duplicadas - Janela temporal: Verificar se mensagem com mesmo conteúdo foi recebida nos últimos N minutos
Exemplo de lógica de idempotência
function process_message(message):
# Tentar inserir com chave única
try:
db.execute("""
INSERT INTO inbox (message_id, payload, status)
VALUES ($1, $2, 'pending')
""", message.id, message.payload)
except DuplicateKeyError:
# Mensagem já processada anteriormente
log.info("Mensagem duplicada ignorada: " + message.id)
return
# Processamento normal
result = execute_business_logic(message.payload)
# Garantir que processamento seja idempotente
if result == "already_processed":
# Caso a lógica de negócio detecte duplicata
mark_as_processed(message.id)
Tratamento de mensagens fora de ordem:
function reorder_messages():
messages = db.query("""
SELECT * FROM inbox
WHERE status = 'pending'
ORDER BY sequence_number ASC
""")
for msg in messages:
if msg.sequence_number == expected_sequence:
process_message(msg)
expected_sequence += 1
else:
# Armazenar em buffer temporário
buffer[msg.sequence_number] = msg
5. Tratamento de falhas e recuperação
Mecanismos de retry com backoff exponencial
function retry_failed_messages():
failed = db.query("""
SELECT * FROM inbox
WHERE status = 'failed'
AND retry_count < 5
AND (NOW() - updated_at) >
INTERVAL '1 minute' * POWER(2, retry_count)
""")
for msg in failed:
try:
process_business_logic(msg.payload)
mark_as_processed(msg.message_id)
except Exception as e:
if msg.retry_count >= 5:
move_to_dlq(msg) # Dead Letter Queue
Transações atômicas para evitar perda de dados:
# Usar transação para garantir atomicidade
BEGIN;
INSERT INTO inbox (message_id, payload, status)
VALUES ($1, $2, 'pending');
COMMIT; -- ou ROLLBACK em caso de falha
6. Otimizações e boas práticas para produção
Batch processing para alta vazão
function batch_process():
messages = db.query("""
UPDATE inbox SET status = 'processing'
WHERE message_id IN (
SELECT message_id FROM inbox
WHERE status = 'pending'
ORDER BY created_at
LIMIT 500
FOR UPDATE SKIP LOCKED
)
RETURNING *
""")
for msg in messages:
process_business_logic(msg.payload)
mark_as_processed(msg.message_id)
Expurgo de registros processados:
-- Job agendado para remover registros antigos
DELETE FROM inbox
WHERE status = 'processed'
AND processed_at < NOW() - INTERVAL '7 days';
Escalabilidade horizontal com particionamento:
-- Particionamento por data
CREATE TABLE inbox (
message_id UUID,
payload JSONB,
status VARCHAR(20),
created_at DATE
) PARTITION BY RANGE (created_at);
-- Criar partições mensais
CREATE TABLE inbox_2024_01 PARTITION OF inbox
FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');
7. Relação com outros padrões do ecossistema DDD
O padrão inbox complementa outros padrões de Domain-Driven Design:
- Outbox: Enquanto o inbox garante recebimento confiável, o outbox assegura que eventos sejam enviados exatamente uma vez
- Process Manager: Pode usar inbox para coordenar estados de longas transações
- Saga: O inbox fornece persistência para mensagens de compensação
Exemplo de fluxo completo: Evento de domínio → inbox → processamento
# Serviço A publica evento de domínio
function publish_event(event):
# Usar outbox para garantir envio
outbox.insert(event)
event_bus.publish(event)
# Serviço B consome evento via inbox
function consume_event(event):
# 1. Inbox persiste antes de processar
inbox.insert(event.id, event.payload)
# 2. Processamento do evento
if event.type == "OrderCreated":
process_order(event.payload)
# 3. Marcar como processado
inbox.mark_processed(event.id)
Referências
- Padrão Inbox - Microsoft Azure Architecture Center — Documentação oficial da Microsoft sobre o padrão inbox e sua implementação em arquiteturas de microsserviços
- Transactional Outbox Pattern - Martin Fowler — Análise detalhada do padrão outbox, complementar ao inbox, por Martin Fowler
- Reliable Message Delivery in Distributed Systems - AWS Documentation — Guia prático da AWS sobre padrões de entrega confiável de mensagens
- Idempotency Patterns - Stripe API Reference — Implementação de idempotência em APIs, conceito fundamental para o padrão inbox
- Handling Duplicate Messages - RabbitMQ Documentation — Estratégias para lidar com mensagens duplicadas em sistemas de mensageria
- Exactly-Once Processing in Kafka - Confluent Documentation — Guia da Confluent sobre processamento exatamente uma vez em Kafka
- Event Sourcing and CQRS - Greg Young — Documento seminal sobre padrões de eventos e consistência eventual