当前位置: 首页 > wzjs >正文

合肥建设网站淘宝关键词推广

合肥建设网站,淘宝关键词推广,菠菜导航网站可以做,企业网站模板下载网站模板下载直播弹幕系统(二)- 整合RabbitMQ进行消息广播和异步处理前言一. Socket服务整合RabbitMQ二. 弹幕服务创建2.1 创建一个公共maven项目2.2 弹幕服务项目创建2.2.1 创建队列和广播型交换机2.2.2 生产者发送最终弹幕数据2.2.3 消费者监听原始弹幕数据2.3 Soc…

直播弹幕系统(二)- 整合RabbitMQ进行消息广播和异步处理

  • 前言
  • 一. Socket服务整合RabbitMQ
  • 二. 弹幕服务创建
    • 2.1 创建一个公共maven项目
    • 2.2 弹幕服务项目创建
      • 2.2.1 创建队列和广播型交换机
      • 2.2.2 生产者发送最终弹幕数据
      • 2.2.3 消费者监听原始弹幕数据
    • 2.3 Socket服务监听弹幕数据并返回前端
      • 2.3.1 配置类
      • 2.3.2 消费者
    • 2.4 测试

前言

上一篇文章 SpringCloud网关对WebSocket链接进行负载均衡 中把主要的架子搭建好了,这一篇文章就要开始写业务逻辑了。在分布式系统下,如何达到SpringBoot - WebSocket的使用和聊天室练习的效果。

一. Socket服务整合RabbitMQ

我们页面上,通过WebSocket发送弹幕信息的时候,后端通过@OnMessage注解修饰的函数进行接收。这里我们统一将原始的弹幕消息丢给MQ。让另一个专业的弹幕服务去消费处理。目的也是希望WebSocket服务它只负责消息的传递和WebSocket信息的维护,业务逻辑啥也不做。

1.添加pom依赖:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.配置文件bootstrap.yml,添加RabbitMQ相关配置

server:port: 81spring:application:name: tv-service-socketcloud:nacos:discovery:server-addr: 你的Nacos地址:8848rabbitmq:username: guestpassword: guest# 虚拟主机,默认是/virtual-host: /# 超时时间connection-timeout: 30000listener:simple:# 消费模式,手动acknowledge-mode: manual# 并发数concurrency: 5# 最大并发数max-concurrency: 10# 限流,如果严格控制消费顺序,这里应该填1,数值越大,消费处理速度越快。MQ会把这个数值的消息放到缓存当中。# 因此数值越大,内存占用越大,还需要考虑消费的速度prefetch: 10addresses: 你的RabbitMQ地址:5672

3.RabbitMQ配置类:

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @author Zong0915* @date 2022/12/15 下午1:29*/
@Configuration
public class RabbitMQConfig {@Beanpublic Queue initDirectQueue() {return new Queue("originBullet-queue", true);}@BeanDirectExchange initDirectExchange() {return new DirectExchange("bulletPreProcessor-exchange", true, false);}@BeanBinding initBindingDirect() {return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("bullet.originMessage");}
}

4.写一个简单的消息体OriginMessage,发送到MQ的:

import lombok.Data;/*** @author Zong0915* @date 2022/12/15 下午1:30*/
@Data
public class OriginMessage {private String sessionId;private String userId;private String roomId;private String message;
}

5.MQ生产者OriginMessageSender

/*** @author Zong0915* @date 2022/12/15 下午1:29*/
@Component
public class OriginMessageSender {@Autowiredprivate RabbitTemplate rabbitTemplate;public void send(OriginMessage originMessage) {CorrelationData correlationData = new CorrelationData();correlationData.setId(UUID.randomUUID().toString());// 唯一IDMap<String, Object> map = new HashMap<>();map.put("message", JSONObject.toJSONString(originMessage));// 发送给消息预处理队列rabbitTemplate.convertAndSend("bulletPreProcessor-exchange",// 交换机名称"bullet.originMessage",// 路由Keymap, correlationData);}
}

6.我们再对WebSocket的监听类做一下小改动,将收到的消息,封装一下,然后调用生产者的API即可。只需要注意一下多例下属性的注入方式是怎么写的即可

import kz.cache.SocketCache;
import kz.entity.OriginMessage;
import kz.producer.OriginMessageSender;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.concurrent.atomic.AtomicLong;
import lombok.Getter;
/*** @author Zong0915* @date 2022/12/9 下午3:45*/
@Component
@ServerEndpoint("/websocket/live/{roomId}/{userId}")
@Slf4j
@Getter
public class BulletScreenServer {/*** 多例模式下的赋值方式*/private static OriginMessageSender originMessageSender;/*** 多例模式下的赋值方式*/@Autowiredprivate void setOriginMessageSender(OriginMessageSender originMessageSender) {BulletScreenServer.originMessageSender = originMessageSender;}private static final AtomicLong count = new AtomicLong(0);private Session session;private String sessionId;private String userId;private String roomId;/*** 打开连接* @param session* @OnOpen 连接成功后会自动调用该方法*/@OnOpenpublic void openConnection(Session session, @PathParam("roomId") String roomId, @PathParam("userId") String userId) {// 如果是游客观看视频,虽然有弹幕,但是没有用户信息,所以需要用trycount.incrementAndGet();log.info("*************WebSocket连接次数: {} *************", count.longValue());this.userId = userId;this.roomId = roomId;// 保存session相关信息到本地this.sessionId = session.getId();this.session = session;SocketCache.put(sessionId, this);}/*** 客户端刷新页面,或者关闭页面,服务端断开连接等等操作,都需要关闭连接*/@OnClosepublic void closeConnection() {SocketCache.remove(sessionId);}/*** 客户端发送消息给服务端* @param message*/@OnMessagepublic void onMessage(String message) {if (StringUtils.isBlank(message)) {return;}// 将消息丢给MQ,业务上的处理什么也不管,交给弹幕业务来处理,并且达到削峰的目的originMessageSender.send(buildMessage(message));}private OriginMessage buildMessage(String message) {OriginMessage originMessage = new OriginMessage();originMessage.setMessage(message);originMessage.setRoomId(roomId);originMessage.setSessionId(sessionId);originMessage.setUserId(userId);return originMessage;}
}

备注:记得将另一个Socket项目也改造成同样的代码。

二. 弹幕服务创建

2.1 创建一个公共maven项目

我们创建一个maven项目:service-bulletcommon。先看下最终的项目架构:
在这里插入图片描述

1.pom依赖添加一些常用的工具:

<groupId>bullet-service</groupId>
<artifactId>service-bulletcommon</artifactId>
<version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId><version>3.12.0</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.79</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.24</version></dependency><dependency><groupId>commons-collections</groupId><artifactId>commons-collections</artifactId><version>3.2.2</version></dependency>
</dependencies>

2.创建一个常量定义类SocketConstants

/*** @author Zong0915* @date 2022/12/15 下午3:59*/
public class SocketConstants {/*** 这条消息是否处理过*/public static final String CORRELATION_SET_PRE = "Correlation_Set_";/*** 同一个房间里面有哪些SessionID*/public static final String ROOM_LIVE_USER_SET_PRE = "ROOM_LIVE_USER_Set_";public static final String MESSAGE = "message";public static final String ID = "id";/*** 原始消息所在队列*/public static final String ORIGIN_BULLET_QUEUE = "originBullet-queue";/*** 广播队列A*/public static final String BULLET_SOCKET_QUEUE_A = "bulletSocket-queueA";/*** 广播队列B*/public static final String BULLET_SOCKET_QUEUE_B = "bulletSocket-queueB";/*** 弹幕预处理交换机*/public static final String BULLET_PRE_PROCESSOR_EXCHANGE = "bulletPreProcessor-exchange";/*** 弹幕广播交换机*/public static final String BULLET_FANOUT_EXCHANGE = "bulletFanOut-exchange";/*** 弹幕预处理路由Key*/public static final String BULLET_ORIGIN_MESSAGE_ROUTE_KEY = "bullet.originMessage";
}

3.创建一个消息传输体OriginMessage

import lombok.Data;/*** @author Zong0915* @date 2022/12/15 下午2:07*/
@Data
public class OriginMessage {private String sessionId;private String userId;private String roomId;private String message;
}

2.2 弹幕服务项目创建

1.我们创建一个maven项目:service-bulletscreen。先看下最终的项目架构:
在这里插入图片描述

1.pom文件:

<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.2.RELEASE</version><relativePath/> <!-- lookup parent from repository -->
</parent>
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId><version>2.2.1.RELEASE</version><exclusions><exclusion><artifactId>archaius-core</artifactId><groupId>com.netflix.archaius</groupId></exclusion><exclusion><artifactId>commons-io</artifactId><groupId>commons-io</groupId></exclusion><exclusion><artifactId>commons-lang3</artifactId><groupId>org.apache.commons</groupId></exclusion><exclusion><artifactId>fastjson</artifactId><groupId>com.alibaba</groupId></exclusion><exclusion><artifactId>guava</artifactId><groupId>com.google.guava</groupId></exclusion><exclusion><artifactId>httpclient</artifactId><groupId>org.apache.httpcomponents</groupId></exclusion><exclusion><artifactId>servo-core</artifactId><groupId>com.netflix.servo</groupId></exclusion></exclusions></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId><version>2.6.7</version><exclusions><exclusion><artifactId>log4j-api</artifactId><groupId>org.apache.logging.log4j</groupId></exclusion></exclusions></dependency><dependency><groupId>bullet-service</groupId><artifactId>service-bulletcommon</artifactId><version>1.0-SNAPSHOT</version></dependency>
</dependencies>

2.application.properties

spring.application.name=tv-service-bulletscreen
spring.cloud.nacos.discovery.server-addr=你的Nacos地址:8848

3.bootstrap.yml文件:

server:port: 83spring:application:name: tv-service-bulletscreenredis:database: 0 # Redis数据库索引(默认为0)host: 你的Redis地址 # Redis的服务地址port: 6379 # Redis的服务端口password: 密码jedis:pool:max-active: 8 # 连接池最大连接数(使用负值表示没有限制)max-wait: -1 # 连接池最大阻塞等待时间(使用负值表示没有限制)max-idle: 8 # 连接池中的最大空闲连接min-idle: 0 # 连接池中的最小空闲链接timeout: 30000 # 连接池的超时时间(毫秒)cloud:nacos:discovery:server-addr: 你的Nacos地址:8848rabbitmq:username: guestpassword: guest# 虚拟主机,默认是/virtual-host: /# 超时时间connection-timeout: 30000listener:simple:# 消费模式,手动acknowledge-mode: manual# 并发数concurrency: 5# 最大并发数max-concurrency: 10# 限流,如果严格控制消费顺序,这里应该填1,数值越大,消费处理速度越快。MQ会把这个数值的消息放到缓存当中。# 因此数值越大,内存占用越大,还需要考虑消费的速度prefetch: 10addresses: 你的RabbitMQ地址:5672

4.Redis配置类RedisConfig

import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;@Configuration
public class RedisConfig {/*** 实例化 RedisTemplate 对象** @return*/@Beanpublic RedisTemplate<String, Object> functionDomainRedisTemplate(RedisConnectionFactory redisConnectionFactory) {RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();initDomainRedisTemplate(redisTemplate, redisConnectionFactory);return redisTemplate;}/*** 设置数据存入 redis 的序列化方式,并开启事务** @param redisTemplate* @param factory*/private void initDomainRedisTemplate(RedisTemplate<String, Object> redisTemplate, RedisConnectionFactory factory) {//如果不配置Serializer,那么存储的时候缺省使用String,如果用User类型存储,那么会提示错误User can't cast to String!redisTemplate.setKeySerializer(new StringRedisSerializer());redisTemplate.setHashKeySerializer(new StringRedisSerializer());redisTemplate.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());// 开启事务redisTemplate.setEnableTransactionSupport(true);redisTemplate.setConnectionFactory(factory);}@Bean@ConditionalOnMissingBean(StringRedisTemplate.class)public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory redisConnectionFactory) {StringRedisTemplate template = new StringRedisTemplate();template.setConnectionFactory(redisConnectionFactory);return template;}
}

2.2.1 创建队列和广播型交换机

创建一个广播模式的交换机bulletFanOut-exchange:其实用direct也可以,因为我只要监听的队列用同一个即可,这里只是进行一个模拟。
在这里插入图片描述

分别为两个Socket服务创建个队列,用来接收处理好的消息(练习下广播模式):

  • bulletSocket-queueA
  • bulletSocket-queueB

再分别为他们和上述创建好的交换机进行绑定。
在这里插入图片描述

我们的弹幕服务主要做两件事:

  • 监听预处理队列,数据来自:originBullet-queue
  • 将处理完的消息通过广播,发送给bulletSocket-queueA/B两个队列。

RabbitMQ配置类如下:

import kz.common.SocketConstants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @author Zong0915* @date 2022/12/15 下午1:29*/
@Configuration
public class RabbitMQConfig {@Beanpublic Queue initDirectQueue() {return new Queue(SocketConstants.ORIGIN_BULLET_QUEUE, true);}@Beanpublic Queue initFanoutSocketQueueA() {return new Queue(SocketConstants.BULLET_SOCKET_QUEUE_A, true);}@Beanpublic Queue initFanoutSocketQueueB() {return new Queue(SocketConstants.BULLET_SOCKET_QUEUE_B, true);}@BeanDirectExchange initDirectExchange() {return new DirectExchange(SocketConstants.BULLET_PRE_PROCESSOR_EXCHANGE, true, false);}@Bean("fanoutExchange")FanoutExchange initFanoutExchange() {return new FanoutExchange(SocketConstants.BULLET_FANOUT_EXCHANGE, true, false);}@BeanBinding initBindingDirect() {return BindingBuilder.bind(initDirectQueue()).to(initDirectExchange()).with(SocketConstants.BULLET_ORIGIN_MESSAGE_ROUTE_KEY);}@BeanBinding initBindingFanoutA(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange) {return BindingBuilder.bind(initFanoutSocketQueueA()).to(fanoutExchange);}@BeanBinding initBindingFanoutB(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange) {return BindingBuilder.bind(initFanoutSocketQueueB()).to(fanoutExchange);}
}

2.2.2 生产者发送最终弹幕数据

创建FanoutMessageProducer类:记得向我们上面绑定的广播交换机发送数据。

import com.alibaba.fastjson.JSONObject;
import kz.entity.OriginMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.util.HashMap;
import java.util.Map;
import java.util.UUID;/*** @author Zong0915* @date 2022/12/15 下午2:51*/
@Component
public class FanoutMessageProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;public void send(OriginMessage originMessage) {CorrelationData correlationData = new CorrelationData();correlationData.setId(UUID.randomUUID().toString());// 唯一IDMap<String, Object> map = new HashMap<>();map.put("message", JSONObject.toJSONString(originMessage));rabbitTemplate.convertAndSend("bulletFanOut-exchange",// 交换机名称"",// 路由Keymap, correlationData);}
}

2.2.3 消费者监听原始弹幕数据

创建OriginMessageConsumer类:

import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import kz.service.BulletScreenService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.util.Map;/**- @author Zong0915- @date 2022/12/15 下午1:57*/
@Component
@Slf4j
public class OriginMessageConsumer {@Autowiredprivate BulletScreenService bulletScreenService;/*** 处理原始消息** @param testMessage Map类型的消息体* @param headers     消息头* @param channel     消息所在的管道*/@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "originBullet-queue", durable = "true"),// 默认的交换机类型就是directexchange = @Exchange(name = "bulletPreProcessor-exchange", type = "direct"),key = "bullet.originMessage"))@RabbitHandlerpublic void onOriginMessage(@Payload Map testMessage, @Headers Map<String, Object> headers,Channel channel) throws IOException {log.info("***********消费开始*************");log.info("消费体:{}", JSONObject.toJSONString(testMessage));bulletScreenService.processMessage(testMessage, headers, channel);}
}

2.创建BulletScreenService类用于原始弹幕的业务处理,主要考虑的几个点:

  • 消息的合法性校验。
  • 消息的幂等性保证,这里用了Redis做个存储。
  • 将原始数据处理完后,在丢给MQ进行广播。
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import kz.common.SocketConstants;
import kz.entity.OriginMessage;
import kz.producer.FanoutMessageProducer;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;/*** @author Zong0915* @date 2022/12/9 下午3:45*/
@Service
@Slf4j
public class BulletScreenService {@Autowiredprivate StringRedisTemplate redisTemplate;@Autowiredprivate FanoutMessageProducer fanoutMessageProducer;@Asyncpublic void processMessage(Map testMessage, Map<String, Object> headers,Channel channel) throws IOException {OriginMessage originMessage = getOriginMessage(testMessage);// 合法性校验if (!validMessage(testMessage, headers, originMessage)) {return;}// 处理消息log.info("***********业务处理,弹幕: {}***********", originMessage.getMessage());String correlationId = headers.get(SocketConstants.ID).toString();// 存入Redis并设置过期时间1天redisTemplate.opsForSet().add(SocketConstants.CORRELATION_SET_PRE + originMessage.getRoomId(), correlationId);redisTemplate.expire(SocketConstants.CORRELATION_SET_PRE + originMessage.getRoomId(), 1, TimeUnit.DAYS);// 将处理好的消息发送给MQ,通过广播队列,将消息发送给所有的Socket服务,一般这里还会对originMessage进行一些二次封装// 本案例就不做处理了,原样返回fanoutMessageProducer.send(originMessage);// 确认消息Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);channel.basicAck(deliveryTag, false);}public OriginMessage getOriginMessage(Map testMessage) {String messageJson = (String) testMessage.get(SocketConstants.MESSAGE);if (StringUtils.isBlank(messageJson)) {return null;}OriginMessage originMessage = JSONObject.parseObject(messageJson, OriginMessage.class);return originMessage;}/*** 对消息进行合法性校验*/public boolean validMessage(Map testMessage, Map<String, Object> headers, OriginMessage originMessage) {// 判空if (testMessage == null || testMessage.size() == 0 || originMessage == null) {return false;}if (headers == null || headers.size() == 0) {return false;}// 幂等性校验,如果消息已经被消费过了,那么这个弹幕消息就不应该被二次消费,这个消息就直接把他处理掉UUID correlationId = (UUID) headers.get(SocketConstants.ID);Boolean exist = redisTemplate.opsForSet().isMember(SocketConstants.CORRELATION_SET_PRE + originMessage.getRoomId(), correlationId.toString());return !Optional.ofNullable(exist).orElse(false);}
}

最后就是启动类BulletScreenApplication

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.scheduling.annotation.EnableAsync;/*** @author Zong0915* @date 2022/12/10 下午9:44*/
@SpringBootApplication
@EnableDiscoveryClient
@EnableAsync
public class BulletScreenApplication {public static void main(String[] args) {SpringApplication.run(BulletScreenApplication.class, args);}
}

2.3 Socket服务监听弹幕数据并返回前端

记得在pom依赖中引入上面的公共包:

<dependency><groupId>bullet-service</groupId><artifactId>service-bulletcommon</artifactId><version>1.0-SNAPSHOT</version>
</dependency>

2.3.1 配置类

RabbitMQ配置类增加下队列和交换机的配置信息:绑定bulletSocket-queueA

@Bean
public Queue initFanoutSocketQueueA() {return new Queue(SocketConstants.BULLET_SOCKET_QUEUE_A, true);
}@Bean("fanoutExchange")
FanoutExchange initFanoutExchange() {return new FanoutExchange(SocketConstants.BULLET_FANOUT_EXCHANGE, true, false);
}@Bean
Binding initBindingFanoutA(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange) {return BindingBuilder.bind(initFanoutSocketQueueA()).to(fanoutExchange);
}

另一个Socket项目,添加以下配置:绑定bulletSocket-queueB

@Bean
public Queue initFanoutSocketQueueB() {return new Queue(SocketConstants.BULLET_SOCKET_QUEUE_B, true);
}@Bean("fanoutExchange")
FanoutExchange initFanoutExchange() {return new FanoutExchange(SocketConstants.BULLET_FANOUT_EXCHANGE, true, false);
}@Bean
Binding initBindingFanoutA(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange) {return BindingBuilder.bind(initFanoutSocketQueueB()).to(fanoutExchange);
}

再写一个缓存工具类,通过直播间号获得同一个直播间下的所有WebSocket信息:

public class SocketCache {public static List<BulletScreenServer> getSocketGroupByRoomId(String roomId) {ArrayList<BulletScreenServer> res = new ArrayList<>();if (StringUtils.isBlank(roomId)) {return res;}for (Map.Entry<Integer, ConcurrentHashMap<String, BulletScreenServer>> hashMapEntry : CACHE_SEGMENT.entrySet()) {ConcurrentHashMap<String, BulletScreenServer> map = hashMapEntry.getValue();if (map == null || map.size() == 0) {continue;}for (BulletScreenServer server : map.values()) {if (server.getSession().isOpen() && StringUtils.equals(roomId, server.getRoomId())) {res.add(server);}}}return res;}
}

2.3.2 消费者

重点就是消费者的业务代码了,对最终的弹幕数据进行广播,创建FanOutMessageConsumer类:

import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import kz.cache.SocketCache;
import kz.common.SocketConstants;
import kz.entity.OriginMessage;
import kz.service.BulletScreenServer;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.util.List;
import java.util.Map;/*** @author Zong0915* @date 2022/12/15 下午1:57*/
@Component
@Slf4j
public class FanOutMessageConsumer {/*** 处理弹幕消息,开始广播** @param testMessage Map类型的消息体* @param headers     消息头* @param channel     消息所在的管道*/@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "bulletSocket-queueA", durable = "true"),// 默认的交换机类型就是directexchange = @Exchange(name = "bulletFanOut-exchange", type = "fanout")))@RabbitHandlerpublic void onOriginMessage(@Payload Map testMessage, @Headers Map<String, Object> headers, Channel channel) throws IOException {log.info("***********消费开始, Socket服务A接收到广播消息*************");log.info("消费体:{}", JSONObject.toJSONString(testMessage));OriginMessage originMessage = getOriginMessage(testMessage);if (originMessage == null) {return;}// 根据roomID去找到同一个直播间下的所有用户并广播消息List<BulletScreenServer> socketGroupByRoomId = SocketCache.getSocketGroupByRoomId(originMessage.getRoomId());for (BulletScreenServer bulletScreenServer : socketGroupByRoomId) {bulletScreenServer.getSession().getBasicRemote().sendText(JSONObject.toJSONString(originMessage));}// 确认消息Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);channel.basicAck(deliveryTag, false);}public OriginMessage getOriginMessage(Map testMessage) {String messageJson = (String) testMessage.get(SocketConstants.MESSAGE);if (StringUtils.isBlank(messageJson)) {return null;}OriginMessage originMessage = JSONObject.parseObject(messageJson, OriginMessage.class);return originMessage;}
}

另一个Socket服务则改一下消费者的监听队列和日志内容即可:

在这里插入图片描述

2.4 测试

打开同一个直播间的两个用户,让两个WebSocket正好建立到不同的服务器上:
在这里插入图片描述
此时Socket服务A:
在这里插入图片描述

Socket服务B:
在这里插入图片描述
页面A中随便发送一条弹幕:
在这里插入图片描述
页面B中随便发送一条弹幕:
在这里插入图片描述

1.前端发送一条弹幕,后端监听到,开始向预处理队列丢消息。
在这里插入图片描述
2.service-bulletscreen服务,监听到预处理队列数据,开始进行处理。

在这里插入图片描述
3.经过一系列校验和幂等性处理之后,将处理完的弹幕通过交换机发送给广播队列:
在这里插入图片描述
4.Socket服务B接收到消息:
在这里插入图片描述

Socket服务A接收到广播消息:
在这里插入图片描述

5.前端页面展示:

页面A:
在这里插入图片描述

页面B:
在这里插入图片描述

到这里,一个聊天服务就完成了。不过大家也看到在线人数这块咱没做。可以用Redis缓存来记录每个直播间的人数。这个功能放到下一篇文章来讲。

http://www.dtcms.com/wzjs/52983.html

相关文章:

  • 百度链接地址抖音seo排名系统
  • 在线培训网站怎么做广东广州网点快速网站建设
  • 直接IP做访问我服务器的网站张雷明任河南省委常委
  • 商城建站系统多少钱外贸谷歌优化
  • 成都公司网站设计套餐谷歌浏览器官网下载安装
  • wordpress微信分享代码seo指什么
  • 会议网站建设百度影响力排名顺序
  • 重庆网络公司网站建设黄页网络的推广软件
  • 文昌网站 做炸饺子免费引流人脉推广软件
  • 深圳市宝安区做网站建设的企业网站流量数据分析
  • 受和攻不停的做漫画网站百度智能云官网
  • 网店怎么开要多少钱榆林百度seo
  • wordpress 锚点 插件seo优化网站的注意事项
  • 专业网页设计培训企业网站seo公司
  • 有哪些ui的设计网站百度广告联盟app下载官网
  • 源码网站模板外贸营销系统
  • 如何设计网站的主菜单西安seo学院
  • 东港区网站制作四川企业seo推广
  • 电子商务网站运营流程衡阳seo服务
  • 网页显示网站正在建设中怎么做成功的网络营销案例及分析
  • 制作动画网站模板精准营销推广
  • 做的比较好的p2p网站搜索引擎竞价排名
  • icp网站备案密码找回谷歌seo视频教程
  • 自己做网站美工独立网站
  • 网站建设可以帮助企业安徽百度关键词优化
  • 青岛市网站建设公司托管竞价账户哪家好
  • 企业网站手机版模板免费下载有哪些平台可以免费发广告
  • 利用小偷程序做网站seo查询工具
  • 延安网站制作惠州seo网络推广
  • 网站空间10gseo是什么专业的课程