Como projetar sistemas de recomendação com atualização incremental em tempo real
1. Fundamentos e desafios da atualização em tempo real
Sistemas de recomendação tradicionais operam em modo batch: modelos são treinados a cada 24 horas em dados históricos completos, gerando predições estáticas até o próximo ciclo. Em contraste, sistemas com atualização incremental em tempo real processam eventos à medida que ocorrem, ajustando embeddings, pesos e similaridades em frações de segundo.
Os principais desafios incluem:
- Latência: o tempo entre o evento do usuário e a atualização do modelo deve ser inferior a 100ms
- Throughput: capacidade de processar milhares de eventos por segundo sem degradação
- Consistência: garantir que atualizações parciais não corrompam o estado do modelo
- Trade-off precisão-velocidade: modelos incrementais sacrificam algum grau de otimalidade para responder instantaneamente a mudanças de comportamento
2. Arquitetura de dados para streaming de eventos
A base de qualquer sistema incremental é um pipeline de streaming robusto. As fontes de dados típicas incluem cliques em links, visualizações de páginas, compras concluídas e feedback explícito (curtidas, avaliações).
# Exemplo de schema de evento de clique em sistema de recomendação
{
"event_id": "evt_8a3f2c",
"user_id": "usr_4521",
"item_id": "itm_7890",
"timestamp": 1712345678,
"event_type": "click",
"session_duration_ms": 3200,
"context": {
"device": "mobile",
"page_position": 3,
"hour_of_day": 14
}
}
Utilizamos message brokers como Apache Kafka ou RabbitMQ para ingestão contínua. O ETL em tempo real realiza limpeza, normalização e feature engineering incremental:
# Pseudocódigo: feature engineering incremental em stream
def process_event_stream(event):
# Normalizar timestamp para bucket de 5 minutos
time_bucket = floor(event.timestamp / 300) * 300
# Atualizar contagem de cliques por usuário
redis.increment(f"user:{event.user_id}:clicks:{time_bucket}")
# Calcular taxa de clique recente (última hora)
recent_clicks = redis.get_range(f"user:{event.user_id}:clicks", -12, -1)
click_rate = sum(recent_clicks) / len(recent_clicks)
return {
"user_id": event.user_id,
"item_id": event.item_id,
"click_rate_1h": click_rate,
"time_bucket": time_bucket
}
3. Modelagem incremental de preferências do usuário
A fatoração de matrizes incremental atualiza embeddings de usuários e itens usando gradiente descendente online (SGD online). A cada novo evento, ajustamos os vetores latentes sem reprocessar todo o histórico.
# Atualização incremental de embeddings com SGD online
def update_embeddings(user_id, item_id, rating, user_emb, item_emb, lr=0.01):
# Predição atual
pred = dot_product(user_emb[user_id], item_emb[item_id])
error = rating - pred
# Atualização dos embeddings
user_emb[user_id] += lr * (error * item_emb[item_id] - reg * user_emb[user_id])
item_emb[item_id] += lr * (error * user_emb[user_id] - reg * item_emb[item_id])
return user_emb, item_emb
Estratégias de decay temporal são essenciais para dar menos peso a interações antigas. Implementamos um fator exponencial:
# Decay temporal para pesos de interações
def temporal_weight(timestamp, current_time, half_life=86400):
# half_life = 1 dia em segundos
age = current_time - timestamp
return 2 ** (-age / half_life)
4. Algoritmos de recomendação com aprendizado contínuo
Bandits contextuais como LinUCB e Thompson Sampling são ideais para exploração em tempo real. Eles atualizam parâmetros a cada interação:
# Thompson Sampling com atualização incremental
class ThompsonBandit:
def __init__(self, n_arms, alpha=1.0, beta=1.0):
self.alpha = [alpha] * n_arms
self.beta = [beta] * n_arms
def select_arm(self):
samples = [np.random.beta(a, b) for a, b in zip(self.alpha, self.beta)]
return np.argmax(samples)
def update(self, arm, reward):
# reward = 1 para clique, 0 para não clique
self.alpha[arm] += reward
self.beta[arm] += 1 - reward
Para modelos baseados em vizinhança, similaridades entre itens são atualizadas incrementalmente quando novos pares de co-ocorrência surgem:
# Atualização incremental de similaridade Jaccard entre itens
def update_item_similarity(item_a, item_b, interaction_count_a, interaction_count_b, co_occurrence):
co_occurrence[item_a][item_b] += 1
jaccard = co_occurrence[item_a][item_b] / (interaction_count_a + interaction_count_b - co_occurrence[item_a][item_b])
return jaccard
5. Infraestrutura e pipeline de deploy contínuo
A infraestrutura de inferência utiliza cache de embeddings em Redis e índices vetoriais em FAISS para busca rápida de itens similares:
# Pipeline de recomendação com cache e índice vetorial
def get_recommendations(user_id, k=10):
# Buscar embedding do usuário no cache
user_emb = redis.get(f"emb:user:{user_id}")
if user_emb is None:
# Fallback para modelo batch
user_emb = batch_model.get_user_embedding(user_id)
redis.set(f"emb:user:{user_id}", user_emb, ex=300) # TTL de 5 min
# Busca nos índices vetoriais
item_ids, scores = faiss_index.search(user_emb, k)
return item_ids
Versionamento de modelos e rollback automático são implementados com detecção de drift:
# Monitoramento de drift com janela deslizante
def detect_drift(metric_stream, window_size=1000, threshold=0.05):
window = metric_stream[-window_size:]
baseline_mean = np.mean(window[:window_size//2])
recent_mean = np.mean(window[window_size//2:])
if abs(recent_mean - baseline_mean) > threshold:
trigger_rollback()
6. Monitoramento e avaliação em tempo real
Métricas de frescor medem quão rapidamente novos itens aparecem nos rankings. A diversidade temporal garante que recomendações não fiquem estagnadas:
# Métrica de frescor: proporção de itens novos no top-10
def freshness_score(recommendations, current_time, max_age_hours=24):
fresh_items = sum(1 for item in recommendations
if (current_time - item.publish_time) < max_age_hours * 3600)
return fresh_items / len(recommendations)
O algoritmo ADWIN (Adaptive Windowing) detecta concept drift automaticamente:
# Detecção de drift com ADWIN simplificado
class ADWIN:
def __init__(self, delta=0.002):
self.window = []
self.delta = delta
self.total = 0
def add_element(self, value):
self.window.append(value)
self.total += value
self._check_drift()
def _check_drift(self):
if len(self.window) < 10:
return
# Implementação simplificada: verifica mudança na média
mean = self.total / len(self.window)
for cut in range(1, len(self.window)):
left_mean = sum(self.window[:cut]) / cut
right_mean = sum(self.window[cut:]) / (len(self.window) - cut)
if abs(left_mean - right_mean) > self.delta:
self.window = self.window[cut:]
self.total = sum(self.window)
trigger_alert("Drift detectado")
7. Estudo de caso: sistema de recomendação de notícias
Implementamos um pipeline completo: Kafka → Feature Store → Modelo incremental → API REST. A cada 5 minutos, o modelo é atualizado com feedback de cliques:
# Loop principal de atualização incremental (executado a cada 5 min)
def incremental_update_cycle():
# 1. Consumir eventos do Kafka dos últimos 5 min
events = kafka_consumer.poll(timeout_ms=5000)
# 2. Atualizar feature store
for event in events:
feature_store.update_user_features(event.user_id, event)
feature_store.update_item_features(event.item_id, event)
# 3. Atualizar embeddings do modelo
for event in events:
user_emb = model.get_user_embedding(event.user_id)
item_emb = model.get_item_embedding(event.item_id)
model.update_embeddings(user_emb, item_emb, event.rating)
# 4. Reconstruir índice vetorial parcialmente
faiss_index.update_index(model.get_recent_items())
# 5. Atualizar cache de embeddings
redis.flush_pattern("emb:*")
Comparação de performance entre batch e incremental em cenários de pico (100k eventos/min):
| Métrica | Batch (24h) | Incremental (5min) |
|---|---|---|
| Latência de atualização | 24 horas | 5 minutos |
| Precisão (NDCG@10) | 0.72 | 0.68 |
| Frescor (novos itens no top-10) | 12% | 47% |
| Throughput (eventos/s) | 500 | 8500 |
| Consumo de CPU | 95% (pico) | 45% (constante) |
O sistema incremental sacrifica 5% de precisão, mas ganha 35% em frescor e 17x em throughput, tornando-se ideal para domínios onde a relevância temporal é crítica, como notícias e redes sociais.
Referências
- Apache Kafka Documentation: Stream Processing — Documentação oficial sobre processamento de streams com Kafka, base para pipelines de eventos em tempo real
- TensorFlow Recommenders: Incremental Training — Tutorial oficial do Google sobre modelos de recomendação com suporte a treinamento incremental
- FAISS: Efficient Similarity Search — Biblioteca da Meta para índices vetoriais com atualização incremental de embeddings
- ADWIN: Adaptive Windowing for Concept Drift Detection — Artigo seminal sobre detecção de drift com janelas deslizantes adaptativas
- Contextual Bandits with Thompson Sampling — Paper de referência sobre bandits contextuais para exploração online em sistemas de recomendação
- Redis for Real-Time Feature Store — Guia prático sobre uso de Redis como feature store para sistemas de recomendação em tempo real