Como implementar change data capture (CDC) com Debezium e Kafka
1. Fundamentos do Change Data Capture (CDC)
Change Data Capture (CDC) é uma técnica de engenharia de dados que permite capturar e propagar alterações ocorridas em bancos de dados em tempo real. Em vez de realizar consultas periódicas (polling) ou depender de gatilhos complexos, o CDC observa diretamente o log de transações do banco, detectando inserts, updates e deletes no momento exato em que ocorrem.
Os casos de uso mais comuns incluem: sincronização entre bancos de dados replicados, alimentação de data lakes com dados atualizados, integração entre microsserviços e manutenção de caches consistentes. Existem três mecanismos principais de CDC:
- Log-based (WAL): Utiliza o Write-Ahead Log ou o log binário do banco. É o método mais eficiente e não intrusivo, pois não requer alterações na estrutura das tabelas.
- Trigger-based: Cria triggers no banco para capturar mudanças. Pode impactar a performance do banco de origem.
- Polling: Consulta a tabela periodicamente por registros modificados (usando timestamps ou flags). Simples, mas com latência maior.
A combinação Debezium + Kafka se destaca por oferecer conectores prontos para diversos bancos (MySQL, PostgreSQL, MongoDB, SQL Server), escalabilidade horizontal via Kafka, baixa latência (sub-segundos) e integração nativa com o ecossistema de streaming.
2. Arquitetura e Componentes da Solução
A arquitetura típica de uma solução CDC com Debezium e Kafka envolve:
- Kafka Connect (distributed mode): Framework que gerencia a execução dos conectores. O Debezium atua como um conector fonte (source connector) dentro do Kafka Connect.
- Debezium: Conector especializado que lê o log de transações do banco de dados e publica cada mudança como uma mensagem em tópicos Kafka.
- Apache Kafka: Barramento de eventos distribuído que armazena e entrega as mensagens de forma durável e ordenada.
- Consumidores downstream: Aplicações, data lakes ou outros sistemas que processam os eventos em tempo real.
O fluxo de dados segue esta sequência: banco de origem → log de transações → Debezium → tópicos Kafka → consumidores. Cada tabela monitorada gera um tópico Kafka específico, facilitando o roteamento.
3. Preparação do Ambiente de Infraestrutura
Para iniciar, vamos preparar o ambiente usando Docker Compose. Crie um arquivo docker-compose.yml:
version: '3.8'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.5.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
kafka:
image: confluentinc/cp-kafka:7.5.0
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
connect:
image: debezium/connect:2.5
depends_on:
- kafka
ports:
- "8083:8083"
environment:
BOOTSTRAP_SERVERS: kafka:9092
GROUP_ID: 1
CONFIG_STORAGE_TOPIC: connect-configs
OFFSET_STORAGE_TOPIC: connect-offsets
STATUS_STORAGE_TOPIC: connect-status
Para o banco de origem, ative o log binário no MySQL. Edite o arquivo my.cnf:
[mysqld]
server-id=1
log_bin=mysql-bin
binlog_format=row
binlog_row_image=full
expire_logs_days=7
Reinicie o MySQL e crie um usuário para o Debezium:
CREATE USER 'debezium'@'%' IDENTIFIED BY 'senha123';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium'@'%';
FLUSH PRIVILEGES;
4. Configuração do Conector Debezium
Com o ambiente rodando, registre o conector Debezium via REST API. Para MySQL, crie o arquivo mysql-connector.json:
{
"name": "mysql-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "localhost",
"database.port": "3306",
"database.user": "debezium",
"database.password": "senha123",
"database.server.id": "184054",
"topic.prefix": "dbserver1",
"database.include.list": "minha_base",
"table.include.list": "minha_base.clientes,minha_base.pedidos",
"schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
"schema.history.internal.kafka.topic": "schema-changes.minha_base",
"snapshot.mode": "initial",
"tombstones.on.delete": "true"
}
}
Registre o conector com:
curl -X POST -H "Content-Type: application/json" --data @mysql-connector.json http://localhost:8083/connectors
Para PostgreSQL, use configuração similar:
{
"name": "postgres-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "localhost",
"database.port": "5432",
"database.user": "debezium",
"database.password": "senha123",
"database.dbname": "minha_base",
"topic.prefix": "dbserver1",
"table.include.list": "public.clientes,public.pedidos",
"plugin.name": "pgoutput",
"slot.name": "debezium_slot",
"snapshot.mode": "initial"
}
}
Parâmetros críticos:
- snapshot.mode: initial faz snapshot completo antes de capturar mudanças; schema_only captura apenas mudanças futuras.
- tombstones.on.delete: quando true, gera uma mensagem tombstone (valor nulo) após o delete, permitindo compactação de tópicos.
- database.history.kafka.topic: armazena o histórico de schemas para evolução.
5. Monitoramento e Gerenciamento de Eventos
Após o conector iniciar, inspecione os tópicos criados:
kafka-topics --bootstrap-server localhost:9092 --list
Você verá tópicos como dbserver1.minha_base.clientes e dbserver1.minha_base.pedidos. Consuma eventos com:
kafka-console-consumer --bootstrap-server localhost:9092 --topic dbserver1.minha_base.clientes --from-beginning
A estrutura de cada evento CDC segue o formato:
{
"schema": {
"type": "struct",
"fields": [
{"field": "before", "type": "struct", "fields": [...]},
{"field": "after", "type": "struct", "fields": [...]},
{"field": "source", "type": "struct", "fields": [...]},
{"field": "op", "type": "string"},
{"field": "ts_ms", "type": "int64"}
]
},
"payload": {
"before": null,
"after": {
"id": 1,
"nome": "João Silva",
"email": "joao@email.com"
},
"source": {
"version": "2.5.0.Final",
"connector": "mysql",
"name": "dbserver1",
"ts_ms": 1700000000000,
"snapshot": "false",
"db": "minha_base",
"table": "clientes"
},
"op": "c",
"ts_ms": 1700000000001
}
}
O campo op indica: c (create), u (update), d (delete), r (snapshot read). Monitore o status do conector:
curl http://localhost:8083/connectors/mysql-connector/status
6. Consumo e Processamento dos Streams de Dados
Consumimos os eventos com Kafka Streams (Java) para processamento em tempo real. Exemplo de aplicação que mantém um cache atualizado:
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.common.serialization.Serdes;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
public class CDCConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "cdc-consumer");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream("dbserver1.minha_base.clientes");
stream.foreach((key, value) -> {
try {
ObjectMapper mapper = new ObjectMapper();
JsonNode payload = mapper.readTree(value).get("payload");
String operacao = payload.get("op").asText();
JsonNode after = payload.get("after");
if ("c".equals(operacao) || "u".equals(operacao)) {
System.out.println("UPSERT: " + after.toString());
// Atualizar cache ou banco de dados destino
} else if ("d".equals(operacao)) {
System.out.println("DELETE: key=" + key);
// Remover do cache
}
} catch (Exception e) {
e.printStackTrace();
}
});
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
}
}
Para roteamento por tabela, use SMT (Single Message Transform) na configuração do conector:
"transforms": "route",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "(.*)",
"transforms.route.replacement": "cdc.$1"
7. Boas Práticas, Limitações e Troubleshooting
Boas práticas:
- Utilize tópicos com múltiplas partições (ex: 6-12) para paralelizar o consumo.
- Configure tasks.max igual ao número de partições para balanceamento.
- Use Avro com Schema Registry para evolução segura de schemas.
- Implemente idempotência nos consumidores para garantir exactly-once.
Limitações:
- Snapshot inicial pode ser lento para tabelas grandes (milhões de registros).
- Eventos de delete geram tombstones que exigem compactação de tópicos.
- Ordenação global não é garantida; apenas por chave primária dentro de uma partição.
Troubleshooting comum:
- Falha de conexão: verifique se o banco de origem permite conexões externas e se o log binário está ativo.
- Offset reset: utilize curl -X DELETE http://localhost:8083/connectors/mysql-connector/offsets para reiniciar o offset.
- Conflitos de schema: configure schema.history.internal.kafka.topic com cleanup.policy=compact e retention.ms=604800000 (7 dias).
- Recuperação pós-crash: o Debezium salva offsets no Kafka; ao reiniciar, ele retoma do último offset commitado.
Para verificar a saúde do conector em produção, monitore as métricas JMX (ex: debezium-mysql:type=connector-metrics,context=streaming) e configure alertas para MilliSecondsBehindSource > 5000 (5 segundos de atraso).
Referências
- Debezium Documentation - MySQL Connector — Documentação oficial do conector MySQL, com exemplos de configuração e parâmetros.
- Apache Kafka Connect Documentation — Guia oficial do Kafka Connect, incluindo modos standalone e distributed.
- Confluent Documentation - Debezium CDC Tutorial — Tutorial prático da Confluent sobre CDC com Debezium e Kafka em nuvem.
- Debezium Blog - CDC Patterns and Best Practices — Artigo técnico sobre padrões de CDC, incluindo SMTs e tratamento de eventos.
- Kafka Streams Documentation - Exactly-Once Semantics — Documentação oficial sobre semântica exactly-once no Kafka Streams, relevante para processamento confiável de eventos CDC.
- MySQL Binary Log Documentation — Documentação oficial do MySQL sobre ativação e configuração do log binário para CDC.
- Debezium GitHub Repository - Examples — Repositório oficial com exemplos completos de configuração e consumo de eventos CDC.