当前位置: 首页 > news >正文

spring cloud 使用 webSocket

1. 引入依赖(在需要提供 WebSocket 的微服务模块中)

<!-- Spring WebSocket -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>

2.新建文件

package com.ruoyi.foundation.webSocket;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * Turns on WebSocket support for this service.
 */
@Configuration
public class WebSocketConfig {

    /**
     * Exposes a {@link ServerEndpointExporter} bean.
     *
     * <p>This bean has to be registered when the application runs on Spring Boot's
     * embedded Tomcat.
     *
     * @return a new exporter instance
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        final ServerEndpointExporter exporter = new ServerEndpointExporter();
        return exporter;
    }
}
package com.ruoyi.foundation.webSocket;

import lombok.extern.slf4j.Slf4j;

import javax.websocket.Session;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

@Slf4j
public class WebsocketUtil {
    private static final Map<String, Session> ONLINE_SESSION = new ConcurrentHashMap<>();

    /**
     * 添加session
     */
    public static void addSession(String userId, Session session){
        // 一个用户只允许一个session链接
        ONLINE_SESSION.putIfAbsent(userId, session);
        log.info("User [{}] connected. Total online users: {}", userId, ONLINE_SESSION.size());
    }

    /**
     * 移除session
     */
    public static void removeSession(String userId){
        ONLINE_SESSION.remove(userId);
        log.info("User [{}] disconnected. Total online users: {}", userId, ONLINE_SESSION.size());
    }

    /**
     * 给单个用户推送消息
     */
    public static void sendMessage(String userId, String message){
        Session session = ONLINE_SESSION.get(userId);
        if(session == null){
            log.warn("Session for user [{}] not found", userId);
            return;
        }
        sendMessage(session, message);
    }

    public static void sendMessage(Session session, String message) {
        if (session != null) {
            session.getAsyncRemote().sendText(message);
        }
    }

    /**
     * 给所有用户发消息
     */
    public static void sendMessageForAll(String message) {
        ONLINE_SESSION.forEach((userId, session) -> {
            CompletableFuture.runAsync(() -> sendMessage(session, message))
                    .exceptionally(ex -> {
                        log.error("Failed to send message to user [{}]: {}", userId, ex.getMessage());
                        return null;
                    });
        });
    }

    /**
     * 给指定的多个用户推送消息
     */
    public static void sendMessageForUsers(Set<String> userIds, String message) {
        userIds.forEach(userId -> {
            Session session = ONLINE_SESSION.get(userId);
            if (session == null) {
                log.warn("Session for user [{}] not found", userId);
                return;
            }
            CompletableFuture.runAsync(() -> sendMessage(session, message))
                    .exceptionally(ex -> {
                        log.error("Failed to send message to user [{}]: {}", userId, ex.getMessage());
                        return null;
                    });
        });
    }
}
package com.ruoyi.foundation.apicontroller;

import com.google.gson.Gson;
import com.ruoyi.foundation.apicontroller.req.MemorialHallWebsocketReq;
import com.ruoyi.foundation.webSocket.WebsocketUtil;
import io.seata.common.util.StringUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.poi.util.StringUtil;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

@Component
@ServerEndpoint(value = "/api/memorialHallWebsocket/{dailyMissId}/{userId}")
public class MmMemorialHallWebsocketController {
    /**
     * 保存每日一念纪念馆中当前在线的用户ID
     */
    private static final Map<String, List<String>> memorialHallUsers = new ConcurrentHashMap<>();

    private Gson gson=new Gson();

    @OnOpen
    public void onOpen(@PathParam(value = "dailyMissId") String dailyMissId,@PathParam(value = "userId") String userId, Session session) {
        WebsocketUtil.addSession(userId, session);

        List<String> strings = memorialHallUsers.get(dailyMissId);
        if (strings == null){
            List<String> list=new ArrayList<>();
            list.add(userId);
            memorialHallUsers.put(dailyMissId,list);
        }else{
            strings.add(userId);
        }
    }

    @OnClose
    public void onClose(@PathParam(value = "dailyMissId") String dailyMissId,@PathParam(value = "userId") String userId, Session session) {
        WebsocketUtil.removeSession(userId);

        List<String> strings = memorialHallUsers.get(dailyMissId);
        if(strings != null){
            strings.remove(userId);
        }
    }

    @OnMessage
    public void onMessage(@PathParam(value = "dailyMissId") String dailyMissId,@PathParam(value = "userId") String userId, Session session, String message) {
        /*System.out.println(dailyMissId);
        System.out.println(userId);
        System.out.println(session);
        System.out.println(message);*/

        //MemorialHallWebsocketReq memorialHallWebsocketReq = gson.fromJson(message, MemorialHallWebsocketReq.class);

        List<String> strings = memorialHallUsers.get(dailyMissId);
        if(strings == null || strings.isEmpty()){
            return;
        }

        Set<String> collect = strings.stream().filter(userId1 -> !StringUtils.equals(userId1, userId)).collect(Collectors.toSet());

        //对同纪念馆的在线用户进行广播
        WebsocketUtil.sendMessageForUsers(collect,message);
    }

    @OnError
    public void onError(Session session, Throwable throwable) {
        try {
            session.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        throwable.printStackTrace();
    }
}

3. 网关允许 WebSocket(在网关的路由配置中添加)

# Gateway route for WebSocket traffic to the ruoyi-foundation service.
# The "lb:ws://" scheme forwards the connection as WebSocket through the load balancer.
# Presumably this entry belongs under spring.cloud.gateway.routes; adjust the
# leading indentation to match the surrounding file.
- id: ruoyi-foundationWebSocket
  uri: lb:ws://ruoyi-foundation
  predicates:
    - Path=/foundationWebSocket/**
  filters:
    # Drops the leading "/foundationWebSocket" segment before forwarding.
    - StripPrefix=1

4.测试

5.线上nginx配置


    # Reverse proxy for everything under /mmwzGateWay/ (HTTP API and WebSocket)
    # to the gateway upstream.
    location /mmwzGateWay/ {
        # Answer CORS preflight requests directly, echoing the caller's Origin.
        if ($request_method = OPTIONS) {
            add_header Access-Control-Allow-Origin $http_origin;
            add_header "Access-Control-Allow-Headers" "Authorization, Origin, X-Requested-With, Content-Type, Accept";
            add_header Access-Control-Allow-Methods GET,POST,OPTIONS,HEAD,PUT,DELETE;
            add_header Access-Control-Allow-Credentials true;
            return 200;
        }
 
        # Pass the original client address and host on to the upstream.
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;

	   # WebSocket-related header configuration: HTTP/1.1 plus the Upgrade/Connection
	   # headers let the WebSocket handshake pass through the proxy.
	   # NOTE(review): Connection "upgrade" is sent for every request in this location,
	   # including plain HTTP calls; consider a map on $http_upgrade at http level.
	   # NOTE(review): no proxy_read_timeout is set here, so idle WebSocket connections
	   # are presumably closed at nginx's default timeout - confirm clients send heartbeats.
	   proxy_http_version 1.1;
	   proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_set_header X-Forwarded-Proto $scheme;
        
        # The trailing slash replaces the matched /mmwzGateWay/ prefix with "/".
        proxy_pass http://mmwz-gateway:8080/;
        #proxy_pass http://www.baidu.com/;
    }

相关文章:

  • 怎麼使用靜態住宅IP進行多社媒帳號管理
  • A4988一款带转换器和过流保护的 DMOS 微步驱动器的使用方式
  • 探索高通骁龙游戏超分辨率技术:移动游戏的未来
  • 20240911 光迅科技 笔试
  • ProxySQL构建PolarDB-X标准版高可用路由服务三节点集群
  • 理解WebGPU 中的 GPUDevice :与 GPU 交互的核心接口
  • 【时时三省】(C语言基础)简单的算法举例
  • leetcode-495.提莫攻击
  • 或非门组成的SR锁存器真值表相关问题
  • LLM:GPT 系列
  • C#关于静态关键词static详解
  • vue 文件下载(导出)excel的方法
  • 【Elasticsearch】运行时字段(Runtime Fields)索引时定义运行时字段
  • EtherNetIP转ModbusTCP网关,给风电注入“超级赛亚人”能量
  • flutter ListView 局部刷新
  • 相得益彰,Mendix AI connector 秒连DeepSeek ,实现研发制造域场景
  • 在WPS中通过JavaScript宏(JSA)调用本地DeepSeek API优化文档教程
  • Jenkins 新建配置 Freestyle project 任务 六
  • 力扣142题——环形链表II
  • 项目中菜单按照层级展示sql
  • 刘洪洁已任六安市委副书记、市政府党组书记
  • 李公明︱一周书记:数字文化的乌托邦精神与……算法时代的生存指南
  • “网约摩托”在部分县城上线:起步价五六元,专家建议纳入监管
  • 证监会副主席王建军被查
  • 中方拟解除对5名欧洲议会议员制裁?外交部:望中欧立法机构相向而行
  • 魔都眼|静安光影派对五一启幕:苏河湾看徐悲鸿艺术画作