Go 服务发现与注册——Consul、etcd 在 Go 微服务中的实战
Go 服务发现与注册——Consul、etcd 在 Go 微服务中的实战
适读人群:正在搭建 Go 微服务基础设施的工程师、从 Java 迁移过来想了解服务注册发现的同学 | 阅读时长:约19分钟 | 核心价值:掌握 Consul 和 etcd 两套方案的完整接入方式,以及健康检查和故障摘除的实战技巧
那次让我一夜没睡的"服务找不到"故障
2023年3月,我们刚把订单服务拆出来跑了一个月,某天凌晨3点,监控报警:订单创建失败率飙到40%。
我爬起来看日志,全是 connection refused 错误。打开机器一看,商品服务那台机器因为内存不足被 OOM killer 干掉了,进程重启后端口变了。
但订单服务的客户端配置里硬编码的还是旧端口。
那一夜之后,我痛下决心:硬编码 IP + 端口的方式必须彻底废掉,换成服务注册与发现。
这篇文章是我接入 Consul 和 etcd 这两套方案的完整记录。
服务注册发现是什么
Java 背景的工程师:这和 Spring Cloud Eureka/Nacos 是同一个问题域,就是"服务怎么找到对方"的问题。
没有服务注册发现之前:
订单服务 → 调用 → 商品服务(192.168.1.10:8080) # 硬编码IP,一旦换机器就崩有了服务注册发现之后:
商品服务启动 → 向注册中心注册自己(地址+端口+健康状态)
订单服务调用 → 向注册中心查询商品服务的可用实例列表 → 负载均衡选一个调用关键优势:
- 服务实例可以动态增加/减少(弹性伸缩)
- 故障实例自动被摘除(高可用)
- 不需要修改配置文件就能更换后端实例
方案一:Consul 实战
Consul 是 HashiCorp 出品的服务发现工具,内置 DNS 接口、健康检查、KV 存储,功能齐全,运维成本低。
安装 Consul
# macOS
brew install consul
consul agent -dev # 开发模式,单节点,数据不持久化
# 验证
curl http://localhost:8500/v1/status/leader服务注册
package main
import (
"fmt"
"log"
"net"
"os"
"os/signal"
"syscall"
consulapi "github.com/hashicorp/consul/api"
"google.golang.org/grpc"
pb "product-service/pb/product"
)
const (
serviceName = "product-service"
servicePort = 8081
)
func main() {
// 1. 启动 gRPC 服务
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", servicePort))
if err != nil {
log.Fatal(err)
}
grpcServer := grpc.NewServer()
pb.RegisterProductServiceServer(grpcServer, &ProductServer{})
go func() {
log.Printf("服务启动: :%d", servicePort)
grpcServer.Serve(lis)
}()
// 2. 注册到 Consul
serviceID := fmt.Sprintf("%s-%s-%d", serviceName, getLocalIP(), servicePort)
if err := registerToConsul(serviceID); err != nil {
log.Fatalf("注册 Consul 失败: %v", err)
}
log.Printf("已注册到 Consul,serviceID: %s", serviceID)
// 3. 优雅退出:注销服务
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
deregisterFromConsul(serviceID)
grpcServer.GracefulStop()
log.Println("服务已优雅退出")
}
func registerToConsul(serviceID string) error {
config := consulapi.DefaultConfig()
config.Address = "localhost:8500"
client, err := consulapi.NewClient(config)
if err != nil {
return err
}
// 服务注册信息
registration := &consulapi.AgentServiceRegistration{
ID: serviceID,
Name: serviceName,
Address: getLocalIP(),
Port: servicePort,
Tags: []string{"grpc", "v1"},
// 健康检查:Consul 每隔5秒 ping 一次,连续3次失败就摘除
Check: &consulapi.AgentServiceCheck{
GRPC: fmt.Sprintf("%s:%d", getLocalIP(), servicePort),
Interval: "5s",
Timeout: "3s",
DeregisterCriticalServiceAfter: "30s", // 不健康30秒后自动注销
},
}
return client.Agent().ServiceRegister(registration)
}
func deregisterFromConsul(serviceID string) {
config := consulapi.DefaultConfig()
client, _ := consulapi.NewClient(config)
client.Agent().ServiceDeregister(serviceID)
log.Printf("已从 Consul 注销: %s", serviceID)
}
func getLocalIP() string {
addrs, _ := net.InterfaceAddrs()
for _, addr := range addrs {
if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() && ipnet.IP.To4() != nil {
return ipnet.IP.String()
}
}
return "127.0.0.1"
}服务发现(客户端)
package discovery
import (
"fmt"
"log"
consulapi "github.com/hashicorp/consul/api"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
// ConsulResolver 从 Consul 发现服务并建立连接
type ConsulResolver struct {
client *consulapi.Client
}
func NewConsulResolver(addr string) (*ConsulResolver, error) {
config := consulapi.DefaultConfig()
config.Address = addr
client, err := consulapi.NewClient(config)
if err != nil {
return nil, err
}
return &ConsulResolver{client: client}, nil
}
// DiscoverAndDial 发现服务并建立 gRPC 连接(带简单轮询负载均衡)
func (r *ConsulResolver) DiscoverAndDial(serviceName string) (*grpc.ClientConn, error) {
// 查询健康的服务实例
services, _, err := r.client.Health().Service(serviceName, "", true, nil)
if err != nil {
return nil, fmt.Errorf("查询服务 %s 失败: %w", serviceName, err)
}
if len(services) == 0 {
return nil, fmt.Errorf("服务 %s 没有可用实例", serviceName)
}
// 简单轮询(生产中应用更完善的负载均衡)
svc := services[0].Service
addr := fmt.Sprintf("%s:%d", svc.Address, svc.Port)
log.Printf("发现服务 %s,连接: %s", serviceName, addr)
return grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
}
// WatchAndUpdate 监听服务变化(Watch 模式)
func (r *ConsulResolver) WatchAndUpdate(serviceName string, onChange func([]string)) {
var lastIndex uint64
for {
services, meta, err := r.client.Health().Service(
serviceName, "", true,
&consulapi.QueryOptions{WaitIndex: lastIndex},
)
if err != nil {
log.Printf("Watch 服务 %s 失败: %v", serviceName, err)
continue
}
lastIndex = meta.LastIndex
var addrs []string
for _, svc := range services {
addrs = append(addrs, fmt.Sprintf("%s:%d", svc.Service.Address, svc.Service.Port))
}
onChange(addrs)
}
}方案二:etcd 实战
etcd 是 Kubernetes 的默认存储后端,强一致性,适合对数据可靠性要求高的场景。
package etcd
import (
"context"
"fmt"
"log"
"time"
clientv3 "go.etcd.io/etcd/client/v3"
)
const etcdEndpoint = "localhost:2379"
// EtcdRegistry 基于 etcd 的服务注册
type EtcdRegistry struct {
client *clientv3.Client
leaseID clientv3.LeaseID
serviceKey string
}
func NewEtcdRegistry() (*EtcdRegistry, error) {
client, err := clientv3.New(clientv3.Config{
Endpoints: []string{etcdEndpoint},
DialTimeout: 5 * time.Second,
})
if err != nil {
return nil, err
}
return &EtcdRegistry{client: client}, nil
}
// Register 注册服务(带 TTL 租约)
func (r *EtcdRegistry) Register(serviceName, addr string, ttl int64) error {
// 创建租约(ttl 秒后自动过期,相当于心跳机制)
ctx := context.Background()
resp, err := r.client.Grant(ctx, ttl)
if err != nil {
return fmt.Errorf("创建租约失败: %w", err)
}
r.leaseID = resp.ID
// 注册服务,key 格式:/services/serviceName/addr
r.serviceKey = fmt.Sprintf("/services/%s/%s", serviceName, addr)
_, err = r.client.Put(ctx, r.serviceKey, addr, clientv3.WithLease(r.leaseID))
if err != nil {
return fmt.Errorf("注册服务失败: %w", err)
}
// 启动续约(keepalive)
keepAliveCh, err := r.client.KeepAlive(ctx, r.leaseID)
if err != nil {
return fmt.Errorf("启动续约失败: %w", err)
}
// 在后台消费续约响应(不消费会阻塞)
go func() {
for resp := range keepAliveCh {
if resp == nil {
log.Println("etcd 续约 channel 关闭,服务可能已注销")
return
}
}
}()
log.Printf("服务已注册到 etcd: %s → %s", r.serviceKey, addr)
return nil
}
// Deregister 注销服务
func (r *EtcdRegistry) Deregister() error {
ctx := context.Background()
_, err := r.client.Revoke(ctx, r.leaseID)
if err != nil {
return err
}
log.Printf("服务已从 etcd 注销: %s", r.serviceKey)
return nil
}
// EtcdDiscovery 服务发现
type EtcdDiscovery struct {
client *clientv3.Client
}
func NewEtcdDiscovery() (*EtcdDiscovery, error) {
client, err := clientv3.New(clientv3.Config{
Endpoints: []string{etcdEndpoint},
DialTimeout: 5 * time.Second,
})
if err != nil {
return nil, err
}
return &EtcdDiscovery{client: client}, nil
}
// GetServiceAddrs 获取服务的所有可用地址
func (d *EtcdDiscovery) GetServiceAddrs(serviceName string) ([]string, error) {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
prefix := fmt.Sprintf("/services/%s/", serviceName)
resp, err := d.client.Get(ctx, prefix, clientv3.WithPrefix())
if err != nil {
return nil, err
}
var addrs []string
for _, kv := range resp.Kvs {
addrs = append(addrs, string(kv.Value))
}
return addrs, nil
}
// WatchService 监听服务变化(Watch etcd 前缀)
func (d *EtcdDiscovery) WatchService(serviceName string, onChange func([]string)) {
prefix := fmt.Sprintf("/services/%s/", serviceName)
watchChan := d.client.Watch(context.Background(), prefix, clientv3.WithPrefix())
for resp := range watchChan {
for _, event := range resp.Events {
log.Printf("服务变化: %s %s", event.Type, event.Kv.Key)
}
// 重新获取最新地址列表
addrs, err := d.GetServiceAddrs(serviceName)
if err == nil {
onChange(addrs)
}
}
}踩坑实录
坑1:Consul 健康检查用 TCP 而不是 gRPC,导致实际挂了但状态显示健康
现象: 服务的 gRPC 端口无法处理请求了(handler panic 全部返回错误),但 Consul 健康检查显示服务状态正常,流量还在打进来。
原因: 配置的是 TCP 检查(只检查端口是否可连接),而不是 gRPC 健康检查(调用 grpc.health.v1.Health/Check)。
解法: 使用 gRPC 协议的健康检查,并在服务里实现健康检查接口:
import "google.golang.org/grpc/health/grpc_health_v1"
// 注册健康检查服务
healthSrv := health.NewServer()
grpc_health_v1.RegisterHealthServer(grpcServer, healthSrv)
healthSrv.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING)坑2:etcd 租约过期了但 KeepAlive 的 channel 没有及时消费,续约失败
现象: 服务在高负载下,etcd 的 KeepAlive channel 积压,续约响应没人消费,导致 KeepAlive 被取消,服务从 etcd 消失。
原因: client.KeepAlive 返回的 channel 必须被消费,否则会阻塞续约流程。
解法: 启动一个独立的 goroutine 专门消费续约响应,并监控 channel 关闭事件以便重新注册。
坑3:服务注销时没等注销完就退出,导致短暂的"僵尸服务"
现象: 滚动发布时,老实例已经退出,但 Consul 里的服务条目还在,新请求偶尔打到已经不存在的实例,报 connection refused。
原因: 优雅退出时,调用了 deregister 但没等待注销完成,进程就退出了。Consul 还没来得及同步给其他服务的本地缓存。
解法: 注销后加一个短暂的等待,让 Consul 的变更传播开:
deregisterFromConsul(serviceID)
time.Sleep(3 * time.Second) // 等待 Consul 传播
grpcServer.GracefulStop()Consul vs etcd 选型建议
| 对比维度 | Consul | etcd |
|---|---|---|
| 部署复杂度 | 低(单节点开发模式) | 中(推荐3节点集群) |
| 内置功能 | 丰富(DNS、KV、健康检查) | 简洁(只做分布式KV) |
| 与 K8s 集成 | 需要额外配置 | 原生(K8s 内置) |
| 健康检查 | 内置,功能强大 | 依赖租约 TTL |
| 适合场景 | 裸机部署、混合云 | K8s 环境、需要强一致性 |
我的建议:
- 已在用 Kubernetes:直接用 etcd(K8s 自带)或者 K8s Service + CoreDNS
- 裸机 / 混合云部署:用 Consul,运维更友好
- 从 Java Nacos 迁移:Consul 的概念更接近,上手快
