// 文件路径:src/main/java/com/ruoyi/api/WebSocketServerHandler.java package com.ruoyi.api; import org.springframework.stereotype.Component; import org.springframework.web.socket.*; import java.io.IOException; import java.net.URI; import java.nio.charset.StandardCharsets; import java.util.*; import java.util.concurrent.ConcurrentHashMap; /** * WebSocket 服务处理器 * 支持:按 userId 建立连接、发送消息、广播、查询在线用户 */ @Component public class WebSocketServerHandler implements WebSocketHandler { // 存储 userId -> WebSocketSession 映射(线程安全) private static final Map userSessionMap = new ConcurrentHashMap<>(); // 存储 session -> userId 映射(便于断开时清理) private static final Map sessionToUserMap = new ConcurrentHashMap<>(); /** * 广播消息给所有在线用户 */ public static void broadcastMessage(String message) { Objects.requireNonNull(message, "消息内容不能为空"); userSessionMap.forEach((userId, session) -> { if (session.isOpen()) { try { session.sendMessage(new TextMessage(message)); } catch (IOException e) { System.err.println("广播消息失败 - 用户: " + userId + ", 错误: " + e.getMessage()); // 清理失效连接 cleanupSession(session, userId); } } }); } /** * 发送消息给指定用户 */ public static void sendMessageToUser(String userId, String message) { Objects.requireNonNull(userId, "userId 不能为空"); Objects.requireNonNull(message, "消息内容不能为空"); WebSocketSession session = userSessionMap.get(userId); if (session != null && session.isOpen()) { try { session.sendMessage(new TextMessage(message)); } catch (IOException e) { System.err.println("发送消息失败 - 用户: " + userId + ", 错误: " + e.getMessage()); cleanupSession(session, userId); } } else { System.out.println("⚠️ 用户未连接或已断开: userId=" + userId); } } /** * 获取当前在线用户数 */ public static int getOnlineUserCount() { return userSessionMap.size(); } /** * 获取所有在线用户 ID */ public static Set getOnlineUserIds() { return new HashSet<>(userSessionMap.keySet()); } /** * 从 WebSocketSession 中提取 userId */ private String extractUserId(WebSocketSession session) { URI uri = session.getUri(); if (uri == null) return null; String query = uri.getQuery(); // 格式: userId=123&token=abc if (query == null || query.isEmpty()) return null; return Arrays.stream(query.split("&")) .map(param -> param.split("=", 2)) .filter(arr -> arr.length == 2) .filter(arr -> "userId".equals(arr[0])) .map(arr -> arr[1]) .findFirst() .orElse(null); } /** * 清理失效的 session */ private static void cleanupSession(WebSocketSession session, String userId) { userSessionMap.remove(userId); sessionToUserMap.remove(session.getId()); try { if (session.isOpen()) { session.close(); } } catch (IOException e) { System.err.println("关闭 session 失败: " + e.getMessage()); } } // ======================================== // WebSocket 回调方法 // ======================================== @Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { String userId = extractUserId(session); if (userId == null || userId.trim().isEmpty()) { System.err.println("❌ 连接拒绝 - 缺少 userId: " + session.getId()); session.close(CloseStatus.NOT_ACCEPTABLE.withReason("Missing userId")); return; } // 防止重复连接(可选) if (userSessionMap.containsKey(userId)) { System.out.println("🔄 用户重新连接: userId=" + userId); WebSocketSession oldSession = userSessionMap.get(userId); if (oldSession.isOpen()) { try { oldSession.close(); } catch (IOException e) { System.err.println("关闭旧连接失败: " + e.getMessage()); } } } // 保存映射 userSessionMap.put(userId, session); sessionToUserMap.put(session.getId(), userId); // 记录日志 System.out.println("✅ 新用户连接: userId=" + userId + ", sessionId=" + session.getId()); // 可选:发送欢迎消息 session.sendMessage(new TextMessage("{\"type\":\"welcome\",\"msg\":\"欢迎, " + userId + "!\"}")); } @Override public void handleMessage(WebSocketSession session, WebSocketMessage message) throws Exception { String userId = sessionToUserMap.get(session.getId()); System.out.println(message); } @Override public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { String userId = sessionToUserMap.get(session.getId()); System.err.println("❌ 传输错误 - 用户: " + userId + ", sessionId: " + session.getId() + ", 错误: " + exception.getMessage()); cleanupSession(session, userId); } @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception { String userId = sessionToUserMap.get(session.getId()); if (userId != null) { userSessionMap.remove(userId); sessionToUserMap.remove(session.getId()); System.out.println("⏹️ 用户断开: userId=" + userId + ", 原因: " + closeStatus.getReason()); } } @Override public boolean supportsPartialMessages() { return false; } }