在 Web 应用开发中,实时通信场景越来越常见 —— 比如社交软件的即时消息、后台系统的操作通知、电商平台的订单状态提醒等。传统的轮询方式(如定时 Ajax 请求)不仅会造成服务器资源浪费,还无法做到真正的 “实时”。而WebSocket作为一种全双工通信协议,能在客户端和服务器之间建立持久连接,完美解决实时通信的需求。
本文将以Spring Boot(后端)+ Vue3(前端)技术栈为例,从基础集成到高级功能(如指定用户推送、桌面通知),完整实现 WebSocket 实时消息推送功能,所有代码均可直接落地生产环境,并附带性能优化建议。
一、WebSocket 核心原理简述
WebSocket 与 HTTP 的区别在于:
- HTTP 是单向通信:客户端发起请求,服务器返回响应,连接立即关闭;
- WebSocket 是双向通信:客户端和服务器通过一次握手建立连接后,双方可随时互发消息,连接保持至主动关闭;
- 协议标识不同:WebSocket 使用
ws://(非加密)或wss://(加密)协议,握手阶段基于 HTTP,因此可兼容现有 HTTP 端口(80/443)。
整个通信流程:
- 客户端发送 HTTP 请求,请求头中包含
Upgrade: websocket表示要升级为 WebSocket 协议; - 服务器响应 101 状态码,表示协议切换成功;
- 双方建立 WebSocket 连接,开始双向通信。
二、后端实现:Spring Boot 集成 WebSocket
Spring Boot 对 WebSocket 提供了完善的自动配置支持,我们使用spring-boot-starter-websocket依赖快速集成。
1. 核心依赖(pom.xml)
<dependencies> <!-- Spring Boot Web核心 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- WebSocket支持 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency> <!-- Lombok简化代码(可选) --> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <!-- JSON处理(阿里FastJSON2) --> <dependency> <groupId>com.alibaba.fastjson2</groupId> <artifactId>fastjson2</artifactId> <version>2.0.48</version> </dependency> </dependencies>2. WebSocket 配置类
配置 WebSocket 的核心处理器和拦截器,注册端点供客户端连接:
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.socket.config.annotation.EnableWebSocket; import org.springframework.web.socket.config.annotation.WebSocketConfigurer; import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry; import org.springframework.web.socket.server.standard.ServerEndpointExporter; /** * WebSocket核心配置类 */ @Configuration @EnableWebSocket public class WebSocketConfig implements WebSocketConfigurer { private final WebSocketMessageHandler webSocketMessageHandler; private final WebSocketAuthInterceptor webSocketAuthInterceptor; // 构造器注入处理器和拦截器 public WebSocketConfig(WebSocketMessageHandler webSocketMessageHandler, WebSocketAuthInterceptor webSocketAuthInterceptor) { this.webSocketMessageHandler = webSocketMessageHandler; this.webSocketAuthInterceptor = webSocketAuthInterceptor; } /** * 注册WebSocket端点,供客户端连接 */ @Override public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { // 端点路径:ws://localhost:8080/ws/message registry.addHandler(webSocketMessageHandler, "/ws/message") // 允许跨域(开发环境,生产环境建议配置具体域名) .setAllowedOrigins("*") // 添加拦截器(用于用户认证,比如校验token) .addInterceptors(webSocketAuthInterceptor); } /** * 支持ServerEndpoint注解(可选,若使用注解方式则需配置) */ @Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); } }3. WebSocket 拦截器(用户认证)
在建立连接前,校验客户端的身份(比如从请求参数中获取 token,解析用户 ID),这是生产环境的必要步骤:
import org.springframework.http.server.ServerHttpRequest; import org.springframework.http.server.ServerHttpResponse; import org.springframework.http.server.ServletServerHttpRequest; import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.server.HandshakeInterceptor; import java.util.Map; /** * WebSocket握手拦截器:用于用户认证 */ public class WebSocketAuthInterceptor implements HandshakeInterceptor { /** * 握手前执行:校验用户身份 */ @Override public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception { // 从请求参数中获取token(客户端连接时传入:ws://localhost:8080/ws/message?token=xxx) ServletServerHttpRequest servletRequest = (ServletServerHttpRequest) request; String token = servletRequest.getServletRequest().getParameter("token"); // 此处简化处理:实际项目中需校验token有效性,解析出用户ID if (token == null || !token.equals("admin123")) { // 认证失败,拒绝连接 return false; } // 将用户ID存入属性,供后续处理器使用 attributes.put("userId", "1001"); return true; } @Override public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) { // 握手后执行(可留空) } }4. WebSocket 消息处理器(核心业务逻辑)
处理客户端的连接、断开、消息接收和发送,实现广播消息(发给所有用户)和指定用户推送(发给单个用户):
import com.alibaba.fastjson2.JSON; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import org.springframework.web.socket.CloseStatus; import org.springframework.web.socket.TextMessage; import org.springframework.web.socket.WebSocketSession; import org.springframework.web.socket.handler.TextWebSocketHandler; import java.io.IOException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; /** * WebSocket消息处理器:处理连接、消息收发 */ @Slf4j @Component public class WebSocketMessageHandler extends TextWebSocketHandler { /** * 存储用户会话:key=userId,value=WebSocketSession * 使用ConcurrentHashMap保证线程安全 */ private static final Map<String, WebSocketSession> USER_SESSIONS = new ConcurrentHashMap<>(); /** * 连接建立成功时执行 */ @Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { // 获取拦截器中存入的用户ID String userId = (String) session.getAttributes().get("userId"); if (userId != null) { // 存储用户会话 USER_SESSIONS.put(userId, session); log.info("用户{}建立WebSocket连接,当前在线人数:{}", userId, USER_SESSIONS.size()); // 发送欢迎消息 sendMessageToUser(userId, "连接成功,当前时间:" + System.currentTimeMillis()); } } /** * 接收客户端发送的消息 */ @Override protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { // 获取客户端发送的消息内容 String payload = message.getPayload(); log.info("收到客户端消息:{}", payload); // 此处可处理业务逻辑,比如解析消息类型,执行对应操作 // 示例:将客户端消息广播给所有用户 broadcastMessage("用户" + session.getAttributes().get("userId") + ":" + payload); } /** * 连接关闭时执行 */ @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception { String userId = (String) session.getAttributes().get("userId"); if (userId != null) { // 移除用户会话 USER_SESSIONS.remove(userId); log.info("用户{}断开WebSocket连接,当前在线人数:{}", userId, USER_SESSIONS.size()); } } /** * 发生错误时执行 */ @Override public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { log.error("WebSocket连接发生错误:", exception); // 断开连接时清理会话 if (session.isOpen()) { session.close(); } String userId = (String) session.getAttributes().get("userId"); if (userId != null) { USER_SESSIONS.remove(userId); } } /** * 发送消息给指定用户 * @param userId 用户ID * @param message 消息内容 */ public void sendMessageToUser(String userId, String message) { WebSocketSession session = USER_SESSIONS.get(userId); if (session != null && session.isOpen()) { try { // 发送JSON格式消息(便于前端解析) session.sendMessage(new TextMessage(JSON.toJSONString(new MessageVo("info", message)))); } catch (IOException e) { log.error("发送消息给用户{}失败:", userId, e); } } else { log.warn("用户{}的WebSocket连接已关闭", userId); } } /** * 广播消息:发送给所有在线用户 * @param message 消息内容 */ public void broadcastMessage(String message) { for (Map.Entry<String, WebSocketSession> entry : USER_SESSIONS.entrySet()) { WebSocketSession session = entry.getValue(); if (session.isOpen()) { try { session.sendMessage(new TextMessage(JSON.toJSONString(new MessageVo("broadcast", message)))); } catch (IOException e) { log.error("广播消息给用户{}失败:", entry.getKey(), e); } } } } /** * 消息VO:封装消息类型和内容 */ @lombok.Data public static class MessageVo { private String type; // 消息类型:info/error/broadcast private String content; // 消息内容 public MessageVo(String type, String content) { this.type = type; this.content = content; } } }5. 消息推送接口(供业务系统调用)
在实际项目中,消息推送往往由业务逻辑触发(比如订单创建、审核通过),因此提供一个 HTTP 接口供其他模块调用:
import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; /** * 消息推送接口:供业务系统调用 */ @RestController @RequestMapping("/api/message") public class MessagePushController { @Resource private WebSocketMessageHandler webSocketMessageHandler; /** * 发送消息给指定用户 */ @GetMapping("/send/{userId}/{content}") public String sendMessageToUser(@PathVariable String userId, @PathVariable String content) { webSocketMessageHandler.sendMessageToUser(userId, content); return "消息发送成功"; } /** * 广播消息给所有用户 */ @GetMapping("/broadcast/{content}") public String broadcastMessage(@PathVariable String content) { webSocketMessageHandler.broadcastMessage(content); return "广播消息发送成功"; } }三、前端实现:Vue3 集成 WebSocket 与桌面通知
前端需要实现 WebSocket 连接、消息接收、桌面通知(浏览器通知)功能,这里以 Vue3 + Element Plus 为例。
1. WebSocket 工具类(utils/websocket.js)
封装 WebSocket 的连接、消息发送、重连逻辑(重连是生产环境的重要保障):
/** * WebSocket工具类 */ class WebSocketClient { constructor() { this.ws = null; // WebSocket实例 this.reconnectInterval = 3000; // 重连间隔(毫秒) this.maxReconnectTimes = 10; // 最大重连次数 this.reconnectTimes = 0; // 当前重连次数 this.callbacks = { onMessage: null, // 消息接收回调 onOpen: null, // 连接成功回调 onClose: null, // 连接关闭回调 onError: null // 错误回调 }; } /** * 初始化WebSocket连接 * @param {String} url 连接地址 * @param {Object} callbacks 回调函数 */ init(url, callbacks) { // 保存回调函数 this.callbacks = { ...this.callbacks, ...callbacks }; // 建立连接 this.connect(url); } /** * 建立WebSocket连接 * @param {String} url 连接地址 */ connect(url) { if (!window.WebSocket) { this.callbacks.onError?.("当前浏览器不支持WebSocket"); return; } this.ws = new WebSocket(url); // 连接成功 this.ws.onopen = () => { console.log("WebSocket连接成功"); this.reconnectTimes = 0; // 重置重连次数 this.callbacks.onOpen?.(); }; // 接收消息 this.ws.onmessage = (event) => { const message = JSON.parse(event.data); this.callbacks.onMessage?.(message); // 触发桌面通知 this.showNotification(message); }; // 连接关闭 this.ws.onclose = (event) => { console.log("WebSocket连接关闭,状态码:", event.code); this.callbacks.onClose?.(event); // 自动重连 this.reconnect(url); }; // 连接错误 this.ws.onerror = (error) => { console.error("WebSocket连接错误:", error); this.callbacks.onError?.(error); // 自动重连 this.reconnect(url); }; } /** * 发送消息 * @param {String} message 消息内容 */ sendMessage(message) { if (this.ws && this.ws.readyState === WebSocket.OPEN) { this.ws.send(message); } else { this.callbacks.onError?.("WebSocket连接未建立,无法发送消息"); } } /** * 关闭WebSocket连接 */ close() { if (this.ws) { this.ws.close(); this.ws = null; // 关闭后不再重连 this.reconnectTimes = this.maxReconnectTimes + 1; } } /** * 自动重连 * @param {String} url 连接地址 */ reconnect(url) { if (this.reconnectTimes >= this.maxReconnectTimes) { this.callbacks.onError?.("WebSocket重连次数已达上限,停止重连"); return; } this.reconnectTimes++; console.log(`WebSocket重连中,第${this.reconnectTimes}次`); setTimeout(() => { this.connect(url); }, this.reconnectInterval); } /** * 显示桌面通知 * @param {Object} message 消息对象 */ showNotification(message) { // 判断浏览器是否支持通知 if (!("Notification" in window)) { return; } // 请求通知权限 if (Notification.permission !== "granted") { Notification.requestPermission().then((permission) => { if (permission === "granted") { this.createNotification(message); } }); } else { this.createNotification(message); } } /** * 创建桌面通知 * @param {Object} message 消息对象 */ createNotification(message) { new Notification("系统通知", { body: message.content, icon: "https://img-blog.csdnimg.cn/202407201450001.png" // 通知图标(可选) }); } } // 导出单例 export default new WebSocketClient();2. Vue 组件中使用 WebSocket(components/Message.vue)
在组件中初始化 WebSocket 连接,处理消息接收和界面渲染:
<template> <div class="message-container"> <el-card title="WebSocket实时消息"> <!-- 消息列表 --> <div class="message-list"> <el-alert v-for="(msg, index) in messageList" :key="index" :title="msg.type === 'broadcast' ? '广播消息' : '系统消息'" :message="msg.content" :type="msg.type === 'error' ? 'error' : 'info'" show-icon style="margin-bottom: 10px" /> </div> <!-- 发送消息 --> <el-input v-model="inputMessage" placeholder="请输入消息内容" style="margin-top: 20px" @keyup.enter="sendMessage" /> <el-button type="primary" style="margin-top: 10px" @click="sendMessage">发送消息</el-button> </el-card> </div> </template> <script setup> import { ref, onMounted, onUnmounted } from 'vue'; import wsClient from '@/utils/websocket'; // 消息列表 const messageList = ref([]); // 输入的消息内容 const inputMessage = ref(''); /** * 初始化WebSocket */ const initWebSocket = () => { // WebSocket连接地址(带token认证) const wsUrl = 'ws://localhost:8080/ws/message?token=admin123'; wsClient.init(wsUrl, { onOpen: () => { messageList.value.push({ type: 'info', content: 'WebSocket连接成功' }); }, onMessage: (message) => { console.log('收到消息:', message); messageList.value.push(message); // 滚动到最新消息 const messageListEl = document.querySelector('.message-list'); messageListEl.scrollTop = messageListEl.scrollHeight; }, onClose: (event) => { messageList.value.push({ type: 'warning', content: `WebSocket连接关闭,状态码:${event.code}` }); }, onError: (error) => { messageList.value.push({ type: 'error', content: `WebSocket错误:${error}` }); } }); }; /** * 发送消息 */ const sendMessage = () => { if (!inputMessage.value) { return; } wsClient.sendMessage(inputMessage.value); inputMessage.value = ''; }; // 组件挂载时初始化WebSocket onMounted(() => { initWebSocket(); }); // 组件卸载时关闭WebSocket onUnmounted(() => { wsClient.close(); }); </script> <style scoped> .message-container { width: 600px; margin: 50px auto; } .message-list { height: 400px; overflow-y: auto; border: 1px solid #e6e6e6; padding: 10px; border-radius: 4px; } </style>四、功能测试与验证
- 启动 Spring Boot 项目,访问
http://localhost:8080/api/message/broadcast/Hello%20WebSocket,前端会收到广播消息并弹出桌面通知; - 访问 Vue 组件页面,输入消息并发送,后端会接收消息并广播给所有在线用户;
- 访问
http://localhost:8080/api/message/send/1001/您有新的订单提醒,指定用户会收到专属消息。
五、生产环境优化建议
- 连接管理:
- 使用 Redis 存储用户会话(分布式场景下,单节点的 ConcurrentHashMap 无法共享会话),结合 Spring Session 实现分布式 WebSocket;
- 设置连接超时时间,清理长时间空闲的连接,避免服务器资源浪费。
- 消息可靠性:
- 实现消息持久化(比如使用 RabbitMQ),若客户端离线,消息可暂存,待客户端重连后补发;
- 增加消息确认机制(客户端收到消息后回复 ACK,服务器未收到则重发)。
- 安全防护:
- 生产环境使用
wss://协议(WebSocket over SSL),防止消息被窃听; - 加强 token 认证,使用 JWT 生成 token,设置过期时间,避免 token 泄露导致的安全问题;
- 限制单用户的连接数,防止恶意攻击。
- 生产环境使用
- 性能优化:
- 对消息进行压缩(如使用 gzip),减少网络传输量;
- 使用线程池处理消息业务逻辑,避免阻塞 WebSocket 线程。
六、总结
本文通过 Spring Boot 和 Vue3 的集成,完整实现了 WebSocket 实时消息推送功能,包括用户认证、指定用户推送、广播消息、桌面通知和自动重连等核心特性。相比传统的轮询方式,WebSocket 能显著提升实时通信的性能和用户体验。
在实际项目中,可根据业务需求扩展功能 —— 比如添加消息历史记录、实现群聊功能、结合 MQTT 协议支持物联网设备的实时通信等。同时,生产环境中需重点关注连接管理、消息可靠性和安全防护,确保系统稳定运行。