当前位置: 首页 > news >正文

分布式中间件:RabbitMQ确认消费机制

分布式中间件:RabbitMQ确认消费机制

在分布式系统中,消息队列是实现异步通信和系统解耦的重要组件。RabbitMQ 作为一款功能强大的消息队列中间件,提供了丰富的特性来保证消息的可靠传输和消费。其中,确认消费机制是确保消息被正确处理的关键环节。本文将深入探讨 RabbitMQ 的确认消费机制,并给出不同场景下的配置示例。

确认消费机制概述

RabbitMQ 的确认消费机制主要涉及两个方面:生产者消息确认和消费者消息确认。生产者消息确认用于确保消息成功发送到 RabbitMQ 服务器,而消费者消息确认则用于确保消息被消费者正确处理。

消费者确认模式

RabbitMQ 提供了三种消费者确认模式:

  1. AcknowledgeMode.AUTO:自动确认模式。当消费者接收到消息后,RabbitMQ 会自动将消息标记为已消费,无论消费者是否成功处理该消息。这种模式简单方便,但可能会导致消息丢失,因为如果消费者在处理消息时出现异常,消息已经被确认,无法再次处理。
  2. AcknowledgeMode.MANUAL:手动确认模式。消费者需要显式地调用 channel.basicAck() 方法来确认消息已经被成功处理,或者调用 channel.basicNack()channel.basicReject() 方法来拒绝消息。这种模式可以确保消息不会丢失,但需要开发者手动处理确认逻辑。
  3. AcknowledgeMode.NONE:无确认模式。RabbitMQ 不会等待消费者的确认,一旦消息被发送给消费者,就会将其标记为已消费。这种模式适用于对消息可靠性要求不高的场景。

配置示例

单一消费者实例配置

import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    private final ConnectionFactory connectionFactory;

    public RabbitMQConfig(ConnectionFactory connectionFactory) {
        this.connectionFactory = connectionFactory;
    }

    @Bean("singleListenerContainer")
    public SimpleRabbitListenerContainerFactory singleListenerContainer() {
        // 创建一个监听容器工厂
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        // 设置连接工厂
        factory.setConnectionFactory(connectionFactory);
        // 设置消息转换器
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        // 设置并发消费
        factory.setConcurrentConsumers(1);
        // 设置最大并发消费
        factory.setMaxConcurrentConsumers(1);
        // 设置最大单条消费消息
        factory.setPrefetchCount(1);
        return factory;
    }
}

在这个配置中,我们创建了一个单一消费者实例的监听容器工厂。通过设置 ConcurrentConsumersMaxConcurrentConsumers 为 1,确保只有一个消费者实例在处理消息。PrefetchCount 设置为 1,表示消费者一次只从队列中获取一条消息,保证消息的顺序处理。

多个消费者实例配置

import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.support.AcknowledgeMode;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    private final ConnectionFactory connectionFactory;

    public RabbitMQConfig(ConnectionFactory connectionFactory) {
        this.connectionFactory = connectionFactory;
    }

    @Bean("multiListenerContainer")
    public SimpleRabbitListenerContainerFactory multiListenerContainer() {
        // 创建一个监听容器工厂
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        // 设置连接工厂
        factory.setConnectionFactory(connectionFactory);
        // 设置消息转换器
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        // 设置手动提交
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        // 设置并发消费
        factory.setConcurrentConsumers(10);
        // 设置最大并发消费
        factory.setMaxConcurrentConsumers(20);
        // 设置最大单条消费消息
        factory.setPrefetchCount(10);
        return factory;
    }
}

在多个消费者实例的配置中,我们将 ConcurrentConsumers 设置为 10,MaxConcurrentConsumers 设置为 20,表示初始有 10 个消费者实例,最多可以扩展到 20 个。PrefetchCount 设置为 10,允许每个消费者一次从队列中获取 10 条消息,提高消费效率。同时,我们将确认模式设置为 AcknowledgeMode.MANUAL,需要开发者手动处理消息确认。

自定义 RabbitMQ 发送消息组件

import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    private final ConnectionFactory connectionFactory;

    public RabbitMQConfig(ConnectionFactory connectionFactory) {
        this.connectionFactory = connectionFactory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        // 设置消息发送确认
        connectionFactory.setPublisherConfirms(true);
        // 设置消息发送返回
        connectionFactory.setPublisherReturns(true);
        // 创建 rabbitTemplate
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        // 设置消息发送确认回调
        rabbitTemplate.setMandatory(true);
        // 设置消息发送确认回调
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                System.out.println("消息发送成功");
            } else {
                System.out.println("消息发送失败:" + cause + correlationData.toString());
            }
        });
        // 设置消息发送返回回调
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            System.out.println("消息丢失:exchange:" + exchange + ",route:" + routingKey + ",replyCode:" + replyCode + ",replyText:" + replyText);
        });
        // 设置消息转换器
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        return rabbitTemplate;
    }
}

在这个配置中,我们自定义了一个 RabbitTemplate 用于发送消息。通过设置 PublisherConfirmsPublisherReturnstrue,开启生产者消息确认和返回机制。ConfirmCallback 用于处理消息发送确认结果,ReturnCallback 用于处理消息无法路由到队列的情况。

总结

RabbitMQ 的确认消费机制为分布式系统中的消息传递提供了可靠的保障。通过合理配置生产者和消费者的确认模式,可以确保消息的可靠发送和消费。在实际应用中,需要根据业务需求选择合适的确认模式,并处理好异常情况,以保证系统的稳定性和可靠性。

希望本文能帮助你更好地理解和使用 RabbitMQ 的确认消费机制。如果你有任何问题或建议,欢迎在评论区留言。

以上博客详细介绍了 RabbitMQ 的确认消费机制,并给出了不同场景下的配置示例。通过这些配置,你可以根据业务需求灵活调整消费者实例数量和确认模式,确保消息的可靠传输和处理。

相关文章:

  • QT网页显示的几种方法及对比
  • 计算机网络精讲day1——计算机网络的性能指标(上)
  • 【大坐标处理】
  • MyBatis plus详解
  • 使用BootStrap 3的原创的模态框组件,没法弹出!估计是原创的bug
  • day-110 下降路径最小和 II
  • filebeat和logstash区别
  • reCAPTCHA 打码平台
  • CCBCISCN复盘
  • Ubuntu检查并启用 Nginx 的stream模块或重新安装支持stream模块的Nginx
  • MacOS下的IntelliJ IDEA突然无法访问本机的虚拟机
  • Ubuntu上查看GPU使用情况并释放内存
  • 【C++】C++类
  • Java集合操作三剑客:Collection、collect与Collectors的协奏曲
  • 【高德】-下载路径规划数据-无代码
  • conda create之后,以前的conda env list 只能看到环境路径 没有环境名称了
  • slq-labs日志
  • C++和标准库速成(十)——类型别名、类型定义、类型推断和标准库简介
  • HarmonyOS Next~鸿蒙系统功耗优化体系解析:前台交互与后台任务的全场景节能设计
  • AI Agent系列(七) -思维链(Chain of Thought,CoT)
  • 日照网站网站建设/成人职业培训学校
  • 做自己网站彩票/网站流量统计分析的维度包括
  • 帝国cms地方门户网站模板/青岛网站seo
  • 网站及单位网站建设情况/百度一下图片识别
  • 只做美食类目产品的网站/济南最新消息今天
  • h5网站开发多少钱/南昌seo搜索优化