Go 调用 OpenAI API 实战——并发请求、重试限流、成本控制完整方案
Go 调用 OpenAI API 实战——并发请求、重试限流、成本控制完整方案
适读人群:Go 开发者、正在或计划接入 OpenAI API 的工程师 | 阅读时长:约 16 分钟 | 核心价值:生产级 OpenAI Go 客户端实现,重点解决并发、限流、成本三大工程问题
上个季度有个做医疗影像报告的团队找到我,他们已经在 Python 里把 OpenAI 接好了,但准备把整个后端迁到 Go。产品是给放射科医生用的,AI 辅助读片之后生成初步报告,医生复核确认。
迁移过程中遇到的问题比我预想的多。Python 那套对 API 限流的处理比较随意——加个 time.sleep(1) 就完事了。Go 里要做成生产级别的,并发控制、重试逻辑、成本统计都要认真设计。
这篇文章把那次迁移的经验整理出来。
选 openai-go 还是自己封装 HTTP?
这个问题值得先聊一下。OpenAI 的官方 Go SDK(github.com/openai/openai-go)2024 年才发布,之前社区最流行的是 sashabaranov/go-openai。
我的建议:
- 新项目:用官方 SDK
openai-go,设计风格和 Anthropic 的 Go SDK 类似,类型安全,维护有保障 - 老项目:
go-openai已经相当成熟,API 稳定,迁移成本不值得
这篇文章用官方 SDK 演示。
go get github.com/openai/openai-go基础封装
package client
import (
"context"
"os"
"github.com/openai/openai-go"
"github.com/openai/openai-go/option"
)
type OpenAIClient struct {
client *openai.Client
model string
}
func NewOpenAIClient() *OpenAIClient {
c := openai.NewClient(
option.WithAPIKey(os.Getenv("OPENAI_API_KEY")),
)
return &OpenAIClient{
client: c,
model: "gpt-4o-mini", // 默认用便宜的
}
}
func (c *OpenAIClient) Chat(ctx context.Context, prompt string) (string, int, int, error) {
resp, err := c.client.Chat.Completions.New(ctx, openai.ChatCompletionNewParams{
Model: openai.F(openai.ChatModelGPT4oMini),
Messages: openai.F([]openai.ChatCompletionMessageParamUnion{
openai.UserMessage(prompt),
}),
})
if err != nil {
return "", 0, 0, err
}
text := resp.Choices[0].Message.Content
inputTokens := int(resp.Usage.PromptTokens)
outputTokens := int(resp.Usage.CompletionTokens)
return text, inputTokens, outputTokens, nil
}并发请求:批量处理的正确姿势
医疗报告的场景里,每天有几百张影像需要处理,最直接的想法是起 goroutine 并发打 API。但 OpenAI 有速率限制,不加控制的并发会触发 429。
package client
import (
"context"
"fmt"
"sync"
"time"
"golang.org/x/time/rate"
)
// BatchTask 批处理任务
type BatchTask struct {
ID string
Prompt string
}
// BatchResult 批处理结果
type BatchResult struct {
ID string
Response string
InputTokens int
OutputTokens int
Err error
Latency time.Duration
}
// BatchProcessor 带速率限制的批处理器
type BatchProcessor struct {
client *OpenAIClient
limiter *rate.Limiter
workers int
}
// NewBatchProcessor 创建批处理器
// rpm: requests per minute 限制
// workers: 并发 goroutine 数量
func NewBatchProcessor(client *OpenAIClient, rpmLimit, workers int) *BatchProcessor {
// 把 RPM 转换为每秒的速率
rps := rate.Limit(float64(rpmLimit) / 60.0)
return &BatchProcessor{
client: client,
limiter: rate.NewLimiter(rps, workers), // burst 设为 workers
workers: workers,
}
}
// Process 并发处理批量任务
func (bp *BatchProcessor) Process(ctx context.Context, tasks []BatchTask) []BatchResult {
results := make([]BatchResult, len(tasks))
taskCh := make(chan struct {
idx int
task BatchTask
}, len(tasks))
// 填充任务队列
for i, task := range tasks {
taskCh <- struct {
idx int
task BatchTask
}{i, task}
}
close(taskCh)
var wg sync.WaitGroup
for i := 0; i < bp.workers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for item := range taskCh {
// 等待速率限制令牌
if err := bp.limiter.Wait(ctx); err != nil {
results[item.idx] = BatchResult{
ID: item.task.ID,
Err: fmt.Errorf("rate limiter: %w", err),
}
continue
}
start := time.Now()
resp, inTokens, outTokens, err := bp.client.Chat(ctx, item.task.Prompt)
results[item.idx] = BatchResult{
ID: item.task.ID,
Response: resp,
InputTokens: inTokens,
OutputTokens: outTokens,
Err: err,
Latency: time.Since(start),
}
}
}()
}
wg.Wait()
return results
}踩坑 1:并发数设太大,反而更慢
现象:把 workers 从 5 增加到 50,吞吐量反而下降了,平均延迟上升了 3 倍。
原因:OpenAI API 的速率限制不只是 RPM,还有 TPM(tokens per minute)。workers 多了,每个请求在等 limiter 的时间更长,加上连接竞争,整体反而更差。
解法:根据 tier 确定合理的 workers 数量。大多数 Tier 2 账户 RPM 上限是 3500,workers 设 10-20 就够了,更多不会有提升。
重试机制:指数退避 + 抖动
直接用 cenkalti/backoff 库是最省事的方案:
package client
import (
"context"
"errors"
"fmt"
"math/rand"
"time"
"github.com/openai/openai-go"
)
// RetryableChat 带重试的对话接口
func (c *OpenAIClient) RetryableChat(ctx context.Context, prompt string) (string, int, int, error) {
maxRetries := 5
baseDelay := 1 * time.Second
maxDelay := 60 * time.Second
for attempt := 0; attempt < maxRetries; attempt++ {
resp, inTokens, outTokens, err := c.Chat(ctx, prompt)
if err == nil {
return resp, inTokens, outTokens, nil
}
// 判断是否可重试
var apiErr *openai.APIError
if !errors.As(err, &apiErr) {
return "", 0, 0, err // 非 API 错误,不重试
}
switch apiErr.StatusCode {
case 429, 500, 502, 503, 504:
// 可重试的错误
default:
return "", 0, 0, err // 400/401 这类不重试
}
if attempt == maxRetries-1 {
return "", 0, 0, fmt.Errorf("max retries exceeded: %w", err)
}
// 指数退避 + 随机抖动,避免惊群
delay := min(baseDelay*time.Duration(1<<attempt), maxDelay)
jitter := time.Duration(rand.Int63n(int64(delay / 2)))
sleepTime := delay + jitter
select {
case <-ctx.Done():
return "", 0, 0, ctx.Err()
case <-time.After(sleepTime):
}
}
return "", 0, 0, fmt.Errorf("unreachable")
}
func min(a, b time.Duration) time.Duration {
if a < b {
return a
}
return b
}踩坑 2:重试时没有考虑 context 取消
现象:用户取消了请求(关闭了浏览器),后端 goroutine 还在傻傻重试,白白消耗 API 配额。
原因:在 time.Sleep 时没有监听 context 取消信号。
解法:用 select + time.After 替代 time.Sleep,如上面代码所示。这样 context 被取消时立刻退出。
成本统计与控制
这个功能在商业项目里几乎是必须的,老板要知道 AI 每个月花了多少钱。
package billing
import (
"fmt"
"sync/atomic"
)
// 价格表(美元/1M tokens,2024年底价格)
var modelPricing = map[string][2]float64{
"gpt-4o": {2.50, 10.00},
"gpt-4o-mini": {0.15, 0.60},
"gpt-4-turbo": {10.00, 30.00},
"text-embedding-3-small": {0.02, 0},
}
// CostTracker 成本追踪器(线程安全)
type CostTracker struct {
totalInputTokens atomic.Int64
totalOutputTokens atomic.Int64
model string
}
func NewCostTracker(model string) *CostTracker {
return &CostTracker{model: model}
}
func (ct *CostTracker) Record(inputTokens, outputTokens int) {
ct.totalInputTokens.Add(int64(inputTokens))
ct.totalOutputTokens.Add(int64(outputTokens))
}
func (ct *CostTracker) TotalCostUSD() float64 {
pricing, ok := modelPricing[ct.model]
if !ok {
return -1
}
input := float64(ct.totalInputTokens.Load()) / 1_000_000 * pricing[0]
output := float64(ct.totalOutputTokens.Load()) / 1_000_000 * pricing[1]
return input + output
}
func (ct *CostTracker) Summary() string {
return fmt.Sprintf(
"model=%s input_tokens=%d output_tokens=%d cost=USD%.4f",
ct.model,
ct.totalInputTokens.Load(),
ct.totalOutputTokens.Load(),
ct.TotalCostUSD(),
)
}踩坑 3:Prompt 里夹带了太多无用内容
现象:系统上线一个月,API 账单比预估高了 40%。排查下来发现有个 prompt 模板在每次请求里都带了一份 3000 token 的静态知识库内容,绝大多数请求根本用不上。
原因:早期开发图方便,把所有背景信息都塞进 system prompt,没有按需加载。
解法:把静态知识库做成 RAG,只在 similarity search 命中时才注入相关片段。这个优化让平均 prompt 长度从 4500 tokens 降到 1800 tokens,成本直接降了 40%。
实测成本数据
医疗影像报告的实际数据(使用 gpt-4o-mini):
| 场景 | 平均输入 tokens | 平均输出 tokens | 每次成本 |
|---|---|---|---|
| 简单报告初筛 | 800 | 200 | $0.00024 |
| 完整报告生成 | 2500 | 800 | $0.00086 |
| 异常标注分析 | 1200 | 400 | $0.00042 |
按每天处理 1000 份报告(3:6:1 比例):
- 日成本约 $0.64
- 月成本约 $19.2
换 gpt-4o 大约贵 15 倍,除非报告质量确实不够,否则没必要。
生产环境配置建议
// 推荐的生产配置
processor := NewBatchProcessor(
client,
500, // rpmLimit: 比账户上限留 30% 余量
10, // workers: 不要超过 20
)监控指标:latency_p99、error_rate、token_per_request、daily_cost 这四个是最重要的。
