Como implementar workflows de aprovação assíncronos com máquinas de estado persistentes

1. Fundamentos de workflows de aprovação assíncronos

1.1. Conceitos básicos: eventos, estados, transições e ações

Workflows de aprovação assíncronos são sistemas onde o fluxo de decisão não ocorre em tempo real, permitindo que participantes ajam em momentos distintos. Os elementos fundamentais incluem:

  • Eventos: estímulos externos ou internos que disparam transições (ex: pedido_submetido, aprovador_designado)
  • Estados: condições estáveis do workflow (ex: pendente, em_analise)
  • Transições: movimentos válidos entre estados, acionados por eventos
  • Ações: efeitos colaterais executados durante transições (ex: notificar_aprovador)

1.2. Diferenças entre workflows síncronos e assíncronos

Característica Workflow Síncrono Workflow Assíncrono
Resiliência Baixa — falha interrompe processo Alta — estados persistidos permitem retomada
Escalabilidade Limitada por bloqueios Alta — processamento paralelo via filas
Tolerância a falhas Baixa — perda de contexto Alta — recuperação via replay

1.3. Casos de uso típicos

  • Aprovação de pedidos de compra com múltiplos níveis hierárquicos
  • Revisão de documentos com prazos de expiração
  • Liberação de deploys em ambientes de produção
  • Fluxos de compliance com auditoria obrigatória

2. Modelagem da máquina de estados persistente

2.1. Definição dos estados do workflow

ESTADOS_VALIDOS = {
    "pendente": "Aguardando submissão inicial",
    "em_analise": "Sob revisão do aprovador",
    "aprovado": "Aprovado sem ressalvas",
    "rejeitado": "Rejeitado pelo aprovador",
    "cancelado": "Cancelado pelo solicitante",
    "expirado": "Prazo de análise esgotado"
}

2.2. Mapeamento de transições permitidas e regras de guarda

TRANSICOES = {
    "pendente": {
        "submeter": { "destino": "em_analise", "guarda": "usuario_tem_permissao" }
    },
    "em_analise": {
        "aprovar": { "destino": "aprovado", "guarda": "aprovador_autorizado" },
        "rejeitar": { "destino": "rejeitado", "guarda": "motivo_obrigatorio" },
        "cancelar": { "destino": "cancelado", "guarda": "solicitante_ou_admin" },
        "expiracao": { "destino": "expirado", "guarda": "prazo_excedido" }
    }
}

2.3. Representação formal: tabela de transições

| Estado Atual   | Evento       | Guarda                | Estado Destino | Ação                  |
|----------------|--------------|-----------------------|----------------|-----------------------|
| pendente       | submeter     | permissao_valida      | em_analise     | notificar_aprovador   |
| em_analise     | aprovar      | aprovador_autorizado  | aprovado       | liberar_pedido        |
| em_analise     | rejeitar     | motivo_preenchido     | rejeitado      | notificar_solicitante |
| em_analise     | expiracao    | timeout_atingido      | expirado       | escalar_para_gestor   |

3. Persistência do estado: armazenamento e recuperação

3.1. Estratégias de persistência

-- Tabela relacional para armazenamento de estados
CREATE TABLE workflow_estados (
    workflow_id UUID PRIMARY KEY,
    estado_atual VARCHAR(20) NOT NULL,
    versao INTEGER NOT NULL DEFAULT 1,
    dados_json JSONB,
    criado_em TIMESTAMP DEFAULT NOW(),
    atualizado_em TIMESTAMP DEFAULT NOW()
);

CREATE TABLE workflow_eventos (
    evento_id UUID PRIMARY KEY,
    workflow_id UUID REFERENCES workflow_estados(workflow_id),
    tipo_evento VARCHAR(50) NOT NULL,
    payload JSONB,
    criado_em TIMESTAMP DEFAULT NOW()
);

3.2. Versionamento de estado e concorrência

-- Atualização com optimistic locking
UPDATE workflow_estados 
SET estado_atual = 'em_analise', 
    versao = versao + 1,
    atualizado_em = NOW()
WHERE workflow_id = $1 AND versao = $2
RETURNING versao;

3.3. Recuperação de estado após falhas

funcao recuperar_estado(workflow_id):
    estado_atual = buscar_ultimo_snapshot(workflow_id)
    eventos_pendentes = buscar_eventos_apos_snapshot(workflow_id, estado_atual.versao)

    para cada evento em eventos_pendentes:
        estado_atual = aplicar_evento(estado_atual, evento)

    retornar estado_atual

4. Implementação do motor de workflow assíncrono

4.1. Estrutura do motor

class MotorWorkflow:
    def __init__(self, fila_eventos, repositorio_estados):
        self.fila = fila_eventos
        self.repositorio = repositorio_estados
        self.executores = {}

    def processar_evento(self, evento):
        workflow = self.repositorio.carregar(evento.workflow_id)
        transicao = workflow.obter_transicao(evento.tipo)

        if transicao.guarda_valida():
            with self.repositorio.lock(workflow.id):
                workflow.aplicar_transicao(transicao)
                self.despachar_acoes(transicao.acoes)
                self.repositorio.salvar(workflow)

4.2. Uso de filas e mensageria

# Publicação de evento no Kafka
producer.send('workflow-eventos', {
    'workflow_id': 'abc-123',
    'tipo_evento': 'submeter',
    'payload': {'usuario': 'joao', 'pedido_id': 456},
    'timestamp': datetime.now().isoformat()
})

# Consumidor processando eventos
@consumer(topic='workflow-eventos', group_id='motor-workflow')
def processar_evento(evento):
    motor.processar_evento(evento)

4.3. Tratamento de timeouts e expiração de estados

# Scheduler para verificar expirações
agendar_tarefa_recorrente(intervalo=60)  # a cada minuto
def verificar_expirados():
    workflows_expirados = buscar_workflows_em_analise_com_prazo_vencido()
    for wf in workflows_expirados:
        publicar_evento({
            'workflow_id': wf.id,
            'tipo_evento': 'expiracao',
            'payload': {'motivo': 'prazo_excedido'}
        })

5. Notificações e ações em cada transição

5.1. Gatilhos para notificações

ACOES_POR_TRANSICAO = {
    'pendente->em_analise': [
        {'tipo': 'email', 'destinatario': 'aprovador', 'template': 'nova_solicitacao'},
        {'tipo': 'webhook', 'url': 'https://api.exemplo.com/notificar'}
    ],
    'em_analise->aprovado': [
        {'tipo': 'email', 'destinatario': 'solicitante', 'template': 'aprovado'},
        {'tipo': 'push', 'canal': 'mobile_app'}
    ]
}

5.2. Execução de ações colaterais

def executar_acoes_colaterais(workflow, transicao):
    for acao in transicao.acoes:
        if acao.tipo == 'atualizar_sistema_externo':
            chamar_api_externa(acao.url, workflow.dados)
        elif acao.tipo == 'registrar_auditoria':
            inserir_log_auditoria(workflow, transicao)

5.3. Políticas de compensação

def compensar_acoes(workflow, motivo_rejeicao):
    if workflow.estado_atual == 'rejeitado':
        # Reverter ações executadas durante aprovação
        reverter_reserva_estoque(workflow.pedido_id)
        liberar_credito_cliente(workflow.cliente_id)

6. Tratamento de concorrência e consistência eventual

6.1. Garantia de idempotência em transições

funcao aplicar_evento_se_unico(workflow_id, evento_id, transicao):
    if not evento_ja_processado(evento_id):
        with lock_transacional(workflow_id):
            workflow = carregar_workflow(workflow_id)
            workflow.aplicar(transicao)
            marcar_evento_processado(evento_id)
            salvar_workflow(workflow)

6.2. Consistência eventual vs consistência forte

# Abordagem de consistência eventual com filas
# Vantagens: alta disponibilidade, baixa latência
# Desvantagens: leituras podem retornar estado obsoleto por segundos

def ler_workflow_consistente(workflow_id):
    # Tenta cache primeiro, depois banco
    estado_cache = cache.get(f'workflow:{workflow_id}')
    if estado_cache:
        return estado_cache
    return banco.buscar(workflow_id)

6.3. Resolução de conflitos com saga patterns

class SagaCoordenador:
    def executar_saga(self, workflow_id, passos):
        passos_executados = []
        try:
            for passo in passos:
                passo.executar()
                passos_executados.append(passo)
        except Exception as e:
            # Compensar passos executados em ordem reversa
            for passo in reversed(passos_executados):
                passo.compensar()
            raise

7. Monitoramento e observabilidade do workflow

7.1. Métricas-chave

# Métricas Prometheus
workflow_tempo_aprovacao_seconds = Histogram(
    'workflow_tempo_aprovacao_seconds',
    'Tempo entre submissão e aprovação',
    buckets=[60, 300, 600, 1800, 3600]
)

workflow_taxa_rejeicao = Counter(
    'workflow_transicoes_total',
    'Total de transições por resultado',
    ['estado_destino']
)

7.2. Logging estruturado e rastreamento distribuído

import structlog

logger = structlog.get_logger()

def processar_evento(evento):
    logger.info("processando_evento",
                workflow_id=evento.workflow_id,
                tipo=evento.tipo,
                trace_id=evento.trace_id,
                usuario=evento.payload.get('usuario'))

7.3. Health checks específicos

@app.route('/health/workflow-motor')
def health_check():
    status = {
        'fila_consumidores': verificar_consumidores_ativos(),
        'repositorio_estados': verificar_conexao_banco(),
        'scheduler_timeouts': verificar_scheduler_ativo()
    }
    return status, 200 if all(status.values()) else 503

8. Exemplo prático: workflow de aprovação de pedidos

8.1. Implementação passo a passo

# Definição completa do workflow
workflow_pedido = {
    'estados': ['pendente', 'em_analise', 'aprovado', 'rejeitado', 'cancelado', 'expirado'],
    'transicoes': {
        ('pendente', 'submeter'): {
            'destino': 'em_analise',
            'guardas': ['pedido_valido', 'limite_credito_suficiente'],
            'acoes': ['notificar_aprovador', 'reservar_estoque']
        },
        ('em_analise', 'aprovar'): {
            'destino': 'aprovado',
            'guardas': ['aprovador_autorizado'],
            'acoes': ['confirmar_pedido', 'notificar_cliente', 'cobrar_cartao']
        },
        ('em_analise', 'rejeitar'): {
            'destino': 'rejeitado',
            'guardas': ['motivo_preenchido'],
            'acoes': ['notificar_cliente_rejeicao', 'liberar_estoque']
        }
    },
    'timeout': {
        'em_analise': {'duracao': '24h', 'acao': 'escalar_para_gestor'}
    }
}

8.2. Código da máquina de estados

class MaquinaEstadosWorkflow:
    def __init__(self, workflow_id, definicao, repositorio):
        self.id = workflow_id
        self.definicao = definicao
        self.repositorio = repositorio
        self.estado_atual = 'pendente'
        self.versao = 1
        self.historico = []

    def processar_evento(self, evento):
        transicao = self.definicao['transicoes'].get((self.estado_atual, evento.tipo))
        if not transicao:
            raise TransicaoInvalida(f"{self.estado_atual} -> {evento.tipo}")

        if not self._validar_guardas(transicao['guardas'], evento):
            raise GuardaFalhou("Condições não atendidas")

        estado_anterior = self.estado_atual
        self.estado_atual = transicao['destino']
        self.versao += 1
        self.historico.append({
            'de': estado_anterior,
            'para': self.estado_atual,
            'evento': evento.tipo,
            'timestamp': datetime.now()
        })

        self._executar_acoes(transicao['acoes'], evento)
        self.repositorio.salvar(self)

        return self.estado_atual

# Uso do motor
motor = MotorWorkflow(
    fila=KafkaProducer(bootstrap_servers='localhost:9092'),
    repositorio=RepositorioPostgres(connection_string='dbname=workflows')
)

# Submeter novo pedido
motor.processar_evento(Evento(
    workflow_id='wf-001',
    tipo='submeter',
    payload={'pedido_id': 123, 'valor': 5000, 'cliente': 'Maria'}
))

8.3. Testes de integração e cenários de falha

def test_workflow_aprovacao_completo():
    # Cenário: aprovação bem-sucedida
    wf = MaquinaEstadosWorkflow('test-1', workflow_pedido, repositorio_mock)

    assert wf.processar_evento(Evento('submeter', {'pedido_id': 1})) == 'em_analise'
    assert wf.processar_evento(Evento('aprovar', {'aprovador': 'joao'})) == 'aprovado'

    # Cenário: rejeição com compensação
    wf2 = MaquinaEstadosWorkflow('test-2', workflow_pedido, repositorio_mock)
    wf2.processar_evento(Evento('submeter', {'pedido_id': 2}))

    with patch('api_externa.liberar_estoque') as mock_liberar:
        wf2.processar_evento(Evento('rejeitar', {'motivo': 'sem_estoque'}))
        assert wf2.estado_atual == 'rejeitado'
        mock_liberar.assert_called_once()

    # Cenário: timeout e expiração
    with freeze_time('2024-01-01 10:00:00'):
        wf3 = MaquinaEstadosWorkflow('test-3', workflow_pedido, repositorio_mock)
        wf3.processar_evento(Evento('submeter', {'pedido_id': 3}))

    with freeze_time('2024-01-02 11:00:00'):  # Após 25h
        wf3.processar_evento(Evento('expiracao', {}))
        assert wf3.estado_atual == 'expirado'

Referências

event sourcing, base para implementação de máquinas de estado persistentes com replay de eventos
- Microsoft — Saga Pattern — Guia prático sobre implementação de sagas para gerenciamento de transações distribuídas em workflows assíncronos
- Redis — Streams Documentation — Documentação sobre uso de streams como fila de eventos para workflows assíncronos
- PostgreSQL — Advisory Locks — Técnica de locking otimista para controle de concorrência em persistência de estados

Conclusão

Implementar workflows de aprovação assíncronos com máquinas de estado persistentes é uma abordagem robusta para lidar com processos longos, distribuídos e sujeitos a falhas. A combinação de modelagem formal de estados, persistência resiliente, filas de mensagens e tratamento cuidadoso de concorrência permite construir sistemas que escalam horizontalmente, recuperam-se automaticamente de falhas e mantêm consistência eventual.

Os principais aprendizados deste artigo são:

  1. Modelagem clara: Definir estados, transições, guardas e ações de forma explícita é essencial para evitar comportamentos inesperados.
  2. Persistência adequada: Escolher entre banco relacional, event store ou banco de documentos depende dos requisitos de consistência e volume de dados.
  3. Assincronia com filas: Kafka, RabbitMQ ou Redis Streams desacoplam os componentes e permitem processamento escalável.
  4. Tratamento de falhas: Timeouts, retentativas com backoff, sagas e compensações garantem resiliência mesmo em cenários adversos.
  5. Observabilidade: Métricas, logs estruturados e health checks são fundamentais para operar o motor de workflow em produção.

Com esses conceitos e exemplos práticos, você está preparado para implementar seu próprio motor de workflow assíncrono, adaptando-o às necessidades específicas do seu domínio de negócio.