Go 开发服务注册中心——简化版 Consul,用 Go + Raft 实现
Go 开发服务注册中心——简化版 Consul,用 Go + Raft 实现
适读人群:Go 后端工程师、想理解服务注册中心和 Raft 共识算法原理的开发者 | 阅读时长:约 20 分钟 | 核心价值:用 hashicorp/raft 实现强一致性的服务注册中心,理解 Consul 的核心机制
做了几年微服务的同学都知道,服务注册中心是微服务架构的基础设施。Consul 是大家用得最多的,但有时候你会遇到一些场景:私有化部署不让用云上的,Consul 集群的运维成本太高,或者就是想搞清楚 Consul 内部是怎么工作的。
今年帮一个做工业物联网的团队,他们有几十个边缘节点,每个节点上跑 3-5 个 Go 微服务,想要一个轻量级的服务注册中心,能在节点内部做服务发现,不依赖外部基础设施。
我们最终的方案是:每个节点起一个 3 节点的 miniConsul 集群,用 hashicorp/raft 做共识。
核心组件
服务注册中心 = Raft 共识 + 状态机 + HTTP API + 健康检查- Raft:保证集群数据一致性(用 hashicorp/raft,Consul 用的就是这个库)
- 状态机:存储服务注册数据(KV 结构)
- HTTP API:服务注册/注销/查询
- 健康检查:定期 ping 服务,自动注销失效实例
实现
go get github.com/hashicorp/raft
go get github.com/hashicorp/raft-boltdb/v2Raft 状态机:服务注册数据
package registry
import (
"encoding/json"
"fmt"
"io"
"sync"
"time"
"github.com/hashicorp/raft"
)
// ServiceInstance 服务实例
type ServiceInstance struct {
ID string `json:"id"`
ServiceName string `json:"service_name"`
Host string `json:"host"`
Port int `json:"port"`
Tags []string `json:"tags,omitempty"`
Meta map[string]string `json:"meta,omitempty"`
Health string `json:"health"` // passing / warning / critical
RegisteredAt time.Time `json:"registered_at"`
LastChecked time.Time `json:"last_checked"`
}
// Address 返回服务地址
func (s *ServiceInstance) Address() string {
return fmt.Sprintf("%s:%d", s.Host, s.Port)
}
// Command Raft 命令
type Command struct {
Op string `json:"op"` // "register" / "deregister" / "update_health"
Instance *ServiceInstance `json:"instance,omitempty"`
ID string `json:"id,omitempty"`
Health string `json:"health,omitempty"`
}
// FSM 状态机(Raft 的应用层)
type FSM struct {
mu sync.RWMutex
instances map[string]*ServiceInstance // instanceID -> instance
}
func NewFSM() *FSM {
return &FSM{
instances: make(map[string]*ServiceInstance),
}
}
// Apply 应用 Raft 日志(Raft 共识后调用)
func (f *FSM) Apply(log *raft.Log) interface{} {
var cmd Command
if err := json.Unmarshal(log.Data, &cmd); err != nil {
return fmt.Errorf("unmarshal command: %w", err)
}
f.mu.Lock()
defer f.mu.Unlock()
switch cmd.Op {
case "register":
f.instances[cmd.Instance.ID] = cmd.Instance
case "deregister":
delete(f.instances, cmd.ID)
case "update_health":
if inst, ok := f.instances[cmd.ID]; ok {
inst.Health = cmd.Health
inst.LastChecked = time.Now()
}
default:
return fmt.Errorf("unknown op: %s", cmd.Op)
}
return nil
}
// Snapshot 创建快照(Raft 日志压缩用)
func (f *FSM) Snapshot() (raft.FSMSnapshot, error) {
f.mu.RLock()
defer f.mu.RUnlock()
instances := make(map[string]*ServiceInstance)
for k, v := range f.instances {
copy := *v
instances[k] = ©
}
return &fsmSnapshot{instances: instances}, nil
}
// Restore 从快照恢复
func (f *FSM) Restore(r io.ReadCloser) error {
defer r.Close()
var instances map[string]*ServiceInstance
if err := json.NewDecoder(r).Decode(&instances); err != nil {
return err
}
f.mu.Lock()
f.instances = instances
f.mu.Unlock()
return nil
}
// QueryByService 查询某个服务的所有健康实例
func (f *FSM) QueryByService(serviceName string, onlyHealthy bool) []*ServiceInstance {
f.mu.RLock()
defer f.mu.RUnlock()
var result []*ServiceInstance
for _, inst := range f.instances {
if inst.ServiceName != serviceName {
continue
}
if onlyHealthy && inst.Health != "passing" {
continue
}
copy := *inst
result = append(result, ©)
}
return result
}
type fsmSnapshot struct {
instances map[string]*ServiceInstance
}
func (s *fsmSnapshot) Persist(sink raft.SnapshotSink) error {
defer sink.Close()
return json.NewEncoder(sink).Encode(s.instances)
}
func (s *fsmSnapshot) Release() {}注册中心服务器
package registry
import (
"encoding/json"
"fmt"
"net/http"
"os"
"path/filepath"
"time"
"github.com/hashicorp/raft"
raftboltdb "github.com/hashicorp/raft-boltdb/v2"
)
// Server 服务注册中心节点
type Server struct {
raft *raft.Raft
fsm *FSM
nodeID raft.ServerID
dataDir string
}
// NewServer 创建新节点
func NewServer(nodeID, bindAddr, dataDir string) (*Server, error) {
if err := os.MkdirAll(dataDir, 0755); err != nil {
return nil, err
}
fsm := NewFSM()
config := raft.DefaultConfig()
config.LocalID = raft.ServerID(nodeID)
config.HeartbeatTimeout = 500 * time.Millisecond
config.ElectionTimeout = 1 * time.Second
config.CommitTimeout = 50 * time.Millisecond
// 日志存储
logStore, err := raftboltdb.NewBoltStore(filepath.Join(dataDir, "raft-log.db"))
if err != nil {
return nil, err
}
// 稳定存储(term 和 vote)
stableStore, err := raftboltdb.NewBoltStore(filepath.Join(dataDir, "raft-stable.db"))
if err != nil {
return nil, err
}
// 快照存储
snapshotStore, err := raft.NewFileSnapshotStore(dataDir, 2, os.Stderr)
if err != nil {
return nil, err
}
// 网络传输
addr, err := raft.NewTCPTransport(bindAddr, nil, 3, 10*time.Second, os.Stderr)
if err != nil {
return nil, err
}
r, err := raft.NewRaft(config, fsm, logStore, stableStore, snapshotStore, addr)
if err != nil {
return nil, err
}
return &Server{
raft: r,
fsm: fsm,
nodeID: raft.ServerID(nodeID),
dataDir: dataDir,
}, nil
}
// Bootstrap 初始化单节点集群
func (s *Server) Bootstrap() error {
cfg := raft.Configuration{
Servers: []raft.Server{{
ID: s.nodeID,
Address: s.raft.Leader(), // 获取自己的地址
}},
}
return s.raft.BootstrapCluster(cfg).Error()
}
// applyCommand 向 Raft 集群提交命令
func (s *Server) applyCommand(cmd Command) error {
if s.raft.State() != raft.Leader {
return fmt.Errorf("not leader")
}
data, _ := json.Marshal(cmd)
f := s.raft.Apply(data, 5*time.Second)
if err := f.Error(); err != nil {
return err
}
if err, ok := f.Response().(error); ok && err != nil {
return err
}
return nil
}
// RegisterHTTPHandlers 注册 HTTP 路由
func (s *Server) RegisterHTTPHandlers(mux *http.ServeMux) {
mux.HandleFunc("/v1/agent/service/register", s.handleRegister)
mux.HandleFunc("/v1/agent/service/deregister/", s.handleDeregister)
mux.HandleFunc("/v1/health/service/", s.handleQuery)
mux.HandleFunc("/v1/status/leader", s.handleLeader)
}
func (s *Server) handleRegister(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPut {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}
var inst ServiceInstance
if err := json.NewDecoder(r.Body).Decode(&inst); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
inst.Health = "passing"
inst.RegisteredAt = time.Now()
if err := s.applyCommand(Command{Op: "register", Instance: &inst}); err != nil {
if err.Error() == "not leader" {
// 重定向到 leader
w.Header().Set("X-Raft-Leader", string(s.raft.Leader()))
http.Error(w, "not leader", http.StatusTemporaryRedirect)
return
}
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusOK)
}
func (s *Server) handleDeregister(w http.ResponseWriter, r *http.Request) {
instanceID := r.URL.Path[len("/v1/agent/service/deregister/"):]
if err := s.applyCommand(Command{Op: "deregister", ID: instanceID}); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusOK)
}
func (s *Server) handleQuery(w http.ResponseWriter, r *http.Request) {
serviceName := r.URL.Path[len("/v1/health/service/"):]
onlyHealthy := r.URL.Query().Get("passing") != ""
instances := s.fsm.QueryByService(serviceName, onlyHealthy)
json.NewEncoder(w).Encode(instances)
}
func (s *Server) handleLeader(w http.ResponseWriter, r *http.Request) {
json.NewEncoder(w).Encode(map[string]string{
"leader": string(s.raft.Leader()),
"state": s.raft.State().String(),
})
}踩坑实录
踩坑 1:Raft 集群启动后一直选不出 Leader
现象:三个节点启动后,/v1/status/leader 一直返回空字符串,状态一直是 Follower 或 Candidate。
原因:三个节点各自调用了 BootstrapCluster,但传入的地址互相不包含对方。正确做法是只在集群第一次启动时,由一个节点 Bootstrap,包含所有节点的地址信息,其他节点不调用 Bootstrap。
解法:
// 只有第一次启动才 Bootstrap,之后重启不需要
isFirstBoot := isFirstBoot(dataDir) // 检查 raft-log.db 是否存在
if isFirstBoot {
s.raft.BootstrapCluster(raft.Configuration{
Servers: []raft.Server{
{ID: "node1", Address: "10.0.0.1:7000"},
{ID: "node2", Address: "10.0.0.2:7000"},
{ID: "node3", Address: "10.0.0.3:7000"},
},
})
}踩坑 2:非 Leader 节点收到写请求,客户端不知道怎么重定向
现象:客户端注册服务时,如果打到了非 Leader 节点,收到 307 重定向,但很多 HTTP 客户端不会自动跟随非 GET 方法的重定向。
解法:在响应 header 里返回 Leader 地址(X-Raft-Leader),客户端检查到非 200 时,解析 header 并重试。或者在客户端实现简单的轮询:注册失败时自动尝试下一个节点。
踩坑 3:健康检查和 Raft 写入的并发
现象:健康检查协程频繁更新实例健康状态,产生大量 Raft 日志,影响正常服务注册的延迟。
解法:健康状态的更新可以做成 best-effort(非强一致)——只有 Leader 才做健康检查,结果直接更新本地 FSM 状态(不通过 Raft 复制)。健康状态不需要强一致,短暂的不一致可以接受。
与 Consul 的差距在哪
本文实现的是 Consul 的核心骨架,真正的 Consul 还有:
- 多数据中心:跨区域的服务发现
- DNS 接口:不需要改代码,直接用 DNS 做服务发现
- ACL 权限控制:细粒度的读写权限
- Service Mesh(Connect):mTLS、流量管理
对于内网微服务发现场景,本文实现已经足够用了。
