当前位置: 首页 > news >正文

消费者API

目录

  • 独立消费者案例(订阅主题)
  • 独立消费者案例(订阅分区)
  • 消费者组案例

独立消费者案例(订阅主题)

在这里插入图片描述

package com.tsg.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;public class CustomConsumer {public static void main(String[] args) {Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"master:9092,slave1:9092");properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);ArrayList<String> topics = new ArrayList<String>();topics.add("first");kafkaConsumer.subscribe(topics);while (true){ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}}}
}

独立消费者案例(订阅分区)

在这里插入图片描述

package com.tsg.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;public class CustomConsumerPartition {public static void main(String[] args) {Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"master:9092,slave1:9092");properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);ArrayList<TopicPartition> topicPartitions = new ArrayList<>();topicPartitions.add(new TopicPartition("first",0));kafkaConsumer.assign(topicPartitions);while (true){ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}}}
}

消费者组案例

测试同一个主题的分区数据,只能由于一个消费者组中的一个一个消费

消费者1

package com.tsg.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;public class CustomConsumer {public static void main(String[] args) {Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"master:9092,slave1:9092");properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);ArrayList<String> topics = new ArrayList<String>();topics.add("four");kafkaConsumer.subscribe(topics);while (true){ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}}}
}

消费者2

package com.tsg.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;public class CustomConsumer1 {public static void main(String[] args) {Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"master:9092,slave1:9092");properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);ArrayList<String> topics = new ArrayList<String>();topics.add("four");kafkaConsumer.subscribe(topics);while (true){ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}}}
}

消费者3

package com.tsg.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;public class CustomConsumer2 {public static void main(String[] args) {Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"master:9092,slave1:9092");properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);ArrayList<String> topics = new ArrayList<String>();topics.add("four");kafkaConsumer.subscribe(topics);while (true){ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}}}
}

三个消费者的组ID相同,会形成消费者组,每个消费者消费一个分区数据

生产者发送数据

package com.tsg.kafka.producer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;public class CustomProducerCallback {public static void main(String[] args) {Properties properties = new Properties();// 连接集群properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "master:9092,slave1:9092");// 指定对应的key和value的序列化类型properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 设置acksproperties.put(ProducerConfig.ACKS_CONFIG,"all");// 重试次数retries,默认是int最大值,2^31 - 1properties.put(ProducerConfig.RETRIES_CONFIG,3);properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.tsg.kafka.producer.MyPartitioner");// 创建kafka生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);// 发送数据for (int i = 0; i < 5; i++) {kafkaProducer.send(new ProducerRecord<String, String>("four",2,"","分区2"), new Callback() {public void onCompletion(RecordMetadata metadata, Exception exception) {if(exception == null){System.out.println("主题:" +metadata.topic() + " 分区: " +metadata.partition());}}});}// 关闭资源kafkaProducer.close();}
}
package com.tsg.kafka.producer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;public class CustomProducerCallback {public static void main(String[] args) {Properties properties = new Properties();// 连接集群properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "master:9092,slave1:9092");// 指定对应的key和value的序列化类型properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 设置acksproperties.put(ProducerConfig.ACKS_CONFIG,"all");// 重试次数retries,默认是int最大值,2^31 - 1properties.put(ProducerConfig.RETRIES_CONFIG,3);properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.tsg.kafka.producer.MyPartitioner");// 创建kafka生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);// 发送数据for (int i = 0; i < 5; i++) {kafkaProducer.send(new ProducerRecord<String, String>("four",1,"","分区2"), new Callback() {public void onCompletion(RecordMetadata metadata, Exception exception) {if(exception == null){System.out.println("主题:" +metadata.topic() + " 分区: " +metadata.partition());}}});}// 关闭资源kafkaProducer.close();}
}

```c
package com.tsg.kafka.producer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;public class CustomProducerCallback {public static void main(String[] args) {Properties properties = new Properties();// 连接集群properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "master:9092,slave1:9092");// 指定对应的key和value的序列化类型properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 设置acksproperties.put(ProducerConfig.ACKS_CONFIG,"all");// 重试次数retries,默认是int最大值,2^31 - 1properties.put(ProducerConfig.RETRIES_CONFIG,3);properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.tsg.kafka.producer.MyPartitioner");// 创建kafka生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);// 发送数据for (int i = 0; i < 5; i++) {kafkaProducer.send(new ProducerRecord<String, String>("four",0,"","分区2"), new Callback() {public void onCompletion(RecordMetadata metadata, Exception exception) {if(exception == null){System.out.println("主题:" +metadata.topic() + " 分区: " +metadata.partition());}}});}// 关闭资源kafkaProducer.close();}
}
http://www.dtcms.com/a/338939.html

相关文章:

  • 云电脑 vs 传统PC:全面对比3A游戏与AI训练的成本与性能
  • Leetcode 3654. Minimum Sum After Divisible Sum Deletions
  • 【多模态大模型的三化】
  • [PV]AXI R/W/RW带宽计算的tcl脚本
  • AI赋能商业数据分析:从海量数据挖掘到智能决策洞察,激活企业增长新动能
  • Redisson 分布式锁核心机制解析
  • Flink原理与实践:第一章大数据技术概述总结
  • 微软行业案例:英格兰足球超级联赛(Premier League)
  • 丝杆支撑座在自动化生产线中的关键支撑
  • arcgis-提取范围中最大占比面积的信息或唯一值
  • Jenkins服务器SSH公钥配置步骤
  • nodejs koa框架使用
  • 计算机大数据毕业设计推荐:基于Spark的气候疾病传播可视化分析系统【Hadoop、python、spark】
  • 《算法导论》第 30 章:多项式与快速傅里叶变换(FFT)
  • vue3中封装hooks
  • uniapp 应用未安装:软件包与现有软件包存在冲突
  • GEO 优化专家孟庆涛:技术破壁者重构 AI 时代搜索逻辑
  • Flask 路由与视图函数绑定机制
  • 正式签约 | OpenLoong 项目正式捐赠至开放原子开源基金会,成为全国首个具身智能方向孵化项目!
  • 【图像算法 - 18】慧眼辨良莠:基于深度学习与OpenCV的麦田杂草智能识别检测系统(附完整代码)
  • 哈希:两数之和
  • Mac(七)右键新建文件的救世主 iRightMouse
  • python将epub文件转pdf
  • UniApp 实现pdf上传和预览
  • 大模型级部署:从本地轻量化到云原生方案
  • 基于单片机智能密码锁/密码箱/门锁/门禁系统
  • Python爬虫实战:研究ICP-Checker,构建ICP 备案信息自动查询系统
  • PiscCode迅速集成YOLO-Pose 实现姿态关键点轨迹跟踪应用
  • 从繁琐到优雅:Java Lambda 表达式全解析与实战指南
  • 第1章 React组件开发基础