当前位置: 首页 > news >正文

建设银行的网站查询密码创意网络广告

建设银行的网站查询密码,创意网络广告,南和县建设局黄页网站,清镇网站建设目录 一、消息确认 1.消息确认机制 2.手动确认方法 二、代码示例 1. AcknowledgeMode.NONE 1.1 配置文件 1.2 生产者 1.3 消费者 1.4 运行程序 2.AcknowledgeMode.AUTO 3.AcknowledgeMode.MANUAL 一、消息确认 1.消息确认机制 生产者发送消息之后,到达消…

目录

一、消息确认

1.消息确认机制

2.手动确认方法

二、代码示例

1. AcknowledgeMode.NONE

1.1 配置文件

1.2 生产者

 1.3 消费者

1.4 运行程序 

 2.AcknowledgeMode.AUTO

3.AcknowledgeMode.MANUAL


一、消息确认

1.消息确认机制

生产者发送消息之后,到达消费端之后,可能会有以下情况:

1. 消息处理成功;

2. 消息处理异常。

RabbitMQ向消费者发送消息后,就会把这条消息删除掉,那么第二种情况就会造成消息丢失。

那么如何确保消息端已经被成功接收了并且被正确处理了呢?

为了确保消息从队列可靠的到达消费者,RabbitMQ提供了消息确认机制(Messageacknowledment)。

消费者在订阅队列时,可以指定autoAck参数,根据这个参数,消息确认机制分为以下两种:

自动确认:当autoAck等于true时,RabbitMQ会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正的接收到消息,自动确认模式适用于对于消息可靠性要求不高的场景。

手动确认:当autoAck等于false时,RabbitMQ会等待消费者显式的调用BasicAck命令,回复确认信号后才从内存(或者磁盘)中删除,这种方式适用于对消息可靠性要求较高的场景。

自动确认代码示例:

DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, 
AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到消息: " + new String(body));}
};
channel.basicConsume(Constants.TOPIC_QUEUE_NAME1, true, consumer);

当autoAck参数置为false,对于RabbitMQ服务器来说,队列中的消息分为了两个部分:

一是等待发送给消费者的消息;二是已经发送给消费者,但是还没收到消费者确认信号的消息。

如果RabbitMQ一直没有收到消费者的确认信号,并且消费此消息的消费者已经断开连接,则RabbitMQ会重新安排这条消息进入队列,等待投递给下一个消费者,当然也有可能是原来的那个消费者。

从RabbitMQ的Web管理平台上也可以看到当前队列中Ready状态和Unacked状态的消息数。 

Ready:等待投递给消费者的消息数。

Unacked:已经投递给消费者,但是未收到消费者确认信号的消息数。

2.手动确认方法

消费者在收到消息后,可以选择确认,也可以选择跳过或者直接拒绝确认,RabbitMQ也提供了不同的确认方法,消费者客户端可以调用与其对应的channel的相关方法,共有以下三种:

肯定确认: Channel.basicAck(long deliveryTag, boolean multiple);

RabbitMQ 已经知道该消息并且成功的处理消息,可以将其丢弃。

参数说明:

deliveryTag:消息的唯一标识,它是一个单调递增的64位的长整型值,deliveryTag是每个信道(Channel)独立维护的,所以在每个信道上都是唯一的,当消费者确认(ack)一条消息时,必须使用对应的信道进行确认。

multiple:是否批量确认,在某些情况下,为了减少网络流量,可以对一系列连续的deliveryTag进行批量确认,值为true则会一次性ack所以小于等于指定deliveryTag的消息,值为false,则只确认当前deliveryTag的消息。

deliveryTag 是RabbitMQ中消息确认机制的⼀个重要组成部分, 它确保了消息传递的可靠性和顺
序性。
否定确认: Channel.basicReject(long deliveryTag, boolean requeue);

参数说明:

deliveryTag:参考上文。

requeue:表示拒绝后,这条消息该如何处理,如果值为true那么,则RabbitMQ会将这条消息重新入队,重新发送给下一个订阅的消费者,值为false,则RabbitMQ会把这条消息从队列中移除,不会再发送给消费者。

否定确认: Channel.basicNack(long deliveryTag, boolean multiple,
boolean requeue);

参数说明:

参考上文 

multiple参数设置为true则表⽰拒绝deliveryTag编号之前所有未被当前消费者确认的消息。

二、代码示例

我们基于SpringBoot来演示消息的确认机制,使用方式和方法与RabbitMQ Java Client有一定差异,

Spring AMQP对消息确认提供了三种策略:

public enum AcknowledgeMode {NONE,MANUAL,AUTO;
}
. AcknowledgeMode.NONE:
这种模式下, 消息⼀旦投递给消费者, 不管消费者是否成功处理了消息, RabbitMQ 就会⾃动确认
消息, 从RabbitMQ队列中移除消息. 如果消费者处理消息失败, 消息可能会丢失.
  AcknowledgeMode.AUTO(默认):
这种模式下, 消费者在消息处理成功时会⾃动确认消息, 但如果处理过程中抛出了异常, 则不会确
认消息.
  AcknowledgeMode.MANUAL:
⼿动确认模式下, 消费者必须在成功处理消息后显式调⽤ basicAck ⽅法来确认消息. 如果消
息未被确认, RabbitMQ 会认为消息尚未被成功处理, 并且会在消费者可⽤时重新投递该消息, 这
种模式提⾼了消息处理的可靠性, 因为即使消费者处理消息后失败, 消息也不会丢失, ⽽是可以被
重新处理.

1. AcknowledgeMode.NONE

1.1 配置文件

spring:rabbitmq:addresses: amqp:listener:simple:acknowledge-mode: none

1.2 生产者

public class Constant {public static final String ACK_EXCHANGE_NAME = "ack_exchange";public static final String ACK_QUEUE = "ack_queue";
}
/*
以下为消费端⼿动应答代码⽰例配置
*/
@Bean("ackExchange")
public Exchange ackExchange(){return
ExchangeBuilder.topicExchange(Constant.ACK_EXCHANGE_NAME).durable(true).build()
;
}
//2. 队列
@Bean("ackQueue")
public Queue ackQueue() {return QueueBuilder.durable(Constant.ACK_QUEUE).build();
}
//3. 队列和交换机绑定 Binding
@Bean("ackBinding")
public Binding ackBinding(@Qualifier("ackExchange") Exchange exchange, 
@Qualifier("ackQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("ack").noargs();
}
import com.xiaowu.rabbitmq.constant.Constant;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/producer")
public class ProductController {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("/ack")public String ack(){rabbitTemplate.convertAndSend(Constant.ACK_EXCHANGE_NAME, "ack", 
"consumer ack test...");return "发送成功!";}
}

 1.3 消费者

import com.xiaowu.rabbitmq.constant.Constant;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class AckQueueListener {//指定监听队列的名称@RabbitListener(queues = Constant.ACK_QUEUE)public void ListenerQueue(Message message, Channel channel) throws
Exception {System.out.printf("接收到消息: %s, deliveryTag: %d%n", new
String(message.getBody(),"UTF-8"), 
message.getMessageProperties().getDeliveryTag());//模拟处理失败int num = 3/0;System.out.println("处理完成");}
}

1.4 运行程序 

启动生产者可以从RabbitMQ Web管理界面看到如下:

再启动消费者,控制台输出:

接收到消息: consumer ack test..., deliveryTag: 1
2024-04-29T17:03:57.797+08:00 WARN 16952 --- [ntContainer#0-1] 
s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message 
listener failed.
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: 
Listener method 'public void 
com.xiaowu.rabbitmq.listener.AckQueueListener.ListenerQueue(org.springframework.a
mqp.core.Message,com.rabbitmq.client.Channel) throws java.lang.Exception'
threw exception
//....

管理界面:

可以看到消息处理失败但是消息已经从管理界面移除。 

 2.AcknowledgeMode.AUTO

将配置文件修改为:

spring:rabbitmq:addresses: amqp:listener:simple:acknowledge-mode: auto

再次启动程序,控制台不断输出错误信息:

接收到消息: consumer ack test..., deliveryTag: 1
2024-04-29T17:07:06.114+08:00 WARN 16488 --- [ntContainer#0-1] 
s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message 
listener failed.
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: 
Listener method 'public void 
com.xiaowu.rabbitmq.listener.AckQueueListener.ListenerQueue(org.springframework.a
mqp.core.Message,com.rabbitmq.client.Channel) throws java.lang.Exception'
threw exception接收到消息: consumer ack test..., deliveryTag: 2
2024-04-29T17:07:07.161+08:00 WARN 16488 --- [ntContainer#0-1] 
s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message 
listener failed.
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: 
Listener method 'public void 
com.xiaowu.rabbitmq.listener.AckQueueListener.ListenerQueue(org.springframework.a
mqp.core.Message,com.rabbitmq.client.Channel) throws java.lang.Exception'
threw exception接收到消息: consumer ack test..., deliveryTag: 3
2024-04-29T17:07:08.208+08:00 WARN 16488 --- [ntContainer#0-1] 
s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message 
listener failed.
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: 
Listener method 'public void 
com.xiaowu.rabbitmq.listener.AckQueueListener.ListenerQueue(org.springframework.a
mqp.core.Message,com.rabbitmq.client.Channel) throws java.lang.Exception'
threw exception接收到消息: consumer ack test..., deliveryTag: 4
2024-04-29T17:07:09.254+08:00 WARN 16488 --- [ntContainer#0-1] 
s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message 
listener failed.
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: 
Listener method 'public void 
com.xiaowu.rabbitmq.listener.AckQueueListener.ListenerQueue(org.springframework.a
mqp.core.Message,com.rabbitmq.client.Channel) throws java.lang.Exception'
threw exception
从⽇志上可以看出, 当消费者出现异常时, RabbitMQ会不断的重发. 由于异常,多次重试还是失败,消 息没被确认,也无法nack,就⼀直是unacked状态,导致消息积压。

3.AcknowledgeMode.MANUAL

spring:rabbitmq:addresses: amqp:listener:simple:acknowledge-mode: manual

消费者手动确认逻辑:

import com.xiaowu.rabbitmq.constant.Constant;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class AckQueueListener {//指定监听队列的名称@RabbitListener(queues = Constant.ACK_QUEUE)public void ListenerQueue(Message message, Channel channel) throws
Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {//1. 接收消息System.out.printf("接收到消息: %s, deliveryTag: %d%n", new
String(message.getBody(),"UTF-8"), 
message.getMessageProperties().getDeliveryTag());//2. 处理业务逻辑System.out.println("处理业务逻辑");//⼿动设置⼀个异常, 来测试异常拒绝机制
// int num = 3/0;//3. ⼿动签收channel.basicAck(deliveryTag, true);} catch (Exception e) {//4. 异常了就拒绝签收//第三个参数requeue, 是否重新发送, 如果为true, 则会重新发送,,若为false, 
则直接丢弃channel.basicNack(deliveryTag, true,true);}}
}

 这个代码运行的结果是正常的, 运行后消息会被签收: Ready为0, unacked为0。

异常时拒绝:

import com.xiaowu.rabbitmq.constant.Constant;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class AckQueueListener {//指定监听队列的名称@RabbitListener(queues = Constant.ACK_QUEUE)public void ListenerQueue(Message message, Channel channel) throws
Exception {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {//1. 接收消息System.out.printf("接收到消息: %s, deliveryTag: %d%n", new
String(message.getBody(),"UTF-8"), 
message.getMessageProperties().getDeliveryTag());//2. 处理业务逻辑System.out.println("处理业务逻辑");//⼿动设置⼀个异常, 来测试异常拒绝机制int num = 3/0;//3. ⼿动签收channel.basicAck(deliveryTag, true);} catch (Exception e) {//4. 异常了就拒绝签收//第三个参数requeue, 是否重新发送, 如果为true, 则会重新发送,,若为false, 
则直接丢弃channel.basicNack(deliveryTag, true,true);}}
}
运⾏结果: 消费异常时不断重试, deliveryTag 从1递增
控制台日志:
接收到消息: consumer ack test..., deliveryTag: 1
处理业务逻辑
接收到消息: consumer ack test..., deliveryTag: 2
处理业务逻辑
接收到消息: consumer ack test..., deliveryTag: 3
处理业务逻辑
接收到消息: consumer ack test..., deliveryTag: 4
处理业务逻辑
接收到消息: consumer ack test..., deliveryTag: 5
处理业务逻辑
接收到消息: consumer ack test..., deliveryTag: 6
处理业务逻辑

管理页面上unacked也是1:

http://www.dtcms.com/a/513812.html

相关文章:

  • 小型手机网站建设企业如乐建站之家
  • 襄阳宜城网站建设进入wordpress
  • 免费的网站怎么建做推广任务的网站
  • 东莞手机建网站上海58招聘网最新招聘
  • 外贸网站建设内容包括哪些电子商务网站策划书2000字
  • 小说网站wordpress制作一个网站难吗
  • 网站一般怎么推广html网站 怎么做seo
  • 网站后台英文一个网站一年的费用
  • 建设科技网络网站的意义和目的wordpress怎么让文章只显示摘要
  • 网站建设书店目标客户分析太原推广型网站开发
  • 免费下载建筑图集规范的网站快速将网站seo
  • 二次元网站设计绍兴网站建设网站
  • 个人建设什么网站好初中学习网站大全免费
  • 济南做网站建设中小企业网站功能模块及数据库表
  • 官方网站开发商网络新闻专题做的最好的网站
  • 容桂网站建设公司网站开发 无代码
  • 婚礼效果图网站h5制作工具免费版
  • 如何取一个大气的名字的做网站北京国际化品牌设计
  • 如何查询网站服务商中小企业建站的方法
  • 南昌网站排名wordpress系统语言设置
  • 可以做视频创收的网站公众号开发工具下载
  • 北京的电商平台网站有哪些想建网站须要什么条件
  • 景区网站建设的好处网架报价清单表格
  • 中南路网站建设公司温州网站建设方案表
  • 有没有找客户的网站帮网贷做网站会判刑吗
  • 英文网站怎么做301跳转建筑工程挂网甩浆
  • 合肥建行网站推广是什么
  • 网站建设敬请期待图片素材济南12345官网
  • 烟台网站排名优化公司哪家好电商网站的开发形式
  • 电子商务网站建设课程设计58同城网招聘找工作建筑工程