Debezium CDC数据变更捕获:从MySQL binlog到Kafka的数据同步
Debezium CDC数据变更捕获:从MySQL binlog到Kafka的数据同步
适读人群:需要实时同步数据库变更到Kafka、构建数据管道的Java工程师 | 阅读时长:约16分钟
开篇故事
数据同步是每个业务系统迟早要面对的需求。
我们有一个老的订单数据库,需要把订单表的数据实时同步到搜索引擎(Elasticsearch)和数据仓库(ClickHouse)。最初的方案是在订单Service里,每次写DB之后同时调用ES接口和ClickHouse写入接口。
这个方案很脆弱:ES或ClickHouse任一超时都可能影响主流程,每次新增一个下游同步目标都要改业务代码,还要处理三者的一致性(DB写成功了ES写失败怎么办)。
换成Debezium CDC之后,整个架构变成了:只要MySQL binlog有变更,Debezium自动捕获并发送到Kafka,ES和ClickHouse各自从Kafka消费,业务代码完全不需要改。数据一致性由Kafka保证,新增同步目标只需要加消费者,对业务零侵入。
今天把Debezium CDC的原理和完整配置讲清楚。
一、CDC的核心原理
CDC(Change Data Capture,变更数据捕获)通过监听数据库的事务日志(MySQL是binlog)来捕获数据变更,不对业务数据库性能产生显著影响(只读binlog)。
Debezium伪装成MySQL的一个Slave节点,通过MySQL复制协议读取binlog,把每行数据的INSERT/UPDATE/DELETE变更解析成结构化的JSON消息,发送到Kafka。
二、Debezium消息格式
每条Debezium消息包含变更前(before)和变更后(after)的完整数据:
{
"schema": {...},
"payload": {
"before": { // 变更前(INSERT时为null)
"id": 1001,
"order_no": "O202401001",
"status": "CREATED"
},
"after": { // 变更后(DELETE时为null)
"id": 1001,
"order_no": "O202401001",
"status": "PAID"
},
"source": {
"db": "order_db",
"table": "orders",
"ts_ms": 1706234567890, // binlog时间戳(毫秒)
"pos": 456789, // binlog位置
"file": "mysql-bin.000001" // binlog文件名
},
"op": "u", // 操作类型:c=create, u=update, d=delete, r=read(snapshot)
"ts_ms": 1706234567900 // Debezium处理时间
}
}三、完整部署配置
3.1 MySQL端准备
-- 1. 确保MySQL开启了binlog(Row模式)
-- my.cnf 配置:
-- server-id = 1
-- log_bin = mysql-bin
-- binlog_format = ROW -- 必须是ROW模式,不能是STATEMENT
-- binlog_row_image = FULL -- 记录完整行数据
-- expire_logs_days = 7
-- 2. 创建Debezium专用用户(最小权限)
CREATE USER 'debezium'@'%' IDENTIFIED BY 'debezium_password';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT
ON *.* TO 'debezium'@'%';
-- 3. 如果需要从指定位置开始同步(增量模式)
SHOW MASTER STATUS;
-- 记录 File 和 Position 的值3.2 Debezium Connector配置
// POST http://kafka-connect:8083/connectors
// Content-Type: application/json
{
"name": "order-db-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
// MySQL连接配置
"database.hostname": "mysql-master",
"database.port": "3306",
"database.user": "debezium",
"database.password": "debezium_password",
"database.server.id": "184054", // 唯一的MySQL服务器ID
"database.server.name": "order-db", // 作为Kafka Topic前缀
// 监听的数据库和表
"database.include.list": "order_db",
"table.include.list": "order_db.orders,order_db.order_items",
// Kafka Schema Registry(用于Schema管理)
"schema.history.internal.kafka.bootstrap.servers": "kafka1:9092",
"schema.history.internal.kafka.topic": "schema-changes.order-db",
// 快照配置:initial=首次启动做全量快照,never=跳过快照只读增量
"snapshot.mode": "initial",
// 消息格式
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false",
// 时间类型处理
"time.precision.mode": "connect",
"temporal.precision.mode": "adaptive_time_microseconds",
// 心跳(防止binlog位置长时间不更新导致截断)
"heartbeat.interval.ms": "5000",
"heartbeat.topics.prefix": "__debezium-heartbeat",
// 并发(Kafka Connect任务数)
"tasks.max": "1" // MySQL Connector只支持1个任务
}
}3.3 Java消费者处理Debezium消息
/**
* Debezium CDC消息消费者
* 将数据库变更同步到Elasticsearch
*/
@Component
@Slf4j
public class OrderCDCConsumer {
private final ElasticsearchClient esClient;
private final ObjectMapper objectMapper;
public OrderCDCConsumer(ElasticsearchClient esClient, ObjectMapper objectMapper) {
this.esClient = esClient;
this.objectMapper = objectMapper;
}
/**
* 消费Debezium生成的变更消息
* Topic名规则:{server.name}.{database}.{table}
* 本例:order-db.order_db.orders
*/
@KafkaListener(
topics = "order-db.order_db.orders",
groupId = "es-cdc-sync-group",
containerFactory = "cdcContainerFactory"
)
public void consumeOrderChange(ConsumerRecord<String, String> record,
Acknowledgment acknowledgment) {
// 消息Key是变更行的主键(JSON格式)
// 消息Value是变更内容(before + after)
String key = record.key();
String value = record.value();
if (value == null) {
// Debezium墓碑消息(DELETE之后会发送一条null value消息)
// 用于支持Log Compaction
acknowledgment.acknowledge();
return;
}
try {
JsonNode payload = objectMapper.readTree(value);
String op = payload.path("op").asText();
JsonNode after = payload.path("after");
JsonNode before = payload.path("before");
switch (op) {
case "c": // CREATE
handleInsert(after, acknowledgment);
break;
case "u": // UPDATE
handleUpdate(before, after, acknowledgment);
break;
case "d": // DELETE
handleDelete(before, acknowledgment);
break;
case "r": // READ(snapshot时的全量读)
handleSnapshot(after, acknowledgment);
break;
default:
log.warn("未知的操作类型: op={}", op);
acknowledgment.acknowledge();
}
} catch (Exception e) {
log.error("处理CDC消息失败: key={}, offset={}",
key, record.offset(), e);
// 失败不提交,等待重试
throw new RuntimeException("CDC消息处理失败", e);
}
}
private void handleInsert(JsonNode after, Acknowledgment ack) throws Exception {
String orderId = after.path("id").asText();
OrderDocument doc = parseOrderDoc(after);
esClient.index(i -> i
.index("orders")
.id(orderId)
.document(doc)
);
log.info("ES索引创建: orderId={}", orderId);
ack.acknowledge();
}
private void handleUpdate(JsonNode before, JsonNode after,
Acknowledgment ack) throws Exception {
String orderId = after.path("id").asText();
String oldStatus = before.path("status").asText();
String newStatus = after.path("status").asText();
// 部分更新(只更新变化的字段)
Map<String, Object> updateFields = new HashMap<>();
updateFields.put("status", newStatus);
updateFields.put("updated_at", after.path("updated_at").asText());
esClient.update(u -> u
.index("orders")
.id(orderId)
.doc(updateFields)
);
log.info("ES索引更新: orderId={}, status: {} -> {}",
orderId, oldStatus, newStatus);
ack.acknowledge();
}
private void handleDelete(JsonNode before, Acknowledgment ack) throws Exception {
String orderId = before.path("id").asText();
esClient.delete(d -> d
.index("orders")
.id(orderId)
);
log.info("ES索引删除: orderId={}", orderId);
ack.acknowledge();
}
private void handleSnapshot(JsonNode after, Acknowledgment ack) throws Exception {
// 全量快照,批量写入(实际应该用批量API提高效率)
handleInsert(after, ack);
}
private OrderDocument parseOrderDoc(JsonNode node) {
OrderDocument doc = new OrderDocument();
doc.setId(node.path("id").asLong());
doc.setOrderNo(node.path("order_no").asText());
doc.setUserId(node.path("user_id").asLong());
doc.setStatus(node.path("status").asText());
doc.setAmount(node.path("amount").decimalValue());
return doc;
}
}3.4 监控Connector状态
/**
* Debezium Connector健康检查
* 定期调用Kafka Connect REST API检查Connector状态
*/
@Component
@Slf4j
public class DebeziumHealthChecker {
private final RestTemplate restTemplate;
private final String connectUrl = "http://kafka-connect:8083";
@Scheduled(fixedDelay = 30000) // 每30秒检查
public void checkConnectorStatus() {
try {
ResponseEntity<Map> response = restTemplate.getForEntity(
connectUrl + "/connectors/order-db-connector/status",
Map.class
);
Map<String, Object> body = response.getBody();
if (body == null) return;
Map<String, Object> connector = (Map<String, Object>) body.get("connector");
String state = (String) connector.get("state");
if (!"RUNNING".equals(state)) {
log.error("Debezium Connector异常! state={}, body={}", state, body);
sendAlarm("Debezium Connector异常: " + state);
}
// 检查Task状态
List<Map<String, Object>> tasks = (List<Map<String, Object>>) body.get("tasks");
for (Map<String, Object> task : tasks) {
String taskState = (String) task.get("state");
if (!"RUNNING".equals(taskState)) {
log.error("Debezium Task异常! taskId={}, state={}",
task.get("id"), taskState);
}
}
} catch (Exception e) {
log.error("检查Connector状态失败", e);
}
}
/**
* 重启失败的Connector
*/
public void restartConnector() {
try {
restTemplate.postForEntity(
connectUrl + "/connectors/order-db-connector/restart",
null,
Void.class
);
log.info("Connector重启请求已发送");
} catch (Exception e) {
log.error("Connector重启失败", e);
}
}
private void sendAlarm(String message) {
// 告警逻辑
}
}四、踩坑实录
坑1:binlog_format不是ROW模式
MySQL的binlog默认可能是STATEMENT模式,Debezium只支持ROW模式。如果配置错误,Debezium会在日志里报错:binlog_format is not ROW,Connector FAILED。
修改MySQL配置后,需要重启MySQL才能生效,而重启期间binlog位置会变化,Debezium需要重新做快照。
坑2:大表全量快照导致MySQL压力过大
Debezium首次启动时,如果配置了snapshot.mode=initial,会对监控的所有表做全量SELECT,对于千万级大表,这个SELECT会持有表锁(取决于MySQL版本和快照策略),严重影响业务写入。
解决方案:
- 大表在业务低峰期启动Debezium
- 配置
snapshot.locking.mode=none跳过锁(可能导致快照数据不一致,但通常可以接受) - 或者配置
snapshot.mode=schema_only只同步Schema,从当前binlog位置开始增量
坑3:Debezium消息没有Schema导致下游消费报错
默认情况下Debezium消息包含完整的Schema信息(很大),消费者反序列化时如果类型不匹配会报错。
解决方案:
- 在Connector配置里关闭schema:
"value.converter.schemas.enable": "false" - 或者使用Confluent Schema Registry统一管理Schema
坑4:binlog文件被MySQL清理,Debezium无法续接
Debezium依赖binlog文件来记录当前同步位置。如果MySQL的expire_logs_days设得太短,binlog被清理了,而Debezium上次处理的位置在被清理的文件里,Debezium就找不到续接点,需要重新做全量快照。
解决方案:确保expire_logs_days比Debezium可能停机的最长时间长得多。如果Debezium出现长时间停机,恢复时需要做全量快照。
五、总结
Debezium CDC是目前最成熟的数据库变更捕获方案,特别适合需要:
- 实时数据同步:DB变更毫秒级同步到Kafka
- 业务解耦:下游系统不依赖业务代码,只消费Kafka
- 灵活扩展:新增同步目标只需加消费者,对业务零侵入
核心组件:Kafka Connect + Debezium MySQL Connector。
部署注意事项:MySQL必须开启Row格式binlog,快照策略根据表大小选择,Connector状态需要监控。
下一篇(第446期)讲Spring Kafka最佳实践:@KafkaListener的并发、错误处理和手动提交,把Spring封装层的细节全部讲透。
