You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

159 lines
5.9 KiB

package com.ruoyi.websocket.service;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import com.alibaba.fastjson2.JSON;
import com.ruoyi.common.core.redis.RedisCache;
import com.ruoyi.websocket.dto.RoomMemberDTO;
import com.ruoyi.websocket.dto.RoomSessionInfo;
/**
* WebSocket 房间 Redis 管理服务
*
* @author ruoyi
*/
@Service
public class RoomWebSocketService {
private static final String ROOM_MEMBERS_PREFIX = "room:";
private static final String ROOM_MEMBERS_SUFFIX = ":members";
private static final String USER_SESSIONS_PREFIX = "user:";
private static final String USER_SESSIONS_SUFFIX = ":sessions";
private static final String SESSION_PREFIX = "session:";
private static final int SESSION_EXPIRE_MINUTES = 30;
@Autowired
private RedisCache redisCache;
@Autowired
@Qualifier("stringObjectRedisTemplate")
private RedisTemplate<String, Object> redisTemplate;
private String roomMembersKey(Long roomId) {
return ROOM_MEMBERS_PREFIX + roomId + ROOM_MEMBERS_SUFFIX;
}
private String userSessionsKey(Long userId) {
return USER_SESSIONS_PREFIX + userId + USER_SESSIONS_SUFFIX;
}
private String sessionKey(String sessionId) {
return SESSION_PREFIX + sessionId;
}
/**
* 用户加入房间
* 同一用户只保留最新会话加入前会先移除该用户在房间内的所有旧会话避免刷新/重连后重复显示
*/
public void joinRoom(Long roomId, String sessionId, RoomMemberDTO member) {
removeStaleSessionsForUser(roomId, member.getUserId(), sessionId);
String key = roomMembersKey(roomId);
redisCache.setCacheMapValue(key, sessionId, member);
redisCache.expire(key, 24, TimeUnit.HOURS);
redisTemplate.opsForSet().add(userSessionsKey(member.getUserId()), sessionId);
RoomSessionInfo sessionInfo = new RoomSessionInfo();
sessionInfo.setRoomId(roomId);
sessionInfo.setMember(member);
redisCache.setCacheObject(sessionKey(sessionId), sessionInfo, SESSION_EXPIRE_MINUTES, TimeUnit.MINUTES);
}
/**
* 移除同一用户在房间内的旧会话历史残留如服务重启前未收到断开事件前端每次加载生成新 deviceId 导致无法按设备匹配
*/
private void removeStaleSessionsForUser(Long roomId, Long userId, String currentSessionId) {
String key = roomMembersKey(roomId);
Map<String, Object> map = redisCache.getCacheMap(key);
if (map == null || map.isEmpty()) return;
List<String> toRemove = new ArrayList<>();
for (Map.Entry<String, Object> e : map.entrySet()) {
String sid = e.getKey();
if (sid.equals(currentSessionId)) continue;
Object val = e.getValue();
RoomMemberDTO dto = val instanceof RoomMemberDTO ? (RoomMemberDTO) val
: JSON.parseObject(JSON.toJSONString(val), RoomMemberDTO.class);
if (dto != null && userId.equals(dto.getUserId())) {
toRemove.add(sid);
}
}
for (String sid : toRemove) {
Object val = redisCache.getCacheMapValue(key, sid);
RoomMemberDTO dto = val instanceof RoomMemberDTO ? (RoomMemberDTO) val
: (val != null ? JSON.parseObject(JSON.toJSONString(val), RoomMemberDTO.class) : null);
leaveRoom(roomId, sid, dto != null ? dto.getUserId() : null);
}
}
/**
* 用户离开房间
*/
public void leaveRoom(Long roomId, String sessionId, Long userId) {
String key = roomMembersKey(roomId);
redisCache.deleteCacheMapValue(key, sessionId);
if (userId != null) {
redisTemplate.opsForSet().remove(userSessionsKey(userId), sessionId);
}
redisCache.deleteObject(sessionKey(sessionId));
}
/**
* 获取房间成员列表
*/
public List<RoomMemberDTO> getRoomMembers(Long roomId) {
String key = roomMembersKey(roomId);
Map<String, Object> map = redisCache.getCacheMap(key);
List<RoomMemberDTO> list = new ArrayList<>();
if (map != null && !map.isEmpty()) {
for (Object val : map.values()) {
if (val != null) {
RoomMemberDTO dto = val instanceof RoomMemberDTO ? (RoomMemberDTO) val : JSON.parseObject(JSON.toJSONString(val), RoomMemberDTO.class);
if (dto != null) list.add(dto);
}
}
}
return list;
}
/**
* 刷新会话心跳延长过期时间
*/
public void refreshSessionHeartbeat(String sessionId) {
String key = sessionKey(sessionId);
Object val = redisCache.getCacheObject(key);
if (val != null) {
redisCache.setCacheObject(key, val, SESSION_EXPIRE_MINUTES, TimeUnit.MINUTES);
}
}
/**
* 根据会话ID离开房间用于连接断开时清理
*
* @param sessionId 会话ID
* @return 若该会话在房间中返回会话信息 roomIdmember用于广播否则返回 null
*/
public RoomSessionInfo leaveBySessionId(String sessionId) {
Object val = redisCache.getCacheObject(sessionKey(sessionId));
if (val == null) return null;
RoomSessionInfo info = val instanceof RoomSessionInfo ? (RoomSessionInfo) val
: JSON.parseObject(JSON.toJSONString(val), RoomSessionInfo.class);
if (info == null || info.getRoomId() == null || info.getMember() == null) return null;
Long roomId = info.getRoomId();
RoomMemberDTO member = info.getMember();
leaveRoom(roomId, sessionId, member.getUserId());
return info;
}
}