本文介绍了智能体的核心概念和技术实现。智能体是具有自主规划能力的AI系统,关键技术包括CoT思维链、AgentLoop执行循环和ReAct模式。文章详细阐述了智能体的分层架构设计,重点讲解了工具系统设计(终止工具、询问工具)和MCP协议支持。通过代码示例实践展示了智能体的实现过程,包括状态管理、执行循环、工具调用等核心功能。同时提出了防止无限循环的机制和流式输出接口的实现方法。最终构建了一个具备自主规划、工具调用和流式交互能力的智能体系统。
智能体介绍
什么是智能体呢?简单说就是一个能够感知环境、进行推理、制定计划、做出决策,拥有自主规划能力的AI系统,智能体和普通AI的区别就是大家应该都用过的深度思考功能。
的关键技术(规划执行机制):CoT思维链、Agent Loop执行循环、ReAct模式、所需支持系统
CoT思维链:顾名思义像人类那样思考,实现方法很简单就是在输入Prompt时给模型提供额外的提示或引导,让模型一步一步思考问题,以逐步推理的方式生成答案,还可以运用few shot的优化技巧,让模型学习如何构建自己的思维链。
Agent Loop执行循环:是智能体最核心的工作机制,指智能体在没有用户输入的情况下,自主重复执行推理和工具调用的过程,实现的参考代码如下:
public String execute() { List<String> results = new ArrayList<>(); while (currentStep < MAX_STEPS && !isFinished) { currentStep++; // 这里实现具体的步骤逻辑 String stepResult = executeStep(); results.add("步骤 " + currentStep + ": " + stepResult); } if (currentStep >= MAX_STEPS) { results.add("达到最大步骤数: " + MAX_STEPS); } return String.join("\n", results); }ReAct模式:是结合了推理(Reasoning)和行动(Acting)的智能体架构,模仿人类解决问题时“推理-行动-观察”的循环,智能体常用的工作模式之一。
AI大模型、记忆系统、知识库、工具调用(如搜索引擎,API接口等外部服务,MCP也算工具调用的一种),补充一种特殊常用的工具调用叫Compute Use,允许智能体和计算机环境交互,比如执行代码、操作文件系统等,像Claude等可以自己修改我们代码。
使用智能体:1、平台中使用 2、软件中使用 3、程序中使用
我前面几篇博客所开发的那些,并不具备完整的自主规划和执行能力,本文让我们来实现高级的自主规划智能体吧!可以通过阅读Open Manus的源码查看Agent的架构。
AI智能体核心实现:学习Agent分层代理架构:
1、BaseAgent,是所有代理的基础,定义了代理状态管理和执行循环的核心逻辑
2、ReActAgent,实现了ReAct模式,将代理的执行过程分成思考和行动两个步骤
3、ToolCallAgent,在ReAct模式的基础上增加了工具调用的能力,就是工具调用机制的实现
4、Manus,是OpenManus的核心智能体实例,集成了各种工具和能力
关键实现细节:1、工具系统设计 2、MCP协议支持
工具系统设计:
1、工具抽象层BaseTool,所有工具都是继承自BaseTool抽象基类,提供统一的接口和行为
2、终止工具Terminate:允许智能体通过AI大模型决定何时结束任务,避免无限循环和过早结束
3、询问工具AskHuman:允许智能体遇到无法解决问题向人类求助,给用户一个输入框让我们能更好地干预智能体完成任务的进程,提升了智能体的实用性和安全性
4、工具集合ToolCollection:管理多个工具实例,使OpenManus能灵活管理、添加、移除工具
MCP协议支持:
1、MCP与工具系统的集成
2、动态工具代理
3、Manus中的MCP集成机制
自主实现Manus智能体
可参考Open Manus的结构:
在dogaiagent根包下新建一个agent包,再在agent包下新建一个model包,再在model包下新建一个AgentState枚举类Enum,列举状态代码如下:
/** * 代理执行状态的枚举类 */ public enum AgentState { /** * 空闲状态 */ IDLE, /** * 运行中状态 */ RUNNING, /** * 已完成状态 */ FINISHED, /** * 错误状态 */ ERROR }然后在agent包下新建BaseAgent类、ReActAgent类、ToolCallAgent类、DogManus类
/** *抽象基础代理类,用于状态管理和执行流程 * 提供状态转换,内存管理和基于步骤的执行循环功能 */ @Data @Slf4j public abstract class BaseAgent { //核心属性 private String name; //提示词 private String systemPrompt; private String nextPrompt; //代理状态 private AgentState state = AgentState.IDLE; //执行步骤控制 private int currentStep = 0; private int maxSteps = 10; //LLM大模型 private ChatClient chatClient; //Memory会话记忆(自主维护会话上下文) private List<Message> messageList = new ArrayList<>(); /** * 执行代理 * @param userPrompt 用户输入 * @return */ public String run(String userPrompt){ //基础校验 if(this.state != AgentState.IDLE){ throw new RuntimeException("Cannot run agent from state"+ this.state); } if(StrUtil.isBlank(userPrompt)){ throw new RuntimeException("Cannot run agent with empty user prompt"); } //执行更改状态 this.state = AgentState.RUNNING; //记录消息上下文 messageList.add(new UserMessage(userPrompt)); //保存结果列表 List<String> results = new ArrayList<>(); try { //执行循环 for (int i = 0; i < maxSteps && state !=AgentState.FINISHED; i++){ int stepNumber = i + 1; currentStep = stepNumber; log.info("Executing step {}/{}", stepNumber,maxSteps); //单步执行 String stepResult = step(); String result = "Step " + stepNumber + ": " + stepResult; results.add(result); } //检查是否超出步骤限制 if(currentStep >= maxSteps){ state = AgentState.FINISHED; results.add("Terminated:Reached max steps("+ maxSteps+")"); } return String.join("\n",results); }catch (Exception e){ state = AgentState.ERROR; log.error("error executing agent",e); return "执行错误"+e.getMessage(); }finally { //清理资源 this.cleanup(); } } /** * 定义单个步骤 * @return */ public abstract String step(); /** * 清理资源 */ protected void cleanup(){ //子类可以重写这个方法来清理资源 } }在ReActAgent类里编写以下代码:
/** * ReAct代理抽象类 * 实现思考-行动的循环模式 */ @EqualsAndHashCode(callSuper = true) @Data @Slf4j public abstract class ReActAgent extends BaseAgent { public abstract boolean think(); public abstract String act(); @Override public String step() { try{ //思考 boolean shouldAct = think(); if(!shouldAct){ return "思考完成-无需行动"; } //行动 return act(); }catch (Exception e){ //记录异常日志 e.printStackTrace(); return "执行错误"+e.getMessage(); } } }实现ToolCallAgent的两种方法
ToolCallAgent继承了ReActAgent,具体实现think()和act(),然后Spring AI的ChatClient已经支持工具调用,内部自动完成了think、act、observe,那我们该如何控制工具调用的执行呢?可以手动控制工具执行Spring AI 的这行代码控制了内部自动完成.internalToolExecutionEnabled(false),我们只需关掉就可以自己手动控制了。然后呢注意一个点,咱们使用的是阿里的DashScopeChatModel大模型,不能使用.internalToolExecutionEnabled(false)这个来关掉,而是等会要用把withProxyToolCalls选项设为true,来禁止托管工具调用
托管的工具调用能力,我们可以把所有的工具调用代码作为think方法,而act不定义任何动作。
在ToolCallAgent类里编写以下代码:
/** * 处理工具调用代理抽象类,实现think和act方法,可以用作创建Manus实例的父类 */ @EqualsAndHashCode(callSuper = true) @Data @Slf4j public class ToolCallAgent extends ReActAgent { //定义可用的工具列表,Spring AI的工具对象是ToolCallback private final ToolCallback[] availableTools; //保存工具调用信息的响应 private ChatResponse toolCallChatResponse; //工具调用管理者 private final ToolCallingManager toolCallingManager; //禁用内置的工具调用机制,自己维护上下文 private final ChatOptions chatOptions; public ToolCallAgent(ToolCallback[] availableTools) { super(); this.availableTools = availableTools; this.toolCallingManager = ToolCallingManager.builder().build(); // 禁用 Spring AI 内置的工具调用机制,自己维护选项和消息上下文 this.chatOptions = DashScopeChatOptions.builder() .withInternalToolExecutionEnabled(false) .build(); } @Override public boolean think() { //1、校验提示词,拼接用户提示词 if (StrUtil.isNotBlank(getNextPrompt())) { UserMessage userMessage = new UserMessage(getNextPrompt()); getMessageList().add(userMessage); } //2、调用AI大模型,获取工具调用结果 List<Message> messageList = getMessageList(); Prompt prompt = new Prompt(messageList, this.chatOptions); try{ ChatResponse chatResponse = getChatClient().prompt(prompt) .system(getSystemPrompt()) .tools(availableTools) .call() .chatResponse(); //记录响应,用于等下的act步骤 this.toolCallChatResponse = chatResponse; //3、解析工具调用结果,获取要调用的工具 //助手消息 AssistantMessage assistantMessage = chatResponse.getResult().getOutput(); //获取要调用的工具列表 List<AssistantMessage.ToolCall> toolCallList = assistantMessage.getToolCalls(); //输出提示信息 String result = assistantMessage.getText(); log.info(getName() + "的思考:" + result); log.info(getName() + "选择了:" + toolCallList.size() + "个工具来使用"); String toolCallInfo = toolCallList.stream() .map(toolCall -> String.format("工具名称:%s, 参数:%s", toolCall.name(), toolCall.arguments()) ) .collect(Collectors.joining("\n")); log.info(toolCallInfo); if (toolCallList.isEmpty()) { //只有不调用工具时,才记录助手消息 getMessageList().add(assistantMessage); return false; } else { //需要调用工具时,无需记录助手消息,因为调用工具时会自动记录 return true; } } catch (Exception e){ log.error(getName() + "的思考过程遇到了问题:" + e.getMessage()); getMessageList().add( new AssistantMessage("处理时遇到错误:" + e.getMessage())); return false; } } /** * 执行工具调用并处理结果 * * @return 执行结果 */ @Override public String act() { if (!toolCallChatResponse.hasToolCalls()) { return "没有工具调用"; } // 调用工具 Prompt prompt = new Prompt(getMessageList(), chatOptions); ToolExecutionResult toolExecutionResult = toolCallingManager.executeToolCalls(prompt, toolCallChatResponse); // 记录消息上下文,conversationHistory 已经包含了助手消息和工具调用返回的结果 setMessageList(toolExecutionResult.conversationHistory()); // 当前工具调用的结果 ToolResponseMessage toolResponseMessage = (ToolResponseMessage) CollUtil.getLast(toolExecutionResult.conversationHistory()); String results = toolResponseMessage.getResponses().stream() .map(response -> "工具 " + response.name() + " 完成了它的任务!结果: " + response.responseData()) .collect(Collectors.joining("\n")); log.info(results); return results; } }然后我们还需要有一个终止的工具,让智能体能够自行决定任务结束,在tools包下新建一个TerminateTool类,编写以下代码:
public class TerminateTool { @Tool(description = """ Terminate the interaction when the request is met OR if the assistant cannot proceed further with the task. "When you have finished all the tasks, call this tool to end the work. """) public String doTerminate() { return "任务结束"; } }然后在集中的工具注册ToolRegistration类里加上这个终止工具,最终代码如下:
/** * 集中的工具注册 */ @Configuration public class ToolRegistration { @Value("${search-api.api-key}") private String searchApiKey; @Bean public ToolCallback[] allTools() { FileOperationTool fileOperationTool = new FileOperationTool(); WebSearchTool webSearchTool = new WebSearchTool(searchApiKey); WebScrapingTool webScrapingTool = new WebScrapingTool(); ResourceDownloadTool resourceDownloadTool = new ResourceDownloadTool(); TerminalOperationTool terminalOperationTool = new TerminalOperationTool(); PDFGenerationTool pdfGenerationTool = new PDFGenerationTool(); TerminateTool terminateTool = new TerminateTool(); return ToolCallbacks.from( fileOperationTool, webSearchTool, webScrapingTool, resourceDownloadTool, terminalOperationTool, pdfGenerationTool, terminateTool ); } }接着我们需要完善我们的act方法,判断是否调用了终止工具当调用了终止工具,修改状态为“已结束”,代码如下:
@Override public String act() { if (!toolCallChatResponse.hasToolCalls()) { return "没有工具调用"; } // 调用工具 Prompt prompt = new Prompt(getMessageList(), chatOptions); ToolExecutionResult toolExecutionResult = toolCallingManager.executeToolCalls(prompt, toolCallChatResponse); // 记录消息上下文,conversationHistory 已经包含了助手消息和工具调用返回的结果 setMessageList(toolExecutionResult.conversationHistory()); // 当前工具调用的结果 ToolResponseMessage toolResponseMessage = (ToolResponseMessage) CollUtil.getLast(toolExecutionResult.conversationHistory()); String results = toolResponseMessage.getResponses().stream() .map(response -> "工具 " + response.name() + " 完成了它的任务!结果: " + response.responseData()) .collect(Collectors.joining("\n")); //判断是否调用了终止工具 boolean terminateToolCalled = toolResponseMessage.getResponses().stream() .anyMatch(response -> "doTerminate".equals(response.name())); if(terminateToolCalled){ setState(AgentState.FINISHED); } log.info(results); return results; } }然后咱们来编写DogManus类,代码如下:
@Component public class DogManus extends ToolCallAgent { public DogManus(ToolCallback[] allTools, ChatModel dashscopeChatModel) { super(allTools); this.setName("dogManus"); String SYSTEM_PROMPT = """ You are DogManus, an all-capable AI assistant, aimed at solving any task presented by the user. You have various tools at your disposal that you can call upon to efficiently complete complex requests. """; this.setSystemPrompt(SYSTEM_PROMPT); String NEXT_STEP_PROMPT = """ Based on user needs, proactively select the most appropriate tool or combination of tools. For complex tasks, you can break down the problem and use different tools step by step to solve it. After using each tool, clearly explain the execution results and suggest the next steps. If you want to stop the interaction at any point, use the `terminate` tool/function call. """; this.setNextPrompt(NEXT_STEP_PROMPT); this.setMaxSteps(20); // 初始化客户端 ChatClient chatClient = ChatClient.builder(dashscopeChatModel) .defaultAdvisors(new MyLoggerAdvisor()) .build(); this.setChatClient(chatClient); } }生成对应单元测试,测试一下,代码如下:
class DogManusTest { @Resource private DogManus yuManus; @Test void run() { String userPrompt = """ 我和家人要在国庆去福建福州旅行,给我推荐一个美丽景点, 并结合一些网络图片,制定一份详细的旅游计划, 并以 PDF 格式输出"""; String answer = yuManus.run(userPrompt); Assertions.assertNotNull(answer); } }智能体陷入无限循环
咱们思考一个问题?如果设定的步骤数很大,然后智能体在步骤限制内又一直无法解决用户的问题,就会一直重复回答反复执行,这时岂不是陷入了无限循环问题?所以我们可以添加一个循环检测和处理机制,来防止陷入无限循环,代码如下:
//定义出现重复助手信息的阈值 private int duplicateThreshold = 2; //解决陷入无限循环问题 // 每一步 step 执行完都要检查是否陷入循环 if (isStuck()) { handleStuckState(); } /** * 处理陷入循环的状态 */ protected void handleStuckState() { String stuckPrompt = "观察到重复响应。考虑新策略,避免重复已尝试过的无效路径。"; this.nextPrompt = stuckPrompt + "\n" + (this.nextPrompt != null ? this.nextPrompt : ""); System.out.println("Agent detected stuck state. Added prompt: " + stuckPrompt); } /** * 检查代理是否陷入循环 * * @return 是否陷入循环 */ protected boolean isStuck() { List<Message> messages = this.messageList; if (messages.size() < 2) { return false; } Message lastMessage = messages.get(messages.size() - 1); if (lastMessage.getText() == null || lastMessage.getText().isEmpty()) { return false; } // 计算重复内容出现次数 int duplicateCount = 0; for (int i = messages.size() - 2; i >= 0; i--) { Message msg = messages.get(i); //instanceof判断是否是助手消息,只有是助手消息重复才++ if (msg instanceof AssistantMessage && lastMessage.getText().equals(msg.getText())) { duplicateCount++; } } return duplicateCount >= this.duplicateThreshold; }然后咱们前面提到一个叫AskHuman的功能,就是当AI没法回答时向用户寻求帮助,提示让用户进一步输入提示给AI,这边可以扩展补充一下,代码如下:
@Component public class AskHumanTool { private BaseAgent agent; public AskHumanTool(BaseAgent agent) { this.agent = agent; } /** * 思考是否需要向用户提问 * @return 是否需要继续执行 */ public boolean think() { // 获取消息列表 if (agent.getMessageList().isEmpty()) { return false; } // 获取最新的助手消息 Message lastMessage = agent.getMessageList().get(agent.getMessageList().size() - 1); if (lastMessage instanceof AssistantMessage) { String content = lastMessage.getText(); // 检查是否包含向用户询问的标记 if (content != null && content.contains("[ASK_USER]")) { // 提取问题 String question = content.substring(content.indexOf("[ASK_USER]") + 10); // 向用户输出问题 System.out.println("智能体需要你的帮助: " + question); // 获取用户输入 Scanner scanner = new Scanner(System.in); String userAnswer = scanner.nextLine(); // 添加用户回答到消息列表 UserMessage userResponse = new UserMessage("用户回答: " + userAnswer); agent.getMessageList().add(userResponse); // 需要继续思考 return true; } } return false; } }智能体工作流(Workflow):单个智能体无法满足需求时,通过编排允许多个智能体协同工作,将复杂的任务分解为连贯的节点链,每个节点由最适合的智能体处理
Sping AI Alibaba Graph,相比于传统的工作流它们添加了AI的东西
OWL框架:本质就是让AI智能体来增强传统办公的场景,给AI工具调用让AI来数据分析、让AI来检索网页
如何交流协作的协议。
AI服务化(流式输出)
AI应用接口开发:平时开发的接口大多是同步的,就是等后端处理完再返回,那对于我们目前这个,导致AI响应速度比较慢,那用户要等很久肯定是不行的,所以目前大多都采用SSE技术实现流式输出,就像打字机一样一个一个字输出的效果,提升用户体验。
用的SSE流式输出,首先在TravelApp类里添加流式调用方法,代码如下:
public Flux<String> doChatByStream(String message , String chatId){ return chatClient .prompt() .user(message) .advisors(spec -> spec.param(CHAT_MEMORY_CONVERSATION_ID_KEY, chatId) .param(CHAT_MEMORY_RETRIEVE_SIZE_KEY, 10)) .stream() .content(); }然后我们来编写接口,在controller包下新建一个AiController类,实现接口代码如下:
@RestController @RequestMapping("/ai") public class AiController { @Resource private TravelApp travelApp; @Resource private ToolCallback[] allTools; @Resource private ChatModel dashscopeChatModel; //创建同步接口 @GetMapping("/travel_app/chat/sync") public String doChatWithTravelAppSync(String message , String chatId){ return travelApp.doChat(message,chatId); } //使用SSEEmiter创建异步接口,通过send方法持续向SseEmitter发送消息(类似IO操作) @GetMapping("/travel_app/chat/sse/emitter") public SseEmitter doChatWithTravelAppSseEmitter(String message , String chatId){ //创建一个超时时间较长的SseEmitter,3分钟超时 SseEmitter emitter = new SseEmitter(180000L); //获取Flux数据流并直接订阅 travelApp.doChatByStream(message,chatId) .subscribe( //处理每条消息 chunk ->{ try { emitter.send(chunk); } catch (IOException e) { emitter.completeWithError(e); } }, //处理错误 emitter::completeWithError, //处理完成 emitter::complete ); return emitter; }然后可以用Swagger接口文档来测试,或者使用CURL工具来测试(一般来说电脑自带了这个工具),可以打开终端输入以下命令:curl 'http://localhost:8123/api/ai/love_app/chat/sse?message=hello&chatId=1'或者可以在浏览器的F12里面选中网络请求来复杂CURL命令,很方便测试。
首先在BaseAgent类里面添加流式输出方法,代码如下:
/** * 运行代理并返回结果流(流式输出) * @param userPrompt * @return */ public SseEmitter runStream(String userPrompt){ //创建SseEmitter,设置较长的超时时间,5分钟超时 SseEmitter emitter = new SseEmitter(300000L); //使用线程异步处理,避免阻塞主线程 CompletableFuture.runAsync(() -> { try { if (this.state != AgentState.IDLE) { emitter.send("错误,无法从状态运行代理:" + this.state); emitter.complete(); return; } if (StrUtil.isBlank(userPrompt)) { emitter.send("错误,不能使用空提示词运行代理"); emitter.complete(); return; } //更改状态 state = AgentState.RUNNING; //记录消息上下文 messageList.add(new UserMessage(userPrompt)); try { //执行循环 for (int i = 0; i < maxSteps && state != AgentState.FINISHED; i++) { int stepNumber = i + 1; currentStep = stepNumber; log.info("Executing step {}/{}", stepNumber, maxSteps); //单步执行 String stepResult = step(); String result = "Step " + stepNumber + ": " + stepResult; //解决陷入无限循环问题 // 每一步 step 执行完都要检查是否陷入循环 if (isStuck()) { handleStuckState(); } //发送每一步的结果 emitter.send(result); } //检查是否超出步骤限制 if (currentStep >= maxSteps) { state = AgentState.FINISHED; emitter.send("执行结束,达到最大步骤 (" + maxSteps + ")"); } //正常完成 emitter.complete(); } catch (Exception e) { state = AgentState.ERROR; log.error("执行智能体失败", e); try { emitter.send("执行错误:" + e.getMessage()); emitter.complete(); } catch (Exception ex) { emitter.completeWithError(ex); } } finally { //清理资源 this.cleanup(); } } catch (Exception e) { emitter.completeWithError(e); } }); //设置超时和完成回调 emitter.onTimeout(() -> { this.state = AgentState.ERROR; this.cleanup(); log.warn("SSE connection timed out"); }); emitter.onCompletion(() -> { if(this.state == AgentState.RUNNING){ this.state = AgentState.FINISHED; } this.cleanup(); log.info("SSE connection completed"); }); return emitter; }接着我们在AiController里编写智能体的接口,代码如下:
/** * 流式调用Manus超级智能体 * @param message * @return */ @GetMapping("/manus/chat") public SseEmitter doChatWithManus(String message){ DogManus dogManus = new DogManus(allTools,dashscopeChatModel); return dogManus.runStream(message); }然后跟前面一样的方法测试接口。
那要使前端能调用后端,我们需要在后端配置跨域支持。在dogaiagent包下新建一个config包,再在config包下新建一个CorsConfig类,全局跨域配置代码如下:
/** * 全局跨域配置 */ @Configuration public class CorsConfig implements WebMvcConfigurer { @Override public void addCorsMappings(CorsRegistry registry) { // 覆盖所有请求 registry.addMapping("/**") // 允许发送 Cookie .allowCredentials(true) // 放行哪些域名(必须用 patterns,否则 * 会和 allowCredentials 冲突) .allowedOriginPatterns("*") .allowedMethods("GET", "POST", "PUT", "DELETE", "OPTIONS") .allowedHeaders("*") .exposedHeaders("*"); } }至此,基本智能体具备的功能就搞定了!