spring-kafka的消息过滤器RecordFilterStrategy
是什么
RecordFilterStrategy
的核心作用是 在消息被消费者listener方法处理之前,根据自定义的逻辑决定是否丢弃(过滤掉)某条消息。
你可以把它想象成一个“保安”,每条消息到达消费者listener之前,都要经过它的检查。如果它返回 true,这条消息就会被丢弃(过滤掉),你的业务代码永远不会看到它;如果返回 false,消息则会被正常处理。
典型应用场景:
-
重复消息过滤:根据消息的唯一ID(例如数据库主键、业务ID等)判断是否已经处理过,避免重复消费。
-
消息内容过滤:检查消息体(payload),如果内容不合法、不完整或者不符合当前业务逻辑(例如不是当前地区的数据),则将其丢弃。
-
条件性消费:根据消息头(Headers)中的某些特征(如版本号、类型标识)来决定是否处理。
-
调试与特殊处理:在测试环境中,可以过滤掉某些生产数据,或者只处理特定类型的消息进行调试。
重要特性:
-
过滤是无声的:被过滤的消息会被简单地丢弃,不会抛出异常,也不会重试,更不会进入死信队列(DLT)。它就像从未收到过这条消息一样。
-
与重试机制的关系:过滤发生在重试机制之前。如果一条消息在第一次尝试时就被过滤了,那么它根本不会进入重试循环。
怎么用
使用 RecordFilterStrategy 通常需要两个步骤:
- 定义一个实现该接口的过滤器类
- 使用这个过滤器类,下面2种办法任选其一
- 可以将其配置到你的监听容器工厂ConcurrentKafkaListenerContainerFactory中
- 可以将其定义成一个spring的bean,然后在listener中进行引用。
我们以一个防止消息重复消费的案例来说明下具体怎么用。
- 在发消息的时候,我们给每一条消息的header中都设置一个唯一的消息id,这个可以使用
ProducerInterceptor
来实现。
public class IdempotentInterceptor implements ProducerInterceptor<String, String> {private static Logger log = LoggerFactory.getLogger(IdempotentInterceptor.class);@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {Headers headers = record.headers();Header header = headers.lastHeader("MSG_ID");if(header == null){record.headers().add("MSG_ID", UUID.randomUUID().toString().getBytes());}log.info("发送消息:{},msg id:{}", record.value(), new String(record.headers().lastHeader("MSG_ID").value()));return record;}@Overridepublic void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {if(e == null){log.info("发送消息成功");} else {log.error("发送消息失败 : {}", e);}}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> map) {log.info("configure:{}", map);}
}
- 定义一个实现了
RecordFilterStrategy
接口的Bean,在里面读取消息的id,判断是否已经处理过,可以利用redis来做,然后在listener中引用这个bean即可。
// 定义filter
@Slf4j
public class IdempotentFilter implements RecordFilterStrategy<String, String> {private Map<String, String> redis = new HashMap<>();@Overridepublic boolean filter(ConsumerRecord<String, String> consumerRecord) {Headers headers = consumerRecord.headers();Header header = headers.lastHeader("MSG_ID");if(header == null){return false;}String msgId = new String(header.value());if(redis.containsKey(msgId)){log.info("丢弃重复消息:{} msg id:{}", consumerRecord.value(), msgId);return true;}else{redis.put(msgId, msgId);// set cache timeoutreturn false;}}
}
// 配置成spring的bean
public class KafkaConfiguration {@Beanpublic IdempotentFilter idempotentFilter(){return new IdempotentFilter();}
}
// 在listener中引用即可
@Component
public class DemoListener {private static Logger log = LoggerFactory.getLogger(DemoListener.class);@KafkaListener(topics = {"test-topic"}, filter = "idempotentFilter")public void onMessage(ConsumerRecord record){Object value = record.value();log.info("收到消息:{}, msg id:{}", value,new String(record.headers().lastHeader("MSG_ID").value()));}
}
application.yml:
spring:kafka:bootstrap-servers: localhost:9092 # Kafka服务器地址consumer:group-id: my-group # 默认的消费者组IDauto-offset-reset: earliest # 如果没有初始偏移量或偏移量已失效,从最早的消息开始读取key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproducer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerproperties:interceptor.classes: com.github.xjs.kafka.interceptor.IdempotentInterceptor
发送2条id一样的msg做测试:
@GetMapping("send/{msg}")public String send(@PathVariable("msg")String msg) {GenericMessage<String> message = new GenericMessage<>(msg, Map.of("MSG_ID", UUID.randomUUID().toString()));// 发送两条msg_id一样的消息kafkaTemplate.setDefaultTopic("test-topic");kafkaTemplate.send(message);kafkaTemplate.send(message);return "OK";}
控制台打印:
2025-09-16T11:03:17.022+08:00 INFO 8664 --- [nio-8080-exec-2] c.g.x.k.i.IdempotentInterceptor : 发送消息:2222,msg id:2fe41c8b-ca5d-4c39-8042-d9bee198b860
2025-09-16T11:03:17.041+08:00 INFO 8664 --- [nio-8080-exec-2] c.g.x.k.i.IdempotentInterceptor : 发送消息:2222,msg id:2fe41c8b-ca5d-4c39-8042-d9bee198b860
2025-09-16T11:03:17.064+08:00 INFO 8664 --- [ad | producer-1] c.g.x.k.i.IdempotentInterceptor : 发送消息成功
2025-09-16T11:03:17.065+08:00 INFO 8664 --- [ntainer#0-0-C-1] c.g.xjs.kafka.listener.DemoListener : 收到消息:2222, msg id:2fe41c8b-ca5d-4c39-8042-d9bee198b860
2025-09-16T11:03:17.065+08:00 INFO 8664 --- [ntainer#0-0-C-1] c.g.xjs.kafka.filter.IdempotentFilter : 丢弃重复消息:2222 msg id:2fe41c8b-ca5d-4c39-8042-d9bee198b860
2025-09-16T11:03:17.066+08:00 INFO 8664 --- [ad | producer-1] c.g.x.k.i.IdempotentInterceptor : 发送消息成功
与消费者拦截器的区别
过滤器RecordFilterStrategy
的功能看上去也可以使用拦截器ConsumerInterceptor
来实现,最终效果都是让应用程序(Consumer)看不到某条消息,但它们的设计目的、实现层级、对消费语义的影响以及使用场景有本质的区别。
-
ConsumerInterceptor
运行机制:消费者从Broker拉取一批消息,消息被反序列化,onConsume方法被调用,传入这批消息。如果你返回null,原始的那批消息会被整个丢弃。消费者的poll()方法返回一个空的集合给你的应用程序代码。Kafka客户端的Offset管理机制对此一无所知。它认为消息已经成功拉取并交付了。无论是自动提交还是手动提交,它提交的Offset都是基于最后一次成功拉取的位移,而不是基于应用程序成功处理的位移。结果就是被丢弃的消息的Offset会被提交,这些消息永远丢失,且无法回溯。这破坏了Kafka提供的“至少一次”交付语义。
-
RecordFilterStrategy
运行机制:Spring Kafka的监听器容器(如ConcurrentMessageListenerContainer)poll()到消息以后,为每条消息创建一个ConsumerRecord并封装成Message,在调用@KafkaListener的方法之前,容器会先检查是否配置了RecordFilterStrategy,RecordFilterStrategy 的 filter 方法被调用,如果返回true,就不会调用listener。它的核心优势就是过滤行为被整合到了Spring的消息处理生命周期中。Offset的提交与消息的处理结果(包括被过滤) 是同步的。被过滤的消息被视为已成功处理的业务逻辑,因此提交其Offset是正确且安全的行为,符合业务意图。
因此,不要使用 ConsumerInterceptor.onConsume 返回 null 来作为业务消息的过滤手段,这会导致数据丢失。它的正确用途是可观测性(Observability)和消息修改(需谨慎)。如果需要在Spring Kafka应用程序中过滤消息,应该使用 RecordFilterStrategy。这是框架提供的、安全的、语义正确的过滤方式。