Go 实现消息总线——进程内事件系统,解耦模块间通信
Go 实现消息总线——进程内事件系统,解耦模块间通信
适读人群:Go 后端工程师、想在不引入 MQ 的情况下解耦模块依赖的开发者 | 阅读时长:约 15 分钟 | 核心价值:进程内事件总线的完整实现,支持同步/异步、泛型类型安全、通配符订阅
做了几年 Java 的人转 Go,有一个习惯会带过来:喜欢用 Spring 的事件系统做模块解耦。@EventListener 一注解,发布方和订阅方互不感知,代码非常清爽。
转 Go 之后,一开始我直接用 channel,但 channel 需要双方都知道对方的存在,解耦程度不够。引入 Kafka/RabbitMQ 又太重,毕竟只是进程内通信。
最后自己实现了一个进程内的事件总线,这篇文章把实现逻辑写清楚。
使用场景
进程内事件总线适合:
- 模块间的低耦合通知(订单创建后通知积分、库存、通知等多个模块)
- 插件化架构(核心逻辑不感知扩展模块)
- 测试时方便 mock 和断言
不适合:
- 跨进程通信(用 MQ)
- 高吞吐量消息流(用 channel 直接传递)
- 需要持久化和重放的消息(用 MQ)
设计要点
- 类型安全:用泛型,订阅和发布的事件类型在编译期检查
- 同步 + 异步:默认同步,支持异步订阅(goroutine 里执行)
- 通配符:支持
order.*这样的主题模式 - 错误隔离:一个 handler panic 不影响其他 handler
- 取消订阅:返回 Unsubscribe 函数
完整实现
package eventbus
import (
"fmt"
"path"
"reflect"
"sync"
)
// Event 事件接口
type Event interface {
Topic() string
}
// Handler 事件处理器
type Handler[T Event] func(event T)
// Subscription 订阅信息
type subscription struct {
id int64
topic string // 可以包含通配符,如 "order.*"
handler reflect.Value
async bool
}
// Bus 事件总线
type Bus struct {
mu sync.RWMutex
subs []*subscription
nextID int64
}
var defaultBus = &Bus{}
// Subscribe 订阅事件(同步)
func Subscribe[T Event](bus *Bus, handler Handler[T]) func() {
return subscribe[T](bus, handler, false)
}
// SubscribeAsync 订阅事件(异步,在新 goroutine 里执行)
func SubscribeAsync[T Event](bus *Bus, handler Handler[T]) func() {
return subscribe[T](bus, handler, true)
}
func subscribe[T Event](bus *Bus, handler Handler[T], async bool) func() {
// 用零值获取 topic
var zero T
topic := zero.Topic()
bus.mu.Lock()
defer bus.mu.Unlock()
bus.nextID++
id := bus.nextID
sub := &subscription{
id: id,
topic: topic,
handler: reflect.ValueOf(handler),
async: async,
}
bus.subs = append(bus.subs, sub)
// 返回取消订阅函数
return func() {
bus.mu.Lock()
defer bus.mu.Unlock()
for i, s := range bus.subs {
if s.id == id {
bus.subs = append(bus.subs[:i], bus.subs[i+1:]...)
return
}
}
}
}
// Publish 发布事件
func Publish[T Event](bus *Bus, event T) {
bus.mu.RLock()
// 复制一份,避免在处理时锁被持有
subs := make([]*subscription, len(bus.subs))
copy(subs, bus.subs)
bus.mu.RUnlock()
topic := event.Topic()
for _, sub := range subs {
// 检查 topic 是否匹配(支持通配符)
matched, err := path.Match(sub.topic, topic)
if err != nil || !matched {
// 也检查反向匹配(订阅 "order.created",发布 "order.*")
matched2, _ := path.Match(topic, sub.topic)
if !matched2 {
continue
}
}
// 检查类型是否兼容
eventVal := reflect.ValueOf(event)
if !eventVal.Type().AssignableTo(sub.handler.Type().In(0)) {
continue
}
if sub.async {
go safeCall(sub.handler, eventVal, sub.topic)
} else {
safeCall(sub.handler, eventVal, sub.topic)
}
}
}
func safeCall(handler reflect.Value, arg reflect.Value, topic string) {
defer func() {
if r := recover(); r != nil {
fmt.Printf("[EventBus] Handler for topic %q panicked: %v\n", topic, r)
}
}()
handler.Call([]reflect.Value{arg})
}
// Default 使用默认总线的便捷函数
func Sub[T Event](handler Handler[T]) func() {
return Subscribe[T](defaultBus, handler)
}
func Pub[T Event](event T) {
Publish[T](defaultBus, event)
}使用示例
package main
import (
"fmt"
"time"
"your-project/eventbus"
)
// 定义事件
type OrderCreatedEvent struct {
OrderID string
UserID string
Amount float64
Items []string
}
func (e OrderCreatedEvent) Topic() string { return "order.created" }
type OrderPaidEvent struct {
OrderID string
PaymentID string
Amount float64
}
func (e OrderPaidEvent) Topic() string { return "order.paid" }
// 积分服务
func setupPointsService() {
eventbus.Sub[OrderPaidEvent](func(e OrderPaidEvent) {
points := int(e.Amount / 10)
fmt.Printf("[积分服务] 订单 %s 支付完成,增加 %d 积分\n", e.OrderID, points)
})
}
// 库存服务
func setupInventoryService() {
eventbus.Sub[OrderCreatedEvent](func(e OrderCreatedEvent) {
fmt.Printf("[库存服务] 订单 %s 创建,锁定商品: %v\n", e.OrderID, e.Items)
})
}
// 通知服务(异步,不阻塞主流程)
func setupNotificationService() {
eventbus.Sub[OrderCreatedEvent](func(e OrderCreatedEvent) {
// 模拟发送短信(较慢)
time.Sleep(100 * time.Millisecond)
fmt.Printf("[通知服务] 发送订单创建短信给用户 %s\n", e.UserID)
})
}
// 订单服务(核心业务,不知道也不依赖上面的服务)
func createOrder(orderID, userID string, amount float64, items []string) {
fmt.Printf("[订单服务] 创建订单 %s\n", orderID)
// ... 订单数据库写入 ...
// 发布事件,不关心谁在监听
eventbus.Pub(OrderCreatedEvent{
OrderID: orderID,
UserID: userID,
Amount: amount,
Items: items,
})
}
func payOrder(orderID, paymentID string, amount float64) {
fmt.Printf("[订单服务] 订单 %s 支付成功\n", orderID)
eventbus.Pub(OrderPaidEvent{
OrderID: orderID,
PaymentID: paymentID,
Amount: amount,
})
}
func main() {
// 各服务注册自己的事件监听
setupPointsService()
setupInventoryService()
setupNotificationService()
// 业务逻辑
createOrder("O001", "U123", 299.0, []string{"iPhone Case", "Screen Protector"})
fmt.Println("--- 订单创建完成 ---")
payOrder("O001", "P001", 299.0)
fmt.Println("--- 支付完成 ---")
time.Sleep(200 * time.Millisecond) // 等待异步处理
}输出:
[订单服务] 创建订单 O001
[库存服务] 订单 O001 创建,锁定商品: [iPhone Case Screen Protector]
[通知服务] 发送订单创建短信给用户 U123 ← 异步
--- 订单创建完成 ---
[订单服务] 订单 O001 支付成功
[积分服务] 订单 O001 支付完成,增加 29 积分
--- 支付完成 ---踩坑实录
踩坑 1:同步 handler 里修改了共享状态,有 race condition
现象:开启 -race 标志运行测试,发现 data race 警告。
原因:两个不同的 handler 在同一个 goroutine 里同步执行,但如果 handler 里访问了共享变量(没有加锁),就会有 race。特别是多个 goroutine 同时发布不同事件时。
解法:对同步 handler,事件总线串行调用它们(当前实现已是如此);对异步 handler,handler 内部要自己加锁保护共享状态。
踩坑 2:订阅但忘记取消订阅,导致 handler 被重复注册
现象:某个模块在每次 HTTP 请求里都调用了 Subscribe,导致订阅数量不断增长,同一个事件被处理了几百次。
解法:订阅操作应该在初始化时做一次,而不是在请求处理路径里。在代码 review 中要特别注意这类问题。
踩坑 3:泛型 zero value 的 Topic() 在某些情况下 panic
现象:var zero T; topic := zero.Topic() 在 T 是指针类型时(*OrderCreatedEvent)panic,因为指针零值是 nil。
解法:要求 Event 实现为值类型,不要用指针类型作为泛型参数。或者用 reflect.New(reflect.TypeOf(zero).Elem()).Interface().(T) 创建非 nil 实例。
与其他方案对比
| 方案 | 耦合度 | 类型安全 | 异步支持 | 适用场景 |
|---|---|---|---|---|
| 直接函数调用 | 强 | 强 | 需自己实现 | 简单依赖 |
| Channel | 中 | 强 | 原生 | 流式数据 |
| 本文事件总线 | 弱 | 强(泛型) | 支持 | 模块解耦 |
| MQ(Kafka等) | 弱 | 弱(序列化) | 原生 | 跨进程通信 |
