Como projetar sistemas de recomendação com atualização incremental em tempo real

1. Fundamentos e desafios da atualização em tempo real

Sistemas de recomendação tradicionais operam em modo batch: modelos são treinados a cada 24 horas em dados históricos completos, gerando predições estáticas até o próximo ciclo. Em contraste, sistemas com atualização incremental em tempo real processam eventos à medida que ocorrem, ajustando embeddings, pesos e similaridades em frações de segundo.

Os principais desafios incluem:
- Latência: o tempo entre o evento do usuário e a atualização do modelo deve ser inferior a 100ms
- Throughput: capacidade de processar milhares de eventos por segundo sem degradação
- Consistência: garantir que atualizações parciais não corrompam o estado do modelo
- Trade-off precisão-velocidade: modelos incrementais sacrificam algum grau de otimalidade para responder instantaneamente a mudanças de comportamento

2. Arquitetura de dados para streaming de eventos

A base de qualquer sistema incremental é um pipeline de streaming robusto. As fontes de dados típicas incluem cliques em links, visualizações de páginas, compras concluídas e feedback explícito (curtidas, avaliações).

# Exemplo de schema de evento de clique em sistema de recomendação
{
  "event_id": "evt_8a3f2c",
  "user_id": "usr_4521",
  "item_id": "itm_7890",
  "timestamp": 1712345678,
  "event_type": "click",
  "session_duration_ms": 3200,
  "context": {
    "device": "mobile",
    "page_position": 3,
    "hour_of_day": 14
  }
}

Utilizamos message brokers como Apache Kafka ou RabbitMQ para ingestão contínua. O ETL em tempo real realiza limpeza, normalização e feature engineering incremental:

# Pseudocódigo: feature engineering incremental em stream
def process_event_stream(event):
    # Normalizar timestamp para bucket de 5 minutos
    time_bucket = floor(event.timestamp / 300) * 300

    # Atualizar contagem de cliques por usuário
    redis.increment(f"user:{event.user_id}:clicks:{time_bucket}")

    # Calcular taxa de clique recente (última hora)
    recent_clicks = redis.get_range(f"user:{event.user_id}:clicks", -12, -1)
    click_rate = sum(recent_clicks) / len(recent_clicks)

    return {
        "user_id": event.user_id,
        "item_id": event.item_id,
        "click_rate_1h": click_rate,
        "time_bucket": time_bucket
    }

3. Modelagem incremental de preferências do usuário

A fatoração de matrizes incremental atualiza embeddings de usuários e itens usando gradiente descendente online (SGD online). A cada novo evento, ajustamos os vetores latentes sem reprocessar todo o histórico.

# Atualização incremental de embeddings com SGD online
def update_embeddings(user_id, item_id, rating, user_emb, item_emb, lr=0.01):
    # Predição atual
    pred = dot_product(user_emb[user_id], item_emb[item_id])
    error = rating - pred

    # Atualização dos embeddings
    user_emb[user_id] += lr * (error * item_emb[item_id] - reg * user_emb[user_id])
    item_emb[item_id] += lr * (error * user_emb[user_id] - reg * item_emb[item_id])

    return user_emb, item_emb

Estratégias de decay temporal são essenciais para dar menos peso a interações antigas. Implementamos um fator exponencial:

# Decay temporal para pesos de interações
def temporal_weight(timestamp, current_time, half_life=86400):
    # half_life = 1 dia em segundos
    age = current_time - timestamp
    return 2 ** (-age / half_life)

4. Algoritmos de recomendação com aprendizado contínuo

Bandits contextuais como LinUCB e Thompson Sampling são ideais para exploração em tempo real. Eles atualizam parâmetros a cada interação:

# Thompson Sampling com atualização incremental
class ThompsonBandit:
    def __init__(self, n_arms, alpha=1.0, beta=1.0):
        self.alpha = [alpha] * n_arms
        self.beta = [beta] * n_arms

    def select_arm(self):
        samples = [np.random.beta(a, b) for a, b in zip(self.alpha, self.beta)]
        return np.argmax(samples)

    def update(self, arm, reward):
        # reward = 1 para clique, 0 para não clique
        self.alpha[arm] += reward
        self.beta[arm] += 1 - reward

Para modelos baseados em vizinhança, similaridades entre itens são atualizadas incrementalmente quando novos pares de co-ocorrência surgem:

# Atualização incremental de similaridade Jaccard entre itens
def update_item_similarity(item_a, item_b, interaction_count_a, interaction_count_b, co_occurrence):
    co_occurrence[item_a][item_b] += 1
    jaccard = co_occurrence[item_a][item_b] / (interaction_count_a + interaction_count_b - co_occurrence[item_a][item_b])
    return jaccard

5. Infraestrutura e pipeline de deploy contínuo

A infraestrutura de inferência utiliza cache de embeddings em Redis e índices vetoriais em FAISS para busca rápida de itens similares:

# Pipeline de recomendação com cache e índice vetorial
def get_recommendations(user_id, k=10):
    # Buscar embedding do usuário no cache
    user_emb = redis.get(f"emb:user:{user_id}")

    if user_emb is None:
        # Fallback para modelo batch
        user_emb = batch_model.get_user_embedding(user_id)
        redis.set(f"emb:user:{user_id}", user_emb, ex=300)  # TTL de 5 min

    # Busca nos índices vetoriais
    item_ids, scores = faiss_index.search(user_emb, k)

    return item_ids

Versionamento de modelos e rollback automático são implementados com detecção de drift:

# Monitoramento de drift com janela deslizante
def detect_drift(metric_stream, window_size=1000, threshold=0.05):
    window = metric_stream[-window_size:]
    baseline_mean = np.mean(window[:window_size//2])
    recent_mean = np.mean(window[window_size//2:])

    if abs(recent_mean - baseline_mean) > threshold:
        trigger_rollback()

6. Monitoramento e avaliação em tempo real

Métricas de frescor medem quão rapidamente novos itens aparecem nos rankings. A diversidade temporal garante que recomendações não fiquem estagnadas:

# Métrica de frescor: proporção de itens novos no top-10
def freshness_score(recommendations, current_time, max_age_hours=24):
    fresh_items = sum(1 for item in recommendations 
                     if (current_time - item.publish_time) < max_age_hours * 3600)
    return fresh_items / len(recommendations)

O algoritmo ADWIN (Adaptive Windowing) detecta concept drift automaticamente:

# Detecção de drift com ADWIN simplificado
class ADWIN:
    def __init__(self, delta=0.002):
        self.window = []
        self.delta = delta
        self.total = 0

    def add_element(self, value):
        self.window.append(value)
        self.total += value
        self._check_drift()

    def _check_drift(self):
        if len(self.window) < 10:
            return
        # Implementação simplificada: verifica mudança na média
        mean = self.total / len(self.window)
        for cut in range(1, len(self.window)):
            left_mean = sum(self.window[:cut]) / cut
            right_mean = sum(self.window[cut:]) / (len(self.window) - cut)
            if abs(left_mean - right_mean) > self.delta:
                self.window = self.window[cut:]
                self.total = sum(self.window)
                trigger_alert("Drift detectado")

7. Estudo de caso: sistema de recomendação de notícias

Implementamos um pipeline completo: Kafka → Feature Store → Modelo incremental → API REST. A cada 5 minutos, o modelo é atualizado com feedback de cliques:

# Loop principal de atualização incremental (executado a cada 5 min)
def incremental_update_cycle():
    # 1. Consumir eventos do Kafka dos últimos 5 min
    events = kafka_consumer.poll(timeout_ms=5000)

    # 2. Atualizar feature store
    for event in events:
        feature_store.update_user_features(event.user_id, event)
        feature_store.update_item_features(event.item_id, event)

    # 3. Atualizar embeddings do modelo
    for event in events:
        user_emb = model.get_user_embedding(event.user_id)
        item_emb = model.get_item_embedding(event.item_id)
        model.update_embeddings(user_emb, item_emb, event.rating)

    # 4. Reconstruir índice vetorial parcialmente
    faiss_index.update_index(model.get_recent_items())

    # 5. Atualizar cache de embeddings
    redis.flush_pattern("emb:*")

Comparação de performance entre batch e incremental em cenários de pico (100k eventos/min):

Métrica Batch (24h) Incremental (5min)
Latência de atualização 24 horas 5 minutos
Precisão (NDCG@10) 0.72 0.68
Frescor (novos itens no top-10) 12% 47%
Throughput (eventos/s) 500 8500
Consumo de CPU 95% (pico) 45% (constante)

O sistema incremental sacrifica 5% de precisão, mas ganha 35% em frescor e 17x em throughput, tornando-se ideal para domínios onde a relevância temporal é crítica, como notícias e redes sociais.

Referências