当前位置: 首页 > news >正文

spring-kafka的消息过滤器RecordFilterStrategy

是什么

RecordFilterStrategy 的核心作用是 在消息被消费者listener方法处理之前,根据自定义的逻辑决定是否丢弃(过滤掉)某条消息。

你可以把它想象成一个“保安”,每条消息到达消费者listener之前,都要经过它的检查。如果它返回 true,这条消息就会被丢弃(过滤掉),你的业务代码永远不会看到它;如果返回 false,消息则会被正常处理。

典型应用场景:

  • 重复消息过滤:根据消息的唯一ID(例如数据库主键、业务ID等)判断是否已经处理过,避免重复消费。

  • 消息内容过滤:检查消息体(payload),如果内容不合法、不完整或者不符合当前业务逻辑(例如不是当前地区的数据),则将其丢弃。

  • 条件性消费:根据消息头(Headers)中的某些特征(如版本号、类型标识)来决定是否处理。

  • 调试与特殊处理:在测试环境中,可以过滤掉某些生产数据,或者只处理特定类型的消息进行调试。

重要特性:

  • 过滤是无声的:被过滤的消息会被简单地丢弃,不会抛出异常,也不会重试,更不会进入死信队列(DLT)。它就像从未收到过这条消息一样。

  • 与重试机制的关系:过滤发生在重试机制之前。如果一条消息在第一次尝试时就被过滤了,那么它根本不会进入重试循环。

怎么用

使用 RecordFilterStrategy 通常需要两个步骤:

  • 定义一个实现该接口的过滤器类
  • 使用这个过滤器类,下面2种办法任选其一
    • 可以将其配置到你的监听容器工厂ConcurrentKafkaListenerContainerFactory中
    • 可以将其定义成一个spring的bean,然后在listener中进行引用。

我们以一个防止消息重复消费的案例来说明下具体怎么用。

  • 在发消息的时候,我们给每一条消息的header中都设置一个唯一的消息id,这个可以使用ProducerInterceptor来实现。
public class IdempotentInterceptor implements ProducerInterceptor<String, String> {private static Logger log = LoggerFactory.getLogger(IdempotentInterceptor.class);@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {Headers headers = record.headers();Header header = headers.lastHeader("MSG_ID");if(header == null){record.headers().add("MSG_ID", UUID.randomUUID().toString().getBytes());}log.info("发送消息:{},msg id:{}", record.value(), new String(record.headers().lastHeader("MSG_ID").value()));return record;}@Overridepublic void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {if(e == null){log.info("发送消息成功");} else {log.error("发送消息失败 : {}", e);}}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> map) {log.info("configure:{}", map);}
}
  • 定义一个实现了RecordFilterStrategy接口的Bean,在里面读取消息的id,判断是否已经处理过,可以利用redis来做,然后在listener中引用这个bean即可。
// 定义filter
@Slf4j
public class IdempotentFilter implements RecordFilterStrategy<String, String> {private Map<String, String> redis = new HashMap<>();@Overridepublic boolean filter(ConsumerRecord<String, String> consumerRecord) {Headers headers = consumerRecord.headers();Header header = headers.lastHeader("MSG_ID");if(header == null){return false;}String msgId = new String(header.value());if(redis.containsKey(msgId)){log.info("丢弃重复消息:{} msg id:{}", consumerRecord.value(), msgId);return true;}else{redis.put(msgId, msgId);// set cache timeoutreturn false;}}
}
// 配置成spring的bean
public class KafkaConfiguration {@Beanpublic IdempotentFilter idempotentFilter(){return new IdempotentFilter();}
}
// 在listener中引用即可
@Component
public class DemoListener {private static Logger log = LoggerFactory.getLogger(DemoListener.class);@KafkaListener(topics = {"test-topic"}, filter = "idempotentFilter")public void onMessage(ConsumerRecord record){Object value = record.value();log.info("收到消息:{}, msg id:{}", value,new String(record.headers().lastHeader("MSG_ID").value()));}
}

application.yml:

spring:kafka:bootstrap-servers: localhost:9092  # Kafka服务器地址consumer:group-id: my-group  # 默认的消费者组IDauto-offset-reset: earliest  # 如果没有初始偏移量或偏移量已失效,从最早的消息开始读取key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproducer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerproperties:interceptor.classes: com.github.xjs.kafka.interceptor.IdempotentInterceptor

发送2条id一样的msg做测试:

@GetMapping("send/{msg}")public String send(@PathVariable("msg")String msg) {GenericMessage<String> message = new GenericMessage<>(msg, Map.of("MSG_ID", UUID.randomUUID().toString()));// 发送两条msg_id一样的消息kafkaTemplate.setDefaultTopic("test-topic");kafkaTemplate.send(message);kafkaTemplate.send(message);return "OK";}

控制台打印:

2025-09-16T11:03:17.022+08:00  INFO 8664 --- [nio-8080-exec-2] c.g.x.k.i.IdempotentInterceptor          : 发送消息:2222,msg id:2fe41c8b-ca5d-4c39-8042-d9bee198b860
2025-09-16T11:03:17.041+08:00  INFO 8664 --- [nio-8080-exec-2] c.g.x.k.i.IdempotentInterceptor          : 发送消息:2222,msg id:2fe41c8b-ca5d-4c39-8042-d9bee198b860
2025-09-16T11:03:17.064+08:00  INFO 8664 --- [ad | producer-1] c.g.x.k.i.IdempotentInterceptor          : 发送消息成功
2025-09-16T11:03:17.065+08:00  INFO 8664 --- [ntainer#0-0-C-1] c.g.xjs.kafka.listener.DemoListener      : 收到消息:2222, msg id:2fe41c8b-ca5d-4c39-8042-d9bee198b860
2025-09-16T11:03:17.065+08:00  INFO 8664 --- [ntainer#0-0-C-1] c.g.xjs.kafka.filter.IdempotentFilter    : 丢弃重复消息:2222 msg id:2fe41c8b-ca5d-4c39-8042-d9bee198b860
2025-09-16T11:03:17.066+08:00  INFO 8664 --- [ad | producer-1] c.g.x.k.i.IdempotentInterceptor          : 发送消息成功

与消费者拦截器的区别

过滤器RecordFilterStrategy的功能看上去也可以使用拦截器ConsumerInterceptor来实现,最终效果都是让应用程序(Consumer)看不到某条消息,但它们的设计目的、实现层级、对消费语义的影响以及使用场景有本质的区别。

  • ConsumerInterceptor
    运行机制:消费者从Broker拉取一批消息,消息被反序列化,onConsume方法被调用,传入这批消息。如果你返回null,原始的那批消息会被整个丢弃。消费者的poll()方法返回一个空的集合给你的应用程序代码。

    Kafka客户端的Offset管理机制对此一无所知。它认为消息已经成功拉取并交付了。无论是自动提交还是手动提交,它提交的Offset都是基于最后一次成功拉取的位移,而不是基于应用程序成功处理的位移。结果就是被丢弃的消息的Offset会被提交,这些消息永远丢失,且无法回溯。这破坏了Kafka提供的“至少一次”交付语义。

  • RecordFilterStrategy
    运行机制:Spring Kafka的监听器容器(如ConcurrentMessageListenerContainer)poll()到消息以后,为每条消息创建一个ConsumerRecord并封装成Message,在调用@KafkaListener的方法之前,容器会先检查是否配置了RecordFilterStrategy,RecordFilterStrategy 的 filter 方法被调用,如果返回true,就不会调用listener。

    它的核心优势就是过滤行为被整合到了Spring的消息处理生命周期中。Offset的提交与消息的处理结果(包括被过滤) 是同步的。被过滤的消息被视为已成功处理的业务逻辑,因此提交其Offset是正确且安全的行为,符合业务意图。

因此,不要使用 ConsumerInterceptor.onConsume 返回 null 来作为业务消息的过滤手段,这会导致数据丢失。它的正确用途是可观测性(Observability)和消息修改(需谨慎)。如果需要在Spring Kafka应用程序中过滤消息,应该使用 RecordFilterStrategy。这是框架提供的、安全的、语义正确的过滤方式。


文章转载自:

http://jXacPuF1.sqqpb.cn
http://SIQagWEI.sqqpb.cn
http://5uvx0xq1.sqqpb.cn
http://RXxpoL3N.sqqpb.cn
http://kL9Ec2Me.sqqpb.cn
http://RLbVyWBt.sqqpb.cn
http://QQUave8Y.sqqpb.cn
http://vPG64wEO.sqqpb.cn
http://o017yGcx.sqqpb.cn
http://iUxDKxse.sqqpb.cn
http://IDm34CVo.sqqpb.cn
http://PKVD0G91.sqqpb.cn
http://BoWFIGJg.sqqpb.cn
http://B0QFs8qC.sqqpb.cn
http://vmz7M9QI.sqqpb.cn
http://dHrApcZD.sqqpb.cn
http://UaGaB6aB.sqqpb.cn
http://6eEK5xKS.sqqpb.cn
http://kqWZBXPh.sqqpb.cn
http://HEb1gEbh.sqqpb.cn
http://VnSNmw5i.sqqpb.cn
http://mCu5b53g.sqqpb.cn
http://nuMuN5VU.sqqpb.cn
http://4gm3xGPa.sqqpb.cn
http://6VlmbIdq.sqqpb.cn
http://phSnXrTg.sqqpb.cn
http://eZCOlhLv.sqqpb.cn
http://06HEZv0O.sqqpb.cn
http://sWE32qbH.sqqpb.cn
http://oMlAZnAs.sqqpb.cn
http://www.dtcms.com/a/386402.html

相关文章:

  • gin中sse流式服务
  • 论文笔记(九十一)GWM: Towards Scalable Gaussian World Models for Robotic Manipulation
  • Simulink(MATLAB)与 LabVIEW应用对比
  • [BX]和loop指令,debug和masm汇编编译器对指令的不同处理,循环,大小寄存器的包含关系,操作数据长度与寄存器的关系,段前缀
  • Django RBAC权限实战全流程
  • 智启燃气新未来丨众智鸿图精彩亮相2025燃气运营与安全研讨会
  • Docker Push 常见报错及解决方案汇总
  • OCR 后结构化处理最佳实践
  • 软考 系统架构设计师系列知识点之杂项集萃(148)
  • P1425 小鱼的游泳时间
  • 弧焊机器人氩气焊接节能方法
  • 机器人导论 第六章 动力学(2)——拉格朗日动力学推导与详述
  • 在uniapp中调用虚拟机调试vue项目
  • UE5 GAS 技能系统解析:EGameplayAbilityTriggerSource 枚举详解
  • MySQL 基础概念与简单使用
  • PostgreSQL高可用架构实战:构建企业级数据连续性保障体系
  • (二)昇腾AI处理器计算资源层基础
  • C++17新特性:用[*this]告别悬垂指针,提升并发健壮性
  • Buck电路输出电容设计:从理论到实践的完整指南
  • Gin + Gorm:完整 CRUD API 与关系操作指南
  • 996引擎-ItemTips特效框层级自定义
  • 软考高级系统架构设计师之构件与中间件技术篇
  • Maya绑定案例:摆动、扭曲、拉伸(样条IK高级扭曲、表达式)
  • FOG钻井多花数倍成本?MEMS陀螺定向短节如何为成本做“减法”?
  • 性能分析工具的使用
  • DNS-Windows上使用DNS
  • Go 语言开发京东商品详情 API:构建高并发数据采集服务
  • 通用计算流体力学CFD软件VirtualFlow 2025发布,5大亮点
  • 趣味学RUST基础篇(实战Web server)完结
  • 机器人导论 第六章 动力学(1)——牛顿欧拉法推导与详述