当前位置: 首页 > news >正文

RocketMQ 使用手册

一、基本概念

  • 分组 group
  • 主题 topic
  • 消息队列 message queue:一般一个消息队列服务一个消费者,提高并发度。
  • 标签 tag:生产消息、消费消息可以指定tag,实现消息消费的过滤。
  • 偏移量 offset:
    • broker中某个队列的偏移量 - 当前队列中有多少消息。
    • 消费者消费到哪个消息了。

二、下载与安装

1、Linux安装

下载地址:https://rocketmq.apache.org/download

解压进入bin目录:

启动nameServer
sh mqnamesrv

如果出现内存空间不足的异常,就修改runserver.sh文件的初始堆空间大小

启动broker
sh mqbroker -c ../conf/broker.conf -n 127.0.0.1:9876 autoCreateTopicEnable=true

指定配置文件:-c ../conf/broker.conf
指定nameserver:-n 127.0.0.1:9876
开启自动创建主题:autoCreateTopicEnable=true

如果出现内存空间不足的异常,就修改runbroker.sh文件的初始堆空间大小

2、控制台安装

下载代码:rocketmq-dashboard.git

修改配置文件,打成jar包:

启用服务:

3、开放相关端口

9876,10909,10911等常用端口

# 检查已开放的端口
firewall-cmd --list-ports

# 新增开放端口
firewall-cmd --zone=public --add-port=10911/tcp --permanent

# 重启防火墙
systemctl restart firewalld.service

三、发送普通消息

1、同步发送

package org.apache.rocketmq.example.quickstart;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

/**
 * 普通消息 - 同步发送
 * 发送消息后同步响应发送结果
 * 适用于短信通知、消息通知
 */
public class SyncProducer {
    public static void main(String[] args) {
        // 创建生产者
        DefaultMQProducer producer = new DefaultMQProducer("message-group-1");
        // 设置nameServer地址
        producer.setNamesrvAddr("127.0.0.1:9876");
        // 启动生产者
        try {
            producer.start();
            // 发送消息
            for (int i = 0; i < 10; i++) {
                // 创建消息实体
                Message message = new Message("MyTopic", "tageA", ("Hello RocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                // 发送消息并接收响应
                SendResult result = producer.send(message);
                // 打印响应
                System.out.println(result);
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            // 停止生产者
            producer.shutdown();
        }
    }
}

响应内容:

控制台信息:每个队列分别有5条消息。

2、异步发送

package org.apache.rocketmq.example.quickstart;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

/**
 * 普通消息 - 异步发送
 * 通过监听获取发送消息的响应,不会阻塞发送消息的线程
 * 适用于对处理速度要求较高的场景,让生产者可以连续发送消息,而不用阻塞
 */
public class AsyncProducer {
    public static void main(String[] args) {
        // 创建生产者
        DefaultMQProducer producer = new DefaultMQProducer("message-group-2");
        // 设置nameServer地址
        producer.setNamesrvAddr("127.0.0.1:9876");
        // 启动生产者
        try {
            producer.start();
            // 发送消息
            for (int i = 0; i < 10; i++) {
                // 创建消息实体
                Message message = new Message("message-topic-2", "tageA", ("Hello RocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                // 发送消息并接收响应
                producer.send(message, new SendCallback() {
                    // 异步监听,通过回调处理发送消息响应
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        System.out.println("发送成功!");
                        System.out.println(sendResult);
                    }

                    @Override
                    public void onException(Throwable e) {
                        System.out.println(e.getMessage());
                    }
                });
            }
            Thread.sleep(10000);
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            // 停止生产者
            producer.shutdown();
        }
    }
}

控制台结果

3、单向发送

package org.apache.rocketmq.example.quickstart;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

/**
 * 普通消息-单向发送
 * 不关心消息是否被接收,仅发送
 * 适用于耗时较短,不关心数据准确的场景 - 日志收集
 */
public class OnewayProducer {
    public static void main(String[] args) {
        // 创建生产者
        DefaultMQProducer producer = new DefaultMQProducer("message-group-3");
        // 设置nameServer地址
        producer.setNamesrvAddr("127.0.0.1:9876");
        // 启动生产者
        try {
            producer.start();
            // 发送消息
            for (int i = 0; i < 10; i++) {
                // 创建消息实体
                Message message = new Message("message-topic-3", "tageA", ("Hello RocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                // 发送消息-单向发送
                producer.sendOneway(message);
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            // 停止生产者
            producer.shutdown();
        }
    }
}

四、消费普通消息

1、集群消费模式 - 默认模式

在一个消费者组内,多个消费者协同消费同一个topic中的消息,每个消费者负责一部分消息。

package org.apache.rocketmq.example.quickstart;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;

import java.util.List;

/**
 * 集群模式消费者
 */
public class BalanceConsumer {
    public static void main(String[] args) throws Exception {
        // 创建消费者对象 - 指定群组
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group-1");
        // 指定nameServer地址
        consumer.setNamesrvAddr("127.0.0.1:9876");
        // 订阅topic
        consumer.subscribe("message-topic-1", "*");
        // 指定集群消费模式 - 默认
        consumer.setMessageModel(MessageModel.CLUSTERING);
        // 注册回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messageExtList, ConsumeConcurrentlyContext context) {
                try {
                    for (MessageExt messageExt : messageExtList) {
                        System.out.println("topic:" + messageExt.getTopic() +  ",message:" + new String(messageExt.getBody(), "UTF-8"));
                    }
                } catch (Exception e) {
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 启动消费者
        consumer.start();
        System.out.println("消费者已启动");
    }
}

2、广播消费模式

在一个消费者群组内,每一条消息都被每一个消费者消费一遍。

消费进度不会由broker维护,而是由消费者维护。

package org.apache.rocketmq.example.quickstart;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;

import java.util.List;

/**
 * 集群模式消费者
 */
public class BroadcastConsumer {
    public static void main(String[] args) throws Exception {
        // 创建消费者对象 - 指定群组
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group-2");
        // 指定nameServer地址
        consumer.setNamesrvAddr("127.0.0.1:9876");
        // 订阅topic
        consumer.subscribe("message-topic-3", "*");
        // 指定集群消费模式 - 默认
        consumer.setMessageModel(MessageModel.BROADCASTING);
        // 注册回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messageExtList, ConsumeConcurrentlyContext context) {
                try {
                    for (MessageExt messageExt : messageExtList) {
                        System.out.println("topic:" + messageExt.getTopic() +  ",message:" + new String(messageExt.getBody(), "UTF-8"));
                    }
                } catch (Exception e) {
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 启动消费者
        consumer.start();
        System.out.println("消费者已启动");
    }
}

五、顺序消息

1、全局顺序消息

消息的生产、消费全局有序,只有一个消息队列。

2、部分顺序消息

消息的生产、消费在仅在一个消息队列内有序,可以有多个消息队列。

3、顺序消息生产

发送消息方法中指定队列选择器规则,规则参数即可。

package org.apache.rocketmq.example.quickstart;

import jdk.nashorn.internal.objects.annotations.Getter;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.util.ArrayList;
import java.util.List;

/**
 * 部分顺序消息生产者
 */
public class InOrderProducer {
    public static void main(String[] args) {
        // 创建生产者
        DefaultMQProducer producer = new DefaultMQProducer("message-group-4");
        // 设置nameServer地址
        producer.setNamesrvAddr("127.0.0.1:9876");
        // 启动生产者
        try {
            producer.start();
            // 发送消息
            for (Order order : buildOrderList()) {
                // 创建消息实体
                Message message = new Message("message-topic-5", null, order.toString().getBytes(RemotingHelper.DEFAULT_CHARSET));
                // 发送消息-单向发送
                // 指定消息体,队列选择器,选择器参数(订单id)
                producer.sendOneway(message, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        // 通过取模确定相同id的订单数据发送到同一个消息队列中
                        return mqs.get((Integer) arg % mqs.size());
                    }
                }, order.getOrderId());
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            // 停止生产者
            producer.shutdown();
        }
    }

    // 创建Order类
    private static class Order {

        private final Integer orderId;

        private final String step;

        public Order(Integer orderId, String step) {
            this.orderId = orderId;
            this.step = step;
        }

        public Integer getOrderId() {
            return orderId;
        }

        public String getStep() {
            return step;
        }
    }

    // 构造消息列表
    // 5个订单 3个步骤
    private static List<Order> buildOrderList() {
        List<Order> orderList = new ArrayList<>();

        orderList.add(new Order(1, "创建"));
        orderList.add(new Order(2, "创建"));
        orderList.add(new Order(3, "创建"));
        orderList.add(new Order(4, "创建"));
        orderList.add(new Order(5, "创建"));

        orderList.add(new Order(4, "支付"));
        orderList.add(new Order(2, "支付"));
        orderList.add(new Order(3, "支付"));
        orderList.add(new Order(1, "支付"));
        orderList.add(new Order(5, "支付"));

        orderList.add(new Order(2, "发货"));
        orderList.add(new Order(4, "发货"));
        orderList.add(new Order(1, "发货"));
        orderList.add(new Order(3, "发货"));
        orderList.add(new Order(5, "发货"));

        return orderList;
    }
}
一份一份地分配消息

4、顺序消息消费

package org.apache.rocketmq.example.quickstart;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * 顺序消费者
 */
public class InOrderConsumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group-3");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("message-topic-6", "*");
        // 指定消费偏移量
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        // 指定消费监听器
        consumer.registerMessageListener(new RegisterMessageListener());
        // 启动消费者
        consumer.start();
        System.out.println("消费者已启动");
    }

    // 构建消费监听器 - 顺序监听器
    private static class RegisterMessageListener implements MessageListenerOrderly {
        @Override
        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> messageExtList, ConsumeOrderlyContext context) {
            context.setAutoCommit(true);
            for (MessageExt messageExt : messageExtList) {
                try {
                    System.out.println("QueueId:" + messageExt.getQueueId() + " , body:"  + new String(messageExt.getBody(), "UTF-8"));
                } catch (Exception e) {
                    System.out.println("消费异常:" + e);
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
            }
            return ConsumeOrderlyStatus.SUCCESS;
        }
    }
}

六、延时消息

设定消息需要在多久之后才能被消费。

使用场景:下单后未支付,定时取消订单。下单后向队列中写入一条延时消息,消费者拿到消息后先判断是否已支付,已支付就忽略,没支付就取消。

Message message = new Message("message-topic-3", "tageA", ("Hello RocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
// 设置延时等级 一共有18个等级,等级越大延迟越久
message.setDelayTimeLevel(1);

七、批量消息

为了提高发送消息的性能,可以将多条消息打包成一份批量发送,操作的位置在发送端。

package org.apache.rocketmq.example.quickstart;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

import java.util.ArrayList;
import java.util.List;

/**
 * 批量消息生产者
 */
public class BatchProducer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQProducer producer = new DefaultMQProducer("producer-group-1");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();

        // 批量发送消息
        try {
            // 如果消息总大小超过4M,需要手动切分
            producer.send(buildBatchMessage());
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            producer.shutdown();
        }
    }

    private static List<Message> buildBatchMessage() {
        List<Message> messageList = new ArrayList<>();
        messageList.add(new Message("message-topic-7", null, "message-1".getBytes()));
        messageList.add(new Message("message-topic-7", null, "message-2".getBytes()));
        messageList.add(new Message("message-topic-7", null, "message-3".getBytes()));
        messageList.add(new Message("message-topic-7", null, "message-4".getBytes()));
        messageList.add(new Message("message-topic-7", null, "message-5".getBytes()));
        messageList.add(new Message("message-topic-7", null, "message-6".getBytes()));
        messageList.add(new Message("message-topic-7", null, "message-7".getBytes()));

        return messageList;
    }
}

八、过滤消息

进行消息生产的时候,设置一些标签或属性,在消费时使用这些标签和属性实现过滤。

1、tag过滤

package org.apache.rocketmq.example.quickstart;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

import java.util.Arrays;
import java.util.List;

/**
 * tag过滤生产者
 */
public class TagFilterProducer {

    public static void main(String[] args) throws MQClientException {
        DefaultMQProducer producer = new BaseProducer().getInstance();
        producer.start();

        List<String> list = Arrays.asList("tagA", "tagB", "tagC");
        for (String tag : list) {
            Message message = new Message("topic-8", tag, ("message-" + tag).getBytes());

            try {
                SendResult send = producer.send(message);
                System.out.println(send);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
        producer.shutdown();
    }
}
package org.apache.rocketmq.example.quickstart;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * tag过滤消费者
 */
public class TagFilterConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group-1");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("topic-8", "tagA || tagB");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messageExtList, ConsumeConcurrentlyContext context) {
                for (MessageExt messageExt : messageExtList) {
                    System.out.println(messageExt.getTags() + "---" + new String(messageExt.getBody()));
                }

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}

2、SQL过滤

package org.apache.rocketmq.example.quickstart.filter;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.example.quickstart.base.BaseProducer;

import java.util.Arrays;
import java.util.List;

/**
 * tag过滤生产者
 */
public class SQLFilterProducer {

    public static void main(String[] args) throws MQClientException {
        DefaultMQProducer producer = new BaseProducer().getInstance();
        producer.start();

        List<String> list = Arrays.asList("tagA", "tagB", "tagC", "tagD");
        for (int i = 0; i < list.size(); i++) {
            String tag = list.get(i);
            Message message = new Message("topic-9", tag, ("message-" + tag).getBytes());
            // 每一个消息都单独指定一个参数 - a
            message.putUserProperty("a", String.valueOf(i));

            try {
                SendResult send = producer.send(message);
                System.out.println(send);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
        producer.shutdown();
    }
}
package org.apache.rocketmq.example.quickstart.filter;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * tag过滤消费者
 */
public class SQLFilterConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group-1");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        // 配置sql过滤规则,TAGS-默认属性 a-自定义属性
        consumer.subscribe("topic-9",
                MessageSelector.bySql("(TAGS is not null and TAGS in ('tagA', 'tagB')) and (a is not null and a between 0 and 3)"));
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messageExtList, ConsumeConcurrentlyContext context) {
                for (MessageExt messageExt : messageExtList) {
                    System.out.println(messageExt.getTags() + "---" + new String(messageExt.getBody()));
                }

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}

九、发送与消费的重要方法、属性

1、发送

方法、属性

说明

使用场景

producerGroup生产者组适用于事务消息的回查机制,保证事务消息的高可用。普通消息不用关注。

defaultTopicQueueNums

消息队列数量默认主题在每一个broker中的队列数量,对于新创建的主题有效

sendMsgTimeout

发送消息阻塞时间默认3s

compressMsgBodyOverHowmuch

启用压缩的阈值默认4k,如果消息体的大小超过设定的大小,就自动压缩消息

retryTimesWhenSendFailed

同步发送的重试次数默认2次

retryTimesWhenSendAsyncFailed

异步发送的重试次数默认2次

retryAnotherBrokerWhenNotStoreOK

是否换一个broker重试发送消息

默认false

maxMessageSize

允许发送消息的最大长度

默认4M = 

1024 * 1024 *4

sendOneway(final Message msg)

单向发送

消息发出去就不管了

sendOneway(Message msg,
    MessageQueue mq)

单向发送,自定义队列选择

消息发出去就不管了

send(final Message msg)

同步发送

send(final Message msg, final long timeout)

同步发送指定阻塞时间

send(final Message msg, final SendCallback sendCallback)

异步发送,指定回调规则

send(final Message msg, final MessageQueue mq, final SendCallback sendCallback)

异步发送消息指定队列、回调规则

 

2、消费

方法、属性

说明

使用场景

consumerGroup消费者组必填,指定消费者组

namesrvAddr

nameServer

指定消息队列集群

messageModel

消费方式

CLUSTERING - 集群消费

BROADCASTING - 广播消费

consumeFromWhere

消费开始时的偏移量

ConsumeFromWhere枚举

consumeThreadMin

最小线程数

consumeThreadMax

最大线程数

pullInterval

推模式的时间间隔

推模式相当于不断地轮询拉取消息,默认时间间隔为0

pullBatchSize

推模式的每一次条数

每次拉取地消息条数,默认32条

maxReconsumeTimes

消息消费的最大重试次数

默认16次,超出重试次数还失败就进入死心消息

consumeTimeout

消费地阻塞时间

在消费消息时可能会阻塞主线程,所以设定最大阻塞时间

fetchSubscribeMessageQueues(String topic)

获取对当前主题分配的队列

subscribe(String topic, String subExpression)

订阅主题

指定主题和过滤正则表达式

subscribe(final String topic, final MessageSelector messageSelector)

订阅主题并指定过滤规则

过滤规则包含

byTag和

bySql

unsubscribe(String topic)

取消订阅主题

registerMessageListener(MessageListenerConcurrently messageListener)

注册事件监听器

包含并发监听器和顺序监听器

ConsumeConcurrentlyStatus.RECONSUME_LATER

放入重试队列

ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT

稍等片刻再消费消息

保证消息的顺序性

相关文章:

  • 排序--快排--挖坑法
  • 电机控制 | 仿真分析:基于脉冲高频注入的永磁同步电机无感控制
  • doris:容灾管理概览
  • es新增运算符
  • 小程序实现瀑布流布局
  • 【MySQL】用户账户、角色、口令、PAM
  • 【miniconda】centos7安装miniconda
  • 【装饰器】【python】【@wraps详解】装饰器核心问题:元数据丢失解决,以及原理
  • 贪心算法经典应用:最优答疑调度策略详解与Python实现
  • ngx_rtmp_flv_module.c — FLV文件处理模块设计与分析
  • JavaScript基础-常用的键盘事件
  • 第三课:Stable Diffusion图生图入门及应用
  • 跨语言微服务架构(Java、Python)——“API中台”
  • SQL小菜之TOP N查找问题
  • 【SUNO】【AI作词】【提示词】
  • 徘徊检测:视觉分析技术的安防新方向
  • ROS2 humble .launch.py启动文件编写
  • QML输入控件: Dial(1)
  • OLED 播放 GIF图片 Adruino
  • QT高效文件I/O编程--实用指南与最佳实践
  • 中日东三省问题的源起——《1905年东三省事宜谈判笔记》解题
  • 陕西宁强县委书记李宽任汉中市副市长
  • 美政府以拨款为要挟胁迫各州服从移民政策,20个州联合起诉
  • 内塔尼亚胡:以军将在未来几天“全力进入”加沙
  • 中国-拉共体论坛第四届部长级会议北京宣言
  • 持续8年仍难终了的纠纷:败诉方因拒执罪被立案,胜诉方银行账户遭冻结