当前位置: 首页 > news >正文

消息队列--RocketMQ

什么是MQ?

message queue,消息队列。

消息message:在不同的应用程序之间传递数据。队列queue:一种FIFO先进先出的数据结构,将消息以队列的形式存储起来,并且在不同的应用程序之间进行传递,这就成了MessageQueue。

MQ产品最直接的作用,是将同步的事件改为异步的消息驱动

从这个示例看到,SpringBoot框架其实在启动时,就会尝试发布各种ApplicationEvent事件,表示自己启动到了哪个步骤,这时,SpringBoot框架就可以称为消息生产者producer。同样的,只要有ApplicationEvent事件发布了,就会触发MyApplicationListener监听器,处理这些事情。MyApplicationListener就可以成为消息消费者Consumer。

Producer和Consumer他们的运行状况互不干涉,不管有没有Consumer,producer一样会发布消息,反过来,不过Producer有没有发布消息,Consumer一样会监听这些事件,这种方式,实际上就是通过事件中包含的消息在驱动Producer和Consumer工作,这种工作方式也就成为消息驱动。

与消息驱动形成对比的是事件驱动,比如经常写的Controller,只有通过一个事件主动触发,才会调用。

从这个简单的例子可以看出,SpringBoot内部就集成了这种消息驱动的机制,但是,这些producer和Concumer都只能在一个进程中使用。如果要跨进程进行使用呢?这就需要独立一个中间服务,才能发布和接受这段消息。而这个中间服务,就是MQ中间件。

比如在一个大型电商项目中,订单服务完成下单,就可以发布下单时间而下游的消费者,就可以消费这个下单事件,进行一些补充的业务。

在这个业务中MQ中间件应该要起到什么作用呢?

解耦:producer和consumer都只跟中间件进行交互,而不需要互相进行交互,这意味着,在producer发送消息时,不需要考虑有没有consumer或者有多少个consumer,反之亦然,即便producer和cosummer是用不同语言开发的,只要能够与MQ中间件进行交互,那么他们就可以通过MQ中间件进行消息传递。

异步:消息并不是从producer发送出来以后,就立即交由consummer处理,而是在MQ中间件中暂存下来。等consumer启动之后,自行去MQ中间件上处理。也就是说,错开了producer发送消息和consumer接收消息的事件。

削峰:有了MQ做消息暂存,那么当producer发送消息和consumer处理消息的速度不一致时,MQ就能起到削峰填谷的作用。

1、主流MQ产品对比

产品优点缺点适用场景
Kafka吞吐量非常大,性能非常好,技术生态完整功能比较单一分布式日志收集,大数据采集
RabbitMQ消息可靠性高,功能全面吞吐量较低,消息积压会影响性能企业内部系统调用
RocketMQ高吞吐,高性能,高可用,功能全面技术生态相对没有那么完整几乎全场景。尤其适用于金融场景

启动NameServer

nohup sh mqnamesrv &

启动broker

nohup sh ./mqbroker -n localhost:9876 &

关闭namesrv和broker服务

mqshutdown namesrv

mqshutdown broker

什么是顺序消息?

顺序消息是指,消费者端消费消息时,需按照消息的发送顺序来消费,即先发送的消息,需要先消费。

订单创建---->订单支付---->订单发货---->订单配送---->订单完成

创建订单后,会发送5条消息到MQ Broker中,消费者要按照订单创建---->订单支付---->订单发货---->订单配送---->订单完成这个顺序去消费,这样的订单才是有效的。

顺序消费的原理

在默认情况下,消息发送会采取Round Robin轮询方式把消息发送到不同的queue,而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序的。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序,当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。

1.updateTopic:修改或创建一个Topic

mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t testtopic

2、deleteTopic:从broker和nameserver删除topic

mqadmin deleteTopic 

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class Producer {
    public  static final String TOPIC = "testtopic";
    public static final int MESSAGE_COUNT = 10;
    public static final String TAG = "simple_tag";
    public static final String NAME_SERVER_ADDR = "127.0.0.1:9876";
    public static final String PRODUCER_GROUP = "global_producer_group";
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);
        producer.setNamesrvAddr("192.168.146.134:9876");
        producer.start();
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            Message message = new Message(TOPIC, TAG, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(message,20*1000);
            System.out.printf("%s%n",sendResult);
        }
        producer.shutdown();

    }

}
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class Consumer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        //创建消费者对象
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("global_consumer_group");
        //设置nameserver地址
        consumer.setNamesrvAddr("192.168.146.134:9876");
        //设置订阅, *代表订阅所有tag,不做过滤
        consumer.subscribe("testtopic","*");
       //设置消费监听器
        consumer.setMessageListener(new MessageListenerConcurrently() {
            /**
             * 消费消息
             * @param list 消息列表
             * @param consumeConcurrentlyContext 消费者上下文对象
             * @return 返回消费状态
             */
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt messageExt : list) {
                    System.out.println("Consumer Receive:"+new String(messageExt.getBody()));
                }

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        //启动消费者
        consumer.start();
        System.out.println("Consumer Started");
        //循环消费
        Thread.sleep(Long.MAX_VALUE);
    }
}
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.util.List;

public class Producer {
    public  static final String TOPIC = "simple_topic";
    public static final int MESSAGE_COUNT = 10;
    public static final String TAG = "simple_tag";
    public static final String NAME_SERVER_ADDR = "127.0.0.1:9876";
    public static final String PRODUCER_GROUP = "global_producer_group";
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);
        producer.setNamesrvAddr("192.168.146.134:9876");
        producer.start();
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            Message message = new Message(TOPIC, TAG, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(message,new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
                    System.out.println("list:"+list);
                    System.out.println("o:"+o);
                    System.out.println("message:"+message);
                    return list.get(1);
                }
            },1);

            System.out.printf("%s%n",sendResult);
        }
        producer.shutdown();

    }

}
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class Consumer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        //创建消费者对象
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("global_consumer_group");
        //设置nameserver地址
        consumer.setNamesrvAddr("192.168.146.134:9876");
        //设置订阅, *代表订阅所有tag,不做过滤
        consumer.subscribe("simple_topic","*");
       //设置消费监听器
        consumer.setMessageListener(new MessageListenerConcurrently() {
            /**
             * 消费消息
             * @param list 消息列表
             * @param consumeConcurrentlyContext 消费者上下文对象
             * @return 返回消费状态
             */
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt messageExt : list) {
                    System.out.println("Consumer Receive:"+new String(messageExt.getBody()));
                }

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        //启动消费者
        consumer.start();
        System.out.println("Consumer Started");
        //循环消费
        Thread.sleep(Long.MAX_VALUE);
    }
}

相关文章:

  • DeepSeek 助力 Vue3 开发:打造丝滑的表格(Table)之添加行拖拽排序功能示例13,TableView16_13 键盘辅助拖拽示例
  • 【算法】快速幂
  • 6内存泄露问题的讨论
  • MySQL其他客户端程序
  • 边缘计算:工业自动化的智能新引擎
  • 低成本文件共享解决方案:Go File本地Docker部署与外网访问全记录
  • 小米平板 4 Plus 玩机日志
  • Xvfb和VNC Server是什么
  • 使用自定义的RTTI属性对对象进行流操作
  • 7对象树(1)
  • 文本分析(非结构化数据挖掘)——特征词选择(基于TF-IDF权值)
  • Java项目打包(使用IntelliJ IDEA打包Java项目)
  • Ubuntu 22.04 LTS 下载英伟达驱动
  • 买家利益为中心的购物平台
  • 每日一题洛谷P8716 [蓝桥杯 2020 省 AB2] 回文日期c++
  • Mapbox GL JS 实现鼠标绘制矩形功能的详细代码和讲解
  • C++ | std::function
  • Spring Boot中对同一接口定义多个切面的示例,分别通过接口方式和注解方式实现切面排序,并对比差异
  • 基于方法分类的无监督图像去雾论文
  • 小白入门机器学习概述
  • 代做财务报表分析网站/微博指数
  • 昆明网站建设价格低/关键词seo报价
  • 丽之鑫科技网站后台怎么做/网络营销实训个人总结
  • 在百度搜索到自己的网站/如何推广网上国网
  • php 开发动态网站开发/搜索引擎最佳化
  • 做招投标应该了解的网站/中国广告公司前十强