news 2026/6/23 19:44:51

AI 流式响应实战:从同步等待到实时推送

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
AI 流式响应实战:从同步等待到实时推送

AI 流式响应实战:从同步等待到实时推送

在 IM 系统中集成 AI 时,流式响应能显著提升性能。本文介绍 AQChat 如何实现 AI 流式响应,从同步等待到实时推送。

一、为什么需要流式响应?

同步等待的问题

传统同步方式的问题

// ❌ 同步方式:用户需要等待AI完整响应StringaiResponse=aiService.getAnswer(userMessage);// 如果AI响应需要10秒,用户就要等待10秒sendMessage(aiResponse);

问题:

  1. 等待时间长:AI 生成可能需要 5-10 秒,用户长时间等待
  2. 体验差:无法看到生成过程,感觉卡顿
  3. 资源占用:长时间占用连线和线程

流式响应的优势

  1. 实时反馈:逐字显示,用户可立即看到内容
  2. 体验更好:类似 ChatGPT 的打字机效果
  3. 资源利用:边生成边推送,不阻塞

对比

方式首字延迟完整响应时间用户体验
同步等待10秒10秒
流式响应1-2秒10秒

回调函数模式的设计

统一接口设计

定义统一的 AI 服务接口

publicinterfaceIAiService{/** * 流式调用AI服务 * @param userMsg 用户消息 * @param consumer 回调函数,处理每个数据块 */voidstreamCallWithMessage(StringuserMsg,Consumer<AIResult>consumer);/** * 多轮对话 */defaultvoidchat(Stringmessage,List<MessageRecord>messages,Consumer<AIResult>consumer){}}

关键点

  • 使用Consumer<AIResult>作为回调
  • 每个数据块通过回调处理
  • 支持多轮对话

AIResult 设计

publicinterfaceAIResult{StringgetContent();// 当前数据块的内容intgetStatus();// 状态:WAIT(0-进行中)、END(1-结束)、FAIL(2-失败)}

状态枚举

publicenumAIMessageStatusEnum{WAIT(0,"wait"),// 流式响应进行中END(1,"end"),// 流式响应结束FAIL(2,"fail");// 流式响应失败}

三、WebSocket 实时推送的实现

整体流程

用户发送消息 ↓RocketMQ异步处理 ↓ AI服务流式调用 ↓ 回调函数处理每个数据块 ↓ 封装为 STREAM_MSG_NOTIFY ↓WebSocket实时推送

代码实现

  1. RocketMQ 消费者接收消息
@ComponentpublicclassAIHelperReceiverimplementsInitializingBean{@ResourceprivateIAiServiceaiService;@ResourceprivateGlobalChannelHolderglobalChannelHolder;publicvoidinitConsumer(){defaultMQPushConsumer.setMessageListener((MessageListenerConcurrently)(messageExtList,context)->{for(MessageExtmessageExt:messageExtList){MessageDtomessageDto=JSONObject.parseObject(msgStr,MessageDto.class);// 提交到独立线程池,不阻塞MQ消费线程threadPoolUtil.submitTask(()->{StringBuilderfullContent=newStringBuilder();try{// 流式调用AI服务aiService.streamCallWithMessage(messageDto.getMessageContent(),aiResult->{// 回调函数:处理每个数据块AIMessageDtoaiMessageDto=newAIMessageDto();aiMessageDto.setMessageId(messageDto.getMessageId());aiMessageDto.setRoomId(messageDto.getRoomId());aiMessageDto.setContent(aiResult.getContent());aiMessageDto.setStatus(aiResult.getStatus());// 实时推送globalChannelHolder.sendBroadcastAIMessage(aiMessageDto,AQBusinessConstant.AI_HELPER_ID);// 累积完整内容fullContent.append(aiResult.getContent());});}catch(Exceptione){// 错误处理LOGGER.error("AI助手处理消息失败",e);AIMessageDtofailMessage=newAIMessageDto();failMessage.setStatus(AIMessageStatusEnum.FAIL.getCode());globalChannelHolder.sendBroadcastAIMessage(failMessage,AQBusinessConstant.AI_HELPER_ID);}finally{// 流式响应结束后,持久化完整消息MessageDtostoreMessage=buildStoreMessage(messageDto,fullContent);messageService.saveMessage(storeMessage);}});}returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;});}}
  1. 封装流式消息并推送
@ComponentpublicclassGlobalChannelHolder{publicvoidsendBroadcastAIMessage(AIMessageDtoaiMessageDto,StringaiId){// 1. 获取AI助手信息UserGlobalInfoDtouserInfo=userHolder.getUserInfo(aiId);// 2. 构建流式消息AQChatMsgProtocol.StreamMsgNotifystreamMsgNotify=AQChatMsgProtocol.StreamMsgNotify.newBuilder().setUser(userBuilder).setMsgId(aiMessageDto.getMessageId()).setRoomId(aiMessageDto.getRoomId()).setContent(aiMessageDto.getContent()==null?"":aiMessageDto.getContent()).setStreamType(aiMessageDto.getStatus())// 0-进行中,1-结束,2-失败.build();// 3. 广播到房间内所有用户messageBroadcaster.broadcast(aiMessageDto.getRoomId(),streamMsgNotify);}}
  1. 消息广播
@ComponentpublicclassMessageBroadcaster{privatefinalMap<String,ChannelGroup>channelGroupMap=newConcurrentHashMap<>();public<TextendsGeneratedMessageV3>voidbroadcast(StringroomId,Tmsg){ChannelGroupchannelGroup=channelGroupMap.get(roomId);if(channelGroup!=null){// 批量发送,高效channelGroup.writeAndFlush(msg);}}}

四、流式消息的封装(STREM_MSG_NOTIFY)

Protobuf 消息定义

// 流式消息通知messageStreamMsgNotify{string roomId=1;// 房间IDstring msgId=2;// 消息IDUseruser=3;// AI助手信息int32 streamType=4;// 流类型:0-进行中,1-结束,2-失败string content=5;// 当前数据块内容}

消息类型

enumMsgCommand{// ...STREAM_MSG_NOTIFY=32;// 流式消息通知// ...}

消息状态流转

用户发送消息 ↓ STREAM_MSG_NOTIFY(streamType=0,content="你")← 第一个数据块 ↓ STREAM_MSG_NOTIFY(streamType=0,content="好")← 第二个数据块 ↓ STREAM_MSG_NOTIFY(streamType=0,content=",")← 第三个数据块 ↓...↓ STREAM_MSG_NOTIFY(streamType=1,content="")← 结束标志

前端处理示例(伪代码)

websocket.onmessage=(event)=>{constmessage=JSON.parse(event.data);if(message.command==='STREAM_MSG_NOTIFY'){if(message.streamType===0){// 进行中:追加内容appendContent(message.content);}elseif(message.streamType===1){// 结束:显示完整消息showCompleteMessage();}elseif(message.streamType===2){// 失败:显示错误提示showErrorMessage();}}};

五、多 AI 平台集成的统一接口设计

问题:不同 AI 平台的 API 不同

  • 阿里百炼:使用Flowable<GenerationResult>
  • Gitee AI:使用MessageHandler<String>
  • 其他平台:可能有不同的流式接口

解决方案:统一接口 + 适配器模式

  1. 统一接口定义
publicinterfaceIAiService{/** * 流式调用,统一使用 Consumer<AIResult> 回调 */voidstreamCallWithMessage(StringuserMsg,Consumer<AIResult>consumer);}
  1. 阿里百炼实现
@Service@PrimarypublicclassQWAiServiceimplementsIAiService{@OverridepublicvoidstreamCallWithMessage(StringuserMsg,Consumer<AIResult>consumer){Generationgen=newGeneration();Messagemessage=Message.builder().role(Role.USER.getValue()).content(userMsg).build();// 调用阿里百炼流式APIFlowable<GenerationResult>result=gen.streamCall(generationParam);// 转换为统一格式result.blockingForEach(r->{Stringcontent=r.getOutput().getChoices().get(0).getMessage().getContent();StringfinishReason=r.getOutput().getChoices().get(0).getFinishReason();QWResultqwResult=newQWResult();qwResult.setContent(content);// 判断是否结束qwResult.setStatus("stop".equals(finishReason)?AIMessageStatusEnum.END.getCode():AIMessageStatusEnum.WAIT.getCode());// 调用统一回调consumer.accept(qwResult);});}}
  1. Gitee AI 实现
@ServicepublicclassGiteeAIServiceimplementsIAiService{@ResourceprivateGiteeAIClientgiteeAIClient;@OverridepublicvoidstreamCallWithMessage(StringuserMsg,Consumer<AIResult>consumer){// 调用Gitee AI流式APIgiteeAIClient.streamChat(message,messageList,data->{JSONObjectparse=JSONObject.parseObject(data);JSONArraychoices=parse.getJSONArray("choices");JSONObjectchoicesIn=choices.getJSONObject(0);StringfinishReason=choicesIn.getString("finish_reason");if(finishReason!=null&&finishReason.equals("stop")){// 结束GiteeResultgiteeResult=newGiteeResult();giteeResult.setStatus(AIMessageStatusEnum.END.getCode());consumer.accept(giteeResult);return;}// 进行中JSONObjectdelta=choicesIn.getJSONObject("delta");Stringcontent=delta.getString("content");if(content!=null&&!content.isEmpty()){GiteeResultgiteeResult=newGiteeResult();giteeResult.setContent(content);giteeResult.setStatus(AIMessageStatusEnum.WAIT.getCode());consumer.accept(giteeResult);}});}}

统一接口的优势

  1. 业务代码无需关心具体平台
  2. 易于扩展新平台
  3. 便于切换平台(通过@Prime注解)

使用示例

// 业务代码只需要调用统一接口@ResourceprivateIAiServiceaiService;// Spring会自动注入@Primary的实现aiService.streamCallWithMessage(userMsg,aiResult->{// 处理流式响应,不关心是哪个AI平台sendBroadcastAIMessage(aiResult);});

六、性能优化

  1. 独立线程池
// AI处理在独立线程池中执行,不阻塞MQ消费线程threadPoolUtil.submitTask(()->{aiService.streamCallWithMessage(userMsg,consumer);});

优势

  • 不阻塞 RocketMQ 消费线程
  • AI 处理失败不影响其他消息
  • 可控制并发数
  1. 异步处理
// 消息发送到RocketMQ,异步处理mqSendingAgent.aiHelper(messageDto);// 立即返回,不等待AI响应

优势

  • 用户发送消息后立即返回
  • AI 响应通过 WebSocket 实时推送
  • 提升响应速度

七、总结

关键点

  1. 流式响应:使用回调函数模式,实时推送每个数据块
  2. 统一接口:IAiservice统一不同 AI 平台的接口
  3. WebSocket 推送:通过STREAM_MSG_NOTIFY实时推送
  4. 异步处理:使用 RocketMQ + 独立线程池,不阻塞主流程

优化效果

指标同步流式响应提升
首字延迟10秒1-2秒5-10倍
用户体验显著提升
资源占用降低

经验总结

  1. 流式响应能显著提升性能
  2. 统一接口便于多平台集成
  3. 异步处理避免阻塞
  4. 回调函数模式适合流式场景

通过以上实现,AQChat 实现了类似 ChatGPT 的流式响应效果,提升了用户体验。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/6/23 8:57:51

Open-AutoGLM安装总失败?深度解析Python依赖树中的隐藏陷阱

第一章&#xff1a;Open-AutoGLM安装失败的常见现象与初步诊断在部署 Open-AutoGLM 过程中&#xff0c;用户常遇到安装失败问题&#xff0c;这些故障通常表现为依赖缺失、环境冲突或权限异常。识别典型现象并进行快速诊断是解决问题的第一步。典型安装失败表现 执行 pip instal…

作者头像 李华
网站建设 2026/6/23 16:55:09

软件测试生命周期管理的核心框架与实践策略

在当今快速迭代的软件开发环境中&#xff0c;有效的测试生命周期管理已成为保障产品质量、控制项目风险的关键环节。本文从测试从业者的实际工作场景出发&#xff0c;系统阐述测试生命周期各阶段的组织方法、技术实践与协作机制&#xff0c;帮助团队建立可持续改进的质量保障体…

作者头像 李华
网站建设 2026/6/23 16:57:02

计算机毕设java疫情背景下大学生宿舍管理系统 基于Java的疫情环境下高校宿舍智能化管理系统开发 疫情防控期间大学生宿舍管理的Java平台构建与应用

计算机毕设java疫情背景下大学生宿舍管理系统2eq859 &#xff08;配套有源码 程序 mysql数据库 论文&#xff09; 本套源码可以在文本联xi,先看具体系统功能演示视频领取&#xff0c;可分享源码参考。随着疫情的持续影响&#xff0c;高校的日常管理面临着诸多挑战&#xff0c;尤…

作者头像 李华
网站建设 2026/6/23 18:36:31

别再有线连接了!Open-AutoGLM无线调试究竟有多强大?

第一章&#xff1a;告别有线束缚——Open-AutoGLM无线调试的革命性意义在移动开发与嵌入式系统演进的浪潮中&#xff0c;调试方式的革新始终是提升效率的关键。传统依赖USB线缆的调试模式虽稳定可靠&#xff0c;却严重制约了设备布局的灵活性与多场景测试的可行性。Open-AutoGL…

作者头像 李华
网站建设 2026/6/23 11:20:34

Open-AutoGLM权限困境破解,一文搞定非root环境下的完整部署流程

第一章&#xff1a;Open-AutoGLM权限困境的核心挑战在部署和集成 Open-AutoGLM 模型时&#xff0c;权限管理成为系统安全与功能可用性之间的关键矛盾点。该模型通常需要访问敏感数据源、执行远程调用并动态加载插件&#xff0c;而传统权限模型难以平衡灵活性与控制力。权限粒度…

作者头像 李华