Como implementar CQRS com event sourcing em Node.js do zero
1. Fundamentos teóricos: CQRS e Event Sourcing
CQRS (Command Query Responsibility Segregation) é um padrão arquitetural que separa operações de escrita (comandos) de operações de leitura (consultas). Enquanto comandos alteram estado, consultas apenas retornam dados sem efeitos colaterais.
Event Sourcing armazena o estado de um sistema como uma sequência imutável de eventos. Em vez de persistir o estado atual de um agregado, cada mudança é registrada como um evento que representa o que aconteceu.
A combinação desses padrões oferece:
- Auditabilidade completa: todo evento fica registrado permanentemente
- Escalabilidade: sides de leitura e escrita podem escalar independentemente
- Consistência eventual: o side de leitura atualiza assincronamente
- Replay de eventos: possibilidade de reconstruir estado em qualquer momento
2. Preparando o ambiente e estrutura do projeto
npm init -y
npm install typescript @types/node ts-node --save-dev
npx tsc --init
Estrutura de pastas:
src/
domain/
events/
aggregates/
commands/
application/
commandBus/
queryBus/
infrastructure/
eventStore/
presentation/
controllers/
Definição de interfaces base:
// src/domain/events/Event.ts
export interface Event {
eventId: string;
aggregateId: string;
eventType: string;
data: Record<string, unknown>;
version: number;
timestamp: Date;
}
// src/domain/commands/Command.ts
export interface Command {
commandId: string;
commandType: string;
data: Record<string, unknown>;
}
3. Implementando o núcleo do Event Store
// src/infrastructure/eventStore/EventStore.ts
import { Event } from '../../domain/events/Event';
export class EventStore {
private streams: Map<string, Event[]> = new Map();
async appendToStream(
aggregateId: string,
events: Event[],
expectedVersion: number
): Promise<void> {
const stream = this.streams.get(aggregateId) || [];
if (stream.length !== expectedVersion) {
throw new Error('Concurrency conflict');
}
this.streams.set(aggregateId, [...stream, ...events]);
}
async readStream(aggregateId: string): Promise<Event[]> {
return this.streams.get(aggregateId) || [];
}
async getAggregate<T>(
aggregateId: string,
builder: (events: Event[]) => T
): Promise<T> {
const events = await this.readStream(aggregateId);
return builder(events);
}
}
4. Modelagem de domínio com agregados e eventos
// src/domain/aggregates/Aggregate.ts
import { Event } from '../events/Event';
export abstract class Aggregate {
protected id: string;
protected version: number = 0;
protected changes: Event[] = [];
abstract apply(event: Event): void;
loadFromHistory(events: Event[]): void {
for (const event of events) {
this.apply(event);
this.version = event.version;
}
}
getUncommittedChanges(): Event[] {
return this.changes;
}
markChangesAsCommitted(): void {
this.changes = [];
}
}
Exemplo prático com agregado ContaBancaria:
// src/domain/events/ContaEventos.ts
export class ContaCriada implements Event {
constructor(
public eventId: string,
public aggregateId: string,
public data: { titular: string; saldoInicial: number },
public version: number,
public timestamp: Date
) {
this.eventType = 'ContaCriada';
}
}
export class DepositoRealizado implements Event {
constructor(
public eventId: string,
public aggregateId: string,
public data: { valor: number },
public version: number,
public timestamp: Date
) {
this.eventType = 'DepositoRealizado';
}
}
export class SaqueRealizado implements Event {
constructor(
public eventId: string,
public aggregateId: string,
public data: { valor: number },
public version: number,
public timestamp: Date
) {
this.eventType = 'SaqueRealizado';
}
}
// src/domain/aggregates/ContaBancaria.ts
export class ContaBancaria extends Aggregate {
private titular: string;
private saldo: number = 0;
static abrir(id: string, titular: string, saldoInicial: number): ContaBancaria {
const conta = new ContaBancaria();
conta.apply(new ContaCriada(
uuid(), id, { titular, saldoInicial }, 1, new Date()
));
return conta;
}
depositar(valor: number): void {
if (valor <= 0) throw new Error('Valor inválido');
this.apply(new DepositoRealizado(
uuid(), this.id, { valor }, this.version + 1, new Date()
));
}
sacar(valor: number): void {
if (valor <= 0 || valor > this.saldo) throw new Error('Saldo insuficiente');
this.apply(new SaqueRealizado(
uuid(), this.id, { valor }, this.version + 1, new Date()
));
}
apply(event: Event): void {
switch (event.eventType) {
case 'ContaCriada':
this.titular = event.data.titular;
this.saldo = event.data.saldoInicial;
break;
case 'DepositoRealizado':
this.saldo += event.data.valor;
break;
case 'SaqueRealizado':
this.saldo -= event.data.valor;
break;
}
this.version = event.version;
}
}
5. Construindo a camada de Comandos (CQRS - Write Side)
// src/application/commandBus/CommandBus.ts
export class CommandBus {
private handlers: Map<string, CommandHandler> = new Map();
register(commandType: string, handler: CommandHandler): void {
this.handlers.set(commandType, handler);
}
async dispatch(command: Command): Promise<void> {
const handler = this.handlers.get(command.commandType);
if (!handler) throw new Error('Handler não encontrado');
await handler.handle(command);
}
}
// src/application/commandBus/handlers/ContaCommandHandler.ts
export class ContaCommandHandler implements CommandHandler {
constructor(
private eventStore: EventStore,
private eventBus: EventBus
) {}
async handle(command: Command): Promise<void> {
switch (command.commandType) {
case 'AbrirConta': {
const conta = ContaBancaria.abrir(
command.data.id,
command.data.titular,
command.data.saldoInicial
);
await this.eventStore.appendToStream(
command.data.id,
conta.getUncommittedChanges(),
0
);
break;
}
case 'Depositar': {
const conta = await this.eventStore.getAggregate(
command.data.id,
(events) => {
const c = new ContaBancaria();
c.loadFromHistory(events);
return c;
}
);
conta.depositar(command.data.valor);
await this.eventStore.appendToStream(
command.data.id,
conta.getUncommittedChanges(),
conta.version
);
break;
}
}
}
}
6. Construindo a camada de Consultas (CQRS - Read Side)
// src/application/projections/ContaCorrenteProjection.ts
export class ContaCorrenteView {
private saldos: Map<string, number> = new Map();
apply(event: Event): void {
switch (event.eventType) {
case 'ContaCriada':
this.saldos.set(event.aggregateId, event.data.saldoInicial);
break;
case 'DepositoRealizado':
const saldoAtualDep = this.saldos.get(event.aggregateId) || 0;
this.saldos.set(event.aggregateId, saldoAtualDep + event.data.valor);
break;
case 'SaqueRealizado':
const saldoAtualSaq = this.saldos.get(event.aggregateId) || 0;
this.saldos.set(event.aggregateId, saldoAtualSaq - event.data.valor);
break;
}
}
getSaldo(aggregateId: string): number {
return this.saldos.get(aggregateId) || 0;
}
}
// src/application/queryBus/QueryBus.ts
export class QueryBus {
private handlers: Map<string, QueryHandler> = new Map();
register(queryType: string, handler: QueryHandler): void {
this.handlers.set(queryType, handler);
}
async dispatch<T>(query: Query): Promise<T> {
const handler = this.handlers.get(query.queryType);
if (!handler) throw new Error('Handler não encontrado');
return handler.handle(query);
}
}
7. Integração final e testes end-to-end
// src/infrastructure/eventBus/EventBus.ts
export class EventBus {
private subscribers: Map<string, Function[]> = new Map();
subscribe(eventType: string, handler: Function): void {
const handlers = this.subscribers.get(eventType) || [];
handlers.push(handler);
this.subscribers.set(eventType, handlers);
}
publish(events: Event[]): void {
for (const event of events) {
const handlers = this.subscribers.get(event.eventType) || [];
for (const handler of handlers) {
handler(event);
}
}
}
}
// Teste end-to-end
async function exemploCompleto() {
const eventStore = new EventStore();
const eventBus = new EventBus();
const projection = new ContaCorrenteView();
// Inscreve projeção nos eventos
eventBus.subscribe('ContaCriada', (e) => projection.apply(e));
eventBus.subscribe('DepositoRealizado', (e) => projection.apply(e));
eventBus.subscribe('SaqueRealizado', (e) => projection.apply(e));
const commandBus = new CommandBus();
commandBus.register('AbrirConta', new ContaCommandHandler(eventStore, eventBus));
commandBus.register('Depositar', new ContaCommandHandler(eventStore, eventBus));
commandBus.register('Sacar', new ContaCommandHandler(eventStore, eventBus));
// Fluxo completo
await commandBus.dispatch({
commandId: '1',
commandType: 'AbrirConta',
data: { id: '123', titular: 'João', saldoInicial: 1000 }
});
await commandBus.dispatch({
commandId: '2',
commandType: 'Depositar',
data: { id: '123', valor: 500 }
});
await commandBus.dispatch({
commandId: '3',
commandType: 'Sacar',
data: { id: '123', valor: 200 }
});
console.log('Saldo final:', projection.getSaldo('123')); // 1300
// Replay de eventos para verificar consistência
const events = await eventStore.readStream('123');
const contaReplay = new ContaBancaria();
contaReplay.loadFromHistory(events);
console.log('Saldo via replay:', contaReplay['saldo']); // 1300
}
Este exemplo demonstra como CQRS com Event Sourcing oferece auditabilidade total, capacidade de replay e separação clara entre operações de escrita e leitura. A implementação em memória serve como base para testes, podendo ser substituída por Event Stores como EventStoreDB ou PostgreSQL em produção.
Referências
- CQRS Pattern - Microsoft Docs — Documentação oficial da Microsoft sobre o padrão CQRS com exemplos práticos
- Event Sourcing Pattern - Martin Fowler — Artigo fundamental de Martin Fowler explicando Event Sourcing em detalhes
- Node.js CQRS/ES Boilerplate - GitHub — Repositório com exemplos completos de CQRS e Event Sourcing em Node.js
- EventStoreDB Documentation — Documentação oficial do EventStoreDB, banco especializado em Event Sourcing
- TypeScript Handbook - Classes — Guia oficial do TypeScript para implementação de classes e interfaces utilizadas no artigo
- Docker Compose for EventStoreDB — Imagem Docker oficial para executar EventStoreDB localmente durante desenvolvimento