当前位置: 首页 > wzjs >正文

郑州cms建站模板电子商务网站建设与维护实训报告

郑州cms建站模板,电子商务网站建设与维护实训报告,找网络公司做网站需要注意的,博物馆网站建设必要性Kafka consumer_offsets 主题深度剖析 在 Apache Kafka 的消息消费机制中,确保消息被可靠消费是一个核心问题。为了解决这个问题,Kafka 设计了一个特殊的内部主题 consumer_offsets,用于跟踪和管理消费者组的消费进度。 consumer_offsets 的…

Kafka consumer_offsets 主题深度剖析

在 Apache Kafka 的消息消费机制中,确保消息被可靠消费是一个核心问题。为了解决这个问题,Kafka 设计了一个特殊的内部主题 consumer_offsets,用于跟踪和管理消费者组的消费进度。

consumer_offsets 的基本概念

consumer_offsets 是 Kafka 的一个内部主题,它具有以下特征:

  1. 默认包含 50 个分区(可通过 offsets.topic.num.partitions 配置)
  2. 使用 3 个副本因子(可通过 offsets.topic.replication.factor 配置)
  3. 采用日志压缩(log compaction)的清理策略
  4. 消息格式为二进制的键值对

这个主题存储了所有消费者组的位移信息。每个消费者组消费某个主题分区时,都会定期将自己的消费位置(offset)提交到这个主题中。当消费者重启或发生再平衡时,可以从这个主题中恢复之前的消费位置,确保消息不会丢失或重复消费。

通过代码来演示如何实现消费者位移的提交和管理:

public class ConsumerOffsetDemo {private final KafkaConsumer<String, String> consumer;private final String topic;private final String groupId;public ConsumerOffsetDemo(String bootstrapServers, String topic, String groupId) {Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 关闭自动提交,手动控制位移提交props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");this.consumer = new KafkaConsumer<>(props);this.topic = topic;this.groupId = groupId;}public void consumeAndCommit() {try {consumer.subscribe(Collections.singletonList(topic));while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {// 处理消息processRecord(record);// 手动提交单条消息的位移Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(new TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset() + 1));consumer.commitSync(offsets);}}} finally {consumer.close();}}
}

位移提交机制

位移提交是 consumer_offsets 主题的核心功能。当消费者消费消息时,需要定期将自己的消费进度提交到这个主题。提交的消息包含以下信息:

  1. key:包含 <消费者组ID, 主题名称, 分区号> 的三元组
  2. value:包含 offset(位移)、timestamp(时间戳)等信息

提交方式分为自动提交和手动提交:

  1. 自动提交:由消费者自动定期提交,通过 auto.commit.interval.ms 配置提交间隔
  2. 手动提交:由应用程序控制提交时机,可以选择同步提交或异步提交

下面是一个完整的位移监控实现:

public class OffsetMonitor {private final AdminClient adminClient;private final KafkaConsumer<byte[], byte[]> consumer;public OffsetMonitor(String bootstrapServers) {Properties props = new Properties();props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);this.adminClient = AdminClient.create(props);props.put(ConsumerConfig.GROUP_ID_CONFIG, "offset-monitor");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());this.consumer = new KafkaConsumer<>(props);}public Map<String, ConsumerGroupOffset> getConsumerGroupOffsets(String groupId) {Map<String, ConsumerGroupOffset> result = new HashMap<>();try {// 获取消费者组的位移信息ListConsumerGroupOffsetsResult offsetsResult = adminClient.listConsumerGroupOffsets(groupId);Map<TopicPartition, OffsetAndMetadata> offsets = offsetsResult.partitionsToOffsetAndMetadata().get();// 获取主题的结束位移Map<TopicPartition, Long> endOffsets = consumer.endOffsets(offsets.keySet());// 计算消费延迟for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {TopicPartition tp = entry.getKey();long committedOffset = entry.getValue().offset();long endOffset = endOffsets.get(tp);long lag = endOffset - committedOffset;result.put(tp.topic(), new ConsumerGroupOffset(committedOffset, endOffset, lag));}} catch (Exception e) {e.printStackTrace();}return result;}
}

位移管理和运维

在实际运维中,我们需要对 consumer_offsets 主题进行管理和监控。主要包括以下几个方面:

  1. 位移重置:当需要重新消费某个主题的消息时,可以重置消费者组的位移
  2. 消费者组管理:包括删除不再使用的消费者组等操作
  3. 监控告警:监控消费延迟,及时发现消费异常

下面是一个位移管理工具的实现:

public class OffsetManager {private final AdminClient adminClient;public OffsetManager(String bootstrapServers) {Properties props = new Properties();props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);this.adminClient = AdminClient.create(props);}// 重置消费者组位移public void resetOffset(String groupId, String topic, int partition, long offset) {try {TopicPartition tp = new TopicPartition(topic, partition);Map<TopicPartition, OffsetAndMetadata> offsetMap = Collections.singletonMap(tp, new OffsetAndMetadata(offset));adminClient.alterConsumerGroupOffsets(groupId, offsetMap).all().get();System.out.printf("Successfully reset offset for group=%s, topic=%s, " +"partition=%d to %d%n",groupId, topic, partition, offset);} catch (Exception e) {e.printStackTrace();}}// 删除消费者组public void deleteConsumerGroup(String groupId) {try {adminClient.deleteConsumerGroups(Collections.singleton(groupId)).all().get();System.out.printf("Successfully deleted consumer group: %s%n", groupId);} catch (Exception e) {e.printStackTrace();}}// 监控消费延迟public void monitorConsumerLag(String groupId, String topic) {try {TopicPartition tp = new TopicPartition(topic, 0);Map<TopicPartition, OffsetAndMetadata> offsetMap = adminClient.listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get();long currentOffset = offsetMap.get(tp).offset();long endOffset = getEndOffset(tp);long lag = endOffset - currentOffset;if (lag > 10000) { // 设置告警阈值System.out.printf("Warning: High lag detected for group=%s, topic=%s: %d%n",groupId, topic, lag);}} catch (Exception e) {e.printStackTrace();}}private long getEndOffset(TopicPartition tp) {try (KafkaConsumer<?, ?> consumer = new KafkaConsumer<>(new Properties())) {Map<TopicPartition, Long> endOffsets = consumer.endOffsets(Collections.singleton(tp));return endOffsets.get(tp);}}
}

consumer_offsets 主题是 Kafka 消息消费机制的核心组件,它通过存储和管理消费位移信息,确保了消息消费的可靠性和可恢复性。


文章转载自:

http://T8kpXmMm.kqbjy.cn
http://IQWB6v4E.kqbjy.cn
http://uLklHJMP.kqbjy.cn
http://tUB6bski.kqbjy.cn
http://sB8Wh0R9.kqbjy.cn
http://EfeQqirC.kqbjy.cn
http://pewj6PQ2.kqbjy.cn
http://DEmwfuXK.kqbjy.cn
http://Jhkrm34y.kqbjy.cn
http://r3hK2KO4.kqbjy.cn
http://QUL3unCJ.kqbjy.cn
http://UexZIkp9.kqbjy.cn
http://C1YVLsgD.kqbjy.cn
http://ISpA8JNp.kqbjy.cn
http://dF1pTJvm.kqbjy.cn
http://RK0VOTd2.kqbjy.cn
http://2ulgesG9.kqbjy.cn
http://znx3MKCJ.kqbjy.cn
http://uw2080ci.kqbjy.cn
http://nmJeoFks.kqbjy.cn
http://jgdxlOF8.kqbjy.cn
http://p27zo91p.kqbjy.cn
http://VFkvveiY.kqbjy.cn
http://kg0yiqFk.kqbjy.cn
http://XR9QRGlK.kqbjy.cn
http://wR6aFD9o.kqbjy.cn
http://IFB8QHid.kqbjy.cn
http://ZppymmIT.kqbjy.cn
http://QTVkbapp.kqbjy.cn
http://G5hOcdWg.kqbjy.cn
http://www.dtcms.com/wzjs/654444.html

相关文章:

  • 做网站可以用ai做做试题网站
  • wordpress修改为中文自学seo能找到工作吗
  • Wordpress外贸网站搭建公司广州市城乡和住房建设局官网
  • 购物网站app摄影作品网站app十大排名
  • 怎么自己做直播网站吗本机可以做网站的服务器吗
  • 网站优化 价格查询网站模板制作视频教程
  • 个体户能否从事网站建设试描述一下网站建设的基本流程图
  • 网站配置服务Wordpresswordpress批量定时更新
  • 抖音seo排名优化公司seo外链建设的方法有
  • 在南海建设工程交易中心网站有哪些小程序免费模板平台
  • 做网站需要代码么网上营销的平台有哪些
  • 做cpa必须要有网站吗wordpress外网ip访问
  • 做软件的网站php免费信息网站建设平台
  • 学校做网站的软件wordpress入门建站教程
  • 在网站上显示地图加强主流网站建设
  • 免费网站推广平台排行榜企业年检网上申报入口
  • 做信息浏览的网站策划案北京公司注册虚拟地址
  • 在线做原型的网站郑州网站制作建设
  • 公司网站开发维护网站模板出售
  • 从零开始做电影网站急招网络销售招聘
  • 哪里有网站建设哪家好企业信用信息查询公示系统陕西
  • 要做网站房屋租赁网站开发模版
  • flash 做网站教程淘宝网网页设计作业
  • 大岭山网站建设北京的制作网站的公司
  • 南京网页网站制作国外网站设计案例
  • 网站怎么做订单邯郸注册网络科技公司
  • 网站开发的可行性分析cpa广告联盟网站建设教程
  • 网站建设费用什么意思龙门城乡规划建设局网站
  • 什么是网站和网页谷歌搜索引擎网址
  • 网站发语音功能如何做西安网站制作模板