Como implementar rate limiting por IP com Redis e tolerância a proxies
1. Fundamentos do Rate Limiting e o Papel do Redis
Rate limiting é uma técnica de controle de tráfego que limita o número de requisições que um cliente pode fazer em um determinado intervalo de tempo. É essencial para prevenir abusos, ataques de força bruta, e garantir a estabilidade de APIs e serviços web. Sem ele, um único cliente mal-intencionado ou com um bug pode sobrecarregar o servidor, afetando todos os outros usuários.
O Redis é a ferramenta ideal para implementar rate limiting por várias razões:
- Operações atômicas: Comandos como
INCReEXPIREgarantem que não haja condições de corrida. - Baixa latência: Opera em memória, com respostas na ordem de microssegundos.
- Expiração automática: Chaves podem ter TTL (time-to-live), eliminando a necessidade de limpeza manual.
- Estruturas de dados versáteis: Strings, Sorted Sets e Hashes permitem implementar diferentes estratégias.
As estratégias mais comuns incluem:
- Fixed Window: Divide o tempo em janelas fixas (ex: 100 requisições por minuto). Simples, mas pode permitir picos no final de uma janela e início da próxima.
- Sliding Window Log: Mantém um log de timestamps de cada requisição. Preciso, mas consome mais memória.
- Sliding Window Counter: Combina dois contadores (janela atual e anterior) para aproximar o sliding window com menor uso de memória.
- Token Bucket: Um bucket com tokens que são consumidos a cada requisição e recarregados a uma taxa constante.
2. Identificação Correta do IP Real em Ambientes com Proxies
Em arquiteturas modernas, as requisições raramente chegam diretamente ao servidor. Elas passam por proxies reversos (Nginx, HAProxy), CDNs (Cloudflare, Akamai) e balanceadores de carga. O IP que o servidor vê (REMOTE_ADDR) é o do último proxy, não o do cliente real.
Os cabeçalhos HTTP que carregam o IP original são:
- X-Forwarded-For: Lista de IPs separados por vírgula. O primeiro é o cliente original, os demais são proxies.
- X-Real-IP: Usado por alguns proxies (ex: Nginx) para enviar o IP real diretamente.
O desafio é que esses cabeçalhos podem ser falsificados por clientes maliciosos. A solução é confiar apenas em proxies conhecidos (whitelist) e extrair o IP correto:
def get_real_ip(request, trusted_proxies):
"""
Extrai o IP real do cliente considerando proxies confiáveis.
Args:
request: Objeto de requisição com headers e remote_addr
trusted_proxies: Lista de IPs de proxies confiáveis (CIDR ou IPs)
Returns:
String com o IP real do cliente
"""
# Verifica se há X-Forwarded-For
xff = request.headers.get('X-Forwarded-For')
if xff:
# Divide a lista de IPs
ips = [ip.strip() for ip in xff.split(',')]
# Remove proxies confiáveis do final da lista
while ips and ips[-1] in trusted_proxies:
ips.pop()
# Se sobrou algum IP, o primeiro é o cliente original
if ips:
return ips[0]
# Fallback para X-Real-IP (se presente)
xri = request.headers.get('X-Real-IP')
if xri and xri not in trusted_proxies:
return xri
# Último recurso: IP da conexão direta
return request.remote_addr
3. Implementação Básica com Sliding Window Log no Redis
A estratégia Sliding Window Log usa um Sorted Set no Redis para armazenar timestamps de cada requisição. A chave é o IP do cliente, e cada membro do set é um timestamp único (ex: requisição:timestamp). O score é o próprio timestamp, permitindo consultas por intervalo.
Comandos Redis essenciais:
ZADD: Adiciona um membro com scoreZREMRANGEBYSCORE: Remove membros com score em um intervaloZCOUNT: Conta membros com score em um intervaloEXPIRE: Define TTL para a chave
import redis
import time
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
def is_rate_limited_sliding_window(ip, limit, window_seconds):
"""
Verifica se o IP excedeu o limite de requisições usando Sliding Window Log.
Args:
ip: IP do cliente
limit: Número máximo de requisições permitidas
window_seconds: Tamanho da janela em segundos
Returns:
Boolean: True se estiver limitado, False caso contrário
"""
key = f"ratelimit:sliding:{ip}"
now = time.time()
window_start = now - window_seconds
# Pipeline para atomicidade
pipe = r.pipeline()
# Remove timestamps antigos (fora da janela)
pipe.zremrangebyscore(key, 0, window_start)
# Conta requisições na janela atual
pipe.zcard(key)
# Adiciona timestamp atual
pipe.zadd(key, {f"req:{now}": now})
# Define TTL para limpeza automática
pipe.expire(key, window_seconds)
results = pipe.execute()
current_count = results[1] # Resultado de zcard
# Se já excedeu o limite, remove a requisição que acabamos de adicionar
if current_count > limit:
pipe2 = r.pipeline()
pipe2.zrem(key, f"req:{now}")
pipe2.execute()
return True
return False
4. Estratégia Avançada: Sliding Window Counters com Redis
O Sliding Window Log é preciso, mas consome muita memória para IPs com muitas requisições. O Sliding Window Counter resolve isso usando apenas dois contadores por janela: um para a janela atual e outro para a anterior.
A ideia é dividir o tempo em janelas fixas (ex: 1 segundo). Para cada requisição, calculamos em qual janela ela cai. Mantemos um contador para a janela atual e outro para a anterior. Quando uma requisição chega, calculamos um peso proporcional ao tempo decorrido na janela atual:
def is_rate_limited_sliding_counter(ip, limit, window_seconds):
"""
Verifica se o IP excedeu o limite usando Sliding Window Counter.
Args:
ip: IP do cliente
limit: Número máximo de requisições permitidas
window_seconds: Tamanho da janela em segundos (ex: 60)
Returns:
Boolean: True se estiver limitado, False caso contrário
"""
key = f"ratelimit:counter:{ip}"
now = time.time()
# Divide o tempo em janelas de 1 segundo (granularidade ajustável)
current_window = int(now)
previous_window = current_window - 1
# Pesos para a janela anterior (proporcional ao tempo decorrido)
elapsed = now - current_window
weight_previous = 1 - elapsed # Quanto mais perto do final, menor o peso
pipe = r.pipeline()
# Incrementa contador da janela atual
pipe.hincrby(key, current_window, 1)
# Obtém contadores de ambas as janelas
pipe.hget(key, current_window)
pipe.hget(key, previous_window)
# Define TTL para a chave (2 janelas para segurança)
pipe.expire(key, window_seconds + 2)
results = pipe.execute()
current_count = int(results[1] or 0)
previous_count = int(results[2] or 0)
# Calcula requisições estimadas na janela deslizante
estimated = current_count + (previous_count * weight_previous)
if estimated > limit:
# Reverte o incremento
r.hincrby(key, current_window, -1)
return True
return False
5. Tolerância a Proxies: Whitelist e Headers Dinâmicos
Para lidar com proxies de forma confiável, precisamos de uma whitelist de IPs confiáveis. Isso inclui:
- IPs internos da infraestrutura (ex: range 10.0.0.0/8)
- IPs de CDNs conhecidos (Cloudflare, Akamai, Fastly)
- IPs de balanceadores de carga
A estratégia de fallback deve considerar que o cabeçalho X-Forwarded-For pode estar ausente ou ter sido falsificado:
# Whitelist de proxies confiáveis (exemplo com Cloudflare)
TRUSTED_PROXIES = {
'173.245.48.0/20',
'103.21.244.0/22',
'103.22.200.0/22',
# ... outros ranges
}
# Cache Redis para evitar parsing repetitivo
PROXY_CACHE_KEY = "proxy_cache"
def resolve_client_ip(request):
"""
Resolve o IP real do cliente com cache em Redis.
"""
# Chave de cache baseada no IP do proxy
proxy_ip = request.remote_addr
cache_key = f"{PROXY_CACHE_KEY}:{proxy_ip}"
# Verifica cache
cached = r.get(cache_key)
if cached:
return cached
# Processa headers
real_ip = get_real_ip(request, TRUSTED_PROXIES)
# Armazena em cache por 5 minutos
r.setex(cache_key, 300, real_ip)
return real_ip
6. Tratamento de Exceções e Cenários de Borda
IPs Privados e IPv6
IPs privados (RFC 1918: 10.0.0.0/8, 172.16.0.0/12, 192.168.0.0/16) nunca devem ser considerados como IPs de clientes reais em produção. IPv6 mapeados em IPv4 (ex: ::ffff:192.168.1.1) devem ser normalizados.
Granularidade: Rota vs. Global
Rate limiting pode ser aplicado globalmente (por IP) ou por rota (endpoint). A abordagem por rota é mais justa, mas requer chaves compostas:
def rate_limit_by_route(ip, route, limit, window):
key = f"ratelimit:route:{route}:{ip}"
# ... mesma lógica dos exemplos anteriores
Respostas Adequadas
Quando uma requisição é bloqueada, a resposta deve incluir:
- Status HTTP: 429 Too Many Requests
- Cabeçalho Retry-After: Tempo em segundos até poder tentar novamente
- Logging: Registrar o IP, rota e timestamp para análise
def rate_limit_response(retry_after):
return {
'status': 429,
'headers': {
'Retry-After': str(retry_after),
'Content-Type': 'application/json'
},
'body': {
'error': 'Too Many Requests',
'retry_after': retry_after
}
}
7. Monitoramento e Ajuste Dinâmico dos Limites
Métricas no Redis
Para monitorar a eficácia do rate limiting, armazene métricas em chaves separadas:
# Incrementa contadores de métricas
r.incr(f"metrics:ratelimit:blocked:{ip}")
r.incr(f"metrics:ratelimit:allowed:{ip}")
# Taxa de bloqueio por IP
blocked = int(r.get(f"metrics:ratelimit:blocked:{ip}") or 0)
allowed = int(r.get(f"metrics:ratelimit:allowed:{ip}") or 0)
rate = blocked / (blocked + allowed) if (blocked + allowed) > 0 else 0
Adaptive Rate Limiting
Limites podem ser ajustados dinamicamente com base na carga do servidor ou no comportamento do cliente:
def adaptive_limit(ip, base_limit, server_load):
"""
Ajusta o limite baseado na carga do servidor (0.0 a 1.0).
"""
if server_load > 0.8:
return int(base_limit * 0.5) # Reduz 50%
elif server_load > 0.6:
return int(base_limit * 0.8) # Reduz 20%
return base_limit
8. Boas Práticas de Segurança e Performance
Pool de Conexões Redis
Use um pool de conexões para evitar overhead de criação de conexões:
pool = redis.ConnectionPool(host='localhost', port=6379, max_connections=20)
r = redis.Redis(connection_pool=pool)
Proteção contra Ataques ao Redis
- Limite o tamanho de Sorted Sets: remova entradas antigas frequentemente
- Use
MAXMEMORYno Redis para evitar estouro de memória - Implemente circuit breaker: se o Redis ficar indisponível, permita requisições temporariamente
Versionamento de Chaves
Para migrar entre estratégias sem interromper o serviço:
STRATEGY_VERSION = 2
key = f"ratelimit:v{STRATEGY_VERSION}:{ip}"
Isso permite que versões antigas expirem naturalmente enquanto a nova estratégia é adotada gradualmente.
Referências
- Redis Sorted Sets Documentation — Documentação oficial sobre Sorted Sets, usados na implementação de Sliding Window Log.
- Cloudflare IP Ranges — Lista atualizada de IPs de proxies Cloudflare para whitelist.
- HTTP Rate Limiting with Redis and Python — Tutorial prático sobre rate limiting com Redis e Python, incluindo exemplos de código.
- NGINX Rate Limiting Guide — Guia oficial da NGINX sobre rate limiting, incluindo configuração de proxies e X-Forwarded-For.
- Redis Rate Limiting Patterns — Padrões de rate limiting documentados no site oficial do Redis, incluindo exemplos com INCR e EXPIRE.