spring cloud 同一服务多实例 websocket跨实例无法共享Session 的解决
思路>>使用redis发布消息,通知其他实例,查询符合条件的Session用于发送消息
package com.ruoyi.common.redis.message;import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component;/*** ApplicationContext 本身就实现了 ApplicationEventPublisher 接口*/
@Component
public class ApplicationContextProvider implements ApplicationContextAware {private static ApplicationContext context;public static ApplicationContext getApplicationContext() {return context;}@Overridepublic void setApplicationContext(ApplicationContext applicationContext) {context = applicationContext;}public static ApplicationEventPublisher getEventPublisher() {return context;}
}
package com.ruoyi.common.redis.message;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;@Configuration
public class RedisMessageConfig {/*** 创建消息监听器容器* @param connectionFactory Redis连接工厂* @param listenerAdapter 消息监听器适配器* @return RedisMessageListenerContainer*/@Beanpublic RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,MessageListenerAdapter listenerAdapter) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);// 将监听器适配器添加到容器,并订阅指定频道container.addMessageListener(listenerAdapter, new PatternTopic("*"));// 你也可以订阅多个频道,或者使用PatternTopic进行模式匹配订阅// container.addMessageListener(listenerAdapter, new PatternTopic("news.*"));return container;}/*** 将我们实现了MessageListener接口的订阅者包装成MessageListenerAdapter* Spring会默认寻找名为"onMessage"的方法* @param subscriber 我们的订阅者实例* @return MessageListenerAdapter*/@Beanpublic MessageListenerAdapter listenerAdapter() {return new MessageListenerAdapter(new RedisMessageSubscriber(),"onMessage");}
}
package com.ruoyi.common.redis.message;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;@Component
public class RedisMessagePublisher {@Autowiredprivate RedisTemplate redisTemplate;/*** 发布消息* @param channel* @param message*/public void publish(String channel, Object message) {redisTemplate.convertAndSend(channel, message);}
}
package com.ruoyi.common.redis.message;import com.ruoyi.common.core.utils.SpringUtils;
import com.ruoyi.common.core.utils.StringUtils;
import com.ruoyi.common.redis.message.event.Commemorate3dCemeteryEvent;
import com.ruoyi.common.redis.message.event.Commemorate3dHallEvent;
import com.ruoyi.common.redis.message.event.KinMemberChangeEvent;
import com.ruoyi.common.redis.message.event.MemorialHallEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;import java.nio.charset.StandardCharsets;
@Slf4j
@Component
public class RedisMessageSubscriber implements MessageListener {@Overridepublic void onMessage(Message message, byte[] pattern) {ApplicationEventPublisher eventPublisher = ApplicationContextProvider.getEventPublisher();String actualChannel = new String(message.getChannel(), StandardCharsets.UTF_8);System.out.println("redisMessage频道: " + actualChannel);System.out.println("redisMessage消息: " + message);//System.out.println("消息类型: " + message.getClass().getSimpleName());//举例 module:business:key websocket:kinMemberChange:kinMemberChange-useridString[] split = StringUtils.split(actualChannel, ':');if (split.length != 3) {log.error("redis onMessage channel error");}if(StringUtils.equals(split[0], "websocket")){//同一模块部署多实例的时候,任何一个模块发送websocket消息,都要通知其他实例,在内存中找session对象,进行发送消息if(StringUtils.equals(split[1], "kinMemberChange")){//其他实例,kinMemberChange 业务,发布了新消息String[] split1 = StringUtils.split(split[2], "_");eventPublisher.publishEvent(new KinMemberChangeEvent(this,split1[0],split1[1],""));}else if(StringUtils.equals(split[1], "commemorate3dCemetery")){String[] split1 = StringUtils.split(split[2], "_");byte[] bodyBytes = message.getBody();String messageBody = new String(bodyBytes);messageBody = removeExtraQuotes(messageBody);messageBody = StringUtils.replace(messageBody,"\\r\\n","");messageBody = StringUtils.replace(messageBody,"\\\"","\"");eventPublisher.publishEvent(new Commemorate3dCemeteryEvent(this,split1[0],split1[1],messageBody));}else if(StringUtils.equals(split[1], "commemorate3dHall")){String[] split1 = StringUtils.split(split[2], "_");byte[] bodyBytes = message.getBody();String messageBody = new String(bodyBytes);messageBody = removeExtraQuotes(messageBody);messageBody = StringUtils.replace(messageBody,"\\r\\n","");messageBody = StringUtils.replace(messageBody,"\\\"","\"");eventPublisher.publishEvent(new Commemorate3dHallEvent(this,split1[0],split1[1],messageBody));}else if(StringUtils.equals(split[1], "memorialHall")){String[] split1 = StringUtils.split(split[2], "_");byte[] bodyBytes = message.getBody();String messageBody = new String(bodyBytes);messageBody = removeExtraQuotes(messageBody);messageBody = StringUtils.replace(messageBody,"\\r\\n","");messageBody = StringUtils.replace(messageBody,"\\\"","\"");eventPublisher.publishEvent(new MemorialHallEvent(this,split1[0],split1[1],messageBody));}}}/*** 去除字符串首尾的多余引号*/private String removeExtraQuotes(String str) {if (str == null || str.isEmpty()) {return str;}// 去除首尾的双引号if (str.startsWith("\"") && str.endsWith("\"")) {return str.substring(1, str.length() - 1);}// 去除首尾的单引号if (str.startsWith("'") && str.endsWith("'")) {return str.substring(1, str.length() - 1);}return str;}
}
在websocket收到消息时,发布redis消息,通知所有实例
package com.ruoyi.foundation.apicontroller;import com.google.gson.Gson;
import com.ruoyi.common.redis.message.RedisMessagePublisher;
import com.ruoyi.foundation.apicontroller.req.MemorialHallWebsocketReq;
import com.ruoyi.foundation.event.dto.AnimationBroadcastDto;
import com.ruoyi.foundation.webSocket.WebsocketUtil;
import io.seata.common.util.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Service;import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;@Service
@ServerEndpoint(value = "/api/mingmenCommemorate3dHallWebsocket/{commemorate3d}/{userId}")
public class MmCommemorate3dHallWebsocketController {/*** 保存每日一念纪念馆中当前在线的用户ID*/public static final Map<String, List<String>> memorialHallUsers = new ConcurrentHashMap<>();private Gson gson=new Gson();private static ApplicationEventPublisher eventPublisher;private static RedisMessagePublisher redisMessagePublisher;@Autowiredpublic void setEventPublisher(ApplicationEventPublisher publisher) {MmCommemorate3dHallWebsocketController.eventPublisher = publisher;}@Autowiredpublic void setRedisMessagePublisher(RedisMessagePublisher publisher) {MmCommemorate3dHallWebsocketController.redisMessagePublisher = publisher;}/*public MmMemorialHallWebsocketController() {// 从 Spring 容器中获取 ApplicationEventPublisherthis.eventPublisher = SpringContextUtil.getBean(ApplicationEventPublisher.class);}*/@OnOpenpublic void onOpen(@PathParam(value = "commemorate3d") String commemorate3d,@PathParam(value = "userId") String userId, Session session) {WebsocketUtil.addSession("commemorate3d:"+userId, session);List<String> strings = memorialHallUsers.get(commemorate3d);if (strings == null){List<String> list=new ArrayList<>();list.add("commemorate3d:"+userId);memorialHallUsers.put(commemorate3d,list);}else{strings.add("commemorate3d:"+userId);}}@OnClosepublic void onClose(@PathParam(value = "commemorate3d") String commemorate3d,@PathParam(value = "userId") String userId, Session session) {WebsocketUtil.removeSession("commemorate3d:"+userId);List<String> strings = memorialHallUsers.get(commemorate3d);if(strings != null){strings.remove("commemorate3d:"+userId);}}@OnMessagepublic void onMessage(@PathParam(value = "commemorate3d") String commemorate3d,@PathParam(value = "userId") String userId, Session session, String message) {/*System.out.println(dailyMissId);System.out.println(userId);System.out.println(session);System.out.println(message);*///发送redis消息,使消息扩散到每个spring boot 实例redisMessagePublisher.publish("websocket:commemorate3dHall:"+commemorate3d + "_" + userId,message);/*MemorialHallWebsocketReq memorialHallWebsocketReq = gson.fromJson(message, MemorialHallWebsocketReq.class);List<String> strings = memorialHallUsers.get(commemorate3d);if(strings == null || strings.isEmpty()){return;}Set<String> collect = strings.stream().filter(userId1 -> !StringUtils.equals(userId1, "commemorate3d:"+userId)).collect(Collectors.toSet());//对在线的用户广播前,记录动效已经播放if(StringUtils.isNotBlank(memorialHallWebsocketReq.getMingmenSacrificeRecordId()) && collect.size() > 0){List<Long> collect1 = collect.stream().map(u -> Long.valueOf(u.split(":")[1])).collect(Collectors.toList());eventPublisher.publishEvent(new AnimationBroadcastDto(this,Long.valueOf(memorialHallWebsocketReq.getMingmenSacrificeRecordId()),collect1));}//对同纪念馆的在线用户进行广播WebsocketUtil.sendMessageForUsers(collect,message);*/}@OnErrorpublic void onError(Session session, Throwable throwable) {try {session.close();} catch (IOException e) {e.printStackTrace();}throwable.printStackTrace();}/*** 消息来自redis,通过redis可使同一模块所有实例都收到消息* @param commemorate3d* @param userId* @param message*/public void onMessageFromRedis(String commemorate3d,String userId,String message){MemorialHallWebsocketReq memorialHallWebsocketReq = gson.fromJson(message, MemorialHallWebsocketReq.class);List<String> strings = memorialHallUsers.get(commemorate3d);if(strings == null || strings.isEmpty()){return;}Set<String> collect = strings.stream().filter(userId1 -> !StringUtils.equals(userId1, "commemorate3d:"+userId)).collect(Collectors.toSet());//对在线的用户广播前,记录动效已经播放if(StringUtils.isNotBlank(memorialHallWebsocketReq.getMingmenSacrificeRecordId()) && collect.size() > 0){List<Long> collect1 = collect.stream().map(u -> Long.valueOf(u.split(":")[1])).collect(Collectors.toList());eventPublisher.publishEvent(new AnimationBroadcastDto(this,Long.valueOf(memorialHallWebsocketReq.getMingmenSacrificeRecordId()),collect1));}//对同纪念馆的在线用户进行广播WebsocketUtil.sendMessageForUsers(collect,message);}
}
实例收到redis消息后,转化为ApplicationEventPublisher 消息,从基础层调用业务层
package com.ruoyi.foundation.redisevents;import com.ruoyi.common.redis.message.event.Commemorate3dHallEvent;
import com.ruoyi.foundation.apicontroller.MmCommemorate3dHallWebsocketController;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class Commemorate3dHallListen {@Async@EventListenerpublic void commemorate3dCemetery(Commemorate3dHallEvent event) {MmCommemorate3dHallWebsocketController mmCommemorate3dHallWebsocketController = new MmCommemorate3dHallWebsocketController();mmCommemorate3dHallWebsocketController.onMessageFromRedis(event.getCommemorate3d(),event.getUserId(),event.getMessage());}
}