当前位置: 首页 > news >正文

Kafka Go客户端--Sarama

Kafka Go客户端

在Go中里面有三个比较有名气的Go客户端。

  • Sarama:用户数量最多,早期这个项目是在Shopify下面,现在挪到了IBM下。
  • segmentio/kafka-go:没啥大的缺点。
  • confluent-kafka-go:需要启用cgo,跨平台问题比较多,交叉编译也不支持。

Sarama 使用入门:tools

IBM/sarama: Sarama is a Go library for Apache Kafka.

在 Sarama 里面提供了一些简单的命令行工具,可以看做是 Shell脚本提供的功能一个子集。

Consumer和 producer中的用得比较多

在这里插入图片描述

1.设置 Go 代理(如果内网无法直连 proxy.golang.org)

export GOPROXY=https://goproxy.cn,direct
export GOSUMDB=sum.golang.google.cn

2.在虚拟机上执行安装命令:

  • ​ go install github.com/IBM/sar ama/tools/kafka-console-consumer@latest
  • ​ go install github.com/lBM/sarama/tools/kafka-console-producer@latest

3.把可执行文件所在目录加到 PATH(如果还没加)

export PATH=$PATH:$(go env GOBIN)

4.确认可执行文件在哪里

# 查看 GOBIN,如果你没显式设置,就会是空
go env GOBIN# 查看 GOPATH,默认是 $HOME/go(对于 root 用户就是 /root/go)
go env GOPATH#我的是/home/cxz/go/lib:/home/cxz/go/work

5.查看安装结果

ls /home/cxz/go/lib/bin
#应该能够看到kafka-console-consumer  kafka-console-producer

6.临时生效

export PATH=$PATH:/home/cxz/go/lib/bin# 然后验证
which kafka-console-consumer
# 应该输出 /home/cxz/go/lib/bin/kafka-console-consumer

7.永久生效

echo 'export PATH=$PATH:/home/cxz/go/lib/bin' >> ~/.bashrc
# 或者,如果你用的是 zsh:
# echo 'export PATH=$PATH:/home/cxz/go/lib/bin' >> ~/.zshrc# 然后重新加载配置
source ~/.bashrc

Sarama 使用入门:发送消息

虚拟机上执行

kafka-console-consumer -topic=test_topic -brokers=192.168.24.101:9094

Goland上执行

package mainimport ("github.com/IBM/sarama""github.com/stretchr/testify/assert""testing"
)var addrs = []string{"192.168.24.101:9094"}func TestSyncProducer(t *testing.T) {//创建一个 Sarama 的配置对象。cfg := sarama.NewConfig()//表示生产者要等待 Kafka 确认消息成功写入后再返回(同步模式)。如果不设置这个,SyncProducer.SendMessage 会一直失败。cfg.Producer.Return.Successes = true //同步的Producer一定要设置//创建一个同步的生产者实例producer, err := sarama.NewSyncProducer(addrs, cfg)assert.NoError(t, err)//构建消息并发送_, _, err = producer.SendMessage(&sarama.ProducerMessage{Topic: "test_topic",//消息数据本体Value: sarama.StringEncoder("hello world ,这是一条使用kafka的消息"),//会在生产者和消费者之间传递,消息头,可传递自定义键值对,比如 trace_id 用于链路追踪。Headers: []sarama.RecordHeader{{Key:   []byte("trace_id"),Value: []byte("123456"),},},//只作用于发送过程。元信息,在发送过程中使用,可以用来传递额外信息,发送完成后会原样返回(不会传给消费者)。Metadata: "这是metadata",})assert.NoError(t, err)
}

10.执行结果

Partition:	0
Offset:	0
Key:	
Value:	hello world ,这是一条使用kafka的消息

使用控制台工具连接Kafka

Sarama 使用入门:指定分区

可以注意到,前面所有的消息都被发送到了 Partition 0 上面。

正常来说,在 Sarama 里面,可以通过指定 config 中的Partitioner来指定最终的目标分区。

常见的方法:

  • ​ Random:随机挑一个。
  • ​ RoundRobin:轮询。
  • ​ Hash(默认):根据 key 的哈希值来筛选一个。
  • ​ Manual: 根据 Message 中的 partition 字段来选择。
  • ​ ConsistentCRC:一致性哈希,用的是 CRC32 算法。
  • ​ Custom:实际上不 Custom,而是自定义一部分Hash 的参数,本质上是一个 Hash 的实现。
//默认HashPartitioner  适合: 按用户 ID、订单 ID 等字段分区场景
cfg.Producer.Partitioner = sarama.NewHashPartitioner
//使用 CRC32 算法 计算 Key 的哈希。 适合: 需要高一致性分布的业务,例如日志收集系统
cfg.Producer.Partitioner = sarama.NewConsistentCRCHashPartitioner
//忽略 Key,每条消息随机分配 partition。  适合: 普通消息队列、广播类场景。
cfg.Producer.Partitioner = sarama.NewRandomPartitioner
//需要手动指定 partition(ProducerMessage.Partition 字段)。适合: 明确知道要写哪个 partition,例如做数据分流
cfg.Producer.Partitioner = sarama.NewManualPartitioner
//用于实现你自己的 Partitioner  一般不推荐使用这个空参函数(它会 panic),应实现完整接口。
cfg.Producer.Partitioner = sarama.NewCustomPartitioner()
//允许你使用自定义哈希函数来做 key 分区。  适合: 有特定哈希策略需求时,例如分布要尽可能均匀。
cfg.Producer.Partitioner = sarama.NewCustomHashPartitioner(func() hash.Hash32 {})Topic: "test_topic",
//分区依据
Key:   sarama.StringEncoder("user_123"), // 🔑 这里是分区依据
//消息数据本体
Value: sarama.StringEncoder("hello world ,这是一条使用kafka的消息"),

最典型的场景,就是利用Partitioner来保证同一个业务的消息一定发送到同一个分区上,从而保证业 有序。

Sarama 使用入门:异步发送

Sarama有一个异步发送的producer,它的用法稍微复杂一点。

  • 把Return.Success和 Errors都设置为true,这是为了后面能够拿到发送结果。
  • 初始化异步producer。
  • 从producer里面拿到Input的channel,并且发送 一条消息。
  • ​ 利用select case,同时**监听Success和Error两个channel,**来获得发送成功与否的信息。
func TestAsyncProducer(t *testing.T) {cfg := sarama.NewConfig()//怎么知道发送是否成功cfg.Producer.Return.Errors = truecfg.Producer.Return.Successes = trueproducer, err := sarama.NewAsyncProducer(addrs, cfg)require.NoError(t, err)messages := producer.Input()go func() {for {messages <- &sarama.ProducerMessage{Topic: "test_topic",//分区依据Key: sarama.StringEncoder("user_123"), // 🔑 这里是分区依据//消息数据本体Value: sarama.StringEncoder("hello world ,这是一条使用kafka的消息"),//会在生产者和消费者之间传递Headers: []sarama.RecordHeader{{Key:   []byte("trace_id"),Value: []byte("123456"),},},//只作用于发送过程Metadata: "这是metadata",}}}()errCh := producer.Errors()succCh := producer.Successes()for {//两个都不满足就会阻塞select {case err := <-errCh:t.Log("发送出了问题", err.Err)case <-succCh:t.Log("发送成功")}}
}

Sarama 使用入门:acks

在Kafka里面,生产者在发送数据的时候,有一个很关键的参数,就是 acks。
有三个取值:

  • ​ 0:客户端发一次,不需要服务端的确认。
  • ​ 1:客户端发送,并且需要服务端写入到主分区。
  • ​ -1:客户端发送,并且需要服务端同步到所有的ISR 上。

从上到下,性能变差,但是数据可靠性上升。需要性能,选 0,需要消息不丢失,选-1。

理解acks你就要抓住核心点,谁ack才算数?

  • 0:TCP协议返回了ack就可以。
  • 1:主分区确认写入了就可以。
  • -1:所有的ISR都确认了就可以。

在这里插入图片描述

ISR (In Sync Replicas),用通俗易懂的话来说,就是跟上了节奏的从分区。

什么叫做跟上了节奏?就是它和主分区保持了数据同步。

所以,当消息被同步到从分区之后,如果主分区崩溃了那么依旧可以保证在从分区上还有数据。

在这里插入图片描述

sarama 使用入门:启动消费者

Sarama的消费者设计不是很直观,稍微有点复杂。

  • ​ 首先要初始化一个ConsumerGroup。
  • ​ 调用ConsumerGroup上的Consume方法。
  • ​ 为 Consume 方法传入一个 ConsumerGroupHandler的辅助方法。
package mainimport ("context""github.com/IBM/sarama""github.com/stretchr/testify/assert""log""testing"
)func TestConsumer(t *testing.T) {cfg := sarama.NewConfig()//正常来说,一个消费者都是归属一个消费者组的//消费者就是你的业务consumerGroup, err := sarama.NewConsumerGroup(addrs, "test_group", cfg)assert.NoError(t, err)err = consumerGroup.Consume(context.Background(), []string{"test_topic"}, testConsumerGroupHandler{})//你消费结束,就会到这里t.Log(err)
}type testConsumerGroupHandler struct {
}func (t testConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error {log.Println("Setup session:", session)return nil
}func (t testConsumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error {log.Println("Cleanup session:", session)return nil
}func (t testConsumerGroupHandler) ConsumeClaim(//代表的是你和Kafka的会话(从建立连接到连接彻底断掉的那一段时间)session sarama.ConsumerGroupSession,claim sarama.ConsumerGroupClaim) error {msgs := claim.Messages()for msg := range msgs {//var bizMsg MyBizMsg//err := json.Unmarshal(msg.Value, &bizMsg)//if err != nil {//	//这就是消费消息出错//	//大多数时候就是重试//	//记录日志//	continue//}log.Println(string(msg.Value))session.MarkMessage(msg, "")}//什么情况下会到这里//msg被人关了,也就是要退出消费逻辑return nil
}type MyBizMsg struct {Name string
}

sarama 使用入门:ConsumerGroupHandler

下面的代码就是对ConsumerGroupHandler的实现,关键就是在消费了msg之后,如果消费成功了,要记得提交。

也就是调用MarkMessage方法。

至于 Setup 和 Cleanup 方法反而用得不多。

type testConsumerGroupHandler struct {
}func (t testConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error {log.Println("Setup session:", session)return nil
}func (t testConsumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error {log.Println("Cleanup session:", session)return nil
}func (t testConsumerGroupHandler) ConsumeClaim(//代表的是你和Kafka的会话(从建立连接到连接彻底断掉的那一段时间)session sarama.ConsumerGroupSession,claim sarama.ConsumerGroupClaim) error {msgs := claim.Messages()for msg := range msgs {//var bizMsg MyBizMsg//err := json.Unmarshal(msg.Value, &bizMsg)//if err != nil {//	//这就是消费消息出错//	//大多数时候就是重试//	//记录日志//	continue//}log.Println(string(msg.Value))session.MarkMessage(msg, "")}//什么情况下会到这里//msg被人关了,也就是要退出消费逻辑return nil
}

sarama 使用入门:利用context来控制消费者退出

可以利用初始化ConsumerGroup 时候传入的ctx来控制消费者组退出消息。

下图中,我传入了一个超时的context,那么:

	start := time.Now()//这里是测试,我们就控制消费10sctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)defer cancel()//开始消费,会在这里阻塞住err = consumerGroup.Consume(ctx, []string{"test_topic"}, testConsumerGroupHandler{})//你消费结束,就会到这里t.Log(err, time.Since(start).String())	

下图中,我主动调用了cancel,那么:

	start := time.Now()//这里是测试,我们就控制消费5sctx, cancel := context.WithCancel(context.Background())time.AfterFunc(time.Second*5, func() {cancel()})//开始消费,会在这里阻塞住err = consumerGroup.Consume(ctx, []string{"test_topic"}, testConsumerGroupHandler{})//你消费结束,就会到这里t.Log(err, time.Since(start).String())
  • 如果超时了
  • 如果我主动调用了cancel

以上两种情况,任何一种情况出现了,都会让消费者退出消息。

sarama 使用入门:指定偏移量消费

在部分场景下,我们会希望消费历史消息,或者从某个消息开始消费,那么可以考虑在Setup里面设置偏移量。

关键调用是 ResetOffset。

不过一般建议走离线渠道,操作Kafka集群去重置对应的偏移量。

核心在于,你并不是每次重新部署,重新启动都是要重置这个偏移量的。

只要你的消费者组在这个分区上有过“已提交的 offset”,Kafka 就会优先使用这个提交的 offset,而忽略你在 Setup() 中设置的 offset

// 在每次 rebalance 或初次连接 Kafka 后调用,用于初始化。
func (t testConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error {//执行一些初始化的事情log.Println("Setup")//假设要重置到0var offset int64 = 0//遍历所有的分区partitions := session.Claims()["test_topic"]for _, p := range partitions {session.ResetOffset("test_topic", p, offset, "")//session.ResetOffset("test_topic", p, sarama.OffsetNewest, "")//session.ResetOffset("test_topic", p, sarama.OffsetOldest, "")}return nil
}

sarama使用入门:异步消费,批量提交

正常来说,为了在异步消费失败之后还能继续重试,可以考虑异步消费一批,提交一批。

下图中,ctx.Done分支用来控制凑够一批的超时机制,防止生产者的速率很低,一直凑不够一批。

func (t testConsumerGroupHandler) ConsumeClaim(//代表的是你和Kafka的会话(从建立连接到连接彻底断掉的那一段时间)//可以通过 session 控制 offset 提交,获取消费者信息,并感知退出时机。session sarama.ConsumerGroupSession,//claim 是你获取消息的入口claim sarama.ConsumerGroupClaim) error {msgs := claim.Messages()//设置批量处理的条数const batchSize = 10for {ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)var eg errgroup.Groupvar last *sarama.ConsumerMessagefor i := 0; i < batchSize; i++ {done := falseselect {case <-ctx.Done()://这边表示超时了done = truecase msg, ok := <-msgs:if !ok {cancel()return nil}last = msgmsg1 := msgeg.Go(func() error {//我就在这里消费time.Sleep(time.Second)//你在这里重试log.Println(string(msg1.Value))return nil})}if done {break}}cancel()err := eg.Wait()if err != nil {//这边能怎么办?//记录日志continue}//就这样session.MarkMessage(last, "")}return nil
}

另外一个分支就是读取消息,并且提交到errgroup里面执行。

Sleep是模拟长时间业务执行。

相关文章:

  • OpenCV进阶操作:风格迁移以及DNN模块解析
  • 基于STM32、HAL库的TDA7719TR音频接口芯片驱动程序设计
  • 基于Win在VSCode部署运行OpenVINO模型
  • MySQL 8.0 OCP 1Z0-908 题目解析(2)
  • 基于STM32、HAL库的ADAU1701JSTZ音频接口芯片驱动程序设计
  • Windows部署LatentSync唇形同步(字节跳动北京交通大学联合开源)
  • 仓颉Magic亮相GOSIM AI Paris 2025:掀起开源AI框架新热潮
  • 初始“协议”
  • golang 定时器
  • 软件测试——面试八股文(入门篇)
  • React vs Vue:点击外部事件处理的对比与实现
  • 深入理解 TypeScript 的 Partial<T> 类型
  • 菜狗的脚步学习
  • [SAP] 通过程序名获取事务码TCode
  • Scala和Go差异
  • c++STL-通用(反向)迭代器适配器
  • 算法第十七天|654. 最大二叉树、617.合并二叉树、700.二叉搜索树中的搜索、98.验证二叉搜索树
  • 第十六章 常用存储器介绍
  • 手机相册的 “智能分类” 功能
  • 数学复习笔记 7
  • 美国和沙特签署上千亿美元军售协议
  • 受美关税影响,本田预计新财年净利下降七成,并推迟加拿大建厂计划
  • 在笔墨金石间,看胡问遂与梅舒适的艺术对话
  • 沈阳卫健委通报“健康证”办理乱象:涉事医院已被立案查处
  • 27岁杨阳拟任苏木镇党委副职,系2020年内蒙古自治区选调生
  • 警方通报“网约车司机偷拍女乘客”:已被行政拘留