go资深之路笔记(九)kafka浅析
一、作用
kafka 是一个消息队列,但不止是一个消息队列,他是一个分布式流式平台;
其实 go可选择的消息队列工具不止一个他们的适用场景各有不同,如果需要高吞吐量的话可以优先考虑,下面是ai给出的讲解:
二、confluent-kafka-go 客户端:
go的客户端库有多个,这里只讲 confluent-kafka-go
首先说下 他的api:
// 生产者api
func NewProducer(conf *ConfigMap) (*Producer, error) // 创建
func (p *Producer) Produce(msg *Message, deliveryChan chan Event) error // 发送消息
func (p *Producer) Events() chan Event // 获取生产者事件(比如发送完成后回调)
func (p *Producer) Flush(timeoutMs int) int // 确保所有消息发送完成// 消费者api
func NewConsumer(conf *ConfigMap) (*Consumer, error) // 创建
func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) (err error) // 订阅主题
func (c *Consumer) Poll(timeoutMs int) (event Event) // 拉取信息
func (c *Consumer) CommitMessage(m *Message) ([]TopicPartition, error) //提交偏移量(告诉服务端消息已经处理了,这个最好手动,不然默认提交的话失败了就丢失了)// 结构体
type ConfigMap map[string]ConfigValue // 配置type Message struct { // 消息TopicPartition TopicPartitionValue []byteKey []byteTimestamp time.TimeTimestampType TimestampTypeOpaque interface{}Headers []Header
}type TopicPartition struct { // 主题分区信息Topic *stringPartition int32Offset OffsetMetadata *stringError error
}
三、实战:
3.1 docker kafka服务器创建
docker-compose.yml
version: '3'services:zookeeper:image: wurstmeister/zookeeperports:- "2181:2181"kafka:image: wurstmeister/kafkaports:- "9092:9092"environment:KAFKA_ADVERTISED_HOST_NAME: localhostKAFKA_ZOOKEEPER_CONNECT: zookeeper:2181KAFKA_CREATE_TOPICS: "test_topic:1:1"depends_on:- zookeeper
3.2 启动
docker-compose up -d
3.3 修改配置:
ps:如果服务器和客户端都在本地运行不用改也行
# 进入容器内部docker-compose exec kafka bash# 看 kafka的配置在哪个目录ls -l /etc/kafka/ls -l /opt/kafka/# 我的在 /opt/kafka/ | 打开 server.properties配置vim /opt/kafka/config/server.properties# 修改配置参数:listeners=PLAINTEXT://0.0.0.0:9092advertised.listeners=PLAINTEXT://你的公网IP:9092#保存退出## 重启生效:docker-compose restart kafka
3.4 生产者代码
confluent-kafka.go
package mainimport ("fmt""log""time""github.com/confluentinc/confluent-kafka-go/kafka"
)func main() {// 生产者配置 :cite[4]config := &kafka.ConfigMap{"bootstrap.servers": "配置填写的ip:9092", // Kafka broker地址 !!! todo 注意改成自己的ip// 如需SASL认证(如连接云服务),取消下面注释 :cite[4]:cite[6]// "security.protocol": "SASL_PLAINTEXT",// "sasl.mechanism": "PLAIN",// "sasl.username": "your_username",// "sasl.password": "your_password","acks": -1, // 等待所有副本确认,保证数据不丢失"retries": 3, // 失败重试次数"retry.backoff.ms": 1000, // 重试间隔"enable.idempotence": true, // 启用幂等性,避免重复消息}// 创建生产者producer, err := kafka.NewProducer(config)if err != nil {log.Fatalf("Error creating producer: %v", err)}defer producer.Close()// 监听消息传递结果go func() {for e := range producer.Events() {switch ev := e.(type) {case *kafka.Message:if ev.TopicPartition.Error != nil {fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)} else {fmt.Printf("Delivered message to %v, key: %s\n",ev.TopicPartition, string(ev.Key))}}}}()// 发送消息topic := "test_topic"for i := 0; i < 10; i++ {key := fmt.Sprintf("key-%d", i)value := fmt.Sprintf("Hello Kafka! %s", time.Now().Format("15:04:05"))err = producer.Produce(&kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic,Partition: kafka.PartitionAny,},Key: []byte(key),Value: []byte(value),}, nil)if err != nil {log.Printf("Produce error: %v", err)}time.Sleep(time.Second)}// 等待所有消息发送完成producer.Flush(15 * 1000)fmt.Println("All messages sent")
}
运行后:
3.5 消费者代码
package mainimport ("fmt""log""os""os/signal""syscall""github.com/confluentinc/confluent-kafka-go/kafka"
)func main() {// 消费者配置 :cite[6]config := &kafka.ConfigMap{"bootstrap.servers": "配置填写的ip:9092", // Kafka broker地址 !!! todo 注意改成自己的ip// 如需SASL认证(如连接云服务),取消下面注释 :cite[4]:cite[6]// "security.protocol": "SASL_SSL",// "sasl.mechanism": "PLAIN",// "sasl.username": "your_username",// "sasl.password": "your_password","group.id": "test_consumer_group", // 消费组ID"auto.offset.reset": "earliest", // 从最早的消息开始消费"enable.auto.commit": false, // 手动提交偏移量"enable.auto.offset.store": false, // 手动存储偏移量"max.poll.interval.ms": 300000, // 最大poll间隔"session.timeout.ms": 10000, // 会话超时}// 创建消费者consumer, err := kafka.NewConsumer(config)if err != nil {log.Fatalf("Error creating consumer: %v", err)}defer consumer.Close()// 订阅主题err = consumer.SubscribeTopics([]string{"test_topic"}, nil)if err != nil {log.Fatalf("Subscribe error: %v", err)}// 处理中断信号sigchan := make(chan os.Signal, 1)signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)// 消费消息run := truefor run {select {case sig := <-sigchan:fmt.Printf("Caught signal %v: terminating\n", sig)run = falsedefault:// 轮询消息,超时时间100msevent := consumer.Poll(100)if event == nil {continue}switch e := event.(type) {case *kafka.Message:fmt.Printf("Received message: %s (key: %s) [%s] at offset %v\n",string(e.Value), string(e.Key), e.TopicPartition, e.TopicPartition.Offset)// 处理业务逻辑...// 手动提交偏移量_, err := consumer.CommitMessage(e)if err != nil {fmt.Printf("Commit error: %v\n", err)}case kafka.Error:fmt.Printf("Error: %v\n", e)if e.Code() == kafka.ErrAllBrokersDown {run = false}default:// 忽略其他事件}}}
}
运行后截图:
最后:这里只讲 kafka的基本适合用,至于更深层的原理和部署细节以及技巧之后在写,留个位~