RocketMQ 消息发送高级特性解析(二)
四、代码示例与实践
(一)可靠性传输代码示例
1. 同步发送
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class SyncProducer {
public static void main(String[] args) throws Exception {
// 创建生产者实例,并指定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("sync_producer_group");
// 设置NameServer地址
producer.setNamesrvAddr("localhost:9876");
// 启动生产者
producer.start();
// 创建消息实例,指定Topic、Tag和消息体
Message message = new Message("SyncTopic", "TagA", "Hello, this is a sync message".getBytes());
// 同步发送消息
SendResult sendResult = producer.send(message);
System.out.printf("Message sent. Result: %s%n", sendResult);
// 关闭生产者
producer.shutdown();
}
}
2. 异步发送
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class AsyncProducer {
public static void main(String[] args) throws Exception {
// 创建生产者实例,并指定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("async_producer_group");
// 设置NameServer地址
producer.setNamesrvAddr("localhost:9876");
// 启动生产者
producer.start();
// 创建消息实例,指定Topic、Tag和消息体
Message message = new Message("AsyncTopic", "TagA", "Hello, this is an async message".getBytes());
// 异步发送消息
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("Message sent successfully. Result: %s%n", sendResult);
}
@Override
public void onException(Throwable e) {
System.out.printf("Message sent failed. Exception: %s%n", e);
}
});
// 为了确保异步回调执行,主线程休眠一段时间
Thread.sleep(1000);
// 关闭生产者
producer.shutdown();
}
}
3. 单向发送
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
public class OnewayProducer {
public static void main(String[] args) throws Exception {
// 创建生产者实例,并指定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("oneway_producer_group");
// 设置NameServer地址
producer.setNamesrvAddr("localhost:9876");
// 启动生产者
producer.start();
// 创建消息实例,指定Topic、Tag和消息体
Message message = new Message("OnewayTopic", "TagA", "Hello, this is a oneway message".getBytes());
// 单向发送消息
producer.sendOneway(message);
// 关闭生产者
producer.shutdown();
}
}
(二)事务性发送代码示例
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.concurrent.TimeUnit;
public class TransactionProducer {
public static void main(String[] args) throws Exception {
// 创建事务消息生产者,并指定生产者组名
TransactionMQProducer producer = new TransactionMQProducer("transaction_producer_group");
// 设置NameServer地址
producer.setNamesrvAddr("localhost:9876");
// 实现TransactionListener接口
producer.setTransactionListener(new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 执行本地事务逻辑
System.out.println("Executing local transaction for message: " + msg);
// 模拟本地事务执行成功
return LocalTransactionState.COMMIT_MESSAGE;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 处理事务回查逻辑
System.out.println("Checking local transaction for message: " + msg);
// 根据实际情况返回事务状态
return LocalTransactionState.COMMIT_MESSAGE;
}
});
// 启动生产者
producer.start();
// 发送事务消息
Message msg = new Message("TransactionTopic", "TagA", "Hello RocketMQ, this is a transactional message".getBytes());
producer.sendMessageInTransaction(msg, null);
// 保持主线程运行一段时间,确保事务处理完成
TimeUnit.SECONDS.sleep(5);
// 关闭生产者
producer.shutdown();
}
}
(三)延迟发送代码示例
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class DelayMessageProducer {
public static void main(String[] args) throws Exception {
// 创建生产者实例,并指定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("delay_message_group");
// 设置NameServer地址
producer.setNamesrvAddr("localhost:9876");
// 启动生产者
producer.start();
// 创建消息实例,指定Topic、Tag和消息体
Message message = new Message("DelayTopic", "TagA", "Hello, this is a delayed message".getBytes());
// 设置延迟级别,例如:延迟10秒(延迟级别3)
message.setDelayTimeLevel(3);
// 发送消息
SendResult sendResult = producer.send(message);
System.out.printf("Message sent. Result: %s%n", sendResult);
// 关闭生产者
producer.shutdown();
}
}
(四)批量发送和消费代码示例
1. 批量发送
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import java.util.ArrayList;
import java.util.List;
public class BatchProducer {
public static void main(String[] args) throws Exception {
// 创建生产者实例,并指定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("batch_producer_group");
// 设置NameServer地址
producer.setNamesrvAddr("localhost:9876");
// 启动生产者
producer.start();
// 创建消息列表
List<Message> messages = new ArrayList<>();
for (int i = 0; i < 5; i++) {
// 创建消息实例,指定Topic、Tag和消息体
Message message = new Message("BatchTopic", "TagA", ("Batch Message " + i).getBytes());
messages.add(message);
}
// 批量发送消息
SendResult sendResult = producer.send(messages);
System.out.printf("Batch messages sent. Result: %s%n", sendResult);
// 关闭生产者
producer.shutdown();
}
}
2. 批量消费
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class BatchConsumer {
public static void main(String[] args) throws Exception {
// 创建消费者实例,并指定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("batch_consumer_group");
// 设置NameServer地址
consumer.setNamesrvAddr("localhost:9876");
// 订阅Topic和Tag
consumer.subscribe("BatchTopic", "TagA");
// 注册消息监听器,设置每次拉取10条消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.printf("Received batch message: %s%n", new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者
consumer.start();
System.out.println("Batch consumer started.");
}
}
(五)顺序消息发送代码示例
1. 队列级别顺序消息
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import java.util.List;
public class OrderProducer {
public static void main(String[] args) throws Exception {
// 创建生产者实例,并指定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("order_producer_group");
// 设置NameServer地址
producer.setNamesrvAddr("localhost:9876");
// 启动生产者
producer.start();
// 订单ID
long orderId = 1001;
// 创建消息实例,指定Topic、Tag和消息体
Message message = new Message("OrderTopic", "TagA", ("Order message for orderId: " + orderId).getBytes());
// 自定义队列选择器,根据订单ID选择队列
SendResult sendResult = producer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
long id = (long) arg;
int index = (int) (id % mqs.size());
return mqs.get(index);
}
}, orderId);
System.out.printf("Order message sent. Result: %s%n", sendResult);
// 关闭生产者
producer.shutdown();
}
}
2. Topic 级别顺序消息
要实现 Topic 级别顺序消息,只需将 Topic 的队列数量设置为 1,这里假设通过 MQ 的管理工具或配置文件已经将TopicLevelOrderTopic的队列数量设置为 1,代码示例如下:
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class TopicLevelOrderProducer {
public static void main(String[] args) throws Exception {
// 创建生产者实例,并指定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("topic_level_order_producer_group");
// 设置NameServer地址
producer.setNamesrvAddr("localhost:9876");
// 启动生产者
producer.start();
// 创建消息实例,指定Topic、Tag和消息体
Message message = new Message("TopicLevelOrderTopic", "TagA", "This is a topic level order message".getBytes());
// 发送消息
SendResult sendResult = producer.send(message);
System.out.printf("Topic level order message sent. Result: %s%n", sendResult);
// 关闭生产者
producer.shutdown();
}
}
实际场景选择建议
- 可靠性传输:
-
- 同步发送:适用于对消息可靠性要求极高,且业务流程需要等待消息发送结果后再继续执行的场景,如订单创建成功后发送订单确认消息,只有消息发送成功,才认为订单创建流程完整。
-
- 异步发送:适用于对响应时间要求较高,业务逻辑可以在消息发送后继续执行的场景,如用户注册成功后发送欢迎邮件和短信通知,使用异步发送可以避免因等待邮件和短信发送结果而影响用户注册的响应速度。
-
- 单向发送:适用于对消息可靠性要求不高,但追求高吞吐量的场景,如日志收集系统,即使部分日志消息丢失,也不会对核心业务产生重大影响。
- 事务性发送:适用于需要保证本地事务与消息发送一致性的场景,如电商系统中的订单支付场景,只有当支付信息记录成功后,才会发送支付成功消息,确保订单状态与支付状态的一致性。
- 延迟发送:适用于需要在特定时间后执行的任务场景,如电商系统中的订单超时未支付自动取消功能,通过延迟发送设置订单支付截止时间,到达时间后消费消息并检查订单状态。
- 批量发送和消费:适用于需要发送或处理大量小消息的场景,如电商系统的库存更新场景,将多个商品的库存更新消息批量发送和处理,可以减少网络开销和系统调用次数,提高消息处理效率。
- 顺序消息:
-
- 队列级别顺序消息:适用于对局部顺序有要求,且需要一定并发处理能力的场景,如电商系统的订单处理,将同一个订单的相关消息发送到同一个队列,保证订单业务流程的顺序性,同时通过多个队列实现一定的并发处理。
-
- Topic 级别顺序消息:适用于对全局顺序要求极高,但对并发处理能力要求较低的场景,如金融交易系统中交易订单的处理,必须严格按照下单顺序进行,以保证交易的准确性和一致性。
五、总结与展望
RocketMQ 消息发送的高级特性为分布式系统的构建提供了强大的支持 。可靠性传输特性通过同步发送、异步发送和单向发送,满足了不同业务场景对消息发送可靠性和效率的要求;事务性发送特性通过本地事务、两阶段提交和消息回查,保证了消息发送与本地事务的一致性;延迟发送特性为定时任务等场景提供了便捷的解决方案;批量发送和消费特性提高了消息处理的效率;顺序消息特性则确保了消息在特定场景下的有序性。
在实际应用中,合理使用这些高级特性能够显著提升系统的性能和可靠性 。通过选择合适的消息发送方式,如在对可靠性要求高的场景使用同步发送,在追求高并发和低延迟的场景使用异步发送,可以优化系统的消息处理流程。事务性发送特性对于保证分布式系统中数据的一致性至关重要,尤其是在涉及多个服务之间的数据交互时。延迟发送特性能够实现如订单超时处理、任务定时执行等功能,增强系统的自动化和智能化。批量发送和消费特性在处理大量小消息时,能够有效减少网络开销和系统调用次数,提升系统的整体性能。顺序消息特性则在一些对消息顺序敏感的业务场景,如金融交易、物流跟踪等,确保了业务逻辑的正确性和完整性。
展望未来,随着分布式系统的不断发展和业务需求的日益复杂,RocketMQ 有望在消息发送特性方面持续创新和优化 。在可靠性方面,可能会进一步增强消息的容错机制,降低消息丢失和重复的概率;在事务性方面,或许会提供更简洁、高效的事务处理方式,支持更多复杂的业务场景;在延迟发送方面,可能会实现更灵活的延迟时间设置和更精准的时间控制;在批量处理和顺序消息方面,也可能会不断提升性能和稳定性,以满足大规模、高并发的业务需求。同时,RocketMQ 也将不断适应云原生、大数据、人工智能等新兴技术的发展趋势,与其他技术组件更好地融合,为构建更加高效、智能的分布式系统提供更强大的支持。