Kafka Log Compaction:实现状态快照的KV存储原理
Kafka Log Compaction:实现状态快照的KV存储原理
适读人群:想深入理解Kafka日志压缩机制,或需要在Kafka中保存最新状态的Java工程师 | 阅读时长:约15分钟
开篇故事
我们有一个配置中心服务,系统配置以Key-Value形式存储,每次修改配置都会推送一条消息到Kafka,下游各服务消费这条消息更新本地缓存。
系统运行了两年,配置修改了数万次,Kafka里积累了数万条配置变更消息。每次新服务启动,需要从头消费所有历史消息才能还原出最新配置状态,光是这个启动初始化过程就要花3-5分钟,而且随着历史消息越来越多,这个时间会持续增长。
后来我发现Kafka的Log Compaction功能完全可以解决这个问题:只保留每个Key的最新消息,历史版本自动清理。开启后,新服务启动只需要消费"最新状态快照",初始化时间从3-5分钟降到了20秒。
今天把Log Compaction的原理和实践全部讲清楚。
一、Log Compaction的本质
普通Kafka Topic按时间保留消息(log.retention.hours),过期后删除整段日志。
Log Compaction是另一种清理策略:对于相同Key的消息,只保留最新的那条,老的历史版本被清理掉,但最新状态永久保留(除非主动发送null值的"墓碑"消息来删除)。
压缩后的效果:每个Key只有最新的一条消息,消费者从头消费就能得到完整的最新状态。
二、Log Compaction原理
2.1 Clean与Dirty的概念
Kafka把日志文件分为两个区域:
- Clean部分:已经被压缩过,每个Key只有一条消息
- Dirty部分:新写入的消息,还没有被压缩
Log Cleaner线程周期性扫描,当Dirty比例超过min.cleanable.dirty.ratio(默认0.5,即50%)时,触发压缩。
2.2 墓碑消息(Tombstone)
发送value=null的消息(墓碑消息)可以删除一个Key。压缩时,墓碑消息本身会保留一段时间(delete.retention.ms,默认1天),让所有消费者都有机会看到这个"删除"操作,之后墓碑消息本身也会被清理。
三、完整Java实现
3.1 配置Log Compaction的Topic
/**
* 创建支持Log Compaction的Topic
*/
@Configuration
public class CompactionTopicConfig {
@Bean
public KafkaAdmin kafkaAdmin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092");
return new KafkaAdmin(configs);
}
@Bean
public NewTopic configStoreTopic() {
NewTopic topic = new NewTopic("config-store", 8, (short) 3);
Map<String, String> configs = new HashMap<>();
// 关键:设置清理策略为compact
configs.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);
// 最小压缩dirty比例(默认0.5)
// 调小(如0.1):更频繁压缩,磁盘占用更小,但CPU消耗更大
// 调大(如0.9):不频繁压缩,磁盘占用更大
configs.put(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "0.3");
// 消息最小保留时间(在这个时间内不会被压缩,保证消费者有时间消费到被覆盖的消息)
configs.put(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG, "60000"); // 1分钟
// 墓碑消息保留时间(value=null的消息)
configs.put(TopicConfig.DELETE_RETENTION_MS_CONFIG, "86400000"); // 1天
// 可以和时间清理策略组合使用(compact + delete)
// configs.put(TopicConfig.CLEANUP_POLICY_CONFIG, "compact,delete");
// configs.put(TopicConfig.RETENTION_MS_CONFIG, "604800000"); // 7天
topic.configs(configs);
return topic;
}
}3.2 配置中心的KV存储实现
/**
* 基于Kafka Log Compaction的配置中心实现
*
* 特性:
* 1. 配置修改实时发布
* 2. 消费最新状态快照(Log Compaction保证)
* 3. 支持配置删除(墓碑消息)
*/
@Service
@Slf4j
public class KafkaConfigStore {
private static final String CONFIG_TOPIC = "config-store";
private final KafkaTemplate<String, String> kafkaTemplate;
// 本地缓存:消费Kafka后构建的完整配置Map
private final Map<String, String> localCache = new ConcurrentHashMap<>();
private volatile boolean initialized = false;
public KafkaConfigStore(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
/**
* 写入/更新配置
*/
public void put(String key, String value) {
kafkaTemplate.send(CONFIG_TOPIC, key, value)
.thenAccept(result ->
log.info("配置已更新: key={}, offset={}",
key, result.getRecordMetadata().offset())
);
}
/**
* 删除配置(发送墓碑消息)
*/
public void delete(String key) {
// value=null 是墓碑消息,Log Compaction会最终删除这个Key
kafkaTemplate.send(CONFIG_TOPIC, key, null)
.thenAccept(result ->
log.info("配置已删除(墓碑消息): key={}", key)
);
localCache.remove(key);
}
/**
* 从本地缓存读取(不查Kafka)
*/
public Optional<String> get(String key) {
return Optional.ofNullable(localCache.get(key));
}
/**
* 启动时从头消费Kafka,构建完整的配置快照
* 由于Log Compaction,只需要消费压缩后的最新状态
* 消费完当前所有消息后(Lag=0),标记初始化完成
*/
@PostConstruct
public void loadSnapshot() {
Thread initThread = new Thread(() -> {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "config-store-init-" +
UUID.randomUUID()); // 每次启动用新group,从头消费
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
consumer.subscribe(Collections.singletonList(CONFIG_TOPIC));
// 获取Topic末尾offset(用于判断是否消费完)
Map<TopicPartition, Long> endOffsets = getEndOffsets(consumer);
boolean done = false;
while (!done) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
if (record.value() == null) {
// 墓碑消息:删除Key
localCache.remove(record.key());
} else {
localCache.put(record.key(), record.value());
}
}
// 检查是否已消费到末尾
done = isConsumedToEnd(consumer, endOffsets);
}
initialized = true;
log.info("配置快照加载完成: {} 个配置项", localCache.size());
}
}, "config-store-init");
initThread.setDaemon(false);
initThread.start();
}
private Map<TopicPartition, Long> getEndOffsets(
KafkaConsumer<String, String> consumer) {
Set<TopicPartition> partitions = consumer.assignment();
// 先等待分配
while (partitions.isEmpty()) {
consumer.poll(Duration.ofMillis(100));
partitions = consumer.assignment();
}
return consumer.endOffsets(partitions);
}
private boolean isConsumedToEnd(KafkaConsumer<String, String> consumer,
Map<TopicPartition, Long> endOffsets) {
for (Map.Entry<TopicPartition, Long> entry : endOffsets.entrySet()) {
long position = consumer.position(entry.getKey());
if (position < entry.getValue()) {
return false;
}
}
return true;
}
}3.3 监听配置变更(实时更新本地缓存)
/**
* 配置变更监听器
* 初始化完成后,实时监听新的配置变更
*/
@Component
@Slf4j
public class ConfigChangeListener {
private final KafkaConfigStore configStore;
@KafkaListener(
topics = "config-store",
groupId = "config-store-listener-group"
)
public void onConfigChange(ConsumerRecord<String, String> record) {
String key = record.key();
String value = record.value();
if (value == null) {
log.info("配置删除: key={}", key);
configStore.removeFromCache(key);
} else {
log.info("配置更新: key={}, value={}", key, value);
configStore.updateCache(key, value);
}
}
}四、踩坑实录
坑1:compact策略下消费者启动慢的误解
很多人以为开了compact,消费者从头消费只需要消费"压缩后的少量消息",应该很快。但实际上:
- 压缩是后台异步进行的,不是实时的,Dirty区域的历史消息还在
- 新消费者要等Log Cleaner做完压缩后,重启再消费才会快
我们实测:即使开了compact,在压缩完成之前,从头消费还是要消费所有历史消息。只有压缩完成后,新消费者重新从earliest消费,才能体验到快速加载的效果。
对策:开启compact后,等Broker完成一次完整压缩(可观察Cleaner日志),再重启消费者。
坑2:null value的消息处理错误
消费时没有判断record.value() == null,直接record.value().toString(),NPE。墓碑消息的value是null,必须处理。
坑3:compact Topic和时间保留策略的组合使用
配置了cleanup.policy=compact后,历史消息永远不会因为时间过期而删除,磁盘占用只增不减(压缩只是去重,不是清理)。
如果想兼顾"最新状态"和"过期清理",可以设置cleanup.policy=compact,delete,既保留每个Key的最新消息,也按时间清理古老的消息段。
坑4:Log Compaction不是实时的,短期内有历史消息
压缩间隔受log.cleaner.min.cleanable.ratio和log.cleaner.backoff.ms控制,默认情况下可能几分钟到几十分钟才压缩一次。在这段时间内,消费者仍然会看到同一个Key的多个版本。
代码必须处理重复Key:消费时后来的消息覆盖之前的消息(Map.put语义,不是第一次出现的消息覆盖后来的)。
五、总结
Log Compaction将Kafka从纯粹的消息队列变成了一个"可重放的状态存储",特别适合以下场景:
- 配置中心:配置的最新值就是系统的真实状态
- 用户档案:用户信息的最新版本
- 事件溯源的快照:定期把状态写入compact Topic,避免从头replay所有事件
- Kafka Streams的状态存储:Kafka Streams就大量使用compact Topic存储流处理的中间状态
核心配置:cleanup.policy=compact,再配合min.cleanable.dirty.ratio控制压缩频率。
下一篇(第445期)讲Debezium CDC,从MySQL binlog到Kafka的数据同步,这是目前最流行的数据库变更捕获方案。
