当前位置: 首页 > news >正文

Kafka使用-Consumer

消费者

主要步骤

Kafka消费者的使用流程主要包括配置消费者、订阅主题、拉取消息、处理消息和提交偏移量等关键步骤。

  1. 消费者配置与初始化‌
    必须配置bootstrap.servers(Kafka集群地址)、group.id(消费者组ID)和序列化器推荐使用手动提交偏移量以保证消息处理的可靠性‌

    consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop101:9092");
    consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
    consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
    consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    
  2. 主题订阅‌
    支持订阅特定主题列表或使用正则表达式匹配多个主题。新主题匹配正则表达式时会触发再均衡。

    consumer.subscribe(Collections.singletonList("first-topic"));
    
  3. 消息拉取与处理‌
    通过poll()方法批量拉取消息。在获取消息前可以使用自定义拦截器对消息进行处理。在消息处理完成后手动提交偏移量

    ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));
    
  4. 消费者组管理‌
    同一消费者组内的消费者共享主题分区。消费者数量变化或分区数变化时会触发再均衡。

    consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "qichsiii");
    

自定义拦截器

public class MyConsumerInceptor implements ConsumerInterceptor<Integer, String> {@Overridepublic ConsumerRecords<Integer, String> onConsume(ConsumerRecords<Integer, String> consumerRecords) {Iterator<ConsumerRecord<Integer, String>> iterator = consumerRecords.iterator();ConsumerRecords<Integer,String> cr = new ConsumerRecords<>(Collections.EMPTY_MAP);while(iterator.hasNext()){ConsumerRecord<Integer, String> record = iterator.next();if(record.key()%2==0){System.out.println("拦截器偶数key:"+record.key()+",value"+record.value());}}return consumerRecords;}@Overridepublic void onCommit(Map<TopicPartition, OffsetAndMetadata> map) {}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> map) {}
}
public class KafkaConsumerTest {public static void main(String[] args) throws InterruptedException {// 创建配置对象Map<String, Object> consumerConfig = new HashMap<>();consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop101:9092");consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
//        consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);consumerConfig.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MyConsumerInceptor.class.getName());consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");// 创建消费者对象KafkaConsumer<String,String> consumer = new KafkaConsumer<>(consumerConfig);// 订阅主题consumer.subscribe(Collections.singletonList("first-topic"));while(true) {ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> stringStringConsumerRecord : poll) {System.out.println("接收数据"+stringStringConsumerRecord);}Thread.sleep(1000L);}}
}

提交偏移量

kafka默认自动提交偏移量

  • 消息重复消费风险
    当enable.auto.commit=true时,默认每5秒(由auto.commit.interval.ms控制)提交一次偏移量‌。若在两次提交间隔内发生再均衡,新消费者会从上次提交的偏移量开始消费,导致已处理但未提交的消息被重复处理‌。例如:提交间隔5秒,第3秒发生再均衡,则3秒内的消息可能被重复消费‌
  • 提交时机不可控
    自动提交基于时间间隔而非消息处理完成状态,可能提交未处理完的偏移量。若消费者在提交后崩溃,未处理的消息会被跳过(丢失)‌。

解决方法

  • 手动提交‌:关闭自动提交(enable.auto.commit=false),在消息处理完成后调用commitSync()(同步)或commitAsync()(异步)‌。
  • 幂等处理‌:业务逻辑中实现去重机制(如数据库唯一键校验)‌。

例子:

public class KafkaConsumerAutoOffsetTest {public static void main(String[] args) {// 创建配置对象Map<String, Object> consumerConfig = new HashMap<>();consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop101:9092");consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
//        consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
//        consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");// 创建消费者对象KafkaConsumer<String,String> consumer = new KafkaConsumer<>(consumerConfig);// 订阅主题consumer.subscribe(Collections.singletonList("first-topic"));while(true) {ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> stringStringConsumerRecord : poll) {System.out.println("接收数据"+stringStringConsumerRecord);}// TODO 手动提交偏移量// TODO 拉取之后,还没处理数据就提交了。数据处理发生问题。出现露消费问题。
//            consumer.commitAsync();//异步consumer.commitSync();// 同步}}
}

分区分配策略

通过 partition.assignment.strategy 配置分区分配策略:
RangeAssignor(默认):按分区范围分配。
RoundRobinAssignor:轮询分配分区。
StickyAssignor:尽量减少分区重新分配时的变动。

consumerConfig.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StickyAssignor.class.getName());
http://www.dtcms.com/a/545593.html

相关文章:

  • 诸暨网站建设怎么建立微网站?
  • 【Docker】【1.docker常用命令总结】
  • 深圳的网站建设公司的外文名是wordpress异步加载
  • 创客匠人2025万人高峰论坛:如何融合创始人IP与AI?
  • Linux中完成根文件系统的最终准备和切换prepare_namespace函数的实现
  • A800 部署 Qwen2-VL-8B-Instruct 完整指南
  • rust:第一个程序HelloWorld
  • 给新公司建网站用代理访问永久域名
  • 启动Hana失败 FAIL: process hdbdaemon HDB Daemon not running
  • iOS 26 内存占用监控 多工具协同下的性能稳定性分析实战
  • Kubernetes service管理
  • 布吉企业网站建设百度网站两两学一做心得体会
  • 深入仓颉(Cangjie)编程语言:循环的革命——从“命令式”操控到“声明式”安全迭代
  • 画出网站和目录结构图wordpress 自定义表
  • linux gpio errno EBUSY问题举例分析
  • 如何在 macOS 中清理 Homebrew 软件包 ?
  • 手机网站设计立找亿企邦湖南长沙房价2023年最新房价
  • 如何选择徐州网站开发wordpress新建页面位置
  • nestjs引篇
  • apmserv 设置网站目录yy头像在线制作网站
  • 基于YOLO+多模态大模型+人脸识别+视频检索的智慧公安综合研判平台(vue+flask+AI算法)
  • 二手车网站程序德阳网站建设 选哪家好
  • 极智算服务器用的还是自己的网络吗安全吗
  • Jenkins vs GitLab CI/CD vs Arbess,CI/CD工具一文纵评
  • 机器学习—— 回归分析之如何建立回归模型
  • MySQL中SUBSTRING_INDEX函数作用
  • 网站设计评级杭州网站搜索排名
  • 2.1 AI与大模型介绍
  • 厦门网站建设阿里流量型网站 cms
  • 【笔试真题】- 科大讯飞研发岗-2025.09.27