当前位置: 首页 > news >正文

Kafka Queue: 完成 alterShareGroupOffsets Api

Share group支持重置偏移量

支持重置偏移量,换句话说也就是支持消息的重放;classic group支持按照偏移量或者时间重置到最新、最老以及指定时间的偏移量

PR list

  • https://github.com/apache/kafka/pull/18819
  • https://github.com/apache/kafka/pull/18929
  • https://github.com/apache/kafka/pull/19820

命令

kafka-share-groups.sh --bootstrap-server localhost:9092 --group S1 --topic T1 --reset-offsets --to-earliest --execute

SPSO

Share group引入了SPSO(Share-partition start offset) 的概念,可以理解为当前第一个被客户端拉走,但是未被提交的record batches,这个功能主要就是对SPSO进行操作

新增请求与响应

请求体

{"apiKey": 91,"type": "request","listeners": ["broker"],"name": "AlterShareGroupOffsetsRequest","validVersions": "0","flexibleVersions": "0+","fields": [{ "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId","about": "The group identifier." },{ "name": "Topics", "type": "[]AlterShareGroupOffsetsRequestTopic", "versions": "0+","about": "The topics to alter offsets for.",  "fields": [{ "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName", "mapKey": true,"about": "The topic name." },{ "name": "Partitions", "type": "[]AlterShareGroupOffsetsRequestPartition", "versions": "0+","about": "Each partition to alter offsets for.", "fields": [{ "name": "PartitionIndex", "type": "int32", "versions": "0+","about": "The partition index." },{ "name": "StartOffset", "type": "int64", "versions": "0+","about": "The share-partition start offset." }]}]}]
}

响应体

{"apiKey": 91,"type": "response","name": "AlterShareGroupOffsetsResponse","validVersions": "0","flexibleVersions": "0+",// Supported errors:// - GROUP_AUTHORIZATION_FAILED (version 0+)// - TOPIC_AUTHORIZATION_FAILED (version 0+)// - NOT_COORDINATOR (version 0+)// - COORDINATOR_NOT_AVAILABLE (version 0+)// - COORDINATOR_LOAD_IN_PROGRESS (version 0+)// - GROUP_ID_NOT_FOUND (version 0+)// - NON_EMPTY_GROUP (version 0+)// - KAFKA_STORAGE_ERROR (version 0+)// - INVALID_REQUEST (version 0+)// - UNKNOWN_SERVER_ERROR (version 0+)"fields": [{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+","about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },{ "name": "ErrorCode", "type": "int16", "versions": "0+","about": "The top-level error code, or 0 if there was no error." },{ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null","about": "The top-level error message, or null if there was no error." },{ "name": "Responses", "type": "[]AlterShareGroupOffsetsResponseTopic", "versions": "0+","about": "The results for each topic.", "fields": [{ "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName", "mapKey": true,"about": "The topic name." },{ "name": "TopicId", "type": "uuid", "versions": "0+","about": "The unique topic ID." },{ "name": "Partitions", "type": "[]AlterShareGroupOffsetsResponsePartition", "versions": "0+", "fields": [{ "name": "PartitionIndex", "type": "int32", "versions": "0+","about": "The partition index." },{ "name": "ErrorCode", "type": "int16", "versions": "0+","about": "The error code, or 0 if there was no error." },{ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null","about": "The error message, or null if there was no error." }]}]}]
}

主要过程

  1. 只有空的消费组支持重置
  2. 重置过程中,会触发一次Group Coordinator初始化ShareGroupState的过程
  3. Group Coordinator向Share Coordinator持久化一条InitialzeShareGroupState的请求,持久化写入__share_group_state内部Topic

过程中遇到的问题

1、代码习惯、缩进以及语言风格的问题;

2、参数校验以及异常处理,各种情况下的参数抛出,比如下面这些场景

3、因为写API的过程中参考了ConsumerGroup的实现,没有注意到错误码的处理过程,实际上对于经典消费组重置偏移量的过程,是通过提交一个新的CommitOffset请求来实现的,并没有所为顶层错误码的概念; 但是后在AlterShareGroupOffsetsResponse的响应体存在顶层的异常ErrorCode与ErrorMessage,但是实做过程中没有处理并封装到各个分区的响应中。后来AJ写了一个follow-up完成了这块:https://github.com/apache/kafka/pull/20049/files

http://www.dtcms.com/a/491753.html

相关文章:

  • 网站开发流程 原型设计深圳世茂前海中心
  • Secret 与 ConfigMap配置资源管理
  • 泛微 企业网站建设计划网站开发后台框架
  • 做网站怎么添加图片企业营业执照查询系统入口
  • 大气污染扩散Calpuff模型应用
  • 【LeetCode热题100(44/100)】二叉树的右视图
  • 打工人日报#202510016
  • 青岛公司建设网站添加友情链接的技巧
  • 10.2.3 TrinityCore 网络模块封装
  • JS逆向-安全辅助项目接口联动JSRpc进阶调用BP插件autoDecode(下)
  • 试客网站程序源码南京网站制作步骤
  • 外贸网站用什么空间好福州网站建设优化
  • RoboIntern,一款自动化办公小助手
  • 前端中的受控组件与非受控组件:核心区别与实践指南
  • 逻辑600解析本03
  • 青海省建设厅建管处网站淘宝关键词排名优化
  • Day32_【 NLP _2.RNN及其变体 _(3) GRU】
  • 网站建设毕业设计刻光盘网站开发前景知乎
  • Git的diff命令
  • VBA 自动解压 WinZip 文件
  • 站长号制作网站副本
  • BSC 链的第二次觉醒:从山寨天堂到流动性引擎的演化逻辑
  • h5游戏免费下载:《下一个数字》
  • AgentScope:论文及实战
  • 网站建设域名怎么收费的郑州经济技术开发区建设局
  • plsql 异地连接 Oracle 的方法
  • Kernel5.4 Timer定时器使用
  • Spring Boot消息队列与事件驱动详解
  • sql中连接方式
  • 个人网站转为企业网站百度推广怎么登录