Truques para lidar com logs volumosos em produção

Logs são a memória viva de qualquer sistema em produção. Quando algo quebra, são eles que contam a história. Mas quando o volume dispara — milhares de linhas por segundo —, o logging vira um problema maior que o próprio bug. Sem estratégia, você afoga o disco, explode a latência e perde o sinal no meio do ruído. Este artigo reúne truques práticos para domar logs volumosos sem sacrificar a visibilidade.

1. Estratégias de Amostragem e Filtragem Inteligente

Nem todo log merece ser salvo. A primeira linha de defesa é decidir o que entra e o que fica de fora.

Amostragem adaptativa: Em vez de logar 100% das requisições, amostre com base em latência e erros. Se a latência está baixa e não há erros, amostre 1 em cada 100. Se o p95 ultrapassa 500ms, amostre 1 em cada 10. Se há erro 5xx, logue tudo.

if latency_p95 > 500ms or error_rate > 0.01:
    sample_rate = 1.0
elif latency_p95 > 200ms:
    sample_rate = 0.1
else:
    sample_rate = 0.01

if random.random() < sample_rate:
    logger.info("request processed", extra=request_data)

Deduplicação: Logs repetitivos (mesma mensagem, mesmo stack trace) em curto intervalo são inúteis. Use um cache LRU para detectar duplicatas e apenas incremente um contador.

cache = LRUCache(maxsize=1000, ttl=60)

def log_deduplicated(message, level="info"):
    key = hash(message)
    if key in cache:
        cache[key]["count"] += 1
    else:
        cache[key] = {"count": 1, "level": level}
        logger.log(level, message)

Níveis dinâmicos: Em produção, deixe debug desligado por padrão. Ative sob demanda por serviço via feature flag ou variável de ambiente. Um endpoint /debug/on?service=auth&ttl=300 resolve.

2. Estruturação e Compactação em Tempo Real

Logs soltos em texto puro são pesados e difíceis de parsear. A estruturação reduz o tamanho e acelera consultas.

JSON com campos mínimos: Use sempre timestamp, level, service, message e trace_id. Evite aninhamentos profundos — eles inflam o payload.

{
  "timestamp": "2025-04-10T14:32:01.123Z",
  "level": "error",
  "service": "payment-api",
  "message": "timeout ao conectar no gateway",
  "trace_id": "abc123",
  "duration_ms": 3021
}

Compressão inline: Antes de escrever no disco, compacte com gzip. Um log de 10KB vira ~2KB. Em volumes altos, isso reduz I/O em 80%.

import gzip

def write_log_compressed(file, log_line):
    compressed = gzip.compress(log_line.encode())
    file.write(compressed + b"\n")

Bufferização: Escrever linha por linha mata o disco. Acumule em buffer de 64KB ou 100ms (o que vier primeiro) e faça batch write.

buffer = []
last_flush = time.time()

def append_log(line):
    buffer.append(line)
    if len(buffer) >= 100 or (time.time() - last_flush) > 0.1:
        flush_buffer()

def flush_buffer():
    with open("logs.gz", "ab") as f:
        for line in buffer:
            f.write(gzip.compress(line.encode()) + b"\n")
    buffer.clear()
    last_flush = time.time()

3. Roteamento e Sharding de Logs por Critério

Logs de erro não devem competir com logs de debug pelo mesmo arquivo. Separe por fluxo.

Streams por criticidade: Erro e warning vão para um tópico Kafka de alta prioridade. Info e debug vão para um tópico de baixa prioridade, que pode ser descartado em picos.

if level in ("error", "critical"):
    kafka_producer.send("logs-high-prio", log_json)
else:
    kafka_producer.send("logs-low-prio", log_json)

Sharding por tenant/serviço: Em sistemas multi-tenant, crie um arquivo ou índice por tenant. Isso isola ruído e facilita rotação individual.

Filas com backpressure: Use Kafka ou RabbitMQ como amortecedor. Se o consumidor de logs estiver lento, a fila acumula sem travar a aplicação. Configure max.in.flight.requests e retries para evitar perda.

4. Rotação e Retenção Agressiva com Políticas Inteligentes

Disco não é infinito. Rotacione antes que encha.

Rotação por tamanho e tempo: Use logrotate ou bibliotecas nativas. Exemplo: rodar a cada 100MB ou a cada 6h, com compressão automática.

/var/log/myapp/*.log {
    size 100M
    rotate 30
    compress
    delaycompress
    missingok
    notifempty
}

Retenção diferenciada: Erros merecem 90 dias. Debug pode sumir em 7. Separe os arquivos por nível para políticas distintas.

Sumarização de logs antigos: Para logs com mais de 30 dias, agregue métricas (contagem de erros por hora, latência média) e descarte os detalhes. Um job diário faz isso.

# Job diário de sumarização
for each hour in last_30_days:
    errors = count_errors(hour)
    avg_latency = avg_latency(hour)
    save_summary(hour, errors, avg_latency)
delete_raw_logs_older_than(30)

5. Indexação e Consulta Otimizada

Logs sem índice são inúteis. Mas indexar tudo é caro. Seja seletivo.

Índices invertidos mínimos: No Elasticsearch, indexe apenas timestamp, level, service e trace_id. Deixe message como text sem índice ou com index: false se não for buscar por palavra-chave.

Time-series databases: Para logs temporais (métricas de erro por segundo), use InfluxDB ou TimescaleDB. Eles comprimem séries temporais melhor que Elasticsearch.

Pré-agregação em tempo real: Em vez de contar erros no momento da consulta, mantenha counters atualizados a cada minuto.

# Atualiza a cada 60s
error_counter = Counter("app_errors_total", ["service", "status_code"])
error_counter.labels(service="payment", status_code="500").inc()

6. Monitoramento de Volume e Alertas Proativos

Se o volume de logs dobrar em 5 minutos, algo está errado — e você precisa saber antes do disco encher.

Métricas de taxa: Exponha logs_total e logs_bytes_total por serviço. Configure alertas para crescimento >50% em 10 minutos.

Anomalias de pico: Use média móvel dos últimos 7 dias. Se o volume atual ultrapassa 3 desvios padrão, dispare alerta.

Circuit breaker para debug: Se a taxa de logs de debug ultrapassar 1000/s, desligue automaticamente o nível debug por 5 minutos. Reative gradualmente.

if debug_rate > 1000 and not circuit_open:
    circuit_open = True
    logger.setLevel("INFO")
    schedule_reopen(300)  # reativa em 5 min

7. Ferramentas e Pipelines Modernos para Alta Escala

A ferramenta certa reduz custo e complexidade.

Loki + Promtail: Leve e barato. Loki indexa apenas metadados (labels), não o conteúdo. Ideal para logs estruturados. Custa uma fração do Elasticsearch.

Elasticsearch com ILM: Configure hot-warm-cold phases. Logs recentes ficam em SSD (hot), após 7 dias vão para HDD (warm), após 30 dias para cold storage com replica 0.

Soluções serverless: CloudWatch Logs e Azure Log Analytics cobram por volume ingerido e armazenado. Use filtros de subscription para descartar logs irrelevantes antes da ingestão.

# Filtro CloudWatch: descarta logs de health check
if "healthcheck" in log_message:
    return  # não envia para o CloudWatch

Logs volumosos não são um problema técnico — são um problema de decisão. O truque não é gerar menos logs, mas gerar os logs certos, no formato certo, no lugar certo. Com amostragem inteligente, estruturação, roteamento e ferramentas modernas, você mantém a visibilidade sem quebrar o banco (ou o disco).

Referências