Go 开发配置管理工具——类 Consul 的 KV 存储,用 Go 从零实现
Go 开发配置管理工具——类 Consul 的 KV 存储,用 Go 从零实现
适读人群:Go 后端工程师、想理解配置中心内部原理的开发者 | 阅读时长:约 18 分钟 | 核心价值:用 Go 实现带版本、Watch、权限的 KV 配置存储,理解 Consul KV 的核心机制
两年前在一个做物联网平台的公司,他们有几百台边缘设备需要动态下发配置,不能重启服务。一开始用 Consul,但 Consul 部署成本高,边缘环境网络不稳定,而且他们的设备配置有严格的权限隔离需求,Consul 的 ACL 配置太复杂。
他们决定自己实现一个轻量级的配置中心,只实现核心功能:KV 存储、版本控制、Watch 推送、简单权限。这篇文章把那个实现整理出来。
核心功能设计
HTTP API
├── GET /kv/{key} 读取配置
├── PUT /kv/{key} 写入配置
├── DELETE /kv/{key} 删除配置
├── GET /kv?prefix={p} 前缀列举
└── GET /watch/{key} SSE 长轮询监听变化
存储层:SQLite(嵌入式,无外部依赖)
版本:每次修改递增全局版本号
Watch:基于 SSE(Server-Sent Events),实时推送变更为什么用 SQLite 而不是 etcd/Redis?对于边缘设备,SQLite 是嵌入式数据库,无外部依赖,持久化可靠,对于配置存储完全够用。
存储层实现
package store
import (
"database/sql"
"encoding/json"
"fmt"
"sync/atomic"
"time"
_ "github.com/mattn/go-sqlite3"
)
// ConfigItem KV 配置项
type ConfigItem struct {
Key string
Value string
Version int64
CreatedAt time.Time
UpdatedAt time.Time
Metadata map[string]string // 额外元数据(如创建者、描述)
}
// ChangeEvent 变更事件
type ChangeEvent struct {
Type string // "set" / "delete"
Key string
Item *ConfigItem
Version int64
}
// KVStore KV 存储引擎
type KVStore struct {
db *sql.DB
version atomic.Int64
watchers *WatcherRegistry
}
func NewKVStore(dbPath string) (*KVStore, error) {
db, err := sql.Open("sqlite3", dbPath+"?_journal_mode=WAL&_synchronous=NORMAL")
if err != nil {
return nil, err
}
s := &KVStore{
db: db,
watchers: NewWatcherRegistry(),
}
if err := s.initSchema(); err != nil {
return nil, err
}
// 恢复最大版本号
var maxVersion int64
db.QueryRow("SELECT COALESCE(MAX(version), 0) FROM kv_items").Scan(&maxVersion)
s.version.Store(maxVersion)
return s, nil
}
func (s *KVStore) initSchema() error {
_, err := s.db.Exec(`
CREATE TABLE IF NOT EXISTS kv_items (
key TEXT PRIMARY KEY,
value TEXT NOT NULL,
version INTEGER NOT NULL,
metadata TEXT,
created_at DATETIME NOT NULL,
updated_at DATETIME NOT NULL
);
CREATE TABLE IF NOT EXISTS kv_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
key TEXT NOT NULL,
value TEXT NOT NULL,
version INTEGER NOT NULL,
op TEXT NOT NULL,
created_at DATETIME NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_history_key ON kv_history(key);
CREATE INDEX IF NOT EXISTS idx_history_version ON kv_history(version);
`)
return err
}
// Set 写入或更新配置
func (s *KVStore) Set(key, value string, metadata map[string]string) (*ConfigItem, error) {
version := s.version.Add(1)
now := time.Now()
metaJSON, _ := json.Marshal(metadata)
tx, err := s.db.Begin()
if err != nil {
return nil, err
}
defer tx.Rollback()
_, err = tx.Exec(`
INSERT INTO kv_items (key, value, version, metadata, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT(key) DO UPDATE SET
value = excluded.value,
version = excluded.version,
metadata = excluded.metadata,
updated_at = excluded.updated_at
`, key, value, version, string(metaJSON), now, now)
if err != nil {
return nil, err
}
// 写入历史记录
_, err = tx.Exec(`
INSERT INTO kv_history (key, value, version, op, created_at)
VALUES (?, ?, ?, 'set', ?)
`, key, value, version, now)
if err != nil {
return nil, err
}
if err := tx.Commit(); err != nil {
return nil, err
}
item := &ConfigItem{
Key: key,
Value: value,
Version: version,
UpdatedAt: now,
Metadata: metadata,
}
// 通知 watchers
s.watchers.Notify(ChangeEvent{
Type: "set",
Key: key,
Item: item,
Version: version,
})
return item, nil
}
// Get 读取配置
func (s *KVStore) Get(key string) (*ConfigItem, error) {
row := s.db.QueryRow(`
SELECT key, value, version, metadata, created_at, updated_at
FROM kv_items WHERE key = ?
`, key)
item := &ConfigItem{}
var metaJSON string
err := row.Scan(&item.Key, &item.Value, &item.Version, &metaJSON, &item.CreatedAt, &item.UpdatedAt)
if err == sql.ErrNoRows {
return nil, nil
}
if err != nil {
return nil, err
}
if metaJSON != "" {
json.Unmarshal([]byte(metaJSON), &item.Metadata)
}
return item, nil
}
// List 前缀列举
func (s *KVStore) List(prefix string) ([]*ConfigItem, error) {
rows, err := s.db.Query(`
SELECT key, value, version, metadata, created_at, updated_at
FROM kv_items WHERE key LIKE ? ORDER BY key
`, prefix+"%")
if err != nil {
return nil, err
}
defer rows.Close()
var items []*ConfigItem
for rows.Next() {
item := &ConfigItem{}
var metaJSON string
rows.Scan(&item.Key, &item.Value, &item.Version, &metaJSON, &item.CreatedAt, &item.UpdatedAt)
if metaJSON != "" {
json.Unmarshal([]byte(metaJSON), &item.Metadata)
}
items = append(items, item)
}
return items, nil
}
// Delete 删除配置
func (s *KVStore) Delete(key string) error {
version := s.version.Add(1)
now := time.Now()
tx, err := s.db.Begin()
if err != nil {
return err
}
defer tx.Rollback()
result, err := tx.Exec("DELETE FROM kv_items WHERE key = ?", key)
if err != nil {
return err
}
n, _ := result.RowsAffected()
if n == 0 {
return fmt.Errorf("key not found: %s", key)
}
tx.Exec("INSERT INTO kv_history (key, value, version, op, created_at) VALUES (?, '', ?, 'delete', ?)",
key, version, now)
if err := tx.Commit(); err != nil {
return err
}
s.watchers.Notify(ChangeEvent{Type: "delete", Key: key, Version: version})
return nil
}Watch 机制:SSE 推送
package store
import (
"fmt"
"net/http"
"sync"
"time"
)
// WatcherRegistry 管理所有活跃的 Watch 连接
type WatcherRegistry struct {
mu sync.RWMutex
watchers map[string][]chan ChangeEvent // key -> watcher channels
}
func NewWatcherRegistry() *WatcherRegistry {
return &WatcherRegistry{
watchers: make(map[string][]chan ChangeEvent),
}
}
func (r *WatcherRegistry) Subscribe(key string) chan ChangeEvent {
ch := make(chan ChangeEvent, 10)
r.mu.Lock()
r.watchers[key] = append(r.watchers[key], ch)
r.mu.Unlock()
return ch
}
func (r *WatcherRegistry) Unsubscribe(key string, ch chan ChangeEvent) {
r.mu.Lock()
defer r.mu.Unlock()
watchers := r.watchers[key]
for i, w := range watchers {
if w == ch {
r.watchers[key] = append(watchers[:i], watchers[i+1:]...)
close(ch)
return
}
}
}
func (r *WatcherRegistry) Notify(event ChangeEvent) {
r.mu.RLock()
defer r.mu.RUnlock()
// 精确匹配
for _, ch := range r.watchers[event.Key] {
select {
case ch <- event:
default:
}
}
// 前缀匹配的 watcher(key 以 "/" 结尾表示前缀监听)
for watchKey, chs := range r.watchers {
if len(watchKey) > 0 && watchKey[len(watchKey)-1] == '/' {
prefix := watchKey[:len(watchKey)-1]
if len(event.Key) >= len(prefix) && event.Key[:len(prefix)] == prefix {
for _, ch := range chs {
select {
case ch <- event:
default:
}
}
}
}
}
}
// WatchHandler HTTP SSE 处理器
func WatchHandler(store *KVStore) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
key := r.URL.Path[len("/watch/"):]
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Access-Control-Allow-Origin", "*")
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "SSE not supported", http.StatusInternalServerError)
return
}
ch := store.watchers.Subscribe(key)
defer store.watchers.Unsubscribe(key, ch)
// 发送初始值
if item, _ := store.Get(key); item != nil {
fmt.Fprintf(w, "data: {\"type\":\"init\",\"key\":\"%s\",\"value\":\"%s\",\"version\":%d}\n\n",
item.Key, item.Value, item.Version)
flusher.Flush()
}
// 心跳 ticker,防止客户端连接超时
heartbeat := time.NewTicker(30 * time.Second)
defer heartbeat.Stop()
for {
select {
case event, ok := <-ch:
if !ok {
return
}
var data string
if event.Type == "delete" {
data = fmt.Sprintf(`{"type":"delete","key":"%s","version":%d}`, event.Key, event.Version)
} else {
data = fmt.Sprintf(`{"type":"set","key":"%s","value":"%s","version":%d}`,
event.Key, event.Item.Value, event.Version)
}
fmt.Fprintf(w, "data: %s\n\n", data)
flusher.Flush()
case <-heartbeat.C:
fmt.Fprintf(w, ": heartbeat\n\n")
flusher.Flush()
case <-r.Context().Done():
return
}
}
}
}踩坑实录
踩坑 1:SQLite WAL 模式下并发写冲突
现象:多个配置写入同时到来时,偶发 database is locked 错误。
原因:SQLite 默认的 WAL 模式允许一个写者 + 多个读者,但如果写事务太多,后来的写者会短暂 block。连接串里没有设置 _busy_timeout。
解法:连接串里加 _busy_timeout=5000(等 5 秒),对于配置中心的写入频率完全够:
db, err := sql.Open("sqlite3", dbPath+"?_journal_mode=WAL&_busy_timeout=5000")踩坑 2:Watch 连接断开后,Channel 没有清理导致 goroutine 泄漏
现象:客户端频繁断开重连后,watcher 列表越来越长,内存持续增长。
原因:客户端断开连接后,r.Context().Done() 触发了 WatchHandler 退出,defer Unsubscribe 也执行了,但 ch 还有未消费的消息,close(ch) 之后写入 closed channel 会 panic。
解法:Notify 里写入 channel 要用 select { case ch <- event: default: }(已有),close(ch) 要在确认没有写入方之后才调用。更稳健的做法是不用 close,而是用一个额外的 done channel 通知:
type Watcher struct {
events chan ChangeEvent
done chan struct{}
}踩坑 3:版本号 atomic 在重启后从 0 开始,低于数据库里的版本
现象:服务重启后,写入的新配置版本号从 1 开始,而数据库里已有版本号到 10000,客户端的版本比较逻辑出错。
解法:启动时从数据库里读 MAX(version) 来初始化 atomic 版本号(代码里已有):
var maxVersion int64
db.QueryRow("SELECT COALESCE(MAX(version), 0) FROM kv_items").Scan(&maxVersion)
s.version.Store(maxVersion)与 Consul KV 功能对比
| 功能 | 本方案 | Consul KV |
|---|---|---|
| 基本 CRUD | 支持 | 支持 |
| 前缀列举 | 支持 | 支持 |
| Watch 推送 | SSE | Long Polling / SSE |
| 版本历史 | 支持(SQLite) | 支持(有限) |
| 多节点同步 | 不支持(单节点) | 支持(Raft) |
| 部署依赖 | 无 | 需要 Consul 集群 |
单节点场景(边缘设备、内部工具)用这个方案完全够用。需要多节点高可用的场景,还是建议 Consul 或 etcd。
