当前位置: 首页 > wzjs >正文

成都的网站苏州软件定制开发公司

成都的网站,苏州软件定制开发公司,域名注册网,内容营销1. 简介 RabbitMQ 的消息发送流程: producer 将消息发送给 broker,consumer 从 broker 中获取消息并消费 那么在这里就涉及到了两种消息发送,即 producer 与 broker 之间和 consumer 与 broker 之间。 “消息确认” 讨论的是 consumer 与…

1. 简介

RabbitMQ 的消息发送流程:

  • producer 将消息发送给 broker,consumer 从 broker 中获取消息并消费

那么在这里就涉及到了两种消息发送,即 producer 与 broker 之间和 consumer 与 broker 之间。

“消息确认” 讨论的是 consumer 与 broker 之间的消息发送。

2. 为什么会有这个特性

当 broker 给 consumer 发送消息时,可能会出现下面两种情况:

  • 消息未成功到达 consumer;
  • 消息成功到达 consumer,但是 consumer 没有成功消费这条消息,如:在处理消息时发生异常等情况。

这时,就需要有一种解决方案,保证 broker 与 consumer 之间消息传输的可靠性,于是就有了消息确认这一特性。

3. 使用 RabbitMQ Java 时如何进行消息确认(不是重点)

public class ConsumerDemo1 {public static void main(String[] args) throws IOException, TimeoutException {//建立连接ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(Constants.HOST);connectionFactory.setPort(Constants.PORT);connectionFactory.setUsername(Constants.USERNAME);connectionFactory.setPassword(Constants.PASSWORD);connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);Connection connection = connectionFactory.newConnection();//创建信道Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare(Constants.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT, true);//声明队列//如果队列不存在,就创建channel.queueDeclare(Constants.DIRECT_QUEUE1, true, false, false, null);//消费消息DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(new String(body));}};channel.basicConsume(Constants.DIRECT_QUEUE1, true, consumer);}
}

这是一段路由模式的代码,在这段代码中,有下面一条语句:

channel.basicConsume(Constants.DIRECT_QUEUE1, true, consumer);

在这个方法中,有三个参数:

String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
  • queue:consumer 通过哪个队列获取 broker 发送的消息;
  • autoAck:是否自动确认;
  • callback:consumer 消费消息的逻辑。

其中,autoAck 就是消息确认的体现:

  • autoAck 为 true:RabbitMQ 会将发送给 consumer 的消息视为已被成功接收和消费(consumer 可能并没有成功接收到或成功消费,但是 RabbitMQ 不管了),就会被将这条消息删除;
  • autoAck 为 false:当 RabbitMQ 发送消息后,并不会马上就将消息删除,而是会等 consumer 调用 Basic.Ack,收到 ack 后,才会将消息删除。

将 autoAck 设置为 false 后,若 broker 长时间没有收到 consumer 发送的 ack 且 consumer 已经断开连接,就会将这条消息重新入队列,继续发送给 consumer 进行消费,此时,队列中的消息就分为了两种:

  • 还未被发送的消息;
  • 已经发送了的消息,但是没有收到 ack 而重新入队列等待被消费。

4. 在 spring 中使用 RabbitMQ 时如何进行消息确认

4.1 basicAck

在 spring 下的 Channel 类中提供了下面几种方法:

void basicAck(long deliveryTag, boolean multiple) throws IOException;

在这个方法中,有三个参数:

  • deliveryTag:是 broker 给 consumer 发送消息的唯一标识,在一个 channel 中 deliveryTag 是唯一的;
  • mulitple: 是否批量确认

使用这个方法后,就会告知 broker 这条消息已经成功被消费,可以将其删除。

4.2 basicNack

void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;

 在这个方法中,多了一个参数:

  • requeue:是否重新入队列。

使用这个方法,就相当于给 broker 发送 nack,即这条消息没有被正确消费。

若 requeue 为 true,就会将这条消息重新入队列,继续给 consumer 消费;

若 requeue 为 false,broker 就会这条消息删除。

4.3 basicReject

void basicReject(long deliveryTag, boolean requeue) throws IOException;

这个方法与 basicNack 大致相同,此处省略。

4.4 配置

在 spring 中,提供了三种配置用于消息确认:

  • none:当消息发送给 consumer,不管 consumer 是否成功消费了消息,broker 都会当作这条消息被成功消费了,然后删除这条消息;
  • auto:在 consumer 处理消息时没有抛出异常时,就会确认消息,反之就不会确认,并且将消息重新放入队列中,进行下一次的消费;
  • manual:手动确认,我们需要在代码中指定这条消息是消费成功还是消费失败,分别使用 basicAck 和 basicNack。
spring:rabbitmq:listener:simple:acknowledge-mode: none

5. 代码测试

@RequestMapping("/producer")
@RestController
public class ProducerController {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("/ack")public String ack() {String messageInfo = "consumer ack mode test...";rabbitTemplate.convertAndSend(Constants.ACK_EXCHANGE, Constants.ACK_ROUTINGKEY, messageInfo);return "消息发送成功";}
}

这段代码代表的是一个 producer,下面接收到的消息都是通过这段代码发送的。

5.1 none

① 无异常时的消费者代码:

    @RabbitListener(queues = Constants.ACK_QUEUE)public void listener(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();log.info("接收消息: {}; deliveryTag: {}", new String(message.getBody()), deliveryTag);}

代码运行结果如下:

我们可以通过访问 RabbitMQ 客户端来观察这条消息是否成功被消费:

 

可以看到,Messages 这一列中,Ready 和 Unacked 都为 0,表示消息被成功消费。 

 ② 有异常时的消费者代码:

    @RabbitListener(queues = Constants.ACK_QUEUE)public void listener(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();int num = 1 / 0;log.info("接收消息: {}; deliveryTag: {}", new String(message.getBody()), deliveryTag);}

代码运行结果如下:

由于我们使用了除零操作,于是抛出了异常,我们可以通过访问 RabbitMQ 来观察这条消息是否被删除:

和上面一样,在 broker 中这条消息已经被删除,这与 none 配置性质一致。

5.2 auto 

① 无异常时的消费者代码:

@Component
@Slf4j
public class AckListener {@RabbitListener(queues = Constants.ACK_QUEUE)public void listener(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();log.info("接收消息: {}; deliveryTag: {}", new String(message.getBody()), deliveryTag);}}

代码运行结果如下:

在 RabbitMQ 客户端中显示,这条消息已经被成功消费:

 

② 有异常时的消费者代码:

@Component
@Slf4j
public class AckListener {@RabbitListener(queues = Constants.ACK_QUEUE)public void listener(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();int num = 1 / 0;log.info("接收消息: {}; deliveryTag: {}", new String(message.getBody()), deliveryTag);}}

 代码运行结果如下:

在运行结果中,一直会有报错产生,并且都是两个两个为一组,并且在报错信息中可以看到,producer 发送的消息一直在被消费,这是因为存在异常,就会导致这条消息一直在队列中,通过观察 RabbitMQ 客户端可以看出,这条消息依然保存在队列中:

5.3  manual

① 无异常的消费者代码如下:

@Component
@Slf4j
public class AckListener {@RabbitListener(queues = Constants.ACK_QUEUE)public void listener(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {log.info("接收消息: {}; deliveryTag: {}", new String(message.getBody()), deliveryTag);channel.basicAck(deliveryTag, false);} catch (Exception e) {channel.basicNack(deliveryTag, false, true);}}
}

在这段代码中,我们使用了 basicAck 和 basicNack 来进行消息确认,当消息处理成功后,就会执行 basicAck,告诉 broker 这条消息已经被成功消费,可以将其删除;当消息执行发生异常后,就会执行 basicNack,并且根据 requeue 参数决定如何处理这条消息。

代码运行结果如下:

RabbitMQ 客户端显示这条消息被成功消费:

 

 ② 有异常的消费者代码如下:

当 requeue 为 true:

@Component
@Slf4j
public class AckListener {@RabbitListener(queues = Constants.ACK_QUEUE)public void listener(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {log.info("接收消息: {}; deliveryTag: {}", new String(message.getBody()), deliveryTag);int n = 1 / 0;channel.basicAck(deliveryTag, false);} catch (Exception e) {channel.basicNack(deliveryTag, false, true);}}
}

在此处的 basicNack,将 requeue 设置为了 true,当消息处理失败后,就会将消息重新入队列,重新被消费:

我们可以看到,这条消息一直在被消费,并且 delivertTag 在递增。

并且从 RabbitMQ 客户端中可以看到,这条消息依然存在,等待被成功消费:

 当 requeue 为 false:

当处理消息发生异常后,就会将消息从队列中删除。

代码运行结果如下:

虽然异常依然存在,但是消息却没有重复发送,并且 RabbitMQ 中也将这条消息删除:

 

 


文章转载自:

http://r87e7LCT.mdmxf.cn
http://mGk3RiSj.mdmxf.cn
http://FjLkkS1Q.mdmxf.cn
http://mVmU7w6J.mdmxf.cn
http://mkJw9wFT.mdmxf.cn
http://kZcpzM8T.mdmxf.cn
http://oUbJC4Bz.mdmxf.cn
http://iYFVfmha.mdmxf.cn
http://zLqfwpaq.mdmxf.cn
http://scnDGJWB.mdmxf.cn
http://aDyi4xp3.mdmxf.cn
http://PGOZPy6p.mdmxf.cn
http://oBGDgCH5.mdmxf.cn
http://rP7o0hIm.mdmxf.cn
http://epPL3PwM.mdmxf.cn
http://05D2QQcG.mdmxf.cn
http://7VMgNn2c.mdmxf.cn
http://pYZJz3dO.mdmxf.cn
http://duiXC7HM.mdmxf.cn
http://kEr4cqDE.mdmxf.cn
http://1iDKEgzr.mdmxf.cn
http://79zOr8ox.mdmxf.cn
http://bVoC2AND.mdmxf.cn
http://UCHWn5W2.mdmxf.cn
http://VfwITkX6.mdmxf.cn
http://2mtl4e6g.mdmxf.cn
http://6a5EBcjt.mdmxf.cn
http://ClrYCcT0.mdmxf.cn
http://thv4cJ0a.mdmxf.cn
http://Nnk54oae.mdmxf.cn
http://www.dtcms.com/wzjs/748423.html

相关文章:

  • 安卓优化大师官网下载现在网站优化
  • 网站关键词优化推广哪家好做企业网站对企业的好处
  • 本溪建设银行网站类似凡科建站的平台
  • 垂直 网站开发宝安网站建设zrare
  • 广州网站建设需要多少费用wordpress 商户插件
  • 爱用建站平台的优势深圳做网站设计公司
  • 四川做网站的网站托管是什么
  • 建设银行档案管理网站百度知道一下首页
  • 服务器部署php网站汕头网站搭建多少钱
  • 网站建设三层架构实训报告wordpress 博客主题
  • 专业网站的建设设行吗做火影网站背景图
  • 深圳网站建设高端工程房地产行业一条龙网站
  • 网站开发需要大学吗长春网站建设net
  • 基于C 的网站开发源码网站设计公司怎么样
  • 网站建设提高信息光谷软件园 网站建设
  • 北京做网站公司哪家强手机网站的好处
  • wordpress 数据字典网站seo的优化怎么做
  • 如何查看网站权重一个考试网站怎么做
  • wordpress语言设置谷歌优化排名哪家强
  • 网站筹备建设情况网站开发调查表
  • 网站设计与制作说明书DW网站建设出现哪些问题
  • 建站行业的利润网页课程设计
  • 如何设置网站兼容性网站建设包括内容
  • 网站建设的误区预备网络推广方案
  • 建设网站上海wordpress 数据库设置
  • 自己的网站怎么在百度上面推广做的比较好的企业网站
  • 中卫网站设计厂家wordpress 更新过慢
  • 去除wordpress版本临沧seo
  • 有学做衣服的网站吗万博法务网站建设项目
  • 衡阳网站建设设计泌阳县住房建设局网站