当前位置: 首页 > news >正文

rabbitmq-amqp事务消息+消费失败重试机制+prefetch限流

1. 安装和配置

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>

1.2 yml 配置

### 生产端的配置
spring:
  rabbitmq:
    host: localhost
    port: 5672
    virtual-host: / # 虚拟主机
    username: guest
    password: guest
    publisher-returns: true  #确认消息已经发送到队列,生产上无需开启
    # simple:同步等待confirm结果,直到超时
    #开启消息确认 :correlated:异步回调,MQ返回结果时会回调这个ComfirmCallback
    publisher-confirm-type: correlated #确认消息已发送到交换机
## 生产端的配置
spring:
  rabbitmq:
    host: localhost
    port: 5672
    virtual-host: / # 虚拟主机
    username: guest
    password: guest
    publisher-returns: true  #确认消息已经发送到队列,生产上无需开启
    # simple:同步等待confirm结果,直到超时
    #开启消息确认 :correlated:异步回调,MQ返回结果时会回调这个ComfirmCallback
    publisher-confirm-type: correlated #确认消息已发送到交换机

2.生产端的消息确认发送代码

/**
 * (1) RabbitTemplate.ConfirmCallback 这个接口是用来确定消息是否到达交换器的
 * (2) RabbitTemplate.ReturnsCallback 这个则是用来确定消息是否到达队列的,未到达队列时会被调用
 */
@Service
@Slf4j
public class RabbitMqConfirmCallback implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback{

    private RabbitTemplate rabbitTemplate;


    public void queueConfirm(Map<String, String> map) {
        // 第一个参数表示交换机,第二个参数表示 routing key,第三个参数即消息
        rabbitTemplate.convertAndSend("confirm_exchange", "confirm_key1", map, new CorrelationData("111"));
        // 故意输入一个不存在的交换机
        rabbitTemplate.convertAndSend("confirm_exchange_2222", "confirm_key1", map, new CorrelationData("22222"));
        // 故意输入一个不存在的队列
        rabbitTemplate.convertAndSend("confirm_exchange", "confirm_key1_333333", map, new CorrelationData("3333"));
        log.info("Confirm -- 消息--发送结束");
    }


    /**
     * 需要给ConfirmCallback赋值 不然不会走回调方法,默认是null
     * //将当前类的实例设置为 RabbitMQ 的确认回调处理器,跟下面的confirm方法联合使用,
     * // 还需要打开配置:spring: rabbitmq: publisher-confirm-type: correlated
     */
    @PostConstruct
    public void init(){
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnsCallback(this);
    }

    @Autowired
    public RabbitMqConfirmCallback(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
//        rabbitTemplate.setConfirmCallback(this);
    }

 
    /** 此方法用于监听消息是否发送到交换机
     * 回调
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            log.info("confirm -- 监听消息成功发送到交换机--回调id = {}", correlationData);

        } else {
            log.info("confirm -- 消息没有发送到交换机回调id= {},消息发送失败:{}。", correlationData, cause);
        }
    }

    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        log.info("消息未到达队列 --- returnedMessage= " + returnedMessage);
    }
}

2.2 生产端的截图

3.消费端代码

@Component
@Slf4j
public class RabbitConfirmConsumer {

    // 交换机
    public static final String confirm_exchange_name = "confirm_exchange";
    // 队列
    public static final String confirm_queue_name="confirm_queue";
    // routingkey
    public static final String confirm_routing_key = "confirm_key1";

    // 声明交换机
    @Bean("confirmExchange")
    public DirectExchange confirmExchange(){
        return new DirectExchange(confirm_exchange_name);
    }
    // 声明队列
    @Bean("confirmQueue")
    public Queue confirmQueue() {
        return QueueBuilder.durable(confirm_queue_name).build();
    }
    // 绑定队列到交换机
    @Bean
    public Binding queueBingExchange(Queue confirmQueue,DirectExchange confirmExchange){
        return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(confirm_routing_key);
    }


    /**
     * ack:成功处理消息,RabbitMQ从队列中删除该消息
     * nack:消息处理失败,RabbitMQ需要再次投递消息
     * reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息
     */
    @RabbitListener(queues = "confirm_queue")
    public void consumerConfirm(Message message, Channel channel, @Payload Map<String, Object> map,
                                @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        //获取消息的唯一标记
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        log.info("接收的消息为:{},消息的唯一标记={}, 直接注入的tag= {}",message, deliveryTag, tag);
        if(message.getBody() != null){
            //获取消息的内容
            byte[] body = message.getBody();
            //basicAck:表示成功确认,使用此回执方法后,消息会被rabbitmq broker 删除。
            channel.basicAck(deliveryTag,false);//false 表示仅确认当前消息消费成功
            log.info("接收的消息为:{}", map);
        }else{
            channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
            log.info("未消费数据");
        }
    }

}

3.2消费端截图

4 消费端重试机制

@Service
@Slf4j
public class RabbitRetryConsumer {

    @Bean
    public Queue retryQueue(){
        Map<String,Object> params = new HashMap<>();
        return QueueBuilder.durable("retry_queue").withArguments(params).build();

    }

    @Bean
    public TopicExchange retryTopicExchange(){
        return new TopicExchange("retry_exchange",true,false);
    }
    //队列与交换机进行绑定
    @Bean
    public Binding BindingRetryQueueAndRetryTopicExchange(Queue retryQueue, TopicExchange retryTopicExchange){
        return BindingBuilder.bind(retryQueue).to(retryTopicExchange).with("retry_key");
    }


    int count  = 0;
    //测试重试,需要在yml配置 retry
    @RabbitListener(queues = "retry_queue")
    public void retryConsumer(Map<String, String> map, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
        log.info("retryConsumer 重试次数 = {},重试接收数据为:{}",count++, map);
        int i = 10 /0;
        channel.basicAck(tag,false);
    }

}

4.2 重试机制截图

5. 限流设置--消费端

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual # 开启手动确认模式
        prefetch: 5 #控制消费者从队列中预取(prefetch)消息的数量,以此来实现流控制

5.1 生产端--发送19条信息

@GetMapping("/xianliu")
    public String xianliuTest(){
        for(int i = 1; i < 20; i++){
            Map<String, String> map = new HashMap<>();
            map.put("key","限流测试--" + i);
            rabbitMqProducer.xianliuTest(map);
        }
        return "限流测试发送成功";
    }


/***
     * 限流消息的发送测试
     */
    public void xianliuTest(Map<String, String> map) {
        // 第一个参数表示交换机,第二个参数表示 routing key,第三个参数即消息
        rabbitTemplate.convertAndSend("confirm_exchange", "confirm_key1", map, new CorrelationData("111"));
    }

5.2 消费端

 /**
     * ack:成功处理消息,RabbitMQ从队列中删除该消息
     * nack:消息处理失败,RabbitMQ需要再次投递消息
     * reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息
     */
    @RabbitListener(queues = "confirm_queue")
    public void consumerConfirm(Message message, Channel channel, @Payload Map<String, Object> map,
                                @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        //获取消息的唯一标记
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        log.info("接收的消息为:{},消息的唯一标记={}, 直接注入的tag= {}",message, deliveryTag, tag);
        if(message.getBody() != null){
            //basicAck:表示成功确认,使用此回执方法后,消息会被rabbitmq broker 删除。
            //channel.basicAck(deliveryTag,false);//false 表示仅确认当前消息消费成功
            log.info("接收的消息为:{}", map);
        }else{
            //否定确认
            //channel.basicNack(deliverTag,false,true);//requeue为false,则变成死信队列
            channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
            log.info("未消费数据");
        }
    }

5.3 注释掉channel.basicAck--堵塞了

5.4 注释掉了 prefetch -- 19条全部被消费,即使没有ack

相关文章:

  • Ubuntu 22.04安装NVIDIA A30显卡驱动
  • 机器学习-决策树详细解释
  • 决策树(Decision Tree)基础知识
  • Java TCP 通信:实现简单的 Echo 服务器与客户端
  • Scala:统计每个单词出现的个数并打印
  • SolidWorks 转 PDF3D 技术详解
  • [vue] .native修饰符
  • 【Proteus仿真】【STM32单片机】全自动养护智能生态雨林缸
  • SpringCloud篇(服务网关 - GateWay)
  • 《深度学习实战》第11集:AI大模型压缩与加速
  • Python语句中OR逻辑运算符用例分析
  • unity学习63,第2个小游戏:用fungus做一个简单对话游戏
  • grpc工具使用
  • SQL基础语法
  • 数据结构与算法:二分答案法
  • 文件IO函数和目录相关函数
  • 飞算JavaAI编程工具集成到idea中
  • STM32 -- 仿真器 ST-Link、J-Link 的连接、参数设置
  • 版本控制器Git和gdb
  • 一键无损放大视频,让老旧画面重焕新生!
  • 视频丨美国两名男童持枪与警察对峙,一人还试图扣动扳机
  • 北京“准80后”干部兰天跨省份调任新疆生态环境厅副厅长
  • 美国三大指数全线高开:纳指涨逾4%,大型科技股、中概股大涨
  • 世界期待中美对话合作带来更多确定性和稳定性
  • 第二期人工智能能力建设研讨班在京开班,近40国和区域组织代表参加
  • 《广州大典研究》集刊发展座谈会:“广州学”的传承与创新