Estratégias de consistência eventual em sistemas distribuídos

1. Fundamentos da Consistência Eventual

1.1. Definição e contexto no Teorema CAP

O Teorema CAP (Consistency, Availability, Partition Tolerance) estabelece que um sistema distribuído pode oferecer apenas duas das três propriedades simultaneamente. A consistência eventual emerge como uma escolha pragmática quando priorizamos disponibilidade e tolerância a partições (AP). Nesse modelo, o sistema garante que, se nenhuma nova atualização for feita a um item de dados, eventualmente todas as leituras retornarão o último valor atualizado.

1.2. Diferença entre consistência forte, fraca e eventual

Tipo Latência Disponibilidade Garantia
Forte Alta Baixa Leituras sempre retornam o último valor
Fraca Média Média Sem garantias temporais
Eventual Baixa Alta Convergência após cessar atualizações

A consistência eventual sacrifica a garantia de leituras imediatamente consistentes em troca de menor latência e maior disponibilidade, sendo ideal para aplicações onde dados obsoletos por curtos períodos são aceitáveis.

1.3. Exemplo prático: replicação assíncrona em Cassandra

-- Configuração de replicação no Cassandra com consistência eventual
CREATE KEYSPACE ecommerce WITH replication = {
  'class': 'NetworkTopologyStrategy',
  'datacenter1': 3
};

-- Inserção com consistência eventual (ONE)
INSERT INTO orders (order_id, status, total)
VALUES ('ORD-001', 'pending', 299.99)
USING CONSISTENCY ONE;

-- Leitura eventualmente consistente
SELECT * FROM orders WHERE order_id = 'ORD-001'
USING CONSISTENCY ONE;

2. Modelos de Consistência Eventual

2.1. Consistência de leitura após escrita (read-your-writes)

Este modelo garante que um cliente sempre veja suas próprias escritas. Implementação típica:

// Pseudocódigo para read-your-writes
class SessionStore {
  Map<String, Timestamp> lastWrites = new HashMap<>();

  void write(key, value) {
    timestamp = now();
    replicas.sendAsync(key, value, timestamp);
    lastWrites.put(key, timestamp);
  }

  Value read(key) {
    timestamp = lastWrites.getOrDefault(key, MIN_TIMESTAMP);
    return replicas.readWithTimestamp(key, timestamp);
  }
}

2.2. Consistência causal

A consistência causal preserva a ordenação de eventos relacionados, sem exigir um relógio global. Eventos concorrentes podem ser vistos em qualquer ordem.

// Exemplo de operações causalmente consistentes
Evento A: Usuário cria post "Olá mundo"
Evento B: Usuário comenta no post "Olá mundo"
Evento C: Outro usuário vê o comentário antes do post original

// Ordem causal: A -> B -> C
// Sem consistência causal: C pode ser visto antes de A e B

2.3. Consistência de sessão

Garantias dentro de uma sessão de usuário:

// Configuração de sessão no DynamoDB
Session session = dynamoDBClient.createSession();
session.setConsistencyLevel(ConsistencyLevel.EVENTUAL);

// Operações dentro da sessão mantêm consistência local
session.put("user:123", "carrinho", ["item1", "item2"]);
session.get("user:123", "carrinho"); // Retorna ["item1", "item2"]

3. Técnicas de Resolução de Conflitos

3.1. Estratégia de último escritor vence (LWW) e vetores de versão

// Vetores de versão no Riak
Object objeto = {
  "chave": "doc-001",
  "valor": "versão 3",
  "vetor_versao": {
    "nó1": 2,
    "nó2": 1,
    "nó3": 1
  }
}

// Resolução de conflito com LWW
if (timestamp_A > timestamp_B) {
  return valor_A;  // Último escritor vence
} else {
  return valor_B;
}

3.2. CRDTs (Conflict-Free Replicated Data Types)

CRDTs garantem convergência matemática sem coordenação:

// G-Counter (Grow-only Counter) CRDT
class GCounter {
  Map<String, Integer> counts = new HashMap<>();

  void increment(String node) {
    counts.merge(node, 1, Integer::sum);
  }

  int value() {
    return counts.values().stream().mapToInt(Integer::intValue).sum();
  }

  void merge(GCounter other) {
    other.counts.forEach((node, count) -> {
      counts.merge(node, count, Math::max);
    });
  }
}

3.3. Resolução manual de conflitos

// Estratégia de merge em sistemas de dados
function resolveConflito(versaoA, versaoB) {
  if (versaoA.timestamp > versaoB.timestamp + TOLERANCIA) {
    return versaoA;
  }
  if (versaoB.timestamp > versaoA.timestamp + TOLERANCIA) {
    return versaoB;
  }
  // Conflito real: merge manual ou notificação
  return mergeManual(versaoA, versaoB);
}

4. Mecanismos de Propagação e Sincronização

4.1. Gossip protocols

// Algoritmo Gossip simplificado
class GossipNode {
  Set<Update> state = new HashSet<>();

  void gossip() {
    Node peer = selectRandomPeer();
    peer.receiveUpdates(this.state);
  }

  void receiveUpdates(Set<Update> updates) {
    for (Update update : updates) {
      if (update.timestamp > this.state.get(update.key).timestamp) {
        this.state.put(update.key, update);
      }
    }
  }
}

4.2. Anti-entropia e reconciliação periódica

// Mecanismo de anti-entropia
class AntiEntropy {
  void reconcile(Node a, Node b) {
    MerkleTree treeA = a.buildMerkleTree();
    MerkleTree treeB = b.buildMerkleTree();

    Set<String> diffKeys = treeA.diff(treeB);

    for (String key : diffKeys) {
      Value vA = a.get(key);
      Value vB = b.get(key);
      Value resolved = resolve(vA, vB);
      a.put(key, resolved);
      b.put(key, resolved);
    }
  }
}

4.3. Quóruns de leitura e escrita ajustáveis

// Configuração de quorum no Cassandra
// N = número total de réplicas
// W = quorum de escrita
// R = quorum de leitura

// Consistência eventual: W=1, R=1
// Consistência forte: W=QUORUM, R=QUORUM

// Exemplo: cluster com 3 réplicas
CONSISTENCY ONE;     // W=1, R=1 - eventual
CONSISTENCY QUORUM;   // W=2, R=2 - forte
CONSISTENCY ALL;      // W=3, R=3 - mais forte

5. Integração com Padrões de Arquitetura Distribuída

5.1. Event sourcing e CQRS

// Event Sourcing com consistência eventual
class OrderService {
  void createOrder(Order order) {
    OrderCreated event = new OrderCreated(order);
    eventStore.append(event);  // Escrita eventual
  }

  Order getOrder(String id) {
    List<Event> events = eventStore.getEvents(id);
    return Order.rebuild(events);  // Leitura eventual
  }
}

// CQRS: separação de comandos e consultas
class OrderCommandHandler {
  void handle(CreateOrder command) {
    eventStore.append(new OrderCreated(command));
  }
}

class OrderQueryHandler {
  OrderProjection getOrder(String id) {
    return readModel.get(id);  // Pode estar desatualizado
  }
}

5.2. Sagas e compensações

// Saga para pedido com consistência eventual
class OrderSaga {
  void processOrder(Order order) {
    try {
      reserveInventory(order);     // Passo 1
      processPayment(order);       // Passo 2
      shipOrder(order);            // Passo 3
      markOrderCompleted(order);   // Passo 4
    } catch (Exception e) {
      compensate(order);           // Rollback eventual
    }
  }

  void compensate(Order order) {
    cancelShipping(order);
    refundPayment(order);
    releaseInventory(order);
  }
}

5.3. Uso de filas de mensagens

// Kafka para replicação assíncrona
Properties props = new Properties();
props.put("acks", "1");  // Consistência eventual

KafkaProducer<String, Order> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("orders", order));

// Consumidor eventualmente consistente
KafkaConsumer<String, Order> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("orders"));

while (true) {
  ConsumerRecords<String, Order> records = consumer.poll(100);
  for (ConsumerRecord<String, Order> record : records) {
    processOrder(record.value());  // Pode estar desatualizado
  }
}

6. Desafios e Estratégias de Mitigação

6.1. Janelas de inconsistência

// Medição da janela de inconsistência
class InconsistencyMonitor {
  long maxLatency = 0;

  void measurePropagation(Update update) {
    long propagationTime = System.currentTimeMillis() - update.timestamp;
    maxLatency = Math.max(maxLatency, propagationTime);
  }

  boolean isWithinTolerance(long toleranceMs) {
    return maxLatency <= toleranceMs;
  }
}

6.2. Stale reads e divergência de estado

// Detecção de stale reads
class StaleReadDetector {
  boolean isStale(Value value, Timestamp expected) {
    return value.timestamp < expected;
  }

  Value correctStaleRead(String key, Value staleValue) {
    return replicaManager.getLatest(key);
  }
}

6.3. Garantias de entrega em falhas de rede

// Retry com backoff exponencial
class ReliableDelivery {
  void sendWithRetry(Message msg, int maxRetries) {
    int attempt = 0;
    while (attempt < maxRetries) {
      try {
        broker.send(msg);
        return;
      } catch (NetworkException e) {
        attempt++;
        Thread.sleep(Math.pow(2, attempt) * 100);
      }
    }
    throw new DeliveryFailedException();
  }
}

7. Monitoramento e Observabilidade da Consistência

7.1. Métricas de divergência

// Métricas para monitoramento
class ConsistencyMetrics {
  // Latência de propagação
  Histogram propagationLatency = Histogram.build()
    .name("propagation_latency_ms")
    .register();

  // Taxa de conflitos
  Counter conflictRate = Counter.build()
    .name("conflict_rate_total")
    .register();

  void recordPropagation(long latencyMs) {
    propagationLatency.observe(latencyMs);
  }
}

7.2. Ferramentas de rastreamento distribuído

// OpenTelemetry para rastreamento de consistência
Span span = tracer.spanBuilder("write_operation")
  .setAttribute("consistency.level", "eventual")
  .startSpan();

try (Scope scope = span.makeCurrent()) {
  database.write(key, value);
  span.setStatus(StatusCode.OK);
} catch (Exception e) {
  span.recordException(e);
  span.setStatus(StatusCode.ERROR);
} finally {
  span.end();
}

7.3. Testes de invariantes e Chaos Engineering

// Teste de invariante para consistência eventual
@Test
void testEventualConsistency() {
  // Simular falha de rede
  networkSimulator.partition("node-1", "node-2");

  database.write("key", "value1");
  database.write("key", "value2");

  networkSimulator.heal();

  // Verificar convergência eventual
  await().atMost(5, SECONDS).until(() -> {
    return database.read("key").equals("value2");
  });
}

Referências