Kafka Connect in Practice: No-Code Data Pipelines Between Databases and Data Warehouses
Audience: Java backend developers, data engineers, and anyone working on data integration and ETL pipelines | Reading time: ~18 minutes
Opening Story
In 2023 we needed to sync MySQL order data to ClickHouse in real time for analytics.
Our first approach: a hand-written Java service that consumed the binlog, parsed the change events, and wrote them to ClickHouse. It took two weeks and about 1,000 lines of code, with endless edge cases to handle, plus the deployment and monitoring burden of yet another service.
Then we switched to Kafka Connect + Debezium: a 20-line configuration file, done in two hours, zero code.
That is exactly what Kafka Connect is for: declaratively wiring data sources to data sinks, with no code required.
1. Kafka Connect Architecture
Data flow:
Source ──→ Kafka ──→ Sink
Source Connectors (read):
Read from an external data source and write into a Kafka topic.
Examples: MySQL → Kafka (Debezium)
Filesystem → Kafka
MongoDB → Kafka
Sink Connectors (write):
Read from a Kafka topic and write into a target system.
Examples: Kafka → ClickHouse
Kafka → Elasticsearch
Kafka → S3 (data lake)
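For intuition, the source → topic → sink split can be sketched in plain Java. This is a toy in-memory model, not the real Connect API; all class and method names here are made up for illustration:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Toy model of the Connect data flow: a source polls records into a
// "topic" queue, and a sink drains that queue into a target store.
public class MiniPipeline {
    interface Source { List<String> poll(); }           // e.g. reads binlog events
    interface Sink { void put(List<String> records); }  // e.g. writes to ClickHouse

    public static void run(Source source, Sink sink) {
        Queue<String> topic = new ArrayDeque<>(source.poll()); // Source -> Kafka
        List<String> batch = new ArrayList<>(topic);           // Kafka -> Sink
        sink.put(batch);
    }

    public static void main(String[] args) {
        List<String> warehouse = new ArrayList<>();
        run(() -> List.of("order-1", "order-2"), warehouse::addAll);
        System.out.println(warehouse); // [order-1, order-2]
    }
}
```

The real framework adds what this toy omits: offset tracking, parallel tasks, schema handling, and fault tolerance.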
Architecture diagram:
┌──────────────────────────────────────────────────────────┐
│                  Kafka Connect Cluster                   │
│ ┌────────────┐ ┌──────────┐ ┌──────────────────┐ │
│ │ Source │ │ │ │ Sink │ │
│ │ Connector │──→│ Kafka │──→│ Connector │ │
│ │ (Debezium) │ │ Topics │ │ (JDBC/CH) │ │
│ └────────────┘ └──────────┘ └──────────────────┘ │
│ ↑ ↓ │
│ MySQL ClickHouse │
└──────────────────────────────────────────────────────────┘
2. Hands-On: MySQL → Kafka → ClickHouse
2.1 Deploying Kafka Connect (Docker Compose)
# docker-compose.yml
version: '3.8'
services:
  kafka:
    image: confluentinc/cp-kafka:7.5.0
    environment:
      CLUSTER_ID: "MkU3OEVBNTcwNTJENDM2Qk"   # any base64-encoded UUID
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_NODE_ID: 1
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_LOG_DIRS: /var/lib/kafka/data
  kafka-connect:
    image: confluentinc/cp-kafka-connect:7.5.0
    depends_on:
      - kafka
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: kafka:9092
      CONNECT_REST_PORT: 8083
      CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect
      CONNECT_GROUP_ID: "connect-cluster"
      CONNECT_CONFIG_STORAGE_TOPIC: connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: connect-statuses
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components
    command:
      - bash
      - -c
      - |
        # Install the Debezium MySQL connector plugin
        confluent-hub install debezium/debezium-connector-mysql:2.3.0 --no-prompt
        # Install the JDBC sink connector
        confluent-hub install confluentinc/kafka-connect-jdbc:10.7.4 --no-prompt
        /etc/confluent/docker/run
2.2 Configuring the Debezium MySQL Source Connector
# Create the connector via the REST API
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d '{
"name": "mysql-orders-source",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "mysql-host",
"database.port": "3306",
"database.user": "debezium",
"database.password": "debezium_password",
"database.server.id": "184054",
"database.server.name": "mysql-orders",
"database.include.list": "orderdb",
"table.include.list": "orderdb.orders,orderdb.order_items",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "schema-changes.orderdb",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.delete.handling.mode": "rewrite"
}
}'
Debezium creates the topics automatically (format: {prefix}.{database}.{table}):
mysql-orders.orderdb.orders
mysql-orders.orderdb.order_items
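The naming rule is easy to capture in a small helper — useful when generating sink configs for many tables. This is a hypothetical utility, not part of the Debezium API:

```java
// Builds Debezium CDC topic names following the {prefix}.{database}.{table}
// convention. Hypothetical helper for illustration, not a Debezium class.
public class DebeziumTopics {
    public static String topicFor(String prefix, String database, String table) {
        return prefix + "." + database + "." + table;
    }

    public static void main(String[] args) {
        System.out.println(topicFor("mysql-orders", "orderdb", "orders"));
        // mysql-orders.orderdb.orders
    }
}
```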
2.3 Configuring the JDBC Sink Connector (writing to ClickHouse)
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "clickhouse-orders-sink",
    "config": {
      "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
      "tasks.max": "2",
      "connection.url": "jdbc:clickhouse://clickhouse-host:8123/analytics",
      "connection.user": "default",
      "connection.password": "",
      "topics": "mysql-orders.orderdb.orders",
      "insert.mode": "insert",
      "pk.mode": "record_key",
      "pk.fields": "id",
      "auto.create": "true",
      "auto.evolve": "true",
      "batch.size": "3000"
    }
  }'
Note: ClickHouse is not one of the JDBC sink's officially supported dialects, so upsert mode is not available; use plain inserts and handle deduplication on the ClickHouse side (for example with a ReplacingMergeTree table).
3. Monitoring Connector Status
3.1 Checking Status via the REST API
# List all connectors
curl http://localhost:8083/connectors
# Check the status of a specific connector
curl http://localhost:8083/connectors/mysql-orders-source/status
# Sample output:
# {
#   "name": "mysql-orders-source",
#   "connector": { "state": "RUNNING", "worker_id": "connect:8083" },
#   "tasks": [{ "id": 0, "state": "RUNNING", "worker_id": "connect:8083" }]
# }
# Restart a failed connector
curl -X POST http://localhost:8083/connectors/mysql-orders-source/restart
# Pause a connector
curl -X PUT http://localhost:8083/connectors/mysql-orders-source/pause
# Resume a connector
curl -X PUT http://localhost:8083/connectors/mysql-orders-source/resume
3.2 A Java Monitoring Client
import java.util.Map;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;

/**
 * Kafka Connect health monitor.
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class KafkaConnectMonitor {

    private final RestTemplate restTemplate;
    private final AlertService alertService;

    private static final String CONNECT_URL = "http://kafka-connect:8083";

    @Scheduled(fixedDelay = 60_000) // check every 60 seconds
    @SuppressWarnings("unchecked")
    public void checkConnectorHealth() {
        try {
            String url = CONNECT_URL + "/connectors?expand=status";
            Map<String, Object> connectors = restTemplate.getForObject(url, Map.class);
            if (connectors == null) return;
            connectors.forEach((name, detail) -> {
                Map<String, Object> status = (Map<String, Object>) ((Map<String, Object>) detail).get("status");
                Map<String, Object> connectorStatus = (Map<String, Object>) status.get("connector");
                String state = (String) connectorStatus.get("state");
                if (!"RUNNING".equals(state)) {
                    alertService.sendAlert(
                            String.format("[Kafka Connect] Connector '%s' is in abnormal state: %s", name, state));
                    log.error("[KafkaConnect] Connector {} state: {}", name, state);
                }
            });
        } catch (Exception e) {
            alertService.sendAlert("[Kafka Connect] Health check failed: " + e.getMessage());
        }
    }
}
4. Pitfalls We Hit
Pitfall 1: Debezium requires the MySQL binlog and specific privileges
-- MySQL must have the binlog enabled
-- my.cnf configuration
-- [mysqld]
-- log-bin=mysql-bin
-- binlog_format=ROW
-- binlog_row_image=FULL
-- Privileges required by the Debezium user
CREATE USER 'debezium'@'%' IDENTIFIED BY 'password';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium'@'%';
FLUSH PRIVILEGES;
Pitfall 2: Failed connector tasks do not restart automatically
By default, a failed connector task goes into the FAILED state and stays there until it is restarted manually or by an external process.
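One option is to extend the monitoring approach from section 3.2 with an automatic restart: parse the status response, collect FAILED connectors, and POST to /connectors/{name}/restart for each. A minimal sketch of the decision logic, with the HTTP calls themselves elided and illustrative names:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Given connector states parsed from GET /connectors?expand=status, decide
// which connectors should be restarted via POST /connectors/{name}/restart.
// Illustrative sketch; the actual HTTP calls are omitted.
public class ConnectorRestarter {
    public static List<String> needsRestart(Map<String, String> connectorStates) {
        List<String> failed = new ArrayList<>();
        connectorStates.forEach((name, state) -> {
            if ("FAILED".equals(state)) {
                failed.add(name); // target for POST /connectors/{name}/restart
            }
        });
        return failed;
    }

    public static void main(String[] args) {
        Map<String, String> states = Map.of(
                "mysql-orders-source", "FAILED",
                "clickhouse-orders-sink", "RUNNING");
        System.out.println(needsRestart(states)); // [mysql-orders-source]
    }
}
```

In production you would add a backoff and a retry cap so a persistently broken connector does not restart in a tight loop.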
The following error-handling settings reduce how often tasks fail in the first place (note that they tolerate bad records and retry transient errors; they do not restart an already-FAILED task):
{
"errors.retry.timeout": "300000",
"errors.retry.delay.max.ms": "60000",
"errors.tolerance": "all",
"errors.log.enable": "true",
"errors.log.include.messages": "true"
}
Pitfall 3: The initial snapshot of a large table takes too long
On first startup Debezium takes a full table snapshot; for large tables this can run for hours.
{
"snapshot.mode": "schema_only",
"snapshot.locking.mode": "none"
}
schema_only: snapshot only the table schemas, skip the existing rows, and start reading from the current binlog position. Appropriate when you only need incremental changes.
5. Summary
The core value of Kafka Connect:
- Zero-code pipelines: a configuration file declares the connection; no business code is written
- Rich ecosystem: 200+ connectors on Confluent Hub, covering mainstream databases and cloud services
- Built-in reliability: offset management, failure retries, and exactly-once semantics for source connectors (Kafka 3.3+)
- Horizontal scaling: distributed deployment, with tasks spread across multiple workers
Best-fit scenarios: syncing database changes to search engines or data warehouses, log collection, and database migration.
