Event sourcing explicado com exemplos concretos em Node.js
1. Fundamentos do Event Sourcing
Event sourcing é um padrão arquitetural onde o estado atual de uma aplicação é derivado de uma sequência imutável de eventos passados. Em vez de armazenar o estado atual de uma entidade (como em CRUD tradicional), cada alteração significativa é registrada como um evento.
Comparação CRUD vs Event Sourcing:
CRUD tradicional:
// Atualiza diretamente o estado
UPDATE accounts SET balance = 150 WHERE id = 'acc-123'
Event Sourcing:
// Apenas registra o evento que ocorreu
{ type: 'DepositoRealizado', aggregateId: 'acc-123', amount: 50, version: 2 }
{ type: 'SaqueRealizado', aggregateId: 'acc-123', amount: 30, version: 3 }
Quando usar: Sistemas financeiros, auditoria, carrinhos de compras, rastreamento de mudanças complexas.
Quando evitar: Aplicações simples com poucas alterações de estado, sistemas de cache, cenários onde performance de leitura é crítica sem necessidade de histórico.
2. Arquitetura Base: O Event Store em Node.js
Estrutura de um evento
// events/event.types.js
class DomainEvent {
constructor(aggregateId, type, data, metadata = {}) {
this.aggregateId = aggregateId;
this.type = type;
this.data = data;
this.metadata = {
...metadata,
timestamp: new Date().toISOString(),
version: 0
};
}
}
Event Store em memória
// events/event-store.js
class InMemoryEventStore {
constructor() {
this.events = [];
}
async saveEvents(aggregateId, events, expectedVersion) {
const existingEvents = await this.getEvents(aggregateId);
if (existingEvents.length !== expectedVersion) {
throw new Error('Conflito de concorrência');
}
events.forEach((event, index) => {
event.metadata.version = expectedVersion + index + 1;
});
this.events.push(...events);
return events;
}
async getEvents(aggregateId) {
return this.events.filter(e => e.aggregateId === aggregateId);
}
}
3. Construindo o Aggregate Root
Aggregate de conta bancária
// aggregates/account.js
class BankAccount {
constructor(id) {
this.id = id;
this.balance = 0;
this.version = 0;
this.changes = [];
}
deposit(amount) {
if (amount <= 0) throw new Error('Valor inválido');
this.applyChange({
aggregateId: this.id,
type: 'DepositoRealizado',
data: { amount }
});
}
withdraw(amount) {
if (amount <= 0) throw new Error('Valor inválido');
if (this.balance < amount) throw new Error('Saldo insuficiente');
this.applyChange({
aggregateId: this.id,
type: 'SaqueRealizado',
data: { amount }
});
}
applyChange(event) {
this.applyEvent(event);
this.changes.push(event);
}
applyEvent(event) {
switch (event.type) {
case 'DepositoRealizado':
this.balance += event.data.amount;
break;
case 'SaqueRealizado':
this.balance -= event.data.amount;
break;
}
this.version = event.metadata.version;
}
static loadFromHistory(events, id) {
const account = new BankAccount(id);
events.forEach(event => account.applyEvent(event));
account.changes = [];
return account;
}
}
4. Projeções e Read Models
Projeção de saldo atual
// projections/account-projection.js
class AccountProjection {
constructor() {
this.accounts = new Map();
}
handleEvent(event) {
switch (event.type) {
case 'DepositoRealizado':
this._updateBalance(event.aggregateId, event.data.amount);
break;
case 'SaqueRealizado':
this._updateBalance(event.aggregateId, -event.data.amount);
break;
}
}
_updateBalance(accountId, delta) {
const current = this.accounts.get(accountId) || { balance: 0, lastUpdated: null };
current.balance += delta;
current.lastUpdated = new Date().toISOString();
this.accounts.set(accountId, current);
}
getAccountSummary(accountId) {
return this.accounts.get(accountId) || { balance: 0, lastUpdated: null };
}
}
CQRS básico com comando e consulta
// command-handler.js
class CommandHandler {
constructor(eventStore, projection) {
this.eventStore = eventStore;
this.projection = projection;
}
async handleDeposit(accountId, amount) {
const events = await this.eventStore.getEvents(accountId);
const account = BankAccount.loadFromHistory(events, accountId);
account.deposit(amount);
await this.eventStore.saveEvents(accountId, account.changes, account.version - account.changes.length);
account.changes.forEach(event => this.projection.handleEvent(event));
return account;
}
}
5. Snapshotting para Performance
Implementação de snapshot
// snapshots/snapshot-manager.js
class SnapshotManager {
constructor(eventStore, snapshotInterval = 10) {
this.eventStore = eventStore;
this.snapshotInterval = snapshotInterval;
this.snapshots = new Map();
}
async takeSnapshot(aggregateId, aggregate) {
if (aggregate.version % this.snapshotInterval === 0) {
this.snapshots.set(aggregateId, {
state: { balance: aggregate.balance },
version: aggregate.version,
timestamp: new Date().toISOString()
});
}
}
async loadWithSnapshot(aggregateId) {
const snapshot = this.snapshots.get(aggregateId);
let events = [];
if (snapshot) {
events = await this.eventStore.getEventsSince(aggregateId, snapshot.version);
} else {
events = await this.eventStore.getEvents(aggregateId);
}
const account = snapshot
? BankAccount.loadFromSnapshot(snapshot, aggregateId)
: new BankAccount(aggregateId);
events.forEach(event => account.applyEvent(event));
return account;
}
}
6. Tratamento de Concorrência e Idempotência
Optimistic locking e idempotency keys
// concurrency/concurrency-handler.js
class ConcurrencyHandler {
constructor(eventStore) {
this.eventStore = eventStore;
this.processedCommands = new Set();
}
async processCommand(command, idempotencyKey) {
if (this.processedCommands.has(idempotencyKey)) {
return { status: 'already_processed' };
}
try {
const events = await this.eventStore.getEvents(command.aggregateId);
const aggregate = BankAccount.loadFromHistory(events, command.aggregateId);
command.execute(aggregate);
await this.eventStore.saveEvents(command.aggregateId, aggregate.changes, aggregate.version - aggregate.changes.length);
this.processedCommands.add(idempotencyKey);
return { status: 'success', aggregate };
} catch (error) {
if (error.message === 'Conflito de concorrência') {
return { status: 'retry_needed' };
}
throw error;
}
}
}
7. Caso Prático Completo: Carrinho de Compras
Eventos e aggregate do carrinho
// aggregates/shopping-cart.js
class ShoppingCart {
constructor(id) {
this.id = id;
this.items = [];
this.status = 'active';
this.version = 0;
this.changes = [];
}
addItem(productId, name, price, quantity) {
if (quantity > 10) throw new Error('Limite de 10 unidades por item');
if (this.status !== 'active') throw new Error('Carrinho finalizado');
this.applyChange({
aggregateId: this.id,
type: 'ItemAdicionado',
data: { productId, name, price, quantity }
});
}
removeItem(productId) {
const item = this.items.find(i => i.productId === productId);
if (!item) throw new Error('Item não encontrado');
this.applyChange({
aggregateId: this.id,
type: 'ItemRemovido',
data: { productId }
});
}
finalize() {
if (this.items.length === 0) throw new Error('Carrinho vazio');
this.applyChange({
aggregateId: this.id,
type: 'CarrinhoFinalizado',
data: { total: this.getTotal() }
});
}
applyEvent(event) {
switch (event.type) {
case 'ItemAdicionado':
this.items.push(event.data);
break;
case 'ItemRemovido':
this.items = this.items.filter(i => i.productId !== event.data.productId);
break;
case 'CarrinhoFinalizado':
this.status = 'finalized';
break;
}
this.version = event.metadata.version;
}
getTotal() {
return this.items.reduce((sum, item) => sum + (item.price * item.quantity), 0);
}
static loadFromHistory(events, id) {
const cart = new ShoppingCart(id);
events.forEach(event => cart.applyEvent(event));
cart.changes = [];
return cart;
}
}
Projeção do carrinho para exibição
// projections/cart-projection.js
class CartProjection {
constructor() {
this.carts = new Map();
}
handleEvent(event) {
const cart = this.carts.get(event.aggregateId) || { items: [], total: 0, status: 'active' };
switch (event.type) {
case 'ItemAdicionado':
cart.items.push(event.data);
cart.total += event.data.price * event.data.quantity;
break;
case 'ItemRemovido':
const removedItem = cart.items.find(i => i.productId === event.data.productId);
if (removedItem) {
cart.total -= removedItem.price * removedItem.quantity;
cart.items = cart.items.filter(i => i.productId !== event.data.productId);
}
break;
case 'CarrinhoFinalizado':
cart.status = 'finalized';
break;
}
this.carts.set(event.aggregateId, cart);
}
getCartSummary(cartId) {
return this.carts.get(cartId) || { items: [], total: 0, status: 'active' };
}
}
Testes unitários simulando replay de eventos
// tests/cart-test.js
const assert = require('assert');
// Simula o replay de eventos para testar o aggregate
const events = [
{ type: 'ItemAdicionado', aggregateId: 'cart-1', data: { productId: 'p1', name: 'Livro', price: 50, quantity: 2 }, metadata: { version: 1 } },
{ type: 'ItemAdicionado', aggregateId: 'cart-1', data: { productId: 'p2', name: 'Caneta', price: 5, quantity: 3 }, metadata: { version: 2 } },
{ type: 'ItemRemovido', aggregateId: 'cart-1', data: { productId: 'p2' }, metadata: { version: 3 } },
{ type: 'CarrinhoFinalizado', aggregateId: 'cart-1', data: { total: 100 }, metadata: { version: 4 } }
];
const cart = ShoppingCart.loadFromHistory(events, 'cart-1');
assert.strictEqual(cart.items.length, 1);
assert.strictEqual(cart.items[0].name, 'Livro');
assert.strictEqual(cart.status, 'finalized');
assert.strictEqual(cart.getTotal(), 100);
console.log('Testes passaram! Estado reconstruído corretamente a partir dos eventos.');
Referências
- Event Sourcing - Martin Fowler — Artigo fundamental sobre o padrão Event Sourcing, com explicações conceituais e exemplos.
- Event Sourcing with Node.js - EventStore Documentation — Documentação oficial do EventStoreDB, incluindo integração com Node.js e exemplos práticos.
- CQRS and Event Sourcing - Microsoft Docs — Guia da Microsoft sobre CQRS e Event Sourcing, com diagramas e casos de uso.
- Building Event-Driven Microservices with Node.js - O'Reilly — Livro abrangente sobre arquitetura orientada a eventos com exemplos em Node.js.
- Event Sourcing in Practice - InfoQ — Artigo técnico discutindo implementações reais de Event Sourcing, incluindo desafios e soluções.
- Node.js Event Sourcing with PostgreSQL - PostgreSQL Documentation — Guia oficial do PostgreSQL para implementar Event Sourcing utilizando funcionalidades avançadas do banco.
- Testing Event Sourced Systems - Greg Young — Recursos sobre versionamento e testes em sistemas baseados em Event Sourcing.