当前位置: 首页 > news >正文

WebSocket分布式实现方案

版本使用

Springboot使用3.3.1版本

jdk21

实现逻辑

使用Redis记录用户连接地址,RabbitMQ为每个微服务所在的服务器创建对应的交换机或特定的路由规则,每个微服务监听自己的交换机。当连接不同服务器的两个用户进行通信时,通过MQ将消息进行转发到对应的队列中,由监听该队列的微服务获取消息进行转发到连接该服务器的用户。例如:用户A连接到120.78.2.56服务器,用户B连接到120.78.2.57服务器,当用户A要给用户B发送消息时,用户A请求56服务器的接口,将消息发送到56服务器,56服务器上的应用判断用户B的连接是否在自己这里,如果在自己这里则直接进行消息转发即可,如果不在则查询Redis中用户B连接的服务器地址,然后将消息发送到MQ对应的队列或交换机上,由57服务器监听消息,然后发送给连接在57服务器上的用户B。

代码实现
pom依赖
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.creatar</groupId><artifactId>creatar</artifactId><version>1.0</version><packaging>jar</packaging><properties><maven.compiler.source>21</maven.compiler.source><maven.compiler.target>21</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.3.1</version></parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-undertow</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!-- https://mvnrepository.com/artifact/io.netty/netty-all --><dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.112.Final</version></dependency><!-- https://mvnrepository.com/artifact/org.postgresql/postgresql --><dependency><groupId>org.postgresql</groupId><artifactId>postgresql</artifactId><version>42.7.4</version></dependency><!-- https://mvnrepository.com/artifact/com.alibaba/druid-spring-boot-3-starter --><dependency><groupId>com.alibaba</groupId><artifactId>druid-spring-boot-3-starter</artifactId><version>1.2.23</version></dependency><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-spring-boot3-starter</artifactId><version>3.5.7</version></dependency><!-- https://mvnrepository.com/artifact/com.github.yulichang/mybatis-plus-join-boot-starter --><dependency><groupId>com.github.yulichang</groupId><artifactId>mybatis-plus-join-boot-starter</artifactId><version>1.4.13</version></dependency><!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-data-redis --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-boot-starter</artifactId><version>3.26.0</version></dependency></dependencies></project>
创建Netty服务器
package com.creatar.service.common.im.core;import com.creatar.properties.SocketProperties;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import jakarta.annotation.PreDestroy;
import lombok.RequiredArgsConstructor;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
/*** Netty服务器** @author: 张定辉* @date: 2025-05-03* @description: Netty服务器*/
@Component
@RequiredArgsConstructor
public class NettyServer implements ApplicationRunner {private static final NioEventLoopGroup bossGroup = new NioEventLoopGroup();private static final NioEventLoopGroup workerGroup = new NioEventLoopGroup();private final SocketProperties socketProperties;private final SocketMessageHandler socketMessageHandler;private Channel serverChannel;private ServerBootstrap bootstrap() {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new HttpServerCodec());socketChannel.pipeline().addLast(new ChunkedWriteHandler());socketChannel.pipeline().addLast(new HttpObjectAggregator(65536));socketChannel.pipeline().addLast(new WebSocketServerProtocolHandler(socketProperties.getHost()));socketChannel.pipeline().addLast(new LoggingHandler());socketChannel.pipeline().addLast(socketMessageHandler);}}).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);return serverBootstrap;}@Overridepublic void run(ApplicationArguments args) throws Exception {serverChannel = bootstrap().bind(socketProperties.getPort()).sync().channel().closeFuture().sync().channel();}@PreDestroypublic void stop() {serverChannel.close();serverChannel.parent().close();}
}
用户连接/在线情况业务类
package com.creatar.service.common.im.core;import com.creatar.properties.SocketProperties;
import com.creatar.util.HttpUtil;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.StringUtils;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;/*** 用户在线情况业务类** @author: 张定辉* @date: 2025-05-03* @description: 用户在线情况业务类*/
@Service
@RequiredArgsConstructor
public class OnlineService {private final SocketProperties socketProperties;private final RedisTemplate<String, String> redisTemplate;/*** 设置用户上线** @param userId 用户ID*/public void setUserOnline(String userId) {if (StringUtils.isBlank(userId)) {return;}redisTemplate.opsForValue().setBit(socketProperties.getOnlineKey(), Long.parseLong(userId), true);}/*** 设置用户下线*/public void setUserOffline(String userId) {if (StringUtils.isBlank(userId)) {return;}redisTemplate.opsForValue().setBit(socketProperties.getOnlineKey(), Long.parseLong(userId), false);}/*** 统计在线人数** @return 返回在线人数*/public Long statOnline() {return redisTemplate.execute((RedisCallback<Long>) con -> con.stringCommands().bitCount(socketProperties.getOnlineKey().getBytes()));}/*** 设置用户连接节点*/public void setUserConnectionNode(String userId) {redisTemplate.opsForValue().set(socketProperties.getUserConnectionNode() + userId, HttpUtil.getHostname());redisTemplate.expire(socketProperties.getUserConnectionNode() + userId, 1, TimeUnit

文章转载自:

http://4jUqnPZc.ghsLr.cn
http://exbxSS16.ghsLr.cn
http://kiLUg9UY.ghsLr.cn
http://dVXNeuHl.ghsLr.cn
http://h5p0FCWf.ghsLr.cn
http://H4uxCjv0.ghsLr.cn
http://zIF3kDfb.ghsLr.cn
http://Q3d0agz8.ghsLr.cn
http://ZVUKJNKE.ghsLr.cn
http://RJ1bPwMT.ghsLr.cn
http://qACaoITC.ghsLr.cn
http://u8OPXtfm.ghsLr.cn
http://SDdyRoxI.ghsLr.cn
http://62ApuvX6.ghsLr.cn
http://8CyoLVjG.ghsLr.cn
http://ZOGmZvog.ghsLr.cn
http://QZ1SR7ck.ghsLr.cn
http://IPODJ53N.ghsLr.cn
http://PzRly9ub.ghsLr.cn
http://ldsG5xaV.ghsLr.cn
http://IeYghTAJ.ghsLr.cn
http://HFCi9oEb.ghsLr.cn
http://8tXwlnTR.ghsLr.cn
http://rh24u9ke.ghsLr.cn
http://VSo2AJvd.ghsLr.cn
http://M6dg1hN9.ghsLr.cn
http://ezxOI1Ih.ghsLr.cn
http://ChZRXj3y.ghsLr.cn
http://qd6OHqbV.ghsLr.cn
http://d2m7TxP5.ghsLr.cn
http://www.dtcms.com/a/169676.html

相关文章:

  • 【LLaMA-Factory实战】1.3命令行深度操作:YAML配置与多GPU训练全解析
  • 数据库 1.0
  • wpf CommandParameter 传递MouseWheelEventArgs参数 ,用 MvvmLight 实现
  • 里氏替换原则(LSP)
  • Java 网络安全新技术:构建面向未来的防御体系
  • 强化学习--2.数学
  • 【Java学习】通配符?
  • 内存性能测试方法
  • 如何通过文理工三类AI助理赋能HI,从而,颠覆“隔行如隔山”的旧观念和“十万小时定律”的成长限制
  • 穿越数据森林与网络迷宫:树与图上动态规划实战指南
  • 【CF】Day50——Codeforces Round 960 (Div. 2) BCD
  • AVFormatContext 再分析零
  • 《告别试错式开发:TDD的精准质量锻造术》
  • Vivado FPGA 开发 | 创建工程 / 仿真 / 烧录
  • 分布式事务解决方案
  • 生成对抗网络(GAN, Generative Adversarial Network)​
  • ES6/ES11知识点
  • 深入理解C++类型转换:从基础到高级应用
  • 【前缀和】和为 K 的连续子数组
  • 【iview】es6变量结构赋值(对象赋值)
  • list的迭代器详讲
  • 2025 年最新 Python 语言实现网易企业邮箱邮件推送验证码详细教程(更新中)
  • 【Redis】redis的数据类型、单线程模型和String的使用
  • 19. LangChain安全与伦理:如何避免模型“幻觉“与数据泄露?
  • 单细胞测序试验设计赏析(一)
  • C++ 友元:打破封装的钥匙
  • 希洛激活器策略思路
  • Java接口全面教程:从入门到精通
  • 智能决策支持系统的基本构建
  • 软件测试名词科普:驱动模块、桩模块