消息序列化选型:JSON vs Protobuf vs Avro在Kafka场景的性能对比
消息序列化选型:JSON vs Protobuf vs Avro在Kafka场景的性能对比
适读人群:Kafka消息体积过大、序列化CPU消耗高、需要Schema管理的Java工程师 | 阅读时长:约15分钟
开篇故事
我们的用户行为日志系统,每天处理约500亿条消息,全部用JSON格式存储在Kafka中。系统运行了两年,直到某次容量规划时,DBA告诉我:Kafka集群的磁盘占用是同等数据量关系型数据库的8倍,每年光磁盘成本就要多花100万。
详细分析后发现:一条用户行为消息平均350字节,其中字段名就占了180字节(约50%是"开销",不是实际数据)。相同的数据用Protobuf存储,只需要约70字节,磁盘使用量能降低80%。
但切换序列化格式不是小事——所有生产者和消费者都要同步升级,Schema的兼容性也要仔细考虑。这篇文章把三种方案从原理到选型全部讲清楚。
一、三种序列化方案对比
1.1 基准性能测试数据
测试环境:Java 17,4核8G,相同的订单消息(包含12个字段,典型JSON大小约450字节)
| 指标 | JSON (Jackson) | Protobuf 3 | Avro (Schema Registry) |
|---|---|---|---|
| 序列化后大小 | 450字节 | 85字节 | 95字节 |
| 序列化吞吐(MB/s) | 320 | 850 | 680 |
| 反序列化吞吐(MB/s) | 280 | 920 | 720 |
| Schema演进支持 | 无 | 有限制 | 优秀 |
| 人类可读性 | 好 | 差(二进制) | 差(二进制) |
| 跨语言支持 | 优秀 | 优秀 | 良好 |
| 运维复杂度 | 低 | 中 | 高(需要Registry) |
结论:如果消息体积是瓶颈,Protobuf能减少80%大小,序列化速度提升2.7倍,非常值得迁移。
二、三种方案深度分析
2.1 JSON:灵活但臃肿
JSON的最大问题是字段名冗余。每条消息都要重复写字段名,对于"高吞吐、小消息"的Kafka场景,字段名的开销占比极高。
{
"userId": "u123456",
"eventType": "CLICK",
"targetId": "product-789",
"timestamp": 1706234567890,
"sessionId": "s-abc-def",
"clientIp": "192.168.1.1",
"deviceType": "MOBILE"
}
// 总共约 180字节,其中字段名约80字节(44%是开销)适用场景:消息量不大(< 1万TPS)、需要人工直接查看消息内容、团队没有Protobuf/Avro经验。
2.2 Protobuf:高效但Schema管理麻烦
Protobuf通过字段编号替代字段名,序列化后只存编号+值,不存字段名。字段编号占1字节,大幅减少了开销。
// user_behavior.proto
syntax = "proto3";
message UserBehaviorEvent {
string user_id = 1; // 字段编号1
string event_type = 2; // 字段编号2
string target_id = 3;
int64 timestamp = 4;
string session_id = 5;
string client_ip = 6;
string device_type = 7;
}
// 序列化后约 70-90字节,字段名完全消失Schema演进规则:
- 可以添加新字段(老消费者会忽略)
- 不能删除字段(老消费者可能依赖该字段)
- 不能改变字段编号(会破坏二进制兼容)
- 不能改变字段类型(可能导致解析错误)
2.3 Avro + Schema Registry:最佳的Schema管理
Avro在二进制消息中不包含Schema,而是包含一个Schema ID,消费者通过Schema ID从Schema Registry获取Schema定义来解析消息。
Avro的Schema演进支持最强,支持字段默认值、字段重命名(通过aliases)、字段类型宽化(int→long)。
三、完整Java代码实现
3.1 Protobuf序列化实现
// 1. 编写proto文件(src/main/proto/order_event.proto)
/*
syntax = "proto3";
package com.example.kafka.proto;
option java_package = "com.example.kafka.proto";
option java_outer_classname = "OrderEventProto";
message OrderEvent {
string order_id = 1;
string user_id = 2;
double amount = 3;
string status = 4;
int64 create_time_ms = 5;
repeated OrderItem items = 6;
message OrderItem {
string sku_id = 1;
int32 quantity = 2;
double price = 3;
}
}
*/
// 2. 自定义Protobuf序列化器/反序列化器
public class ProtobufSerializer<T extends MessageLite> implements Serializer<T> {
@Override
public byte[] serialize(String topic, T data) {
if (data == null) return null;
return data.toByteArray();
}
}
public class OrderEventProtobufDeserializer implements Deserializer<OrderEventProto.OrderEvent> {
@Override
public OrderEventProto.OrderEvent deserialize(String topic, byte[] data) {
if (data == null) return null;
try {
return OrderEventProto.OrderEvent.parseFrom(data);
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException("Protobuf反序列化失败", e);
}
}
}
// 3. 生产者使用Protobuf
@Service
@Slf4j
public class ProtobufOrderProducer {
private final Producer<String, OrderEventProto.OrderEvent> producer;
public ProtobufOrderProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
ProtobufSerializer.class.getName());
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4"); // Protobuf再加lz4,效果更好
this.producer = new KafkaProducer<>(props);
}
public void sendOrder(Order order) {
// 构建Protobuf消息
OrderEventProto.OrderEvent.Builder builder =
OrderEventProto.OrderEvent.newBuilder()
.setOrderId(order.getOrderId())
.setUserId(order.getUserId())
.setAmount(order.getAmount().doubleValue())
.setStatus(order.getStatus())
.setCreateTimeMs(order.getCreateTime().toEpochMilli());
for (OrderItem item : order.getItems()) {
builder.addItems(
OrderEventProto.OrderEvent.OrderItem.newBuilder()
.setSkuId(item.getSkuId())
.setQuantity(item.getQuantity())
.setPrice(item.getPrice().doubleValue())
.build()
);
}
OrderEventProto.OrderEvent event = builder.build();
producer.send(
new ProducerRecord<>("order-events-proto", order.getOrderId(), event),
(metadata, exception) -> {
if (exception != null) {
log.error("发送失败: orderId={}", order.getOrderId(), exception);
} else {
log.debug("发送成功: orderId={}, size={}bytes, offset={}",
order.getOrderId(), event.getSerializedSize(), metadata.offset());
}
}
);
}
}3.2 Avro + Schema Registry实现
// 1. 定义Avro Schema(order-event.avsc)
/*
{
"type": "record",
"name": "OrderEvent",
"namespace": "com.example.kafka.avro",
"fields": [
{"name": "orderId", "type": "string"},
{"name": "userId", "type": "string"},
{"name": "amount", "type": "double"},
{"name": "status", "type": "string"},
{"name": "createTimeMs", "type": "long"},
{"name": "source", "type": ["null", "string"], "default": null}
]
}
*/
// 2. 使用Confluent Kafka Avro Serializer
/*
pom.xml依赖:
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>7.5.0</version>
</dependency>
*/
@Configuration
public class AvroKafkaConfig {
@Bean
public ProducerFactory<String, GenericRecord> avroProducerFactory() {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092");
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// Avro序列化器(自动向Schema Registry注册Schema)
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
KafkaAvroSerializer.class);
// Schema Registry地址
configs.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
"http://schema-registry:8081");
return new DefaultKafkaProducerFactory<>(configs);
}
@Bean
public ConsumerFactory<String, GenericRecord> avroConsumerFactory() {
Map<String, Object> configs = new HashMap<>();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092");
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
KafkaAvroDeserializer.class);
configs.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
"http://schema-registry:8081");
// 反序列化为具体Java对象(需要avro-maven-plugin生成代码)
configs.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
return new DefaultKafkaConsumerFactory<>(configs);
}
}@Service
@Slf4j
public class AvroOrderProducer {
private final KafkaTemplate<String, GenericRecord> kafkaTemplate;
public void sendOrder(Order order) {
// 加载Schema
Schema schema = new Schema.Parser().parse(
getClass().getResourceAsStream("/avro/order-event.avsc")
);
// 构建Avro GenericRecord
GenericRecord record = new GenericData.Record(schema);
record.put("orderId", order.getOrderId());
record.put("userId", order.getUserId());
record.put("amount", order.getAmount().doubleValue());
record.put("status", order.getStatus());
record.put("createTimeMs", order.getCreateTime().toEpochMilli());
record.put("source", "mobile-app");
kafkaTemplate.send("order-events-avro", order.getOrderId(), record);
log.info("Avro消息已发送: orderId={}", order.getOrderId());
}
}四、踩坑实录
坑1:Protobuf字段编号被复用导致数据错乱
一个同事删除了一个废弃字段(字段编号5),然后把新字段加到了编号5。老消费者(还在使用旧proto文件)读到这条消息时,把新字段的值当成了旧字段的类型来解析,数据完全错乱。
规则:Protobuf字段编号一旦使用,永远不能复用,即使字段被删除。删除字段后要用reserved声明该编号被保留:
message OrderEvent {
reserved 5; // 原来是sourceChannel,已删除,编号保留禁止复用
reserved "sourceChannel"; // 字段名也保留
string order_id = 1;
// ...
}坑2:Avro Schema演进的兼容性模式设置错误
Schema Registry有多种兼容性模式(BACKWARD、FORWARD、FULL),默认是BACKWARD:新Schema可以读取旧数据,但旧Schema无法读取新数据。
如果设置成NONE(不检查兼容性),开发人员随意修改Schema(如删除字段、改字段类型),线上消费者反序列化就会崩溃。
最佳实践:设置FULL(前向后向都兼容),新字段必须有默认值:
// 设置Topic的兼容性策略(HTTP请求)
PUT http://schema-registry:8081/config/order-events-value
{"compatibility": "FULL"}坑3:Schema Registry成为单点故障
消费者每次反序列化都要查Schema Registry,如果Schema Registry不可用,消费者无法反序列化任何消息,整个消费链路崩溃。
解决方案:
- Schema Registry做高可用(至少2个实例 + 负载均衡)
- 客户端开启Schema缓存,不是每次都查Registry:
schema.registry.cache.capacity=1000 # 缓存1000个Schema五、选型建议
| 场景 | 推荐方案 | 原因 |
|---|---|---|
| 日消息量 < 10亿条 | JSON | 简单,维护成本低 |
| 日消息量 > 100亿条 | Protobuf | 减少80%存储,提升吞吐 |
| 多团队/多语言,Schema需要治理 | Avro + Schema Registry | 最佳Schema演进支持 |
| 大数据生态(Hadoop/Spark) | Avro | Hadoop生态原生支持Avro |
| 已有gRPC基础设施 | Protobuf | 统一序列化格式 |
一个实用的决策点:如果你的Kafka消息每天超过10TB,就值得考虑换Protobuf或Avro。低于这个量级,JSON的运维简单性优势大于性能损失。
下一篇(第450期)讲Saga分布式事务,用消息队列实现跨服务的最终一致性,这是微服务架构里最实用的分布式事务方案。
