Truques para lidar com logs volumosos em produção
Logs são a memória viva de qualquer sistema em produção. Quando algo quebra, são eles que contam a história. Mas quando o volume dispara — milhares de linhas por segundo —, o logging vira um problema maior que o próprio bug. Sem estratégia, você afoga o disco, explode a latência e perde o sinal no meio do ruído. Este artigo reúne truques práticos para domar logs volumosos sem sacrificar a visibilidade.
1. Estratégias de Amostragem e Filtragem Inteligente
Nem todo log merece ser salvo. A primeira linha de defesa é decidir o que entra e o que fica de fora.
Amostragem adaptativa: Em vez de logar 100% das requisições, amostre com base em latência e erros. Se a latência está baixa e não há erros, amostre 1 em cada 100. Se o p95 ultrapassa 500ms, amostre 1 em cada 10. Se há erro 5xx, logue tudo.
if latency_p95 > 500ms or error_rate > 0.01:
sample_rate = 1.0
elif latency_p95 > 200ms:
sample_rate = 0.1
else:
sample_rate = 0.01
if random.random() < sample_rate:
logger.info("request processed", extra=request_data)
Deduplicação: Logs repetitivos (mesma mensagem, mesmo stack trace) em curto intervalo são inúteis. Use um cache LRU para detectar duplicatas e apenas incremente um contador.
cache = LRUCache(maxsize=1000, ttl=60)
def log_deduplicated(message, level="info"):
key = hash(message)
if key in cache:
cache[key]["count"] += 1
else:
cache[key] = {"count": 1, "level": level}
logger.log(level, message)
Níveis dinâmicos: Em produção, deixe debug desligado por padrão. Ative sob demanda por serviço via feature flag ou variável de ambiente. Um endpoint /debug/on?service=auth&ttl=300 resolve.
2. Estruturação e Compactação em Tempo Real
Logs soltos em texto puro são pesados e difíceis de parsear. A estruturação reduz o tamanho e acelera consultas.
JSON com campos mínimos: Use sempre timestamp, level, service, message e trace_id. Evite aninhamentos profundos — eles inflam o payload.
{
"timestamp": "2025-04-10T14:32:01.123Z",
"level": "error",
"service": "payment-api",
"message": "timeout ao conectar no gateway",
"trace_id": "abc123",
"duration_ms": 3021
}
Compressão inline: Antes de escrever no disco, compacte com gzip. Um log de 10KB vira ~2KB. Em volumes altos, isso reduz I/O em 80%.
import gzip
def write_log_compressed(file, log_line):
compressed = gzip.compress(log_line.encode())
file.write(compressed + b"\n")
Bufferização: Escrever linha por linha mata o disco. Acumule em buffer de 64KB ou 100ms (o que vier primeiro) e faça batch write.
buffer = []
last_flush = time.time()
def append_log(line):
buffer.append(line)
if len(buffer) >= 100 or (time.time() - last_flush) > 0.1:
flush_buffer()
def flush_buffer():
with open("logs.gz", "ab") as f:
for line in buffer:
f.write(gzip.compress(line.encode()) + b"\n")
buffer.clear()
last_flush = time.time()
3. Roteamento e Sharding de Logs por Critério
Logs de erro não devem competir com logs de debug pelo mesmo arquivo. Separe por fluxo.
Streams por criticidade: Erro e warning vão para um tópico Kafka de alta prioridade. Info e debug vão para um tópico de baixa prioridade, que pode ser descartado em picos.
if level in ("error", "critical"):
kafka_producer.send("logs-high-prio", log_json)
else:
kafka_producer.send("logs-low-prio", log_json)
Sharding por tenant/serviço: Em sistemas multi-tenant, crie um arquivo ou índice por tenant. Isso isola ruído e facilita rotação individual.
Filas com backpressure: Use Kafka ou RabbitMQ como amortecedor. Se o consumidor de logs estiver lento, a fila acumula sem travar a aplicação. Configure max.in.flight.requests e retries para evitar perda.
4. Rotação e Retenção Agressiva com Políticas Inteligentes
Disco não é infinito. Rotacione antes que encha.
Rotação por tamanho e tempo: Use logrotate ou bibliotecas nativas. Exemplo: rodar a cada 100MB ou a cada 6h, com compressão automática.
/var/log/myapp/*.log {
size 100M
rotate 30
compress
delaycompress
missingok
notifempty
}
Retenção diferenciada: Erros merecem 90 dias. Debug pode sumir em 7. Separe os arquivos por nível para políticas distintas.
Sumarização de logs antigos: Para logs com mais de 30 dias, agregue métricas (contagem de erros por hora, latência média) e descarte os detalhes. Um job diário faz isso.
# Job diário de sumarização
for each hour in last_30_days:
errors = count_errors(hour)
avg_latency = avg_latency(hour)
save_summary(hour, errors, avg_latency)
delete_raw_logs_older_than(30)
5. Indexação e Consulta Otimizada
Logs sem índice são inúteis. Mas indexar tudo é caro. Seja seletivo.
Índices invertidos mínimos: No Elasticsearch, indexe apenas timestamp, level, service e trace_id. Deixe message como text sem índice ou com index: false se não for buscar por palavra-chave.
Time-series databases: Para logs temporais (métricas de erro por segundo), use InfluxDB ou TimescaleDB. Eles comprimem séries temporais melhor que Elasticsearch.
Pré-agregação em tempo real: Em vez de contar erros no momento da consulta, mantenha counters atualizados a cada minuto.
# Atualiza a cada 60s
error_counter = Counter("app_errors_total", ["service", "status_code"])
error_counter.labels(service="payment", status_code="500").inc()
6. Monitoramento de Volume e Alertas Proativos
Se o volume de logs dobrar em 5 minutos, algo está errado — e você precisa saber antes do disco encher.
Métricas de taxa: Exponha logs_total e logs_bytes_total por serviço. Configure alertas para crescimento >50% em 10 minutos.
Anomalias de pico: Use média móvel dos últimos 7 dias. Se o volume atual ultrapassa 3 desvios padrão, dispare alerta.
Circuit breaker para debug: Se a taxa de logs de debug ultrapassar 1000/s, desligue automaticamente o nível debug por 5 minutos. Reative gradualmente.
if debug_rate > 1000 and not circuit_open:
circuit_open = True
logger.setLevel("INFO")
schedule_reopen(300) # reativa em 5 min
7. Ferramentas e Pipelines Modernos para Alta Escala
A ferramenta certa reduz custo e complexidade.
Loki + Promtail: Leve e barato. Loki indexa apenas metadados (labels), não o conteúdo. Ideal para logs estruturados. Custa uma fração do Elasticsearch.
Elasticsearch com ILM: Configure hot-warm-cold phases. Logs recentes ficam em SSD (hot), após 7 dias vão para HDD (warm), após 30 dias para cold storage com replica 0.
Soluções serverless: CloudWatch Logs e Azure Log Analytics cobram por volume ingerido e armazenado. Use filtros de subscription para descartar logs irrelevantes antes da ingestão.
# Filtro CloudWatch: descarta logs de health check
if "healthcheck" in log_message:
return # não envia para o CloudWatch
Logs volumosos não são um problema técnico — são um problema de decisão. O truque não é gerar menos logs, mas gerar os logs certos, no formato certo, no lugar certo. Com amostragem inteligente, estruturação, roteamento e ferramentas modernas, você mantém a visibilidade sem quebrar o banco (ou o disco).
Referências
- Documentação oficial do Loki — Guia completo de configuração do Loki para logging leve e escalável, incluindo Promtail e rótulos.
- Elasticsearch Index Lifecycle Management (ILM) — Documentação oficial sobre políticas de ciclo de vida para gerenciar retenção e custos.
- Logging Best Practices - Google Cloud — Práticas recomendadas do Google para logging estruturado, amostragem e monitoramento.
- The Log: What every software engineer should know about real-time data's unifying abstraction — Artigo clássico de Jay Kreps sobre logs como abstração central em sistemas distribuídos.
- Grafana Loki: LogQL documentation — Referência da linguagem de consulta LogQL para buscas eficientes em logs volumosos.
- InfluxDB documentation - Time series data — Documentação do InfluxDB para armazenamento e consulta de séries temporais a partir de logs.