RocketMQ学习系列之——特殊消息类型
在前面的文章我们介绍了常规消息,接下来我将介绍一下其他特殊的消息类型。具体代码,我们可以在 RocketMQ 源码的 example 文件夹去看。
一、广播消息(broadcast)
在一般情况下,对于某一条消息,如果不考虑重试,那么一个 ConsumerGroup 内只有一个 Consumer 能接收到。但是在某些场景下,我们需要让所有的 Consumer 都能收到,此时就用到了广播消息。
消费者通过以下设置,就可以实现订阅广播消息:
consumer.setMessageModel(MessageModel.BROADCASTING);
此时,broker 不会保存 ConsumerGroup 的 offset,而是由 Consumer 自行维护自己的 offset。
二、过滤消息(filter)
如果我们希望在一个 topic 中进一步划分多种消息,消费者只关注其中的一种或几种消息,此时可以使用过滤消息。
过滤消息的逻辑由 Broker 完成,消费者只会收到过滤后的消息。
过滤消息分为两种:
(1)基于 tag 过滤
生产者发送消息的时候,指定 tag:
String[] tags = new String[] {"TagA", "TagB", "TagC"};for (int i = 0; i < 60; i++) {Message msg = new Message("TagFilterTest",tags[i % tags.length],"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);}
消费者消费消息的时候,指定关心哪些 tag:
consumer.subscribe("TagFilterTest", "TagA || TagC");
于是,消费者只会收到 topic = TagFilterTest,tag = TagA 或 TagC 的消息。
(2)基于 SQL 过滤
如果希望实现更加复杂的过滤,如:数值比较、模糊匹配等,可以使用 SQL 过滤。SQL 过滤不仅支持 Tag,还支持用户自定义的属性,支持的语法为 SQL92 标准。
比如,生产者发送消息时,指定 Tag 和自定义属性:
String[] tags = new String[] {"TagA", "TagB", "TagC"};for (int i = 0; i < 10; i++) {Message msg = new Message("SqlFilterTest",tags[i % tags.length],("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));msg.putUserProperty("a", String.valueOf(i));SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);}
消费者消费消息时,通过 SQL 过滤感兴趣的消息:
consumer.subscribe("SqlFilterTest",MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" +"and (a is not null and a between 0 and 3)"));
三、顺序消息(order message)
在 RocketMQ 中,一个 topic 的消息会保存在多个 MessageQueue 中。单个 MessageQueue 中的消息满足严格的 FIFO 特性,但是多个 MessageQueue 中的消息并不一定有序。
在某些场景中,我们要求消息必须按照发送时间进行排序。比如,一个订单有从下单、锁库存、支付、下物流等几个业务步骤,这几个消息必须有序,此时就用到顺序消息。
生产者把需要保持顺序的消息发送到同一个 MessageQueue 中,即可实现顺序消息:
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};for (int i = 0; i < 100; i++) {int orderId = i % 10;Message msg =new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Integer id = (Integer) arg;int index = id % mqs.size();return mqs.get(index);}}, orderId);System.out.printf("%s%n", sendResult);
消费者需要注册 MessageListerOrderly 回调接口,来向 Broker 反馈消费结果:
consumer.registerMessageListener(new MessageListenerOrderly() {AtomicLong consumeTimes = new AtomicLong(0);@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {context.setAutoCommit(true);System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);this.consumeTimes.incrementAndGet();if ((this.consumeTimes.get() % 2) == 0) {return ConsumeOrderlyStatus.SUCCESS;} else if ((this.consumeTimes.get() % 5) == 0) {context.setSuspendCurrentQueueTimeMillis(3000);return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;}return ConsumeOrderlyStatus.SUCCESS;}});
注意事项:
1、理解局部有序与全局有序。大部分业务场景下,我们需要的其实是局部有序。如果要保持全局有序,那就只保留一个MessageQueue。性能显然非常低。
2、生产者端尽可能将有序消息打散到不同的MessageQueue上,避免过于集中导致数据热点竞争。
3、消费者端只进行有限次数的重试。如果一条消息处理失败,RocketMQ会将后续消息阻塞住,让消费者进行重试。但是,如果消费者一直处理失败,超过最大重试次数,那么RocketMQ就会跳过这一条消息,处理后面的消息,这会造成消息乱序。
4、消费者端如果确实处理逻辑中出现问题,不建议抛出异常,可以返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT作为替代。
四、延迟消息(delay)
有时我们希望生产者发送消息后,过一段时间之后才能让消费者消费,此时可以用延迟消息。
RocketMQ 提供了两种实现延迟消息的机制,一种是指定固定的延迟级别,一种是指定消息发送时间。
// 指定固定的延迟级别Message message = new Message(TOPIC, ("Hello scheduled message " + i).getBytes(StandardCharsets.UTF_8));message.setDelayTimeLevel(3); //10秒之后发送// 指定消息发送时间Message message = new Message(TOPIC, ("Hello scheduled message " + i).getBytes(StandardCharsets.UTF_8));message.setDeliverTimeMs(System.currentTimeMillis() + 10_000L); //指定10秒之后的时间点
DelayLevel 的延迟时间如下:
对于指定固定延迟级别的延迟消息,RocketMQ 的实现方式是预设一个系统 Topic,名字叫做SCHEDULE_TOPIC_XXXXX。在这个Topic下,预设了18个MessageQueue。这里每个对列就对应了一种延迟级别。然后每次扫描这18个队列里的消息,进行延迟操作就可以了。
对于指定时间点的延迟消息,RocketMQ 采用时间轮算法实现。
五、批量消息(batch)
当生产者发送的消息较多时,为了节省网络传输时间和网络 IO,可以将多条消息打包发送。
同一批消息的 topic 必须相同,且不支持延迟消息。此外,一次发送的消息不能太多,一般不超过 1MB。
List<Message> messages = new ArrayList<>(MESSAGE_COUNT);for (int i = 0; i < MESSAGE_COUNT; i++) {messages.add(new Message(TOPIC, TAG, "OrderID" + i, ("Hello world " + i).getBytes(StandardCharsets.UTF_8)));}SendResult sendResult = producer.send(messages);
六、事务消息(transaction)
假设在一个电商场景中,当用户支付成功后,会进行物流发货、积分变更、清空购物车等操作。这里涉及分布式事务,要保证一致性,实现较为复杂。
不过,引入 RocketMQ 后,情况会变得简单一些。支付服务在支付成功后,把订单消息发送到 MQ 中,由物流服务、积分服务、购物车服务进行消费。由于 MQ 的重试机制,可以认为只要支付服务把消息发送到 MQ 中,后续的服务能够实现最终一致性。于是,分布式事务的问题就简化为如何保证:(1)支付服务的本地事务,(2)支付服务把订单消息投递到 MQ,这两个操作的一致性。
RocketMQ 采用两阶段提交的机制来实现事务消息,保证本地事务和 MQ 消息发送的一致性。
如上图所示,事务消息的实现逻辑如下:
1、生产者发送消息到 MQ。
2、MQ 收到消息后,向生产者返回 ACK。此时消息状态为“暂不能投递”,我们称为“半事务消息”。
3、生产者执行本地事务。
4、生产者根据本地事务执行的结果,向 MQ 提交二次确认结果(Commit 或 Rollback):
4.1、若结果为 Commit,则半事务消息变成“可投递”,并投递给消费者
4.2、若结果为 Rollback,则 MQ 会回滚事务,不会将半事务消息投递给消费者
5、在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。
6、生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果,向 MQ 提交二次确认结果(Commit 或 Rollback)。
生产者代码示例:
public static void main(String[] args) throws MQClientException, InterruptedException {TransactionListener transactionListener = new TransactionListenerImpl();TransactionMQProducer producer = new TransactionMQProducer(PRODUCER_GROUP, Arrays.asList(TOPIC));// Uncomment the following line while debugging, namesrvAddr should be set to your local address
// producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000), r -> {Thread thread = new Thread(r);thread.setName("client-transaction-msg-check-thread");return thread;});producer.setExecutorService(executorService);producer.setTransactionListener(transactionListener);producer.start();String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};for (int i = 0; i < MESSAGE_COUNT; i++) {try {Message msg =new Message(TOPIC, tags[i % tags.length], "KEY" + i,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.sendMessageInTransaction(msg, null);System.out.printf("%s%n", sendResult);Thread.sleep(10);} catch (MQClientException | UnsupportedEncodingException e) {e.printStackTrace();}}for (int i = 0; i < 100000; i++) {Thread.sleep(1000);}producer.shutdown();}
public class TransactionListenerImpl implements TransactionListener {private AtomicInteger transactionIndex = new AtomicInteger(0);private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {int value = transactionIndex.getAndIncrement();int status = value % 3;localTrans.put(msg.getTransactionId(), status);return LocalTransactionState.UNKNOW;}@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {Integer status = localTrans.get(msg.getTransactionId());if (null != status) {switch (status) {case 0:return LocalTransactionState.UNKNOW;case 1:return LocalTransactionState.COMMIT_MESSAGE;case 2:return LocalTransactionState.ROLLBACK_MESSAGE;default:return LocalTransactionState.COMMIT_MESSAGE;}}return LocalTransactionState.COMMIT_MESSAGE;}
}
在上述代码中,投递事务消息时会注册一个 TransactionListener。这个 Listener 提供两个回调方法:1、半事务消息发送成功后,执行本地事务;2、收到 MQ 的消息回查请求时,查询本地事务执行状态。