news 2026/1/31 7:05:04

《LangGraph》57个小项目带你从入门到入坟《四》(代码全开源,一键运行)

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
《LangGraph》57个小项目带你从入门到入坟《四》(代码全开源,一键运行)

LangGraph 流式输出学习总结

> github链接:https://github.com/hzqjgthy/LangGraph_TL (求star)

本文档总结了 7 个脚本中关于 LangGraph 和 LangChain 流式输出的核心知识点。


📚 目录

  1. 基础知识
  2. 脚本概览
  3. 核心知识点详解
  4. 实战案例
  5. 最佳实践

1. 基础知识

1.1 环境配置

所有脚本都使用了以下基础配置:

fromlangchain.chat_modelsimportinit_chat_modelfromdotenvimportload_dotenvimportos load_dotenv(override=True)# 初始化模型llm=init_chat_model(model="deepseek-chat",model_provider="deepseek",api_key=os.getenv("DEEPSEEK_API_KEY"),base_url=os.getenv("DEEPSEEK_URL"),temperature=0,)

1.2 核心依赖库

  • langchain/langchain_core: LLM 框架
  • langgraph: 构建 Agent 工作流
  • sqlalchemy: 数据库 ORM 操作
  • requests: HTTP 请求(API 调用)
  • pydantic: 数据验证和模型定义

2. 脚本概览

脚本行数核心功能关键知识点
test_29.py45OpenWeather API 测试API 调用基础、JSON 解析
test_30.py242LangGraph Agent 基础create_react_agent、工具集成、非流式输出
test_31.py43LangChain 流式输出astream 方法、chunk 累加
test_32.py266LangGraph 同步流式输出values/updates 模式
test_33.py245LangGraph 异步流式输出异步 astream
test_34.py245messages 模式流式输出增量 token 处理
test_35.py254astream_events 事件流事件驱动流式输出

3. 核心知识点详解

3.1 OpenWeather API 集成 (test_29.py)

功能:查询城市天气信息

代码要点

defget_weather(loc):url="https://api.openweathermap.org/data/2.5/weather"params={"q":loc,# 城市名称(英文)"appid":"YOUR_API_KEY","units":"metric",# 摄氏度"lang":"zh_cn"# 简体中文}response=requests.get(url,params=params)returnjson.dumps(response.json())

注意事项

  • 中国城市需使用英文名称(如 Beijing, Shanghai)
  • API Key 需要从 OpenWeather 网站注册获取
  • 返回值为 JSON 字符串格式

3.2 LangGraph Agent 基础 (test_30.py)

3.2.1 工具定义

使用@tool装饰器定义工具,并使用 Pydantic 模型定义参数:

classWeatherLoc(BaseModel):location:str=Field(description="城市名称")@tool(args_schema=WeatherLoc)defget_weather(location):"""查询当前天气"""# 实现代码
3.2.2 数据库集成

使用 SQLAlchemy 定义数据模型:

fromsqlalchemyimportcreate_engine,Column,Integer,String,Floatfromsqlalchemy.ormimportsessionmaker,declarative_base Base=declarative_base()classWeather(Base):__tablename__='weather_11'city_id=Column(Integer,primary_key=True)city_name=Column(String(50))temperature=Column(Float)# ... 其他字段

数据库连接

DATABASE_URI='mysql+pymysql://user:password@host:3306/database?charset=utf8mb4'engine=create_engine(DATABASE_URI)Base.metadata.create_all(engine)Session=sessionmaker(bind=engine)
3.2.3 创建 Agent
fromlanggraph.prebuiltimportcreate_react_agent tools=[fetch_real_time_info,get_weather,insert_weather_to_db,query_weather_from_db]graph=create_react_agent(llm,tools=tools)
3.2.4 可视化
png_bytes=graph.get_graph(xray=True).draw_mermaid_png()withopen("graph_30.png","wb")asf:f.write(png_bytes)
3.2.5 非流式调用
response=graph.invoke({"messages":["北京今天的天气怎么样?"]})print(response["messages"][-1].content)

3.3 LangChain 流式输出基础 (test_31.py)

异步流式输出

asyncdefstream_function():chunks=[]asyncforchunkinllm.astream("你好,请你详细的介绍一下你自己。"):chunks.append(chunk)print(chunk.content,end="|",flush=True)# chunk 可以累加combined=chunks[0]+chunks[1]+chunks[2]print(combined)asyncio.run(stream_function())

关键点

  • 使用astream()方法进行异步流式输出
  • 每个 chunk 包含部分响应内容
  • chunk 对象支持+操作符进行累加
  • 需要asyncio.run()来执行异步函数

3.4 LangGraph 流式输出模式

LangGraph 提供了 5 种流式输出模式:

模式返回内容使用场景
values每个步骤后的完整状态需要完整上下文
updates每个节点的增量更新按节点处理
messages增量 token 流实时文本输出
debug详细调试信息调试程序
custom自定义流高级定制

3.5 同步流式输出 (test_32.py)

3.5.1 values 模式

特点:返回每个步骤后的完整状态

defprint_stream(stream):forsub_streaminstream:# sub_stream 是字典,包含 messages 字段message=sub_stream["messages"][-1]message.pretty_print()input_message={"messages":["你好,南京现在的天气怎么样?"]}print_stream(graph.stream(input_message,stream_mode="values"))

输出结构

{"messages":[HumanMessage(...),AIMessage(...),ToolMessage(...)]}
3.5.2 updates 模式

特点:返回每个节点的增量更新

defprint_stream_updates(stream):forsub_streaminstream:# sub_stream 结构: {节点名称: {messages: [消息]}}fornode_name,node_datainsub_stream.items():print(f"---{node_name.upper()}节点 ---")if"messages"innode_data:formessageinnode_data["messages"]:message.pretty_print()print_stream_updates(graph.stream(input_message,stream_mode="updates"))

输出结构

{"agent":{"messages":[AIMessage(...)]},"tools":{"messages":[ToolMessage(...)]}}

节点类型

  • agent: LLM 的决策或响应
  • tools: 工具执行结果

3.6 异步流式输出 (test_33.py)

3.6.1 异步 values 模式
asyncdefstream_function():asyncforchunkingraph.astream(input={"messages":["你好,成都的天气怎么样?"]},stream_mode="values"):message=chunk["messages"][-1].pretty_print()asyncio.run(stream_function())
3.6.2 异步 updates 模式
asyncdefstream_function_2():inputs={"messages":[("human","你好,乌鲁木齐的天气怎么样?")]}asyncforchunkingraph.astream(inputs,stream_mode="updates"):fornode,valuesinchunk.items():print(f"接收到的更新节点: '{node}'")message=values["messages"][0]message.pretty_print()asyncio.run(stream_function_2())

同步 vs 异步

  • 同步(stream): 阻塞式,适合简单脚本
  • 异步(astream): 非阻塞,适合高并发场景

3.7 messages 模式流式输出 (test_34.py)

特点:记录每个消息中的增量 token,实现逐字输出

asyncdefstream_function():asyncformsg,metadataingraph.astream({"messages":["你好,帮我查询一下数据库中北京的天气数据"]},stream_mode="messages"):# 只输出非 HumanMessage 的内容ifmsg.contentandnotisinstance(msg,HumanMessage):print(msg.content,end="|",flush=True)# 处理 AIMessageChunkifisinstance(msg,AIMessageChunk):iffirst:gathered=msg first=Falseelse:gathered=gathered+msg# 累加 chunk# 输出工具调用信息ifmsg.tool_call_chunks:print(gathered.tool_calls)asyncio.run(stream_function())

适用场景

  • 实时显示 LLM 生成的文本
  • 类似 ChatGPT 的打字机效果
  • 需要逐 token 处理的场景

3.8 astream_events 事件流 (test_35.py)

特点:事件驱动的流式输出,提供更细粒度的控制

3.8.1 基础用法
asyncdefstream_function():asyncforeventingraph.astream_events({"messages":["北京的天气怎么样"]},version="v2"):kind=event["event"]print(f"{kind}:{event['name']}----------------{event['data']}")asyncio.run(stream_function())
3.8.2 提取 AIMessageChunk
asyncdefstream_function():asyncforeventingraph.astream_events({"messages":["北京的天气怎么样"]},version="v2"):kind=event["event"]# 过滤聊天模型流事件ifkind=="on_chat_model_stream":chunk=event["data"]["chunk"]# 输出文本内容ifchunk.content:print(chunk.content,end="",flush=True)# 输出工具调用信息elifchunk.tool_calls:fortool_callinchunk.tool_calls:iftool_call.get('name'):print(f"\n[调用工具:{tool_call['name']}]\n")asyncio.run(stream_function())

事件类型

  • on_chat_model_stream: 聊天模型输出流
  • on_tool_start: 工具开始执行
  • on_tool_end: 工具执行结束
  • 其他更多事件类型…

优势

  • 精细控制每个事件的处理
  • 可以区分模型响应和工具调用
  • 适合复杂的 UI 交互场景

4. 实战案例

4.1 多城市天气查询与存储

需求:查询多个城市天气并存储到数据库

response=graph.invoke({"messages":["帮我查一下北京、上海、哈尔滨三个城市的天气,并存储到数据库"]})

Agent 执行流程

  1. 解析用户意图(需要查询 3 个城市)
  2. 并行调用get_weather工具 3 次
  3. 提取天气数据中的关键字段
  4. 调用insert_weather_to_db工具 3 次存储数据
  5. 返回执行结果

4.2 数据库天气对比分析

需求:从数据库读取天气数据并进行对比分析

response=graph.invoke({"messages":["帮我分析一下数据库中北京和哈尔滨城市天气的信息,做详细对比"]})

Agent 执行流程

  1. 调用query_weather_from_db查询北京天气
  2. 调用query_weather_from_db查询哈尔滨天气
  3. LLM 对比分析温度、天气状况等数据
  4. 生成详细的对比报告和出行建议

4.3 实时信息检索

需求:获取最新的互联网信息

response=graph.invoke({"messages":["你知道 Claude 3.5 发布的 computer use 吗?请用中文回复"]})

Agent 执行流程

  1. 识别需要实时信息
  2. 调用fetch_real_time_info工具搜索
  3. 解析搜索结果
  4. 生成中文回答

5. 最佳实践

5.1 流式输出选择指南

场景推荐模式理由
简单问答values完整上下文,易于调试
UI 实时显示messages逐字输出,用户体验好
调试工具调用updates清晰区分 agent 和 tools
复杂事件处理astream_events精细控制每个事件
性能要求高异步 (astream)非阻塞,高并发

5.2 工具设计原则

  1. 明确的文档字符串:描述工具功能、参数、返回值
  2. 使用 Pydantic 模型:定义清晰的参数结构
  3. 错误处理:捕获异常并返回友好的错误信息
  4. 返回标准格式:统一返回 JSON 字符串或字典

5.3 数据库操作最佳实践

@tool(args_schema=QueryWeatherSchema)defquery_weather_from_db(city_name:str):session=Session()try:# 执行查询weather_data=session.query(Weather).filter(Weather.city_name==city_name).first()ifweather_data:return{"city_name":weather_data.city_name,"temperature":weather_data.temperature,# ... 其他字段}else:return{"messages":[f"未找到城市 '{city_name}' 的天气信息。"]}exceptExceptionase:return{"messages":[f"查询失败,错误原因:{e}"]}finally:session.close()# 确保关闭会话

关键点

  • 使用try-except-finally确保资源释放
  • 每次操作创建新的 session
  • 使用merge()实现插入或更新
  • 发生错误时rollback()

5.4 流式输出性能优化

  1. 使用异步:对于 I/O 密集型操作
  2. 批量处理:减少网络往返次数
  3. 合理的 flushprint(..., end="", flush=True)
  4. 避免过度打印:只输出必要信息

5.5 消息类型总结

消息类型用途来源
HumanMessage用户输入用户
AIMessage模型响应LLM
ToolMessage工具执行结果工具
AIMessageChunk模型流式输出片段LLM (流式)

6. 常见问题

6.1 为什么使用英文城市名称?

OpenWeather API 只支持英文城市名称。对于中国城市:

  • 北京 → Beijing
  • 上海 → Shanghai
  • 哈尔滨 → Harbin

6.2 如何处理 API Key?

使用环境变量:

fromdotenvimportload_dotenv load_dotenv(override=True)api_key=os.getenv("OPENWEATHER_API_KEY")

.env文件示例:

OPENWEATHER_API_KEY=your_key_here DEEPSEEK_API_KEY=your_key_here DEEPSEEK_URL=https://api.deepseek.com

6.3 流式输出没有实时显示?

确保:

  1. 使用flush=True参数
  2. 不要使用print()的缓冲
  3. 在 Jupyter Notebook 中可能需要IPython.display

6.4 数据库连接失败?

检查:

  1. 数据库服务是否运行
  2. 连接字符串是否正确
  3. 用户权限是否足够
  4. 防火墙设置

7. 技术栈总结

7.1 核心框架

  • LangChain: LLM 应用开发框架
  • LangGraph: 构建多步骤 Agent 工作流
  • SQLAlchemy: Python SQL 工具包和 ORM

7.2 外部服务

  • OpenWeather API: 天气数据服务
  • Serper API: Google 搜索 API 代理
  • DeepSeek: 大语言模型服务
  • MySQL: 关系型数据库

7.3 Python 标准库

  • asyncio: 异步 I/O
  • requests: HTTP 库
  • json: JSON 处理
  • os: 操作系统接口

8. 学习路径建议

  1. 第一步:运行test_29.py理解 API 调用基础
  2. 第二步:运行test_30.py理解 Agent 的构建和工具集成
  3. 第三步:运行test_31.py理解基础流式输出
  4. 第四步:运行test_32.pytest_33.py对比同步/异步流式输出
  5. 第五步:运行test_34.py理解 messages 模式的实时输出
  6. 第六步:运行test_35.py掌握事件驱动的高级流式输出

9. 扩展阅读

  • LangChain 官方文档
  • LangGraph 官方文档
  • OpenWeather API 文档
  • SQLAlchemy 官方文档
  • Pydantic 官方文档

10. 总结

本系列脚本从基础的 API 调用到复杂的 Agent 流式输出,全面展示了 LangGraph 的核心功能:

工具集成:如何定义和使用工具
数据库操作:SQLAlchemy ORM 的最佳实践
流式输出:5 种不同模式的适用场景
异步编程:提升并发性能
事件驱动:精细控制 Agent 执行流程

掌握这些知识点后,可以构建功能强大的 LLM 应用,实现复杂的多步骤推理、工具调用和实时交互。


版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/1/29 6:45:32

30、Shell脚本编写与Bash安装指南

Shell脚本编写与Bash安装指南 1. Shell脚本编写最佳实践 在编写Shell脚本时,我们不仅要让代码完成任务,还要确保其具有可维护性,遵循良好的编程实践。 1.1 代码可读性 编写脚本时,代码的可读性至关重要。例如,六个月前编写的100行脚本,现在可能自己都难以理解其功能。…

作者头像 李华
网站建设 2026/1/29 16:08:40

31、Bash使用与相关Shell比较全解析

Bash使用与相关Shell比较全解析 1. Bash配置与测试 在Bash的使用过程中,配置和测试是重要的环节。 - 配置选项 : disabled-builtins 和 xpg-echo-default 选项默认是禁用的,其他选项则是启用的。很多其他的Shell特性可以通过修改 config-.top.h 文件来开启或关闭…

作者头像 李华
网站建设 2026/1/29 2:36:26

33、Bash 环境变量、操作符及选项全解析

Bash 环境变量、操作符及选项全解析 在使用 Bash 进行编程和操作时,了解其内置的环境变量、测试操作符、 set 选项、 shopt 选项以及 I/O 重定向等知识至关重要。以下将详细介绍这些内容。 内置 shell 变量 Bash 3.0 中有一系列内置的环境变量,这些变量在不同的场景下…

作者头像 李华
网站建设 2026/1/31 7:50:19

前端环境配置(nvm、nodejs、npm)

一、安装nvm 1. 下载vnm url: https://nvm.uihtm.com/doc/download-nvm.html 2. 解压文件后双击exe文件进行安装 ​ ​ 3. 选择nvm的安装地址,我是安装在D:\App\nvm ​ 4. 选择nodejs的安装地址,我是安装在C:\Program Files\nodejs ​ 5. 点击next 一直…

作者头像 李华
网站建设 2026/1/30 5:19:24

RK3588语音AI部署终极指南:算子兼容性深度优化与实战解决方案

RK3588语音AI部署终极指南:算子兼容性深度优化与实战解决方案 【免费下载链接】sherpa-onnx k2-fsa/sherpa-onnx: Sherpa-ONNX 项目与 ONNX 格式模型的处理有关,可能涉及将语音识别或者其他领域的模型转换为 ONNX 格式,并进行优化和部署。 …

作者头像 李华
网站建设 2026/1/30 23:10:57

EmotiVoice语音好奇感模拟促进知识探索

EmotiVoice语音好奇感模拟促进知识探索 在AI助手越来越频繁地出现在我们生活中的今天,一个明显的问题浮出水面:为什么大多数语音交互仍然让人感觉“冷冰冰”?无论是车载导航的一板一眼,还是智能音箱千篇一律的回答,用户…

作者头像 李华