
Implementing dynamic topic listening with Kafka

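Life is blameless, long live health. I am laity.

Seven times have I despised my soul:

The first time, when it could have pressed forward but feigned humility;

The second time, when it was empty and filled the void with lust;

The third time, when it was given the choice between the hard and the easy, and it chose the easy;

The fourth time, when it did wrong and consoled itself that others do wrong too;

The fifth time, when it was weak by its own choice, yet took this for the resilience of life;

The sixth time, when it scorned an ugly face, not knowing that the face was one of its own masks;

The seventh time, when it lay in the mire of life, unwilling to stay there yet too timid to move.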


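Business scenario: the dispatch system ("导条") distributes data to each party through that party's own interface, and the distributed data includes a dynamically assigned Kafka topic. Each party then listens on that topic to receive its data and carry out its downstream business logic.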

Implementation

pom

yaml, option 1: values are received as String

  kafka:
    bootstrap-servers: youKafkaIp:9092 # Kafka broker address; several can be given, separated by commas
    listener:
      type: batch
    consumer:
      enable-auto-commit: false
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest
      group-id: consumer-sb
    producer:
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      key-serializer: org.apache.kafka.common.serialization.StringSerializer

yaml, option 2: values are received as byte arrays

  kafka:
    bootstrap-servers: youKafkaIp:9092 # Kafka broker address; several can be given, separated by commas
    listener:
      type: batch
    consumer:
      enable-auto-commit: false
      value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
      key-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
      auto-offset-reset: earliest
      group-id: consumer-sb
    producer:
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
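With option 2 the listener is handed raw byte arrays, so the consumer side has to decide how to turn them back into text. Here is a minimal sketch of that step, assuming the payload is UTF-8 text (which is what StringSerializer writes by default). PayloadDecoder is a hypothetical helper name and is not part of the original project.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper, not part of the original project. */
final class PayloadDecoder {

    private PayloadDecoder() {
    }

    /**
     * Decodes a raw record value into text.
     * Assumes the producer wrote UTF-8; a null value is passed through as null.
     */
    static String decodeUtf8(byte[] value) {
        return value == null ? null : new String(value, StandardCharsets.UTF_8);
    }
}

final class PayloadDecoderDemo {
    public static void main(String[] args) {
        byte[] raw = "hello kafka".getBytes(StandardCharsets.UTF_8);
        System.out.println(PayloadDecoder.decodeUtf8(raw));
    }
}
```

If the producers use a different encoding or a binary format, this is the place to swap in the matching decoder.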

Code for receiving messages

KafkaConfig.java

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor;
import org.springframework.kafka.annotation.KafkaListenerConfigurer;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistrar;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;

import java.util.HashMap;
import java.util.Map;

/**
 * @author laity
 */
@EnableKafka
@Configuration
public class KafkaConfig {

    // Attempted fix for "Could not create message listener - MessageHandlerMethodFactory not set"  TODO:WWS did not work
    /*
    @Bean
    public KafkaListenerAnnotationBeanPostProcessor kafkaListenerAnnotationBeanPostProcessor() {
        KafkaListenerAnnotationBeanPostProcessor processor = new KafkaListenerAnnotationBeanPostProcessor();
        processor.setMessageHandlerMethodFactory(new DefaultMessageHandlerMethodFactory());
        return processor;
    }
    */

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> map = new HashMap<>();
        map.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "youKafkaIp:9092");
        map.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-laity");
        map.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        map.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        return new DefaultKafkaConsumerFactory<String, String>(map);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(5);
        // new DefaultMessageHandlerMethodFactory()
        return factory;
    }

    // implements KafkaListenerConfigurer + fixes "Could not create message listener - MessageHandlerMethodFactory not set"
    /*
    @Override
    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
        registrar.setMessageHandlerMethodFactory(new DefaultMessageHandlerMethodFactory());
    }
    */
}

KafkaListenerController.java

package cn.iocoder.yudao.server.controller.admin.szbl;

import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.server.controller.admin.szbl.common.config.kafka.MyComponent;
import cn.iocoder.yudao.server.controller.admin.szbl.vo.InitSceneVO;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.security.PermitAll;

/**
 * @author laity
 */
@RestController
@RequestMapping("/kafka")
public class KafkaListenerController {

    private final MyComponent component;

    public KafkaListenerController(MyComponent component) {
        this.component = component;
    }

    private String topic;

    // Endpoint that receives the data distributed by the dispatch system
    @PostMapping("/reception")
    @PermitAll
    public CommonResult<Boolean> putAwayL(@RequestBody InitSceneVO vo) {
        // …… business logic
        // Start listening on the topic named in the request
        component.startListening(vo.getGzTopicName());
        return CommonResult.success(true);
    }
}
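The controller passes the topic name from the request body straight to the listener. A small guard in front of that call can reject names Kafka would not accept. The sketch below is one possible check, based on Kafka's naming limits: ASCII letters, digits, '.', '_' and '-', at most 249 characters, and not "." or "..". TopicNames is a hypothetical helper and does not appear in the original project.

```java
import java.util.regex.Pattern;

/** Hypothetical guard, not part of the original project. */
final class TopicNames {

    // Legal characters and maximum length for a Kafka topic name.
    private static final Pattern LEGAL = Pattern.compile("[a-zA-Z0-9._-]{1,249}");

    private TopicNames() {
    }

    /** Returns true when the name is non-null, uses only legal characters, and is not "." or "..". */
    static boolean isValid(String topic) {
        if (topic == null || topic.equals(".") || topic.equals("..")) {
            return false;
        }
        return LEGAL.matcher(topic).matches();
    }
}

final class TopicNamesDemo {
    public static void main(String[] args) {
        System.out.println(TopicNames.isValid("gz-topic.01"));
        System.out.println(TopicNames.isValid("bad topic"));
    }
}
```

A check like this could run before `component.startListening(...)`, so that a malformed name is rejected up front and never reaches the listener registration.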

DynamicKafkaListenerService.java

import org.springframework.aop.framework.Advised;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
import org.springframework.stereotype.Service;

import java.lang.reflect.Method;
import java.time.LocalDateTime;
import java.util.Objects;

/**
 * @author laity Dynamically manages Kafka listeners
 */
@Service
public class DynamicKafkaListenerService {

    private final KafkaListenerEndpointRegistry registry;
    private final ConcurrentKafkaListenerContainerFactory<String, String> factory;

    @Autowired
    public DynamicKafkaListenerService(KafkaListenerEndpointRegistry registry,
                                       ConcurrentKafkaListenerContainerFactory<String, String> factory) {
        this.registry = registry;
        this.factory = factory;
    }

    public void addListener(String topic, String groupId, Object bean, Method method) {
        // Unwrap AOP proxies so the endpoint invokes the target object directly
        if (AopUtils.isAopProxy(bean)) {
            try {
                bean = ((Advised) bean).getTargetSource().getTarget();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
        MethodKafkaListenerEndpoint<String, String> endpoint = new MethodKafkaListenerEndpoint<>();
        assert bean != null;
        endpoint.setBean(bean);
        endpoint.setMethod(method);
        endpoint.setTopics(topic);
        endpoint.setGroupId(groupId); // consumer group.id (setGroup would instead name a container group)
        endpoint.setId(method.getName() + "_" + LocalDateTime.now());
        // The IDE would not autocomplete this setter before, then it suddenly showed up…… speechless
        endpoint.setMessageHandlerMethodFactory(new DefaultMessageHandlerMethodFactory());
        registry.registerListenerContainer(endpoint, factory, true); // specify the container factory and start immediately
    }

    // beanName is the listener id that was set through endpoint.setId(...)
    public void removeListener(String beanName) {
        // Fail fast if no container is registered under this id
        Objects.requireNonNull(registry.getListenerContainer(beanName)).stop();
        registry.unregisterListenerContainer(beanName);
    }
}
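The listener id above is built from the method name plus a timestamp, yet removeListener needs that exact id later. One way to bridge the gap is to keep a small topic-to-id book, which also stops the same topic from being registered twice. The sketch below is only an illustration: ListenerIdBook is a hypothetical class, and the starter callback stands in for a version of addListener that returns the id it generated (the original returns void).

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical bookkeeping helper, not part of the original project. */
final class ListenerIdBook {

    private final Map<String, String> idsByTopic = new ConcurrentHashMap<>();

    /**
     * Runs the starter for the topic unless a listener id is already tracked for it.
     * The starter stands in for the real registration call and must return the listener id.
     * Returns true when the starter was invoked, false when the topic was already tracked.
     */
    boolean startOnce(String topic, Function<String, String> starter) {
        boolean[] started = {false};
        idsByTopic.computeIfAbsent(topic, t -> {
            started[0] = true;
            return starter.apply(t);
        });
        return started[0];
    }

    /** Forgets the topic and returns the id to hand to the removal call, if one was tracked. */
    Optional<String> forget(String topic) {
        return Optional.ofNullable(idsByTopic.remove(topic));
    }
}

final class ListenerIdBookDemo {
    public static void main(String[] args) {
        ListenerIdBook book = new ListenerIdBook();
        System.out.println(book.startOnce("demo-topic", t -> "listen_" + t));
        System.out.println(book.startOnce("demo-topic", t -> "listen_" + t));
        System.out.println(book.forget("demo-topic").orElse("none"));
    }
}
```

Because the starter runs inside `computeIfAbsent`, two concurrent requests for the same topic cannot both register a listener. The trade-off is that the callback should stay short and must not touch the book itself.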

BlueKafkaConsumer.java

import org.springframework.stereotype.Component;

/**
 * @author laity
 */
@Component
public class BlueKafkaConsumer {

    // @KafkaListener(topics = "#{__listener.getTopicName()}", groupId = "consumer-laity")
    public void listen(Object record) {
        System.out.println("======================= Received message from dynamic Kafka topic ========================");
        System.out.println(record.toString());
    }
}

MyComponent.java

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.lang.reflect.Method;

/**
 * @author laity
 */
@Component
public class MyComponent {

    private final DynamicKafkaListenerService kafkaListenerService;
    private final BlueKafkaConsumer blueKafkaConsumer;

    @Autowired
    public MyComponent(DynamicKafkaListenerService kafkaListenerService, BlueKafkaConsumer blueKafkaConsumer) {
        this.kafkaListenerService = kafkaListenerService;
        this.blueKafkaConsumer = blueKafkaConsumer;
    }

    public void startListening(String topic) {
        try {
            // Resolve the public listen(Object) handler by reflection
            Method blueMethod = BlueKafkaConsumer.class.getMethod("listen", Object.class);
            kafkaListenerService.addListener(topic, "consumer-laity", blueKafkaConsumer, blueMethod);
        } catch (NoSuchMethodException e) {
            throw new RuntimeException(e);
        }
    }

    // beanName is the listener id assigned when the listener was added
    public void stopListening(String beanName) {
        kafkaListenerService.removeListener(beanName);
    }

    // init
    @PostConstruct // Called once at service startup, but what I want is something that can change at runtime
    public void init() {
    }
}
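startListening depends on `Class.getMethod`, which only finds public methods and requires the parameter types to match the declaration exactly. The sketch below isolates that lookup, plus a reflective call, using only the JDK. EchoConsumer and HandlerLookup are hypothetical stand-ins for illustration, not the original classes.

```java
import java.lang.reflect.Method;

/** Hypothetical stand-in for a consumer bean; not the original BlueKafkaConsumer. */
final class EchoConsumer {
    private Object last;

    public void listen(Object record) {
        last = record; // remember the payload so callers can inspect it
    }

    Object last() {
        return last;
    }
}

/** Hypothetical helper that wraps the reflective lookup and call. */
final class HandlerLookup {

    private HandlerLookup() {
    }

    /** Resolves a public single-argument handler; the parameter type must match the declaration exactly. */
    static Method find(Class<?> type, String name, Class<?> paramType) {
        try {
            return type.getMethod(name, paramType);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(
                    "No public " + name + "(" + paramType.getSimpleName() + ") on " + type.getName(), e);
        }
    }

    /** Invokes the handler on the bean, converting reflective failures into an unchecked exception. */
    static void deliver(Object bean, Method handler, Object payload) {
        try {
            handler.invoke(bean, payload);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Handler invocation failed", e);
        }
    }
}

final class HandlerLookupDemo {
    public static void main(String[] args) {
        EchoConsumer consumer = new EchoConsumer();
        Method handler = HandlerLookup.find(EchoConsumer.class, "listen", Object.class);
        HandlerLookup.deliver(consumer, handler, "hello");
        System.out.println(consumer.last());
    }
}
```

Looking the method up with `String.class` instead of `Object.class` fails here, because no `listen(String)` is declared. The same would happen in startListening if the handler's signature changed.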

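The two most precious words in the world are diligence and persistence. Diligent people change themselves, and persistent people change their fate. Some things are not persisted in because hope is in sight; hope appears because you persisted. I am Laity, and I am still moving forward.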

