
Installing Kafka with Docker and k3s, and sending and receiving Kafka messages in Go

Docker install command, where 172.16.11.111 is the host IP and 14818 is the host port mapped to container port 9092:

docker run -d \
  --name kafka \
  -p 14818:9092 \
  -p 9093:9093 \
  -v /tmp/kraft-combined-logs:/tmp/kraft-combined-logs \
  -e TZ=Asia/Shanghai \
  -e KAFKA_NODE_ID=1 \
  -e KAFKA_PROCESS_ROLES=broker,controller \
  -e KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.16.11.111:14818 \
  -e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
  -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
  -e KAFKA_CONTROLLER_QUORUM_VOTERS=1@localhost:9093 \
  -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
  -e KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1 \
  -e KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1 \
  -e KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0 \
  -e KAFKA_NUM_PARTITIONS=3 \
  -e KAFKA_LOG_DIRS=/tmp/kraft-combined-logs \
  -e CLUSTER_ID=5L6g3nShT-eMCtK--X86sw \
  apache/kafka-native:4.1.0
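
Before wiring up producers and consumers, it can help to confirm that the advertised listener is reachable and that test-topic exists. Below is a minimal sketch using github.com/segmentio/kafka-go (the same library used later in this post); the address and the 3-partition, replication-factor-1 topic mirror the settings above, and pre-creating the topic is optional if the broker auto-creates topics:

package main

import (
	"log"
	"net"
	"strconv"

	"github.com/segmentio/kafka-go"
)

func main() {
	// Dial the advertised listener configured in the container above.
	conn, err := kafka.Dial("tcp", "172.16.11.111:14818")
	if err != nil {
		log.Fatalf("cannot reach broker: %v", err)
	}
	defer conn.Close()

	// Topic creation must go through the controller broker, so look it up first.
	controller, err := conn.Controller()
	if err != nil {
		log.Fatalf("cannot resolve controller: %v", err)
	}
	ctrlConn, err := kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
	if err != nil {
		log.Fatalf("cannot reach controller: %v", err)
	}
	defer ctrlConn.Close()

	// Pre-create the topic used by the examples below (3 partitions, RF 1,
	// matching KAFKA_NUM_PARTITIONS above).
	if err := ctrlConn.CreateTopics(kafka.TopicConfig{
		Topic:             "test-topic",
		NumPartitions:     3,
		ReplicationFactor: 1,
	}); err != nil {
		log.Fatalf("cannot create topic: %v", err)
	}
	log.Println("broker reachable, test-topic ready")
}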

k3s YAML, where 172.16.11.111 is the host IP and 14818 is the host port mapped to container port 9092:

apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: kafka
  name: kafka
  namespace: moonfdd
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka
  template:
    metadata:
      labels:
        app: kafka
    spec:
      initContainers:
        - name: kafka-fix-data-volume-permissions
          image: alpine
          imagePullPolicy: IfNotPresent
          command:
            - sh
            - -c
            - "chown -R 1000:1000 /tmp/kraft-combined-logs"
          volumeMounts:
            - mountPath: /tmp/kraft-combined-logs
              name: volv
      containers:
        - env:
            - name: TZ
              value: Asia/Shanghai
            - name: KAFKA_NODE_ID
              value: "1"
            - name: KAFKA_PROCESS_ROLES
              value: broker,controller
            - name: KAFKA_LISTENERS
              value: PLAINTEXT://:9092,CONTROLLER://:9093
            - name: KAFKA_ADVERTISED_LISTENERS
              value: PLAINTEXT://172.16.11.111:14818
            - name: KAFKA_CONTROLLER_LISTENER_NAMES
              value: CONTROLLER
            - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
              value: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
            - name: KAFKA_CONTROLLER_QUORUM_VOTERS
              value: 1@localhost:9093
            - name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
              value: "1"
            - name: KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR
              value: "1"
            - name: KAFKA_TRANSACTION_STATE_LOG_MIN_ISR
              value: "1"
            - name: KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS
              value: "0"
            - name: KAFKA_NUM_PARTITIONS
              value: "3"
            - name: KAFKA_LOG_DIRS
              value: /tmp/kraft-combined-logs
            - name: CLUSTER_ID
              value: "5L6g3nShT-eMCtK--X86sw"  # fixed cluster ID; only used to format storage on first startup
          image: 'apache/kafka-native:4.1.0'
          imagePullPolicy: IfNotPresent
          name: kafka
          volumeMounts:
            - mountPath: /tmp/kraft-combined-logs
              name: volv
      volumes:
        - hostPath:
            path: /root/k8s/moonfdd/kafka/tmp/kraft-combined-logs
            type: DirectoryOrCreate
          name: volv
---
apiVersion: v1
kind: Service
metadata:
  labels:
    app: kafka
  name: kafka
  namespace: moonfdd
spec:
  ports:
    - port: 9092
      protocol: TCP
      targetPort: 9092
      name: 9092-9092
    - port: 9093
      protocol: TCP
      targetPort: 9093
      name: 9093-9093
  selector:
    app: kafka
  type: NodePort
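
However you expose the broker (NodePort, hostPort, or a load balancer), clients must be able to reach the exact address in KAFKA_ADVERTISED_LISTENERS (here 172.16.11.111:14818), because that is the address the broker hands back in its metadata. A quick kafka-go sketch, assuming the same address and the test-topic used below, to check what the broker advertises from outside the cluster:

package main

import (
	"log"

	"github.com/segmentio/kafka-go"
)

func main() {
	// Dial the address clients will really use; it must match
	// KAFKA_ADVERTISED_LISTENERS, otherwise the metadata the broker
	// returns will point clients at an unreachable address.
	conn, err := kafka.Dial("tcp", "172.16.11.111:14818")
	if err != nil {
		log.Fatalf("dial failed: %v", err)
	}
	defer conn.Close()

	// Print every partition of test-topic and the leader address it advertises.
	partitions, err := conn.ReadPartitions("test-topic")
	if err != nil {
		log.Fatalf("metadata request failed: %v", err)
	}
	for _, p := range partitions {
		log.Printf("topic=%s partition=%d leader=%s:%d", p.Topic, p.ID, p.Leader.Host, p.Leader.Port)
	}
}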

Sending Kafka messages from Go with github.com/segmentio/kafka-go:

package main

import (
	"context"
	"log"

	"github.com/segmentio/kafka-go"
)

func main() {
	// Create a Kafka writer (producer)
	w := kafka.NewWriter(kafka.WriterConfig{
		Brokers:  []string{"172.16.11.111:14818"}, // Kafka broker address
		Topic:    "test-topic",                    // topic to produce to
		Balancer: &kafka.LeastBytes{},             // load-balancing strategy
	})

	// Write a message
	err := w.WriteMessages(context.Background(),
		kafka.Message{
			Key:   []byte("Key-A"),
			Value: []byte("Hello Kafka from Go!"),
		},
	)
	if err != nil {
		log.Fatalf("could not write message: %v", err)
	}
	log.Println("Message sent successfully!")

	// Close the writer
	w.Close()
}
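
Newer kafka-go releases deprecate kafka.NewWriter/WriterConfig in favour of a kafka.Writer struct literal. An equivalent sketch of the same send using that form (same broker address and topic; the key Key-B is chosen arbitrarily):

package main

import (
	"context"
	"log"

	"github.com/segmentio/kafka-go"
)

func main() {
	// Struct-literal writer; Addr replaces the Brokers slice.
	w := &kafka.Writer{
		Addr:         kafka.TCP("172.16.11.111:14818"),
		Topic:        "test-topic",
		Balancer:     &kafka.LeastBytes{},
		RequiredAcks: kafka.RequireAll, // wait for all in-sync replicas
	}
	defer w.Close()

	err := w.WriteMessages(context.Background(),
		kafka.Message{Key: []byte("Key-B"), Value: []byte("Hello again from Go!")},
	)
	if err != nil {
		log.Fatalf("could not write message: %v", err)
	}
	log.Println("Message sent successfully!")
}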

Receiving Kafka messages in Go with github.com/segmentio/kafka-go:

package main

import (
	"context"
	"log"

	"github.com/segmentio/kafka-go"
)

func main() {
	// Create a Kafka reader (consumer)
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers:  []string{"172.16.11.111:14818"}, // Kafka broker address
		Topic:    "test-topic",                    // topic to subscribe to
		GroupID:  "my-consumer-group",             // consumer group; the same group resumes from its last committed offset
		MinBytes: 10e3,                            // minimum fetch size in bytes
		MaxBytes: 10e6,                            // maximum fetch size in bytes
	})

	for {
		// Read a message (automatically resumes from the last committed offset)
		m, err := r.ReadMessage(context.Background())
		if err != nil {
			log.Fatalf("could not read message: %v", err)
		}
		log.Printf("offset:%d | key:%s | value:%s\n", m.Offset, string(m.Key), string(m.Value))
	}
	// r.Close() // close the reader if you ever break out of the loop
}
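
As the comment notes, the loop above never returns, so r.Close() is never reached. One way to shut down cleanly, sketched here under the same broker/topic/group assumptions, is to cancel the read context on Ctrl+C and let the deferred Close leave the group:

package main

import (
	"context"
	"errors"
	"log"
	"os"
	"os/signal"

	"github.com/segmentio/kafka-go"
)

func main() {
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"172.16.11.111:14818"},
		Topic:   "test-topic",
		GroupID: "my-consumer-group",
	})
	defer r.Close() // leave the consumer group cleanly on exit

	// Cancel the context on Ctrl+C so ReadMessage returns instead of blocking forever.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
	defer stop()

	for {
		m, err := r.ReadMessage(ctx)
		if errors.Is(err, context.Canceled) {
			log.Println("shutting down")
			return
		}
		if err != nil {
			log.Printf("read error: %v", err)
			continue
		}
		log.Printf("offset:%d | key:%s | value:%s", m.Offset, string(m.Key), string(m.Value))
	}
}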

Sending Kafka messages from Go with github.com/IBM/sarama:

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/IBM/sarama"
)

func main() {
	// Producer configuration
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true          // report successful deliveries (required by SyncProducer)
	config.Producer.RequiredAcks = sarama.WaitForAll // wait for all in-sync replicas to acknowledge
	config.Producer.Retry.Max = 3                    // maximum number of retries

	// Plain connection: no SASL, no TLS
	config.Net.SASL.Enable = false
	config.Net.TLS.Enable = false
	config.Version = sarama.MaxVersion

	// Create a synchronous producer
	producer, err := sarama.NewSyncProducer([]string{"172.16.11.111:14818"}, config)
	if err != nil {
		log.Fatalf("failed to create producer: %v", err)
	}
	defer producer.Close()

	// Build the message
	message := &sarama.ProducerMessage{
		Topic: "test-topic",
		Key:   sarama.StringEncoder("message-key"),
		Value: sarama.StringEncoder(fmt.Sprintf("Hello Kafka! %v", time.Now())),
	}

	// Send the message
	partition, offset, err := producer.SendMessage(message)
	if err != nil {
		log.Fatalf("failed to send message: %v", err)
	}
	fmt.Printf("Message sent! partition: %d, offset: %d\n", partition, offset)
}
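
sarama also ships an asynchronous producer that accepts messages on Input() and reports outcomes on Successes()/Errors(). A sketch under the same broker and topic assumptions, sending three timestamped messages without blocking on each ack:

package main

import (
	"log"
	"time"

	"github.com/IBM/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true // receive acks on Successes()
	config.Producer.Return.Errors = true    // receive failures on Errors()
	config.Version = sarama.MaxVersion

	producer, err := sarama.NewAsyncProducer([]string{"172.16.11.111:14818"}, config)
	if err != nil {
		log.Fatalf("failed to create async producer: %v", err)
	}
	defer producer.Close()

	// Drain results in the background; each message yields exactly one
	// event on either Successes() or Errors().
	done := make(chan struct{})
	go func() {
		defer close(done)
		for i := 0; i < 3; i++ {
			select {
			case msg := <-producer.Successes():
				log.Printf("sent: partition=%d offset=%d", msg.Partition, msg.Offset)
			case err := <-producer.Errors():
				log.Printf("send failed: %v", err)
			}
		}
	}()

	// Fire three messages without blocking on each ack.
	for i := 0; i < 3; i++ {
		producer.Input() <- &sarama.ProducerMessage{
			Topic: "test-topic",
			Value: sarama.StringEncoder(time.Now().Format(time.RFC3339Nano)),
		}
	}
	<-done
}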

Receiving Kafka messages in Go with github.com/IBM/sarama:

package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"

	"github.com/IBM/sarama"
)

type Consumer struct{}

func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
	// Session setup; do any per-session preparation here
	return nil
}

func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
	// Cleanup when the session ends
	return nil
}

func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	// claim.Messages() keeps delivering new messages
	for msg := range claim.Messages() {
		fmt.Printf("Topic:%s Partition:%d Offset:%d Value:%s\n",
			msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
		// Mark the message as processed; sarama commits the offset automatically
		session.MarkMessage(msg, "")
	}
	return nil
}

func main() {
	// Kafka cluster addresses
	brokers := []string{"172.16.11.111:14818"}
	groupID := "my-group" // consumer group ID; keep it stable to resume from the last offset
	topics := []string{"test-topic"}

	// Configuration
	config := sarama.NewConfig()
	config.Version = sarama.MaxVersion // Kafka protocol version
	config.Consumer.Return.Errors = true
	// When the group already has committed offsets, it resumes from them automatically.
	config.Consumer.Offsets.Initial = sarama.OffsetNewest
	// OffsetNewest: start from the latest offset when there is no committed offset;
	// OffsetOldest: start from the oldest offset when there is no committed offset.

	// Create the consumer group
	consumerGroup, err := sarama.NewConsumerGroup(brokers, groupID, config)
	if err != nil {
		log.Fatalf("Error creating consumer group: %v", err)
	}
	defer consumerGroup.Close()

	consumer := &Consumer{}
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	go func() {
		for err := range consumerGroup.Errors() {
			log.Printf("Error: %v", err)
		}
	}()

	log.Println("Kafka consumer started...")

	// Graceful shutdown
	go func() {
		sigchan := make(chan os.Signal, 1)
		signal.Notify(sigchan, os.Interrupt)
		<-sigchan
		cancel()
	}()

	// Consume in a loop
	for {
		if err := consumerGroup.Consume(ctx, topics, consumer); err != nil {
			log.Printf("Error from consumer: %v", err)
		}
		// Exit once the context has been cancelled
		if ctx.Err() != nil {
			return
		}
	}
}
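
By default sarama commits marked offsets in the background (auto-commit, roughly once per second). If you want commits tied to your own processing, you can disable auto-commit and call session.Commit() explicitly. The sketch below does that, using a hypothetical group ID my-group-manual so it does not interfere with the group above:

package main

import (
	"context"
	"log"

	"github.com/IBM/sarama"
)

// manualConsumer marks and commits offsets explicitly instead of relying
// on sarama's background auto-commit.
type manualConsumer struct{}

func (manualConsumer) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (manualConsumer) Cleanup(sarama.ConsumerGroupSession) error { return nil }

func (manualConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		log.Printf("partition=%d offset=%d value=%s", msg.Partition, msg.Offset, string(msg.Value))
		session.MarkMessage(msg, "") // stage the offset...
		session.Commit()             // ...and flush it to Kafka right away
	}
	return nil
}

func main() {
	config := sarama.NewConfig()
	config.Version = sarama.MaxVersion
	config.Consumer.Offsets.AutoCommit.Enable = false // we commit manually in ConsumeClaim
	config.Consumer.Offsets.Initial = sarama.OffsetOldest

	group, err := sarama.NewConsumerGroup([]string{"172.16.11.111:14818"}, "my-group-manual", config)
	if err != nil {
		log.Fatalf("creating consumer group: %v", err)
	}
	defer group.Close()

	ctx := context.Background()
	for {
		if err := group.Consume(ctx, []string{"test-topic"}, manualConsumer{}); err != nil {
			log.Printf("consume error: %v", err)
		}
	}
}

Committing after every message keeps the example short but is chatty in practice; committing per batch or on a timer is the usual compromise.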