消息队列--RocketMQ
什么是MQ?
message queue,消息队列。
消息message:在不同的应用程序之间传递数据。队列queue:一种FIFO先进先出的数据结构,将消息以队列的形式存储起来,并且在不同的应用程序之间进行传递,这就成了MessageQueue。
MQ产品最直接的作用,是将同步的事件改为异步的消息驱动
从这个示例看到,SpringBoot框架其实在启动时,就会尝试发布各种ApplicationEvent事件,表示自己启动到了哪个步骤,这时,SpringBoot框架就可以称为消息生产者producer。同样的,只要有ApplicationEvent事件发布了,就会触发MyApplicationListener监听器,处理这些事情。MyApplicationListener就可以成为消息消费者Consumer。
Producer和Consumer他们的运行状况互不干涉,不管有没有Consumer,producer一样会发布消息,反过来,不过Producer有没有发布消息,Consumer一样会监听这些事件,这种方式,实际上就是通过事件中包含的消息在驱动Producer和Consumer工作,这种工作方式也就成为消息驱动。
与消息驱动形成对比的是事件驱动,比如经常写的Controller,只有通过一个事件主动触发,才会调用。
从这个简单的例子可以看出,SpringBoot内部就集成了这种消息驱动的机制,但是,这些producer和Concumer都只能在一个进程中使用。如果要跨进程进行使用呢?这就需要独立一个中间服务,才能发布和接受这段消息。而这个中间服务,就是MQ中间件。
比如在一个大型电商项目中,订单服务完成下单,就可以发布下单时间而下游的消费者,就可以消费这个下单事件,进行一些补充的业务。
在这个业务中MQ中间件应该要起到什么作用呢?
解耦:producer和consumer都只跟中间件进行交互,而不需要互相进行交互,这意味着,在producer发送消息时,不需要考虑有没有consumer或者有多少个consumer,反之亦然,即便producer和cosummer是用不同语言开发的,只要能够与MQ中间件进行交互,那么他们就可以通过MQ中间件进行消息传递。
异步:消息并不是从producer发送出来以后,就立即交由consummer处理,而是在MQ中间件中暂存下来。等consumer启动之后,自行去MQ中间件上处理。也就是说,错开了producer发送消息和consumer接收消息的事件。
削峰:有了MQ做消息暂存,那么当producer发送消息和consumer处理消息的速度不一致时,MQ就能起到削峰填谷的作用。
1、主流MQ产品对比
产品 | 优点 | 缺点 | 适用场景 |
---|---|---|---|
Kafka | 吞吐量非常大,性能非常好,技术生态完整 | 功能比较单一 | 分布式日志收集,大数据采集 |
RabbitMQ | 消息可靠性高,功能全面 | 吞吐量较低,消息积压会影响性能 | 企业内部系统调用 |
RocketMQ | 高吞吐,高性能,高可用,功能全面 | 技术生态相对没有那么完整 | 几乎全场景。尤其适用于金融场景 |
启动NameServer
nohup sh mqnamesrv &
启动broker
nohup sh ./mqbroker -n localhost:9876 &
关闭namesrv和broker服务
mqshutdown namesrv
mqshutdown broker
什么是顺序消息?
顺序消息是指,消费者端消费消息时,需按照消息的发送顺序来消费,即先发送的消息,需要先消费。
订单创建---->订单支付---->订单发货---->订单配送---->订单完成
创建订单后,会发送5条消息到MQ Broker中,消费者要按照订单创建---->订单支付---->订单发货---->订单配送---->订单完成这个顺序去消费,这样的订单才是有效的。
顺序消费的原理
在默认情况下,消息发送会采取Round Robin轮询方式把消息发送到不同的queue,而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序的。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序,当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。
1.updateTopic:修改或创建一个Topic
mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t testtopic
2、deleteTopic:从broker和nameserver删除topic
mqadmin deleteTopic
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class Producer {
public static final String TOPIC = "testtopic";
public static final int MESSAGE_COUNT = 10;
public static final String TAG = "simple_tag";
public static final String NAME_SERVER_ADDR = "127.0.0.1:9876";
public static final String PRODUCER_GROUP = "global_producer_group";
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);
producer.setNamesrvAddr("192.168.146.134:9876");
producer.start();
for (int i = 0; i < MESSAGE_COUNT; i++) {
Message message = new Message(TOPIC, TAG, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(message,20*1000);
System.out.printf("%s%n",sendResult);
}
producer.shutdown();
}
}
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class Consumer {
public static void main(String[] args) throws MQClientException, InterruptedException {
//创建消费者对象
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("global_consumer_group");
//设置nameserver地址
consumer.setNamesrvAddr("192.168.146.134:9876");
//设置订阅, *代表订阅所有tag,不做过滤
consumer.subscribe("testtopic","*");
//设置消费监听器
consumer.setMessageListener(new MessageListenerConcurrently() {
/**
* 消费消息
* @param list 消息列表
* @param consumeConcurrentlyContext 消费者上下文对象
* @return 返回消费状态
*/
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt messageExt : list) {
System.out.println("Consumer Receive:"+new String(messageExt.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动消费者
consumer.start();
System.out.println("Consumer Started");
//循环消费
Thread.sleep(Long.MAX_VALUE);
}
}
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.util.List;
public class Producer {
public static final String TOPIC = "simple_topic";
public static final int MESSAGE_COUNT = 10;
public static final String TAG = "simple_tag";
public static final String NAME_SERVER_ADDR = "127.0.0.1:9876";
public static final String PRODUCER_GROUP = "global_producer_group";
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);
producer.setNamesrvAddr("192.168.146.134:9876");
producer.start();
for (int i = 0; i < MESSAGE_COUNT; i++) {
Message message = new Message(TOPIC, TAG, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(message,new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
System.out.println("list:"+list);
System.out.println("o:"+o);
System.out.println("message:"+message);
return list.get(1);
}
},1);
System.out.printf("%s%n",sendResult);
}
producer.shutdown();
}
}
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class Consumer {
public static void main(String[] args) throws MQClientException, InterruptedException {
//创建消费者对象
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("global_consumer_group");
//设置nameserver地址
consumer.setNamesrvAddr("192.168.146.134:9876");
//设置订阅, *代表订阅所有tag,不做过滤
consumer.subscribe("simple_topic","*");
//设置消费监听器
consumer.setMessageListener(new MessageListenerConcurrently() {
/**
* 消费消息
* @param list 消息列表
* @param consumeConcurrentlyContext 消费者上下文对象
* @return 返回消费状态
*/
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt messageExt : list) {
System.out.println("Consumer Receive:"+new String(messageExt.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动消费者
consumer.start();
System.out.println("Consumer Started");
//循环消费
Thread.sleep(Long.MAX_VALUE);
}
}