当前位置: 首页 > wzjs >正文

国外什么网站是做外贸深圳外包网络推广

国外什么网站是做外贸,深圳外包网络推广,轻松筹 的网站价格做,网站建设简历模板一、简介 RabbitMQ 是一个基于 AMQP(Advanced Message Queuing Protocol,高级消息队列协议)的消息中间件,它用于异步通信、解耦系统,提高系统的可扩展性和可靠性。它广泛应用于微服务架构、分布式系统、异步处理等场景…

一、简介

RabbitMQ 是一个基于 AMQP(Advanced Message Queuing Protocol,高级消息队列协议)的消息中间件,它用于异步通信、解耦系统,提高系统的可扩展性和可靠性。它广泛应用于微服务架构、分布式系统、异步处理等场景。


1.RabbitMQ 的特点

  1. 支持多种协议:支持 AMQP 0.9.1、AMQP 1.0、MQTT、STOMP 等。
  2. 高可用性:支持集群部署、镜像队列等机制,确保消息不丢失。
  3. 可靠性:通过持久化、ACK 机制、事务支持等方式保证消息可靠传输。
  4. 灵活的路由机制:通过交换机(Exchange)和绑定(Binding)策略,实现复杂的消息路由规则。
  5. 插件机制:提供插件扩展,如管理插件、消息追踪插件、WebSocket 支持等。
  6. 轻量级、易于部署:基于 Erlang 语言开发,占用资源较低,支持 Docker、Kubernetes 部署。

2.RabbitMQ 的优缺点

(1)优点:
  1. 支持多种消息模式(点对点、发布订阅、路由等)。
  2. 社区活跃,生态丰富,官方提供大量插件,支持多种语言客户端(Java、Python、Go 等)。
  3. 支持消息持久化,即使服务器重启,消息也不会丢失。
  4. 支持集群模式,可扩展性强,适用于分布式环境。
  5. 性能较好,适用于中小型业务场景。
(2)缺点:
  1. 吞吐量较低:相比 Kafka,RabbitMQ 的吞吐量较低,不适合超大规模数据流处理。
  2. 管理运维复杂:需要手动管理队列、交换机、绑定等,运维成本相对较高。
  3. Erlang 生态较小,部分企业的运维人员对 Erlang 语言不熟悉,可能影响问题排查。
  4. 资源占用较大:在高并发场景下,RabbitMQ 的内存占用较多,可能影响性能。

二、核心组件

1. 生产者(Producer)

  • 负责发送消息到 RabbitMQ,消息最终会进入队列(Queue)。
  • 生产者不会直接把消息发到队列,而是先发送到交换机(Exchange)

2. 交换机(Exchange)

  • 负责接收生产者发送的消息,并根据绑定规则决定消息流向。
  • 交换机类型:
    • Direct(直连交换机):消息按照指定的 routing key 发送到匹配的队列。
    • Fanout(扇形交换机):消息广播到所有绑定的队列,不考虑 routing key
    • Topic(主题交换机):按模式匹配 routing key,适用于模糊匹配的场景(如 logs.*)。
    • Headers(头交换机):根据消息的 Header 属性进行路由。

3. 队列(Queue)

  • 存储消息,等待消费者(Consumer)消费。
  • 可配置是否持久化(避免 RabbitMQ 重启后消息丢失)。
  • 可设置死信队列(DLX),当消息超时未消费或被拒绝时,进入死信队列做后续处理。

4. 绑定(Binding)

  • 连接交换机和队列的桥梁,决定消息如何流转。
  • 绑定通常通过 routing key 进行匹配,或基于 headers 进行匹配。

5. 消费者(Consumer)

  • 从队列中获取消息并进行处理。
  • 支持手动 ACK 机制(保证消息消费后才被确认,避免丢失)。

6. 虚拟主机(Virtual Host,VHost)

  • RabbitMQ 服务器的逻辑隔离机制,不同 VHost 之间的消息互不影响。
  • 每个 VHost 维护自己的队列、交换机和权限控制。

7. RabbitMQ 连接(Connection)和信道(Channel)

  • Connection:TCP 连接,生产者和消费者都需要建立连接。
  • Channel:RabbitMQ 的逻辑连接,多个 Channel 共享一个 Connection,减少 TCP 连接开销。

三、交换机类型

RabbitMQ 中的交换机(Exchange)是消息路由的核心,决定了消息的流向。根据不同的路由策略,RabbitMQ 提供了几种交换机类型:

1. Direct Exchange(直连交换机)

  • 特点:消息通过交换机按 routing key 路由到匹配的队列。
  • 适用场景:精确匹配的消息路由。

在 Direct Exchange 中,生产者发送消息时指定 routing key,只有与队列绑定时的 routing key 完全匹配的队列会接收到消息。


2. Fanout Exchange(扇形交换机)

  • 特点:不关心 routing key,将消息广播到所有绑定的队列。
  • 适用场景:需要将消息发送给多个消费者或多个队列的场景(例如广播通知)。

Fanout Exchange 将消息发送到所有绑定的队列,完全不考虑 routing key


3. Topic Exchange(主题交换机)

  • 特点:使用 routing key 的模式匹配功能,支持通配符(*#)来路由消息。
  • 适用场景:根据模式灵活路由消息,比如日志系统中的级别过滤(如 logs.infologs.error)。

Topic Exchange 根据绑定的 routing key 模式进行匹配,* 匹配一个词,# 匹配多个词。比如 logs.info 匹配所有包含 logs.info 的消息。


4. Headers Exchange(头交换机)

  • 特点:通过匹配消息的 Header 信息进行路由,而不是 routing key
  • 适用场景:需要通过多个属性来过滤消息的情况,比如消息头包含多个属性,灵活的路由机制。

Headers Exchange 根据消息的 header 信息来路由,常用于需要灵活多维度匹配的场景。


总结

  • Direct Exchange:用于精确匹配的场景。
  • Fanout Exchange:用于广播场景,所有绑定的队列都会接收到消息。
  • Topic Exchange:用于模式匹配,灵活路由。
  • Headers Exchange:通过 header 属性进行路由,灵活多维度匹配。

四、创建方式

在 RabbitMQ 中,队列(Queue)、交换机(Exchange)、和绑定(Binding)是消息传递的核心元素。下面我会介绍它们在 Spring Boot 中的创建方式,包括使用 配置类注解方式

1.配置类方式

在配置类中,可以通过 @Bean 注解来声明队列、交换机和绑定关系。

@Configuration
public class RabbitConfig {// 创建队列@Beanpublic Queue queue() {return new Queue("myQueue", true);}// 创建 Direct 交换机@Beanpublic DirectExchange directExchange() {return new DirectExchange("directExchange");}// 创建绑定@Beanpublic Binding binding(Queue queue, DirectExchange exchange) {return BindingBuilder.bind(queue).to(exchange).with("myRoutingKey");}
}
  • 队列(Queue):可以使用 @Bean 在配置类中声明。
  • 交换机(Exchange):同样使用 @Bean 创建,通常使用 DirectExchangeFanoutExchangeTopicExchange 等不同类型。
  • 绑定(Binding):队列和交换机的绑定通过 BindingBuilder.bind(queue).to(exchange).with(routingKey) 来配置。

2.注解方式

在 Spring Boot 中,@RabbitListener 注解可以用来监听 RabbitMQ 队列,并且可以通过 @QueueBinding 直接在注解中完成 队列、交换机、绑定 的声明(如果不存在就会创建)。

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "coupon.order.queue1", durable = "true"), // 队列exchange = @Exchange(name = "coupon.order.exchange", type = ExchangeTypes.DIRECT, durable = "true"), // 交换机key = {"coupon"} // 绑定的 routing key
))
public void receiveMessage(String message) {System.out.println("Received: " + message);
}

注意:RabbitMQ 默认发送和接收的消息是 字节数组(byte[]),在 Spring Boot 中,默认是字符串,但如果发送的是对象,需要手动配置 JSON 序列化 才能正确转换。需要在生产者和消费者都添加Bean注解:

@Beanpublic MessageConverter jacksonMessageConvertor() {return new Jackson2JsonMessageConverter();}

五、生产者确认机制

RabbitMQ 生产者确认机制(Publisher Confirms)用于确保消息从生产者 正确投递到交换机,并且 从交换机正确路由到队列,防止消息丢失。


1. 生产者确认机制的三种模式

首先配置.yml文件或.properties文件:

spring:rabbitmq:# 生产者网络连接失败重试设置,不建议使用template:retry:enabled: true        # 开启消费者重试(抛出异常时触发)initial-interval: 400ms  # 首次重试间隔multiplier: 1        # 间隔倍数(1表示固定间隔)max-attempts: 3      # 最大重试次数(含首次消费)# 生产者确认机制(可靠性保障关键配置)publisher-confirm-type: none  # 开启消息确认publisher-returns: correlated     # 开启消息路由失败通知

(1)ConfirmCallback(消息到达交换机,Exchange)

作用: 确保消息 成功到达交换机(Exchange)。

如果消息 到达交换机,返回 ack=true;如果 未到达交换机(如交换机不存在),返回 ack=false 并提供 cause 错误信息。

实现方式:

rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if (ack) {System.out.println("消息成功到达交换机!");} else {System.out.println("消息未到达交换机,原因:" + cause);}
});

(2)ReturnCallback(消息未投递到队列,Queue)

作用: 确保消息 正确路由到队列。

如果消息没有匹配任何队列(比如 routingKey 错误),触发 ReturnCallback

实现方式:

rabbitTemplate.setMandatory(true); // 必须设置为 true,否则不会触发回调rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {System.out.println("消息未投递到队列!");System.out.println("交换机:" + exchange);System.out.println("路由键:" + routingKey);System.out.println("错误代码:" + replyCode + ",错误信息:" + replyText);
});

(3)事务模式(不推荐)

作用: 生产者使用事务 channel.txCommit() 进行消息投递,失败时回滚 channel.txRollback()。但性能较低,不推荐使用。

 以上三种方法都能确认生产者的消息成功送达,但会带来系统额外的性能开销,因此都不推荐使用。非要使用,RabbitMQ 官方建议使用 ConfirmCallback。 

六、消费者确认机制

RabbitMQ 的消费者确认机制默认为none,这种模式下,RabbitMQ 不会要求消费者确认消息,消息会在消费者接收到时直接从队列中删除。 消费者确认机制(Consumer Acknowledgements)用于确保消息被正确消费,避免消息丢失或重复消费。主要有两种模式:

  1. 自动确认(autoAck = true)(不推荐,可能丢失消息)
  2. 手动确认(autoAck = false)(推荐,确保消息被正确处理)

1. 自动确认模式(Auto ACK)

默认情况下,RabbitMQ 采用自动确认,即消费者 收到消息后立即确认,无论业务逻辑是否执行成功,RabbitMQ 都会从队列中删除该消息。

代码示例:
@RabbitListener(queues = "testQueue")
public void receiveMessage(String message) {System.out.println("收到消息:" + message);int result = 1 / 0;  // 业务代码出错
}

消息 刚到达消费者 就被 RabbitMQ 删除,即使消费者 还没处理,也不会重新投递,导致消息丢失

2. 手动确认模式(Manual ACK)

手动确认允许消费者 成功处理消息后再手动确认,并可选择拒绝或重新入队,保证消息不会丢失。

代码示例:
@RabbitListener(queues = "testQueue", ackMode = "MANUAL")
public void receiveMessage(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {System.out.println("收到消息:" + new String(message.getBody()));// 业务处理逻辑processMessage(new String(message.getBody()));// 处理成功,手动确认channel.basicAck(deliveryTag, false);} catch (Exception e) {System.out.println("消息处理失败:" + e.getMessage());// 处理失败,拒绝消息并重新入队channel.basicNack(deliveryTag, false, true);}
}

手动确认的三种方式如下:

方法作用说明
channel.basicAck(deliveryTag, false);确认消息处理成功后,通知 RabbitMQ 删除消息
channel.basicNack(deliveryTag, false, true);拒绝消息,并重新入队处理失败时,消息会重新回到队列
channel.basicReject(deliveryTag, false);拒绝消息,不重新入队直接丢弃消息

七、消息持久化

在 RabbitMQ 中,消息持久化是为了确保消息在 RabbitMQ 服务器崩溃重启 后,仍然能够被恢复。持久化可以保证即使 RabbitMQ 意外停止,消息不会丢失。

  • 临时消息(non-persistent): 默认情况下,消息在内存中存储,RabbitMQ 重启后会丢失。
  • 持久化消息(persistent): 消息存储在磁盘上,RabbitMQ 重启后可以恢复。

RabbitMQ 消息持久化涉及两个方面:

  1. 队列持久化
  2. 消息持久化
  3. 路由器持久化

1.设置队列和交换机持久化 

在声明队列和交换机时,需要设置队列的 durable 属性为 true,表示队列是持久化的。

@Beanpublic DirectExchange dlxExchange() {return new DirectExchange("dlx.exchange", true, false);}@Beanpublic Queue dlxQueue() {return new Queue("dlx.queue", true);}

2.设置消息持久化

为了使消息持久化,需要将消息的 deliveryMode 设置为 2(持久化)。如果是通过 Spring AMQP 发送消息,可以通过 MessagePostProcessor 来设置消息持久化。

通过 Spring AMQP 设置消息持久化:
@Autowired
private RabbitTemplate rabbitTemplate;public void sendMessage(String message) {MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {// 设置消息持久化message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);return message;}};rabbitTemplate.convertAndSend("myExchange", "myRoutingKey", message, messagePostProcessor);
}

八、延迟队列的实现

RabbitMQ 中实现 延迟队列(Delay Queue)通常有两种方式,分别是通过 插件 和 死信队列+TTL(Time-To-Live) 实现。延迟队列允许消息在特定的时间延迟后再被消费,通常用于实现任务调度、定时任务等功能。

(1)不用插件:

通过死信队列+TTL实现的。首先,我们将消息放到延迟队列中,将TTL设置为延迟时间,消息在TTL过期之后会被转发到死信交换机,再路由到死信队列,接着消费者到死信队列中消费消息,从而达到延迟消费的作用。

通过配置类创建死信交换机和死信队列,设置绑定和TTL 

@Configuration
public class DlxConfig {// 死信交换机@Beanpublic DirectExchange dlxExchange() {return new DirectExchange("dlx.exchange", true, false);}// 死信队列@Beanpublic Queue dlxQueue() {return new Queue("dlx.queue", true);}// 死信队列绑定@Beanpublic Binding dlxBinding() {return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("dlx");}// 延迟交换机和队列@Beanpublic DirectExchange delayExchange() {return new DirectExchange("delay.exchange", true, false);}// 延迟队列@Beanpublic Queue delayQueue() {return QueueBuilder.durable("delay.queue").deadLetterExchange("dlx.exchange")  // 绑定死信交换机.deadLetterRoutingKey("dlx")         // 设置死信路由键.ttl(10 * 1000)                      // 设置消息TTL为10秒.build();}// 延迟队列和交换机绑定@Beanpublic Binding delayBinding() {return BindingBuilder.bind(delayQueue()).to(delayExchange()).with("delay.routing.key");}
}

 测试类,发送消息到延迟队列:

@Test
public void test4() {LocalTime now = LocalTime.now(); // 获取当前时间DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss"); // 格式化String msg = "hello, amqp";// 通过交换机发送消息rabbitTemplate.convertAndSend("delay.exchange", "delay.routing.key", msg);System.out.println("发送时间为:" + now.format(formatter));
}

 消费者,在死信队列中消费:

@RabbitListener(queues = "dlx.queue")
public void DelayedMessage(String msg) {LocalTime now = LocalTime.now(); // 获取当前时间DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss"); // 格式化System.out.println("收到延迟消息时间为:" + now.format(formatter));
}

(2)使用插件:rabbitmq_delayed_message_exchange

消息发送到延迟交换机时,携带X-delay头(延迟时间),插件将消息暂存在内部数据库中,不会立即路由到队列,到期后将消息路由到目标队列; 

直接在消费者的监视器注解上配置信息:

// 使用插件的 延迟队列声明@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "delay.queue", durable = "true"),exchange = @Exchange(name = "delay.exchange", delayed = "true"),key = "delay"))public void DelayMessage(String msg) {LocalTime now = LocalTime.now(); // 获取当前时间DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss"); // 格式化System.out.println("收到延迟消息时间为:" + now.format(formatter));}

 测试类,设置TTL:

// 测试有延迟插件的 延迟队列@Testpublic void test5() {LocalTime now = LocalTime.now(); // 获取当前时间DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss"); // 格式化String msg = "hello, amqp";rabbitTemplate.convertAndSend("delay.exchange", "delay", msg, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setDelay(10 * 1000);   // 设置过期时间return message;}});System.out.println("第一次发送时间为:" + now.format(formatter));}

(3)使用插件和不使用插件的区别: 

  1. 消息存储位置:没有插件的需要额外创建一个队列用来存储消息,而有插件的将消息暂存在Mnesia数据库中,占用资源较低;
  2. 性能和误差:没有插件的需要轮询检查消息TTL(默认是每秒检查一次),而有插件的使用 Erlang Timer 模块 或 时间轮算法管理延迟时间,性能比较好,延迟误差较小;
  3. 消息阻塞:没有插件的队列只会检查对头消息是否过期,会导致后续消息被阻塞;而有插件的对每个消息的延迟独立计算,到期后立即触发路由;

九、消息重复消费的解决

消息重复消费是一个常见的 RabbitMQ 消息队列问题,它可能会影响系统的准确性和性能。为了解决消息重复消费的问题,可以采取以下几种策略:

1)开启消费者确认机制(手动ack或自动ack确保每次消费者成功处理完消息后,发送一个信号给RabbitMQ, 如果消费者处理失败,重新排队未确认的消息;

@RabbitListener(queues = "order.payment.queue", ackMode = "MANUAL")
public void handlePaymentMessage(Message message, Channel channel) {String orderId = new String(message.getBody());try {// 处理订单支付orderService.processPayment(orderId);// 确认消息已成功消费channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {// 处理失败,拒绝消息,并不重新入队channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);}
}

(2)核心思想是保证幂等性:无论消息被消费多少次,结果与一次消费相同:

1.设置唯一标识(消息ID),将消息ID放到缓存中,每次消费前根据消息ID是否已存在来判定消息是否处理过;

@Autowired
private StringRedisTemplate redisTemplate;@RabbitListener(queues = "order.payment.queue")
public void handlePaymentMessage(String orderId) {String messageId = "payment:" + orderId;if (redisTemplate.opsForSet().isMember("processedMessages", messageId)) {return;  // 如果消息已经处理过,跳过}// 处理订单支付orderService.processPayment(orderId);// 将消息ID存入 Redis,防止重复消费redisTemplate.opsForSet().add("processedMessages", messageId);
}

2.利用数据库的唯一索引、主键的约束防止消息重复消费; 

    在消费者代码中,如果订单已存在(即订单ID已经在数据库中),则跳过该消息,从而避免重复消费:

    @RabbitListener(queues = "order.payment.queue")
    public void handlePaymentMessage(String orderId) {try {// 检查订单是否已支付(通过唯一索引判断是否重复消费)if (orderService.isOrderPaid(orderId)) {return;  // 如果订单已经支付,则跳过该消息}// 处理订单支付orderService.processPayment(orderId);} catch (Exception e) {// 处理异常e.printStackTrace();}
    }
    

    orderService.isOrderPaid(orderId) 方法中,我们可以查询数据库,检查订单是否已处理:

    public boolean isOrderPaid(String orderId) {// 通过唯一索引查询订单是否存在,如果存在则返回 true,表示已支付return jdbcTemplate.queryForObject("SELECT COUNT(1) FROM order_payment WHERE order_id = ?", Integer.class, orderId) > 0;
    }
    

    3.使用乐观锁,通过版本号或条件判断是否重复消费;

    十、异常消息的处理

    异常消息的处理是消息队列中一个重要的问题,尤其是在 RabbitMQ 中。消息处理过程中,可能会遇到各种异常情况,例如消费者处理消息时发生错误、消息无法被消费等。为了提高系统的可靠性和容错能力,RabbitMQ 提供了多种机制来处理异常消息。

    配置.yml文件或者.properties文件:

    spring:rabbitmq:listener:simple:prefetch: 1  # 每个消费者最大未确认消息数(设为1实现能者多劳模式)acknowledge-mode: auto  # 自动ACK(高风险!建议改为 manual 手动确认,避免消息丢失)retry:enabled: true          # 是否开启发送失败重试(仅针对网络波动场景,不解决业务异常)multiplier: 1          # 重试间隔时间倍数(此处为固定间隔)max-attempts: 3        # 最大重试次数(含首次发送)initial-interval: 200ms # 首次重试间隔

    创建异常交换机和异常队列:

    /*** 定义异常交换机和异常队列* 将异常的消息重试多次失败后,统一放在这个错误队列中,人工处理*/@Configuration
    @ConditionalOnProperty(prefix = "spring.rabbitmq.listener.simple.retry", name = "enabled", havingValue = "true")   // 当这个属性存在且为true,配置类才生效
    public class ErrorConfig {@Beanpublic DirectExchange errorExchange() {return new DirectExchange("error.exchange", true, false);}@Beanpublic Queue errorQueue() {return new Queue("error.queue", true);}@Beanpublic Binding errorBinding() {return BindingBuilder.bind(errorQueue()).to(errorExchange()).with("error");}// 消息异常重试n此后,移到该异常交换机@Beanpublic MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {return new RepublishMessageRecoverer(rabbitTemplate, "error.exchange", "error");}
    }

     

    总结

    RabbitMQ 作为轻量级的消息队列,适用于中小型业务场景,具备高可用性、可靠性和灵活的消息路由机制。然而,它的吞吐量不及 Kafka,且运维成本较高。对于需要事务保障、低延迟、灵活消息分发的业务,RabbitMQ 是一个不错的选择。

    http://www.dtcms.com/wzjs/23169.html

    相关文章:

  1. 服装培训网站建设搜索量排行
  2. 成人用品怎样做网站推广山东最新消息今天
  3. 网页设计效果图分析西安seo顾问培训
  4. 网站地址怎么做超链接搜索引擎优化通常要注意的问题有
  5. 本网站只做信息展示不提供在线交易做网站的网络公司
  6. 天天斗地主官方网站开发网络营销人员招聘
  7. 成都科技网站建设热优化推广服务
  8. 城市建设网站金郑州seo推广优化
  9. phpcms 手机网站如何自己编写网站
  10. 建筑公司网站怎么设计seo网站自动发布外链工具
  11. icp主体备案号 网站备案号外贸网站建站和推广
  12. 个人手机网站大全网络推广发展
  13. 用啥网站做首页官方app下载安装
  14. 青岛经济新区建设局网站网站收录怎么做
  15. 如何才能让自己做的网站百度能搜百度爱采购排名
  16. 网站教程网东莞网站推广企业
  17. o2o网站线上seo研究中心骗局
  18. 建设网站公司哪家好晨阳seo
  19. 大良营销网站建设策划北京网上推广
  20. 商机加盟好项目下载优化大师
  21. 上海市最新消息今天网站seo推广
  22. 辽宁建设科技信息网网站会员营销
  23. 码制作官网郭生b如何优化网站
  24. 咨询装修seo联盟
  25. 婚纱网站怎么做seo优化大师下载旧版本安装
  26. 杭州网站设计制作网络营销手段有哪四种
  27. 建网站麻烦吗外包公司和劳务派遣的区别
  28. 静态网站做新闻系统酒店机票搜索量暴涨
  29. 制作网站在哪里百度手机助手下载安装
  30. 丁香园做科室网站今天军事新闻最新消息