Go 开发数据库迁移工具——版本化 schema 管理,类 Flyway 的 Go 实现
Go 开发数据库迁移工具——版本化 schema 管理,类 Flyway 的 Go 实现
适读人群:Go 后端工程师、对数据库 schema 版本管理有需求的团队 | 阅读时长:约 16 分钟 | 核心价值:理解 Flyway 核心机制,用 Go 从零实现版本化 schema 迁移工具
年初有个做 SaaS 的团队找到我,他们有十几个客户用的是独立的数据库实例(私有化部署),每次产品升级都要手动 SSH 进去执行 SQL,有几次因为漏执行某个 ALTER TABLE,导致功能异常,客户投诉。
他们一开始想用 Flyway,但 Flyway 需要 JVM,每个客户机器上还要装 Java。他们问我能不能用 Go 实现一个类似的工具——单个二进制,扔过去就能用。
这篇文章把那个迁移工具的完整实现写出来。
核心设计
Flyway 的核心思路:
- 迁移脚本按版本号命名:
V1__init_schema.sql、V2__add_user_table.sql - 一个专门的
schema_migrations表记录已执行的版本 - 每次运行,对比脚本目录和已执行记录,执行所有新版本
- 已执行的脚本计算校验和,变更会报错(防止篡改历史迁移)
完整实现
package migrate
import (
"crypto/md5"
"database/sql"
"fmt"
"io"
"os"
"path/filepath"
"regexp"
"sort"
"strconv"
"strings"
"time"
)
// Migration 单个迁移文件
type Migration struct {
Version int
Description string
Filename string
FilePath string
Checksum string
SQL string
}
// MigrationRecord 已执行的迁移记录
type MigrationRecord struct {
Version int
Description string
Checksum string
ExecutedAt time.Time
ExecutionTimeMs int64
Success bool
}
// Migrator 数据库迁移器
type Migrator struct {
db *sql.DB
migrationsDir string
tableName string
dryRun bool
}
// filenamePattern 匹配 V1__description.sql 格式
var filenamePattern = regexp.MustCompile(`^V(\d+)__(.+)\.sql$`)
func NewMigrator(db *sql.DB, migrationsDir string) *Migrator {
return &Migrator{
db: db,
migrationsDir: migrationsDir,
tableName: "schema_migrations",
}
}
func (m *Migrator) WithDryRun(dryRun bool) *Migrator {
m.dryRun = dryRun
return m
}
// ensureTable 确保迁移记录表存在
func (m *Migrator) ensureTable() error {
_, err := m.db.Exec(fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (
version INTEGER PRIMARY KEY,
description TEXT NOT NULL,
checksum TEXT NOT NULL,
executed_at TIMESTAMP NOT NULL,
execution_time_ms BIGINT NOT NULL,
success BOOLEAN NOT NULL DEFAULT TRUE
)
`, m.tableName))
return err
}
// loadMigrationFiles 从目录加载所有迁移脚本
func (m *Migrator) loadMigrationFiles() ([]*Migration, error) {
entries, err := os.ReadDir(m.migrationsDir)
if err != nil {
return nil, fmt.Errorf("read migrations dir: %w", err)
}
var migrations []*Migration
for _, entry := range entries {
if entry.IsDir() {
continue
}
matches := filenamePattern.FindStringSubmatch(entry.Name())
if matches == nil {
continue
}
version, _ := strconv.Atoi(matches[1])
description := strings.ReplaceAll(matches[2], "_", " ")
filePath := filepath.Join(m.migrationsDir, entry.Name())
content, err := os.ReadFile(filePath)
if err != nil {
return nil, fmt.Errorf("read %s: %w", entry.Name(), err)
}
checksum := fmt.Sprintf("%x", md5.Sum(content))
migrations = append(migrations, &Migration{
Version: version,
Description: description,
Filename: entry.Name(),
FilePath: filePath,
Checksum: checksum,
SQL: string(content),
})
}
// 按版本号排序
sort.Slice(migrations, func(i, j int) bool {
return migrations[i].Version < migrations[j].Version
})
return migrations, nil
}
// loadAppliedRecords 加载已执行的迁移记录
func (m *Migrator) loadAppliedRecords() (map[int]*MigrationRecord, error) {
rows, err := m.db.Query(fmt.Sprintf(`
SELECT version, description, checksum, executed_at, execution_time_ms, success
FROM %s ORDER BY version
`, m.tableName))
if err != nil {
return nil, err
}
defer rows.Close()
records := make(map[int]*MigrationRecord)
for rows.Next() {
r := &MigrationRecord{}
rows.Scan(&r.Version, &r.Description, &r.Checksum, &r.ExecutedAt, &r.ExecutionTimeMs, &r.Success)
records[r.Version] = r
}
return records, nil
}
// Migrate 执行迁移
func (m *Migrator) Migrate() error {
if err := m.ensureTable(); err != nil {
return fmt.Errorf("ensure migration table: %w", err)
}
migrations, err := m.loadMigrationFiles()
if err != nil {
return err
}
applied, err := m.loadAppliedRecords()
if err != nil {
return err
}
// 校验已执行的迁移(校验和一致性检查)
for _, migration := range migrations {
record, wasApplied := applied[migration.Version]
if !wasApplied {
continue
}
if record.Checksum != migration.Checksum {
return fmt.Errorf(
"checksum mismatch for V%d (%s): expected %s, got %s. "+
"Do not modify applied migration files!",
migration.Version, migration.Description,
record.Checksum, migration.Checksum,
)
}
}
// 执行未执行的迁移
pending := 0
for _, migration := range migrations {
if _, wasApplied := applied[migration.Version]; wasApplied {
continue
}
pending++
fmt.Printf("Applying V%d: %s\n", migration.Version, migration.Description)
if m.dryRun {
fmt.Printf("[DRY RUN] Would execute:\n%s\n\n", migration.SQL)
continue
}
if err := m.applyMigration(migration); err != nil {
return fmt.Errorf("failed to apply V%d: %w", migration.Version, err)
}
fmt.Printf("Successfully applied V%d (%s)\n", migration.Version, migration.Description)
}
if pending == 0 {
fmt.Println("Already up to date.")
} else if !m.dryRun {
fmt.Printf("Applied %d migration(s) successfully.\n", pending)
}
return nil
}
// applyMigration 在事务中执行单个迁移
func (m *Migrator) applyMigration(migration *Migration) error {
tx, err := m.db.Begin()
if err != nil {
return err
}
start := time.Now()
// 执行 SQL(支持多条语句)
statements := splitSQL(migration.SQL)
for _, stmt := range statements {
stmt = strings.TrimSpace(stmt)
if stmt == "" {
continue
}
if _, err := tx.Exec(stmt); err != nil {
tx.Rollback()
return fmt.Errorf("SQL error: %w\nStatement: %s", err, stmt)
}
}
// 记录执行结果
execTime := time.Since(start).Milliseconds()
_, err = tx.Exec(fmt.Sprintf(`
INSERT INTO %s (version, description, checksum, executed_at, execution_time_ms, success)
VALUES (?, ?, ?, ?, ?, ?)
`, m.tableName),
migration.Version,
migration.Description,
migration.Checksum,
time.Now(),
execTime,
true,
)
if err != nil {
tx.Rollback()
return err
}
return tx.Commit()
}
// splitSQL 按分号分割 SQL 语句(简单实现,不处理注释里的分号)
func splitSQL(sql string) []string {
var statements []string
parts := strings.Split(sql, ";")
for _, p := range parts {
p = strings.TrimSpace(p)
if p != "" && !strings.HasPrefix(p, "--") {
statements = append(statements, p)
}
}
return statements
}
// Status 显示迁移状态
func (m *Migrator) Status() error {
if err := m.ensureTable(); err != nil {
return err
}
migrations, err := m.loadMigrationFiles()
if err != nil {
return err
}
applied, err := m.loadAppliedRecords()
if err != nil {
return err
}
fmt.Printf("%-6s %-8s %-40s %-20s\n", "Status", "Version", "Description", "Applied At")
fmt.Println(strings.Repeat("-", 80))
for _, mig := range migrations {
record, wasApplied := applied[mig.Version]
status := "Pending"
appliedAt := "-"
if wasApplied {
status = "Applied"
if record.Success {
status = "OK"
} else {
status = "Failed"
}
appliedAt = record.ExecutedAt.Format("2006-01-02 15:04:05")
}
fmt.Printf("%-6s V%-7d %-40s %-20s\n", status, mig.Version, mig.Description, appliedAt)
}
return nil
}命令行接口
func main() {
var (
dsn = flag.String("dsn", "", "数据库连接字符串")
dir = flag.String("dir", "./migrations", "迁移文件目录")
dryRun = flag.Bool("dry-run", false, "预演,不实际执行")
statusCmd = flag.Bool("status", false, "显示迁移状态")
)
flag.Parse()
if *dsn == "" {
*dsn = os.Getenv("DATABASE_URL")
}
db, err := sql.Open("mysql", *dsn)
if err != nil {
log.Fatal(err)
}
defer db.Close()
migrator := migrate.NewMigrator(db, *dir).WithDryRun(*dryRun)
if *statusCmd {
migrator.Status()
return
}
if err := migrator.Migrate(); err != nil {
log.Fatalf("Migration failed: %v", err)
}
}踩坑实录
踩坑 1:MySQL 不支持事务内 DDL
现象:在 MySQL 里执行包含 CREATE TABLE 的迁移,迁移记录写入成功,但执行失败时回滚不了 DDL,导致数据库状态和迁移记录不一致。
原因:MySQL 的 DDL 语句(CREATE/ALTER/DROP TABLE)是隐式提交的,不支持事务回滚。PostgreSQL 支持事务 DDL,MySQL 不行。
解法:对于 MySQL,在执行 DDL 失败时,把失败状态也记录到 schema_migrations 表(success=false),下次运行时检查到失败记录会提示用户手动处理,不会重复执行。
踩坑 2:SQL 文件里有存储过程,分号分割出错
现象:某个迁移文件里包含创建存储过程的 SQL,存储过程体里有分号,splitSQL 函数把它们当作语句分割点,执行出错。
解法:增加 DELIMITER 支持,或者要求存储过程独立文件,不和普通 DDL 混在一起。更简单的方案是约定:复杂的存储过程迁移文件用 -- NO_SPLIT 注释标记,跳过分割直接整体执行。
踩坑 3:版本号跳跃导致顺序混乱
现象:两个开发同时创建了 V5 的迁移文件,合并代码时出现了两个 V5。
解法:
- 用时间戳作为版本号:
V20240115143000__add_index.sql(精确到秒,冲突概率极低) - 或者在 CI 里加检查脚本,发现版本号重复就报错
与 golang-migrate 对比
golang-migrate/migrate 是 Go 界最流行的迁移库,功能完善。如果你的项目只需要迁移功能,直接用它:
go get github.com/golang-migrate/migrate/v4本文实现的版本适合:
- 需要定制迁移逻辑(比如多租户按客户 ID 路由到不同数据库)
- 想理解迁移工具的内部原理
- 需要内嵌到自己的工具里,不想引入外部二进制
