当前位置: 首页 > news >正文

kafka学习笔记(三、消费者Consumer使用教程——消费性能多线程提升思考)

在这里插入图片描述


1.简介

KafkaConsumer是非线程安全的,它定义了一个acquire()方法来检测当前是否只有一个线程在操作,如不是则会抛出ConcurrentModifcationException异常。

acquire()可以看做是一个轻量级锁,它仅通过线程操作计数标记的方式来检测线程是否发生了并发操作,以此保证只有一个线程在操作。acquire()方法和release()方法成对出现,表示相应的加锁操作和解锁操作。

KafkaConsumer虽然是单线程的执行方式,但是在某些情况下如:生产者发送消息的速度远大于消费者消费的速度,这样长时间可能会造成消息的丢失,此时我们就需要消费者采用多线程消费的方式增加消费速度。

2.多线程实现的方式

2.1.线程封闭多线程

即为每个线程实例化一个KafkaConsumer,如图所示,一个线程对应一个KafkaConsumer实例,所有的消费线程都属于同一个消费者组。

这种方式的并发度受限分区的实际个数

在这里插入图片描述
实现代码示例:

public class kafkaConsumer1 {public void ConsuermMultithread1() {Properties props = initConsifg(); // 此处初始化消费者配置参数省略int consumerThreadNum = 5;for (int i = 0; i < consumerThreadNum; i++) {new KafkaConsumerThread(props, topic).start();}}// 消费线程public static class KafkaConsumerThread extends Thread {private KafkaConsumer<String, String> kafkaConsumer;public KafkaConsumerThread(Properties prop, String topic) {this.kafkaConsumer = new KafkaConsumer<>(prop);this.kafkaConsumer.subscribe(Arrays.asList(topic));}@Overridepublic void run() {try {while (true) {ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record: records) {// 处理消息}}} catch (Exception e) {e.printStackTrace();} finally {kafkaConsumer.close();}}}
}

2.1.消息处理模块多线程

此方法是对上面方法的进一步优化,在消息处理模块增加多线程来处理消息,进一步提升消息消费的速度。
在这里插入图片描述

public class kafkaConsumer1 {public void ConsuermMultithread1() {Properties props = initConsifg(); // 此处初始化消费者配置参数省略int consumerThreadNum = 5;for (int i = 0; i < consumerThreadNum; i++) {new KafkaConsumerThread(props, topic).start();}}public static class KafkaConsumerThread extends Thread {private KafkaConsumer<String, String> kafkaConsumer;private ExecutorService executorService;private int threadNumber;public KafkaConsumerThread(Properties prop, String topic, int threadNumber) {this.kafkaConsumer = new KafkaConsumer<>(prop);this.kafkaConsumer.subscribe(Collections.singletonList(topic));this.threadNumber = threadNumber;executorService = new ThreadPoolExecutor(threadNumber, threadNumber,0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());}@Overridepublic void run() {try {while (true) {ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));if (!records.isEmpty()) {executorService.submit(new RecordsHandler(records));}}} catch (Exception e) {e.printStackTrace();} finally {kafkaConsumer.close();}}}public static class RecordsHandler extends Thread {public final ConsumerRecords<String, String> records;public RecordsHandler(ConsumerRecords<String, String> records) {this.records = records;}@Overridepublic void run() {/// 处理records  }}
}

此方法需要引入一个共享的offsets来参与提交。

相关文章:

  • 使用 HTML + JavaScript 实现一个日历任务管理系统
  • 乐观锁:高效并发无锁方案
  • SpringBoot如何实现一个自定义Starter?
  • 华为云Flexus+DeepSeek征文|华为云 Flexus X 加速 Dify 平台落地:高性能、低成本、强可靠性的云上选择
  • 第304个Vulnhub靶场演练攻略:digital world.local:FALL
  • springboot集成websocket给前端推送消息
  • 生活小记啊
  • AWTK 嵌入式Linux平台实现多点触控缩放旋转以及触点丢点问题解决
  • 计算机视觉---GT(ground truth)
  • 每日八股文5.31
  • 【2025年软考中级】第二章2.2 程序设计语言的基本成分
  • VIP》》IP地址漂移
  • 5G 网络中的双向认证机制解析
  • MIT 6.S081 2020 Lab6 Copy-on-Write Fork for xv6 个人全流程
  • 神奇的平方和运算
  • MySQL存储架构深度解析:从引擎选型到云原生实践(2025最新版)
  • 005 flutter基础,初始文件讲解(4)
  • threejs渲染器和前端UI界面
  • JVM类加载高阶实战:从双亲委派到弹性架构的设计进化
  • 【机器学习基础】机器学习入门核心算法:XGBoost 和 LightGBM
  • 做问卷的网站有那些/拼多多代运营公司十大排名
  • 网站建设主要做什么/seo网站培训优化怎么做
  • 做电子简历的网站/网络推广方式方法
  • 大流量网站解决访问量/免费网络项目资源网
  • 网站建设北京/谷歌搜索引擎网页版入口
  • 滑动门代码 wordpress/丈哥seo博客工具