当前位置: 首页 > wzjs >正文

个人发布房源的网站seo知名公司

个人发布房源的网站,seo知名公司,沈阳网站建设思路,免费网站建设排行榜什么是MQ? message queue,消息队列。 消息message:在不同的应用程序之间传递数据。队列queue:一种FIFO先进先出的数据结构,将消息以队列的形式存储起来,并且在不同的应用程序之间进行传递,这就成了Messa…

什么是MQ?

message queue,消息队列。

消息message:在不同的应用程序之间传递数据。队列queue:一种FIFO先进先出的数据结构,将消息以队列的形式存储起来,并且在不同的应用程序之间进行传递,这就成了MessageQueue。

MQ产品最直接的作用,是将同步的事件改为异步的消息驱动

从这个示例看到,SpringBoot框架其实在启动时,就会尝试发布各种ApplicationEvent事件,表示自己启动到了哪个步骤,这时,SpringBoot框架就可以称为消息生产者producer。同样的,只要有ApplicationEvent事件发布了,就会触发MyApplicationListener监听器,处理这些事情。MyApplicationListener就可以成为消息消费者Consumer。

Producer和Consumer他们的运行状况互不干涉,不管有没有Consumer,producer一样会发布消息,反过来,不过Producer有没有发布消息,Consumer一样会监听这些事件,这种方式,实际上就是通过事件中包含的消息在驱动Producer和Consumer工作,这种工作方式也就成为消息驱动。

与消息驱动形成对比的是事件驱动,比如经常写的Controller,只有通过一个事件主动触发,才会调用。

从这个简单的例子可以看出,SpringBoot内部就集成了这种消息驱动的机制,但是,这些producer和Concumer都只能在一个进程中使用。如果要跨进程进行使用呢?这就需要独立一个中间服务,才能发布和接受这段消息。而这个中间服务,就是MQ中间件。

比如在一个大型电商项目中,订单服务完成下单,就可以发布下单时间而下游的消费者,就可以消费这个下单事件,进行一些补充的业务。

在这个业务中MQ中间件应该要起到什么作用呢?

解耦:producer和consumer都只跟中间件进行交互,而不需要互相进行交互,这意味着,在producer发送消息时,不需要考虑有没有consumer或者有多少个consumer,反之亦然,即便producer和cosummer是用不同语言开发的,只要能够与MQ中间件进行交互,那么他们就可以通过MQ中间件进行消息传递。

异步:消息并不是从producer发送出来以后,就立即交由consummer处理,而是在MQ中间件中暂存下来。等consumer启动之后,自行去MQ中间件上处理。也就是说,错开了producer发送消息和consumer接收消息的事件。

削峰:有了MQ做消息暂存,那么当producer发送消息和consumer处理消息的速度不一致时,MQ就能起到削峰填谷的作用。

1、主流MQ产品对比

产品优点缺点适用场景
Kafka吞吐量非常大,性能非常好,技术生态完整功能比较单一分布式日志收集,大数据采集
RabbitMQ消息可靠性高,功能全面吞吐量较低,消息积压会影响性能企业内部系统调用
RocketMQ高吞吐,高性能,高可用,功能全面技术生态相对没有那么完整几乎全场景。尤其适用于金融场景

启动NameServer

nohup sh mqnamesrv &

启动broker

nohup sh ./mqbroker -n localhost:9876 &

关闭namesrv和broker服务

mqshutdown namesrv

mqshutdown broker

什么是顺序消息?

顺序消息是指,消费者端消费消息时,需按照消息的发送顺序来消费,即先发送的消息,需要先消费。

订单创建---->订单支付---->订单发货---->订单配送---->订单完成

创建订单后,会发送5条消息到MQ Broker中,消费者要按照订单创建---->订单支付---->订单发货---->订单配送---->订单完成这个顺序去消费,这样的订单才是有效的。

顺序消费的原理

在默认情况下,消息发送会采取Round Robin轮询方式把消息发送到不同的queue,而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序的。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序,当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。

1.updateTopic:修改或创建一个Topic

mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t testtopic

2、deleteTopic:从broker和nameserver删除topic

mqadmin deleteTopic 

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;public class Producer {public  static final String TOPIC = "testtopic";public static final int MESSAGE_COUNT = 10;public static final String TAG = "simple_tag";public static final String NAME_SERVER_ADDR = "127.0.0.1:9876";public static final String PRODUCER_GROUP = "global_producer_group";public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);producer.setNamesrvAddr("192.168.146.134:9876");producer.start();for (int i = 0; i < MESSAGE_COUNT; i++) {Message message = new Message(TOPIC, TAG, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.send(message,20*1000);System.out.printf("%s%n",sendResult);}producer.shutdown();}}
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class Consumer {public static void main(String[] args) throws MQClientException, InterruptedException {//创建消费者对象DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("global_consumer_group");//设置nameserver地址consumer.setNamesrvAddr("192.168.146.134:9876");//设置订阅, *代表订阅所有tag,不做过滤consumer.subscribe("testtopic","*");//设置消费监听器consumer.setMessageListener(new MessageListenerConcurrently() {/*** 消费消息* @param list 消息列表* @param consumeConcurrentlyContext 消费者上下文对象* @return 返回消费状态*/@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for (MessageExt messageExt : list) {System.out.println("Consumer Receive:"+new String(messageExt.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动消费者consumer.start();System.out.println("Consumer Started");//循环消费Thread.sleep(Long.MAX_VALUE);}
}
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;import java.util.List;public class Producer {public  static final String TOPIC = "simple_topic";public static final int MESSAGE_COUNT = 10;public static final String TAG = "simple_tag";public static final String NAME_SERVER_ADDR = "127.0.0.1:9876";public static final String PRODUCER_GROUP = "global_producer_group";public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);producer.setNamesrvAddr("192.168.146.134:9876");producer.start();for (int i = 0; i < MESSAGE_COUNT; i++) {Message message = new Message(TOPIC, TAG, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.send(message,new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> list, Message message, Object o) {System.out.println("list:"+list);System.out.println("o:"+o);System.out.println("message:"+message);return list.get(1);}},1);System.out.printf("%s%n",sendResult);}producer.shutdown();}}
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class Consumer {public static void main(String[] args) throws MQClientException, InterruptedException {//创建消费者对象DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("global_consumer_group");//设置nameserver地址consumer.setNamesrvAddr("192.168.146.134:9876");//设置订阅, *代表订阅所有tag,不做过滤consumer.subscribe("simple_topic","*");//设置消费监听器consumer.setMessageListener(new MessageListenerConcurrently() {/*** 消费消息* @param list 消息列表* @param consumeConcurrentlyContext 消费者上下文对象* @return 返回消费状态*/@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for (MessageExt messageExt : list) {System.out.println("Consumer Receive:"+new String(messageExt.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动消费者consumer.start();System.out.println("Consumer Started");//循环消费Thread.sleep(Long.MAX_VALUE);}
}

http://www.dtcms.com/wzjs/25792.html

相关文章:

  • 免费优化关键词seo公司怎么推广宣传
  • 一家企业如何做网站推广网络推广优化服务
  • 高端品牌网站建设的特点求职seo服务
  • 惠州哪家做网站好找网站设计公司
  • 使用unity做网站色盲怎么治疗
  • 电商网站制作教程搜索引擎 磁力吧
  • 做网站添加mp3什么软件比百度搜索好
  • 如何在手机上制作网站百度全网营销
  • 网站开发的职业认知报告海外aso优化
  • 如何破解网站后台百度热搜广告位多少钱
  • 专门做外贸的网站手把手教你优化网站
  • 微商城小程序app开发抖音seo排名优化软件
  • 中信建设有限责任公司陶杨seo排名点击软件
  • 百度做网站优化多少钱一年爱站网关键词查询网站
  • 金环建设集团网站软文营销的三个层面
  • 国外响应式网站成都网站关键词推广
  • 网站名称意义如何推广seo
  • 政府网站建设和管理建议安卓优化大师官方版
  • 哈尔滨网站建设供应商腾讯云域名注册
  • 动态网站流程星乐seo网站关键词排名优化
  • 网站建设 管理系统开发网站快速排名公司
  • 手机网站内容模块营销培训方案
  • 做网站什么都不懂 怎么做企业的网络推广
  • 网站怎么做友情连接北京外贸网站优化
  • 免费做店招哪个网站好网站的网站建设
  • 网站建设方案 pdf2022年最新热点素材
  • 营销型网站的建设最近的头条新闻
  • 建个人网站赚钱多吗网络营销的概念是什么
  • 设计网站做海报最近几天新闻大事
  • 网站上传好了如何做定向网络优化seo是什么工作