Redis Streams: processamento de eventos

1. Introdução aos Redis Streams e sua relação com Bancos de Dados

1.1. O que são Redis Streams: estrutura de dados baseada em log append-only

Redis Streams é uma estrutura de dados introduzida no Redis 5.0 que implementa um log de eventos append-only. Cada entrada no stream é composta por um identificador único (ID) e um conjunto de pares chave-valor (field-value). Diferentemente de listas ou pub/sub, os streams oferecem persistência, ordenação temporal garantida e suporte nativo a grupos de consumidores.

1.2. Comparação conceitual: Streams vs. Tabelas de eventos em SQL (event sourcing)

Enquanto tabelas SQL armazenam estado atual, streams Redis armazenam sequências imutáveis de eventos. Em um banco relacional, uma tabela de eventos (event sourcing) registra mudanças com timestamps e versões. Redis Streams oferecem desempenho superior para ingestão de alta frequência (milhares de eventos por segundo) e consumo concorrente, porém sem suporte a consultas ad-hoc complexas como JOINs ou agregações SQL.

1.3. Casos de uso típicos: filas de mensagens, ingestão de eventos, pipelines de dados

Redis Streams são ideais para filas de mensagens confiáveis, ingestão de eventos de sensores IoT, pipelines de dados em tempo real, sistemas de notificação e processamento de pedidos em e-commerce.

2. Estrutura e Comandos Fundamentais dos Streams

2.1. Identificadores de mensagens (ID automático vs. manual) e ordenação temporal

Cada mensagem recebe um ID no formato <timestamp>-<sequencial>. O Redis gera automaticamente IDs baseados no relógio do servidor, garantindo ordenação global. IDs manuais podem ser usados para replicação ou integração com sistemas externos.

2.2. Comandos essenciais: XADD, XRANGE, XREVRANGE, XLEN

# Adicionar evento ao stream
XADD pedidos * evento "criado" pedido_id 12345 cliente "João"

# Listar todos os eventos (do início ao fim)
XRANGE pedidos - +

# Listar eventos em ordem reversa (mais recentes primeiro)
XREVRANGE pedidos + -

# Obter tamanho do stream
XLEN pedidos

2.3. Grupos de consumidores (Consumer Groups): conceito e criação com XGROUP CREATE

Grupos de consumidores permitem distribuir mensagens entre múltiplos workers, garantindo que cada mensagem seja processada por apenas um consumidor dentro do grupo.

# Criar grupo de consumidores
XGROUP CREATE pedidos grupo_processamento $

# Listar grupos existentes
XINFO GROUPS pedidos

3. Produção e Consumo de Eventos: Padrões de Leitura

3.1. Consumo independente: XREAD com bloqueio e sem bloqueio

# Leitura sem bloqueio (retorna imediatamente se não houver mensagens)
XREAD COUNT 10 STREAMS pedidos 0

# Leitura com bloqueio (aguarda até 5 segundos por novas mensagens)
XREAD BLOCK 5000 COUNT 10 STREAMS pedidos $

3.2. Consumo em grupo: XREADGROUP e distribuição de carga entre workers

# Consumidor 1 lê do grupo
XREADGROUP GROUP grupo_processamento worker_1 COUNT 5 STREAMS pedidos >

# Consumidor 2 lê do grupo (receberá mensagens diferentes)
XREADGROUP GROUP grupo_processamento worker_2 COUNT 5 STREAMS pedidos >

O símbolo > indica que o consumidor deseja apenas mensagens nunca entregues a outros membros do grupo.

3.3. Acknowledgment de mensagens: XACK e garantia de entrega (at-least-once)

# Confirmar processamento de mensagem específica
XACK pedidos grupo_processamento 1735689600000-0

Sem o XACK, a mensagem permanece na Pending Entries List (PEL) e pode ser reentregue, garantindo entrega pelo menos uma vez (at-least-once).

4. Gerenciamento de Estado e Persistência no Consumo

4.1. Pending Entries List (PEL): rastreamento de mensagens não confirmadas

A PEL mantém todas as mensagens entregues mas não confirmadas por cada consumidor. Isso permite rastrear falhas e reprocessar mensagens.

# Ver mensagens pendentes do consumidor worker_1
XPENDING pedidos grupo_processamento - + 10 worker_1

4.2. Recuperação de falhas: XCLAIM para reatribuir mensagens pendentes

Quando um consumidor falha, outro pode reivindicar suas mensagens pendentes após um período de inatividade.

# Reivindicar mensagens pendentes há mais de 60 segundos
XCLAIM pedidos grupo_processamento worker_2 60000 1735689600000-0

4.3. Dead Letter Queue (DLQ) manual: tratamento de mensagens falhas repetidamente

Para mensagens que falham repetidamente, implementa-se uma DLQ movendo-as para um stream separado.

# Mover para DLQ após 3 tentativas
XADD pedidos_dlq * original_id 1735689600000-0 motivo "timeout"
XDEL pedidos 1735689600000-0

5. Estratégias de Limpeza e Controle de Tamanho do Stream

5.1. Trimming: XTRIM com estratégias MAXLEN e MINID

# Manter apenas os 1000 eventos mais recentes
XTRIM pedidos MAXLEN 1000

# Manter apenas eventos após ID específico
XTRIM pedidos MINID 1735689600000

# Trimming aproximado (melhor performance)
XTRIM pedidos MAXLEN ~ 1000

5.2. Políticas de retenção: quanto tempo manter eventos no stream

Recomenda-se definir políticas baseadas em tempo (ex: 7 dias) usando MINID com IDs baseados em timestamp, combinado com jobs de limpeza periódica.

5.3. Diferenças entre trimming aproximado (~) e exato: performance vs. precisão

O trimming exato garante o limite especificado mas pode ser mais lento (O(N)). O aproximado (~) permite que o Redis remova nós inteiros do radix tree, sendo mais eficiente para streams grandes.

6. Integração com Bancos de Dados Relacionais (SQL)

6.1. Padrão de arquitetura: stream como buffer de eventos antes da persistência em SQL

Redis Stream atua como buffer de alta velocidade, absorvendo picos de eventos. Um consumidor assíncrono persiste os dados em PostgreSQL/MySQL, desacoplando a ingestão da persistência.

6.2. Sincronização assíncrona: consumidor que insere dados em tabelas PostgreSQL/MySQL

# Consumidor em Python (exemplo conceitual)
import redis
import psycopg2

r = redis.Redis()
conn = psycopg2.connect("dbname=pedidos")

while True:
    mensagens = r.xreadgroup("grupo_processamento", "persistidor", {"pedidos": ">"}, count=100, block=5000)
    for stream, msgs in mensagens:
        for msg_id, data in msgs:
            conn.execute(
                "INSERT INTO eventos_pedido (evento, pedido_id, cliente, timestamp) VALUES (%s, %s, %s, NOW())",
                (data[b'evento'].decode(), data[b'pedido_id'].decode(), data[b'cliente'].decode())
            )
            r.xack("pedidos", "grupo_processamento", msg_id)
    conn.commit()

6.3. Garantia de consistência eventual: reconciliando dados entre Redis Stream e banco SQL

A consistência eventual é alcançada através do padrão "outbox": eventos são primeiro escritos no stream, depois persistidos no SQL. Em caso de falha, o PEL permite reprocessamento até que todos os eventos sejam confirmados no banco.

7. Monitoramento, Observabilidade e Boas Práticas

7.1. Métricas-chave: tamanho do stream, número de mensagens pendentes, lag do consumidor

As principais métricas a monitorar são: XLEN (tamanho atual), XPENDING (mensagens não confirmadas), e o lag entre o último evento produzido e o último consumido.

7.2. Comandos de diagnóstico: XINFO STREAM, XINFO GROUPS, XINFO CONSUMERS

# Informações detalhadas do stream
XINFO STREAM pedidos

# Informações dos grupos de consumidores
XINFO GROUPS pedidos

# Informações de consumidores específicos
XINFO CONSUMERS pedidos grupo_processamento

7.3. Boas práticas: nomeação de streams, tamanho de batch, configuração de timeout

  • Nomeie streams com prefixos de domínio: pedidos:eventos, notificacoes:email
  • Use batches de 50-200 mensagens por leitura para balancear latência e throughput
  • Configure timeouts de bloqueio entre 1-5 segundos para evitar conexões ociosas
  • Sempre implemente XACK após processamento bem-sucedido

8. Exemplo Prático: Pipeline de Eventos com Redis Streams e PostgreSQL

8.1. Cenário: sistema de pedidos com eventos de criação e atualização

Sistema de e-commerce onde pedidos geram eventos: criado, atualizado, pago, enviado. O stream Redis absorve eventos em tempo real, e um consumidor persiste em PostgreSQL.

8.2. Código de produção (produtor) e consumo (consumidor) com acknowledgment

# Produtor (exemplo em Python)
import redis
import json

r = redis.Redis()
pedido = {"pedido_id": 12345, "cliente": "Maria", "total": 250.00}
r.xadd("pedidos:eventos", {"evento": "criado", "dados": json.dumps(pedido)})

# Consumidor com acknowledgment
while True:
    resultados = r.xreadgroup("grupo_pedidos", "worker_1", {"pedidos:eventos": ">"}, count=10, block=2000)
    for stream, mensagens in resultados:
        for msg_id, dados in mensagens:
            try:
                processar_pedido(dados)  # Função que persiste no PostgreSQL
                r.xack("pedidos:eventos", "grupo_pedidos", msg_id)
            except Exception as e:
                # Registra falha e move para DLQ após 3 tentativas
                r.xadd("pedidos:dlq", {"msg_id": msg_id, "erro": str(e)})

8.3. Persistência final em tabela SQL e consulta dos eventos armazenados

-- Tabela de eventos no PostgreSQL
CREATE TABLE eventos_pedido (
    id SERIAL PRIMARY KEY,
    evento VARCHAR(50),
    pedido_id INTEGER,
    dados JSONB,
    criado_em TIMESTAMP DEFAULT NOW()
);

-- Consulta: histórico completo de um pedido
SELECT evento, dados, criado_em
FROM eventos_pedido
WHERE pedido_id = 12345
ORDER BY criado_em;

Referências