Apache Pulsar深度:多租户、分层存储与Kafka的架构差异对比
Apache Pulsar深度:多租户、分层存储与Kafka的架构差异对比
适读人群:Java后端工程师、消息队列架构师 | 阅读时长:约18分钟 | 技术栈:Apache Pulsar 3.x、Pulsar Java Client、Spring Pulsar
开篇故事
三年前,公司有个SaaS平台,消息系统一直在Kafka上跑。随着租户数量从几十增长到几百,我们遇到了一个棘手的问题:不同租户的消息相互影响,某个大租户消息量暴增,直接影响了其他小租户的消费延迟。
当时的解决方案很土:给大租户单独开一套Kafka集群。但这样运维成本直线上升,资源也浪费严重。
后来技术团队调研了Apache Pulsar,它原生的多租户隔离机制让我们眼前一亮:租户、命名空间、Topic三层架构,天然的资源隔离,不需要每个租户独立部署一套。
迁移了大半年,整体是成功的,但过程中也遇到了很多Pulsar和Kafka思维方式不同造成的坑。今天把这段经历写出来。
一、核心问题:Pulsar和Kafka的根本架构差异
1.1 Kafka的设计哲学
Kafka把消息数据直接存在Broker本地磁盘,Broker既负责消息路由,也负责数据存储。这个设计简单高效,但扩展时有限制:
- 扩容需要rebalance分区,期间性能下降
- 多租户隔离靠Topic隔离,没有原生的资源限制机制
- 历史数据存储成本高(热存储)
1.2 Pulsar的存算分离架构
存算分离带来的优势:
- Broker无状态,可以随时增减,不需要rebalance
- BookKeeper专注数据存储,可以独立扩展
- 热数据在BookKeeper,冷数据可以自动offload到S3,成本大幅下降
1.3 多租户架构
Topic的完整路径格式:persistent://tenant/namespace/topic
这个设计不只是名称空间,每个层级都可以配置独立的访问控制、配额限制、存储策略。
二、深度对比:Pulsar vs Kafka
2.1 消费模型差异
Kafka的消费模型:Consumer Group竞争消费,每个分区同时只能被一个Consumer消费。
Pulsar提供了四种订阅模式:
Key_Shared模式是Kafka没有的,它结合了并行消费和按Key有序的优点,对于需要保证同一用户、同一订单消息有序的场景非常有用。
2.2 消息确认机制差异
Kafka:提交offset,消费者记录"我已经消费到第N条消息"。
Pulsar:每条消息独立确认,支持累积确认(ack到第N条)和单条确认(ack特定消息ID)。
这意味着Pulsar可以做到"跳过某条消息",而Kafka做不到(只能skip整个offset)。
三、完整代码实现
3.1 Java客户端配置
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>3.0.0</version>
</dependency>
<!-- Spring Pulsar -->
<dependency>
<groupId>org.springframework.pulsar</groupId>
<artifactId>spring-pulsar-spring-boot-starter</artifactId>
<version>0.2.0</version>
</dependency>spring:
pulsar:
client:
service-url: pulsar://localhost:6650
authentication:
plugin-class-name: org.apache.pulsar.client.impl.auth.AuthenticationToken
param: "token:eyJhbGciOiJSUzI1NiJ9..."
producer:
topic-name: persistent://myorg/prod/orders
consumer:
topics:
- persistent://myorg/prod/orders
subscription-name: order-processor
subscription-type: shared3.2 生产者:发送消息
@Service
public class OrderEventProducer {
@Autowired
private PulsarClient pulsarClient;
private Producer<Order> producer;
@PostConstruct
public void init() throws PulsarClientException {
producer = pulsarClient.newProducer(Schema.JSON(Order.class))
.topic("persistent://myorg/prod/orders")
.producerName("order-producer-" + UUID.randomUUID())
.sendTimeout(10, TimeUnit.SECONDS)
.blockIfQueueFull(true)
.batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS) // 批量发送
.batchingMaxMessages(1000)
.compressionType(CompressionType.LZ4) // 压缩
.create();
}
/**
* 同步发送,等待broker确认
*/
public MessageId sendOrder(Order order) throws PulsarClientException {
return producer.newMessage()
.key(String.valueOf(order.getUserId())) // Key决定消息路由,保证同用户有序
.value(order)
.property("version", "v1")
.property("source", "order-service")
.send();
}
/**
* 异步发送,高吞吐
*/
public CompletableFuture<MessageId> sendOrderAsync(Order order) {
return producer.newMessage()
.key(String.valueOf(order.getUserId()))
.value(order)
.sendAsync();
}
/**
* 延时消息:Pulsar原生支持,Kafka需要额外实现
*/
public void sendDelayedOrder(Order order, long delaySeconds) throws PulsarClientException {
producer.newMessage()
.value(order)
.deliverAfter(delaySeconds, TimeUnit.SECONDS) // 延迟发送
.send();
}
@PreDestroy
public void cleanup() throws PulsarClientException {
producer.close();
}
}3.3 消费者:多种订阅模式
@Service
public class OrderEventConsumer {
@Autowired
private PulsarClient pulsarClient;
/**
* Key_Shared订阅:同用户的订单按顺序处理
*/
public void startKeySharedConsumer() throws PulsarClientException {
Consumer<Order> consumer = pulsarClient.newConsumer(Schema.JSON(Order.class))
.topic("persistent://myorg/prod/orders")
.subscriptionName("order-key-processor")
.subscriptionType(SubscriptionType.Key_Shared)
.ackTimeout(30, TimeUnit.SECONDS) // 超时自动重发
.negativeAckRedeliveryDelay(5, TimeUnit.SECONDS) // nack后5秒重发
.deadLetterPolicy(DeadLetterPolicy.builder()
.maxRedeliverCount(5)
.deadLetterTopic("persistent://myorg/prod/orders-dlq")
.build())
.subscribe();
Thread.ofVirtual().start(() -> {
while (true) {
try {
Message<Order> message = consumer.receive(1, TimeUnit.SECONDS);
if (message == null) continue;
try {
processOrder(message.getValue());
consumer.acknowledge(message); // 成功确认
} catch (BusinessException e) {
// 业务异常:不重试,直接ack并记录
log.error("订单处理失败,跳过: {}", message.getMessageId(), e);
consumer.acknowledge(message);
} catch (RetryableException e) {
// 可重试异常:nack,稍后重发
consumer.negativeAcknowledge(message);
}
} catch (PulsarClientException e) {
log.error("消费异常", e);
}
}
});
}
/**
* Spring Pulsar注解方式(更简洁)
*/
@PulsarListener(
topics = "persistent://myorg/prod/orders",
subscriptionName = "spring-order-processor",
subscriptionType = SubscriptionType.Shared,
schemaType = SchemaType.JSON
)
public void handleOrder(Order order) {
processOrder(order);
}
}3.4 多租户管理:Admin API
@Service
public class PulsarTenantManager {
@Autowired
private PulsarAdmin pulsarAdmin;
/**
* 为新租户创建完整的Pulsar资源
*/
public void onboardTenant(String tenantId) throws PulsarAdminException {
// 1. 创建租户
TenantInfo tenantInfo = TenantInfo.builder()
.adminRoles(Set.of("admin-" + tenantId))
.allowedClusters(Set.of("pulsar-cluster-1"))
.build();
pulsarAdmin.tenants().createTenant(tenantId, tenantInfo);
// 2. 创建命名空间
String namespace = tenantId + "/prod";
pulsarAdmin.namespaces().createNamespace(namespace);
// 3. 设置命名空间策略
// 消息保留策略:保留最近7天或最多100GB
RetentionPolicies retention = new RetentionPolicies(7 * 24 * 60, 100 * 1024);
pulsarAdmin.namespaces().setRetention(namespace, retention);
// 消息TTL:未消费消息24小时后自动删除
pulsarAdmin.namespaces().setNamespaceMessageTTL(namespace, 24 * 3600);
// 资源配额:限制该租户的带宽和QPS
ResourceQuota quota = new ResourceQuota();
quota.setMsgRateIn(10000.0); // 入站10000条/秒
quota.setMsgRateOut(20000.0); // 出站20000条/秒
quota.setBandwidthIn(50 * 1024 * 1024.0); // 50MB/s入站
quota.setBandwidthOut(100 * 1024 * 1024.0); // 100MB/s出站
pulsarAdmin.namespaces().setNamespaceResourceQuota(namespace, quota);
// 4. 设置分层存储策略:超过2天的数据offload到S3
OffloadPolicies offloadPolicies = OffloadPolicies.create(
"aws-s3", "us-east-1", "my-pulsar-offload-bucket",
null, null, null, null,
2 * 24 * 3600 * 1000L, // 2天后offload
1024 * 1024 * 1024L, // 1GB触发offload
OffloadedReadPriority.BOOKKEEPER_FIRST
);
pulsarAdmin.namespaces().setOffloadPolicies(namespace, offloadPolicies);
log.info("租户{}入驻完成", tenantId);
}
}3.5 分层存储配置
# pulsar broker配置(broker.conf)
managedLedgerOffloadDriver=aws-s3
s3ManagedLedgerOffloadBucket=pulsar-tiered-storage
s3ManagedLedgerOffloadRegion=us-east-1
s3ManagedLedgerOffloadCredentialId=xxx
s3ManagedLedgerOffloadCredentialSecret=xxx
# 自动offload策略
managedLedgerOffloadAutoTriggerSizeThresholdBytes=1073741824 # 1GB触发
managedLedgerOffloadDeletionLagMs=14400000 # offload后4小时删除本地副本四、工程实践与Kafka迁移
4.1 Pulsar与Kafka的功能对比
| 特性 | Apache Kafka | Apache Pulsar |
|---|---|---|
| 多租户 | 无原生支持 | 原生多租户 |
| 分层存储 | 需要插件 | 原生支持 |
| 消费模型 | Consumer Group | 4种订阅模式 |
| 延时消息 | 无 | 原生支持 |
| 消息去重 | 需自己实现 | 原生支持 |
| 管理工具 | 成熟 | 较新 |
| 社区成熟度 | 极成熟 | 成熟 |
| 运维复杂度 | 中等 | 较高(ZK+BK) |
4.2 Kafka Topic迁移到Pulsar
/**
* 双写方案:迁移期间同时写Kafka和Pulsar
*/
@Service
public class DualWriteProducer {
@Autowired
private KafkaTemplate<String, Order> kafkaTemplate;
@Autowired
private OrderEventProducer pulsarProducer;
@Value("${migration.pulsar-enabled:false}")
private boolean pulsarEnabled;
public void sendOrder(Order order) {
// 1. 先写Kafka(主路径)
kafkaTemplate.send("orders", String.valueOf(order.getUserId()), order);
// 2. 异步写Pulsar(不影响主路径)
if (pulsarEnabled) {
pulsarProducer.sendOrderAsync(order)
.exceptionally(e -> {
log.warn("Pulsar写入失败(不影响主流程): {}", e.getMessage());
return null;
});
}
}
}五、踩坑实录
坑一:Pulsar的运维复杂度远超Kafka
Pulsar需要三个组件:ZooKeeper(元数据)、BookKeeper(存储)、Broker(服务)。一个最小化的生产集群需要3+3+3=9个节点,比Kafka的3节点复杂得多。
我们的生产环境每次升级都是一次不小的运维挑战,特别是BookKeeper的版本升级有很多注意事项。
坑二:Key_Shared模式下的哈希冲突
Key_Shared订阅是按Key的哈希值分配给消费者的。如果某个Key的消息量特别大,会导致哈希到同一个消费者的消息过多,产生热点。
解决方案:使用KeySharedPolicy.stickyHashRange手动控制哈希分区。
坑三:消息积压后的offload性能问题
消息积压严重时,消费者可能需要从分层存储(S3)读取历史消息,性能比读BookKeeper差很多。如果有大量积压,建议先增加消费者实例追赶进度,再考虑调整offload策略。
坑四:Pulsar Functions的坑
Pulsar Functions是Pulsar内置的轻量级计算框架,看起来很诱人——不需要额外的Flink就能做流处理。但它的功能远不如Flink强大,状态管理、时间窗口等特性都很弱,不建议用于复杂计算。
六、总结与个人判断
Pulsar是一个技术上非常先进的消息系统,存算分离架构、原生多租户、分层存储,这些设计思路都比Kafka更现代。
但我要说一个让Pulsar粉丝不太高兴的结论:对于大多数公司,Kafka依然是更好的选择。
原因很简单:Kafka的社区成熟度、工具链完善度、运维经验积累,都远超Pulsar。Kafka的Confluent生态、各种连接器、监控工具,经历了十几年打磨,Pulsar在这些方面还在追赶。
Pulsar真正的优势场景:大型SaaS平台需要多租户隔离、长期历史数据存储成本要求低(分层存储)、需要延时消息原生支持。如果你的场景不涉及这些特定需求,Kafka的选型风险更低。
