当前位置: 首页 > wzjs >正文

住建综合管理平台怎么自己做网站的优化

住建综合管理平台,怎么自己做网站的优化,上海汽车网站建设,莱芜中考网站什么是MQ? message queue,消息队列。 消息message:在不同的应用程序之间传递数据。队列queue:一种FIFO先进先出的数据结构,将消息以队列的形式存储起来,并且在不同的应用程序之间进行传递,这就成了Messa…

什么是MQ?

message queue,消息队列。

消息message:在不同的应用程序之间传递数据。队列queue:一种FIFO先进先出的数据结构,将消息以队列的形式存储起来,并且在不同的应用程序之间进行传递,这就成了MessageQueue。

MQ产品最直接的作用,是将同步的事件改为异步的消息驱动

从这个示例看到,SpringBoot框架其实在启动时,就会尝试发布各种ApplicationEvent事件,表示自己启动到了哪个步骤,这时,SpringBoot框架就可以称为消息生产者producer。同样的,只要有ApplicationEvent事件发布了,就会触发MyApplicationListener监听器,处理这些事情。MyApplicationListener就可以成为消息消费者Consumer。

Producer和Consumer他们的运行状况互不干涉,不管有没有Consumer,producer一样会发布消息,反过来,不过Producer有没有发布消息,Consumer一样会监听这些事件,这种方式,实际上就是通过事件中包含的消息在驱动Producer和Consumer工作,这种工作方式也就成为消息驱动。

与消息驱动形成对比的是事件驱动,比如经常写的Controller,只有通过一个事件主动触发,才会调用。

从这个简单的例子可以看出,SpringBoot内部就集成了这种消息驱动的机制,但是,这些producer和Concumer都只能在一个进程中使用。如果要跨进程进行使用呢?这就需要独立一个中间服务,才能发布和接受这段消息。而这个中间服务,就是MQ中间件。

比如在一个大型电商项目中,订单服务完成下单,就可以发布下单时间而下游的消费者,就可以消费这个下单事件,进行一些补充的业务。

在这个业务中MQ中间件应该要起到什么作用呢?

解耦:producer和consumer都只跟中间件进行交互,而不需要互相进行交互,这意味着,在producer发送消息时,不需要考虑有没有consumer或者有多少个consumer,反之亦然,即便producer和cosummer是用不同语言开发的,只要能够与MQ中间件进行交互,那么他们就可以通过MQ中间件进行消息传递。

异步:消息并不是从producer发送出来以后,就立即交由consummer处理,而是在MQ中间件中暂存下来。等consumer启动之后,自行去MQ中间件上处理。也就是说,错开了producer发送消息和consumer接收消息的事件。

削峰:有了MQ做消息暂存,那么当producer发送消息和consumer处理消息的速度不一致时,MQ就能起到削峰填谷的作用。

1、主流MQ产品对比

产品优点缺点适用场景
Kafka吞吐量非常大,性能非常好,技术生态完整功能比较单一分布式日志收集,大数据采集
RabbitMQ消息可靠性高,功能全面吞吐量较低,消息积压会影响性能企业内部系统调用
RocketMQ高吞吐,高性能,高可用,功能全面技术生态相对没有那么完整几乎全场景。尤其适用于金融场景

启动NameServer

nohup sh mqnamesrv &

启动broker

nohup sh ./mqbroker -n localhost:9876 &

关闭namesrv和broker服务

mqshutdown namesrv

mqshutdown broker

什么是顺序消息?

顺序消息是指,消费者端消费消息时,需按照消息的发送顺序来消费,即先发送的消息,需要先消费。

订单创建---->订单支付---->订单发货---->订单配送---->订单完成

创建订单后,会发送5条消息到MQ Broker中,消费者要按照订单创建---->订单支付---->订单发货---->订单配送---->订单完成这个顺序去消费,这样的订单才是有效的。

顺序消费的原理

在默认情况下,消息发送会采取Round Robin轮询方式把消息发送到不同的queue,而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序的。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序,当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。

1.updateTopic:修改或创建一个Topic

mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t testtopic

2、deleteTopic:从broker和nameserver删除topic

mqadmin deleteTopic 

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;public class Producer {public  static final String TOPIC = "testtopic";public static final int MESSAGE_COUNT = 10;public static final String TAG = "simple_tag";public static final String NAME_SERVER_ADDR = "127.0.0.1:9876";public static final String PRODUCER_GROUP = "global_producer_group";public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);producer.setNamesrvAddr("192.168.146.134:9876");producer.start();for (int i = 0; i < MESSAGE_COUNT; i++) {Message message = new Message(TOPIC, TAG, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.send(message,20*1000);System.out.printf("%s%n",sendResult);}producer.shutdown();}}
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class Consumer {public static void main(String[] args) throws MQClientException, InterruptedException {//创建消费者对象DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("global_consumer_group");//设置nameserver地址consumer.setNamesrvAddr("192.168.146.134:9876");//设置订阅, *代表订阅所有tag,不做过滤consumer.subscribe("testtopic","*");//设置消费监听器consumer.setMessageListener(new MessageListenerConcurrently() {/*** 消费消息* @param list 消息列表* @param consumeConcurrentlyContext 消费者上下文对象* @return 返回消费状态*/@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for (MessageExt messageExt : list) {System.out.println("Consumer Receive:"+new String(messageExt.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动消费者consumer.start();System.out.println("Consumer Started");//循环消费Thread.sleep(Long.MAX_VALUE);}
}
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;import java.util.List;public class Producer {public  static final String TOPIC = "simple_topic";public static final int MESSAGE_COUNT = 10;public static final String TAG = "simple_tag";public static final String NAME_SERVER_ADDR = "127.0.0.1:9876";public static final String PRODUCER_GROUP = "global_producer_group";public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);producer.setNamesrvAddr("192.168.146.134:9876");producer.start();for (int i = 0; i < MESSAGE_COUNT; i++) {Message message = new Message(TOPIC, TAG, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.send(message,new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> list, Message message, Object o) {System.out.println("list:"+list);System.out.println("o:"+o);System.out.println("message:"+message);return list.get(1);}},1);System.out.printf("%s%n",sendResult);}producer.shutdown();}}
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class Consumer {public static void main(String[] args) throws MQClientException, InterruptedException {//创建消费者对象DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("global_consumer_group");//设置nameserver地址consumer.setNamesrvAddr("192.168.146.134:9876");//设置订阅, *代表订阅所有tag,不做过滤consumer.subscribe("simple_topic","*");//设置消费监听器consumer.setMessageListener(new MessageListenerConcurrently() {/*** 消费消息* @param list 消息列表* @param consumeConcurrentlyContext 消费者上下文对象* @return 返回消费状态*/@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for (MessageExt messageExt : list) {System.out.println("Consumer Receive:"+new String(messageExt.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动消费者consumer.start();System.out.println("Consumer Started");//循环消费Thread.sleep(Long.MAX_VALUE);}
}


文章转载自:

http://uTB2DoOT.nwgkk.cn
http://egBPSLht.nwgkk.cn
http://OMSZJLHX.nwgkk.cn
http://lt2WWTSA.nwgkk.cn
http://7RxVIBWT.nwgkk.cn
http://66PkdbYx.nwgkk.cn
http://Ll4ErXBl.nwgkk.cn
http://d6KWEY8W.nwgkk.cn
http://MQ3PPPzk.nwgkk.cn
http://lBFTn1dk.nwgkk.cn
http://hYXwV4O2.nwgkk.cn
http://ctTIrRZc.nwgkk.cn
http://t1HY3ynz.nwgkk.cn
http://F6Fd2Haz.nwgkk.cn
http://Ehddsay1.nwgkk.cn
http://USl3wJJ0.nwgkk.cn
http://qNxB2Ddo.nwgkk.cn
http://s5rCVHs4.nwgkk.cn
http://lEW7iZqQ.nwgkk.cn
http://AlLIdTOj.nwgkk.cn
http://Z7TYArvS.nwgkk.cn
http://QDC4sqrR.nwgkk.cn
http://wgKE59fT.nwgkk.cn
http://9d8aXPft.nwgkk.cn
http://oyl09yOR.nwgkk.cn
http://kHqSdVBh.nwgkk.cn
http://VFYROWVK.nwgkk.cn
http://zimHZPTS.nwgkk.cn
http://oUerP3vO.nwgkk.cn
http://eND2p7VG.nwgkk.cn
http://www.dtcms.com/wzjs/707698.html

相关文章:

  • 邯郸推广公司seo外链发布
  • 网站制作计算机wordpress的主要功能
  • 商丘家居网站建设龙岩做网站的地方有哪些
  • 做网站卖产品要注册公司吗素马网站建设费用差距
  • 手机网站的内容模块北屯网站建设
  • 想做网站怎么跟做网站的公司谈判与传统营销相比网络营销的优势
  • 个人域名可以备案企业网站吗网站开发公司团队优势
  • 深圳网站运营中山网站制作费用
  • 吴江市建设局网站百度收录网站要多久
  • 免费用搭建网站免费网站注册 建站
  • 有那种做订单的网站吗网络推广公司网站
  • 企业网站的建设水平直接关系到网络营销的效果闵行区教育局官网
  • 像聚美网站建设费用做平台是做网站和微信小程序的好别
  • 网站页面禁止访问wordpress 国内视频网站
  • 壹佰网站建设产品设计培训
  • 白酒类网站模板沈阳成创网站建设公司
  • 湛江企业网站建站模板企业网站属于下面哪种媒体类型
  • 网站开发中使用框架吗logo商标设计公司
  • 叙永县城乡建设部网站首页网站开发人员
  • 网站开发的前置审批是什么意思百度搜不到 但搜关键词有的网站
  • 商业网站设计欣赏企业网站建设毕业设计
  • 单页网站内链接哪里网站用vue.js做的
  • 平台型网站建设预算表微信群网站有哪些
  • 建设工程个人信息采集哪个网站客户关系管理的含义
  • 用wix做网站需要备案吗收费网站开发
  • 电商网站储值消费系统网站前置审批类型
  • 烟台网站建设哪家便宜广州百度seo公司
  • 网站维护的要求包括哪些wordpress主题仿虎嗅
  • 长沙3合1网站建设各大网站网址目录
  • 巴彦淖尔网站制作中国建筑有几个工程局