当前位置: 首页 > wzjs >正文

江苏省数字文化馆网站建设国内免费发布产品的平台

江苏省数字文化馆网站建设,国内免费发布产品的平台,网站建设如何插音乐,中信建设有限责任公司工会引言 在某些业务场景中,MQ需要保证消息的顺序性,比如支付订单应该在创建订单之后进行 如果不使用保证顺序的手段,由于多队列、网络等因素可能会导致先处理支付订单的消息再处理创建订单的消息,这样就会导致处理失败 为了避免这…

引言

在某些业务场景中,MQ需要保证消息的顺序性,比如支付订单应该在创建订单之后进行

如果不使用保证顺序的手段,由于多队列、网络等因素可能会导致先处理支付订单的消息再处理创建订单的消息,这样就会导致处理失败

为了避免这样的情况发生,使用MQ时有必要保证消息的顺序性,在RocketMQ中通常使用顺序发送消息和顺序消费消息来保证消息的顺序性

生产者端保证消息有序

当队列全局只有一个时,消息全局有序,此时只需要确保为单个生产者发送(多个生产者同时发送无法预估消息到达的顺序)

或者先生产创建订单的消息再生产支付订单的消息(确保消息不丢)由于全局有序只能有一个队列,队列的压力过大,所以不经常使用

更通用的做法是使用队列有序:在发送消息时通过一定的路由算法将需要有序的消息分发到同一个队列中,使用相同的队列保证有序性

顺序消息分类

RocketMQ 提供了两种顺序消息,即全局顺序消息和分区顺序消息。

  • 全局顺序消息:指某个 Topic 下的所有消息都要保证顺序。在这种情况下,Topic 内部只能有一个队列,生产者将消息顺序发送到这个唯一的队列中。因为队列本身是 FIFO(先进先出)的数据结构,所以进入队列的消息天然有序。不过,由于只有一个队列,会导致并发度很低,吞吐量也会受到限制,因此全局顺序消息的使用场景相对较少。

  • 分区顺序消息:指在一个分区(队列)内的消息有序,不同分区之间的消息可以无序。这是更常见的使用方式,既保证了一定程度的顺序性,又能通过多个分区提高系统的并发能力和吞吐量。

实现分区顺序消息的发送

为了实现分区顺序消息的发送,生产者在发送消息时需要指定消息的路由规则,确保同一业务逻辑下的消息被发送到同一个队列中。例如,在电商系统中,一个订单的不同状态变更消息需要保证顺序,那么可以使用订单 ID 作为消息的路由键。

private SendResult sendSelectImpl(Message msg,MessageQueueSelector selector,Object arg,final CommunicationMode communicationMode,final SendCallback sendCallback, final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {long beginStartTime = System.currentTimeMillis();this.makeSureStateOK();//参数校验Validators.checkMessage(msg, this.defaultMQProducer);//获取topicTopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());if (topicPublishInfo != null && topicPublishInfo.ok()) {MessageQueue mq = null;try {//获取队列List<MessageQueue> messageQueueList =mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());Message userMessage = MessageAccessor.cloneMessage(msg);String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace());userMessage.setTopic(userTopic);//选择队列mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg));} catch (Throwable e) {throw new MQClientException("select message queue threw exception.", e);}//超时判断long costTime = System.currentTimeMillis() - beginStartTime;if (timeout < costTime) {throw new RemotingTooMuchRequestException("sendSelectImpl call timeout");}if (mq != null) {//发送消息return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime);} else {throw new MQClientException("select message queue return null.", null);}}//失败 validateNameServerSetting();throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);
}

在上述代码中,通过 MessageQueueSelector 实现了消息的路由选择,根据 orderId 选择对应的队列,确保相同 orderId 的消息发送到同一个队列中。

消费者端保证消息有序

前文说过消费者消息消息时,为了全力以赴通常都是使用线程池进行并发消费的

当一批顺序消息被同时拉取到消费者时,如果由线程池并发进行消费也会导致消息的顺序性失效

因此在消费端也需要进行顺序消费,使用DefaultMQPushConsumer进行消费时,设置消息监听器为MessageListenerOrderly

在顺序消费的文章中也说过:设置消息监听器为MessageListenerOrderly时,会通过多种加锁的方式保证消费者顺序消费队列中的消息

如果消费发生失败会阻塞队列导致消息堆积,因此需要注意特殊处理,比如重试次数超过阈值时就记录下来后续再处理

consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {try {for (MessageExt msg : msgs) {// 获取消息的重试次数int retryCount = msg.getReconsumeTimes();System.out.println("Message [" + msg.getMsgId() + "] is reconsumed " + retryCount + " times");//如果重试次数超过阈值 记录if (retryCount >= 3) {System.out.println("Message [" + msg.getMsgId() + "] add DB");}// 模拟消费失败if (retryCount < 3) {throw new RuntimeException("Consume failed");}// 消费成功System.out.println("Message [" + msg.getMsgId() + "] consumed successfully");}return ConsumeOrderlyStatus.SUCCESS;} catch (Exception e) {// 记录日志e.printStackTrace();// 返回重试状态return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;}
});

顺序消费模式

RocketMQ 的消费者提供了顺序消费模式,消费者在这种模式下会按照消息在队列中的顺序依次消费。消费者会对每个队列加锁,保证同一时间只有一个线程在处理该队列中的消息,从而确保消息消费的顺序性。

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;public class OrderedMessageConsumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");consumer.setNamesrvAddr("localhost:9876");consumer.subscribe("OrderedTopic", "*");// 设置为顺序消费模式consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {for (MessageExt msg : msgs) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msg.getBody()));}return ConsumeOrderlyStatus.SUCCESS;}});consumer.start();System.out.printf("Consumer Started.%n");}
}

在上述代码中,通过 MessageListenerOrderly 实现了顺序消费的逻辑,消费者会按顺序处理接收到的消息。

消费异常处理

在顺序消费过程中,如果出现消费异常,需要进行合理的处理,以保证消息顺序不受影响。RocketMQ 提供了重试机制,当消费失败时,消息会被重新放入队列中,等待下次消费。消费者需要确保在处理异常时不会破坏消息的顺序。

http://www.dtcms.com/wzjs/336458.html

相关文章:

  • 公司网站建设知识免费seo网站推荐一下
  • 网站轮播图怎么设计手机百度网页版 入口
  • 合肥网站建设哪个公司做得比较好网站cms
  • 电子商务网站 icp备案百度贴吧免费发布信息
  • 专门做创意桌椅的网站推广自己的产品
  • 杭州网站改版谷歌推广怎么做最有效
  • 网站没有做的关键词有排名关键词排名优化
  • 大型的营销型网站seo怎么赚钱
  • 哪些网站是做采购的旺道seo推广有用吗
  • 做房产网站多少钱常熟seo关键词优化公司
  • 专门做网站的公司交什么重庆放心seo整站优化
  • 用织梦系统做的2个网站要把它都上传到服务器上吗产品软文是什么意思
  • 建站合同营销型网站建设团队
  • 网站建设中标签导航的特征广州seo代理计费
  • 一起做网店17网安徽搜索引擎优化
  • 太原企业网站搭建学电商运营的培训机构
  • 知东莞app下载孝感seo
  • 潍坊网站建设优化网站技术解决方案
  • 长春建站模板厂家河南网络推广公司
  • 做电商什么素材网站好抖音seo关键词排名技术
  • 永久免费asp空间申请网站优化课程培训
  • 网站前端建设报价单北京网站制作
  • c 如何拖控件做网站怎样推广公司的网站
  • 沈阳网站设计开发公司青岛官网seo公司
  • 怎么给网站做域名重定向泉州网站seo公司
  • 爱站网收录专门开发小程序的公司
  • 中企动力石家庄分公司点击精灵seo
  • 鞍山站google框架一键安装
  • 石家庄市建设局网站seo推广小分享
  • 什么是网站的二级目录下百度我的订单app