Como usar NATS para mensageria leve e rápida entre serviços

1. Introdução ao NATS e seus fundamentos

NATS (Neural Autonomic Transport System) é um sistema de mensageria open-source projetado para alta performance, baixa latência e simplicidade operacional. Diferente de soluções como RabbitMQ (focado em roteamento complexo com AMQP) ou Kafka (otimizado para streams massivos e replay), o NATS prioriza a velocidade de entrega e a leveza, sendo ideal para comunicação entre microsserviços em tempo real.

Os principais conceitos do NATS incluem:

  • Publish/Subscribe: mensagens são publicadas em tópicos e todos os assinantes recebem uma cópia.
  • Requisição-Resposta: padrão síncrono onde um publisher aguarda a resposta de um subscriber.
  • Queue Groups: distribuição de carga entre múltiplos workers que competem por mensagens de um mesmo tópico.

Comparado ao RabbitMQ, NATS é mais simples (sem exchanges complexas) e mais rápido. Comparado ao Kafka, NATS é mais leve (sem persistência por padrão) e adequado para cenários onde a perda eventual de mensagens é aceitável. Para ambientes que exigem entrega garantida, o NATS oferece o módulo JetStream.

2. Instalação e configuração do servidor NATS

A instalação mais rápida é via Docker:

docker pull nats:latest
docker run -p 4222:4222 -p 8222:8222 nats

Para instalação local com binário (Linux/macOS):

curl -L https://github.com/nats-io/nats-server/releases/download/v2.10.22/nats-server-v2.10.22-linux-amd64.tar.gz -o nats.tar.gz
tar -xzf nats.tar.gz
cd nats-server-v2.10.22-linux-amd64
./nats-server

Configuração básica com autenticação por token (server.conf):

port: 4222
http_port: 8222

authorization {
  token: "meu-token-secreto"
}

log_file: "/var/log/nats-server.log"

Executando cluster básico (3 nós):

nats-server -p 4222 -cluster nats://0.0.0.0:6222 -routes nats://user:pass@nó1:6222,nats://user:pass@nó2:6222

3. Conectando serviços com clientes NATS em Python

Instale a biblioteca oficial:

pip install nats-py

Exemplo básico de conexão e publish/subscribe:

import asyncio
import nats

async def main():
    nc = await nats.connect("nats://localhost:4222")

    # Assinante
    async def msg_handler(msg):
        print(f"Recebido: {msg.data.decode()} no tópico {msg.subject}")

    sub = await nc.subscribe("pedidos.novos", cb=msg_handler)

    # Publicador
    await nc.publish("pedidos.novos", b"Novo pedido #12345")

    await asyncio.sleep(1)
    await nc.close()

asyncio.run(main())

Tratamento de reconexão automática:

async def main():
    nc = await nats.connect(
        "nats://localhost:4222",
        reconnect_time_wait=2,
        max_reconnect_attempts=10,
        error_cb=lambda e: print(f"Erro: {e}"),
        disconnected_cb=lambda: print("Desconectado"),
        reconnected_cb=lambda: print("Reconectado")
    )

4. Padrões de comunicação: requisição-resposta e filas

Requisição-resposta (serviço de validação):

# Cliente
async def valida_pedido(pedido_id):
    msg = await nc.request("validacao.pedido", f"validar:{pedido_id}".encode(), timeout=5)
    return msg.data.decode()

# Servidor
async def handler(msg):
    resposta = "aprovado" if "12345" in msg.data.decode() else "rejeitado"
    await msg.respond(resposta.encode())

await nc.subscribe("validacao.pedido", cb=handler)

Queue Groups para balanceamento de carga entre workers:

async def worker(worker_id):
    async def handler(msg):
        print(f"Worker {worker_id} processando: {msg.data.decode()}")
        await asyncio.sleep(1)
        await msg.respond(b"processado")

    await nc.subscribe("fila.processamento", queue="workers", cb=handler)
    await asyncio.Future()

# Inicie 3 workers concorrentes
await asyncio.gather(
    worker(1), worker(2), worker(3)
)

5. Garantias de entrega e persistência com JetStream

JetStream adiciona persistência e replay ao NATS. Exemplo de stream durável:

# Criar stream
js = nc.jetstream()
await js.add_stream(name="pedidos", subjects=["pedidos.*"])

# Publicar com persistência
ack = await js.publish("pedidos.novos", b"Pedido persistente", stream="pedidos")

# Consumidor durável
await js.subscribe(
    "pedidos.novos",
    stream="pedidos",
    durable="consumidor-1",
    cb=msg_handler
)

Diferenças principais:
- Core NATS: sem persistência, entrega best-effort, máximo desempenho.
- JetStream: persistência em disco, replay, entrega garantida, consumidores duráveis.

6. Segurança e boas práticas em ambientes de produção

Autenticação com TLS:

# Configuração do servidor
tls {
  cert_file: "/certs/server.crt"
  key_file: "/certs/server.key"
  ca_file: "/certs/ca.crt"
}

authorization {
  users = [
    {user: "servico-a", password: "senha-segura", permissions: {publish: "pedidos.>", subscribe: "notificacoes.>"}},
    {user: "servico-b", password: "outra-senha", permissions: {publish: "notificacoes.>", subscribe: "pedidos.>"}}
  ]
}

Monitoramento via endpoint HTTP:

http_port: 8222
# Acesse http://localhost:8222 para métricas

7. Casos de uso reais e integração com microsserviços

Exemplo: sistema de notificações de e-commerce com NATS:

# Serviço de pedidos
async def notificar_estoque(pedido):
    await nc.publish("estoque.reserva", f"reservar:{pedido['id']}:{pedido['item']}".encode())

# Serviço de estoque
async def handler(msg):
    dados = msg.data.decode().split(":")
    if dados[0] == "reservar":
        print(f"Reservando item {dados[2]} para pedido {dados[1]}")
        await msg.respond(b"reservado")

# Serviço de notificação
async def handler(msg):
    dados = msg.data.decode()
    if "reservado" in dados:
        await nc.publish("email.enviar", f"Pedido confirmado: {dados}".encode())

Integração com Prometheus (exportar métricas):

nats-server -m 8222
# Configure o Prometheus para scrapar http://localhost:8222/metrics

8. Considerações finais e próximos passos

NATS é excelente para comunicação leve entre microsserviços, IoT, e sistemas que exigem baixa latência. Suas limitações incluem a falta de roteamento complexo (como RabbitMQ) e a necessidade de JetStream para persistência (ao contrário do Kafka que já nasce com isso). Para cenários de streaming massivo com retenção longa, Kafka ainda é superior.

Checklist para adoção:
- [ ] Definir tópicos com padrão hierárquico (ex: dominio.acao.id)
- [ ] Escolher entre Core NATS (performance) e JetStream (garantia)
- [ ] Configurar autenticação e TLS desde o início
- [ ] Implementar health checks e reconexão automática
- [ ] Monitorar com Prometheus/Grafana

Referências