当前位置: 首页 > news >正文

go资深之路笔记(九)kafka浅析

一、作用

kafka 是一个消息队列,但不止是一个消息队列,他是一个分布式流式平台;
其实 go可选择的消息队列工具不止一个他们的适用场景各有不同,如果需要高吞吐量的话可以优先考虑,下面是ai给出的讲解:
在这里插入图片描述

二、confluent-kafka-go 客户端:

go的客户端库有多个,这里只讲 confluent-kafka-go
首先说下 他的api:

// 生产者api
func NewProducer(conf *ConfigMap) (*Producer, error) // 创建
func (p *Producer) Produce(msg *Message, deliveryChan chan Event) error // 发送消息
func (p *Producer) Events() chan Event // 获取生产者事件(比如发送完成后回调)
func (p *Producer) Flush(timeoutMs int) int // 确保所有消息发送完成// 消费者api
func NewConsumer(conf *ConfigMap) (*Consumer, error) 	// 创建
func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) (err error)	// 订阅主题
func (c *Consumer) Poll(timeoutMs int) (event Event) 	// 拉取信息
func (c *Consumer) CommitMessage(m *Message) ([]TopicPartition, error)	//提交偏移量(告诉服务端消息已经处理了,这个最好手动,不然默认提交的话失败了就丢失了)// 结构体
type ConfigMap map[string]ConfigValue			// 配置type Message struct { // 消息TopicPartition TopicPartitionValue          []byteKey            []byteTimestamp      time.TimeTimestampType  TimestampTypeOpaque         interface{}Headers        []Header
}type TopicPartition struct {		// 主题分区信息Topic     *stringPartition int32Offset    OffsetMetadata  *stringError     error
}

三、实战:

3.1 docker kafka服务器创建

docker-compose.yml

	version: '3'services:zookeeper:image: wurstmeister/zookeeperports:- "2181:2181"kafka:image: wurstmeister/kafkaports:- "9092:9092"environment:KAFKA_ADVERTISED_HOST_NAME: localhostKAFKA_ZOOKEEPER_CONNECT: zookeeper:2181KAFKA_CREATE_TOPICS: "test_topic:1:1"depends_on:- zookeeper

3.2 启动

 docker-compose up -d

3.3 修改配置:

ps:如果服务器和客户端都在本地运行不用改也行

# 进入容器内部docker-compose exec kafka bash# 看 kafka的配置在哪个目录ls -l /etc/kafka/ls -l /opt/kafka/# 我的在 /opt/kafka/  | 打开 server.properties配置vim /opt/kafka/config/server.properties# 修改配置参数:listeners=PLAINTEXT://0.0.0.0:9092advertised.listeners=PLAINTEXT://你的公网IP:9092#保存退出## 重启生效:docker-compose restart kafka 

3.4 生产者代码

confluent-kafka.go

package mainimport ("fmt""log""time""github.com/confluentinc/confluent-kafka-go/kafka"
)func main() {// 生产者配置 :cite[4]config := &kafka.ConfigMap{"bootstrap.servers": "配置填写的ip:9092", // Kafka broker地址  !!! todo 注意改成自己的ip// 如需SASL认证(如连接云服务),取消下面注释 :cite[4]:cite[6]// "security.protocol": "SASL_PLAINTEXT",// "sasl.mechanism":    "PLAIN",// "sasl.username":     "your_username",// "sasl.password":     "your_password","acks":               -1,   // 等待所有副本确认,保证数据不丢失"retries":            3,    // 失败重试次数"retry.backoff.ms":   1000, // 重试间隔"enable.idempotence": true, // 启用幂等性,避免重复消息}// 创建生产者producer, err := kafka.NewProducer(config)if err != nil {log.Fatalf("Error creating producer: %v", err)}defer producer.Close()// 监听消息传递结果go func() {for e := range producer.Events() {switch ev := e.(type) {case *kafka.Message:if ev.TopicPartition.Error != nil {fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)} else {fmt.Printf("Delivered message to %v, key: %s\n",ev.TopicPartition, string(ev.Key))}}}}()// 发送消息topic := "test_topic"for i := 0; i < 10; i++ {key := fmt.Sprintf("key-%d", i)value := fmt.Sprintf("Hello Kafka! %s", time.Now().Format("15:04:05"))err = producer.Produce(&kafka.Message{TopicPartition: kafka.TopicPartition{Topic:     &topic,Partition: kafka.PartitionAny,},Key:   []byte(key),Value: []byte(value),}, nil)if err != nil {log.Printf("Produce error: %v", err)}time.Sleep(time.Second)}// 等待所有消息发送完成producer.Flush(15 * 1000)fmt.Println("All messages sent")
}

运行后:
在这里插入图片描述

3.5 消费者代码

package mainimport ("fmt""log""os""os/signal""syscall""github.com/confluentinc/confluent-kafka-go/kafka"
)func main() {// 消费者配置 :cite[6]config := &kafka.ConfigMap{"bootstrap.servers": "配置填写的ip:9092", // Kafka broker地址  !!! todo 注意改成自己的ip// 如需SASL认证(如连接云服务),取消下面注释 :cite[4]:cite[6]// "security.protocol": "SASL_SSL",// "sasl.mechanism":    "PLAIN",// "sasl.username":     "your_username",// "sasl.password":     "your_password","group.id":                 "test_consumer_group", // 消费组ID"auto.offset.reset":        "earliest",            // 从最早的消息开始消费"enable.auto.commit":       false,                 // 手动提交偏移量"enable.auto.offset.store": false,                 // 手动存储偏移量"max.poll.interval.ms":     300000,                // 最大poll间隔"session.timeout.ms":       10000,                 // 会话超时}// 创建消费者consumer, err := kafka.NewConsumer(config)if err != nil {log.Fatalf("Error creating consumer: %v", err)}defer consumer.Close()// 订阅主题err = consumer.SubscribeTopics([]string{"test_topic"}, nil)if err != nil {log.Fatalf("Subscribe error: %v", err)}// 处理中断信号sigchan := make(chan os.Signal, 1)signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)// 消费消息run := truefor run {select {case sig := <-sigchan:fmt.Printf("Caught signal %v: terminating\n", sig)run = falsedefault:// 轮询消息,超时时间100msevent := consumer.Poll(100)if event == nil {continue}switch e := event.(type) {case *kafka.Message:fmt.Printf("Received message: %s (key: %s) [%s] at offset %v\n",string(e.Value), string(e.Key), e.TopicPartition, e.TopicPartition.Offset)// 处理业务逻辑...// 手动提交偏移量_, err := consumer.CommitMessage(e)if err != nil {fmt.Printf("Commit error: %v\n", err)}case kafka.Error:fmt.Printf("Error: %v\n", e)if e.Code() == kafka.ErrAllBrokersDown {run = false}default:// 忽略其他事件}}}
}

运行后截图:
在这里插入图片描述

最后:这里只讲 kafka的基本适合用,至于更深层的原理和部署细节以及技巧之后在写,留个位~

http://www.dtcms.com/a/494871.html

相关文章:

  • Java String 性能优化与内存管理:现代开发实战指南
  • 【软考备考】 NoSQL数据库有哪些,键值型、文档型、列族型、图数据库的特点与适用场景
  • 论《素数的几种筛法》
  • html静态页面怎么放在网站上原平的旅游网站怎么做的
  • 网页设计与网站建设作业公众号小程序制作步骤
  • 律师怎么做网站简单大气网站模板
  • 偏振相机在半导体制造的领域的应用
  • Uniapp微信小程序开发:EF Core 中级联删除
  • Java从入门到精通 - 集合框架(二)
  • 3proxy保姆级教程:WIN连接远端HTTPS代理
  • 大厂AI各走“开源”路
  • 室内装修效果图网站有哪些百度网盟推广是什么
  • grootN1 grootN1.5 gr00t安装方法以及使用(学习)
  • Typora(跨平台MarkDown编辑器) v1.12.2 中文绿色版
  • Unity开发抖音小游戏的震动
  • 团队作业——概要设计和数据库设计
  • 在Spring Boot开发中,HEAD、OPTIONS和 TRACE这些HTTP方法各有其特定的应用场景和实现方式
  • Flink DataStream「全分区窗口处理」mapPartition / sortPartition / aggregate / reduce
  • 网站备案号码查询大连网页设计哪家好
  • Next.js 入门指南
  • arcgis api for javascript 修改地图图层要素默认的高亮效果
  • 【论文速递】2025年第28周(Jul-06-12)(Robotics/Embodied AI/LLM)
  • 宁波市鄞州区建设局网站怎么做网站静态布局
  • 一文掌握 CodeX CLI 安装以及使用!
  • Android实战进阶 - 用户闲置超时自动退出登录功能详解
  • 2二、u-boot移植
  • 淄博网站建设哪家好常德网站建设技术
  • Java Spring日志
  • OpenAI Agent Kit 全网首发深度解读与上手指南
  • 网络:2.Socket编程UDP