20 changed files with 946 additions and 26 deletions
@ -0,0 +1,27 @@ |
|||
package com.ruoyi.websocket.config; |
|||
|
|||
import java.security.Principal; |
|||
import com.ruoyi.common.core.domain.model.LoginUser; |
|||
|
|||
/** |
|||
* 将 LoginUser 包装为 Principal,供 WebSocket 使用 |
|||
* |
|||
* @author ruoyi |
|||
*/ |
|||
public class LoginUserPrincipal implements Principal { |
|||
|
|||
private final LoginUser loginUser; |
|||
|
|||
public LoginUserPrincipal(LoginUser loginUser) { |
|||
this.loginUser = loginUser; |
|||
} |
|||
|
|||
@Override |
|||
public String getName() { |
|||
return loginUser != null ? loginUser.getUsername() : null; |
|||
} |
|||
|
|||
public LoginUser getLoginUser() { |
|||
return loginUser; |
|||
} |
|||
} |
|||
@ -0,0 +1,37 @@ |
|||
package com.ruoyi.websocket.config; |
|||
|
|||
import java.util.Map; |
|||
import org.springframework.core.Ordered; |
|||
import org.springframework.core.annotation.Order; |
|||
import org.springframework.messaging.Message; |
|||
import org.springframework.messaging.MessageChannel; |
|||
import org.springframework.messaging.simp.stomp.StompCommand; |
|||
import org.springframework.messaging.simp.stomp.StompHeaderAccessor; |
|||
import org.springframework.messaging.support.ChannelInterceptor; |
|||
import org.springframework.messaging.support.MessageHeaderAccessor; |
|||
import org.springframework.stereotype.Component; |
|||
import com.ruoyi.common.core.domain.model.LoginUser; |
|||
import com.ruoyi.websocket.config.LoginUserPrincipal; |
|||
|
|||
/** |
|||
* WebSocket 通道拦截器:将握手时的 loginUser 注入到消息头 |
|||
* |
|||
* @author ruoyi |
|||
*/ |
|||
@Component |
|||
@Order(Ordered.HIGHEST_PRECEDENCE + 99) |
|||
public class WebSocketChannelInterceptor implements ChannelInterceptor { |
|||
|
|||
@Override |
|||
public Message<?> preSend(Message<?> message, MessageChannel channel) { |
|||
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class); |
|||
if (accessor != null && StompCommand.CONNECT.equals(accessor.getCommand())) { |
|||
Map<String, Object> sessionAttrs = accessor.getSessionAttributes(); |
|||
if (sessionAttrs != null && sessionAttrs.containsKey("loginUser")) { |
|||
LoginUser loginUser = (LoginUser) sessionAttrs.get("loginUser"); |
|||
accessor.setUser(new LoginUserPrincipal(loginUser)); |
|||
} |
|||
} |
|||
return message; |
|||
} |
|||
} |
|||
@ -0,0 +1,47 @@ |
|||
package com.ruoyi.websocket.config; |
|||
|
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.context.annotation.Configuration; |
|||
import org.springframework.core.Ordered; |
|||
import org.springframework.core.annotation.Order; |
|||
import org.springframework.messaging.simp.config.ChannelRegistration; |
|||
import org.springframework.messaging.simp.config.MessageBrokerRegistry; |
|||
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker; |
|||
import org.springframework.web.socket.config.annotation.StompEndpointRegistry; |
|||
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer; |
|||
|
|||
/** |
|||
* WebSocket STOMP 配置 |
|||
* |
|||
* @author ruoyi |
|||
*/ |
|||
@Configuration |
|||
@EnableWebSocketMessageBroker |
|||
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer { |
|||
|
|||
@Autowired |
|||
private WebSocketHandshakeHandler webSocketHandshakeHandler; |
|||
|
|||
@Autowired |
|||
private WebSocketChannelInterceptor webSocketChannelInterceptor; |
|||
|
|||
@Override |
|||
public void configureMessageBroker(MessageBrokerRegistry config) { |
|||
config.enableSimpleBroker("/topic", "/queue"); |
|||
config.setApplicationDestinationPrefixes("/app"); |
|||
config.setUserDestinationPrefix("/user"); |
|||
} |
|||
|
|||
@Override |
|||
public void registerStompEndpoints(StompEndpointRegistry registry) { |
|||
registry.addEndpoint("/ws") |
|||
.setAllowedOriginPatterns("*") |
|||
.addInterceptors(webSocketHandshakeHandler) |
|||
.withSockJS(); |
|||
} |
|||
|
|||
@Override |
|||
public void configureClientInboundChannel(ChannelRegistration registration) { |
|||
registration.interceptors(webSocketChannelInterceptor); |
|||
} |
|||
} |
|||
@ -0,0 +1,80 @@ |
|||
package com.ruoyi.websocket.config; |
|||
|
|||
import java.util.Collections; |
|||
import java.util.Enumeration; |
|||
import java.util.Map; |
|||
import javax.servlet.http.HttpServletRequest; |
|||
import javax.servlet.http.HttpServletRequestWrapper; |
|||
import org.springframework.http.server.ServerHttpRequest; |
|||
import org.springframework.http.server.ServerHttpResponse; |
|||
import org.springframework.http.server.ServletServerHttpRequest; |
|||
import org.springframework.stereotype.Component; |
|||
import org.springframework.web.socket.WebSocketHandler; |
|||
import org.springframework.web.socket.server.HandshakeInterceptor; |
|||
import com.ruoyi.common.constant.Constants; |
|||
import com.ruoyi.common.core.domain.model.LoginUser; |
|||
import com.ruoyi.common.utils.StringUtils; |
|||
import com.ruoyi.framework.web.service.TokenService; |
|||
|
|||
/** |
|||
* WebSocket 握手拦截器:校验 JWT Token(支持 query 参数 token 或 Authorization header) |
|||
* |
|||
* @author ruoyi |
|||
*/ |
|||
@Component |
|||
public class WebSocketHandshakeHandler implements HandshakeInterceptor { |
|||
|
|||
private final TokenService tokenService; |
|||
|
|||
public WebSocketHandshakeHandler(TokenService tokenService) { |
|||
this.tokenService = tokenService; |
|||
} |
|||
|
|||
@Override |
|||
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, |
|||
WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception { |
|||
if (!(request instanceof ServletServerHttpRequest)) { |
|||
return false; |
|||
} |
|||
ServletServerHttpRequest servletRequest = (ServletServerHttpRequest) request; |
|||
HttpServletRequest req = servletRequest.getServletRequest(); |
|||
String token = req.getParameter("token"); |
|||
if (StringUtils.isEmpty(token)) { |
|||
String auth = req.getHeader("Authorization"); |
|||
if (StringUtils.isNotEmpty(auth) && auth.startsWith(Constants.TOKEN_PREFIX)) { |
|||
token = auth.substring(Constants.TOKEN_PREFIX.length()).trim(); |
|||
} |
|||
} |
|||
if (StringUtils.isEmpty(token)) { |
|||
return false; |
|||
} |
|||
final String tokenFinal = token; |
|||
HttpServletRequest wrappedReq = new HttpServletRequestWrapper(req) { |
|||
@Override |
|||
public String getHeader(String name) { |
|||
if ("Authorization".equalsIgnoreCase(name)) { |
|||
return Constants.TOKEN_PREFIX + tokenFinal; |
|||
} |
|||
return super.getHeader(name); |
|||
} |
|||
@Override |
|||
public Enumeration<String> getHeaders(String name) { |
|||
if ("Authorization".equalsIgnoreCase(name)) { |
|||
return Collections.enumeration(Collections.singletonList(Constants.TOKEN_PREFIX + tokenFinal)); |
|||
} |
|||
return super.getHeaders(name); |
|||
} |
|||
}; |
|||
LoginUser loginUser = tokenService.getLoginUser(wrappedReq); |
|||
if (loginUser == null) { |
|||
return false; |
|||
} |
|||
attributes.put("loginUser", loginUser); |
|||
return true; |
|||
} |
|||
|
|||
@Override |
|||
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, |
|||
WebSocketHandler wsHandler, Exception exception) { |
|||
} |
|||
} |
|||
@ -0,0 +1,146 @@ |
|||
package com.ruoyi.websocket.controller; |
|||
|
|||
import java.util.HashMap; |
|||
import java.util.List; |
|||
import java.util.Map; |
|||
import java.util.UUID; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.messaging.handler.annotation.DestinationVariable; |
|||
import org.springframework.messaging.handler.annotation.MessageMapping; |
|||
import org.springframework.messaging.handler.annotation.Payload; |
|||
import org.springframework.messaging.simp.SimpMessageHeaderAccessor; |
|||
import org.springframework.messaging.simp.SimpMessagingTemplate; |
|||
import org.springframework.stereotype.Controller; |
|||
import com.alibaba.fastjson2.JSON; |
|||
import com.ruoyi.common.core.domain.model.LoginUser; |
|||
import com.ruoyi.websocket.config.LoginUserPrincipal; |
|||
import com.ruoyi.common.utils.StringUtils; |
|||
import com.ruoyi.system.domain.Rooms; |
|||
import com.ruoyi.system.service.IRoomsService; |
|||
import com.ruoyi.websocket.dto.RoomMemberDTO; |
|||
import com.ruoyi.websocket.service.RoomWebSocketService; |
|||
|
|||
/** |
|||
* WebSocket 房间消息控制器 |
|||
* |
|||
* @author ruoyi |
|||
*/ |
|||
@Controller |
|||
public class RoomWebSocketController { |
|||
|
|||
@Autowired |
|||
private SimpMessagingTemplate messagingTemplate; |
|||
|
|||
@Autowired |
|||
private RoomWebSocketService roomWebSocketService; |
|||
|
|||
@Autowired |
|||
private IRoomsService roomsService; |
|||
|
|||
private static final String TYPE_JOIN = "JOIN"; |
|||
private static final String TYPE_LEAVE = "LEAVE"; |
|||
private static final String TYPE_PING = "PING"; |
|||
private static final String TYPE_MEMBER_JOINED = "MEMBER_JOINED"; |
|||
private static final String TYPE_MEMBER_LEFT = "MEMBER_LEFT"; |
|||
private static final String TYPE_MEMBER_LIST = "MEMBER_LIST"; |
|||
private static final String TYPE_PONG = "PONG"; |
|||
|
|||
/** |
|||
* 处理房间消息:JOIN、LEAVE、PING |
|||
*/ |
|||
@MessageMapping("/room/{roomId}") |
|||
public void handleRoomMessage(@DestinationVariable Long roomId, @Payload String payload, |
|||
SimpMessageHeaderAccessor accessor) { |
|||
LoginUser loginUser = null; |
|||
if (accessor.getUser() instanceof LoginUserPrincipal) { |
|||
loginUser = ((LoginUserPrincipal) accessor.getUser()).getLoginUser(); |
|||
} |
|||
if (loginUser == null) return; |
|||
|
|||
Map<String, Object> body = parsePayload(payload); |
|||
if (body == null) return; |
|||
|
|||
String type = (String) body.get("type"); |
|||
String sessionId = accessor.getSessionId(); |
|||
if (sessionId == null) sessionId = UUID.randomUUID().toString(); |
|||
|
|||
if (TYPE_JOIN.equals(type)) { |
|||
handleJoin(roomId, sessionId, loginUser, body); |
|||
} else if (TYPE_LEAVE.equals(type)) { |
|||
handleLeave(roomId, sessionId, loginUser); |
|||
} else if (TYPE_PING.equals(type)) { |
|||
handlePing(roomId, sessionId, loginUser); |
|||
} |
|||
} |
|||
|
|||
@SuppressWarnings("unchecked") |
|||
private Map<String, Object> parsePayload(String payload) { |
|||
try { |
|||
return JSON.parseObject(payload, Map.class); |
|||
} catch (Exception e) { |
|||
return null; |
|||
} |
|||
} |
|||
|
|||
private void handleJoin(Long roomId, String sessionId, LoginUser loginUser, Map<String, Object> body) { |
|||
Rooms room = roomsService.selectRoomsById(roomId); |
|||
if (room == null) return; |
|||
|
|||
RoomMemberDTO member = buildMember(loginUser, sessionId, roomId, body); |
|||
roomWebSocketService.joinRoom(roomId, sessionId, member); |
|||
|
|||
List<RoomMemberDTO> members = roomWebSocketService.getRoomMembers(roomId); |
|||
Map<String, Object> memberListMsg = new HashMap<>(); |
|||
memberListMsg.put("type", TYPE_MEMBER_LIST); |
|||
memberListMsg.put("members", members); |
|||
|
|||
Map<String, Object> joinedMsg = new HashMap<>(); |
|||
joinedMsg.put("type", TYPE_MEMBER_JOINED); |
|||
joinedMsg.put("member", member); |
|||
|
|||
String topic = "/topic/room/" + roomId; |
|||
messagingTemplate.convertAndSend(topic, memberListMsg); |
|||
} |
|||
|
|||
private void handleLeave(Long roomId, String sessionId, LoginUser loginUser) { |
|||
RoomMemberDTO member = buildMember(loginUser, sessionId, roomId, null); |
|||
roomWebSocketService.leaveRoom(roomId, sessionId, loginUser.getUserId()); |
|||
|
|||
Map<String, Object> msg = new HashMap<>(); |
|||
msg.put("type", TYPE_MEMBER_LEFT); |
|||
msg.put("member", member); |
|||
msg.put("sessionId", sessionId); |
|||
|
|||
messagingTemplate.convertAndSend("/topic/room/" + roomId, msg); |
|||
} |
|||
|
|||
private void handlePing(Long roomId, String sessionId, LoginUser loginUser) { |
|||
roomWebSocketService.refreshSessionHeartbeat(sessionId); |
|||
|
|||
Map<String, Object> msg = new HashMap<>(); |
|||
msg.put("type", TYPE_PONG); |
|||
|
|||
messagingTemplate.convertAndSendToUser(loginUser.getUsername(), "/queue/private", msg); |
|||
} |
|||
|
|||
private RoomMemberDTO buildMember(LoginUser loginUser, String sessionId, Long roomId, Map<String, Object> body) { |
|||
RoomMemberDTO dto = new RoomMemberDTO(); |
|||
dto.setUserId(loginUser.getUserId()); |
|||
dto.setUserName(loginUser.getUsername()); |
|||
dto.setNickName(loginUser.getUser().getNickName()); |
|||
dto.setAvatar(loginUser.getUser().getAvatar()); |
|||
dto.setSessionId(sessionId); |
|||
dto.setDeviceId(body != null && body.containsKey("deviceId") ? String.valueOf(body.get("deviceId")) : "default"); |
|||
dto.setJoinedAt(System.currentTimeMillis()); |
|||
|
|||
Rooms room = roomsService.selectRoomsById(roomId); |
|||
if (room != null && loginUser.getUserId().equals(room.getOwnerId())) { |
|||
dto.setRole("owner"); |
|||
} else if (StringUtils.isNotEmpty(loginUser.getUser().getUserLevel())) { |
|||
dto.setRole(loginUser.getUser().getUserLevel()); |
|||
} else { |
|||
dto.setRole("member"); |
|||
} |
|||
return dto; |
|||
} |
|||
} |
|||
@ -0,0 +1,93 @@ |
|||
package com.ruoyi.websocket.dto; |
|||
|
|||
import java.io.Serializable; |
|||
|
|||
/** |
|||
* WebSocket 房间成员信息 DTO(仅用于 Redis 序列化,非数据库实体) |
|||
* |
|||
* @author ruoyi |
|||
*/ |
|||
public class RoomMemberDTO implements Serializable { |
|||
private static final long serialVersionUID = 1L; |
|||
|
|||
/** 用户ID */ |
|||
private Long userId; |
|||
/** 用户账号 */ |
|||
private String userName; |
|||
/** 用户昵称 */ |
|||
private String nickName; |
|||
/** 头像地址 */ |
|||
private String avatar; |
|||
/** WebSocket 会话ID */ |
|||
private String sessionId; |
|||
/** 设备标识 */ |
|||
private String deviceId; |
|||
/** 加入时间戳 */ |
|||
private Long joinedAt; |
|||
/** 角色:owner=房主, admin=管理员, member=成员 */ |
|||
private String role; |
|||
|
|||
public Long getUserId() { |
|||
return userId; |
|||
} |
|||
|
|||
public void setUserId(Long userId) { |
|||
this.userId = userId; |
|||
} |
|||
|
|||
public String getUserName() { |
|||
return userName; |
|||
} |
|||
|
|||
public void setUserName(String userName) { |
|||
this.userName = userName; |
|||
} |
|||
|
|||
public String getNickName() { |
|||
return nickName; |
|||
} |
|||
|
|||
public void setNickName(String nickName) { |
|||
this.nickName = nickName; |
|||
} |
|||
|
|||
public String getAvatar() { |
|||
return avatar; |
|||
} |
|||
|
|||
public void setAvatar(String avatar) { |
|||
this.avatar = avatar; |
|||
} |
|||
|
|||
public String getSessionId() { |
|||
return sessionId; |
|||
} |
|||
|
|||
public void setSessionId(String sessionId) { |
|||
this.sessionId = sessionId; |
|||
} |
|||
|
|||
public String getDeviceId() { |
|||
return deviceId; |
|||
} |
|||
|
|||
public void setDeviceId(String deviceId) { |
|||
this.deviceId = deviceId; |
|||
} |
|||
|
|||
public Long getJoinedAt() { |
|||
return joinedAt; |
|||
} |
|||
|
|||
public void setJoinedAt(Long joinedAt) { |
|||
this.joinedAt = joinedAt; |
|||
} |
|||
|
|||
public String getRole() { |
|||
return role; |
|||
} |
|||
|
|||
public void setRole(String role) { |
|||
this.role = role; |
|||
} |
|||
} |
|||
@ -0,0 +1,33 @@ |
|||
package com.ruoyi.websocket.dto; |
|||
|
|||
import java.io.Serializable; |
|||
|
|||
/** |
|||
* WebSocket 会话与房间关联信息(用于断开连接时清理) |
|||
* |
|||
* @author ruoyi |
|||
*/ |
|||
public class RoomSessionInfo implements Serializable { |
|||
private static final long serialVersionUID = 1L; |
|||
|
|||
/** 房间ID */ |
|||
private Long roomId; |
|||
/** 成员信息 */ |
|||
private RoomMemberDTO member; |
|||
|
|||
public Long getRoomId() { |
|||
return roomId; |
|||
} |
|||
|
|||
public void setRoomId(Long roomId) { |
|||
this.roomId = roomId; |
|||
} |
|||
|
|||
public RoomMemberDTO getMember() { |
|||
return member; |
|||
} |
|||
|
|||
public void setMember(RoomMemberDTO member) { |
|||
this.member = member; |
|||
} |
|||
} |
|||
@ -0,0 +1,46 @@ |
|||
package com.ruoyi.websocket.listener; |
|||
|
|||
import java.util.HashMap; |
|||
import java.util.Map; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.context.ApplicationListener; |
|||
import org.springframework.messaging.simp.SimpMessagingTemplate; |
|||
import org.springframework.stereotype.Component; |
|||
import org.springframework.web.socket.messaging.SessionDisconnectEvent; |
|||
import com.ruoyi.websocket.dto.RoomSessionInfo; |
|||
import com.ruoyi.websocket.service.RoomWebSocketService; |
|||
|
|||
/** |
|||
* WebSocket 断开连接监听器 |
|||
* 当客户端断开(刷新、关闭标签页、网络异常等)时,自动从房间移除该会话,避免同一账号重复出现在在线列表中 |
|||
* |
|||
* @author ruoyi |
|||
*/ |
|||
@Component |
|||
public class WebSocketDisconnectListener implements ApplicationListener<SessionDisconnectEvent> { |
|||
|
|||
private static final String TYPE_MEMBER_LEFT = "MEMBER_LEFT"; |
|||
|
|||
@Autowired |
|||
private RoomWebSocketService roomWebSocketService; |
|||
|
|||
@Autowired |
|||
private SimpMessagingTemplate messagingTemplate; |
|||
|
|||
@Override |
|||
public void onApplicationEvent(SessionDisconnectEvent event) { |
|||
String sessionId = event.getSessionId(); |
|||
if (sessionId == null) return; |
|||
|
|||
RoomSessionInfo info = roomWebSocketService.leaveBySessionId(sessionId); |
|||
if (info == null) return; |
|||
|
|||
Map<String, Object> msg = new HashMap<>(); |
|||
msg.put("type", TYPE_MEMBER_LEFT); |
|||
msg.put("member", info.getMember()); |
|||
msg.put("sessionId", sessionId); |
|||
|
|||
String topic = "/topic/room/" + info.getRoomId(); |
|||
messagingTemplate.convertAndSend(topic, msg); |
|||
} |
|||
} |
|||
@ -0,0 +1,158 @@ |
|||
package com.ruoyi.websocket.service; |
|||
|
|||
import java.util.ArrayList; |
|||
import java.util.List; |
|||
import java.util.Map; |
|||
import java.util.concurrent.TimeUnit; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.beans.factory.annotation.Qualifier; |
|||
import org.springframework.data.redis.core.RedisTemplate; |
|||
import org.springframework.stereotype.Service; |
|||
import com.alibaba.fastjson2.JSON; |
|||
import com.ruoyi.common.core.redis.RedisCache; |
|||
import com.ruoyi.websocket.dto.RoomMemberDTO; |
|||
import com.ruoyi.websocket.dto.RoomSessionInfo; |
|||
|
|||
/** |
|||
* WebSocket 房间 Redis 管理服务 |
|||
* |
|||
* @author ruoyi |
|||
*/ |
|||
@Service |
|||
public class RoomWebSocketService { |
|||
|
|||
private static final String ROOM_MEMBERS_PREFIX = "room:"; |
|||
private static final String ROOM_MEMBERS_SUFFIX = ":members"; |
|||
private static final String USER_SESSIONS_PREFIX = "user:"; |
|||
private static final String USER_SESSIONS_SUFFIX = ":sessions"; |
|||
private static final String SESSION_PREFIX = "session:"; |
|||
private static final int SESSION_EXPIRE_MINUTES = 30; |
|||
|
|||
@Autowired |
|||
private RedisCache redisCache; |
|||
|
|||
@Autowired |
|||
@Qualifier("stringObjectRedisTemplate") |
|||
private RedisTemplate<String, Object> redisTemplate; |
|||
|
|||
private String roomMembersKey(Long roomId) { |
|||
return ROOM_MEMBERS_PREFIX + roomId + ROOM_MEMBERS_SUFFIX; |
|||
} |
|||
|
|||
private String userSessionsKey(Long userId) { |
|||
return USER_SESSIONS_PREFIX + userId + USER_SESSIONS_SUFFIX; |
|||
} |
|||
|
|||
private String sessionKey(String sessionId) { |
|||
return SESSION_PREFIX + sessionId; |
|||
} |
|||
|
|||
/** |
|||
* 用户加入房间。 |
|||
* 同一用户只保留最新会话,加入前会先移除该用户在房间内的所有旧会话,避免刷新/重连后重复显示。 |
|||
*/ |
|||
public void joinRoom(Long roomId, String sessionId, RoomMemberDTO member) { |
|||
removeStaleSessionsForUser(roomId, member.getUserId(), sessionId); |
|||
|
|||
String key = roomMembersKey(roomId); |
|||
redisCache.setCacheMapValue(key, sessionId, member); |
|||
redisCache.expire(key, 24, TimeUnit.HOURS); |
|||
|
|||
redisTemplate.opsForSet().add(userSessionsKey(member.getUserId()), sessionId); |
|||
|
|||
RoomSessionInfo sessionInfo = new RoomSessionInfo(); |
|||
sessionInfo.setRoomId(roomId); |
|||
sessionInfo.setMember(member); |
|||
redisCache.setCacheObject(sessionKey(sessionId), sessionInfo, SESSION_EXPIRE_MINUTES, TimeUnit.MINUTES); |
|||
} |
|||
|
|||
/** |
|||
* 移除同一用户在房间内的旧会话(历史残留,如服务重启前未收到断开事件;前端每次加载生成新 deviceId 导致无法按设备匹配) |
|||
*/ |
|||
private void removeStaleSessionsForUser(Long roomId, Long userId, String currentSessionId) { |
|||
String key = roomMembersKey(roomId); |
|||
Map<String, Object> map = redisCache.getCacheMap(key); |
|||
if (map == null || map.isEmpty()) return; |
|||
|
|||
List<String> toRemove = new ArrayList<>(); |
|||
for (Map.Entry<String, Object> e : map.entrySet()) { |
|||
String sid = e.getKey(); |
|||
if (sid.equals(currentSessionId)) continue; |
|||
Object val = e.getValue(); |
|||
RoomMemberDTO dto = val instanceof RoomMemberDTO ? (RoomMemberDTO) val |
|||
: JSON.parseObject(JSON.toJSONString(val), RoomMemberDTO.class); |
|||
if (dto != null && userId.equals(dto.getUserId())) { |
|||
toRemove.add(sid); |
|||
} |
|||
} |
|||
for (String sid : toRemove) { |
|||
Object val = redisCache.getCacheMapValue(key, sid); |
|||
RoomMemberDTO dto = val instanceof RoomMemberDTO ? (RoomMemberDTO) val |
|||
: (val != null ? JSON.parseObject(JSON.toJSONString(val), RoomMemberDTO.class) : null); |
|||
leaveRoom(roomId, sid, dto != null ? dto.getUserId() : null); |
|||
} |
|||
} |
|||
|
|||
/** |
|||
* 用户离开房间 |
|||
*/ |
|||
public void leaveRoom(Long roomId, String sessionId, Long userId) { |
|||
String key = roomMembersKey(roomId); |
|||
redisCache.deleteCacheMapValue(key, sessionId); |
|||
|
|||
if (userId != null) { |
|||
redisTemplate.opsForSet().remove(userSessionsKey(userId), sessionId); |
|||
} |
|||
|
|||
redisCache.deleteObject(sessionKey(sessionId)); |
|||
} |
|||
|
|||
/** |
|||
* 获取房间成员列表 |
|||
*/ |
|||
public List<RoomMemberDTO> getRoomMembers(Long roomId) { |
|||
String key = roomMembersKey(roomId); |
|||
Map<String, Object> map = redisCache.getCacheMap(key); |
|||
List<RoomMemberDTO> list = new ArrayList<>(); |
|||
if (map != null && !map.isEmpty()) { |
|||
for (Object val : map.values()) { |
|||
if (val != null) { |
|||
RoomMemberDTO dto = val instanceof RoomMemberDTO ? (RoomMemberDTO) val : JSON.parseObject(JSON.toJSONString(val), RoomMemberDTO.class); |
|||
if (dto != null) list.add(dto); |
|||
} |
|||
} |
|||
} |
|||
return list; |
|||
} |
|||
|
|||
/** |
|||
* 刷新会话心跳(延长过期时间) |
|||
*/ |
|||
public void refreshSessionHeartbeat(String sessionId) { |
|||
String key = sessionKey(sessionId); |
|||
Object val = redisCache.getCacheObject(key); |
|||
if (val != null) { |
|||
redisCache.setCacheObject(key, val, SESSION_EXPIRE_MINUTES, TimeUnit.MINUTES); |
|||
} |
|||
} |
|||
|
|||
/** |
|||
* 根据会话ID离开房间(用于连接断开时清理) |
|||
* |
|||
* @param sessionId 会话ID |
|||
* @return 若该会话在房间中,返回会话信息(含 roomId、member)用于广播;否则返回 null |
|||
*/ |
|||
public RoomSessionInfo leaveBySessionId(String sessionId) { |
|||
Object val = redisCache.getCacheObject(sessionKey(sessionId)); |
|||
if (val == null) return null; |
|||
|
|||
RoomSessionInfo info = val instanceof RoomSessionInfo ? (RoomSessionInfo) val |
|||
: JSON.parseObject(JSON.toJSONString(val), RoomSessionInfo.class); |
|||
if (info == null || info.getRoomId() == null || info.getMember() == null) return null; |
|||
|
|||
Long roomId = info.getRoomId(); |
|||
RoomMemberDTO member = info.getMember(); |
|||
leaveRoom(roomId, sessionId, member.getUserId()); |
|||
return info; |
|||
} |
|||
} |
|||
@ -0,0 +1,167 @@ |
|||
/** |
|||
* WebSocket 房间连接服务(SockJS + STOMP) |
|||
*/ |
|||
import SockJS from 'sockjs-client' |
|||
import { Client } from '@stomp/stompjs' |
|||
import { getToken } from '@/utils/auth' |
|||
|
|||
const WS_BASE = process.env.VUE_APP_BASE_API || '/dev-api' |
|||
|
|||
/** |
|||
* 创建房间 WebSocket 连接 |
|||
* @param {Object} options |
|||
* @param {string|number} options.roomId - 房间 ID |
|||
* @param {Function} options.onMembers - 收到成员列表回调 (members) => {} |
|||
* @param {Function} options.onMemberJoined - 成员加入回调 (member) => {} |
|||
* @param {Function} options.onMemberLeft - 成员离开回调 (member, sessionId) => {} |
|||
* @param {Function} options.onConnected - 连接成功回调 |
|||
* @param {Function} options.onDisconnected - 断开回调 |
|||
* @param {Function} options.onError - 错误回调 |
|||
* @param {string} [options.deviceId] - 设备标识 |
|||
*/ |
|||
export function createRoomWebSocket(options) { |
|||
const { |
|||
roomId, |
|||
onMembers, |
|||
onMemberJoined, |
|||
onMemberLeft, |
|||
onConnected, |
|||
onDisconnected, |
|||
onError, |
|||
deviceId = 'web-' + Math.random().toString(36).slice(2, 10) |
|||
} = options |
|||
|
|||
let client = null |
|||
let subscription = null |
|||
let heartbeatTimer = null |
|||
let reconnectAttempts = 0 |
|||
const maxReconnectAttempts = 10 |
|||
const reconnectDelay = 2000 |
|||
|
|||
function getWsUrl() { |
|||
const token = getToken() |
|||
const base = window.location.origin + WS_BASE |
|||
const sep = base.includes('?') ? '&' : '?' |
|||
return base + '/ws' + (token ? sep + 'token=' + encodeURIComponent(token) : '') |
|||
} |
|||
|
|||
function sendJoin() { |
|||
if (client && client.connected) { |
|||
client.publish({ |
|||
destination: '/app/room/' + roomId, |
|||
body: JSON.stringify({ type: 'JOIN', deviceId }) |
|||
}) |
|||
} |
|||
} |
|||
|
|||
function sendLeave() { |
|||
if (client && client.connected) { |
|||
client.publish({ |
|||
destination: '/app/room/' + roomId, |
|||
body: JSON.stringify({ type: 'LEAVE' }) |
|||
}) |
|||
} |
|||
} |
|||
|
|||
function sendPing() { |
|||
if (client && client.connected) { |
|||
client.publish({ |
|||
destination: '/app/room/' + roomId, |
|||
body: JSON.stringify({ type: 'PING' }) |
|||
}) |
|||
} |
|||
} |
|||
|
|||
function startHeartbeat() { |
|||
stopHeartbeat() |
|||
heartbeatTimer = setInterval(sendPing, 30000) |
|||
} |
|||
|
|||
function stopHeartbeat() { |
|||
if (heartbeatTimer) { |
|||
clearInterval(heartbeatTimer) |
|||
heartbeatTimer = null |
|||
} |
|||
} |
|||
|
|||
function handleMessage(message) { |
|||
try { |
|||
const body = JSON.parse(message.body) |
|||
const type = body.type |
|||
if (type === 'MEMBER_LIST' && body.members) { |
|||
onMembers && onMembers(body.members) |
|||
} else if (type === 'MEMBER_JOINED' && body.member) { |
|||
onMemberJoined && onMemberJoined(body.member) |
|||
} else if (type === 'MEMBER_LEFT' && body.member) { |
|||
onMemberLeft && onMemberLeft(body.member, body.sessionId) |
|||
} |
|||
} catch (e) { |
|||
console.warn('[WebSocket] parse message error:', e) |
|||
} |
|||
} |
|||
|
|||
function connect() { |
|||
const token = getToken() |
|||
if (!token) { |
|||
onError && onError(new Error('未登录')) |
|||
return |
|||
} |
|||
|
|||
const sock = new SockJS(getWsUrl()) |
|||
client = new Client({ |
|||
webSocketFactory: () => sock, |
|||
reconnectDelay: 0, |
|||
heartbeatIncoming: 0, |
|||
heartbeatOutgoing: 0, |
|||
onConnect: () => { |
|||
reconnectAttempts = 0 |
|||
subscription = client.subscribe('/topic/room/' + roomId, handleMessage) |
|||
sendJoin() |
|||
startHeartbeat() |
|||
onConnected && onConnected() |
|||
}, |
|||
onStompError: (frame) => { |
|||
console.warn('[WebSocket] STOMP error:', frame) |
|||
onError && onError(new Error(frame.headers?.message || '连接错误')) |
|||
}, |
|||
onWebSocketClose: () => { |
|||
stopHeartbeat() |
|||
subscription = null |
|||
onDisconnected && onDisconnected() |
|||
} |
|||
}) |
|||
client.activate() |
|||
} |
|||
|
|||
function disconnect() { |
|||
stopHeartbeat() |
|||
sendLeave() |
|||
if (subscription) { |
|||
subscription.unsubscribe() |
|||
subscription = null |
|||
} |
|||
if (client) { |
|||
client.deactivate() |
|||
client = null |
|||
} |
|||
} |
|||
|
|||
function tryReconnect() { |
|||
if (reconnectAttempts >= maxReconnectAttempts) return |
|||
reconnectAttempts++ |
|||
setTimeout(() => { |
|||
disconnect() |
|||
connect() |
|||
}, reconnectDelay) |
|||
} |
|||
|
|||
connect() |
|||
|
|||
return { |
|||
disconnect, |
|||
reconnect: connect, |
|||
get connected() { |
|||
return client && client.connected |
|||
} |
|||
} |
|||
} |
|||
Loading…
Reference in new issue