当前位置: 首页 > news >正文

基于OpenEuler部署kafka消息队列

目录

一、环境准备

二、安装docker

1、配置阿里源

2、安装docker

3、编辑配置文件

4、重新加载 systemd 的配置

5、启动docker

三、Kafka的安装和使用

1、下载kafka和zookeeper镜像

2、启动zookeeper容器

 3、启动kafka容器

四、操作步骤详解

1. 连接到 ZooKeeper

2. 查看 Kafka 的核心节点

3、进入kafka容器里

4、回到zookeeper容器里查看所有topic(主题)

5、 创建topic名称为second,1个副本,分区2

6、回到zookeeper容器里查看

7、kafkar容器里查看first此topic信息

8、回到zookeeper容器里查看

9、删除topic里名称为second主题

10、 回到zookeeper容器查看second主题是否删除         成功删除topic里名称为second主题

11、修改分区数


一、环境准备

关闭防火墙和SElinux

systemctl disable --now firewalld
setenforce 0
sed -i 's/SELINUX=enforcing/SELINUX=disabled/' /etc/selinux/config 

kafka服务器:IP 192.168.158.37

二、安装docker

1、配置阿里源

cat <<EOF >> /etc/yum.repos.d/docker-ce.repo 
[docker-ce-stable]
name=Docker CE Stable - $basearch
baseurl=https://mirrors.aliyun.com//docker-ce/linux/centos/9/x86_64/stable/
enabled=1
gpgcheck=1
gpgkey=https://mirrors.aliyun.com/docker-ce/linux/centos/gpg
EOF

2、安装docker

yum install -y docker-ce

3、编辑配置文件

vim /etc/docker/daemon.json
{"registry-mirrors": ["https://0vmzj3q6.mirror.aliyuncs.com","https://docker.m.daocloud.io","https://mirror.baidubce.com","https://dockerproxy.com","https://mirror.iscas.ac.cn","https://huecker.io","https://dockerhub.timeweb.cloud","https://noohub.ru","https://vlgh0kqj.mirror.aliyuncs.com"]
}

4、重新加载 systemd 的配置

 systemctl daemon-reload

5、启动docker

#设置docker开机自启
systemctl enable --now docker

三、Kafka的安装和使用

默认监听端口号

zookeeper  默认监听端口号 2181

Kafkam       默认监听端口号 9092

1、下载kafka和zookeeper镜像
# docker直接拉取kafka和zookeeper的镜像
docker pull wurstmeister/kafka
docker pull wurstmeister/zookeeper 
2、启动zookeeper容器

首先需要启动zookeeper,如果不先启动,启动kafka没有地方注册消息

[root@localhost ~]# docker run -it --name zookeeper -p 12181:2181 -d wurstmeister/zookeeper:latest
f13306aaeacdfa721bbaddf5627cf18412fb5b62a90383744a974ec2a5b6abe2
[root@localhost ~]# docker ps
CONTAINER ID   IMAGE                           COMMAND                   CREATED          STATUS                   PORTS                                                                       NAMES
f13306aaeacd   wurstmeister/zookeeper:latest   "/bin/sh -c '/usr/sb…"   11 seconds ago   Up 11 seconds            22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:12181->2181/tcp, [::]:12181->2181/tcp   zookeeper
 3、启动kafka容器

注意需要启动三台,注意端口的映射,都是映射到9092

第一台

docker run -it --name kafka01 -p 19092:9092 -d -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=192.168.158.37:12181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.158.37:19092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka:latest

第二台

docker run -it --name kafka02 -p 19093:9092 -d -e KAFKA_BROKER_ID=1 -e KAFKA_ZOOKEEPER_CONNECT=192.168.158.37:12181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.158.37:19093 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka:latest

第三台

docker run -it --name kafka03 -p 19094:9092 -d -e KAFKA_BROKER_ID=2 -e KAFKA_ZOOKEEPER_CONNECT=192.168.158.37:12181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.158.37:19094 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka:latest

上面端口的映射注意都是映射到Kafka的9092端口上!否则将不能够连接!

[root@localhost ~]# docker ps
CONTAINER ID   IMAGE                           COMMAND                   CREATED              STATUS                 PORTS                                                                       NAMES
50c1c4800c0a   wurstmeister/kafka:latest       "start-kafka.sh"          41 seconds ago       Up 41 seconds          0.0.0.0:19094->9092/tcp, [::]:19094->9092/tcp                               kafka03
f73e354ae495   wurstmeister/kafka:latest       "start-kafka.sh"          51 seconds ago       Up 51 seconds          0.0.0.0:19093->9092/tcp, [::]:19093->9092/tcp                               kafka02
192055d53819   wurstmeister/kafka:latest       "start-kafka.sh"          About a minute ago   Up About a minute      0.0.0.0:19092->9092/tcp, [::]:19092->9092/tcp                               kafka01
f13306aaeacd   wurstmeister/zookeeper:latest   "/bin/sh -c '/usr/sb…"   4 hours ago          Up 4 hours             22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:12181->2181/tcp, [::]:12181->2181/tcp   zookeeper

四、操作步骤详解

1. 连接到 ZooKeeper
#进入zookeeper容器
[root@localhost ~]# docker exec -it zookeeper bash
root@f13306aaeacd:/opt/zookeeper-3.4.13# cd bin/#./zkCli.sh --server <ZooKeeper服务器IP:端口>
root@f13306aaeacd:/opt/zookeeper-3.4.13/bin# ./zkCli.sh -server 127.0.0.1:2181
。。。。省略。。
或者
root@f13306aaeacd:/opt/zookeeper-3.4.13/bin# ./zkCli.sh -server 192.168.158.37:12181
2. 查看 Kafka 的核心节点
# 查看所有活跃的Broker
[zk: 192.168.158.37:12181(CONNECTED) 0] ls /brokers/ids
[2, 1, 0]
3、进入kafka容器里
[root@localhost ~]# docker exec -it kafka0
kafka01  kafka02  kafka03  
[root@localhost ~]# docker exec -it kafka01 bash
root@192055d53819:/# ls
bin  boot  dev  etc  home  kafka  lib  lib64  media  mnt  opt  proc  root  run  sbin  srv  sys  tmp  usr  var
root@192055d53819:/# cd opt/kafka_2.13-2.8.1/bin/
root@192055d53819:/opt/kafka_2.13-2.8.1/bin# ls
connect-distributed.sh        kafka-dump-log.sh                    kafka-storage.sh
connect-mirror-maker.sh       kafka-features.sh                    kafka-streams-application-reset.sh
connect-standalone.sh         kafka-leader-election.sh             kafka-topics.sh
kafka-acls.sh                 kafka-log-dirs.sh                    kafka-verifiable-consumer.sh
kafka-broker-api-versions.sh  kafka-metadata-shell.sh              kafka-verifiable-producer.sh
kafka-cluster.sh              kafka-mirror-maker.sh                trogdor.sh
kafka-configs.sh              kafka-preferred-replica-election.sh  windows
kafka-console-consumer.sh     kafka-producer-perf-test.sh          zookeeper-security-migration.sh
kafka-console-producer.sh     kafka-reassign-partitions.sh         zookeeper-server-start.sh
kafka-consumer-groups.sh      kafka-replica-verification.sh        zookeeper-server-stop.sh
kafka-consumer-perf-test.sh   kafka-run-class.sh                   zookeeper-shell.sh
kafka-delegation-tokens.sh    kafka-server-start.sh
kafka-delete-records.sh       kafka-server-stop.sh

创建topic名称为first(主题),1个副本,分区3

root@192055d53819:/opt/kafka_2.13-2.8.1/bin# ./kafka-topics.sh --zookeeper 192.168.158.37:12181 --create --topic first --replication-factor 1 --partitions 3
Created topic first.
4、回到zookeeper容器里查看所有topic(主题)
[zk: 192.168.158.37:12181(CONNECTED) 1] ls /brokers/topics
[first]
5、 创建topic名称为second,1个副本,分区2
root@192055d53819:/opt/kafka_2.13-2.8.1/bin# ./kafka-topics.sh --zookeeper 192.168.158.37:12181 --create --topic second --replication-factor 1 --partitions 2
Created topic second.
6、回到zookeeper容器里查看
[zk: 192.168.158.37:12181(CONNECTED) 2] ls /brokers/topics
[second, first]
7、kafkar容器里查看first此topic信息
root@192055d53819:/opt/kafka_2.13-2.8.1/bin# ./kafka-topics.sh --zookeeper 192.168.158.37:12181 --describe --topic first
Topic: first    TopicId: YOkMY_PsS8quclVZfbjtJA PartitionCount: 3       ReplicationFactor: 1    Configs: Topic: first    Partition: 0    Leader: 0       Replicas: 0     Isr: 0Topic: first    Partition: 1    Leader: 1       Replicas: 1     Isr: 1Topic: first    Partition: 2    Leader: 2       Replicas: 2     Isr: 2

kafka容器里调用生产者生产消息

root@192055d53819:/opt/kafka_2.13-2.8.1/bin# ./kafka-console-producer.sh --broker-list 192.168.158.37:19092,192.168.158.37:19093,192.168.158.37:19094 --topic first
>ulimit
>redis
>mysql
>kafka
>kubernetes

调用消费者消费消息,from-beginning表示读取全部的消息,注意它的顺序是乱的

root@192055d53819:/opt/kafka_2.13-2.8.1/bin# ./kafka-console-consumer.sh --bootstrap-server 192.168.158.37:19092,192.168.158.37:19093,192.168.158.37:19094 --topic first --from-beginning
redis
kubernetes
ulimit
mysql
kafka

8、回到zookeeper容器里查看

      就会看到consumer_offsets,消费者消费消息的变量就存到topic主题里边了

[zk: 192.168.158.37:12181(CONNECTED) 3] ls /brokers/topics
[__consumer_offsets, second, first]
9、删除topic里名称为second主题

      注意:这里的删除是没有真正的删除

root@192055d53819:/opt/kafka_2.13-2.8.1/bin# ./kafka-topics.sh --zookeeper 192.168.158.37:12181 --delete --topic second
Topic second is marked for deletion.	 #第二个主题已标记为删除。
Note: This will have no impact if delete.topic.enable is not set to true.	#注意:如果 delete.topic.enable 未设置为 true,此操作将不会产生任何影响可以看到删除的时候只是被标记为删除marked for deletion并没有真正的删除,如果需要真正的删除,需要再config/server.properties中设置delete.topic.enable=true
10、 回到zookeeper容器查看second主题是否删除
         成功删除topic里名称为second主题
[zk: 192.168.158.37:12181(CONNECTED) 6] ls /brokers/topics
[__consumer_offsets, first]
11、修改分区数

        将分区扩展到3
        先查看现在的分区数,PartitionCount(分区有三个)3

root@192055d53819:/opt/kafka_2.13-2.8.1/bin# ./kafka-topics.sh --zookeeper 192.168.158.37:12181 --describe --topic first
Topic: first    TopicId: YOkMY_PsS8quclVZfbjtJA PartitionCount: 3       ReplicationFactor: 1    Configs: Topic: first    Partition: 0    Leader: 0       Replicas: 0     Isr: 0Topic: first    Partition: 1    Leader: 1       Replicas: 1     Isr: 1Topic: first    Partition: 2    Leader: 2       Replicas: 2     Isr: 2

将分区扩展到4

root@192055d53819:/opt/kafka_2.13-2.8.1/bin# ./kafka-topics.sh --zookeeper 192.168.158.37:12181 --alter --topic first --partitions 4
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!警告:如果为有键的主题增加分区,消息的分区逻辑或顺序将会受到影响
分区添加成功

查看分区已扩展为4

root@192055d53819:/opt/kafka_2.13-2.8.1/bin# ./kafka-topics.sh --zookeeper 192.168.158.37:12181 --describe --topic first
Topic: first    TopicId: YOkMY_PsS8quclVZfbjtJA PartitionCount: 4       ReplicationFactor: 1    Configs: Topic: first    Partition: 0    Leader: 0       Replicas: 0     Isr: 0Topic: first    Partition: 1    Leader: 1       Replicas: 1     Isr: 1Topic: first    Partition: 2    Leader: 2       Replicas: 2     Isr: 2Topic: first    Partition: 3    Leader: 0       Replicas: 0     Isr: 0

http://www.dtcms.com/a/394920.html

相关文章:

  • Flink TCP Channel复用:NettyServer、NettyProtocol详解
  • Sass和Less的区别【前端】
  • Kotlin互斥锁Mutex协程withLock实现同步
  • Seedream 4.0 测评|AI 人生重开:从极速创作到叙事实践
  • vscode clangd 保姆教程
  • MySQL时间戳转换
  • 【Spark+Hive+hadoop】基于spark+hadoop基于大数据的人口普查收入数据分析与可视化系统
  • 分布式专题——17 ZooKeeper经典应用场景实战(下)
  • TDengine 2.6 taosdump数据导出备份 导入恢复
  • 探索 Yjs 协同应用场景 - 分布式撤销管理
  • 【软考中级 - 软件设计师 - 基础知识】数据结构之栈与队列​
  • LeetCode 385 迷你语法分析器 Swift 题解:从字符串到嵌套数据结构的解析过程
  • windows系统使用sdkman管理java的jdk版本,WSL和Git Bash哪个更能方便管理jdk版本
  • 生产环境K8S的etcd备份脚本
  • Mac电脑多平台Git账号配置
  • Etcd详解:Kubernetes的大脑与记忆库
  • 深刻理解PyTorch中RNN(循环神经网络)的output和hn
  • 大模型如何赋能写作:从创作到 MCP 自动发布的全链路解析
  • C++设计模式之创建型模式:工厂方法模式(Factory Method)
  • 传输层协议——UDP/TCP
  • 三板汇茶咖空间签约“可信资产IPO与数链金融RWA”链改2.0项目联合实验室
  • 【MySQL】MySQL 表文件误删导致启动失败及无法外部连接解决方案
  • LVS简介
  • 如何将联系人从iPhone转移到iPhone的7种方法
  • 『 MySQL数据库 』MySQL复习(一)
  • 3005. 最大频率元素计数
  • ACP(七)优化RAG应用提升问答准确度
  • 鸿蒙:使用bindPopup实现气泡弹窗
  • Langchan4j 框架 AI 无限循环调用文件创建工具解决方案记录
  • Python GIS 开发里最核心的4个基础组件(理论+实操篇)