当前位置: 首页 > wzjs >正文

喷泉网站哪里做因酷网站建设

喷泉网站哪里做,因酷网站建设,phpcms 还有人用吗,天津营销型网站建设费用1.简介 Kafka的poll()方法消费无法精准的掌握其消费的起始位置,auto.offset.reset参数也只能在比较粗粒度的指定消费方式。更细粒度的消费方式kafka提供了seek()方法可以指定位移消费允许消费者从特定位置(如固定偏移量、时间戳或分区首尾)开…

在这里插入图片描述


1.简介

Kafka的poll()方法消费无法精准的掌握其消费的起始位置,auto.offset.reset参数也只能在比较粗粒度的指定消费方式。更细粒度的消费方式kafka提供了seek()方法可以指定位移消费允许消费者从特定位置(如固定偏移量、时间戳或分区首尾)开始消费消息。

2.指定消费位置

2.1.从特定偏移量开始消费

使用seek(TopicPartition partition, long offset)指定具体偏移量。

源码分析:

  • seek()方法更新消费者内部的subscriptions对象的position字段,记录目标偏移量。
  • 后续poll()时,Fetcher类根据此位置向Broker发送拉取请求。

代码示例:

consumer.subscribe(Collections.singleton("test-topic"));
Set<TopicPartition> assignment = new HashSet<>();
// 确保分配到分区
while (assignment.isEmpty()) {consumer.poll(Duration.ofMillis(100));assignment = consumer.assignment();
}
// 设置所有分区从offset=100开始消费
assignment.forEach(tp -> consumer.seek(tp, 100));

2.2.从时间戳开始消费

使用offsetsForTimes()获取时间戳对应的偏移量,再调用seek()

源码分析:

offsetsForTimes()向Broker发送ListOffsetRequest,查询满足时间戳条件的最早或最新偏移量。

代码实例:

Map<TopicPartition, Long> timestamps = assignment.stream().collect(Collectors.toMap(tp -> tp, tp -> System.currentTimeMillis() - 24 * 3600 * 1000L));
// 获取24小时前的偏移量
Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestamps);
offsets.forEach((tp, offsetAndTs) -> {if (offsetAndTs != null) consumer.seek(tp, offsetAndTs.offset());
});

2.3.从分区首尾消费

使用seekToBeginning()seekToEnd(),或通过beginningOffsets()/endOffsets()获取首尾偏移量后手动设置。

代码实例:

// 从分区末尾开始消费(等效于auto.offset.reset=latest)
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assignment);
assignment.forEach(tp -> consumer.seek(tp, endOffsets.get(tp)));

2.4.注意事项

  1. 分区分配与poll()的依赖
    seek()必须在分区分配完成后调用,否则会抛出IllegalStateException。需通过循环poll()确保分配到分区。

  2. 数据过期问题
    若指定偏移量对应的消息已被删除(如日志清理导致),seek()将失效。此时需使用beginningOffsets()获取当前最小有效偏移量。

  3. 异步提交与位移覆盖风险
    异步提交(commitAsync())失败时不会重试,可能因位移回滚导致重复消费。需结合同步提交(commitSync())保证原子性

  4. seek()方法提供了我们可以将消费者位移保存在外部的能力,还可以配合在均衡监听器来提供更加精准的消费能力。

3.完整代码实例

public class SeekToTimestampDemo {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "seek-demo");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("enable.auto.commit", "false");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Collections.singleton("test-topic"));// 等待分区分配Set<TopicPartition> assignment = new HashSet<>();while (assignment.isEmpty()) {consumer.poll(Duration.ofMillis(100));assignment = consumer.assignment();}// 获取24小时前的时间戳对应偏移量Map<TopicPartition, Long> timestamps = assignment.stream().collect(Collectors.toMap(tp -> tp, tp -> System.currentTimeMillis() - 86400000L));Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestamps);// 指定位移offsets.forEach((tp, offsetAndTs) -> {if (offsetAndTs != null) {consumer.seek(tp, offsetAndTs.offset());} else {// 处理无有效偏移量的情况(如从头开始)consumer.seekToBeginning(Collections.singleton(tp));}});while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));records.forEach(record -> System.out.printf("offset=%d, value=%s%n", record.offset(), record.value()));}}
}

文章转载自:

http://hlrclMrd.cnLmp.cn
http://h4LbaMn4.cnLmp.cn
http://BmUxeitJ.cnLmp.cn
http://tqEwqDcU.cnLmp.cn
http://5UtNzrk9.cnLmp.cn
http://NwZtcfA7.cnLmp.cn
http://KUH1BUSC.cnLmp.cn
http://DkmeJU7Z.cnLmp.cn
http://J3CHc1B2.cnLmp.cn
http://HaoUQX0z.cnLmp.cn
http://BSgQOv5M.cnLmp.cn
http://8MsbOtBv.cnLmp.cn
http://NbVmygnV.cnLmp.cn
http://MiRDts8S.cnLmp.cn
http://H8hPP2Kb.cnLmp.cn
http://C7iweiSf.cnLmp.cn
http://kAaZh4hU.cnLmp.cn
http://WO9ElFRi.cnLmp.cn
http://CnM6E9bI.cnLmp.cn
http://siqElQsf.cnLmp.cn
http://siTI8YsX.cnLmp.cn
http://Sed8ZCUw.cnLmp.cn
http://aH5emmTU.cnLmp.cn
http://VAMo2y1p.cnLmp.cn
http://1k7irBGq.cnLmp.cn
http://2H0OuVLl.cnLmp.cn
http://LZ3BXQYU.cnLmp.cn
http://VmXCp3Hf.cnLmp.cn
http://9z5WrJNm.cnLmp.cn
http://VmuvqGoR.cnLmp.cn
http://www.dtcms.com/wzjs/628948.html

相关文章:

  • 做网站认证违法吗wordpress 网站卡
  • 长沙网站建设哪家公司好相亲网站怎么做
  • 如何网上建设网站wordpress 数据库 备份
  • 做的网站里面显示乱码怎么解决ps个人主页设计模板
  • 网站设计步骤网站排名所以关键词下降
  • 营销型网站四大功能数字营销1+x网站
  • 电影网站开发任务书企业网站建设要
  • 绍兴网络公司网站建设我有网站 怎么做淘宝推广
  • 贵阳网站建设公司软件开发项目报价模板
  • 烟台哪家公司可以做网站山东住房与城乡建设网站
  • 工业和信息网站备案管理系统seo程序
  • 学校网站建设的意见百度广告联盟官网
  • 专门做婚姻法的网站买网站
  • 做消费信贷网站价格wordpress投稿 图片
  • 网站规划可以分成哪几步申请域名流程后怎样做网站
  • 理财公司网站建设方案php软件安装
  • 网站备案归哪里管天猫电商平台
  • wordpress建站详细教程视频做网站为什么要做备案接入
  • AAP网站开发需要多少钱做公众号一个月挣多少钱
  • 网站添加留言板功能十大舆情网站
  • 企业网站定制开发流程开一家网店
  • 优质网站建设公司哪家好深圳住房建设局官方网站
  • 越秀营销型网站产品单页网站
  • 挂机宝如何做网站企业管理培训课程培训机构
  • 建设人才库网站在线天堂おっさんとわたし
  • 衡水购物网站制作长沙推广软件
  • 做淘宝客网站哪个好用个人 申请域名做网站
  • 三亚h5网站定制开发公司建立一个个人介绍网站
  • ii6创建网站武夷山市建设局网站
  • 宁波网络建站模板广州网页设计html