当前位置: 首页 > news >正文

Rabbitmq集成springboot,手动确认消息basicAck、basicNack、basicReject的使用

一、手动确认消息模式

模式成功消费消费失败连接中断性能
NONE自动删除消息丢失消息丢失最高
AUTO自动删除重试/死信消息重回队列中等
MANUAL手动删除自定义处理消息重回队列最低

手动确认: 消息到达消费者,不会自动确认,会等待消费者调用Basic.Ack命令,才会从内存/磁盘 移除这条消息。
自动确认: 消息只要到达消费者就会自动确认,不会考虑消费者是否正确消费了这些消息,直接从 内存/磁盘 中删除消息;

ack:成功处理消息,RabbitMO从队列中删除该消息
nack:消息处理失败,RabbitMO需要再次投递消息
reject:消息处理失败并拒绝该消息,RabbitMO从队列中删除该消息

手动确认消息basicAck、basicNack、basicReject的使用

1、basicAck

// 确认消息处理成功,basicAck 是 RabbitMQ 的手动确认机制核心方法
// 第一个参数 tag:消息的交付标签,唯一标识这条消息  第二个参数 false:表示不批量确认,仅确认当前这条消息
channel.basicAck(deliveryTag,false);

2、basicNack

//拒绝确认消息,即告诉 RabbitMQ 我们无法处理这条消息。这会导致该消息被重新放回队列中等待再次投递。
//1 第一个参数 表示消息的交付标签,是一个单调递增的标识符,用于唯一标识队列中的一条消息。通过这个标签,RabbitMQ 能够知道你正在处理哪一条消息。
//2 第二个参数 表示是否批量确认 true 表示否定确认当前 tag 以及之前所有未确认的消息;如果为 false,则只否定确认当前 tag 对应的消息。 此处为 false,代表只处理当前这一条消息
//3 第三个参数 如果设置为 true,消息会被重新放回队列,等待再次投递给其他消费者或同一个消费者。如果设置为 false,消息不会被重新入队,
//而是根据队列的配置可能会被丢弃或者路由到死信队列(如果配置了的话)。此处为 true,表示在发生异常时将消息重新入队以便重试。
channel.basicNack(tag, false, true);

3、basicReject


// 拒绝消息,
// 1、第一个参数是消息的交付标签,
// 2、第二个参数表示是否将消息重新入队;false 表示拒绝后不重新入队,消息会被丢弃(如果没有配置死信队列,则可能被直接删除)
channel.basicReject(tag, false);

二、准备基本环境

1、pom.xml引入的java包

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><version>${springboot-version}</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId><version>${springboot-version}</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.24</version><scope>provided</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><version>${springboot-version}</version><scope>test</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>2.0.57</version></dependency></dependencies>

2、yaml配置文件

# 8004是zookeeper服务器的支付服务提供者端口号
server:port: 8004
spring:application:name: cloud-mqrabbitmq:addresses: 192.168.96.133port: 5672username: guestpassword: guestvirtual-host: /#消费者配置listener:#todo 切记,设置了重拾机制,要抛出异常,不可try catch 后不抛异常,否则重试机制失效simple:#开启ack 手动确认消息是否被消费成功acknowledge-mode: manualretry:enabled: true# 消费失败后,继续消费,然后最多消费5次就不再消费。max-attempts: 5# 消费失败后 ,重试初始间隔时间 2秒initial-interval: 2000# 重试最大间隔时间5秒max-interval: 5000# 间隔时间乘子,间隔时间*乘子=下一次的间隔时间,最大不能超过设置的最大间隔时间multiplier: 2direct:#开启ack 手动确认消息是否被消费成功acknowledge-mode: manual#todo 切记,设置了重拾机制,要抛出异常,不可try catch 后不抛异常,否则重试机制失效retry:enabled: true# 消费失败后,继续消费,然后最多消费3次就不再消费。max-attempts: 3# 消费失败后 ,重试初始间隔时间 3秒initial-interval: 3000# 重试最大间隔时间max-interval: 7000# 间隔时间乘子,间隔时间*乘子=下一次的间隔时间,最大不能超过设置的最大间隔时间multiplier: 2# 生产者配置template:retry:# 开启消息发送失败重试机制enabled: true# 生产者 true-开启消息抵达队列的确认publisher-returns: false#simple 配置用于设置 RabbitMQ 消息生产者的消息确认类型为“简单确认”。这意味着当消息被发送到 RabbitMQ 之后,只有在消息成功投递到队列中后,RabbitMQ 才会向生产者发送一个确认(ack)通知。如果消息未能成功投递,则不会收到确认。#该配置通常与 publisher-returns: true 一起使用以启用消息返回机制,但在此配置中 publisher-returns 被设置为 false,表示不启用消息返回功能publisher-confirm-type: simple

3、主启动类


import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;/*** @author 10564*/
@SpringBootApplication
public class ApplicationRabbitmq {public static void main(String[] args) {SpringApplication.run(ApplicationRabbitmq.class, args);}
}

三、手动确认消息(以普通消息为例)

1、定义消息队列Queue名称

package org.xwb.springcloud.constant;/*** @author 10564*/
public class MqConstant {/*** 手动确认消息*/public static final String ACK_MQ_NAME = "ackQueue";
}

2、配置类Configuration


import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.xwb.springcloud.constant.MqConstant;/*** 创建RabbitMQ的配置类* @author 10564*/
@Configuration
public class RabbitmqAckConfig {/*** 简单消息队列*/@Beanpublic Queue ackQueue() {//名字(name):队列的名字,用来区分不同的队列。//是否持久化(durable):如果设置为 true,表示即使服务器重启了,这个队列依然存在。//是否独占(exclusive):如果设置为 true,表示只有创建它的连接才能使用这个队列。//是否自动删除(autoDelete):如果设置为 true,表示当不再有消费者使用这个队列时,服务器会自动删除它。return new Queue(MqConstant.ACK_MQ_NAME,true,false,false);}
}

3、生产者Producer

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import org.xwb.springcloud.constant.MqConstant;import javax.annotation.Resource;/*** @author 10564*/
@Component
public class AckProducer {private static final Logger log = LoggerFactory.getLogger(AckProducer.class);@Resourceprivate RabbitTemplate rabbitTemplate;public void senderAckMessage(String message) {log.info("\nack生产者发送消息:{}\n", message);rabbitTemplate.convertAndSend(MqConstant.ACK_MQ_NAME, message);}
}

4、消费者Consumer

package org.xwb.springcloud.messagetype.ack;import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import org.xwb.springcloud.constant.MqConstant;import java.util.Date;/*** @author 10564*/
@Component
public class AckConsumer {private static final Logger log = LoggerFactory.getLogger(AckConsumer.class);@RabbitListener(queues = MqConstant.ACK_MQ_NAME)public void receiveAckQueueMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws  Exception {try {log.info("\nack消费者接收消息:{},tag:{} \n", message, tag);if("basicAck".equals( message)){//todo  确认消息处理成功,// 第一个参数 tag:消息的交付标签,唯一标识这条消息// 第二个参数 false:表示不批量确认,仅确认当前这条消息log.info("\n手动确认 处理成功 basicAck :{} \n", new Date());channel.basicAck(tag, false);}else if("basicNack".equals( message)){//todo 拒绝确认消息,即告诉 RabbitMQ 我们无法处理这条消息。这会导致该消息被重新放回队列中等待再次投递。//todo  1 第一个参数 表示消息的交付标签,是一个单调递增的标识符,用于唯一标识队列中的一条消息。通过这个标签,RabbitMQ 能够知道你正在处理哪一条消息。//todo  2 第二个参数 表示是否批量确认 true 表示否定确认当前 tag 以及之前所有未确认的消息;如果为 false,则只否定确认当前 tag 对应的消息。 此处为 false,代表只处理当前这一条消息//todo  3 第三个参数 如果设置为 true,消息会被重新放回队列,等待再次投递给其他消费者或同一个消费者。如果设置为 false,消息不会被重新入队,//todo 而是根据队列的配置可能会被丢弃或者路由到死信队列(如果配置了的话)。此处为 true,表示在发生异常时将消息重新入队以便重试。log.info("\n手动拒绝确认消息 basicNack :{} \n", new Date());channel.basicNack(tag, false, false);}else if("basicReject".equals( message)){//todo  拒绝消息,//todo  1、第一个参数是消息的交付标签,//todo  2、第二个参数表示是否将消息重新入队;false 表示拒绝后不重新入队,消息会被丢弃(如果没有配置死信队列,则可能被直接删除)log.info("\n手动确认 拒绝消息 basicReject :{} \n", new Date());channel.basicReject(tag, false);}else{throw new RuntimeException("模拟消费失败");}} catch (Exception e) {log.error("\n消费消息异常,抛出异常{} message:{}\n",tag, e.getMessage());//todo 抛出异常,触发 spring retrythrow e;}}
}

5、测试Test


import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.xwb.springcloud.messagetype.ack.AckProducer;import javax.annotation.Resource;/*** @author 10564*/
@RestController
@RequestMapping("/mq")
public class MqMessageController {@Resourceprivate AckProducer ackProducer;@GetMapping("/ack")public void ack(String message) {ackProducer.senderAckMessage(message);}

6、测试结果


### ack
GET http://localhost:8004/mq/ack?message=basicAck2025-06-21 23:33:21.758  INFO 19824 --- [nio-8004-exec-1] o.x.s.messagetype.ack.AckProducer        : 
ack生产者发送消息:basicAck
2025-06-21 23:33:21.771  INFO 19824 --- [ntContainer#0-1] o.x.s.messagetype.ack.AckConsumer        : 
ack消费者接收消息:basicAck,tag:1 
2025-06-21 23:33:21.772  INFO 19824 --- [ntContainer#0-1] o.x.s.messagetype.ack.AckConsumer        : 
手动确认 处理成功 basicAck :Sat Jun 21 23:33:21 CST 2025 ### ack
GET http://localhost:8004/mq/ack?message=basicNack
2025-06-21 23:33:52.687  INFO 19824 --- [nio-8004-exec-2] o.x.s.messagetype.ack.AckProducer        : 
ack生产者发送消息:basicNack2025-06-21 23:33:52.690  INFO 19824 --- [ntContainer#0-1] o.x.s.messagetype.ack.AckConsumer        : 
ack消费者接收消息:basicNack,tag:2 2025-06-21 23:33:52.690  INFO 19824 --- [ntContainer#0-1] o.x.s.messagetype.ack.AckConsumer        : 
手动拒绝确认消息 basicNack :Sat Jun 21 23:33:52 CST 2025 ### ack
GET http://localhost:8004/mq/ack?message=basicReject
2025-06-21 23:34:14.653  INFO 19824 --- [nio-8004-exec-3] o.x.s.messagetype.ack.AckProducer        : 
ack生产者发送消息:basicReject2025-06-21 23:34:14.656  INFO 19824 --- [ntContainer#0-1] o.x.s.messagetype.ack.AckConsumer        : 
ack消费者接收消息:basicReject,tag:3 2025-06-21 23:34:14.656  INFO 19824 --- [ntContainer#0-1] o.x.s.messagetype.ack.AckConsumer        : 
手动确认 拒绝消息 basicReject :Sat Jun 21 23:34:14 CST 2025 

相关文章:

  • 应对进行性核上性麻痹,科学护理指南
  • 本地回环地址在广播风暴与环路排查中的实战指南
  • 简单通过SenseVoice给自己配置一个语音转文字服务
  • Django中为api自定义一些装饰器:如参数校验等
  • GeoJSON 数据简介
  • Android 终端模拟器 termux app
  • 深入Java面试:从Spring Boot到微服务
  • 【C++语法】类和对象(4)——日期类和const成员函数
  • linux安装minio并使用
  • 使用CommonAPI开发Some/IP的流程
  • Spring-MyBatis基本操作
  • rent8_wechat-最常用出租屋管理系统-微信小程序
  • 华为云Flexus+DeepSeek征文 | 基于Flexus X实例的金融AI Agent开发:智能风控与交易决策系统
  • C++题解:【入门】快乐的马里奥(BFS)
  • 从代码学习深度学习 - 预训练BERT PyTorch版
  • 【LeetCode 热题 100】15. 三数之和——排序 + 双指针解法
  • FastAPI框架的10个重要知识点总结
  • Chromium 136 编译指南 macOS篇:编译流程(五)
  • Linux进程间通信——信号
  • kibana和elasticsearch安装
  • 网站顶级导航制作方法/seo学校培训班
  • 网站搭建网站/高级搜索引擎技巧
  • 从蜘蛛日志分析网站/东莞seo推广
  • 学校网站首页模板/如何制作一个网页页面
  • 深圳金融投资网站建设/seo建站公司
  • 什么网站算是h5做的/最大的推广平台