Docker 安装命令，其中 172.16.11.111 是宿主机 IP，14818 是宿主机端口，对应容器端口 9092：
# Start a single Kafka container named "kafka" in the background.
# KAFKA_PROCESS_ROLES=broker,controller makes this one node act as both
# broker and controller; host port 14818 is published to container port 9092
# and is the address advertised to clients.
docker run -d \
  --name kafka \
  -p 14818:9092 \
  -p 9093:9093 \
  -v /tmp/kraft-combined-logs:/tmp/kraft-combined-logs \
  -e TZ=Asia/Shanghai \
  -e KAFKA_NODE_ID=1 \
  -e KAFKA_PROCESS_ROLES=broker,controller \
  -e KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.16.11.111:14818 \
  -e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
  -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
  -e KAFKA_CONTROLLER_QUORUM_VOTERS=1@localhost:9093 \
  -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
  -e KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1 \
  -e KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1 \
  -e KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0 \
  -e KAFKA_NUM_PARTITIONS=3 \
  -e KAFKA_LOG_DIRS=/tmp/kraft-combined-logs \
  -e CLUSTER_ID=5L6g3nShT-eMCtK--X86sw \
  apache/kafka-native:4.1.0
k3s 的 YAML，其中 172.16.11.111 是宿主机 IP，14818 是宿主机端口，对应容器端口 9092：
# Single-replica Kafka Deployment (one node acting as both broker and
# controller) in the "moonfdd" namespace.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka
  namespace: moonfdd
  labels:
    app: kafka
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka
  template:
    metadata:
      labels:
        app: kafka
    spec:
      initContainers:
        # Hands ownership of the hostPath-backed data directory to uid/gid
        # 1000 before Kafka starts.
        # NOTE(review): presumably 1000 is the user the Kafka image runs as — confirm.
        - name: kafka-fix-data-volume-permissions
          image: alpine
          imagePullPolicy: IfNotPresent
          command:
            - sh
            - -c
            - "chown -R 1000:1000 /tmp/kraft-combined-logs"
          volumeMounts:
            - name: volv
              mountPath: /tmp/kraft-combined-logs
      containers:
        - name: kafka
          image: "apache/kafka-native:4.1.0"
          imagePullPolicy: IfNotPresent
          env:
            - name: TZ
              value: "Asia/Shanghai"
            - name: KAFKA_NODE_ID
              value: "1"
            - name: KAFKA_PROCESS_ROLES
              value: "broker,controller"
            - name: KAFKA_LISTENERS
              value: "PLAINTEXT://:9092,CONTROLLER://:9093"
            # Address handed back to clients; must be reachable from outside
            # the cluster (host IP and host port).
            - name: KAFKA_ADVERTISED_LISTENERS
              value: "PLAINTEXT://172.16.11.111:14818"
            - name: KAFKA_CONTROLLER_LISTENER_NAMES
              value: "CONTROLLER"
            - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
              value: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT"
            - name: KAFKA_CONTROLLER_QUORUM_VOTERS
              value: "1@localhost:9093"
            - name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
              value: "1"
            - name: KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR
              value: "1"
            - name: KAFKA_TRANSACTION_STATE_LOG_MIN_ISR
              value: "1"
            - name: KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS
              value: "0"
            - name: KAFKA_NUM_PARTITIONS
              value: "3"
            - name: KAFKA_LOG_DIRS
              value: "/tmp/kraft-combined-logs"
            - name: CLUSTER_ID
              value: "5L6g3nShT-eMCtK--X86sw"
          volumeMounts:
            - name: volv
              mountPath: /tmp/kraft-combined-logs
      volumes:
        # Kafka data lives on the node's filesystem; the directory is created
        # on first use.
        - name: volv
          hostPath:
            path: /root/k8s/moonfdd/kafka/tmp/kraft-combined-logs
            type: DirectoryOrCreate
---
# NodePort Service exposing the Kafka pod's client (9092) and controller
# (9093) ports.
apiVersion: v1
kind: Service
metadata:
  name: kafka
  namespace: moonfdd
  labels:
    app: kafka
spec:
  type: NodePort
  selector:
    app: kafka
  ports:
    - name: "9092-9092"
      port: 9092
      protocol: TCP
      targetPort: 9092
      # Pin the node port to the port advertised by the broker
      # (KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.16.11.111:14818).
      # Without this a random port is allocated and clients cannot reach the
      # advertised address. 14818 is outside the default 30000-32767 range,
      # so the API server's service-node-port-range must include it.
      nodePort: 14818
    - name: "9093-9093"
      port: 9093
      protocol: TCP
      targetPort: 9093
Go 发送 Kafka 消息：github.com/segmentio/kafka-go
// Command producer writes a single message to the "test-topic" Kafka topic
// using github.com/segmentio/kafka-go.
package main

import (
	"context"
	"log"

	"github.com/segmentio/kafka-go"
)

// main creates a writer for the broker at 172.16.11.111:14818, sends one
// keyed message, closes the writer and reports the outcome.
func main() {
	w := kafka.NewWriter(kafka.WriterConfig{
		Brokers:  []string{"172.16.11.111:14818"},
		Topic:    "test-topic",
		Balancer: &kafka.LeastBytes{},
	})

	err := w.WriteMessages(context.Background(),
		kafka.Message{
			Key:   []byte("Key-A"),
			Value: []byte("Hello Kafka from Go!"),
		},
	)

	// Close before acting on the write result: log.Fatalf exits the process
	// immediately, so a Close placed after it would never run on failure.
	if cerr := w.Close(); cerr != nil {
		log.Printf("could not close writer: %v", cerr)
	}

	if err != nil {
		log.Fatalf("could not write message: %v", err)
	}
	log.Println("Message sent successfully!")
}
Go 接收 Kafka 消息：github.com/segmentio/kafka-go
// Command consumer reads messages from the "test-topic" Kafka topic as a
// member of the "my-consumer-group" group using github.com/segmentio/kafka-go.
package main

import (
	"context"
	"log"
	"os"
	"os/signal"
	"syscall"

	"github.com/segmentio/kafka-go"
)

// main builds the reader, consumes until interrupted (Ctrl-C / SIGTERM) or
// until a read fails, and always closes the reader before exiting.
func main() {
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"172.16.11.111:14818"},
		Topic:   "test-topic",
		GroupID: "my-consumer-group",
		// NOTE(review): with a 10 kB minimum fetch size the broker may hold
		// small test messages back until the reader's MaxWait elapses —
		// confirm this latency is acceptable.
		MinBytes: 10e3,
		MaxBytes: 10e6,
	})

	// Cancel the context on interrupt/terminate so ReadMessage unblocks and
	// the reader can be closed cleanly.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)

	err := consume(ctx, r)
	stop()

	// Close explicitly rather than via defer: log.Fatalf below would skip
	// deferred calls, leaving the consumer group membership open.
	if cerr := r.Close(); cerr != nil {
		log.Printf("could not close reader: %v", cerr)
	}
	if err != nil {
		log.Fatalf("could not read message: %v", err)
	}
}

// consume logs every message read from r until ctx is cancelled or a read
// fails. It returns nil on cancellation and the read error otherwise.
func consume(ctx context.Context, r *kafka.Reader) error {
	for {
		m, err := r.ReadMessage(ctx)
		if err != nil {
			if ctx.Err() != nil {
				// Shutdown was requested; not a failure.
				return nil
			}
			return err
		}
		log.Printf("offset:%d | key:%s | value:%s\n", m.Offset, string(m.Key), string(m.Value))
	}
}
Go 发送 Kafka 消息：github.com/IBM/sarama
// Command producer sends a single message to the "test-topic" Kafka topic
// using a synchronous github.com/IBM/sarama producer.
package main

import (
	"fmt"
	"log"
	"time"

	"github.com/IBM/sarama"
)

// main delegates to run so that deferred cleanup executes before the process
// exits; calling log.Fatal directly after a defer would skip it.
func main() {
	if err := run(); err != nil {
		log.Fatal(err)
	}
}

// run creates a sync producer for the broker at 172.16.11.111:14818, sends
// one message and prints the partition and offset it was written to.
// It returns an error if the producer cannot be created, the send fails, or
// closing the producer fails.
func run() (err error) {
	config := sarama.NewConfig()
	// A SyncProducer requires Return.Successes to be enabled.
	config.Producer.Return.Successes = true
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Retry.Max = 3
	config.Net.SASL.Enable = false
	config.Net.TLS.Enable = false
	config.Version = sarama.MaxVersion

	producer, err := sarama.NewSyncProducer([]string{"172.16.11.111:14818"}, config)
	if err != nil {
		return fmt.Errorf("创建生产者失败: %w", err)
	}
	defer func() {
		// Surface the close error only when nothing else has failed.
		if cerr := producer.Close(); cerr != nil && err == nil {
			err = fmt.Errorf("关闭生产者失败: %w", cerr)
		}
	}()

	message := &sarama.ProducerMessage{
		Topic: "test-topic",
		Key:   sarama.StringEncoder("message-key"),
		Value: sarama.StringEncoder(fmt.Sprintf("Hello Kafka! %v", time.Now())),
	}

	partition, offset, err := producer.SendMessage(message)
	if err != nil {
		return fmt.Errorf("发送消息失败: %w", err)
	}
	fmt.Printf("消息发送成功! 分区: %d, 偏移量: %d\n", partition, offset)
	return nil
}
Go 接收 Kafka 消息：github.com/IBM/sarama
// Command consumer joins the "my-group" consumer group and prints every
// message received on "test-topic" using github.com/IBM/sarama.
package main

import (
	"context"
	"errors"
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"

	"github.com/IBM/sarama"
)

// Consumer implements sarama.ConsumerGroupHandler and prints each message it
// is handed.
type Consumer struct{}

// Setup is called at the start of a session; nothing needs initialising.
func (c *Consumer) Setup(sarama.ConsumerGroupSession) error {
	return nil
}

// Cleanup is called at the end of a session; nothing needs releasing.
func (c *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
	return nil
}

// ConsumeClaim prints and marks every message of the claim. It returns nil
// when the claim's message channel is closed or the session context is done.
func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for {
		select {
		case msg, ok := <-claim.Messages():
			if !ok {
				return nil
			}
			fmt.Printf("Topic:%s Partition:%d Offset:%d Value:%s\n",
				msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
			session.MarkMessage(msg, "")
		case <-session.Context().Done():
			// Leave promptly when the session ends (e.g. on rebalance)
			// instead of waiting for the message channel to close.
			return nil
		}
	}
}

// main creates the consumer group client and consumes until the process
// receives an interrupt or terminate signal, or the group is closed.
func main() {
	brokers := []string{"172.16.11.111:14818"}
	groupID := "my-group"
	topics := []string{"test-topic"}

	config := sarama.NewConfig()
	config.Version = sarama.MaxVersion
	config.Consumer.Return.Errors = true
	config.Consumer.Offsets.Initial = sarama.OffsetNewest

	consumerGroup, err := sarama.NewConsumerGroup(brokers, groupID, config)
	if err != nil {
		log.Fatalf("Error creating consumer group: %v", err)
	}
	defer func() {
		if cerr := consumerGroup.Close(); cerr != nil {
			log.Printf("Error closing consumer group: %v", cerr)
		}
	}()

	consumer := &Consumer{}

	// Cancel on Ctrl-C and on SIGTERM (the signal container runtimes send on
	// shutdown).
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()

	// Consumer.Return.Errors is enabled above, so the Errors channel must be
	// drained.
	go func() {
		for err := range consumerGroup.Errors() {
			log.Printf("Error: %v", err)
		}
	}()

	log.Println("Kafka consumer started...")

	// Consume returns whenever a rebalance happens, so it has to be called
	// in a loop to rejoin the next session.
	for {
		if err := consumerGroup.Consume(ctx, topics, consumer); err != nil {
			if errors.Is(err, sarama.ErrClosedConsumerGroup) {
				// The group is closed for good; retrying would spin.
				return
			}
			log.Printf("Error from consumer: %v", err)
		}
		if ctx.Err() != nil {
			return
		}
	}
}