Como implementar CQRS com event sourcing em Node.js do zero

1. Fundamentos teóricos: CQRS e Event Sourcing

CQRS (Command Query Responsibility Segregation) é um padrão arquitetural que separa operações de escrita (comandos) de operações de leitura (consultas). Enquanto comandos alteram estado, consultas apenas retornam dados sem efeitos colaterais.

Event Sourcing armazena o estado de um sistema como uma sequência imutável de eventos. Em vez de persistir o estado atual de um agregado, cada mudança é registrada como um evento que representa o que aconteceu.

A combinação desses padrões oferece:
- Auditabilidade completa: todo evento fica registrado permanentemente
- Escalabilidade: sides de leitura e escrita podem escalar independentemente
- Consistência eventual: o side de leitura atualiza assincronamente
- Replay de eventos: possibilidade de reconstruir estado em qualquer momento

2. Preparando o ambiente e estrutura do projeto

npm init -y
npm install typescript @types/node ts-node --save-dev
npx tsc --init

Estrutura de pastas:

src/
  domain/
    events/
    aggregates/
    commands/
  application/
    commandBus/
    queryBus/
  infrastructure/
    eventStore/
  presentation/
    controllers/

Definição de interfaces base:

// src/domain/events/Event.ts
export interface Event {
  eventId: string;
  aggregateId: string;
  eventType: string;
  data: Record<string, unknown>;
  version: number;
  timestamp: Date;
}

// src/domain/commands/Command.ts
export interface Command {
  commandId: string;
  commandType: string;
  data: Record<string, unknown>;
}

3. Implementando o núcleo do Event Store

// src/infrastructure/eventStore/EventStore.ts
import { Event } from '../../domain/events/Event';

export class EventStore {
  private streams: Map<string, Event[]> = new Map();

  async appendToStream(
    aggregateId: string,
    events: Event[],
    expectedVersion: number
  ): Promise<void> {
    const stream = this.streams.get(aggregateId) || [];

    if (stream.length !== expectedVersion) {
      throw new Error('Concurrency conflict');
    }

    this.streams.set(aggregateId, [...stream, ...events]);
  }

  async readStream(aggregateId: string): Promise<Event[]> {
    return this.streams.get(aggregateId) || [];
  }

  async getAggregate<T>(
    aggregateId: string,
    builder: (events: Event[]) => T
  ): Promise<T> {
    const events = await this.readStream(aggregateId);
    return builder(events);
  }
}

4. Modelagem de domínio com agregados e eventos

// src/domain/aggregates/Aggregate.ts
import { Event } from '../events/Event';

export abstract class Aggregate {
  protected id: string;
  protected version: number = 0;
  protected changes: Event[] = [];

  abstract apply(event: Event): void;

  loadFromHistory(events: Event[]): void {
    for (const event of events) {
      this.apply(event);
      this.version = event.version;
    }
  }

  getUncommittedChanges(): Event[] {
    return this.changes;
  }

  markChangesAsCommitted(): void {
    this.changes = [];
  }
}

Exemplo prático com agregado ContaBancaria:

// src/domain/events/ContaEventos.ts
export class ContaCriada implements Event {
  constructor(
    public eventId: string,
    public aggregateId: string,
    public data: { titular: string; saldoInicial: number },
    public version: number,
    public timestamp: Date
  ) {
    this.eventType = 'ContaCriada';
  }
}

export class DepositoRealizado implements Event {
  constructor(
    public eventId: string,
    public aggregateId: string,
    public data: { valor: number },
    public version: number,
    public timestamp: Date
  ) {
    this.eventType = 'DepositoRealizado';
  }
}

export class SaqueRealizado implements Event {
  constructor(
    public eventId: string,
    public aggregateId: string,
    public data: { valor: number },
    public version: number,
    public timestamp: Date
  ) {
    this.eventType = 'SaqueRealizado';
  }
}
// src/domain/aggregates/ContaBancaria.ts
export class ContaBancaria extends Aggregate {
  private titular: string;
  private saldo: number = 0;

  static abrir(id: string, titular: string, saldoInicial: number): ContaBancaria {
    const conta = new ContaBancaria();
    conta.apply(new ContaCriada(
      uuid(), id, { titular, saldoInicial }, 1, new Date()
    ));
    return conta;
  }

  depositar(valor: number): void {
    if (valor <= 0) throw new Error('Valor inválido');
    this.apply(new DepositoRealizado(
      uuid(), this.id, { valor }, this.version + 1, new Date()
    ));
  }

  sacar(valor: number): void {
    if (valor <= 0 || valor > this.saldo) throw new Error('Saldo insuficiente');
    this.apply(new SaqueRealizado(
      uuid(), this.id, { valor }, this.version + 1, new Date()
    ));
  }

  apply(event: Event): void {
    switch (event.eventType) {
      case 'ContaCriada':
        this.titular = event.data.titular;
        this.saldo = event.data.saldoInicial;
        break;
      case 'DepositoRealizado':
        this.saldo += event.data.valor;
        break;
      case 'SaqueRealizado':
        this.saldo -= event.data.valor;
        break;
    }
    this.version = event.version;
  }
}

5. Construindo a camada de Comandos (CQRS - Write Side)

// src/application/commandBus/CommandBus.ts
export class CommandBus {
  private handlers: Map<string, CommandHandler> = new Map();

  register(commandType: string, handler: CommandHandler): void {
    this.handlers.set(commandType, handler);
  }

  async dispatch(command: Command): Promise<void> {
    const handler = this.handlers.get(command.commandType);
    if (!handler) throw new Error('Handler não encontrado');
    await handler.handle(command);
  }
}

// src/application/commandBus/handlers/ContaCommandHandler.ts
export class ContaCommandHandler implements CommandHandler {
  constructor(
    private eventStore: EventStore,
    private eventBus: EventBus
  ) {}

  async handle(command: Command): Promise<void> {
    switch (command.commandType) {
      case 'AbrirConta': {
        const conta = ContaBancaria.abrir(
          command.data.id,
          command.data.titular,
          command.data.saldoInicial
        );
        await this.eventStore.appendToStream(
          command.data.id,
          conta.getUncommittedChanges(),
          0
        );
        break;
      }
      case 'Depositar': {
        const conta = await this.eventStore.getAggregate(
          command.data.id,
          (events) => {
            const c = new ContaBancaria();
            c.loadFromHistory(events);
            return c;
          }
        );
        conta.depositar(command.data.valor);
        await this.eventStore.appendToStream(
          command.data.id,
          conta.getUncommittedChanges(),
          conta.version
        );
        break;
      }
    }
  }
}

6. Construindo a camada de Consultas (CQRS - Read Side)

// src/application/projections/ContaCorrenteProjection.ts
export class ContaCorrenteView {
  private saldos: Map<string, number> = new Map();

  apply(event: Event): void {
    switch (event.eventType) {
      case 'ContaCriada':
        this.saldos.set(event.aggregateId, event.data.saldoInicial);
        break;
      case 'DepositoRealizado':
        const saldoAtualDep = this.saldos.get(event.aggregateId) || 0;
        this.saldos.set(event.aggregateId, saldoAtualDep + event.data.valor);
        break;
      case 'SaqueRealizado':
        const saldoAtualSaq = this.saldos.get(event.aggregateId) || 0;
        this.saldos.set(event.aggregateId, saldoAtualSaq - event.data.valor);
        break;
    }
  }

  getSaldo(aggregateId: string): number {
    return this.saldos.get(aggregateId) || 0;
  }
}

// src/application/queryBus/QueryBus.ts
export class QueryBus {
  private handlers: Map<string, QueryHandler> = new Map();

  register(queryType: string, handler: QueryHandler): void {
    this.handlers.set(queryType, handler);
  }

  async dispatch<T>(query: Query): Promise<T> {
    const handler = this.handlers.get(query.queryType);
    if (!handler) throw new Error('Handler não encontrado');
    return handler.handle(query);
  }
}

7. Integração final e testes end-to-end

// src/infrastructure/eventBus/EventBus.ts
export class EventBus {
  private subscribers: Map<string, Function[]> = new Map();

  subscribe(eventType: string, handler: Function): void {
    const handlers = this.subscribers.get(eventType) || [];
    handlers.push(handler);
    this.subscribers.set(eventType, handlers);
  }

  publish(events: Event[]): void {
    for (const event of events) {
      const handlers = this.subscribers.get(event.eventType) || [];
      for (const handler of handlers) {
        handler(event);
      }
    }
  }
}

// Teste end-to-end
async function exemploCompleto() {
  const eventStore = new EventStore();
  const eventBus = new EventBus();
  const projection = new ContaCorrenteView();

  // Inscreve projeção nos eventos
  eventBus.subscribe('ContaCriada', (e) => projection.apply(e));
  eventBus.subscribe('DepositoRealizado', (e) => projection.apply(e));
  eventBus.subscribe('SaqueRealizado', (e) => projection.apply(e));

  const commandBus = new CommandBus();
  commandBus.register('AbrirConta', new ContaCommandHandler(eventStore, eventBus));
  commandBus.register('Depositar', new ContaCommandHandler(eventStore, eventBus));
  commandBus.register('Sacar', new ContaCommandHandler(eventStore, eventBus));

  // Fluxo completo
  await commandBus.dispatch({
    commandId: '1',
    commandType: 'AbrirConta',
    data: { id: '123', titular: 'João', saldoInicial: 1000 }
  });

  await commandBus.dispatch({
    commandId: '2',
    commandType: 'Depositar',
    data: { id: '123', valor: 500 }
  });

  await commandBus.dispatch({
    commandId: '3',
    commandType: 'Sacar',
    data: { id: '123', valor: 200 }
  });

  console.log('Saldo final:', projection.getSaldo('123')); // 1300

  // Replay de eventos para verificar consistência
  const events = await eventStore.readStream('123');
  const contaReplay = new ContaBancaria();
  contaReplay.loadFromHistory(events);
  console.log('Saldo via replay:', contaReplay['saldo']); // 1300
}

Este exemplo demonstra como CQRS com Event Sourcing oferece auditabilidade total, capacidade de replay e separação clara entre operações de escrita e leitura. A implementação em memória serve como base para testes, podendo ser substituída por Event Stores como EventStoreDB ou PostgreSQL em produção.

Referências