基于Kafka实现动态监听topic功能
生命无罪,健康万岁,我是laity。
我曾七次鄙视自己的灵魂:
第一次,当它本可进取时,却故作谦卑;
第二次,当它在空虚时,用爱欲来填充;
第三次,在困难和容易之间,它选择了容易;
第四次,它犯了错,却借由别人也会犯错来宽慰自己;
第五次,它自由软弱,却把它认为是生命的坚韧;
第六次,当它鄙夷一张丑恶的嘴脸时,却不知那正是自己面具中的一副;
第七次,它侧身于生活的污泥中,虽不甘心,却又畏首畏尾。
基于Kafka实现动态监听topic功能
业务场景:导条根据各家接口进行数据分发,其中包含动态的 kafka-topic;各家通过监听该 topic 获取数据,从而实现后续业务。
实现逻辑
pom
yaml 方案1 接收的是String
kafka:
  bootstrap-servers: youKafkaIp:9092 # 指定 Kafka Broker 地址,可以设置多个,以逗号分隔
  listener:
    type: batch
  consumer:
    enable-auto-commit: false
    value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    auto-offset-reset: earliest
    group-id: consumer-sb
  producer:
    value-serializer: org.apache.kafka.common.serialization.StringSerializer
    key-serializer: org.apache.kafka.common.serialization.StringSerializer
yaml 方案2 接收的是Byte
kafka:
  bootstrap-servers: youKafkaIp:9092 # 指定 Kafka Broker 地址,可以设置多个,以逗号分隔
  listener:
    type: batch
  consumer:
    enable-auto-commit: false
    value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
    key-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
    auto-offset-reset: earliest
    group-id: consumer-sb
  producer:
    value-serializer: org.apache.kafka.common.serialization.StringSerializer
    key-serializer: org.apache.kafka.common.serialization.StringSerializer
收消息CODE
KafkaConfig.java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor;
import org.springframework.kafka.annotation.KafkaListenerConfigurer;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistrar;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;import java.util.HashMap;
import java.util.Map;/*** @author laity*/
/**
 * Kafka consumer configuration.
 *
 * <p>Declares the {@link ConsumerFactory} and the
 * {@link ConcurrentKafkaListenerContainerFactory} beans used by the listeners.
 * The broker address and the consumer group id are fixed in this class.
 */
@EnableKafka
@Configuration
public class KafkaConfig {

    /** Kafka broker address handed to the consumer factory. */
    private static final String BOOTSTRAP_SERVERS = "youKafkaIp:9092";

    /** Consumer group id handed to the consumer factory. */
    private static final String GROUP_ID = "consumer-laity";

    // Attempted workaround for
    // "Could not create message listener - MessageHandlerMethodFactory not set".
    // TODO(WWS): this did not work.
    //
    // @Bean
    // public KafkaListenerAnnotationBeanPostProcessor kafkaListenerAnnotationBeanPostProcessor() {
    //     KafkaListenerAnnotationBeanPostProcessor processor = new KafkaListenerAnnotationBeanPostProcessor();
    //     processor.setMessageHandlerMethodFactory(new DefaultMessageHandlerMethodFactory());
    //     return processor;
    // }

    /**
     * Builds the consumer factory with String key and value deserializers.
     *
     * @return a consumer factory configured with the broker address, group id
     *         and deserializers set below
     */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        final String stringDeserializer = StringDeserializer.class.getName();

        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, stringDeserializer);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, stringDeserializer);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    /**
     * Builds the listener container factory on top of {@link #consumerFactory()}
     * with concurrency set to 5.
     *
     * @return the listener container factory
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> containerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();
        containerFactory.setConcurrency(5);
        containerFactory.setConsumerFactory(consumerFactory());
        // Author's note kept from the original: new DefaultMessageHandlerMethodFactory()
        return containerFactory;
    }

    // Alternative: implement KafkaListenerConfigurer to address
    // "Could not create message listener - MessageHandlerMethodFactory not set".
    //
    // @Override
    // public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
    //     registrar.setMessageHandlerMethodFactory(new DefaultMessageHandlerMethodFactory());
    // }
}
KafkaListenerController.java
package cn.iocoder.yudao.server.controller.admin.szbl;import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.server.controller.admin.szbl.common.config.kafka.MyComponent;
import cn.iocoder.yudao.server.controller.admin.szbl.vo.InitSceneVO;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.security.PermitAll;/*** @author laity*/
/**
 * REST entry point under {@code /kafka} that starts listening on a topic
 * named in the request body.
 */
@RestController
@RequestMapping("/kafka")
public class KafkaListenerController {

    private String topic;

    private final MyComponent component;

    public KafkaListenerController(MyComponent component) {
        this.component = component;
    }

    /**
     * Receives the distributed data and starts a listener on the topic it names.
     *
     * @param vo request body; its {@code getGzTopicName()} value is the topic to listen on
     * @return a success result wrapping {@code true}
     */
    @PermitAll
    @PostMapping("/reception")
    public CommonResult<Boolean> putAwayL(@RequestBody InitSceneVO vo) {
        // ... business logic ...

        // Start listening on the topic carried by the request.
        final String topicName = vo.getGzTopicName();
        component.startListening(topicName);

        return CommonResult.success(true);
    }
}
DynamicKafkaListenerService.java
import org.springframework.aop.framework.Advised;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
import org.springframework.stereotype.Service;import java.lang.reflect.Method;
import java.time.LocalDateTime;
import java.util.Objects;/*** @author laity 动态管理Kafka监听器*/
@Service
public class DynamicKafkaListenerService {private final KafkaListenerEndpointRegistry registry;private final ConcurrentKafkaListenerContainerFactory<String, String> factory;@Autowiredpublic DynamicKafkaListenerService(KafkaListenerEndpointRegistry registry, ConcurrentKafkaListenerContainerFactory<String, String> factory) {this.registry = registry;this.factory = factory;}public void addListener(String topic, String groupId, Object bean, Method method) {if (AopUtils.isAopProxy(bean)) {try {bean = ((Advised) bean).getTargetSource().getTarget();} catch (Exception e) {throw new RuntimeException(e);}}MethodKafkaListenerEndpoint<String, String> endpoint = new MethodKafkaListenerEndpoint<>();assert bean != null;endpoint.setBean(bean);endpoint.setMethod(method);endpoint.setTopics(topic);endpoint.setGroup(groupId);endpoint.setId(method.getName() + "_" + LocalDateTime.now());endpoint.setMessageHandlerMethodFactory(new DefaultMessageHandlerMethodFactory()); // 之前怎么点都点不出来这个属性 突然又出来了……无语registry.registerListenerContainer(endpoint, factory, true); // 指定容器工厂}public void removeListener(String beanName) {// 断言Objects.requireNonNull(registry.getListenerContainer(beanName)).stop();registry.unregisterListenerContainer(beanName);}
}
BlueKafkaConsumer.java
import org.springframework.stereotype.Component;/*** @author laity*/
@Component
public class BlueKafkaConsumer {// @KafkaListener(topics = "#{__listener.getTopicName()}", groupId = "consumer-laity")public void listen(Object record) {System.out.println("======================= 接收动态KafkaTopics Received message ========================");System.out.println(record.toString());}}
MyComponent.java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import java.lang.reflect.Method;/*** @author laity*/
@Component
public class MyComponent {private final DynamicKafkaListenerService kafkaListenerService;private final BlueKafkaConsumer blueKafkaConsumer;@Autowiredpublic MyComponent(DynamicKafkaListenerService kafkaListenerService, BlueKafkaConsumer blueKafkaConsumer) {this.kafkaListenerService = kafkaListenerService;this.blueKafkaConsumer = blueKafkaConsumer;}public void startListening(String topic) {try {Method blueMethod = BlueKafkaConsumer.class.getMethod("listen", Object.class);kafkaListenerService.addListener(topic, "consumer-laity", blueKafkaConsumer, blueMethod);} catch (NoSuchMethodException e) {throw new RuntimeException(e);}}public void stopListening(String beanName) {kafkaListenerService.removeListener(beanName);}// init@PostConstruct // 这个是服务启动时调用 但我想要的时实时可变的public void init() {}}
世界上最可贵的两个词,一个叫认真,一个叫坚持,认真的人改变自己,坚持的人改变命运,有些事情不是看到了希望才去坚持,而是坚持了才有希望。我是Laity,正在前进的Laity。