Programação reativa com RxJS e Reactor

1. Fundamentos da Programação Reativa

A programação reativa representa uma mudança paradigmática na forma como construímos sistemas que lidam com fluxos de dados assíncronos. Diferentemente da programação imperativa tradicional, onde o código executa sequencialmente e o programador controla explicitamente o fluxo, a abordagem reativa trata dados como streams contínuos que podem ser observados, transformados e combinados.

No modelo imperativo, uma requisição HTTP bloqueia a thread até obter resposta. No paradigma reativo, definimos um pipeline de processamento que reage automaticamente à chegada dos dados. O Manifesto Reativo estabelece quatro pilares fundamentais: responsividade (respostas rápidas), resiliência (recuperação de falhas), elasticidade (adaptação à carga) e orientação a mensagens (comunicação assíncrona).

// Imperativo tradicional
const resultado = dados.filter(item => item.ativo).map(item => item.nome);

// Reativo com RxJS
from(dados).pipe(
  filter(item => item.ativo),
  map(item => item.nome)
).subscribe(console.log);

2. RxJS: Reactive Extensions para JavaScript

RxJS implementa o padrão Observer com operadores funcionais. O trio central é composto por Observable (fonte de dados), Observer (consumidor) e Subscription (controle do ciclo de vida).

import { fromEvent, of } from 'rxjs';
import { map, debounceTime, switchMap } from 'rxjs/operators';

// Observable de eventos de input
const inputBusca = document.getElementById('busca');
const busca$ = fromEvent(inputBusca, 'input').pipe(
  map(event => event.target.value),
  debounceTime(300),
  switchMap(termo => buscarDados(termo))
);

busca$.subscribe(resultados => {
  atualizarUI(resultados);
});

Operadores essenciais:
- map: transforma cada valor emitido
- filter: seleciona valores que atendem a condição
- mergeMap: mapeia para múltiplos observables internos simultaneamente
- switchMap: cancela observable anterior ao receber novo valor
- debounceTime: aguarda pausa nas emissões antes de propagar

3. Reactor: Programação Reativa no Ecossistema Java

Reactor é a implementação reativa para JVM, baseada em Reactive Streams. Seus dois tipos principais são Mono (0 ou 1 elemento) e Flux (N elementos).

Flux<String> fluxoNomes = Flux.just("Ana", "Bruno", "Carla")
    .map(nome -> nome.toUpperCase())
    .filter(nome -> nome.startsWith("C"));

Mono<User> usuario = Mono.fromCallable(() -> buscarUsuario(1))
    .timeout(Duration.ofSeconds(5))
    .onErrorResume(error -> Mono.just(new User("anonimo")));

Exemplo completo com Spring WebFlux e R2DBC:

@RestController
public class ProdutoController {

    @GetMapping("/produtos")
    public Flux<Produto> listarProdutos() {
        return produtoRepository.findAll()
            .filter(produto -> produto.getPreco() > 100)
            .flatMap(produto -> calcularDesconto(produto))
            .doOnNext(produto -> log.info("Produto processado: {}", produto.getId()));
    }

    private Mono<Produto> calcularDesconto(Produto produto) {
        return Mono.just(produto)
            .map(p -> {
                p.setPreco(p.getPreco() * 0.9);
                return p;
            });
    }
}

4. Padrões Comuns em Fluxos Reativos

Backpressure: controla a velocidade de processamento quando produtor é mais rápido que consumidor.

// RxJS - backpressure com buffer
fromEvent(document, 'mousemove').pipe(
    bufferTime(200),
    filter(clicks => clicks.length > 0)
).subscribe(console.log);

// Reactor - backpressure com limitRate
Flux.range(1, 1000)
    .limitRate(10)
    .subscribe(System.out::println);

Tratamento de erros:

// RxJS
observable$.pipe(
    catchError(err => of(valorFallback)),
    retry(3)
);

// Reactor
fluxo
    .onErrorResume(ex -> Flux.just("fallback1", "fallback2"))
    .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)));

Combinação de fluxos:

// RxJS - combineLatest
const combinado$ = combineLatest([
    observable1$,
    observable2$
]).pipe(map(([val1, val2]) => val1 + val2));

// Reactor - zip
Flux.zip(fluxoA, fluxoB, (a, b) -> a + " " + b);

5. Comparação entre RxJS e Reactor

Ambas bibliotecas compartilham conceitos fundamentais: Observable/Flux, operadores transformadores e schedulers para controle de concorrência. As principais diferenças residem no ecossistema:

  • RxJS: tipagem dinâmica, ideal para frontend com Angular, manipulação de eventos DOM e requisições HTTP
  • Reactor: tipagem forte, integração nativa com Spring Boot, suporte a backpressure via Reactive Streams

RxJS oferece maior flexibilidade em cenários de UI, enquanto Reactor proporciona segurança de tipos e performance otimizada para servidores.

6. Integração com Frameworks Populares

Angular + RxJS:

@Injectable()
export class UserService {
  constructor(private http: HttpClient) {}

  getUsers(): Observable<User[]> {
    return this.http.get<User[]>('/api/users').pipe(
      map(users => users.filter(u => u.active)),
      catchError(error => {
        console.error('Erro ao buscar usuários', error);
        return of([]);
      })
    );
  }
}

Spring Boot + Reactor:

@Configuration
public class WebFluxConfig {
    @Bean
    public RouterFunction<ServerResponse> rotas() {
        return route()
            .GET("/api/eventos", request -> 
                ServerResponse.ok()
                    .contentType(MediaType.TEXT_EVENT_STREAM)
                    .body(fluxoEventos(), Evento.class))
            .build();
    }
}

7. Boas Práticas e Armadilhas Comuns

Evitando memory leaks:

// RxJS - gerenciamento de subscriptions
const subscription = observable$.subscribe();
subscription.unsubscribe(); // ou use takeUntil

// Reactor - evitar bloqueios
Mono.fromCallable(this::operacaoBloqueante)
    .subscribeOn(Schedulers.boundedElastic())
    .subscribe();

Testando código reativo:

// RxJS - marble testing
const source$ = cold('--a--b|', { a: 1, b: 2 });
const expected$ = cold('--A--B|', { A: 2, B: 4 });
expect(source$.pipe(map(x => x * 2))).toBeObservable(expected$);

// Reactor - StepVerifier
StepVerifier.create(fluxo)
    .expectNext("A", "B", "C")
    .expectComplete()
    .verify();

Debugging:

// RxJS
observable$.pipe(
    tap(console.log),
    map(transformar)
);

// Reactor
fluxo
    .doOnNext(item -> log.debug("Processando: {}", item))
    .flatMap(this::processar);

Referências

  • Documentação oficial do RxJS — Guia completo sobre Observables, operadores e práticas recomendadas para programação reativa em JavaScript
  • Reactor Reference Guide — Documentação oficial do Reactor com Mono, Flux, operadores e integração com Spring
  • Reactive Manifesto — Os princípios fundamentais da programação reativa: responsivo, resiliente, elástico e orientado a mensagens
  • Spring WebFlux Documentation — Guia oficial para construção de aplicações web reativas com Spring Boot e Reactor
  • RxJS Marble Testing Guide — Tutorial detalhado sobre testes com diagramas marble para verificar comportamento de streams
  • R2DBC Specification — Especificação para acesso reativo a bancos de dados relacionais, complementar ao Reactor em aplicações Spring