当前位置: 首页 > wzjs >正文

企业建站系统官网今天最新新闻事件报道

企业建站系统官网,今天最新新闻事件报道,沈阳世纪兴网站制作公司,付费推广方式有哪些文章目录 过滤消息TAG模式过滤FilterByTagProducer.javaFilterByTagConsumer.java SQL表达式过滤FilterBySQLProducer.javaFilterBySQLConsumer.java 类过滤模式(基于4.2.0版本) 过滤消息 消息过滤包括基于表达式过滤与基于类模式两种过滤模式。其中表达…

文章目录

  • 过滤消息
    • TAG模式过滤
      • FilterByTagProducer.java
      • FilterByTagConsumer.java
    • SQL表达式过滤
      • FilterBySQLProducer.java
      • FilterBySQLConsumer.java
    • 类过滤模式(基于4.2.0版本)

过滤消息

消息过滤包括基于表达式过滤与基于类模式两种过滤模式。其中表达式过滤⼜分为TAG和SQL92模式

TAG模式过滤

发送消息时我们会为每⼀条消息设置TAG标签,同⼀⼤类中的消息放在⼀个主题TOPIC下,但是如果
进⾏分类我们则可以根据TAG进⾏分类,每⼀类消费者可能不是关系某个主题下的所有消息,我们就可
以通过TAG进⾏过滤,订阅关注的某⼀类数据。

FilterByTagProducer.java

package com.example.rocketmq.demo.filter;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;//通过TAG 实现 过滤消息
public class FilterByTagProducer {public static void main(String[] args) throws Exception {//Instantiate with a producer group name.DefaultMQProducer producer = newDefaultMQProducer("please_rename_unique_group_name");// Specify name server addresses.producer.setNamesrvAddr("localhost:9876");//Launch the instance.producer.start();String[] tags = {"TAGA","TAGB","TAGC"};for (int i = 0; i < 10; i++) {String tag =   tags[i%tags.length];//每个消息设置一个tag,tag 二级分类Message msg = new Message("TopicTest",tag,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);}//Shut down once the producer instance is not longer in use.producer.shutdown();}
}

FilterByTagConsumer.java

package com.example.rocketmq.demo.filter;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class FilterByTagConsumer {public static void main(String[] args) throws InterruptedException, MQClientException {// Instantiate with specified consumer group name.DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");// Specify name server addresses.consumer.setNamesrvAddr("localhost:9876");//订阅Topic,只订阅标签为A或B的消息consumer.subscribe("TopicTest", "TAGA || TAGB");// Register callback to execute on arrival of messages fetched from brokers.consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//Launch the consumer instance.consumer.start();System.out.printf("Consumer Started.%n");}
}

SQL表达式过滤

SQL92表达式消息过滤,是通过消息的属性运⾏SQL过滤表达式进⾏条件匹配,消息发送时需要设置⽤户的属性putUserProperty⽅法设置属性。
支持的语法:

  1. 数值⽐较, 如 > , >= , < , <= , BETWEEN , = ;
  2. 字符⽐较, 如 = , <> , IN ;
  3. IS NULL or IS NOT NULL ;
  4. 逻辑连接符 AND , OR , NOT ;

支持的类型:

  1. 数值型, 如123, 3.1415;
  2. 字符型, 如 ‘abc’, 必须⽤单引号;
  3. NULL , 特殊常数;
  4. 布尔值, TRUE or FALSE ;

FilterBySQLProducer.java

package com.example.rocketmq.demo.filter;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;public class FilterBySQLProducer {public static void main(String[] args) throws Exception {//Instantiate with a producer group name.DefaultMQProducer producer = newDefaultMQProducer("please_rename_unique_group_name");// Specify name server addresses.producer.setNamesrvAddr("localhost:9876");producer.start();String[] tags = {"TagA","TagB","TagC","TagD"};for (int i = 0; i < 10; i++) {try {String tag = tags[i % tags.length];//构建消息Message msg = new Message("TopicTest" /* Topic */,tag /* Tag */,("RocketMQ消息测试,消息的TAG="+tag+  ", 属性 age = " + i + ", == " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));//每个消息设置属性为age,age值为0-9msg.putUserProperty("age", i+"");SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);} catch (Exception e) {e.printStackTrace();
//                Thread.sleep(1000);}}//Shut down once the producer instance is not longer in use.producer.shutdown();}
}

FilterBySQLConsumer.java

package com.example.rocketmq.demo.filter;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;public class FilterBySQLConsumer {public static void main(String[] args) throws InterruptedException, MQClientException {// Instantiate with specified consumer group name.DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");// Specify name server addresses.consumer.setNamesrvAddr("localhost:9876");//订阅Topicconsumer.subscribe("TopicTest", MessageSelector.bySql("age between 0 and 6"));consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.println("Consumer===启动成功!");}
}

类过滤模式(基于4.2.0版本)

RocketMQ通过定义消息过滤类的接⼝实现消息过滤

http://www.dtcms.com/wzjs/232833.html

相关文章:

  • 邱县做网站宁波seo推广服务
  • 天津网站优化流程寻找郑州网站优化公司
  • 哪个网站推广比较好怎么根据视频链接找到网址
  • wordpress 同分类评论调用佛山seo关键词排名
  • 那个网站直接回做二手发电机市场营销策划公司排名
  • 一家专门做软件的网站东莞搜索网络优化
  • 音乐网站怎么做社交的网站制作和推广
  • 网站改版与优化协议书免费建站网站
  • wordPress如何把菜单加入导航吉林网站seo
  • 6网站建设做网站网页优化包括什么
  • wordpress 图片显示插件南京百度关键字优化价格
  • 下拉框代码自做生成网站广告关键词排名
  • 网站程序合同大连seo建站
  • 网页设计创建网站的基本流程登封网络推广公司
  • 公司网站如何做seoseo应该如何做
  • 海南网站建设软件成功品牌策划案例
  • app开发程序北京网站优化推广方案
  • 网站效果图可以做动态的嘛百度下载app下载安装到手机
  • 旬阳县建设局网站品牌营销策划怎么写
  • 那个网站可以做网站测速对比推广app赚佣金平台
  • 北京 网站建设托管公司上海哪家优化公司好
  • 绵阳做网站的公司有哪些百度认证怎么认证
  • wordpress 附件 文件夹seo推广是什么意思呢
  • 使用wordpress做网站seo排名啥意思
  • 湖南省网站建设项目江苏免费关键词排名外包
  • 做网站 广州福州seo排名优化
  • 深圳专业建设网站哪个公司好优化设计答案五年级上册
  • 租赁网站空间搜索引擎优化课程
  • 广州市门户网站建设品牌全国知名网站排名
  • 门户网站建设采购站长统计性宝app