Logical replication: filtrando e transformando dados em trânsito

1. Fundamentos da Replicação Lógica

A replicação lógica no PostgreSQL é um mecanismo que permite copiar dados de um servidor (publisher) para um ou mais servidores (subscribers) com granularidade fina. Diferente da replicação física, que replica o banco inteiro no nível do disco, a replicação lógica opera no nível das transações e permite selecionar quais tabelas, linhas e colunas serão replicadas.

O fluxo básico é: o publisher escreve as alterações no Write-Ahead Log (WAL). Um processo de decodificação lógica converte essas alterações em um formato compreensível (como SQL ou JSON). O subscriber então aplica essas alterações em suas tabelas locais.

A configuração se dá através de dois objetos principais:

  • Publication: define o que será publicado (tabelas, operações, filtros)
  • Subscription: define onde e como os dados serão recebidos
-- No publisher:
CREATE PUBLICATION vendas_publication FOR TABLE vendas;

-- No subscriber:
CREATE SUBSCRIPTION vendas_subscription
CONNECTION 'host=publisher_host dbname=prod user=replicator'
PUBLICATION vendas_publication;

2. Filtragem de Dados na Publicação

Filtragem por tabela

Você pode publicar apenas um subconjunto de tabelas do banco:

-- Publica apenas tabelas específicas
CREATE PUBLICATION pub_clientes FOR TABLE clientes, pedidos, itens_pedido;

-- Publica todas as tabelas (menos comum, mas possível)
CREATE PUBLICATION pub_tudo FOR ALL TABLES;

Filtragem por operação

Por padrão, todas as operações (INSERT, UPDATE, DELETE, TRUNCATE) são replicadas. Você pode limitar:

-- Apenas inserts e updates (sem deletes)
CREATE PUBLICATION pub_auditoria FOR TABLE log_atividades
WITH (publish = 'insert, update');

Filtragem por colunas

Para evitar replicar colunas sensíveis ou desnecessárias:

-- Publica apenas nome e email, omitindo senha e cpf
CREATE PUBLICATION pub_usuarios_parcial FOR TABLE usuarios (id, nome, email);

3. Filtragem por Linhas com Expressões WHERE

Este é um dos recursos mais poderosos da replicação lógica: replicar apenas linhas que atendem a uma condição.

-- Replicar apenas pedidos da região Sul
CREATE PUBLICATION pub_pedidos_sul FOR TABLE pedidos
WHERE (regiao = 'SUL');

-- Replicar apenas clientes ativos
CREATE PUBLICATION pub_clientes_ativos FOR TABLE clientes
WHERE (status = 'ativo');

-- Replicar apenas vendas do mês corrente
CREATE PUBLICATION pub_vendas_mes FOR TABLE vendas
WHERE (data_venda >= date_trunc('month', CURRENT_DATE));

Limitações importantes:

  • A expressão WHERE deve usar apenas funções imutáveis (não pode depender de now() ou random())
  • Todas as colunas usadas no filtro precisam estar incluídas na publicação
  • O filtro é avaliado no momento da replicação, não no momento da inserção original
-- ERRADO: uso de função volátil
CREATE PUBLICATION pub_erro FOR TABLE vendas
WHERE (data_venda = CURRENT_DATE);  -- CURRENT_DATE é estável, mas não imutável

-- CORRETO: uso de função imutável
CREATE PUBLICATION pub_correto FOR TABLE vendas
WHERE (data_venda >= '2025-01-01');

4. Transformação de Dados com Triggers no Assinante

No lado do assinante, você pode usar triggers para transformar os dados antes de serem inseridos ou atualizados.

Mascaramento de dados sensíveis

-- No subscriber, criar trigger para mascarar CPF
CREATE OR REPLACE FUNCTION mascarar_cpf()
RETURNS TRIGGER AS $$
BEGIN
    NEW.cpf := substring(NEW.cpf, 1, 3) || '.***.***-**';
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER trg_mascarar_cpf
BEFORE INSERT OR UPDATE ON clientes
FOR EACH ROW EXECUTE FUNCTION mascarar_cpf();

Normalização de dados

-- Converter todas as strings para maiúsculas no destino
CREATE OR REPLACE FUNCTION normalizar_nome()
RETURNS TRIGGER AS $$
BEGIN
    NEW.nome := UPPER(TRIM(NEW.nome));
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER trg_normalizar_nome
BEFORE INSERT OR UPDATE ON clientes
FOR EACH ROW EXECUTE FUNCTION normalizar_nome();

Enriquecimento com dados locais

-- Adicionar timestamp de recebimento no assinante
CREATE OR REPLACE FUNCTION adicionar_metadados()
RETURNS TRIGGER AS $$
BEGIN
    NEW.recebido_em := NOW();
    NEW.origem := 'replicacao_logica';
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

5. Transformação com Funções de Decodificação Customizadas

O PostgreSQL padrão usa o plugin pgoutput para decodificar o WAL. Plugins alternativos como wal2json permitem transformações mais elaboradas.

Usando wal2json para transformar dados

-- No postgresql.conf do publisher
wal_level = logical
max_replication_slots = 5

-- Criar slot de replicação com wal2json
SELECT pg_create_logical_replication_slot('slot_json', 'wal2json');

Plugin de decodificação customizado (exemplo conceitual)

Para cenários avançados, você pode criar um plugin em C que transforma os dados antes de enviá-los. Isso é útil quando:

  • Você precisa normalizar dados antes da replicação (ex: converter fusos horários)
  • Quer reduzir o volume de dados enviados pela rede
  • Precisa aplicar regras de negócio complexas no publisher

Quando transformar no publisher vs. subscriber:

Cenário Publisher Subscriber
Reduzir tráfego de rede
Manter dados originais no publisher
Regra de negócio global
Regra específica do subscriber

6. Estratégias de Sincronização e Consistência

Tratamento de conflitos

Conflitos ocorrem quando o subscriber tenta inserir uma chave que já existe ou atualizar um registro divergente.

-- Verificar conflitos no subscriber
SELECT * FROM pg_stat_subscription_workers;

-- Opções de resolução:
-- 1. Ignorar conflitos (padrão)
ALTER SUBSCRIPTION sub_vendas SET (failover = true);

-- 2. Replicar novamente a partir de um LSN específico
ALTER SUBSCRIPTION sub_vendas REFRESH PUBLICATION;

Monitoramento de lag

-- Verificar lag da replicação
SELECT
    slot_name,
    pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS lag_bytes,
    ROUND(EXTRACT(EPOCH FROM (NOW() - confirmed_flush_lsn::text::timestamp))) AS lag_segundos
FROM pg_replication_slots
WHERE slot_type = 'logical';

Reaplicação após falha

-- Forçar reaplicação de todas as alterações pendentes
ALTER SUBSCRIPTION sub_vendas ENABLE;

-- Em caso de corrupção, recriar a subscription
DROP SUBSCRIPTION IF EXISTS sub_vendas;
CREATE SUBSCRIPTION sub_vendas
CONNECTION 'host=...'
PUBLICATION pub_vendas;

7. Casos de Uso Avançados e Boas Práticas

Replicação seletiva para staging/teste

-- Publisher: publica apenas dados de teste
CREATE PUBLICATION pub_staging FOR TABLE clientes, pedidos
WHERE (ambiente = 'staging');

Data warehouse incremental

-- Replicar apenas inserts para o DW
CREATE PUBLICATION pub_dw_incremental FOR TABLE transacoes
WITH (publish = 'insert');

Segurança com filtros e transformações

-- Publisher: filtra colunas sensíveis
CREATE PUBLICATION pub_segura FOR TABLE funcionarios (id, nome, cargo)
WHERE (cargo != 'DIRETOR');

-- Subscriber: mascara dados remanescentes
CREATE TRIGGER trg_mascarar_salario
BEFORE INSERT ON funcionarios
FOR EACH ROW EXECUTE FUNCTION mascarar_salario();

Boas práticas finais:

  1. Sempre teste filtros e transformações em ambiente de homologação
  2. Monitore o lag de replicação com alertas automáticos
  3. Documente todas as publicações e subscriptions criadas
  4. Use schemas separados para dados replicados quando possível
  5. Evite triggers no publisher que possam causar loops de replicação

Referências