当前位置: 首页 > wzjs >正文

关于建设教体局网站的申请我想注册一个网站怎么注册

关于建设教体局网站的申请,我想注册一个网站怎么注册,在什么网站上做精帖,武汉百度推广公司引言 在某些业务场景中,MQ需要保证消息的顺序性,比如支付订单应该在创建订单之后进行 如果不使用保证顺序的手段,由于多队列、网络等因素可能会导致先处理支付订单的消息再处理创建订单的消息,这样就会导致处理失败 为了避免这…

引言

在某些业务场景中,MQ需要保证消息的顺序性,比如支付订单应该在创建订单之后进行

如果不使用保证顺序的手段,由于多队列、网络等因素可能会导致先处理支付订单的消息再处理创建订单的消息,这样就会导致处理失败

为了避免这样的情况发生,使用MQ时有必要保证消息的顺序性,在RocketMQ中通常使用顺序发送消息和顺序消费消息来保证消息的顺序性

生产者端保证消息有序

当队列全局只有一个时,消息全局有序,此时只需要确保为单个生产者发送(多个生产者同时发送无法预估消息到达的顺序)

或者先生产创建订单的消息再生产支付订单的消息(确保消息不丢)由于全局有序只能有一个队列,队列的压力过大,所以不经常使用

更通用的做法是使用队列有序:在发送消息时通过一定的路由算法将需要有序的消息分发到同一个队列中,使用相同的队列保证有序性

顺序消息分类

RocketMQ 提供了两种顺序消息,即全局顺序消息和分区顺序消息。

  • 全局顺序消息:指某个 Topic 下的所有消息都要保证顺序。在这种情况下,Topic 内部只能有一个队列,生产者将消息顺序发送到这个唯一的队列中。因为队列本身是 FIFO(先进先出)的数据结构,所以进入队列的消息天然有序。不过,由于只有一个队列,会导致并发度很低,吞吐量也会受到限制,因此全局顺序消息的使用场景相对较少。

  • 分区顺序消息:指在一个分区(队列)内的消息有序,不同分区之间的消息可以无序。这是更常见的使用方式,既保证了一定程度的顺序性,又能通过多个分区提高系统的并发能力和吞吐量。

实现分区顺序消息的发送

为了实现分区顺序消息的发送,生产者在发送消息时需要指定消息的路由规则,确保同一业务逻辑下的消息被发送到同一个队列中。例如,在电商系统中,一个订单的不同状态变更消息需要保证顺序,那么可以使用订单 ID 作为消息的路由键。

private SendResult sendSelectImpl(Message msg,MessageQueueSelector selector,Object arg,final CommunicationMode communicationMode,final SendCallback sendCallback, final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {long beginStartTime = System.currentTimeMillis();this.makeSureStateOK();//参数校验Validators.checkMessage(msg, this.defaultMQProducer);//获取topicTopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());if (topicPublishInfo != null && topicPublishInfo.ok()) {MessageQueue mq = null;try {//获取队列List<MessageQueue> messageQueueList =mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());Message userMessage = MessageAccessor.cloneMessage(msg);String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace());userMessage.setTopic(userTopic);//选择队列mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg));} catch (Throwable e) {throw new MQClientException("select message queue threw exception.", e);}//超时判断long costTime = System.currentTimeMillis() - beginStartTime;if (timeout < costTime) {throw new RemotingTooMuchRequestException("sendSelectImpl call timeout");}if (mq != null) {//发送消息return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime);} else {throw new MQClientException("select message queue return null.", null);}}//失败 validateNameServerSetting();throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);
}

在上述代码中,通过 MessageQueueSelector 实现了消息的路由选择,根据 orderId 选择对应的队列,确保相同 orderId 的消息发送到同一个队列中。

消费者端保证消息有序

前文说过消费者消息消息时,为了全力以赴通常都是使用线程池进行并发消费的

当一批顺序消息被同时拉取到消费者时,如果由线程池并发进行消费也会导致消息的顺序性失效

因此在消费端也需要进行顺序消费,使用DefaultMQPushConsumer进行消费时,设置消息监听器为MessageListenerOrderly

在顺序消费的文章中也说过:设置消息监听器为MessageListenerOrderly时,会通过多种加锁的方式保证消费者顺序消费队列中的消息

如果消费发生失败会阻塞队列导致消息堆积,因此需要注意特殊处理,比如重试次数超过阈值时就记录下来后续再处理

consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {try {for (MessageExt msg : msgs) {// 获取消息的重试次数int retryCount = msg.getReconsumeTimes();System.out.println("Message [" + msg.getMsgId() + "] is reconsumed " + retryCount + " times");//如果重试次数超过阈值 记录if (retryCount >= 3) {System.out.println("Message [" + msg.getMsgId() + "] add DB");}// 模拟消费失败if (retryCount < 3) {throw new RuntimeException("Consume failed");}// 消费成功System.out.println("Message [" + msg.getMsgId() + "] consumed successfully");}return ConsumeOrderlyStatus.SUCCESS;} catch (Exception e) {// 记录日志e.printStackTrace();// 返回重试状态return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;}
});

顺序消费模式

RocketMQ 的消费者提供了顺序消费模式,消费者在这种模式下会按照消息在队列中的顺序依次消费。消费者会对每个队列加锁,保证同一时间只有一个线程在处理该队列中的消息,从而确保消息消费的顺序性。

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;public class OrderedMessageConsumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");consumer.setNamesrvAddr("localhost:9876");consumer.subscribe("OrderedTopic", "*");// 设置为顺序消费模式consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {for (MessageExt msg : msgs) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msg.getBody()));}return ConsumeOrderlyStatus.SUCCESS;}});consumer.start();System.out.printf("Consumer Started.%n");}
}

在上述代码中,通过 MessageListenerOrderly 实现了顺序消费的逻辑,消费者会按顺序处理接收到的消息。

消费异常处理

在顺序消费过程中,如果出现消费异常,需要进行合理的处理,以保证消息顺序不受影响。RocketMQ 提供了重试机制,当消费失败时,消息会被重新放入队列中,等待下次消费。消费者需要确保在处理异常时不会破坏消息的顺序。

http://www.dtcms.com/wzjs/324007.html

相关文章:

  • 有源码后怎么做网站win11优化大师
  • wordpress 视频站百度关键词排名用什么软件
  • 同城信息服务平台优化排名推广技术网站
  • 珠海免费建站打开搜索引擎
  • 西安企业注册兰州正规seo整站优化
  • 北京网站建设降龙网络google图片搜索
  • 单位网站设计流程步骤想做网站找什么公司
  • 成品ppt网站上海seo网站优化软件
  • 网站建设的需求和目的seo超级外链发布
  • 西安自助建站做网站个人如何在百度做广告
  • 深圳 营销型网站建设谷歌下载安装
  • 免备案域名注册郑州网站推广优化公司
  • 婚纱网站页面设计百度移动端关键词优化
  • 荆门网站开发公司深圳推广网络
  • 怎么做日本网站的推广网络营销渠道可分为
  • wordpress进不去数据库长沙seo优化哪家好
  • 人才网站建站下载安装百度一下
  • 免费网站空间产品推广的渠道
  • 西安网站建设 早晨百度一下网页版
  • 专门做app网站400个成品短视频
  • 吉林省交通建设质量监督站网站外贸营销网站制作公司
  • vs做网站时怎么弹出窗口一网信息一个简单便捷的新闻网站
  • 做动漫网站用什么程序域名购买
  • 怎么用织梦做自己的网站百度一下你就知道官网新闻
  • 用户界面设计原则seo网站优化建议
  • 广州金融网站建设百度安装到桌面
  • 利用百度图片做网站外链品牌建设的五个要素
  • 轻创网天津seo排名收费
  • 把自己做的网站上传到服务器种子搜索神器 bt 下载
  • 做衣服的网站推荐太原seo管理