Langchain-Chatchat 结合 OpenTelemetry 实现统一观测
在企业级 AI 应用日益复杂的今天,一个智能问答系统不仅要“答得准”,更要“看得清”。尤其是在金融、医疗、法律等对数据隐私和合规性要求极高的领域,将知识库部署于本地内网已成为标配。然而,当整个 RAG(检索增强生成)流程从文档解析、文本分块、向量检索到大模型推理全部在本地运行时,一旦出现响应缓慢或答案失真,开发者往往面临“黑盒”困境——不知道问题出在哪个环节。
这正是Langchain-Chatchat与OpenTelemetry联手解决的核心挑战。前者提供了一套完整的本地化智能问答能力,后者则赋予系统“透明运行”的可观测基因。两者的融合,不是简单的功能叠加,而是一次 AI 工程化实践的跃迁:让原本难以追踪的 AI 推理链路变得可测量、可分析、可优化。
为什么需要为 Langchain-Chatchat 构建可观测体系?
Langchain-Chatchat 本质上是一个高度模块化的 RAG 系统。它支持用户上传 PDF、Word 等私有文档,经过本地处理后构建专属知识库,并结合 LLM 实现精准问答。整个流程涉及多个关键阶段:
- 文档加载与解析
- 文本切片与清洗
- 向量化编码与索引存储
- 用户查询语义匹配
- 上下文拼接与大模型生成
每个环节都可能成为性能瓶颈或质量短板。比如:
- 某个 PDF 解析耗时异常?
- 分块策略导致语义断裂?
- 嵌入模型不匹配中文场景?
- 向量数据库查询延迟突增?
- LLM 生成陷入无限循环?
如果没有统一的观测手段,排查这些问题只能依赖零散的日志打印和手动计时,效率低下且容易遗漏上下文关联。更严重的是,在多用户并发访问场景下,单一请求的问题可能会被淹没在日志洪流中。
因此,引入标准化的遥测机制不再是“锦上添花”,而是保障系统稳定性和可维护性的必要条件。而 OpenTelemetry 正是目前最成熟、最开放的选择。
Langchain-Chatchat 是如何工作的?
Langchain-Chatchat 并非凭空而来,它是基于 LangChain 框架对 RAG 架构的一次工程化封装。其核心思想是:将私有知识转化为向量形式,通过语义搜索召回相关内容,再交由大语言模型进行自然语言回答生成。
这套流程的最大优势在于“数据不出内网”。所有文档解析、向量计算、模型推理均可在本地完成,彻底规避了云端 API 的数据泄露风险。同时,由于答案来源于真实文档片段,也有效缓解了 LLM “幻觉”问题。
系统的关键组件包括:
- 文档加载器:支持 PyPDF2、python-docx 等库读取多种格式文件。
- 文本分块器:使用
RecursiveCharacterTextSplitter按字符滑动窗口切分,保留语义连续性。 - 嵌入模型:可选本地部署的 BGE、Sentence-BERT 等模型,实现文本到向量的转换。
- 向量数据库:FAISS、Chroma 或 Milvus 存储向量并支持快速相似度检索。
- LLM 引擎:接入 ChatGLM、Qwen、Llama3 等本地模型,完成最终生成任务。
这种高度解耦的设计使得系统极具灵活性——你可以自由替换任一组件以适应不同硬件资源或业务需求。但也正因为模块众多,调用链路变长,带来了可观测性的新挑战。
下面是一段典型的实现代码,展示了从文档加载到问答输出的完整流程:
from langchain_community.document_loaders import PyPDFLoader from langchain.text_splitter import RecursiveCharacterTextSplitter from langchain_community.embeddings import HuggingFaceEmbeddings from langchain_community.vectorstores import FAISS from langchain.chains import RetrievalQA from langchain_community.llms import HuggingFaceHub # 1. 加载 PDF 文档 loader = PyPDFLoader("knowledge.pdf") pages = loader.load_and_split() # 2. 文本分块 text_splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50) docs = text_splitter.split_documents(pages) # 3. 初始化嵌入模型(本地) embeddings = HuggingFaceEmbeddings(model_name="BAAI/bge-small-zh") # 4. 构建向量数据库 db = FAISS.from_documents(docs, embeddings) # 5. 创建检索器 retriever = db.as_retriever(search_kwargs={"k": 3}) # 6. 初始化本地 LLM(示例使用 HuggingFace Hub 接口) llm = HuggingFaceHub( repo_id="THUDM/chatglm3-6b", model_kwargs={"temperature": 0.7, "max_length": 512}, huggingfacehub_api_token="your_token" ) # 7. 构建 QA 链 qa_chain = RetrievalQA.from_chain_type( llm=llm, chain_type="stuff", retriever=retriever, return_source_documents=True ) # 8. 执行查询 query = "公司年假政策是如何规定的?" result = qa_chain.invoke({"query": query}) print(result["result"])这段代码结构清晰、模块分明,非常适合做插桩观测——每一个函数调用都可以成为一个独立的追踪节点(Span),从而形成完整的调用树。
OpenTelemetry:给 AI 系统装上“仪表盘”
如果说 Langchain-Chatchat 提供了“动力系统”,那么 OpenTelemetry 就是它的“仪表盘”。它由 CNCF 主导,旨在为现代分布式系统提供统一的遥测标准,涵盖三大核心信号:
- Traces(追踪):记录一次请求在整个系统中的流转路径,识别延迟热点。
- Metrics(指标):采集系统级别的统计数据,如 QPS、响应时间 P95、资源占用等。
- Logs(日志):结构化记录事件详情,辅助调试与审计。
三者协同工作,构成了现代可观测性的“黄金三角”。
OpenTelemetry 的工作模式可以概括为四个步骤:
- 插桩(Instrumentation):在应用中植入探针,捕获关键操作。
- 采集(Collection):SDK 收集 trace、metric 和 log 数据。
- 传输(Export):通过 OTLP 协议发送至 OpenTelemetry Collector。
- 展示(Visualization):Collector 将数据转发至 Jaeger、Prometheus、Grafana 等后端平台。
它的最大优势在于标准化。无论你用 Python、Java 还是 Go 开发,无论底层是 Flask、FastAPI 还是 gRPC,只要遵循 OpenTelemetry 规范,就能实现跨服务、跨语言的数据聚合。
来看一段实际集成代码:
from opentelemetry import trace from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter from opentelemetry.instrumentation.requests import RequestsInstrumentor from opentelemetry.sdk.resources import Resource # 设置资源属性 resource = Resource(attributes={ "service.name": "langchain-chatchat", "environment": "production", "telemetry.sdk.language": "python" }) # 初始化 Tracer Provider trace.set_tracer_provider(TracerProvider(resource=resource)) tracer = trace.get_tracer(__name__) # 配置 OTLP Exporter(发送至 Collector) otlp_exporter = OTLPSpanExporter(endpoint="http://localhost:4317", insecure=True) span_processor = BatchSpanProcessor(otlp_exporter) trace.get_tracer_provider().add_span_processor(span_processor) # 自动插桩:监控 HTTP 请求(如调用 HuggingFace API) RequestsInstrumentor().instrument() # 示例:手动创建 span 记录问答流程 with tracer.start_as_current_span("document_loading") as span: loader = PyPDFLoader("policy.pdf") pages = loader.load() span.set_attribute("document.pages.count", len(pages)) span.add_event("Document loaded successfully") with tracer.start_as_current_span("question_answering") as span: result = qa_chain.invoke({"query": "年假如何申请?"}) span.set_attribute("llm.response.length", len(result["result"])) span.set_attribute("retrieval.top_k", 3)这里做了几件事:
- 初始化全局 Tracer,绑定服务名;
- 配置 OTLP 导出器,将数据发往本地 Collector;
- 使用RequestsInstrumentor自动追踪所有外部 HTTP 调用(比如调用远程 LLM API);
- 在关键业务逻辑中手动创建 Span,记录自定义属性和事件。
这些 Span 最终会在 Jaeger 中呈现为一棵完整的调用树,清晰展示每一步耗时和上下文信息。
典型架构与观测链路设计
在一个生产级部署中,Langchain-Chatchat 通常与 OpenTelemetry 构成如下架构:
graph TD A[用户界面] <--> B[FastAPI / Gradio Server] B --> C[Langchain-Chatchat Core] C --> D[OpenTelemetry SDK (Python)] D --> E[OpenTelemetry Collector] E --> F[Jaeger: 分布式追踪] E --> G[Prometheus: 指标监控] E --> H[Loki: 日志查询] E --> I[Grafana: 统一可视化]在这个体系中:
- 用户通过 Web 界面发起提问;
- FastAPI 接收到请求后触发 Langchain 流程;
- OpenTelemetry SDK 在后台自动或手动采集遥测数据;
- 所有数据经由 OTLP 协议汇总到 Collector;
- Collector 负责路由、批处理、过滤后分发至各后端系统;
- 最终通过 Grafana 实现 traces、metrics、logs 的联动分析。
当一次问答请求被执行时,完整的观测链路如下:
- 入口层:FastAPI 收到 HTTP 请求,OpenTelemetry 自动生成 root span,记录 method、path、client IP。
- 文档加载:手动创建
document_loadingspan,记录文件类型、页数、加载耗时。 - 文本分块:创建
text_chunkingspan,统计 chunk 数量、平均长度。 - 向量化:若调用远程 embedding API,会被自动追踪;本地模型可通过手动埋点记录推理时间。
- 向量检索:创建
vector_searchspan,记录 top-k、相似度阈值、查询延迟。 - LLM 推理:自动捕获
/generate类型的请求,记录 prompt_tokens、completion_tokens、生成耗时。 - 结果返回:关闭 root span,整体响应时间自动计算。
所有 Span 形成父子关系,构成一棵完整的调用树。你可以在 Jaeger 中点击任意一条 trace,逐层展开查看每个环节的具体表现。
真实问题排查案例
场景一:问答响应慢,用户抱怨超时
某天运营反馈部分用户提问响应超过 10 秒。我们立即登录 Jaeger 查找对应 trace,发现调用树中vector_search占据了 8 秒以上,其余环节均正常。
进一步查看该 Span 的 Attributes:
-search_kwargs.k: 10
-vector_db.size: 52,341
-embedding_model:BAAI/bge-small-zh
初步判断是检索规模过大导致性能下降。虽然 FAISS 在小规模数据下表现优异,但当向量数量突破 5 万后,查询延迟显著上升。
解决方案:
- 优化分块策略,减少冗余 chunk(例如合并短段落);
- 引入两级检索:先用 BM25 做关键词粗筛,再进入向量空间精排;
- 对于大规模知识库,考虑升级为 Milvus 或 Weaviate 提供更高性能支持。
更重要的是,这次问题暴露了缺乏指标监控的风险。后续我们在 Prometheus 中增加了vector_search_duration_seconds指标,并设置 P95 > 2s 时触发告警。
场景二:答案频繁返回“我不知道”
另一个常见问题是答案质量差。用户提问“报销流程是什么”,系统却回复“抱歉,我无法回答这个问题”。
我们检查 trace 发现,retrieval阶段返回的 top-3 documents 内容与问题完全无关。进一步追溯发现,当前使用的 embedding model 名称为all-MiniLM-L6-v2—— 这是一个英文模型,误用于中文文档导致语义错位。
根本原因:模型不匹配导致向量空间扭曲,即使原文存在相关信息也无法正确召回。
解决方案:
- 立即切换为中文专用嵌入模型,如BAAI/bge-small-zh或text2vec-large-chinese;
- 在 Span 中添加embedding.model.name属性,便于后续审计;
- 建立上线前模型校验清单,防止配置错误。
这个案例说明,可观测性不仅能发现问题,还能帮助建立预防机制。
工程实践建议
在真实部署中,以下几点值得特别注意:
合理设置采样率
生产环境不应全量采集 trace,否则会带来巨大网络和存储开销。建议采用动态采样策略,例如:
# OpenTelemetry Collector 配置片段 processors: probabilistic_sampler: sampling_percentage: 5 # 仅采样5%也可以根据请求特征进行智能采样,比如错误请求、高延迟请求强制采样。
敏感信息脱敏
AI 系统常涉及敏感内容,必须避免在 trace 中泄露完整 prompt 或 document。可通过以下方式防护:
- 使用正则表达式过滤 Span attributes 中的敏感字段;
- 启用 Collector 的
attributes/redaction处理器; - 不记录原始文本,只记录 hash 或摘要。
异步导出避免阻塞
务必使用BatchSpanProcessor异步发送数据,防止网络抖动影响主流程性能:
span_processor = BatchSpanProcessor( exporter=OTLPSpanExporter(), max_queue_size=2000, schedule_delay_millis=5000 )Collector 高可用部署
在 Kubernetes 环境中,推荐以 DaemonSet 模式运行 Collector,确保每台节点都有本地代理,降低网络跳数和延迟。
联合 Metrics 做告警
仅靠 trace 无法实现主动预警。应结合 Prometheus 抓取关键指标:
llm_generation_duration_secondsvector_search_duration_secondsdocument_load_duration_secondsrequest_countby status code
然后在 Grafana 中设置 P95 超过阈值时发送企业微信/钉钉告警。
结语
Langchain-Chatchat 与 OpenTelemetry 的结合,标志着 AI 应用从“能用”走向“可控”的重要一步。它不仅解决了本地化部署下的数据安全问题,更通过标准化观测手段打破了 AI 系统的“黑盒”状态。
未来的智能系统不会只是功能强大的工具,更应该是可见、可管、可信的工程产品。每一次问答的背后,都应该有一张清晰的调用图谱;每一次性能波动,都应该有数据支撑的归因分析。
而这,正是现代 AI 工程化的真正起点。
创作声明:本文部分内容由AI辅助生成(AIGC),仅供参考