当前位置: 首页 > wzjs >正文

河南做网站的公司学生做家教网站

河南做网站的公司,学生做家教网站,上海开办企业一窗通网上服务平台,网站整体建设方案一、消息确认机制 生产者发送的消息,可能有以下两种情况: 1> 消息消费成功 2> 消息消费失败 为了保证消息可靠的到达消费者(!!!注意:消息确认机制和前面的工作模式中的publisher confi…

一、消息确认机制

生产者发送的消息,可能有以下两种情况:

1> 消息消费成功

2> 消息消费失败

为了保证消息可靠的到达消费者(!!!注意:消息确认机制和前面的工作模式中的publisher confirms模式有很大区别,消息确认保证的是消息可靠的到达消费者,而publisher confirms保证的是消息可靠的到达RabbitMQServer),RabbitMQ引入了消息确认机制:

消费者在消费消息时,可以指定autoAck参数,对应着两种确认方式

(1)自动确认:消息只要到达消费者就会自动确认,不会考虑消费者是否正确消费了这些消息,直接从 内存/磁盘 中删除消息;

(2)手动确认:消息到达消费者,不会自动确认,会等待消费者调用Basic.Ack命令,才会从内存/磁盘 移除这条消息。

DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, 
AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息: " + new String(body));}
};
channel.basicConsume(Constants.TOPIC_QUEUE_NAME1, true, consumer);

如果将basicConsume中的true改为false,对于RabbitMQ服务器来说,消息分成两个部分:

1>  未发送给消费者的消息;

2>  已经发送给消费者,但还没有被确认的消息。

对应管理界面:


二、手动确认方法

RabbitMQ提供了两种手动确认,分别是肯定确认和否定确认,对应着3个方法:

(1)肯定确认:Channel.basicAck(long deliveryTag, boolean multiple)

    其中,deliveryTag为消息的唯一表示,在每一个channel中都是唯一的,相当于TCP协议中的序号的作用,用来确认哪条消息已经收到,multiple为true表示是否批量确认,同样与TCP中确认序号的作用类似,表示在这个deliveryTag前的消息都已收到,为false则表示一次只确认一条消息。

(2)否定确认: Channel.basicReject(long deliveryTag, boolean requeue)

   如果消息到达消费者后,消费者未正确处理这条消息(如发生异常),就可以通过这个方法进行否定确认,其中,参数requeue表示这条消息是否需要重新入队,这个方法一次只能确认一条消息。

(3)否定确认:Channel.basicNack(long deliveryTag, boolean multiple, boolean requeue)

   与basicAck一样,这个方法可以通过multiple来实现批量确认


三、使用Spring Boot演示消息确认机制

  演示之前,需要先了解Spring-AMQP的确认机制,它和RabbitMQ JDK Client库的确认机制有些许不同:

1. AcknowledgeMode.NONE

  和RabbitMQ JDK Client库的自动确认机制一样,只要消息到达消费者,这条消息就会从队列中移除。

2. AcknowledgeMode.AUTO(和JDK Client库的区别)

  消息到达消费者且处理成功,才会自动确认,如果消息在处理过程中发生了异常,则不会自动确认。

3. AcknowledgeMode.MANUAL

  和JDK Client库一样,属于手动确认机制,同样分为肯定确认和否定确认。

接下来,进行准备工作,创建一个Spring Boot项目,添加RabbitMQ依赖:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

添加RabbitMQ相关配置:

spring:rabbitmq:addresses: amqp://study:study@110.41.17.130:5672/extensionlistener:simple:acknowledge-mode: none #表示当前确认机制未AcknowledgeMode.NONE

添加下图的包,创建对应的类:

 


3.1  AcknowledgeMode.NONE

(1)编写常量类(由于只是验证消息确认机制,可以随便使用一种工作模式)

public class Constants {public static final String ACK_QUEUE = "ack.queue";public static final String ACK_EXCHANGE = "ack.exchange";
}

(2)配置声明队列、交换机、交换机与队列绑定关系

@Configuration
public class RabbitMQConfig {//1.声明队列@Bean("ackQueue")public Queue ackQueue(){return QueueBuilder.durable(Constants.ACK_QUEUE).build();}//2.声明交换机@Bean("ackExchange")public FanoutExchange ackExchange(){return ExchangeBuilder.fanoutExchange(Constants.ACK_EXCHANGE).build();}//3.声明队列与交换机的绑定关系@Beanpublic Binding ackBinding(@Qualifier("ackExchange") FanoutExchange fanoutExchange,@Qualifier("ackQueue") Queue queue){return BindingBuilder.bind(queue).to(fanoutExchange);}
}

(3)编写生产者代码

@RestController
@RequestMapping("/producer")
public class ProducerController {@Resourceprivate RabbitTemplate rabbitTemplate;@RequestMapping("/ack")public String ack(){rabbitTemplate.convertAndSend(Constants.ACK_EXCHANGE,"","consumer ack mode test...");return "发送消息成功";}
}

(4)编写消费者代码

@Component
public class AckListener {@RabbitListener(queues = Constants.ACK_QUEUE)public void ackListener(Message message, Channel channel) throws UnsupportedEncodingException {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("接收到消息:%s,deliveryTag: %d \n",new String(message.getBody(),"UTF-8"),message.getMessageProperties().getDeliveryTag());//业务逻辑处理System.out.println("业务逻辑处理");
//            int num = 10/0;System.out.println("业务处理完成");}
}

(5)测试正常消费消息的情况


(6)测试消息消费异常情况(将消费者代码中得 int num  = 10/0  注解打开)


(7)总结

  经过确认,可以发现AcknowledgeMode.NONE机制,无论消费者是否正确消费消息,都会自动确认,不会保留异常消费的消息


3.2 AcknowledgeMode.AUTO

(1) 修改配置文件中的 acknowledge-mode: none 为 acknowledge-mode: auto

(2)演示消息正常消费的情况(将 int num = 10/0 注释)

  @RabbitListener(queues = Constants.ACK_QUEUE)public void ackListener(Message message, Channel channel) throws UnsupportedEncodingException {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("接收到消息:%s,deliveryTag: %d \n",new String(message.getBody(),"UTF-8"),message.getMessageProperties().getDeliveryTag());//业务逻辑处理System.out.println("业务逻辑处理");
//            int num = 10/0;System.out.println("业务处理完成");}

运行程序,访问 producer/ack 接口发送消息:


(3)演示消息处理异常情况(取消 int num = 10/0 的注释)

 @RabbitListener(queues = Constants.ACK_QUEUE)public void ackListener(Message message, Channel channel) throws UnsupportedEncodingException {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("接收到消息:%s,deliveryTag: %d \n",new String(message.getBody(),"UTF-8"),message.getMessageProperties().getDeliveryTag());//业务逻辑处理System.out.println("业务逻辑处理");int num = 10/0;System.out.println("业务处理完成");}

运行程序,访问生产者端口:

消息没有成功消费,如果此时我们修改代码(将 int num = 10/0 注释,使程序不再发生异常),再次运行程序,队列中保存的消息就会被正常消费:


3.3 AcknowledgeMode.MANUAL

  (1) 修改配置文件中 acknowledge-mode: auto 为 acknowledge-mode: manul

(2)修改消费者代码(由于是手动确认,需要在代码中添加正常消费时的 “肯定确认” 和 消费异常时的 “否定确认”)

    @RabbitListener(queues = Constants.ACK_QUEUE)public void ackListener(Message message, Channel channel) throws Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();try{System.out.printf("接收到消息:%s,deliveryTag: %d \n",new String(message.getBody(),"UTF-8"),message.getMessageProperties().getDeliveryTag());//业务逻辑处理System.out.println("业务逻辑处理");
//                int num = 10/0;System.out.println("业务处理完成");//消息正常消费,肯定确认channel.basicAck(deliveryTag,false);//单条确认}catch (Exception e){//消息消费异常,否定确认channel.basicNack(deliveryTag,false,true);//单条否定,异常后重新入队}}

(3)消息正常消费时的情况(注释 int num = 10/0)

接下来注释掉手动肯定确认的这行代码,看看会发生什么情况:

 //消息正常消费,肯定确认//channel.basicAck(deliveryTag,false);//单条确认

再次运行程序并通过接口 producer/ack 发送消息:

可以看到,在AcknowledgeMode.MANUAL机制下,如果不手动确认,队列不会移除已经被正常消费的消息:


(3)消息消费异常的情况下(取消 int num = 10/0 的注释)

接下来注释掉channel.basicNack,运行程序,访问接口发送消息:

如果将参数requeue置为false,会怎么样?

channel.basicNack(deliveryTag,false,false);//单条否定,重新发送消息

运行程序:


(4)总结

1> 无论是否正常处理消息都要进行手动确认;

2> 正常处理消息但未手动确认,管理界面中的队列会有 一条/多条 Unacked 的消息(重新启动程序后会重新消费);

3> 异常处理消息且未手动确认,也会有 一条/多条 Unacked 的消息(重启程序同样重新消费);

4> 如果异常处理,将requeue置为false,队列不会保存 这条/多条 异常消费的消息

                                               置为true,队列会不断重新发送 这条/多条 消息


文章转载自:

http://HZDuW20B.cndxL.cn
http://OQzOtBkN.cndxL.cn
http://9RsLEgJT.cndxL.cn
http://n9KB9PSG.cndxL.cn
http://ZxqjmaOr.cndxL.cn
http://RGXL7tau.cndxL.cn
http://mvVGsQYM.cndxL.cn
http://gf7pcnLp.cndxL.cn
http://7hoVfNMB.cndxL.cn
http://mYsk73c8.cndxL.cn
http://0tfOWodS.cndxL.cn
http://8kYpRBOE.cndxL.cn
http://CqoEctm7.cndxL.cn
http://xSxF8ccz.cndxL.cn
http://lmPgCw7O.cndxL.cn
http://42CMmOVj.cndxL.cn
http://mYZT5P1B.cndxL.cn
http://RPv9llQW.cndxL.cn
http://VcPWxz8p.cndxL.cn
http://tTuTLFqB.cndxL.cn
http://3FNPVsdb.cndxL.cn
http://xpd2LV0P.cndxL.cn
http://McKBklRB.cndxL.cn
http://yicRJ6Nh.cndxL.cn
http://XkYSCM3M.cndxL.cn
http://7T5PQaNP.cndxL.cn
http://ocrgs82l.cndxL.cn
http://zxKKxZS7.cndxL.cn
http://44O0Dl78.cndxL.cn
http://YTxU9WTP.cndxL.cn
http://www.dtcms.com/wzjs/767412.html

相关文章:

  • 做采集网站赚钱网站建设与运营的预算方案
  • 模板网站缺点中国有哪些企业
  • 中国铁路建设监理协会官方网站不良网站正能量进入窗口
  • 前端网站开发的公用头部金华市住房和城乡建设局网站
  • 网站制作公司北京网站建设公司哪家好兰州市城乡建设局网官网站
  • 易语言做电影网站源码做义工的网站
  • 果洛wap网站建设比较好拆车件交易网
  • 重庆网站建设cqsday电商网页图片设计
  • 江苏网站建设哪家专业做网站没流量
  • 天津高端网站制作做网站注册页面模板
  • 外贸建站主机空间哪家好sns社交网站建设
  • 淄博手机网站建设报价10个不愁销路的小型加工厂
  • 网上商城网站建设公司手机做炫光图头像的网站
  • 网站广告源码成都企业展厅设计成都企业展厅设计公司
  • 苍南住房和城乡规划建设局网站crm公司
  • 怎样做模具钢网站百合居装饰公司官网
  • h5技术建设网站的知识集团公司网站案例
  • 合肥网站建设之4个细节要注意网页制作是计算机什么专业
  • wordpress柚子皮5.31龙岩seo
  • 专业的单位网站建设个人网页官方网站
  • 论坛购物网站开发网站后台ftp替换图片怎么做
  • 模板建站配云服务器施工蓬莱网页设计
  • ip网站架设用wordpress开发网站模板
  • 淘宝店铺网站建设可行性报告做网站的标题图片
  • 网站建设教程免费传媒公司有哪些
  • 江都区城乡建设局网站无锡网站建设培训学校
  • 网站空间不支持php学做网站论坛好吗
  • 网站设计与网页制作在线中企动力官网登录
  • 淮安网站建设找谁好服务于中小企业建网站
  • 分享公众号的网站开办网站需要什么资质