当前位置: 首页 > news >正文

011 rocketmq过滤消息

文章目录

  • 过滤消息
    • TAG模式过滤
      • FilterByTagProducer.java
      • FilterByTagConsumer.java
    • SQL表达式过滤
      • FilterBySQLProducer.java
      • FilterBySQLConsumer.java
    • 类过滤模式(基于4.2.0版本)

过滤消息

消息过滤包括基于表达式过滤与基于类模式两种过滤模式。其中表达式过滤⼜分为TAG和SQL92模式

TAG模式过滤

发送消息时我们会为每⼀条消息设置TAG标签,同⼀⼤类中的消息放在⼀个主题TOPIC下,但是如果
进⾏分类我们则可以根据TAG进⾏分类,每⼀类消费者可能不是关系某个主题下的所有消息,我们就可
以通过TAG进⾏过滤,订阅关注的某⼀类数据。

FilterByTagProducer.java

package com.example.rocketmq.demo.filter;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;


//通过TAG 实现 过滤消息
public class FilterByTagProducer {
    public static void main(String[] args) throws Exception {
        //Instantiate with a producer group name.
        DefaultMQProducer producer = new
                DefaultMQProducer("please_rename_unique_group_name");
        // Specify name server addresses.
        producer.setNamesrvAddr("localhost:9876");
        //Launch the instance.
        producer.start();
        String[] tags = {"TAGA","TAGB","TAGC"};
        for (int i = 0; i < 10; i++) {
            String tag =   tags[i%tags.length];
            //每个消息设置一个tag,tag 二级分类
            Message msg = new Message("TopicTest",tag,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
            );

            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        //Shut down once the producer instance is not longer in use.
        producer.shutdown();
    }
}

FilterByTagConsumer.java

package com.example.rocketmq.demo.filter;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class FilterByTagConsumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {

        // Instantiate with specified consumer group name.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");

        // Specify name server addresses.
        consumer.setNamesrvAddr("localhost:9876");

        //订阅Topic,只订阅标签为A或B的消息
        consumer.subscribe("TopicTest", "TAGA || TAGB");
        // Register callback to execute on arrival of messages fetched from brokers.
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        //Launch the consumer instance.
        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}

SQL表达式过滤

SQL92表达式消息过滤,是通过消息的属性运⾏SQL过滤表达式进⾏条件匹配,消息发送时需要设置⽤户的属性putUserProperty⽅法设置属性。
支持的语法:

  1. 数值⽐较, 如 > , >= , < , <= , BETWEEN , = ;
  2. 字符⽐较, 如 = , <> , IN ;
  3. IS NULL or IS NOT NULL ;
  4. 逻辑连接符 AND , OR , NOT ;

支持的类型:

  1. 数值型, 如123, 3.1415;
  2. 字符型, 如 ‘abc’, 必须⽤单引号;
  3. NULL , 特殊常数;
  4. 布尔值, TRUE or FALSE ;

FilterBySQLProducer.java

package com.example.rocketmq.demo.filter;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class FilterBySQLProducer {
    public static void main(String[] args) throws Exception {
        //Instantiate with a producer group name.
        DefaultMQProducer producer = new
                DefaultMQProducer("please_rename_unique_group_name");
        // Specify name server addresses.
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        String[] tags = {"TagA","TagB","TagC","TagD"};
        for (int i = 0; i < 10; i++) {
            try {
                String tag = tags[i % tags.length];
                //构建消息
                Message msg = new Message("TopicTest" /* Topic */,
                        tag /* Tag */,
                        ("RocketMQ消息测试,消息的TAG="+tag+  ", 属性 age = " + i + ", == " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
                );
                //每个消息设置属性为age,age值为0-9
                msg.putUserProperty("age", i+"");
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
//                Thread.sleep(1000);
            }
        }
        //Shut down once the producer instance is not longer in use.
        producer.shutdown();
    }
}

FilterBySQLConsumer.java

package com.example.rocketmq.demo.filter;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;

public class FilterBySQLConsumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {

        // Instantiate with specified consumer group name.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");

        // Specify name server addresses.
        consumer.setNamesrvAddr("localhost:9876");
        //订阅Topic
        consumer.subscribe("TopicTest", MessageSelector.bySql("age between 0 and 6"));
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer===启动成功!");
    }
}

类过滤模式(基于4.2.0版本)

RocketMQ通过定义消息过滤类的接⼝实现消息过滤

相关文章:

  • 若依框架修改为多租户
  • 从分层到微服务:构建高可扩展的 ERP 系统架构之道
  • Excel基础(详细篇):总结易忽视的知识点,有用的细节操作
  • (一)Java虚拟机——JVM的组成
  • 构建高可用和高防御力的云服务架构第五部分:PolarDB(55)
  • 【Word2Vec】Skip-gram 的直观理解(深入浅出)
  • Redis 的几个热点知识
  • (十 一)趣学设计模式 之 组合模式!
  • 【Mac】git使用再学习
  • 基于SpringBoot的“母婴护理知识共享系统”的设计与实现(源码+数据库+文档+PPT)
  • LeetCode 124:二叉树中的最大路径和
  • Spring Boot 入门 与 无法解析符号 springframework 的解决
  • Three.js 快速入门教程【十一】天空盒的多种实现方式
  • C#学生管理系统 进阶(通过接口,继承接口的类,实现接口约束_对List中存储的数据进行排列)
  • 什么是requestIdleCallback?
  • Hue Docker镜像构建异常:gnutls_handshake() failed
  • 第15届 蓝桥杯 C++编程青少组中/高级选拔赛 202403 真题答案及解析
  • Win32 C++ 电源计划操作
  • 第三百七十二节 JavaFX教程 - JavaFX HTMLEditor
  • spring事件
  • 特朗普公开“怼”库克:苹果不应在印度生产手机
  • 李成钢:近期个别经济体实施所谓“对等关税”,严重违反世贸组织规则
  • 秦洪看盘|指标股发力,A股渐有突破态势
  • 中科飞测将投资超10亿元,在上海张江成立第二总部
  • 金正恩观摩朝鲜人民军各兵种战术综合训练
  • 美国和沙特签署上千亿美元军售协议