Mensageria com RabbitMQ: exchanges, filas e padrões de roteamento

1. Fundamentos do RabbitMQ e o Modelo AMQP

RabbitMQ é um broker de mensagens open-source que implementa o protocolo AMQP 0-9-1, amplamente utilizado em arquiteturas de microsserviços e sistemas distribuídos. Seu papel principal é desacoplar produtores e consumidores de mensagens, permitindo comunicação assíncrona, tolerante a falhas e escalável.

O modelo AMQP 0-9-1 define três componentes principais:

  • Mensagens: pacotes de dados com payload (body) e metadados (headers, propriedades)
  • Exchanges: pontos de entrada que recebem mensagens e as roteiam para filas
  • Filas: buffers que armazenam mensagens até serem consumidas
  • Bindings: regras que conectam exchanges a filas, definindo critérios de roteamento

Para instalar o RabbitMQ localmente, utilize Docker:

docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management

O plugin de gerenciamento fica disponível em http://localhost:15672 (usuário: guest, senha: guest).

2. Exchanges: Tipos, Comportamento e Casos de Uso

Exchanges são responsáveis por receber mensagens dos produtores e direcioná-las para filas com base em regras de roteamento. O RabbitMQ oferece quatro tipos principais:

Direct Exchange

Roteia mensagens para filas cuja chave de roteamento (routing key) corresponde exatamente à chave especificada na mensagem.

# Declaração de exchange direct
channel.exchange_declare(exchange='logs_direct', exchange_type='direct')

# Publicação com routing key específica
channel.basic_publish(exchange='logs_direct', routing_key='error', body='Erro crítico no sistema')

Topic Exchange

Permite roteamento flexível usando padrões com curingas: * substitui uma palavra, # substitui zero ou mais palavras.

# Declaração de exchange topic
channel.exchange_declare(exchange='logs_topic', exchange_type='topic')

# Binding com padrão: recebe mensagens de qualquer serviço com severidade error
channel.queue_bind(exchange='logs_topic', queue='error_queue', routing_key='*.error')

Fanout Exchange

Realiza broadcast: toda mensagem recebida é enviada para todas as filas vinculadas, ignorando a chave de roteamento.

# Declaração de exchange fanout
channel.exchange_declare(exchange='broadcast', exchange_type='fanout')

# Todas as filas vinculadas receberão a mensagem
channel.basic_publish(exchange='broadcast', routing_key='', body='Mensagem para todos')

Headers Exchange

Roteia mensagens com base em atributos do cabeçalho (headers), não na chave de roteamento.

# Declaração de exchange headers
channel.exchange_declare(exchange='headers_exchange', exchange_type='headers')

# Binding com argumentos de cabeçalho
args = {'x-match': 'all', 'format': 'json', 'type': 'user'}
channel.queue_bind(exchange='headers_exchange', queue='json_users', arguments=args)

3. Filas e Mensagens: Ciclo de Vida e Propriedades Essenciais

Declaração de Filas

Filas podem ser configuradas com propriedades que definem seu comportamento:

# Fila durável (sobrevive a reinicializações do broker)
channel.queue_declare(queue='tasks', durable=True)

# Fila exclusiva (deletada quando a conexão é fechada)
channel.queue_declare(queue='temp_queue', exclusive=True)

# Fila com TTL (mensagens expiram após 60 segundos)
channel.queue_declare(queue='ttl_queue', arguments={'x-message-ttl': 60000})

Propriedades da Mensagem

Cada mensagem pode incluir propriedades que controlam sua entrega:

# Mensagem persistente (não perdida em falhas do broker)
properties = pika.BasicProperties(delivery_mode=2, priority=5)
channel.basic_publish(exchange='', routing_key='queue', body='dados', properties=properties)

Modos de Entrega

  • Push (Consumer): consumidor registra callback e recebe mensagens automaticamente
  • Pull (Basic.Get): consumidor solicita mensagens sob demanda
# Modo Push
channel.basic_consume(queue='tasks', on_message_callback=callback, auto_ack=True)

# Modo Pull
method_frame, header_frame, body = channel.basic_get(queue='tasks')

4. Padrões de Roteamento e Binding

Publish/Subscribe com Fanout

Ideal para notificações em tempo real onde todos os consumidores devem receber a mesma mensagem:

# Produtor publica em exchange fanout
channel.basic_publish(exchange='notifications', routing_key='', body='Novo evento')

# Cada consumidor cria fila anônima e vincula à exchange
result = channel.queue_declare(queue='', exclusive=True)
channel.queue_bind(exchange='notifications', queue=result.method.queue)

Routing com Direct

Seleção seletiva baseada em severidade ou tipo:

# Bindings com diferentes routing keys
channel.queue_bind(exchange='logs_direct', queue='critical_queue', routing_key='critical')
channel.queue_bind(exchange='logs_direct', queue='warning_queue', routing_key='warning')

Topics para Roteamento Hierárquico

Permite criar padrões complexos como usuario.cadastro.sucesso:

# Binding: recebe todos os eventos de cadastro
channel.queue_bind(exchange='events', queue='cadastro_queue', routing_key='usuario.cadastro.*')

5. Garantias de Entrega e Confiabilidade

Publisher Confirms

Garante que o broker recebeu a mensagem:

channel.confirm_delivery()
try:
    channel.basic_publish(exchange='', routing_key='queue', body='dados', mandatory=True)
    print("Mensagem confirmada")
except pika.exceptions.UnroutableError:
    print("Mensagem não roteada")

Consumer Acknowledgements

Controle manual de confirmação para processamento seguro:

def callback(ch, method, properties, body):
    try:
        processar(body)
        ch.basic_ack(delivery_tag=method.delivery_tag)  # Confirma sucesso
    except Exception:
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)  # Rejeita e reenvia

Dead Letter Exchange (DLX)

Mensagens rejeitadas ou expiradas são redirecionadas para fila de mensagens mortas:

# Configuração da fila com DLX
args = {
    'x-dead-letter-exchange': 'dlx_exchange',
    'x-dead-letter-routing-key': 'dead_letter'
}
channel.queue_declare(queue='main_queue', arguments=args)

6. Padrões de Integração com RabbitMQ

Work Queues (Distribuição de Tarefas)

Múltiplos consumidores competem por mensagens, ideal para processamento paralelo:

# Configuração para distribuição justa (fair dispatch)
channel.basic_qos(prefetch_count=1)

RPC com Filas de Callback

Implementação de chamada remota síncrona sobre mensageria:

# Produtor envia mensagem com correlation_id e fila de callback
props = pika.BasicProperties(
    reply_to='rpc_callback_queue',
    correlation_id=str(uuid.uuid4())
)
channel.basic_publish(exchange='', routing_key='rpc_queue', properties=props, body='request')

Padrão Saga para Transações Distribuídas

Orquestração de transações compensatórias em microsserviços:

# Fila para eventos de compensação (rollback)
channel.queue_declare(queue='saga_compensation', durable=True)
channel.queue_bind(exchange='saga_events', queue='saga_compensation', routing_key='compensate.*')

7. Boas Práticas, Monitoramento e Performance

Gerenciamento de Conexões e Canais

Reutilize conexões e canais para evitar overhead:

# Pool de canais (recomendado para alta concorrência)
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

Monitoramento com Management Plugin

Métricas essenciais no dashboard web (porta 15672):

  • Queue depth: profundidade da fila (mensagens não consumidas)
  • Message rates: taxas de publicação e consumo
  • Consumer count: número de consumidores ativos
  • Unroutable messages: mensagens sem binding correspondente

Configuração de Cluster e Alta Disponibilidade

# Configuração de fila quorum para alta disponibilidade
args = {'x-queue-type': 'quorum'}
channel.queue_declare(queue='ha_queue', arguments=args, durable=True)

Comparação com Alternativas

Característica RabbitMQ Kafka NATS
Modelo Fila + Exchange Log distribuído Pub/Sub leve
Persistência Opcional Obrigatória Opcional
Ordenação Por fila Por partição Por tópico
Throughput Médio Alto Muito alto

Referências