当前位置: 首页 > news >正文

websocket集群部署遇到的一些事

最近刚好有个场景,业务处理一份报告需要关注实时处理的进度。

本来打算使用前端轮训方式,但是考虑到这样效率比较低,也无法精确知道处理进度,就想到用websocket和前端实时交互,进度有更新就通知前端,避免了无用的空轮训请求。

websocket通过session链接,和前端保持链接,将客户的信息存储在内存中,用户每个请求都有一个唯一的uuid,后续通过uuid查找对应客户的session发送websocket。

系统架构图

流程图

import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * @author 
 * @since 2024/8/26
 */
@Configuration
@EnableAutoConfiguration
@EnableWebSocket
public class WebSocketConfig {

    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }

}


import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArraySet;

/**
 * @author 
 * @since 2024/8/26
 */
@ServerEndpoint(value = "/websocket/progress")
@Component
@Slf4j
public class WebSocketServer {

    //静态变量,记录当前在线连接数
    private static int onlineCount = 0;
    //存储客户端对应的WebSocket对象
    private static CopyOnWriteArraySet<WebSocketServer> webSocketSet = new CopyOnWriteArraySet();
    //与某个客户端的连接会话
    private Session session;
    //客户端sid
    private String sid = "";

    //建立连接成功后调用的方法
    @OnOpen
    public void onOpen(Session session) {
        //获取当前会话
        this.session = session;

        //存储WebSocket对象
        String sid = getUuid();
        //客户端标识
        this.sid = sid;
        webSocketSet.add(this);
        //在线连接计数
        addOnlineCount();
        try {
            //调用服务端发送信息方法
            sendMessage("连接成功" + this.sid);
        } catch (Exception e) {
            log.error("IO异常:{} ", e);
        }
    }

    private String getUuid() {
        IdWorker idWorker = new IdWorker();
        long l = idWorker.nextId();
        return String.valueOf(l);
    }

    @OnClose
    public void onClose() {
        //移除WebSocket对象
        webSocketSet.remove(this);
        subOnlineCount();
    }

    @OnMessage
    public void onMessage(String message) {
        log.info("WebSocketServer 收到message{}", this.sid);
    }

    @OnError
    public void onError(Throwable error) {
        log.error("onError,{}", error);
    }

    public void sendMessage(String message) throws IOException {
        this.session.getBasicRemote().sendText(message);
    }

    public static void sendInfo(String message, String sid) {
        if (sid == null) {
            return;
        }
        for (WebSocketServer item : webSocketSet) {
            try {
                if (item.sid.equals(sid)) {
                    //发给单一用户
                    item.sendMessage(message);
                }
            } catch (IOException e) {
                log.error("发送websocket失败{}", e);
            }
        }
    }

    public static synchronized int getOnlineCount() {
        return onlineCount;
    }

    public static synchronized void addOnlineCount() {
        WebSocketServer.onlineCount += 1;
    }

    public static synchronized void subOnlineCount() {
        WebSocketServer.onlineCount -= 1;
    }
}

和前端本地测试的好好的,一点毛病没有,以为万事大吉了。

结果一发布到测试环境,gg了,进度条时好时坏,一开始还没查出来问题,没有往集群的方面想,还以为是哪里不小心自动关闭了websocket,查了n久之后,突然顿悟了,有集群啊,多台服务器,前端链接之后,session是保持了,但是一开始链接的可能是服务器A,后续的请求可能就到服务器B了,那就拿不到客户的session,就无法发送消息了。

找到问题,就得想解决方法了

方法一 固定ip请求,用nginx的ip_hash负载均衡请求,可惜行不通,问了运维那边,ip是动态了,搞不定。

方法二 改网关的负载均衡策略,因为原来的负载均衡策略是均衡分发,改到源地址策略,就会固定ip请求到同一服务器了,和nginx ip_hash类似,但是也行不通,不能改策略。

方法三 最后只剩这个方法,通过rabbitmq广播各个服务器,根据服务器本身存储的session信息(uuid),判断这个消息时候需要处理。

customer自己发送消息,自己监听,发送mq的时候会自动在头尾带上",以及会对字符串中的“引号进行转义,加上\,所以都要进行替换


import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * @author 
 * @since 2024/9/19
 */
@Component
public class WebsocketMqCustomer {

    @RabbitListener(queues = "#{psQueue.name}")
    public void pubsubMqMsg(String message) throws IOException {
        String[] xxxx = message.replace("\\", "").split(";;;");
        WebSocketServer.sendInfo(xxxx[1].substring(0, xxxx[1].length() - 1).replace("\\", ""), xxxx[0].substring(1));
    }

    public static void sendInfo(String message, String sid) {
        if (StringUtils.isNotEmpty(sid)) {
            RabbitTemplate rabbitTemplate = SpringUtil.getBean(RabbitTemplate.class);
            rabbitTemplate.convertAndSend(MainRabbitMqCreateConfig.EXCHANGE_WEBSOCKET, sid, sid + ";;;" + message);
        }
    }
}

还遇见了一个问题,@ServerEndpoint注解添加的类无法通过@Autowired直接注入,因为websocket处理是基于Servlet的,而Servlet容器并不像Spring mvc那样自动管理bean的依赖注入,可以通过Application来获取其他bean。

相关文章:

  • SD2.0 Specification之大容量卡与标准容量卡的命令差异
  • [Redis][典型运用][缓存]详细讲解
  • 5 apache poi实现excel的动态下拉框功能
  • RTA-OS Port Guide学习(三)-基于S32K324 OS
  • 一步一步丰富生成式语言模型系统
  • 计算机毕业设计Python+Tensorflow股票推荐系统 股票预测系统 股票可视化 股票数据分析 量化交易系统 股票爬虫 股票K线图 大数据毕业设计 AI
  • 安全点的应用场景及其原理详解
  • 828华为云征文|WordPress部署
  • jupyter安装与使用——Ubuntu服务器
  • 《工程科学与技术》
  • 上交所服务器崩溃:金融交易背后的技术隐患暴露杭州BGP高防服务器43.228.71.X
  • 设计模式之装饰模式(Decorator)
  • 数据结构-3.5.队列的顺序实现
  • 搭建高效知识库:教培机构数字教学的关键一步
  • 搭建本地AI聊天界面:Open WebUI与Ollama实战指南
  • 如何使用ssm实现北关村基本办公管理系统的设计与实现
  • 828华为云征文|华为云Flexus云服务器X实例Windows系统部署一键短视频生成AI工具moneyprinter
  • Xiaojie雷达之路---doa估计(dbf、capon、music算法)
  • Spring Security 是一个强大的和高度可定制的身份验证和访问控制框架。它是 Spring 项目家族的一员,用于构建安全的 Java 应用程序。
  • 代码随想录_刷题记录_第四次
  • 印方称所有敌对行动均得到反击和回应,不会升级冲突
  • 这座古村,藏着多少赣韵风华
  • 湖北宜昌:在青山绿水间解锁乡村振兴“密码”
  • 工行回应两售出金条发现疑似杂质:情况不属实,疑似杂质应为金条售出后的外部附着物
  • 一热就出汗 VS 热死都不出汗的人,哪个更健康?
  • 《2025城市青年旅行消费报告》发布,解码青年出行特征