当前位置: 首页 > wzjs >正文

辽中网站建设高端网站公司

辽中网站建设,高端网站公司,成都电商网站开发公司,代做网站 猪八戒网位移提交: Kafka的每条消息都有唯一的 offset, 用来表示消息在分区中对应的位置。有的也称之为 “偏移量”。 消费者每次在 poll() 拉取消息,它要返回的是还没有消费过的消息集, 因此,需要记录上一次消费时的消费位…

位移提交:

Kafka的每条消息都有唯一的 offset, 用来表示消息在分区中对应的位置。有的也称之为 “偏移量”。

消费者每次在 poll() 拉取消息,它要返回的是还没有消费过的消息集,

因此,需要记录上一次消费时的消费位移,并且持久化。

消费者在消费完消息之后,需要执行消费位移的提交。

自动位移提交:

Kafka默认的消费位移的提交方式是 自动提交。

自动提交,由消费者客户端参数 enable.auto.commit 配置,默认值是 true。

默认的自动提交,是定期提交,提交的周期由 auto.commit.interval.ms 配置,默认是 5s。

自动位移提交,有可能会重复消费和消息丢失。

假设刚刚提交完一次消费位移,然后拉取一批消息进行消费,在下一次自动提交消费位移之前,消费者崩溃了,那又得从上一次位移提交的地方重新开始消费,这样就会重复消费。

手动位移提交:

手动位移提交,由消费者客户端参数 enable.auto.commit 配置, 设置为 false 就是手动位移提交。

手动位移提交,可以分为 同步提交、异步提交。

commitSync() 同步提交

同步提交,会阻塞消费者线程直到位移提交完成。

示例代码:

public class OffsetCommitSync {public static final String BROKER_LIST = "localhost:9092";public static final String TOPIC = "myTopic1";public static final String GROUP_ID = "group.demo";public static void main(String[] args) {Properties props = initConfig();KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);//消费者订阅主题consumer.subscribe(Collections.singletonList(TOPIC));while (true) {ConsumerRecords<String, String> records = consumer.poll(1000);for (ConsumerRecord<String, String> record : records) {//do something}//手动提交位移consumer.commitSync();System.out.println("手动提交位移成功.");}}public static Properties initConfig() {Properties props = new Properties();props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");//不自动提交,采用手动提交props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);return props;}}

commitAsync() 异步提交 :

异步提交,在执行的时候消费者线程不会被阻塞,可能在提交消费位移的结果还未返回之前就开始了新一次的拉取操作。异步提交,可以使消费者的性能得到一定的增强。

异步提交,将 consumer.commitSync(); 换成 commitAsync。

如果还需要回调,就用 OffsetCommitCallback对象作为参数。

示例如下:

public class OffsetCommitAsyncCallback {public static final String BROKER_LIST = "localhost:9092";public static final String TOPIC = "myTopic1";public static final String GROUP_ID = "group.demo";public static void main(String[] args) {Properties props = initConfig();KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Collections.singletonList(TOPIC));while (true) {ConsumerRecords<String, String> records = consumer.poll(1000);for (ConsumerRecord<String, String> record : records) {//do something}//异步回调,如果不需要回调,就采用无参的方法consumer.commitAsync(new OffsetCommitCallback() {@Overridepublic void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,Exception exception) {if (exception == null) {System.out.println(offsets);} else {log.error("fail to commit offsets {}", offsets, exception);}}});}}public static Properties initConfig() {Properties props = new Properties();props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);return props;}}

参考资料:

《深入理解kafka:核心设计与实践原理》

http://www.dtcms.com/wzjs/163806.html

相关文章:

  • 焦作 网站建设电话营销外包公司
  • .php的网站是怎么做的网站建设是干嘛的
  • 济南网站建设平台官网今天的新闻发布会
  • 做本地网站需要什么资质上海发布最新情况
  • 湖南网站建设企业2022年最火的关键词
  • 网站项目团队介绍潍坊seo推广
  • 软件开发工程师有前途吗seo优化网络公司
  • 东莞市建设工程质量监督网站我赢seo
  • 郑州网站建设哪家搜索引擎竞价广告
  • 流程做网站苏州seo整站优化
  • 做网站要学什么c语言青岛seo整站优化哪家专业
  • 海外网站开发谷歌浏览器安卓版下载
  • 武汉 外贸网站建设办公软件速成培训班
  • 深圳模板网站建设哪家好购买链接平台
  • 武威市住房和城乡建设局网站优化的定义
  • 网站写文案网络营销竞价推广
  • 培训机构网站建设方案怎么制作自己的网站
  • 搭建网站的免费程序曼联vs曼联直播
  • 安徽鲲鹏建设集团有限公司网站seo建设者
  • 网络营销公长沙官网seo技术
  • 微信里面的小程序百度seo点击软件
  • 南京网站建设学习海外推广渠道
  • 怎么建设网站数据库个人网站的制作
  • 做网站要建立站点吗文山seo
  • 西宁高端网站建设站内优化怎么做
  • 宝山北京网站建设推广平台
  • 营销型网站建设实战感想优化电脑的软件有哪些
  • 广州商砼建站规范公众号seo排名软件
  • 网站建设费的摊销年限b站推广网站2024mmm
  • 光谷 网站建设公司搜狗输入法下载安装