Go 实现数据管道——ETL 框架的设计与实现,支持插件扩展
2026/4/30大约 5 分钟
Go 实现数据管道——ETL 框架的设计与实现,支持插件扩展
适读人群:Go 开发者、需要构建数据同步/转换流水线的工程师 | 阅读时长:约 18 分钟 | 核心价值:设计一个类 Fluentd 的 Go ETL 框架,Source→Transform→Sink 三层插件化架构
前年帮一个做新零售的团队解决数据同步问题。他们有十几个数据源:MySQL、PostgreSQL、Excel 文件、HTTP API、Kafka,数据要汇聚到 ClickHouse 做分析。每个数据源都写了独立的同步脚本,维护起来一团乱,代码重复到令人发指。
我建议他们抽象一个统一的数据管道框架:Source 插件负责读数据,Transform 插件负责清洗转换,Sink 插件负责写入目标。新增数据源只需要实现一个 Source 接口。
这个框架后来被他们用于几十条数据管道,维护成本大幅降低。
框架设计
Source (读取) → [Transform1 → Transform2 → ...] → Sink (写入)
↑ 插件化,可组合核心接口:
Source:生产数据行Transform:转换/过滤数据行Sink:消费数据行Pipeline:编排三层,控制流速和背压
核心接口和框架
package etl
import (
"context"
"fmt"
"log"
"sync"
"time"
)
// Record 数据行(弱类型 map)
type Record map[string]interface{}
// Source 数据源接口
type Source interface {
Name() string
// Open 初始化,建立连接等
Open(ctx context.Context) error
// Read 读取数据,返回 channel(Source 负责关闭)
Read(ctx context.Context) (<-chan Record, error)
Close() error
}
// Transform 转换器接口
type Transform interface {
Name() string
// Process 处理单条记录,返回 nil 表示丢弃该记录
Process(ctx context.Context, record Record) (Record, error)
}
// Sink 写入目标接口
type Sink interface {
Name() string
Open(ctx context.Context) error
// Write 批量写入
Write(ctx context.Context, records []Record) error
Close() error
}
// PipelineConfig 管道配置
type PipelineConfig struct {
Name string
BatchSize int // 批量写入的大小
FlushInterval time.Duration // 超时强制 flush
Workers int // 并发 Transform 数量
ErrorLimit int // 最大错误数,超过则停止管道
}
// Stats 管道统计
type Stats struct {
ReadCount int64
TransformCount int64
WriteCount int64
DropCount int64
ErrorCount int64
StartTime time.Time
}
// Pipeline 数据管道
type Pipeline struct {
config PipelineConfig
source Source
transforms []Transform
sink Sink
stats Stats
mu sync.RWMutex
}
func NewPipeline(config PipelineConfig, source Source, sink Sink, transforms ...Transform) *Pipeline {
if config.BatchSize == 0 {
config.BatchSize = 100
}
if config.FlushInterval == 0 {
config.FlushInterval = 5 * time.Second
}
if config.Workers == 0 {
config.Workers = 4
}
if config.ErrorLimit == 0 {
config.ErrorLimit = 1000
}
return &Pipeline{
config: config,
source: source,
transforms: transforms,
sink: sink,
}
}
// Run 运行管道
func (p *Pipeline) Run(ctx context.Context) error {
log.Printf("[Pipeline:%s] Starting...", p.config.Name)
p.stats.StartTime = time.Now()
// 初始化 source 和 sink
if err := p.source.Open(ctx); err != nil {
return fmt.Errorf("open source %s: %w", p.source.Name(), err)
}
defer p.source.Close()
if err := p.sink.Open(ctx); err != nil {
return fmt.Errorf("open sink %s: %w", p.sink.Name(), err)
}
defer p.sink.Close()
// 读取数据
inputCh, err := p.source.Read(ctx)
if err != nil {
return fmt.Errorf("read from source: %w", err)
}
// Transform 阶段(多 worker 并行)
transformedCh := p.runTransforms(ctx, inputCh)
// Sink 阶段(批量写入)
return p.runSink(ctx, transformedCh)
}
func (p *Pipeline) runTransforms(ctx context.Context, input <-chan Record) <-chan Record {
output := make(chan Record, p.config.BatchSize)
var wg sync.WaitGroup
for i := 0; i < p.config.Workers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case record, ok := <-input:
if !ok {
return
}
p.mu.Lock()
p.stats.ReadCount++
p.mu.Unlock()
// 依次通过所有 Transform
result := record
dropped := false
for _, t := range p.transforms {
processed, err := t.Process(ctx, result)
if err != nil {
p.mu.Lock()
p.stats.ErrorCount++
p.mu.Unlock()
log.Printf("[Pipeline:%s] Transform %s error: %v", p.config.Name, t.Name(), err)
dropped = true
break
}
if processed == nil {
p.mu.Lock()
p.stats.DropCount++
p.mu.Unlock()
dropped = true
break
}
result = processed
}
if !dropped {
p.mu.Lock()
p.stats.TransformCount++
p.mu.Unlock()
select {
case output <- result:
case <-ctx.Done():
return
}
}
case <-ctx.Done():
return
}
}
}()
}
go func() {
wg.Wait()
close(output)
}()
return output
}
func (p *Pipeline) runSink(ctx context.Context, input <-chan Record) error {
batch := make([]Record, 0, p.config.BatchSize)
ticker := time.NewTicker(p.config.FlushInterval)
defer ticker.Stop()
flush := func() error {
if len(batch) == 0 {
return nil
}
if err := p.sink.Write(ctx, batch); err != nil {
return fmt.Errorf("write to sink: %w", err)
}
p.mu.Lock()
p.stats.WriteCount += int64(len(batch))
p.mu.Unlock()
log.Printf("[Pipeline:%s] Flushed %d records", p.config.Name, len(batch))
batch = batch[:0]
return nil
}
for {
select {
case record, ok := <-input:
if !ok {
// 输入关闭,最后一次 flush
return flush()
}
batch = append(batch, record)
if len(batch) >= p.config.BatchSize {
if err := flush(); err != nil {
return err
}
}
case <-ticker.C:
if err := flush(); err != nil {
return err
}
case <-ctx.Done():
flush() // 尽力 flush
return ctx.Err()
}
}
}
// GetStats 获取统计信息
func (p *Pipeline) GetStats() Stats {
p.mu.RLock()
defer p.mu.RUnlock()
return p.stats
}内置插件实现
MySQL Source
package plugins
import (
"context"
"database/sql"
"fmt"
"your-project/etl"
_ "github.com/go-sql-driver/mysql"
)
type MySQLSource struct {
dsn string
query string
db *sql.DB
}
func NewMySQLSource(dsn, query string) *MySQLSource {
return &MySQLSource{dsn: dsn, query: query}
}
func (s *MySQLSource) Name() string { return "mysql" }
func (s *MySQLSource) Open(ctx context.Context) error {
db, err := sql.Open("mysql", s.dsn)
if err != nil {
return err
}
s.db = db
return db.PingContext(ctx)
}
func (s *MySQLSource) Read(ctx context.Context) (<-chan etl.Record, error) {
rows, err := s.db.QueryContext(ctx, s.query)
if err != nil {
return nil, fmt.Errorf("query: %w", err)
}
cols, err := rows.Columns()
if err != nil {
rows.Close()
return nil, err
}
out := make(chan etl.Record, 1000)
go func() {
defer close(out)
defer rows.Close()
vals := make([]interface{}, len(cols))
scanArgs := make([]interface{}, len(cols))
for i := range vals {
scanArgs[i] = &vals[i]
}
for rows.Next() {
if err := rows.Scan(scanArgs...); err != nil {
continue
}
record := make(etl.Record, len(cols))
for i, col := range cols {
record[col] = vals[i]
}
select {
case out <- record:
case <-ctx.Done():
return
}
}
}()
return out, nil
}
func (s *MySQLSource) Close() error {
if s.db != nil {
return s.db.Close()
}
return nil
}常用 Transform 插件
// RenameTransform 字段重命名
type RenameTransform struct {
mapping map[string]string // old -> new
}
func NewRenameTransform(mapping map[string]string) *RenameTransform {
return &RenameTransform{mapping: mapping}
}
func (t *RenameTransform) Name() string { return "rename" }
func (t *RenameTransform) Process(_ context.Context, record etl.Record) (etl.Record, error) {
for oldKey, newKey := range t.mapping {
if val, ok := record[oldKey]; ok {
record[newKey] = val
delete(record, oldKey)
}
}
return record, nil
}
// FilterTransform 条件过滤
type FilterTransform struct {
predicate func(etl.Record) bool
}
func NewFilterTransform(predicate func(etl.Record) bool) *FilterTransform {
return &FilterTransform{predicate: predicate}
}
func (t *FilterTransform) Name() string { return "filter" }
func (t *FilterTransform) Process(_ context.Context, record etl.Record) (etl.Record, error) {
if !t.predicate(record) {
return nil, nil // nil 表示丢弃
}
return record, nil
}踩坑实录
踩坑 1:背压处理不当导致内存暴涨
现象:Source 读取速度远快于 Sink 写入速度,中间 channel 积压,内存暴涨。
解法:给中间 channel 设置合理的 buffer(BatchSize),当 buffer 满时 Source 会自然背压(阻塞在 chan <- record)。同时监控 channel 长度,超过阈值时触发告警。
踩坑 2:Transform panic 导致整个管道停止
现象:某个 Transform 里访问了 nil 指针,整个 worker goroutine 崩溃,管道停止处理数据,但没有错误日志。
解法:在每个 Transform worker 里加 recover:
defer func() {
if r := recover(); r != nil {
log.Printf("[Pipeline] Transform panic: %v", r)
}
}()踩坑 3:Sink 批量写失败时没有重试,数据丢失
现象:ClickHouse 短暂不可用时,批量写入失败,这批数据直接丢失了。
解法:Sink 写入加指数退避重试,重试都失败时写入 dead letter 文件,人工处理:
func writeWithRetry(ctx context.Context, sink etl.Sink, records []etl.Record) error {
for attempt := 0; attempt < 3; attempt++ {
if err := sink.Write(ctx, records); err == nil {
return nil
}
time.Sleep(time.Duration(1<<attempt) * time.Second)
}
return writeToDeadLetter(records)
}