Langfuse 可观测性搭建——你的 RAG 在偷偷出错你知道吗
Langfuse 可观测性搭建——你的 RAG 在偷偷出错你知道吗
适读人群:AI 应用已上线但不知道哪里出问题的开发者 | 阅读时长:约 15 分钟 | 核心价值:完整的 Langfuse 集成方案,让你的 RAG 不再是黑盒
有一次我给客户做月度汇报,他们的 AI 客服系统已经上线两个月了。我打开自己做的监控面板,展示了各种指标:请求量、响应时间、错误率……
然后客户突然问:"我们有一类关于退款政策的问题,用户说答得不准,你能看看是什么原因吗?"
我愣了一下。我的监控面板有延迟、有错误率,但根本没有"哪类问题回答质量差"这个维度。
我不知道。
系统跑了两个月,一直有某类问题在悄悄给出错误答案,用户投诉了,客户才告诉我,我才意识到问题。
那之后我认真接入了 Langfuse,第一周就发现了两个从来没注意到的问题。今天把完整方案写出来。
普通监控 vs AI 可观测性
普通的服务监控关注的是:响应时间、错误率、QPS、CPU/内存。
这些对 AI 应用来说远远不够。因为 AI 的"失败"大多数情况下不会抛出异常,不会返回 HTTP 500,只是静静地给出一个看起来正常、实际上不对的回答。
AI 应用需要额外关注:
- 检索到的文档是否真的相关?
- LLM 有没有产生幻觉?
- 哪类问题的回答质量差?
- Token 消耗在哪里最集中?
- 某次具体的对话,每一步发生了什么?
Langfuse 就是为这些场景设计的。
Langfuse 的核心概念
在接代码之前,先把几个概念说清楚:
Trace(追踪):一次完整的用户交互,比如用户发一条消息、系统回复一条消息,就是一个 trace。
Span(跨度):trace 里的一个处理步骤,比如"向量检索"、"Re-ranking"、"LLM 生成"各是一个 span。
Generation(生成):特指 LLM 调用的 span,会自动追踪 prompt、response、token 数量。
Score(评分):对一个 trace 或 generation 的质量评分,可以是人工打的,也可以是自动化评估程序打的。
一个 RAG 请求的追踪树大概是这样的:
Trace: "用户询问退款政策"
├─ Span: "向量检索" (耗时: 23ms, 检索到5个文档)
├─ Span: "BM25检索" (耗时: 8ms)
├─ Span: "Re-ranking" (耗时: 180ms)
└─ Generation: "LLM生成" (耗时: 1.2s, prompt_tokens: 2341, completion_tokens: 187)部署 Langfuse
自部署用 Docker Compose,生产稳定版:
# docker-compose.yml
version: '3.8'
services:
langfuse-server:
image: langfuse/langfuse:2
restart: unless-stopped
ports:
- "3000:3000"
depends_on:
- langfuse-db
environment:
DATABASE_URL: postgresql://langfuse:langfuse_password@langfuse-db:5432/langfuse
NEXTAUTH_SECRET: ${LANGFUSE_SECRET} # 随机字符串
NEXTAUTH_URL: http://your-domain.com:3000
SALT: ${LANGFUSE_SALT} # 随机字符串
# 可选:限制注册,只允许特定邮箱
# AUTH_DISABLE_SIGNUP: true
langfuse-db:
image: postgres:15
restart: unless-stopped
environment:
POSTGRES_DB: langfuse
POSTGRES_USER: langfuse
POSTGRES_PASSWORD: langfuse_password
volumes:
- langfuse_db_data:/var/lib/postgresql/data
volumes:
langfuse_db_data:Python 集成
基础集成:手动 SDK
from langfuse import Langfuse
from langfuse.decorators import observe, langfuse_context
import os
from openai import OpenAI
from typing import List
# 初始化
langfuse = Langfuse(
public_key=os.environ["LANGFUSE_PUBLIC_KEY"],
secret_key=os.environ["LANGFUSE_SECRET_KEY"],
host=os.environ.get("LANGFUSE_HOST", "https://cloud.langfuse.com")
)
openai_client = OpenAI()
class ObservableRAGPipeline:
def __init__(self, vector_store, bm25_retriever, reranker):
self.vector_store = vector_store
self.bm25_retriever = bm25_retriever
self.reranker = reranker
def run(self, query: str, session_id: str = None, user_id: str = None) -> dict:
# 创建 Trace
trace = langfuse.trace(
name="rag-pipeline",
input={"query": query},
session_id=session_id,
user_id=user_id,
metadata={"pipeline_version": "2.1"}
)
try:
# Step 1: 向量检索
with trace.span(
name="vector-retrieval",
input={"query": query}
) as vector_span:
vector_results = self.vector_store.search(query, top_k=20)
vector_span.end(output={
"retrieved_count": len(vector_results),
"top_score": vector_results[0]["score"] if vector_results else 0
})
# Step 2: BM25 检索
with trace.span(
name="bm25-retrieval",
input={"query": query}
) as bm25_span:
bm25_results = self.bm25_retriever.search(query, top_k=20)
bm25_span.end(output={
"retrieved_count": len(bm25_results)
})
# 合并结果
merged = self._rrf_merge(vector_results, bm25_results)
candidates = merged[:20]
# Step 3: Re-ranking
with trace.span(
name="reranking",
input={
"query": query,
"candidate_count": len(candidates)
}
) as rerank_span:
reranked = self.reranker.rerank(
query,
[c["doc"] for c in candidates],
top_n=5
)
rerank_span.end(output={
"top_doc_score": reranked[0]["rerank_score"] if reranked else 0,
"top_doc_preview": reranked[0]["content"][:200] if reranked else ""
})
# 构建 prompt
context = "\n\n---\n\n".join([r["content"] for r in reranked])
system_prompt = """你是一个专业的客服助手。请根据以下提供的参考文档回答用户问题。
如果参考文档中没有相关信息,请明确告知用户,不要编造答案。"""
user_prompt = f"""参考文档:
{context}
用户问题:{query}"""
# Step 4: LLM 生成(用 Generation 类型,会自动追踪 token)
generation = trace.generation(
name="llm-generation",
model="gpt-4o",
input=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt}
],
metadata={
"context_doc_count": len(reranked),
"top_rerank_score": reranked[0]["rerank_score"] if reranked else 0
}
)
response = openai_client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt}
],
temperature=0.1
)
answer = response.choices[0].message.content
# 记录生成结果和 token 使用
generation.end(
output=answer,
usage={
"input": response.usage.prompt_tokens,
"output": response.usage.completion_tokens
}
)
# 记录最终结果到 Trace
trace.update(
output={
"answer": answer,
"retrieved_docs": [r["content"][:100] for r in reranked]
}
)
return {
"answer": answer,
"retrieved_docs": reranked,
"trace_id": trace.id # 返回 trace_id,方便后续追溯
}
except Exception as e:
trace.update(
output={"error": str(e)},
level="ERROR"
)
raise
finally:
langfuse.flush()
def _rrf_merge(self, list1, list2, k=60):
scores = {}
docs = {}
for rank, item in enumerate(list1):
doc_id = item["doc"]["id"]
scores[doc_id] = scores.get(doc_id, 0) + 1.0 / (k + rank + 1)
docs[doc_id] = item["doc"]
for rank, item in enumerate(list2):
doc_id = item["doc"]["id"]
scores[doc_id] = scores.get(doc_id, 0) + 1.0 / (k + rank + 1)
docs[doc_id] = item["doc"]
sorted_ids = sorted(scores.keys(), key=lambda x: scores[x], reverse=True)
return [{"doc": docs[i], "rrf_score": scores[i]} for i in sorted_ids]自动化质量评分
光记录 trace 还不够,要加自动评分,才能做批量分析。
from langfuse import Langfuse
from openai import OpenAI
langfuse = Langfuse(...)
openai_client = OpenAI()
LLM_JUDGE_PROMPT = """你是一个RAG系统的质量评估专家。
请评估以下RAG系统的回答质量:
用户问题:
{question}
系统使用的参考文档:
{context}
系统给出的回答:
{answer}
请从以下3个维度评分(每项0-1分,0.1为最小粒度):
1. 忠实度(faithfulness):回答是否完全基于参考文档,有无幻觉内容
2. 答案相关性(answer_relevance):回答是否直接回答了用户问题
3. 上下文相关性(context_relevance):检索到的文档是否真的和问题相关
以JSON格式返回:
{{"faithfulness": 0.0-1.0, "answer_relevance": 0.0-1.0, "context_relevance": 0.0-1.0, "reasoning": "简短说明"}}
"""
def evaluate_and_score_trace(trace_id: str, question: str, context: str, answer: str):
"""对一个 trace 做自动质量评分"""
response = openai_client.chat.completions.create(
model="gpt-4o-mini", # 用便宜的模型做评估
messages=[
{"role": "user", "content": LLM_JUDGE_PROMPT.format(
question=question,
context=context[:3000], # 避免 context 太长
answer=answer
)}
],
response_format={"type": "json_object"},
temperature=0
)
scores = json.loads(response.choices[0].message.content)
# 把评分写回 Langfuse
for metric_name in ["faithfulness", "answer_relevance", "context_relevance"]:
if metric_name in scores:
langfuse.score(
trace_id=trace_id,
name=metric_name,
value=scores[metric_name],
comment=scores.get("reasoning", "")
)
return scores我发现的那个隐藏问题
接入 Langfuse 一周后,我打开 Dashboard 看各类问题的指标分布,发现了一个奇怪的现象:
关于"退款政策"相关问题,context_relevance 分数平均只有 0.41,远低于其他类别的 0.78。
我调出这批 trace,逐条看检索结果,发现了根本原因:
用户问的是"退款政策",但知识库里相关文档的标题是"售后服务规范",里面有一章叫"费用退还流程"。
向量检索匹配"退款政策",但向量语义离"售后服务规范"有点远,这类文档经常排在 Top-5 之外。
等我发现这个问题,系统已经跑了两个月了。这两个月里,所有问退款的用户,都在得到质量很差的回答。
解决方案很简单:给这类文档加了同义词标签,优化了分块策略。但没有 Langfuse,我根本不知道这个问题存在。
最值得监控的指标
基于我的经验,这几个指标最有价值:
- context_relevance 按问题类别分组:找出哪类问题检索效果差
- faithfulness 的时间趋势:某次知识库更新后有没有引入幻觉
- P99 延迟按步骤分解:慢在检索还是慢在 LLM
- token 消耗分布:哪类问题的 context 被塞得太满
- 低分 trace 的人工复查:每天随机抽 10 条低分记录人工看
不需要同时监控所有东西,先把这 5 个跑起来,你对系统质量的认知会发生本质变化。
