Langchain-Chatchat 结合 Celery 实现异步任务处理
在企业级 AI 应用日益普及的今天,一个现实问题反复浮现:如何在保障数据隐私的前提下,依然提供流畅、高效的智能服务?尤其是在金融、医疗和法律等行业,敏感信息不容外泄,但用户又期待即时响应。这就像要求一辆装甲车既要坚不可摧,又要跑出赛车的速度——看似矛盾,却正是现代本地化 AI 系统必须面对的挑战。
Langchain-Chatchat 正是为解决这一难题而生的开源方案。它允许企业将私有文档作为知识源,在本地完成从解析到问答的全流程处理,真正实现“数据不出内网”。然而,理想很丰满,现实却常因性能瓶颈而打折:上传一份百页 PDF,前端卡住三分钟无响应;批量导入合同文件时,服务器负载飙升甚至崩溃……这些体验上的“断点”,往往成为技术落地的最后一道坎。
于是,我们把目光投向了Celery——这个在 Python 世界中久经考验的分布式任务队列。它的价值不在于炫技,而在于务实:把耗时的操作悄悄挪到后台,让主服务轻装上阵。当用户点击“上传”后立刻得到反馈,而不是盯着转圈的加载动画,那种顺畅感本身就是一种信任的建立。
要理解这套组合拳为何有效,先得看清 Langchain-Chatchat 的工作全貌。它本质上是一个基于 RAG(检索增强生成)架构的知识库系统,流程清晰且模块化:
- 文档加载:支持 TXT、PDF、Word 等多种格式,通过
Unstructured或PyPDF2提取原始文本。 - 文本切片:使用滑动窗口对长文分块,兼顾语义完整与上下文连贯。
- 向量化编码:调用本地部署的 Embedding 模型(如 BGE、m3e),将文本转为高维向量。
- 向量存储:写入 FAISS 或 Chroma 这类轻量级向量数据库,构建可快速检索的索引。
- 语义问答:用户提问时,问题也被向量化,系统找出最相关的知识片段,拼接后送入 LLM(如 ChatGLM、Qwen)生成回答。
整个过程全程本地运行,无需联网调用任何外部 API,彻底规避了数据泄露风险。这一点对于合规要求严格的场景至关重要。
但正因其“全链路本地化”的特性,计算压力也随之而来。尤其是文档解析和向量构建阶段,既吃 CPU 又耗内存,若采用同步处理模式,Web 服务线程会被长时间阻塞。想象一下,多个用户同时上传大文件,请求堆积如山,最终导致超时或崩溃——这不是功能缺陷,而是架构设计上的硬伤。
这时候,引入 Celery 就不是锦上添花,而是雪中送炭。
Celery 的核心思想很简单:解耦。它将 Web 服务与计算任务分离,形成“生产者-消息代理-消费者”的经典模型。具体来说:
- Producer:FastAPI 或 Flask 接收到文件上传请求后,不做实际处理,只负责把任务推送到消息队列;
- Broker:Redis 或 RabbitMQ 扮演中间人的角色,暂存任务消息,确保不丢失;
- Worker:独立运行的 Celery 进程监听队列,一旦发现新任务就立即拉取并执行。
这种结构带来的好处是立竿见影的。从前端角度看,无论后台任务多复杂,接口都能在毫秒级返回结果,比如:“任务已提交,ID 为 abc123,请稍后查看状态。” 用户不再需要干等,系统吞吐量也大幅提升。
更重要的是,这套机制天然具备容错与扩展能力。任务失败可以自动重试,日志可追踪,Worker 节点还能横向扩容。哪怕某个节点宕机,其他 Worker 仍能继续消费队列中的任务,整体系统的鲁棒性远超传统的多线程方案。
来看一段关键代码实现:
# celery_app.py from celery import Celery import os CELERY_BROKER_URL = os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0") CELERY_RESULT_BACKEND = os.getenv("CELERY_RESULT_BACKEND", "redis://localhost:6379/1") celery_app = Celery( "chatchat_tasks", broker=CELERY_BROKER_URL, backend=CELER_RESULT_BACKEND ) celery_app.conf.update( accept_content=['json'], task_serializer='json', result_serializer='json', timezone='Asia/Shanghai' )这段配置定义了一个基于 Redis 的 Celery 实例,指定了消息代理和结果后端。选择 Redis 是因为它轻量、易部署,特别适合开发和中小规模生产环境。当然,如果对可靠性要求更高,也可以切换到 RabbitMQ 或 Redis Cluster。
接下来是真正的“干活”逻辑:
# tasks.py from celery_app import celery_app from langchain.document_loaders import UnstructuredFileLoader from langchain.text_splitter import RecursiveCharacterTextSplitter from vector_store import load_to_vectorstore @celery_app.task(bind=True, max_retries=3) def process_document_task(self, file_path: str, collection_name: str): try: loader = UnstructuredFileLoader(file_path) documents = loader.load() splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50) chunks = splitter.split_documents(documents) load_to_vectorstore(chunks, collection_name) return { "status": "success", "file": file_path, "chunks_count": len(chunks) } except Exception as exc: raise self.retry(exc=exc, countdown=60)这个任务函数封装了完整的文档处理流程,并加入了重试机制。一旦发生异常(比如临时文件读取失败),Celery 会自动在 60 秒后重试,最多尝试三次。这种“自我修复”能力大大降低了人工干预的需求。
再看前端如何触发任务:
# api.py from fastapi import FastAPI, UploadFile from tasks import process_document_task import uuid import os app = FastAPI() UPLOAD_DIR = "./uploads" os.makedirs(UPLOAD_DIR, exist_ok=True) @app.post("/upload/") async def upload_file(file: UploadFile, collection: str = "default_kb"): file_path = os.path.join(UPLOAD_DIR, f"{uuid.uuid4()}_{file.filename}") with open(file_path, "wb") as f: f.write(await file.read()) task = process_document_task.delay(file_path, collection) return {"task_id": task.id, "status": "processing", "message": "文件已接收,正在后台处理"} @app.get("/task_status/{task_id}") def get_task_status(task_id: str): task = process_document_task.AsyncResult(task_id) if task.state == 'PENDING': response = {'state': task.state, 'status': '等待执行'} elif task.state == 'SUCCESS': response = {'state': task.state, 'result': task.result, 'status': '处理完成'} else: response = {'state': task.state, 'status': str(task.info)} return response这里有两个关键接口:/upload/接收文件并异步提交任务,立即返回任务 ID;/task_status/{task_id}则供前端轮询查询进度。典型的使用方式是前端每两秒请求一次状态,直到收到“处理完成”的信号,然后提示用户知识库已更新。
整个系统架构也因此变得更加清晰和稳健:
graph TD A[Web Frontend] --> B[FastAPI Server] B --> C[Redis Broker] C --> D[Celery Worker] D --> E[Vector DB FAISS] D --> F[LLM 如 ChatGLM] E --> G[问答查询] F --> G所有组件各司其职:Web 层专注交互,Worker 层承担重算,Redis 缓冲流量洪峰,向量库与 LLM 分别负责知识存储与语义生成。这种职责分离不仅提升了性能,也为未来的扩展打下基础——比如你可以单独升级 Worker 的 GPU 配置,而不影响其他部分。
在实际部署中,还有一些工程细节值得留意:
- 任务粒度:不要把整个知识库构建作为一个大任务,而应按文件拆分成多个子任务。这样即使某一个文件解析失败,也不会影响整体进度,还能实现更细粒度的状态反馈。
- 临时文件清理:任务完成后记得删除上传的原始文件,避免磁盘被占满。可以在任务成功回调中加入
os.remove(file_path)。 - 幂等性控制:防止同一文件被重复处理,可在任务开始前计算文件哈希值,并在 Redis 中记录已处理列表。
- 资源隔离:建议将 Web Server 和 Celery Worker 部署在不同容器或主机上,避免 CPU 和内存争抢。
- 监控告警:集成 Flower 可视化工具实时查看队列状态,结合 Prometheus + Grafana 做指标采集,关键错误可通过钉钉或企业微信机器人通知运维人员。
这套架构的价值远不止于提升响应速度。它实际上为企业搭建了一套可控、可审计、可持续演进的本地智能中枢。无论是法务部门的合同检索、客服系统的知识辅助,还是内部培训资料的智能问答,都可以基于此框架快速落地。
更深远的意义在于,它代表了一种趋势:AI 应用正从“云端霸权”走向“本地主权”。越来越多的企业意识到,真正的智能化不应以牺牲安全为代价。而像 Celery 这样的异步任务框架,则成为了连接高性能与高安全之间的桥梁。
未来,随着轻量化模型(如 Qwen-Max、Phi-3)和高效向量引擎的发展,本地 AI 系统的门槛将进一步降低。但无论技术如何迭代,合理的架构设计始终是工程落地的核心。Langchain-Chatchat 与 Celery 的结合,正是这样一个值得借鉴的范例——它不追求极致的技术堆砌,而是用成熟、稳定、可维护的方式,解决了真实世界中的关键痛点。
创作声明:本文部分内容由AI辅助生成(AIGC),仅供参考