当前位置: 首页 > wzjs >正文

网站制作网页设计怎么做网络广告推广

网站制作网页设计,怎么做网络广告推广,北京今日新闻发布会直播,扬中新闻网视频KafkaListener 是 Spring Kafka 提供的一个核心注解,用于标记一个方法作为 Kafka 消息的消费者。下面是对该注解的详细解析: 基本用法 KafkaListener(topics "myTopic", groupId "myGroup") public void listen(String message)…

@KafkaListener 是 Spring Kafka 提供的一个核心注解,用于标记一个方法作为 Kafka 消息的消费者。下面是对该注解的详细解析:

基本用法

@KafkaListener(topics = "myTopic", groupId = "myGroup")
public void listen(String message) {System.out.println("Received Message: " + message);
}

主要属性

1. 必需属性

  • topics / topicPattern:指定监听的 topic
    • topics:逗号分隔的 topic 列表
    • `topicPattern**:使用正则表达式匹配 topic
@KafkaListener(topics = "topic1,topic2")
// 或
@KafkaListener(topicPattern = "test.*")

2. 消费者配置

  • groupId:指定消费者组 ID
  • containerFactory:指定使用的 KafkaListenerContainerFactory
@KafkaListener(topics = "myTopic", groupId = "myGroup", containerFactory = "myFactory")

3. 消息处理

  • id:为监听器指定唯一 ID
  • concurrency:设置并发消费者数量
@KafkaListener(id = "myListener", topics = "myTopic", concurrency = "3")

4. 高级配置

  • containerGroup:指定容器组(Spring Kafka 2.5+)
  • errorHandler:指定错误处理器
  • idIsGroup:是否使用监听器 ID 作为组 ID(默认 false)

消息处理方法签名

监听器方法可以接受多种形式的参数:

  1. 简单消息处理

    @KafkaListener(topics = "myTopic")
    public void listen(String message) { ... }
    
  2. 带元数据的消息处理

    @KafkaListener(topics = "myTopic")
    public void listen(ConsumerRecord<?, ?> record) { ... }
    
  3. 批量消息处理

    @KafkaListener(topics = "myTopic")
    public void listen(List<String> messages) { ... }
    
  4. 带确认的消息处理

    @KafkaListener(topics = "myTopic")
    public void listen(String message, Acknowledgment ack) {// 处理消息后手动确认ack.acknowledge();
    }
    

配置选项

可以通过 @KafkaListenercontainerFactory 属性引用自定义配置:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> myFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(3);factory.getContainerProperties().setPollTimeout(3000);return factory;
}@KafkaListener(topics = "myTopic", containerFactory = "myFactory")
public void listen(String message) { ... }

错误处理

可以通过以下方式处理错误:

  1. 配置错误处理器

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setErrorHandler(new SeekToCurrentErrorHandler());return factory;
    }
    
  2. 使用 @SendTo 发送到死信队列

    @KafkaListener(topics = "myTopic", groupId = "myGroup")
    @SendTo("myDltTopic")
    public String listen(String message) {// 处理失败时返回错误消息return "error";
    }
    

注意事项

  1. 监听器方法应该是 public 的
  2. 避免在监听器方法中执行长时间运行的操作
  3. 考虑消息处理的幂等性
  4. 对于批量处理,确保方法参数是 List 类型
  5. 在 Spring Boot 中,许多配置可以通过 application.properties/yml 设置

完整示例

@Configuration
@EnableKafka
public class KafkaConsumerConfig {@Beanpublic ConsumerFactory<String, String> consumerFactory() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroup");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return new DefaultKafkaConsumerFactory<>(props);}@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(3);factory.getContainerProperties().setPollTimeout(3000);return factory;}
}@Service
public class KafkaMessageListener {@KafkaListener(topics = "myTopic", groupId = "myGroup", containerFactory = "kafkaListenerContainerFactory")public void listen(String message, Acknowledgment ack) {try {System.out.println("Received Message: " + message);// 业务处理逻辑ack.acknowledge();} catch (Exception e) {// 错误处理}}
}

@KafkaListener 注解提供了灵活的方式来消费 Kafka 消息,开发者可以根据具体需求进行配置和扩展。

ConcurrentKafkaListenerContainerFactory详解

在Spring Kafka中,ConcurrentKafkaListenerContainerFactory是一个核心配置类,用于创建并发消息监听容器,支持多线程消费Kafka消息,以下是其详细介绍:

1、核心作用

  1. 并发消费支持:通过创建多个KafkaMessageListenerContainer实例(每个对应一个线程),实现多线程并发消费消息。例如设置concurrency=3会创建3个消费者线程,每个线程处理分配到的分区。
  2. 线程安全保障:生成的ConcurrentMessageListenerContainer内部委托给多个单线程的KafkaMessageListenerContainer实例,保证线程安全性(Kafka Consumer本身非线程安全)。

2、关键特性

  1. 并发度配置

    • 通过setConcurrency()方法设置并发消费者数量,可提高消息处理速度和吞吐量。
    • 配置规则为concurrency<=分区数/应用实例数,设置过多会导致线程闲置。
  2. 批量处理支持

    • 通过setBatchListener(true)启用批量消费
    • 配合MAX_POLL_RECORDS_CONFIG参数控制单次poll最大返回记录数
  3. 错误处理机制

    • 可配置自定义错误处理器(如SeekToCurrentErrorHandler
    • 支持重试策略集成
  4. 分区分配控制

    • 可自定义分区分配逻辑
    • 配合group.id实现消费者组协调

3、配置示例

@Configuration
@EnableKafka
public class KafkaConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.consumer.group-id}")private String groupId;@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(3); // 设置并发消费者数量factory.setBatchListener(true); // 启用批量消费factory.getContainerProperties().setPollTimeout(3000); // 设置轮询超时factory.setErrorHandler(new SeekToCurrentErrorHandler()); // 设置错误处理器return factory;}@Beanpublic ConsumerFactory<String, String> consumerFactory() {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50); // 批量消费配置return new DefaultKafkaConsumerFactory<>(props);}
}

4、使用场景

  1. 高吞吐量需求:通过增加并发消费者数量提升处理能力
  2. 批量数据处理:需要批量处理消息的场景
  3. 复杂错误处理:需要自定义错误处理逻辑的场景
  4. 多主题监听:需要同时监听多个主题的场景

5、注意事项

  1. 顺序性问题:并发消费可能导致消息顺序混乱,需业务保证
  2. 重复处理问题:需实现幂等性处理机制
  3. 数据库访问:需注意并发访问控制
  4. 资源限制:并发度设置需考虑系统资源限制

在这里插入图片描述

http://www.dtcms.com/wzjs/446691.html

相关文章:

  • 银川网站建站公司中山360推广
  • 做门户网站的公司有哪些seo综合查询爱站
  • 网站如何更新内容从哪里找网络推广公司
  • 建设网站费用主要包括哪些广州网络推广
  • 北京做养生SPA的网站建设需要多少钱
  • 区域门户网站源码自己在家怎么做跨境电商
  • 三级分销网站制作中山谷歌推广
  • 做物流网站电话培训机构优化
  • 免费建立小程序网站搜索引擎有哪些好用
  • 域名地址大全seo牛人
  • 那里可以找建网站的人软考十大最靠谱it培训机构
  • 上海做网站找谁温州seo网站推广
  • 优质的网站建设微博推广方式
  • 设计师推荐网站欣赏谷歌浏览器 免费下载
  • 山东天狐做网站cms网站交易网
  • 网页制作模板在哪买seo代码优化步骤
  • 做网站建设的价格如何用google搜索产品关键词
  • 搭建微信小程序需要什么seo入门到精通
  • 南阳建设网站招聘营销型网站建设哪家好
  • .tv可以做门户网站不江东怎样优化seo
  • 建设部网站园林绿化资质标准有什么平台可以发布推广信息
  • wordpress安装主题 ftp合作seo公司
  • 网站建设与管理用什么软件有哪些成都排名推广
  • 网站两边的悬浮框怎么做app推广拉新一手渠道
  • 网站建设 品牌塑造计划营销策划案例
  • 校园微网站建设b站推广入口2023mmm
  • 东莞网站(建设信科网络)seo推广培训班
  • 建设网站需要多少钱济南兴田德润o地址seo页面优化公司
  • 重庆蒲公英网站建设公司淘宝美工培训
  • 研究生网站建设交换友情链接的条件