当前位置: 首页 > wzjs >正文

青羊区定制网站建设报价深圳网站设计三把火

青羊区定制网站建设报价,深圳网站设计三把火,外贸网站建设怎么建设,泰安红河网站建设什么是MQ? message queue,消息队列。 消息message:在不同的应用程序之间传递数据。队列queue:一种FIFO先进先出的数据结构,将消息以队列的形式存储起来,并且在不同的应用程序之间进行传递,这就成了Messa…

什么是MQ?

message queue,消息队列。

消息message:在不同的应用程序之间传递数据。队列queue:一种FIFO先进先出的数据结构,将消息以队列的形式存储起来,并且在不同的应用程序之间进行传递,这就成了MessageQueue。

MQ产品最直接的作用,是将同步的事件改为异步的消息驱动

从这个示例看到,SpringBoot框架其实在启动时,就会尝试发布各种ApplicationEvent事件,表示自己启动到了哪个步骤,这时,SpringBoot框架就可以称为消息生产者producer。同样的,只要有ApplicationEvent事件发布了,就会触发MyApplicationListener监听器,处理这些事情。MyApplicationListener就可以成为消息消费者Consumer。

Producer和Consumer他们的运行状况互不干涉,不管有没有Consumer,producer一样会发布消息,反过来,不过Producer有没有发布消息,Consumer一样会监听这些事件,这种方式,实际上就是通过事件中包含的消息在驱动Producer和Consumer工作,这种工作方式也就成为消息驱动。

与消息驱动形成对比的是事件驱动,比如经常写的Controller,只有通过一个事件主动触发,才会调用。

从这个简单的例子可以看出,SpringBoot内部就集成了这种消息驱动的机制,但是,这些producer和Concumer都只能在一个进程中使用。如果要跨进程进行使用呢?这就需要独立一个中间服务,才能发布和接受这段消息。而这个中间服务,就是MQ中间件。

比如在一个大型电商项目中,订单服务完成下单,就可以发布下单时间而下游的消费者,就可以消费这个下单事件,进行一些补充的业务。

在这个业务中MQ中间件应该要起到什么作用呢?

解耦:producer和consumer都只跟中间件进行交互,而不需要互相进行交互,这意味着,在producer发送消息时,不需要考虑有没有consumer或者有多少个consumer,反之亦然,即便producer和cosummer是用不同语言开发的,只要能够与MQ中间件进行交互,那么他们就可以通过MQ中间件进行消息传递。

异步:消息并不是从producer发送出来以后,就立即交由consummer处理,而是在MQ中间件中暂存下来。等consumer启动之后,自行去MQ中间件上处理。也就是说,错开了producer发送消息和consumer接收消息的事件。

削峰:有了MQ做消息暂存,那么当producer发送消息和consumer处理消息的速度不一致时,MQ就能起到削峰填谷的作用。

1、主流MQ产品对比

产品优点缺点适用场景
Kafka吞吐量非常大,性能非常好,技术生态完整功能比较单一分布式日志收集,大数据采集
RabbitMQ消息可靠性高,功能全面吞吐量较低,消息积压会影响性能企业内部系统调用
RocketMQ高吞吐,高性能,高可用,功能全面技术生态相对没有那么完整几乎全场景。尤其适用于金融场景

启动NameServer

nohup sh mqnamesrv &

启动broker

nohup sh ./mqbroker -n localhost:9876 &

关闭namesrv和broker服务

mqshutdown namesrv

mqshutdown broker

什么是顺序消息?

顺序消息是指,消费者端消费消息时,需按照消息的发送顺序来消费,即先发送的消息,需要先消费。

订单创建---->订单支付---->订单发货---->订单配送---->订单完成

创建订单后,会发送5条消息到MQ Broker中,消费者要按照订单创建---->订单支付---->订单发货---->订单配送---->订单完成这个顺序去消费,这样的订单才是有效的。

顺序消费的原理

在默认情况下,消息发送会采取Round Robin轮询方式把消息发送到不同的queue,而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序的。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序,当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。

1.updateTopic:修改或创建一个Topic

mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t testtopic

2、deleteTopic:从broker和nameserver删除topic

mqadmin deleteTopic 

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;public class Producer {public  static final String TOPIC = "testtopic";public static final int MESSAGE_COUNT = 10;public static final String TAG = "simple_tag";public static final String NAME_SERVER_ADDR = "127.0.0.1:9876";public static final String PRODUCER_GROUP = "global_producer_group";public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);producer.setNamesrvAddr("192.168.146.134:9876");producer.start();for (int i = 0; i < MESSAGE_COUNT; i++) {Message message = new Message(TOPIC, TAG, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.send(message,20*1000);System.out.printf("%s%n",sendResult);}producer.shutdown();}}
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class Consumer {public static void main(String[] args) throws MQClientException, InterruptedException {//创建消费者对象DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("global_consumer_group");//设置nameserver地址consumer.setNamesrvAddr("192.168.146.134:9876");//设置订阅, *代表订阅所有tag,不做过滤consumer.subscribe("testtopic","*");//设置消费监听器consumer.setMessageListener(new MessageListenerConcurrently() {/*** 消费消息* @param list 消息列表* @param consumeConcurrentlyContext 消费者上下文对象* @return 返回消费状态*/@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for (MessageExt messageExt : list) {System.out.println("Consumer Receive:"+new String(messageExt.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动消费者consumer.start();System.out.println("Consumer Started");//循环消费Thread.sleep(Long.MAX_VALUE);}
}
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;import java.util.List;public class Producer {public  static final String TOPIC = "simple_topic";public static final int MESSAGE_COUNT = 10;public static final String TAG = "simple_tag";public static final String NAME_SERVER_ADDR = "127.0.0.1:9876";public static final String PRODUCER_GROUP = "global_producer_group";public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);producer.setNamesrvAddr("192.168.146.134:9876");producer.start();for (int i = 0; i < MESSAGE_COUNT; i++) {Message message = new Message(TOPIC, TAG, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.send(message,new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> list, Message message, Object o) {System.out.println("list:"+list);System.out.println("o:"+o);System.out.println("message:"+message);return list.get(1);}},1);System.out.printf("%s%n",sendResult);}producer.shutdown();}}
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class Consumer {public static void main(String[] args) throws MQClientException, InterruptedException {//创建消费者对象DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("global_consumer_group");//设置nameserver地址consumer.setNamesrvAddr("192.168.146.134:9876");//设置订阅, *代表订阅所有tag,不做过滤consumer.subscribe("simple_topic","*");//设置消费监听器consumer.setMessageListener(new MessageListenerConcurrently() {/*** 消费消息* @param list 消息列表* @param consumeConcurrentlyContext 消费者上下文对象* @return 返回消费状态*/@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for (MessageExt messageExt : list) {System.out.println("Consumer Receive:"+new String(messageExt.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动消费者consumer.start();System.out.println("Consumer Started");//循环消费Thread.sleep(Long.MAX_VALUE);}
}

http://www.dtcms.com/wzjs/410506.html

相关文章:

  • 广告牌免费设计在线生成seo成创网络
  • wordpress怎样建站互联网营销模式
  • 成都哪里做网站便宜上海网络推广专员
  • 河北建设厅网站技术电话肇庆seo排名
  • 烟台h5网站开发培训心得体会怎么写
  • 徐州人才网档案查询资源优化排名网站
  • 怎么看网站的建设时间广州网络营销推广公司
  • 城建培训中心官网seo需要掌握哪些技术
  • 中国建设银行邵阳分行网站网络建设推广
  • 详情页设计软件今日头条seo
  • 做抽奖的网站犯法吗2023年7月疫情还会严重吗
  • 潍坊网站建设哪家专业市场营销在线课程
  • 个人旅游网站模版株洲seo快速排名
  • 南昌网站推广策划网站seo 工具
  • 南宁网站建设官网百度一下就知道官方
  • 任何做网站百度地图导航2022最新版
  • 党的组织建设优化网站教程
  • 网站建设需什么软件域名交易中心
  • 网站制作内容文案东莞推广
  • 丰台网站关键词优化今年疫情最新消息
  • 比较好看的企业网站公司网页怎么做
  • 对招聘网站页面设计做建议新开传奇网站
  • 迪庆定制网站建设费用个人博客网页制作
  • 网站建设帮助中心外贸网站建设公司
  • 浙江省建设协会网站整站优化cms
  • 扬中新闻中心天津优化公司哪家好
  • 网络营销的特点包含seo外包公司一般费用是多少
  • 国内谷歌网站SEO优化公司做网络推广哪个网站好
  • 做网站的费用记什么会计科目万网注册域名查询
  • 大连网站建设兼职多地优化完善疫情防控措施