Redis com Python: cache e filas

1. Introdução ao Redis e sua integração com Python

Redis é um banco de dados em memória, open-source, que funciona como estrutura de chave-valor. Sua principal característica é a velocidade: operações são concluídas em milissegundos, tornando-o ideal para cache, filas, sessões e contadores. Diferente de bancos relacionais, o Redis armazena dados na RAM, com persistência opcional em disco.

Para integrar Redis com Python, utilizamos a biblioteca redis-py. A instalação é simples:

pip install redis

Se você não tem Redis instalado localmente, use Docker:

docker run --name redis -p 6379:6379 -d redis:7-alpine

Conecte-se ao Redis de duas formas principais:

import redis

# Conexão direta
r = redis.Redis(host='localhost', port=6379, db=0)

# Ou via URL
r = redis.from_url('redis://localhost:6379/0')

# Testando a conexão
print(r.ping())  # True

2. Operações básicas com Redis em Python

Os comandos fundamentais do Redis são mapeados diretamente para métodos Python:

# String operations
r.set('nome', 'Alice')
print(r.get('nome'))  # b'Alice'
print(r.exists('nome'))  # 1

# Expiração
r.expire('nome', 60)  # expira em 60 segundos
r.setex('temp', 30, 'valor temporário')  # set + expire

# Delete
r.delete('nome')

Trabalhando com diferentes tipos de dados:

# Listas
r.lpush('fila', 'tarefa1', 'tarefa2')
r.rpush('fila', 'tarefa3')
print(r.lrange('fila', 0, -1))  # [b'tarefa2', b'tarefa1', b'tarefa3']

# Hashes
r.hset('usuario:1', mapping={'nome': 'João', 'idade': 30})
print(r.hgetall('usuario:1'))

# Sets
r.sadd('tags', 'python', 'redis', 'cache')
print(r.smembers('tags'))

# Sorted Sets
r.zadd('ranking', {'Alice': 100, 'Bob': 85, 'Carol': 92})
print(r.zrange('ranking', 0, -1, withscores=True))

Para serializar objetos Python complexos, use JSON ou pickle:

import json
import pickle

# JSON (recomendado para interoperabilidade)
dados = {'usuario': 42, 'itens': [1, 2, 3]}
r.set('cache:json', json.dumps(dados))
recuperado = json.loads(r.get('cache:json'))

# Pickle (para objetos Python arbitrários)
r.set('cache:obj', pickle.dumps(dados))
recuperado_pickle = pickle.loads(r.get('cache:obj'))

3. Implementando cache com Redis

A estratégia mais comum é o cache-aside: o aplicativo verifica o cache antes de consultar a fonte primária.

import time
import json

def get_user_data(user_id):
    cache_key = f"user:{user_id}:data"

    # Tenta obter do cache
    cached = r.get(cache_key)
    if cached:
        print("Cache HIT")
        return json.loads(cached)

    print("Cache MISS - consultando banco...")
    # Simula consulta ao banco
    time.sleep(1)
    user_data = {
        "id": user_id,
        "nome": f"Usuário {user_id}",
        "email": f"user{user_id}@email.com"
    }

    # Armazena no cache com TTL de 5 minutos
    r.setex(cache_key, 300, json.dumps(user_data))
    return user_data

# Teste
print(get_user_data(1))  # Cache MISS
print(get_user_data(1))  # Cache HIT

Configuração de TTL com setex e psetex:

# setex: TTL em segundos
r.setex('chave:segundos', 60, 'valor')

# psetex: TTL em milissegundos
r.psetex('chave:ms', 3000, 'valor')

4. Padrões avançados de cache

Cache invalidation com namespaces:

def invalidate_user_cache(user_id):
    """Remove todas as chaves de cache relacionadas a um usuário"""
    pattern = f"user:{user_id}:*"
    for key in r.scan_iter(match=pattern):
        r.delete(key)

# Invalidação seletiva
invalidate_user_cache(1)

Pipelines para operações em lote:

# Sem pipeline - múltiplas requisições
for i in range(100):
    r.set(f"key:{i}", f"value:{i}")

# Com pipeline - uma única requisição
pipe = r.pipeline()
for i in range(100):
    pipe.set(f"key:{i}", f"value:{i}")
pipe.execute()  # Executa todas de uma vez

# Pipeline com transação
pipe = r.pipeline(transaction=True)
pipe.incr('contador')
pipe.expire('contador', 60)
pipe.execute()

5. Redis como sistema de filas

Redis pode atuar como fila FIFO simples usando listas:

# Produtor
def enfileirar_tarefa(tarefa):
    r.lpush('fila:tarefas', json.dumps(tarefa))
    print(f"Tarefa enfileirada: {tarefa}")

# Consumidor
def processar_tarefa():
    # lpop: não bloqueante
    tarefa = r.lpop('fila:tarefas')
    if tarefa:
        dados = json.loads(tarefa)
        print(f"Processando: {dados}")
        return dados
    return None

# Consumidor bloqueante (espera até 30s)
def consumidor_bloqueante():
    while True:
        tarefa = r.blpop('fila:tarefas', timeout=30)
        if tarefa:
            _, dados = tarefa
            print(f"Processando: {json.loads(dados)}")

Monitoramento da fila:

tamanho = r.llen('fila:tarefas')
print(f"Tamanho da fila: {tamanho}")

6. Filas com Redis Streams

Redis Streams oferece funcionalidades mais robustas para filas:

import time

# Produtor - adiciona mensagens ao stream
for i in range(5):
    mensagem_id = r.xadd('stream:tarefas', 
                         {'tarefa': f'tarefa_{i}', 'status': 'pendente'})
    print(f"Mensagem {mensagem_id} adicionada")
    time.sleep(0.5)

# Consumidor simples
mensagens = r.xread({'stream:tarefas': '0'}, count=10, block=0)
for stream, msgs in mensagens:
    for msg_id, dados in msgs:
        print(f"Processando {msg_id}: {dados}")
        r.xdel('stream:tarefas', msg_id)  # Remove após processar

# Grupos de consumidores
r.xgroup_create('stream:tarefas', 'grupo_processadores', id='0', mkstream=True)

def consumidor_grupo(nome_grupo, nome_consumidor):
    while True:
        resultados = r.xreadgroup(nome_grupo, nome_consumidor,
                                  {'stream:tarefas': '>'}, count=1, block=1000)
        if resultados:
            for stream, msgs in resultados:
                for msg_id, dados in msgs:
                    print(f"[{nome_consumidor}] Processando: {dados}")
                    r.xack('stream:tarefas', nome_grupo, msg_id)

Comparação: Streams são mais adequados para produção por oferecerem persistência, grupos de consumidores e confirmação de processamento. Listas são mais simples e performáticas para filas temporárias.

7. Tratamento de erros e boas práticas

Pool de conexões para gerenciar múltiplas conexões:

from redis import ConnectionPool

pool = ConnectionPool(host='localhost', port=6379, db=0, max_connections=10)
r = redis.Redis(connection_pool=pool)

# Reutilize a conexão em toda a aplicação

Tratamento de exceções:

from redis.exceptions import ConnectionError, TimeoutError

try:
    r.ping()
except ConnectionError:
    print("Redis indisponível - usando fallback")
    # Lógica de fallback
except TimeoutError:
    print("Timeout na conexão Redis")

Monitoramento:

# Métricas de cache
cache_hits = 0
cache_misses = 0

def get_with_metrics(key):
    global cache_hits, cache_misses
    if r.exists(key):
        cache_hits += 1
        return r.get(key)
    cache_misses += 1
    return None

print(f"Hit rate: {cache_hits/(cache_hits+cache_misses):.2%}")

8. Exemplo completo: sistema de cache e fila integrado

Aplicação Flask com cache de respostas e fila de tarefas:

from flask import Flask, jsonify, request
import redis
import json
import time
import threading

app = Flask(__name__)
r = redis.Redis(connection_pool=ConnectionPool())

# Cache de respostas de API externa
def get_external_data(id):
    cache_key = f"api:data:{id}"
    cached = r.get(cache_key)
    if cached:
        return json.loads(cached)

    # Simula chamada externa
    time.sleep(2)
    data = {"id": id, "resultado": f"dados do item {id}"}
    r.setex(cache_key, 60, json.dumps(data))
    return data

@app.route('/dados/<int:id>')
def dados(id):
    data = get_external_data(id)
    return jsonify(data)

# Fila de tarefas assíncronas
@app.route('/processar', methods=['POST'])
def enfileirar():
    tarefa = request.json
    r.lpush('fila:processamento', json.dumps(tarefa))
    return jsonify({"status": "enfileirado"}), 202

def worker():
    """Consumidor da fila"""
    while True:
        tarefa = r.brpop('fila:processamento', timeout=5)
        if tarefa:
            _, dados = tarefa
            dados = json.loads(dados)
            print(f"Processando: {dados}")
            # Simula processamento
            time.sleep(3)
            # Salva resultado
            r.setex(f"resultado:{dados['id']}", 600, 
                    json.dumps({"status": "concluído"}))

# Inicia worker em thread separada
threading.Thread(target=worker, daemon=True).start()

if __name__ == '__main__':
    app.run(debug=True)

Teste do fluxo completo:

# Teste de cache
curl http://localhost:5000/dados/1  # Primeira chamada: 2s
curl http://localhost:5000/dados/1  # Segunda chamada: instantâneo

# Teste de fila
curl -X POST http://localhost:5000/processar \
  -H "Content-Type: application/json" \
  -d '{"id": 1, "tipo": "relatorio"}'

O sistema utiliza cache para acelerar respostas repetidas e fila para processamento assíncrono, demonstrando os dois principais casos de uso do Redis com Python.

Referências