Redis com Python: cache e filas
1. Introdução ao Redis e sua integração com Python
Redis é um banco de dados em memória, open-source, que funciona como estrutura de chave-valor. Sua principal característica é a velocidade: operações são concluídas em milissegundos, tornando-o ideal para cache, filas, sessões e contadores. Diferente de bancos relacionais, o Redis armazena dados na RAM, com persistência opcional em disco.
Para integrar Redis com Python, utilizamos a biblioteca redis-py. A instalação é simples:
pip install redis
Se você não tem Redis instalado localmente, use Docker:
docker run --name redis -p 6379:6379 -d redis:7-alpine
Conecte-se ao Redis de duas formas principais:
import redis
# Conexão direta
r = redis.Redis(host='localhost', port=6379, db=0)
# Ou via URL
r = redis.from_url('redis://localhost:6379/0')
# Testando a conexão
print(r.ping()) # True
2. Operações básicas com Redis em Python
Os comandos fundamentais do Redis são mapeados diretamente para métodos Python:
# String operations
r.set('nome', 'Alice')
print(r.get('nome')) # b'Alice'
print(r.exists('nome')) # 1
# Expiração
r.expire('nome', 60) # expira em 60 segundos
r.setex('temp', 30, 'valor temporário') # set + expire
# Delete
r.delete('nome')
Trabalhando com diferentes tipos de dados:
# Listas
r.lpush('fila', 'tarefa1', 'tarefa2')
r.rpush('fila', 'tarefa3')
print(r.lrange('fila', 0, -1)) # [b'tarefa2', b'tarefa1', b'tarefa3']
# Hashes
r.hset('usuario:1', mapping={'nome': 'João', 'idade': 30})
print(r.hgetall('usuario:1'))
# Sets
r.sadd('tags', 'python', 'redis', 'cache')
print(r.smembers('tags'))
# Sorted Sets
r.zadd('ranking', {'Alice': 100, 'Bob': 85, 'Carol': 92})
print(r.zrange('ranking', 0, -1, withscores=True))
Para serializar objetos Python complexos, use JSON ou pickle:
import json
import pickle
# JSON (recomendado para interoperabilidade)
dados = {'usuario': 42, 'itens': [1, 2, 3]}
r.set('cache:json', json.dumps(dados))
recuperado = json.loads(r.get('cache:json'))
# Pickle (para objetos Python arbitrários)
r.set('cache:obj', pickle.dumps(dados))
recuperado_pickle = pickle.loads(r.get('cache:obj'))
3. Implementando cache com Redis
A estratégia mais comum é o cache-aside: o aplicativo verifica o cache antes de consultar a fonte primária.
import time
import json
def get_user_data(user_id):
cache_key = f"user:{user_id}:data"
# Tenta obter do cache
cached = r.get(cache_key)
if cached:
print("Cache HIT")
return json.loads(cached)
print("Cache MISS - consultando banco...")
# Simula consulta ao banco
time.sleep(1)
user_data = {
"id": user_id,
"nome": f"Usuário {user_id}",
"email": f"user{user_id}@email.com"
}
# Armazena no cache com TTL de 5 minutos
r.setex(cache_key, 300, json.dumps(user_data))
return user_data
# Teste
print(get_user_data(1)) # Cache MISS
print(get_user_data(1)) # Cache HIT
Configuração de TTL com setex e psetex:
# setex: TTL em segundos
r.setex('chave:segundos', 60, 'valor')
# psetex: TTL em milissegundos
r.psetex('chave:ms', 3000, 'valor')
4. Padrões avançados de cache
Cache invalidation com namespaces:
def invalidate_user_cache(user_id):
"""Remove todas as chaves de cache relacionadas a um usuário"""
pattern = f"user:{user_id}:*"
for key in r.scan_iter(match=pattern):
r.delete(key)
# Invalidação seletiva
invalidate_user_cache(1)
Pipelines para operações em lote:
# Sem pipeline - múltiplas requisições
for i in range(100):
r.set(f"key:{i}", f"value:{i}")
# Com pipeline - uma única requisição
pipe = r.pipeline()
for i in range(100):
pipe.set(f"key:{i}", f"value:{i}")
pipe.execute() # Executa todas de uma vez
# Pipeline com transação
pipe = r.pipeline(transaction=True)
pipe.incr('contador')
pipe.expire('contador', 60)
pipe.execute()
5. Redis como sistema de filas
Redis pode atuar como fila FIFO simples usando listas:
# Produtor
def enfileirar_tarefa(tarefa):
r.lpush('fila:tarefas', json.dumps(tarefa))
print(f"Tarefa enfileirada: {tarefa}")
# Consumidor
def processar_tarefa():
# lpop: não bloqueante
tarefa = r.lpop('fila:tarefas')
if tarefa:
dados = json.loads(tarefa)
print(f"Processando: {dados}")
return dados
return None
# Consumidor bloqueante (espera até 30s)
def consumidor_bloqueante():
while True:
tarefa = r.blpop('fila:tarefas', timeout=30)
if tarefa:
_, dados = tarefa
print(f"Processando: {json.loads(dados)}")
Monitoramento da fila:
tamanho = r.llen('fila:tarefas')
print(f"Tamanho da fila: {tamanho}")
6. Filas com Redis Streams
Redis Streams oferece funcionalidades mais robustas para filas:
import time
# Produtor - adiciona mensagens ao stream
for i in range(5):
mensagem_id = r.xadd('stream:tarefas',
{'tarefa': f'tarefa_{i}', 'status': 'pendente'})
print(f"Mensagem {mensagem_id} adicionada")
time.sleep(0.5)
# Consumidor simples
mensagens = r.xread({'stream:tarefas': '0'}, count=10, block=0)
for stream, msgs in mensagens:
for msg_id, dados in msgs:
print(f"Processando {msg_id}: {dados}")
r.xdel('stream:tarefas', msg_id) # Remove após processar
# Grupos de consumidores
r.xgroup_create('stream:tarefas', 'grupo_processadores', id='0', mkstream=True)
def consumidor_grupo(nome_grupo, nome_consumidor):
while True:
resultados = r.xreadgroup(nome_grupo, nome_consumidor,
{'stream:tarefas': '>'}, count=1, block=1000)
if resultados:
for stream, msgs in resultados:
for msg_id, dados in msgs:
print(f"[{nome_consumidor}] Processando: {dados}")
r.xack('stream:tarefas', nome_grupo, msg_id)
Comparação: Streams são mais adequados para produção por oferecerem persistência, grupos de consumidores e confirmação de processamento. Listas são mais simples e performáticas para filas temporárias.
7. Tratamento de erros e boas práticas
Pool de conexões para gerenciar múltiplas conexões:
from redis import ConnectionPool
pool = ConnectionPool(host='localhost', port=6379, db=0, max_connections=10)
r = redis.Redis(connection_pool=pool)
# Reutilize a conexão em toda a aplicação
Tratamento de exceções:
from redis.exceptions import ConnectionError, TimeoutError
try:
r.ping()
except ConnectionError:
print("Redis indisponível - usando fallback")
# Lógica de fallback
except TimeoutError:
print("Timeout na conexão Redis")
Monitoramento:
# Métricas de cache
cache_hits = 0
cache_misses = 0
def get_with_metrics(key):
global cache_hits, cache_misses
if r.exists(key):
cache_hits += 1
return r.get(key)
cache_misses += 1
return None
print(f"Hit rate: {cache_hits/(cache_hits+cache_misses):.2%}")
8. Exemplo completo: sistema de cache e fila integrado
Aplicação Flask com cache de respostas e fila de tarefas:
from flask import Flask, jsonify, request
import redis
import json
import time
import threading
app = Flask(__name__)
r = redis.Redis(connection_pool=ConnectionPool())
# Cache de respostas de API externa
def get_external_data(id):
cache_key = f"api:data:{id}"
cached = r.get(cache_key)
if cached:
return json.loads(cached)
# Simula chamada externa
time.sleep(2)
data = {"id": id, "resultado": f"dados do item {id}"}
r.setex(cache_key, 60, json.dumps(data))
return data
@app.route('/dados/<int:id>')
def dados(id):
data = get_external_data(id)
return jsonify(data)
# Fila de tarefas assíncronas
@app.route('/processar', methods=['POST'])
def enfileirar():
tarefa = request.json
r.lpush('fila:processamento', json.dumps(tarefa))
return jsonify({"status": "enfileirado"}), 202
def worker():
"""Consumidor da fila"""
while True:
tarefa = r.brpop('fila:processamento', timeout=5)
if tarefa:
_, dados = tarefa
dados = json.loads(dados)
print(f"Processando: {dados}")
# Simula processamento
time.sleep(3)
# Salva resultado
r.setex(f"resultado:{dados['id']}", 600,
json.dumps({"status": "concluído"}))
# Inicia worker em thread separada
threading.Thread(target=worker, daemon=True).start()
if __name__ == '__main__':
app.run(debug=True)
Teste do fluxo completo:
# Teste de cache
curl http://localhost:5000/dados/1 # Primeira chamada: 2s
curl http://localhost:5000/dados/1 # Segunda chamada: instantâneo
# Teste de fila
curl -X POST http://localhost:5000/processar \
-H "Content-Type: application/json" \
-d '{"id": 1, "tipo": "relatorio"}'
O sistema utiliza cache para acelerar respostas repetidas e fila para processamento assíncrono, demonstrando os dois principais casos de uso do Redis com Python.
Referências
- Redis Documentation - Official — Documentação oficial completa do Redis, incluindo comandos, tipos de dados e melhores práticas
- redis-py Documentation — Documentação oficial da biblioteca redis-py, com exemplos de todos os comandos e padrões
- Redis in Action - Cache Patterns — Artigo sobre padrões de cache com Redis, incluindo cache-aside, write-through e estratégias de invalidação
- Redis Streams Tutorial — Tutorial oficial sobre Redis Streams, ideal para implementação de filas robustas
- Using Redis as a Message Queue — Tutorial prático do Real Python sobre filas com Redis e Python, com exemplos de produtores e consumidores
- Redis Connection Pool Best Practices — Discussão técnica sobre gerenciamento de conexões Redis, pools e tratamento de erros
- FastAPI + Redis Cache Example — Exemplo oficial de integração FastAPI com Redis para cache de respostas