当前位置: 首页 > wzjs >正文

石河子做网站的公司google 推广优化

石河子做网站的公司,google 推广优化,九龙坡网站建设公司,武汉商城网站建设死信交换机(Dead Letter Exchange,DLX) 定义:死信交换机是一种特殊的交换机,专门用于**接收从其他队列中因特定原因变成死信的消息**。它的本质还是交换机,遵循RabbitMQ中交换机的基本工作原理&#xff0c…

死信交换机(Dead Letter Exchange,DLX)

  • 定义:死信交换机是一种特殊的交换机,专门用于**接收从其他队列中因特定原因变成死信的消息**。它的本质还是交换机,遵循RabbitMQ中交换机的基本工作原理,如根据路由规则将消息发送到绑定的队列。
  • 作用:为死信提供一个集中处理的入口点。通过将死信发送到死信交换机,再由其路由到相应的死信队列,可以方便地对这些异常消息进行统一管理和处理,确保数据不丢失。

死信队列(Dead Letter Queue,DLQ)

  • 定义:死信队列用于存储那些无法在正常流程中被消费的消息,即死信。这些消息进入死信队列后,可以后续进行分析、重试或其他特殊处理。
  • 产生死信的原因
    • 消息被拒绝且不重新入队:消费者调用basic.rejectbasic.nack方法拒绝消息,并将requeue参数设置为false,表明该消息不再重新放回原队列等待消费,从而成为死信。
    • 消息过期:可以为消息或队列设置生存时间(TTL,Time-To-Live)。当消息在队列中的存活时间超过设定的TTL值时,消息就会过期成为死信。消息的TTL既可以在发送消息时针对单条消息设置,也可以在声明队列时对队列中的所有消息统一设置。
    • 队列达到最大长度:当为队列设置了最大长度(Max-Length),并且队列中的消息数量达到这个上限时,新进入的消息会被丢弃成为死信。

代码举例

下面将用代码举例,由于消息过期而进入死信队列

初始化RabbitMQ的连接配置、队列和交换机的声明

/*** RabbitMQ配置类* 负责管理RabbitMQ的连接配置、队列和交换机的声明*/
@Slf4j
public class RabbitMQConfig {// 普通队列和死信队列的配置常量public static final String NORMAL_QUEUE = "normal.queue";      // 普通队列名称public static final String DLX_QUEUE = "dlx.queue";           // 死信队列名称public static final String NORMAL_EXCHANGE = "normal.exchange"; // 普通交换机名称public static final String DLX_EXCHANGE = "dlx.exchange";     // 死信交换机名称public static final String NORMAL_ROUTING_KEY = "normal.routing.key"; // 普通路由键public static final String DLX_ROUTING_KEY = "dlx.routing.key";      // 死信路由键/*** 创建RabbitMQ连接** @return Connection RabbitMQ连接对象* @throws Exception*/public static Connection createConnection() throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("xxxx");    // 设置RabbitMQ服务器地址factory.setPort(5672);           // 设置RabbitMQ服务器端口factory.setUsername("xxxx");    // 设置用户名factory.setPassword("xxxx");    // 设置密码return factory.newConnection();  // 创建并返回新的连接}/*** 初始化RabbitMQ的队列和交换机* 包括:* 1. 删除已存在的队列和交换机* 2. 声明死信交换机和队列* 3. 声明普通交换机和队列* 4. 设置队列的死信参数* 5. 绑定队列和交换机** @throws Exception*/public static void init() throws Exception {try (Connection connection = createConnection();Channel channel = connection.createChannel()) {// 删除已存在的队列和交换机try {channel.queueDelete(NORMAL_QUEUE);channel.queueDelete(DLX_QUEUE);channel.exchangeDelete(NORMAL_EXCHANGE);channel.exchangeDelete(DLX_EXCHANGE);} catch (Exception e) {// 忽略删除不存在的队列或交换机时的错误log.warn("删除队列或交换机时出错(可能是首次创建): {}", e.getMessage());}// 声明死信交换机,类型为direct,持久化channel.exchangeDeclare(DLX_EXCHANGE, "direct", true);// 声明死信队列,持久化channel.queueDeclare(DLX_QUEUE, true, false, false, null);// 将死信队列绑定到死信交换机,使用死信路由键channel.queueBind(DLX_QUEUE, DLX_EXCHANGE, DLX_ROUTING_KEY);// 声明普通交换机,类型为direct,持久化channel.exchangeDeclare(NORMAL_EXCHANGE, "direct", true);// 设置普通队列的死信参数Map<String, Object> args = new HashMap<>();args.put("x-dead-letter-exchange", DLX_EXCHANGE);     // 设置死信交换机args.put("x-dead-letter-routing-key", DLX_ROUTING_KEY); // 设置死信路由键// 声明普通队列,并应用死信参数channel.queueDeclare(NORMAL_QUEUE, true, false, false, args);// 将普通队列绑定到普通交换机,使用普通路由键channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, NORMAL_ROUTING_KEY);}}
} 

消息生产者

/*** 消息生产者类* 负责向RabbitMQ发送消息*/
@Slf4j
public class MessageProducer {/*** 发送消息到普通队列* 该方法会:* 1. 创建RabbitMQ连接和通道* 2. 将消息发布到普通交换机* 3. 使用try-with-resources自动关闭连接和通道* * @param message 要发送的消息内容* @throws Exception */public void sendMessage(String message) throws Exception {// 使用try-with-resources自动管理连接和通道的关闭try (Connection connection = RabbitMQConfig.createConnection();Channel channel = connection.createChannel()) {// 打印发送的消息内容log.info("发送消息: {}", message);// 发布消息到普通交换机// 参数说明:// 1. 交换机名称// 2. 路由键// 3. 消息属性(这里为null表示使用默认属性)// 4. 消息内容(转换为字节数组)channel.basicPublish(RabbitMQConfig.NORMAL_EXCHANGE,RabbitMQConfig.NORMAL_ROUTING_KEY,null,message.getBytes());}}/*** 发送带TTL的消息到普通队列* 该方法会:* 1. 创建RabbitMQ连接和通道* 2. 设置消息的TTL属性* 3. 将消息发布到普通交换机* 4. 使用try-with-resources自动关闭连接和通道* * @param message 要发送的消息内容* @param ttl 消息的过期时间(毫秒)* @throws Exception 如果发送过程中出现错误则抛出异常*/public void sendMessageWithTTL(String message, int ttl) throws Exception {// 使用try-with-resources自动管理连接和通道的关闭try (Connection connection = RabbitMQConfig.createConnection();Channel channel = connection.createChannel()) {// 设置消息属性,包括TTLAMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().expiration(String.valueOf(ttl)).build();// 打印发送的消息内容log.info("发送消息: {}, TTL: {}ms", message, ttl);// 发布消息到普通交换机// 参数说明:// 1. 交换机名称// 2. 路由键// 3. 消息属性(包含TTL)// 4. 消息内容(转换为字节数组)channel.basicPublish(RabbitMQConfig.NORMAL_EXCHANGE,RabbitMQConfig.NORMAL_ROUTING_KEY,properties,message.getBytes());}}
} 

消息消费者

/*** 消息消费者类* 负责从普通队列和死信队列中消费消息*/
@Slf4j
public class MessageConsumer {/*** 消费普通队列中的消息* 该方法会:* 1. 创建RabbitMQ连接和通道* 2. 设置预取计数为1,确保公平分发* 3. 创建消费者回调处理消息* 4. 确认消息处理完成** @throws Exception 异常*/public void consumeNormalQueue() throws Exception {// 创建RabbitMQ连接和通道Connection connection = RabbitMQConfig.createConnection();Channel channel = connection.createChannel();// 设置预取计数为1,确保公平分发,避免某个消费者处理过多消息channel.basicQos(1);// 创建普通队列消费者回调DeliverCallback deliverCallback = (consumerTag, delivery) -> {// 获取消息内容String message = new String(delivery.getBody(), StandardCharsets.UTF_8);log.info("收到普通队列消息: {}", message);// 模拟消息处理耗时try {Thread.sleep(20000);} catch (InterruptedException e) {throw new RuntimeException(e);}// 确认消息处理完成channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);};// 开始消费普通队列// 参数说明:// 1. 队列名称// 2. 是否自动确认消息(false表示手动确认)// 3. 消息处理回调// 4. 消费者取消回调(这里为空实现)channel.basicConsume(RabbitMQConfig.NORMAL_QUEUE, false, deliverCallback, consumerTag -> {});}/*** 消费死信队列中的消息* 该方法会:* 1. 创建RabbitMQ连接和通道* 2. 设置预取计数为1,确保公平分发* 3. 创建消费者回调处理消息* 4. 确认消息处理完成** @throws Exception 异常*/public void consumeDlxQueue() throws Exception {// 创建RabbitMQ连接和通道Connection connection = RabbitMQConfig.createConnection();Channel channel = connection.createChannel();// 设置预取计数为1,确保公平分发channel.basicQos(1);// 创建死信队列消费者回调DeliverCallback deliverCallback = (consumerTag, delivery) -> {// 获取消息内容String message = new String(delivery.getBody(), StandardCharsets.UTF_8);log.info("收到死信队列消息: {}", message);// 确认消息处理完成channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);};// 开始消费死信队列// 参数说明:// 1. 队列名称// 2. 是否自动确认消息(false表示手动确认)// 3. 消息处理回调// 4. 消费者取消回调(这里为空实现)channel.basicConsume(RabbitMQConfig.DLX_QUEUE, false, deliverCallback, consumerTag -> {});}
} 

测试

@Slf4j
public class DLXTest {private static final int THREAD_COUNT = 1;  // 并发线程数private static final int MESSAGE_COUNT = 2; // 每个线程发送的消息数/*** 主方法,执行死信队列测试流程* 测试流程:* 1. 初始化RabbitMQ的队列和交换机* 2. 创建生产者和消费者实例* 3. 启动普通队列和死信队列的消费者线程* 4. 使用线程池发送测试消息* 5. 等待消息处理完成** @param args* @throws Exception*/public static void main(String[] args) throws Exception {// 初始化RabbitMQ的队列和交换机RabbitMQConfig.init();// 创建生产者和消费者实例MessageProducer producer = new MessageProducer();MessageConsumer consumer = new MessageConsumer();// 启动普通队列消费者线程new Thread(() -> {try {consumer.consumeNormalQueue();} catch (Exception e) {log.error("普通队列消费者异常", e);}}).start();// 启动死信队列消费者线程new Thread(() -> {try {consumer.consumeDlxQueue();} catch (Exception e) {log.error("死信队列消费者异常", e);}}).start();// 创建线程池和计数器ExecutorService executorService = Executors.newFixedThreadPool(THREAD_COUNT);CountDownLatch latch = new CountDownLatch(THREAD_COUNT);// 提交任务到线程池for (int i = 0; i < THREAD_COUNT; i++) {final int threadId = i;executorService.submit(() -> {try {// 每个线程发送MESSAGE_COUNT条消息for (int j = 0; j < MESSAGE_COUNT; j++) {// 随机生成消息TTL(1-30秒)int ttl = (int) (Math.random() * 30000) + 1000;String message = String.format("消息-线程%d-第%d条 (消息TTL: %dms)", threadId + 1, j + 1, ttl);producer.sendMessageWithTTL(message, ttl);// 随机延迟0-100ms,模拟真实场景Thread.sleep((long) (Math.random() * 100));}} catch (Exception e) {log.error("发送消息异常", e);} finally {latch.countDown();}});}// 等待所有消息发送完成latch.await();log.info("所有消息已发送完成");// 关闭线程池executorService.shutdown();executorService.awaitTermination(1, TimeUnit.MINUTES);// 保持程序运行,等待消息处理完成Thread.sleep(60000);log.info("测试完成");}
} 

从结果可以看出,第一条消息 ttl 为 28301ms,被普通消费者进行消费,而产生的第二条消息得到 ttl 为 4332ms,由于第一条消息在消费时耗时较久,在此期间 第二条消息已经过期,不得不进入死信队列,由死信消费者进行处理,从前面的日志时间也可以看出,刚好间隔 4s 左右。

在这里插入图片描述

http://www.dtcms.com/wzjs/241021.html

相关文章:

  • 网站建设风景课程设计报告什么软件可以发帖子做推广
  • 武汉网站seo外包企业网站建设专业服务
  • 做电商图的设计网站百度推广app
  • 最好的建设网站适合seo软件
  • 网站设计中遇到的问题计算机培训机构排名
  • 六安市住房城乡建设委员会网站seo黑帽技术有哪些
  • 网站怎么备案啊百度权重排名
  • 衢州装饰装修网站上海百度推广官方电话
  • 网站开发毕设需求分析福州网站开发公司
  • 经典网站欣赏、网络营销企业是什么
  • 个人建 行业 网站网站seo搜索引擎优化怎么做
  • 十堰h5响应式网站百度推广多少钱
  • 功能网站建设宁波网站建设
  • 广州网页设计公司排名一键优化免费下载
  • 专业做外贸的网站萧山seo
  • 找人做网站需要注意什么什么是网络整合营销
  • 做展板好的网站企业网站推广公司
  • wordpress百度地图沈阳seo顾问
  • 长沙网站开发的网站厦门关键词优化企业
  • 男人和女人做污的视频网站深圳网络营销推广招聘网
  • 帮客户做传销网站360网站推广费用
  • 给银行做网站aso优化是什么
  • 织梦做的网站首页打不开seo内部优化方式包括
  • 网站页面做互联网推广软件
  • 财务管理做的好的门户网站搜狐新闻手机网
  • 校园类网站模板网页设计制作
  • 市场营销策划案重庆优化seo
  • 中英文网站怎么做的厦门人才网招聘
  • 天河区住房和建设水务局官方网站软文之家
  • 学校 网站建设工作小组哈尔滨网络seo公司