生产者消费者消息流转和基本实操
环境准备
# 配置 Broker 的IP地址
echo "brokerIP1=192.168.215.140" > broker.conf# 启动 Broker 和 Proxy
docker run -d \
--name rmqbroker \
--network rocketmq \
-p 10912:10912 -p 10911:10911 -p 10909:10909 \
-p 8080:8080 -p 8081:8081 \
-e "NAMESRV_ADDR=rmqnamesrv:9876" \
-v /root/rocketmq/broker.conf:/home/rocketmq/rocketmq-5.3.1/conf/broker.conf \
apache/rocketmq:5.3.1 sh mqbroker --enable-proxy \
-c /home/rocketmq/rocketmq-5.3.1/conf/broker.conf# 验证 Broker 是否启动成功
docker exec -it rmqbroker bash -c "tail -n 10 /home/rocketmq/logs/rocketmqlogs/proxy.log"
可视化控制台
docker run -d --name rocketmq-dashboard --network rocketmq -e "JAVA_OPTS=-Drocketmq.namesrv.addr=rmqnamesrv:9876" -p 8101:8080 -t apacherocketmq/rocketmq-dashboard:latest
客户端依赖
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>${rocketmq.version}</version>
</dependency>
生产者发送消息
生产者和主题的关系为多对多关系,即同一个生产者可以向多个主题发送消息,对于平台类场景如果需要发送消息到多个主题,并不需要创建多个生产者;同一个主题也可以接收多个生产者的消息,以此可以实现生产者性能的水平扩展和容灾。
消息发送模型
RocketMQ提供了三种消息发送模式:同步发送,异步发送,单向发送。
- 同步发送:在同步发送模式下,消息发送方会一直等待消息发送的结果返回。发送方发送消息后会阻塞等待直到收到Broker的确认响应,然后才会继续执行后续的代码逻辑。这种方式可以确保消息的可靠性,但是会对发送方的性能产生一定的影响。
- 异步发送:在异步发送模式下,消息发送方发送消息后不会等待Broker的确认响应,而是立即返回并继续执行后续的代码逻辑。发送方可以注册一个回调函数,在Broker确认收到消息后,会触发回调函数执行相应的逻辑。这种方式可以提高发送方的性能,但是消息的可靠性需要开发者自行处理。
- 在单向发送模式下,消息发送方只发送消息,并不等待Broker的确认响应,也没有回调函数执行相应逻辑。该模式适用于一些不需要关心消息是否成功送达的场景,如日志采集、统计数据等。
同步发送
同步发送是指消息发送方发出一条消息后,会在收到服务端同步响应之后才发下一条消息的通讯方式。
应用场景
对消息可靠性较高的场景,可以保证发送成功后才继续业务逻辑处理。
- 订单系统(下单成功必须有可靠的消息通知)
- 支付系统(支付完成必须通知其他系统)
- 财务系统(交易完成必须同步通知)
代码示例
/*** 同步发送*/
public class SyncProducer {public static void main(String[] args) throws Exception {//Instantiate with a producer group name.DefaultMQProducer producer = new DefaultMQProducer("producerGroup1");// Specify name server addresses.producer.setNamesrvAddr(MqConstant.DEFAULT_NAMESRVADDR);//Launch the instance.producer.start();for (int i = 0; i < 100; i++) {//Create a message instance, specifying topic, tag and message body.Message msg = new Message(MqConstant.SIMPLE_TOPIC ,"TagA" ,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) );//Call send message to deliver message to one of brokers.//收到broker确认后该方法才会返回SendResultSendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);}//Shut down once the producer instance is not longer inuse.producer.shutdown();}
}
异步发送
异步发送是指发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式。云消息队列 RocketMQ 版的异步发送,需要您实现异步发送回调接口(SendCallback)。消息发送方在发送了一条消息后,不需要等待服务端响应即可发送第二条消息。发送方通过回调接口接收服务端响应,并处理响应结果。
应用场景
异步发送一般用于链路耗时较长,对响应时间较为敏感的业务场景。对实时性要求较高。但希望获得发送结果的通知
- 用户注册成功后发送短信、邮件通知
- 日志分析、审计系统
- 视频上传后通知转码服务
- 上传后快速响应用户(不阻塞)
- 转码任务又希望不丢失,所以需要有发送结果回调,确保任务不会漏掉。
代码示例
/*** 异步发送*/
public class AsyncProducer {public static void main(String[] args) throws Exception {//Instantiate with a producer group name.DefaultMQProducer producer = new DefaultMQProducer("producerGroup1");// Specify name server addresses.producer.setNamesrvAddr(MqConstant.DEFAULT_NAMESRVADDR);//Launch the instance.producer.start();producer.setRetryTimesWhenSendAsyncFailed(0);int messageCount = 100;final CountDownLatch countDownLatch = newCountDownLatch(messageCount);for (int i = 0; i < messageCount; i++) {try {final int index = i;Message msg = new Message(MqConstant.SIMPLE_TOPIC,"TagA",("Hello RocketMQ"+i).getBytes(RemotingHelper.DEFAULT_CHARSET));producer.send(msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {countDownLatch.countDown();System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());}@Overridepublic void onException(Throwable e) {countDownLatch.countDown();System.out.printf("%-10d Exception %s %n", index, e);e.printStackTrace();}});}catch (Exception e) {e.printStackTrace();}}countDownLatch.await(5, TimeUnit.SECONDS);producer.shutdown();}
}
单向发送
发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。
应用场景
不需要消息发送结果、对可靠性要求不高的场景
- 日志采集
- 数据上报(例如埋点统计)
- 监控数据发送
代码示例
/*** 单向发送*/
public class OnewayProducer {public static void main(String[] args) throws Exception {//Instantiate with a producer group name.DefaultMQProducer producer = newDefaultMQProducer("producerGroup1");// Specify name server addresses.producer.setNamesrvAddr(MqConstant.DEFAULT_NAMESRVADDR);//Launch the instance.producer.start();for (int i = 0; i < 100; i++) {//Create a message instance, specifying topic, tag and message body.Message msg = new Message(MqConstant.SIMPLE_TOPIC,"TagA",("Hello RocketMQ " +i).getBytes(RemotingHelper.DEFAULT_CHARSET));//Call send message to deliver message to one of brokers.//没有返回值也就是不需要broker确认producer.sendOneway(msg);}//Wait for sending to completeThread.sleep(5000);producer.shutdown();}
}
消息的流转
消息过滤
过滤的含义指的是将符合条件的消息投递给消费者,而不是将匹配到的消息过滤掉。
- topic,消息的逻辑分类,类似于消息的主题或者标签。Producer 发送消息时,需要指定发送到哪个 Topic 下。
- tag,对消息的附加标记,用于在一个主题下对消息进行更细粒度的分类。Consumer 可以根据标签来选择性地订阅和过滤消息。
topic是对消息的高层分类,tag是在topic基础上对消息更细粒度的分类
消息类型
- Normal:普通消息,消息本身无特殊语义,消息之间也没有任何关联。
- FIFO:顺序消息,Apache RocketMQ 通过消息分组MessageGroup标记一组特定消息的先后顺序,可以保证消息的投递顺序严格按照消息发送时的顺序。
- Delay:定时/延时消息,通过指定延时时间控制消息生产后不要立即投递,而是在延时间隔后才对消费者可见。
- Transaction:事务消息,Apache RocketMQ 支持分布式事务消息,支持应用数据库更新和消息调用的事务一致性保障。
代码示例
交易生产者 分别向订单服务、物流服务和支付服务发送消息
/*** 交易服务生产者*/
public class TradeProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("trade-producer-group");producer.setNamesrvAddr(MqConstant.DEFAULT_NAMESRVADDR);producer.start();String[] tags = new String[]{MqConstant.PAY_TAG, MqConstant.LOGISTICS_TAG, MqConstant.ORDER_TAG};for (int i = 0; i < 15; i++) {Message msg = new Message(MqConstant.TRADE_TOPIC,tags[i % tags.length],tags[i % tags.length].getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);}producer.shutdown();}
}
物流服务消费者 只消费物流tag的消息
/*** 物流服务消费者*/
public class LogisticsConsumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("LogisticsConsumer");consumer.setNamesrvAddr(MqConstant.DEFAULT_NAMESRVADDR);consumer.subscribe(MqConstant.TRADE_TOPIC , MqConstant.LOGISTICS_TAG);consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n",Thread.currentThread().getName(),msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();}
}
支付服务消费者 只消费订单tag消息
/*** 支付服务消费者*/
public class PaymentConsumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PaymentConsumer");consumer.setNamesrvAddr(MqConstant.DEFAULT_NAMESRVADDR);consumer.subscribe(MqConstant.TRADE_TOPIC , MqConstant.PAY_TAG);consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n",Thread.currentThread().getName(),msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();}
}
交易成功率分析服务 订阅支付和订单消息
/*** 交易成功率分析服务*/
public class TransactionSuccessRateAnalysisConsumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TransactionSuccessRateAnalysisConsumer");consumer.setNamesrvAddr(MqConstant.DEFAULT_NAMESRVADDR);consumer.subscribe(MqConstant.TRADE_TOPIC , MqConstant.PAY_TAG+" || "+MqConstant.ORDER_TAG);consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n",Thread.currentThread().getName(),msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();}
}
实时计算服务 订阅trade-topic所有消息
/*** 实时计算服务 订阅trade-topic所有消息*/
public class RealtimeComputingConsumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("RealtimeComputingConsumer");consumer.setNamesrvAddr(MqConstant.DEFAULT_NAMESRVADDR);consumer.subscribe(MqConstant.TRADE_TOPIC , "*" );consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n",Thread.currentThread().getName(),msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();}
}
消息消费
消息消费模型
- 消费组间广播消费 (广播):如上图所示,每个消费者分组只初始化唯一一个消费者,每个消费者可消费到消费者分组内所有的消息,各消费者分组都订阅相同的消息,以此实现单客户端级别的广播一对多推送效果。
该方式一般可用于网关推送、配置推送等场景。在广播模式下,同一个 ConsumerGroup 中的每个 Consumer 实例都处理全部的队列。需要注意的是,广播模式下因为每个 Consumer 实例都需要处理全部的消息,因此这种模式仅推荐在通知推送、配置同步类小流量场景使用。 - 消费组内共享消费 (集群):如上图所示,每个消费者分组下初始化了多个消费者,这些消费者共同分担消费者分组内的所有消息,实现消费者分组内流量的水平拆分和均衡负载。
该方式一般可用于微服务解耦场景。
@Slf4j
public class ClusterModel1 {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cluster_consume_group");consumer.setNamesrvAddr(MqConstant.DEFAULT_NAMESRVADDR);consumer.subscribe(MqConstant.CONSUME_TEST_TOPIC, "TagA");// 消费模式 默认是集群模式(负载均衡模式)consumer.setMessageModel(MessageModel.BROADCASTING);// 注册回调函数,处理消息consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {try {for (MessageExt msg : msgs) {String topic = msg.getTopic();String tags = msg.getTags();String msgBody = new String(msg.getBody());log.info("消息消费成功:{}", "topic:" + topic + " tags:" + tags + " 消息内容:" + msgBody);}} catch (Exception e) {log.info("消息消费失败:{}", e.getMessage());}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 最后才启动消费者consumer.start();log.info("消费者启动成功... clientId:"+consumer.buildMQClientId());}
}