当前位置: 首页 > news >正文

Kafka-2 Docker 部署单节点环境(SpringBoot验证)

Kafka-2 Docker 部署单节点环境(SpringBoot验证)

Kafka 的发行版

我们知道 Linux 系统有很多的发行版,Centos、Ubuntu 等等,包括 Jdk 也存在这种情况,Kafka 也有类似“发行版”的概念,不过这种说法在 Kafka 生态圈非常陌生。

Apache Kafka

最“正宗”的 Kafka,自 Kafka 开源伊始,它便在 Apache 基金会孵化并最终毕业成为顶级项目,它也被称为社区版 Kafka。其他的发行版要么是原封不动的继承了 Apache Kafka,要么在此之上拓展了新功能。

Confluent Kafka

Confluent 公司主要从事商业化 Kafka 工具开发,并在此基础上发布了 Confluent Kafka。Confluent Kafka 提供了一些 Apache Kafka 没有的高级特性,比如跨数据中心备份、Schema 注册中心以及集群监控工具等。

Cloudera/Hortonworks Kafka

Cloudera 提供的 CDH 和 Hortonworks 提供的 HDP 是非常著名的大数据平台(其中都集成了 Apache Kafka),里面集成了目前主流的大数据框架,能够帮助用户实现从分布式存储、集群调度、流处理到机器学习、实时数据库等全方位的数据处理。

Docker 部署 Kafka

Kafka 在早期版本中为了分布式的一些特性,需要配合 Zookeeper 一起部署使用,但在后期逐渐开始摆脱对于 Zookeeper 的依赖,随着 KIP-500 提案的推进,Kafka将逐步去除对ZooKeeper的依赖,转而使用社区自研的基于Raft算法的共识机制来实现这些功能。

Kafka 在 2.8 版本就开始尽量移除 Zookeeper,3.x 版本为了兼容历史版本支持两种方式,而 4.x 版本正式移除 Zookeeper,必须使用 KRaft 模式。

为了简化部署以及使用新版本,直接部署不需要 Zookeeper 的版本。

docker-compose.yml 文件
name: "kafka"
services:kafka:image: 'bitnami/kafka:3.6'container_name: kafkarestart: alwaysulimits:nofile:soft: 65536hard: 65536environment:- TZ=Asia/Shanghai- KAFKA_CFG_NODE_ID=0- KAFKA_CFG_PROCESS_ROLES=controller,broker- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,EXTERNAL://[your server ip]:9094- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLERports:- '9092:9092'- '9094:9094'

17 行中的 [your server ip] 需要替换为对外进行连接的 ip,例如该运行 Kafka 的机器是 192.168.22.163,那么这个 IP 就是 192.168.22.163

简单解释一下这些环境变量的含义:

  • TZ=Asia/Shanghai
    作用:设置容器内的时区为 Asia/Shanghai。这是为了确保 Kafka 和容器内部的时间同步,特别是在有时间戳和日志记录时,确保时区一致。
  • KAFKA_CFG_NODE_ID=0
    作用:设置 Kafka 节点的唯一标识符。通常在 Kafka 集群中,每个 Kafka broker 都会有一个唯一的 node.id,在这里设定为 0,这通常是为单节点或测试环境配置。对于生产环境,你可能会将其设置为不同的值,以避免冲突。
  • KAFKA_CFG_PROCESS_ROLES=controller,broker
    作用:定义 Kafka 进程的角色。Kafka 支持多个角色,例如:
    broker:表示 Kafka 作为一个消息代理的角色。
    controller:表示 Kafka 控制器的角色,负责集群的元数据管理。
    在这里,Kafka 配置同时作为 broker 和 controller 角色。通常,只有一个 Kafka broker 需要成为控制器角色(领导者),但在一些情况下,多个 broker 可以共同担当控制器角色。
  • KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
    作用:设置 Kafka 控制器的选举成员(quorum voters)。Kafka 控制器是集群中负责协调分区副本和领导者选举的组件。0@kafka:9093 表示节点 0 的控制器进程在 kafka:9093 处运行,这是 Kafka 节点的控制端口。
    控制器选举机制保证了只有一个控制器在集群中有效,这个配置用于确定控制器的可用性。
  • KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
    作用:设置 Kafka 启动时监听的端口和协议:
    PLAINTEXT://:9092:指定 Kafka 以明文协议(PLAINTEXT)监听 9092 端口,这通常是 Kafka 的默认监听端口。
    CONTROLLER://:9093:设置 Kafka 控制器的监听端口为 9093。
    EXTERNAL://:9094:定义一个外部访问端口,通常用于外部客户端连接 Kafka 集群。在这个配置中,9094 用于外部客户端与 Kafka 集群进行通信。
  • KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,EXTERNAL://[your server ip]:9094
    作用:设置 Kafka 向客户端广告(暴露)哪些端口,用于外部连接。它会告诉客户端如何访问 Kafka:
    PLAINTEXT://kafka:9092:客户端通过 kafka 容器名称访问 Kafka 集群的 9092 端口,通常用于 Docker 内部容器间通信。
    EXTERNAL://[your server ip]:9094:对于外部客户端,暴露 [your server ip]:9094 这个公网 IP 和端口,允许外部系统连接 Kafka。
  • KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
    作用:设置 Kafka 各个监听器的安全协议。这允许你为每个监听器指定不同的安全协议:
    CONTROLLER:PLAINTEXT:控制器监听使用 PLAINTEXT 协议(不加密)。
    PLAINTEXT:PLAINTEXT:表示默认的 PLAINTEXT 监听器使用 PLAINTEXT 协议。
    EXTERNAL:PLAINTEXT:外部访问使用 PLAINTEXT 协议,不加密。
  • KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
    作用:指定 Kafka 控制器监听器的名称。在 Kafka 配置中,controller 角色使用一个专门的监听器端口(在 KAFKA_CFG_LISTENERS 中配置为 CONTROLLER://:9093)。这个配置指定了 Kafka 控制器使用 CONTROLLER 监听器。
  • ports
    作用:定义容器对外暴露的端口映射:
    ‘9092:9092’:将容器的 9092 端口映射到宿主机的 9092 端口。此端口用于客户端连接 Kafka。
    ‘9094:9094’:将容器的 9094 端口映射到宿主机的 9094 端口,通常用于外部客户端的访问。

执行 docker compose up -d 即可

下载不下来镜像怎么办

由于一些网络的限制,可能没办法 pull 下来镜像,因为镜像实际上也就是一个文件,第一种方式我们可以显式指定 pull 国内网站的镜像

docker pull swr.cn-north-4.myhuaweicloud.com/ddn-k8s/docker.io/bitnami/kafka:3.6

下载下来的 tag 是很长的

[root@iZm5e8izu4t87flkkvb649Z ~]# docker images
REPOSITORY                                                         TAG       IMAGE ID       CREATED         SIZE
swr.cn-north-4.myhuaweicloud.com/ddn-k8s/docker.io/bitnami/kafka   3.6       4de139ca4e3a   16 months ago   642MB

我们重新打一个 tag 即可

docker tag  swr.cn-north-4.myhuaweicloud.com/ddn-k8s/docker.io/bitnami/kafka:3.6  docker.io/bitnami/kafka:3.6
[root@iZm5e8izu4t87flkkvb649Z ~]# docker images
REPOSITORY                                                         TAG       IMAGE ID       CREATED         SIZE
bitnami/kafka                                                      3.6       4de139ca4e3a   16 months ago   642MB
swr.cn-north-4.myhuaweicloud.com/ddn-k8s/docker.io/bitnami/kafka   3.6       4de139ca4e3a   16 months ago   642MB
验证 Kafka 运行情况

首先进入到 Kafka 容器中: docker exec -it 608ff8a412a1 bash

  1. 查看所有主题(Topic):
kafka-topics.sh --list --bootstrap-server localhost:9092
  1. 向 [test-topic-xsdl] 主题发送消息:
kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic-xsdl
$ kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic-xsdl
>this
[2025-10-15 13:46:54,462] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 4 : {test-topic-xsdl=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient)
>is
>test-topic-xsdl
>topic

在第 2 行输入第一次指令后由于没有这个 topic 所以会进行警告,输入完成后可以 ctrl+c 退出

  1. 消费者查看 [test-topic-xsdl] 主题的消息
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic-xsdl --from-beginning
$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic-xsdl --from-beginning
.......
this
is
test-topic-xsdl
topic

SpringBoot 程序中如何使用 Kafka

我使用的是:SpringBoot 2.6.13,JDK 11

引入 Pom 依赖
        <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>
修改配置文件

192.168.22.163 是我部署 Kafka 的机器 ip,自行替换

server:port: 8080spring:kafka:listener:type: singleproducer:bootstrap-servers: 192.168.22.163:9094key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:bootstrap-servers: 192.168.22.163:9094group-id: test-groupkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer
编写相关代码
@RestController
@RequestMapping("/kafka")
public class KafkaController {@Autowiredprivate KafkaProducerService kafkaProducerService;@GetMapping("/send")public String sendMessage(@RequestParam String message) {kafkaProducerService.sendMessage(message);return "Message sent: " + message;}}

生产者

@Service("kafkaProducerService")
public class KafkaProducerService {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void sendMessage(String message) {kafkaTemplate.send("test-topic", message);System.out.println("Sent message: " + message);}}

消费者

@Service("kafkaConsumerService")
public class KafkaConsumerService {@KafkaListener(topics = "test-topic", groupId = "test-group")public void listen(String message) {System.out.println("Received message: " + message);}}

启动后测试接口:GET http://localhost:8080/kafka/send?message=hello,springboot-kafka

观察控制台出现如下输出:

Sent message: hello,springboot-kafka
2025-10-15 14:01:06.827  INFO 28740 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-1] ProducerId set to 1000 with epoch 0
Received message: hello,springboot-kafka

其实已经可以看出 Kafka 的作用了,代码中生产者只是产生了一条消息到 Kafka,消费者就收到了消息并进行了消费,生产者无需直接调用消费者方法,减少了类之间的耦合。

http://www.dtcms.com/a/487363.html

相关文章:

  • 从0开始了解kafka《第二篇 kafka的安装、管理和配置》
  • 02 SQL数据检索入门 - SELECT语句详解
  • 从分词器构建到强化学习:nanochat开源项目下载与部署全流程教程,教你一步步训练ChatGPT语言模型
  • 长安镇仿做网站注册网站好的平台
  • 加强公司内部网站建设正邦设计公司
  • 巩义网站建设价格怎么注册个人工作室
  • 四川网站建设益友网站地图怎么用
  • 制作一个响应式网站开发工具景观设计公司名称
  • 网站设计网站开发优化欢迎你的加入
  • MySql 基本操作指令大全
  • 军用网站建设ui设计一个月挣多少钱
  • 海宁市住房和城乡建设网站网站源码程序修改
  • 做足球预测的网站小程序开发教程视频
  • 消息队列相关知识总结
  • Kafka集群Broker一点通
  • 怎样看网站建设制作方松北区建设局网站
  • 锂电电芯卷绕提质增效!光子精密边缘传感器+颜色传感器组合方案
  • 堆的 shift down 操作详解
  • QT(day1)
  • 天津做网站的公司怎么样google手机官网
  • 门户网站建设滞后微信小程序功能开发
  • miniconda 配置问题 ,未解之谜
  • 西安做营销型网站建设男性产品哪个网站可以做
  • 开源CICD工具深度横评,Jenkins vs Arbess哪个更适合你?
  • 厦门建设工程招标中心网站ps 做ui比较好的网站有哪些
  • 犀牛云网站做的怎么样深圳企业网站建设服务
  • 有关maven的一些知识点
  • 低代码建站平台汽车电子商务网站建设规划书
  • 数据的“点阵密码”:用散点图看出相关性
  • 微信游戏网站开发绵阳市网站建立