Filas de tarefas com Bull e Redis

1. Introdução às filas de tarefas no Node.js

No desenvolvimento de aplicações modernas, é comum enfrentarmos situações onde operações síncronas simplesmente não são viáveis. Processamento de imagens, envio de emails em massa, geração de relatórios complexos ou integrações com APIs externas podem bloquear o event loop do Node.js e degradar a experiência do usuário.

Filas de tarefas resolvem esses problemas ao permitir que operações pesadas sejam executadas de forma assíncrona, em segundo plano. Os conceitos fundamentais são simples: um produtor adiciona jobs (tarefas) a uma fila, e um ou mais workers (consumidores) processam esses jobs quando possível.

Por que Bull + Redis? O Bull é a biblioteca mais robusta para filas no ecossistema Node.js. Aliado ao Redis, oferece:
- Performance: Redis é um banco em memória extremamente rápido
- Persistência: jobs sobrevivem a reinicializações da aplicação
- Recursos avançados: agendamento, prioridades, retry automático, rate limiting

2. Configuração do ambiente e instalação

Primeiro, precisamos do Redis em execução. A forma mais prática é com Docker:

docker run -d -p 6379:6379 --name redis-bull redis:7-alpine

Verifique se está funcionando:

docker exec redis-bull redis-cli ping
# Resposta esperada: PONG

Agora, crie um novo projeto Node.js e instale o Bull:

mkdir filas-com-bull
cd filas-com-bull
npm init -y
npm install bull express socket.io

Estrutura inicial do projeto:

// queue.js
const Bull = require('bull');

const emailQueue = new Bull('email', {
  redis: {
    host: 'localhost',
    port: 6379
  }
});

module.exports = emailQueue;

3. Criando e gerenciando filas com Bull

Instanciar uma fila é simples. O Bull gerencia automaticamente a conexão com Redis:

const taskQueue = new Bull('tarefas', {
  redis: { host: 'localhost', port: 6379 }
});

Para adicionar jobs, use o método add(). Você pode passar qualquer objeto serializável em JSON:

async function adicionarJob() {
  const job = await taskQueue.add(
    { 
      userId: 123, 
      tipo: 'processar_imagem',
      caminho: '/uploads/foto.jpg'
    },
    {
      delay: 5000,           // Espera 5 segundos antes de processar
      attempts: 3,           // Tenta 3 vezes em caso de falha
      removeOnComplete: true // Remove após concluir
    }
  );

  console.log(`Job ${job.id} adicionado`);
}

4. Processamento de jobs com workers

O worker é a função que realmente executa o trabalho. Cada job passa por estados: waitingactivecompleted ou failed.

taskQueue.process(async (job) => {
  console.log(`Processando job ${job.id}: ${job.data.tipo}`);

  // Simulando processamento pesado
  await new Promise(resolve => setTimeout(resolve, 3000));

  // Relata progresso (0-100)
  await job.progress(50);

  // Continua processando...

  return { resultado: 'sucesso' };
});

Para controle de concorrência, especifique quantos jobs o worker pode processar simultaneamente:

// Processa até 5 jobs simultaneamente
taskQueue.process(5, async (job) => {
  // ... lógica do worker
});

Tratamento de erros e retry automático:

taskQueue.process(async (job) => {
  try {
    const resultado = await operacaoArriscada(job.data);
    return resultado;
  } catch (error) {
    // Se falhar, o Bull tentará novamente baseado na opção 'attempts'
    throw error;
  }
});

5. Eventos e monitoramento da fila

O Bull emite eventos que permitem monitorar cada aspecto da fila:

taskQueue.on('completed', (job, result) => {
  console.log(`Job ${job.id} concluído com resultado:`, result);
});

taskQueue.on('failed', (job, err) => {
  console.error(`Job ${job.id} falhou:`, err.message);
});

taskQueue.on('progress', (job, progress) => {
  console.log(`Job ${job.id} está em ${progress}%`);
});

taskQueue.on('waiting', (jobId) => {
  console.log(`Job ${jobId} aguardando processamento`);
});

6. Funcionalidades avançadas

Agendamento com cron:

const relatorioQueue = new Bull('relatorios');

// Executa toda segunda-feira às 8h
await relatorioQueue.add(
  { tipo: 'relatorio_semanal' },
  { repeat: { cron: '0 8 * * 1' } }
);

Prioridades entre jobs:

await taskQueue.add({ tipo: 'urgente' }, { priority: 1 });
await taskQueue.add({ tipo: 'normal' }, { priority: 10 });

Limpeza automática de jobs antigos:

// Remove jobs completados há mais de 24 horas
await taskQueue.clean(24 * 60 * 60 * 1000, 'completed');

// Ou configure na criação do job
await taskQueue.add(data, { removeOnComplete: { age: 86400 } });

Rate limiting para controlar throughput:

const taskQueue = new Bull('tarefas', {
  limiter: {
    max: 100,    // máximo de jobs
    duration: 1000 // por segundo
  }
});

7. Integração com React: interface de gerenciamento

Vamos criar um backend Express que expõe endpoints para gerenciar a fila:

// server.js
const express = require('express');
const http = require('http');
const socketIo = require('socket.io');
const taskQueue = require('./queue');

const app = express();
const server = http.createServer(app);
const io = socketIo(server, { cors: { origin: '*' } });

app.use(express.json());

// Endpoint para adicionar job
app.post('/jobs', async (req, res) => {
  const { tipo, dados } = req.body;
  const job = await taskQueue.add({ tipo, ...dados });
  res.json({ jobId: job.id });
});

// Endpoint para listar jobs
app.get('/jobs', async (req, res) => {
  const waiting = await taskQueue.getWaiting();
  const active = await taskQueue.getActive();
  const completed = await taskQueue.getCompleted();
  const failed = await taskQueue.getFailed();

  res.json({ waiting, active, completed, failed });
});

// WebSocket para atualizações em tempo real
taskQueue.on('completed', (job, result) => {
  io.emit('job-completed', { jobId: job.id, result });
});

taskQueue.on('progress', (job, progress) => {
  io.emit('job-progress', { jobId: job.id, progress });
});

server.listen(3001, () => console.log('API rodando na porta 3001'));

Componente React para exibir os jobs em tempo real:

// JobDashboard.jsx
import React, { useState, useEffect } from 'react';
import io from 'socket.io-client';

const socket = io('http://localhost:3001');

function JobDashboard() {
  const [jobs, setJobs] = useState([]);
  const [formData, setFormData] = useState({ tipo: '', dados: '' });

  useEffect(() => {
    socket.on('job-completed', (data) => {
      setJobs(prev => [...prev, { id: data.jobId, status: 'completed', ...data }]);
    });

    return () => socket.disconnect();
  }, []);

  const handleSubmit = async (e) => {
    e.preventDefault();
    await fetch('http://localhost:3001/jobs', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify(formData)
    });
  };

  return (
    <div>
      <h2>Gerenciador de Filas</h2>

      <form onSubmit={handleSubmit}>
        <input
          type="text"
          placeholder="Tipo do job"
          value={formData.tipo}
          onChange={(e) => setFormData({...formData, tipo: e.target.value})}
        />
        <button type="submit">Adicionar Job</button>
      </form>

      <table>
        <thead>
          <tr>
            <th>ID</th>
            <th>Status</th>
            <th>Progresso</th>
          </tr>
        </thead>
        <tbody>
          {jobs.map(job => (
            <tr key={job.id}>
              <td>{job.id}</td>
              <td>{job.status}</td>
              <td>{job.progress || 0}%</td>
            </tr>
          ))}
        </tbody>
      </table>
    </div>
  );
}

export default JobDashboard;

8. Boas práticas e troubleshooting

Tratamento de falhas no Redis:

const queue = new Bull('minha-fila', {
  redis: {
    host: 'localhost',
    port: 6379,
    maxRetriesPerRequest: 3,
    retryStrategy: (times) => Math.min(times * 100, 3000)
  }
});

queue.on('error', (error) => {
  console.error('Erro na conexão Redis:', error.message);
});

Testes unitários com Jest:

// queue.test.js
const Queue = require('bull');

jest.mock('bull', () => {
  return jest.fn().mockImplementation(() => ({
    add: jest.fn().mockResolvedValue({ id: 'mock-job-123' }),
    process: jest.fn(),
    on: jest.fn()
  }));
});

test('deve adicionar um job na fila', async () => {
  const queue = new Queue();
  const job = await queue.add({ tarefa: 'teste' });
  expect(job.id).toBe('mock-job-123');
});

Monitoramento com Bull Board:

npm install @bull-board/express
const { createBullBoard } = require('@bull-board/api');
const { BullAdapter } = require('@bull-board/api/bullAdapter');
const { ExpressAdapter } = require('@bull-board/express');

const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath('/admin/queues');

createBullBoard({
  queues: [new BullAdapter(taskQueue)],
  serverAdapter
});

app.use('/admin/queues', serverAdapter.getRouter());

Considerações de deploy:
- Em produção, use Redis Cluster ou ElastiCache para alta disponibilidade
- Separe workers em processos ou containers independentes
- Configure variáveis de ambiente para conexão Redis
- Implemente health checks periódicos

Referências