Go 实现 RAG 系统——文档解析、嵌入存储、语义检索全流程 Go 实现
Go 实现 RAG 系统——文档解析、嵌入存储、语义检索全流程 Go 实现
适读人群:想用 Go 独立构建完整 RAG 流水线的工程师 | 阅读时长:约 19 分钟 | 核心价值:从 PDF/Word 文档解析到语义检索的完整 Go 实现,不依赖 Python 生态
上半年我帮一个做建筑设计规范咨询的团队搭了个内部知识库系统。他们有几百份 PDF 设计规范,设计师平时找规范很费时间,想让 AI 帮他们快速定位。
技术方案一开始考虑过 Python 的 LlamaIndex,但他们整个工程部门都是 Go,不想引入 Python 依赖。我花了两周时间用纯 Go 把整个 RAG 流水线实现出来,这篇文章把核心实现细节整理出来。
RAG 流水线全景
原始文档 (PDF/Word/TXT)
↓ 文档解析
纯文本
↓ 文本分块 (Chunking)
文本片段列表
↓ Embedding 生成
向量列表 + 元数据
↓ 写入向量数据库
[检索阶段]
用户问题 → Embedding → 向量搜索 → Top-K 片段 → LLM 生成答案每一步都有细节要处理,下面逐一讲。
第一步:文档解析
PDF 解析在 Go 里相对麻烦,没有像 Python pdfplumber 那样开箱即用的库。我对比了几个选项:
ledongthuc/pdfcpu:功能全但提取文本质量一般unidoc/unipdf:商业库,效果好但要付费- 调用 pdftotext 命令行工具:免费,效果好,依赖外部工具
最终选了调用 pdftotext 的方案,因为建筑规范 PDF 通常是可以直接提取文本的(不是扫描版),这个方案效果最好。
package parser
import (
"bytes"
"fmt"
"os"
"os/exec"
"path/filepath"
"strings"
)
// DocumentParser 文档解析器
type DocumentParser struct {
tempDir string
}
func NewDocumentParser() *DocumentParser {
return &DocumentParser{tempDir: os.TempDir()}
}
// ParsePDF 解析 PDF,返回纯文本内容
func (p *DocumentParser) ParsePDF(filePath string) (string, error) {
// 检查 pdftotext 是否可用
if _, err := exec.LookPath("pdftotext"); err != nil {
return "", fmt.Errorf("pdftotext not found, install poppler-utils")
}
outFile := filepath.Join(p.tempDir, "output.txt")
defer os.Remove(outFile)
cmd := exec.Command("pdftotext", "-layout", filePath, outFile)
var stderr bytes.Buffer
cmd.Stderr = &stderr
if err := cmd.Run(); err != nil {
return "", fmt.Errorf("pdftotext failed: %s", stderr.String())
}
content, err := os.ReadFile(outFile)
if err != nil {
return "", err
}
return cleanText(string(content)), nil
}
// ParseTXT 解析纯文本文件
func (p *DocumentParser) ParseTXT(filePath string) (string, error) {
content, err := os.ReadFile(filePath)
if err != nil {
return "", err
}
return cleanText(string(content)), nil
}
// cleanText 清理文本:去除多余空白行、控制字符
func cleanText(text string) string {
lines := strings.Split(text, "\n")
var cleaned []string
emptyCount := 0
for _, line := range lines {
line = strings.TrimRight(line, " \t\r")
if line == "" {
emptyCount++
if emptyCount <= 1 {
cleaned = append(cleaned, "")
}
} else {
emptyCount = 0
cleaned = append(cleaned, line)
}
}
return strings.Join(cleaned, "\n")
}第二步:文本分块(Chunking)
这是 RAG 质量最关键的一步,分块策略直接决定检索效果。
package chunker
import (
"strings"
"unicode/utf8"
)
// Chunk 文本块
type Chunk struct {
Content string
Index int // 在原文中的顺序
Source string // 来源文件名
StartPos int // 在原文中的字符起始位置
}
// ChunkerConfig 分块配置
type ChunkerConfig struct {
ChunkSize int // 每块的字符数
ChunkOverlap int // 相邻块重叠的字符数(保留上下文)
}
// DefaultConfig 默认配置
var DefaultConfig = ChunkerConfig{
ChunkSize: 800, // 约 400-500 中文字
ChunkOverlap: 100, // 重叠 50-60 个中文字
}
// Chunker 文本分块器
type Chunker struct {
config ChunkerConfig
}
func NewChunker(config ChunkerConfig) *Chunker {
return &Chunker{config: config}
}
// SplitByParagraph 按段落分块,优先保持段落完整性
func (c *Chunker) SplitByParagraph(text, source string) []Chunk {
paragraphs := strings.Split(text, "\n\n")
var chunks []Chunk
var currentChunk strings.Builder
chunkIndex := 0
for _, para := range paragraphs {
para = strings.TrimSpace(para)
if para == "" {
continue
}
// 如果加入这个段落后超过限制,先把当前 chunk 保存
if currentChunk.Len() > 0 &&
utf8.RuneCountInString(currentChunk.String())+utf8.RuneCountInString(para) > c.config.ChunkSize {
content := strings.TrimSpace(currentChunk.String())
if content != "" {
chunks = append(chunks, Chunk{
Content: content,
Index: chunkIndex,
Source: source,
})
chunkIndex++
}
// 保留最后一段作为重叠
currentChunk.Reset()
if utf8.RuneCountInString(para) > c.config.ChunkOverlap {
// 取最后 ChunkOverlap 个字符
runes := []rune(para)
overlapStart := len(runes) - c.config.ChunkOverlap
if overlapStart < 0 {
overlapStart = 0
}
currentChunk.WriteString(string(runes[overlapStart:]))
}
}
if currentChunk.Len() > 0 {
currentChunk.WriteString("\n\n")
}
currentChunk.WriteString(para)
}
// 最后一块
if content := strings.TrimSpace(currentChunk.String()); content != "" {
chunks = append(chunks, Chunk{
Content: content,
Index: chunkIndex,
Source: source,
})
}
return chunks
}第三步:完整的索引构建流水线
package pipeline
import (
"context"
"fmt"
"log"
"path/filepath"
"sync"
"time"
"your-project/chunker"
"your-project/embedding"
"your-project/parser"
"your-project/qdrant"
)
type IndexPipeline struct {
parser *parser.DocumentParser
chunker *chunker.Chunker
embedder *embedding.EmbeddingClient
db *qdrant.QdrantClient
collection string
}
func NewIndexPipeline(collection string) (*IndexPipeline, error) {
db, err := qdrant.NewQdrantClient("localhost:6334")
if err != nil {
return nil, err
}
return &IndexPipeline{
parser: parser.NewDocumentParser(),
chunker: chunker.NewChunker(chunker.DefaultConfig),
embedder: embedding.NewEmbeddingClient(),
db: db,
collection: collection,
}, nil
}
// IndexFile 索引单个文件
func (p *IndexPipeline) IndexFile(ctx context.Context, filePath string) (int, error) {
ext := strings.ToLower(filepath.Ext(filePath))
source := filepath.Base(filePath)
var text string
var err error
switch ext {
case ".pdf":
text, err = p.parser.ParsePDF(filePath)
case ".txt":
text, err = p.parser.ParseTXT(filePath)
default:
return 0, fmt.Errorf("unsupported file type: %s", ext)
}
if err != nil {
return 0, fmt.Errorf("parse file: %w", err)
}
// 分块
chunks := p.chunker.SplitByParagraph(text, source)
if len(chunks) == 0 {
return 0, fmt.Errorf("no chunks extracted from %s", filePath)
}
log.Printf("File %s: extracted %d chunks", source, len(chunks))
// 提取文本用于 embedding
texts := make([]string, len(chunks))
for i, c := range chunks {
texts[i] = c.Content
}
// 批量生成 embedding(注意速率限制)
start := time.Now()
vectors, err := p.embedder.EmbedBatch(ctx, texts)
if err != nil {
return 0, fmt.Errorf("embed chunks: %w", err)
}
log.Printf("Embedding %d chunks took %v", len(chunks), time.Since(start))
// 构建文档列表
docs := make([]qdrant.LawDocument, len(chunks))
for i, c := range chunks {
docs[i] = qdrant.LawDocument{
ID: fmt.Sprintf("%s_%d", source, i),
Content: c.Content,
Title: source,
Vector: vectors[i],
}
}
// 写入向量数据库
if err := p.db.UpsertDocuments(ctx, p.collection, docs); err != nil {
return 0, fmt.Errorf("upsert to qdrant: %w", err)
}
return len(chunks), nil
}
// IndexDirectory 索引目录下的所有文件
func (p *IndexPipeline) IndexDirectory(ctx context.Context, dir string) error {
files, err := filepath.Glob(filepath.Join(dir, "*.pdf"))
if err != nil {
return err
}
txts, _ := filepath.Glob(filepath.Join(dir, "*.txt"))
files = append(files, txts...)
var mu sync.Mutex
totalChunks := 0
var errors []error
// 串行处理(embedding 有速率限制,不要并发)
for _, f := range files {
n, err := p.IndexFile(ctx, f)
if err != nil {
mu.Lock()
errors = append(errors, fmt.Errorf("%s: %w", filepath.Base(f), err))
mu.Unlock()
log.Printf("Warning: failed to index %s: %v", f, err)
continue
}
totalChunks += n
log.Printf("Indexed %s: %d chunks", filepath.Base(f), n)
}
log.Printf("Total: %d files, %d chunks indexed", len(files)-len(errors), totalChunks)
return nil
}踩坑实录
踩坑 1:建筑规范 PDF 有大量表格,提取结果乱码
现象:某些 PDF 里的表格内容提取出来是乱码或者格式完全错乱,向量化之后完全不可用。
原因:pdftotext 对复杂表格的处理比较弱,特别是含有合并单元格的表格。
解法:
- 在分块前做质量检测:如果一个块里乱码字符比例超过 20%,丢弃
- 对质量差的文件,标记出来人工检查,或者用 OCR 工具重新处理
- 表格类数据考虑单独抽取存储,不放进向量库
func isGoodChunk(text string) bool {
runes := []rune(text)
if len(runes) < 50 {
return false // 太短的块没价值
}
// 检查可打印字符比例
printable := 0
for _, r := range runes {
if r >= 32 && r != 65533 { // 65533 是 Unicode 替换字符(乱码标志)
printable++
}
}
return float64(printable)/float64(len(runes)) > 0.8
}踩坑 2:分块太小导致上下文丢失,检索结果语义不完整
现象:搜索"防火间距",返回的片段里只有数字("不应小于6米"),但没有说明是什么类型建筑之间的间距。
原因:分块把规范条文的前提条件和具体数值切断了。
解法:把 ChunkSize 从 300 调整到 800,并增加重叠。同时对规范类文档按章节分块(识别"第X.X.X条"这样的模式),不跨条文。
踩坑 3:重复索引文件没有去重,导致向量库膨胀
现象:运行了两次索引脚本,向量库里的数据变成了两份,检索结果出现重复。
原因:Qdrant 的 upsert 按 ID 更新,但我的 ID 生成方式没有保证幂等性(用了随机数)。
解法:ID 改为用文件 MD5 + chunk index 生成,保证同一文件同一块的 ID 是固定的,重复导入会覆盖而不是追加。
检索质量优化
除了向量检索,还可以加入关键词检索做 Hybrid Search:
// HybridSearch 混合检索:向量相似度 + 关键词 BM25
// 在 Qdrant 中需要启用 sparse vectors 支持
func (r *RAGSystem) HybridSearch(ctx context.Context, query string, topK int) ([]SearchResult, error) {
// 向量检索
denseVec, _ := r.embedder.Embed(ctx, query)
denseResults, _ := r.db.Search(ctx, r.collection, denseVec, topK*2, "")
// 关键词检索(Qdrant full-text search)
keywordResults, _ := r.db.KeywordSearch(ctx, r.collection, query, topK*2)
// RRF (Reciprocal Rank Fusion) 融合两个结果列表
return rrfFusion(denseResults, keywordResults, topK), nil
}实测:对建筑规范场景,混合检索比纯向量检索的召回率提升约 15%,尤其是包含专业术语和编号的查询。
