Kafka的下载安装
目录
一、前期准备
1、查看网卡:
2、配置静态IP
3、设置主机名
4、配置IP与主机名映射
5、关闭防火墙
6、配置免密登录
二、JDK的安装
三、Zookeeper的安装
四、Kafka的安装
1、Kafka的下载安装
2、修改配置文件
4、分发文件
5、修改其他节点broker.id及advertised.listeners
6、修改zookeeper中的 myid
7、启动ZK集群--启动停止脚本
8、启动Kafka集群--启动停止脚本
四、Kafka命令行操作
1、主题命令行操作
1.1.查看操作主题命令参数
1.2.查看当前服务器中的所有topic
1.3.创建topic
1.4.查看first主题的详情
1.5.修改分区数(注意:分区数只能增加,不能减少)
1.6.再次查看first主题的详情
1.7.删除topic(学生自己演示)
2、生产者命令行操作
2.1.查看操作生产者命令参数
2.2.发送消息
3、消费者命令行操作
3.1.查看操作消费者命令参数
3.2.消费消息
Kafka 是基于 Java 的,必须先安装 JDK。 此次我们以 kafka_2.12-3.3.1.tgz 即 Kafka3.3.1 为案例,安装jdk1.8。
⚠️ 注意:Kafka 3.9.0 要求本地必须安装 JDK 17 或以上版本。JDK 8 和 11 已不再被官方支持。
一、前期准备
1、查看网卡:
2、配置静态IP
vi /etc/sysconfig/network-scripts/ifcfg-ens32 ---- 根据自己网卡设置。
3、设置主机名
hostnamectl --static set-hostname 主机名
例如:
hostnamectl --static set-hostname hadoop001
4、配置IP与主机名映射
vi /etc/hosts
5、关闭防火墙
systemctl stop firewalld
systemctl disable firewalld
6、配置免密登录
传送门
二、JDK的安装
传送门
三、Zookeeper的安装
传送门
四、Kafka的安装
1、Kafka的下载安装
1.1. 下载
https://archive.apache.org/dist/kafka/3.3.1/
下载 kafka_2.12-3.3.1.tgz 安装包1.2 上传
使用xshell上传到指定安装路径此处是安装路径是 /opt/module
1.3 解压重命名
tar -zxvf kafka_2.12-3.3.1.tgz
mv kafka_2.12-3.3.1 kafka
1.4 配置环境变量
vi /etc/profile
export JAVA_HOME=/opt/module/java
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$JAVA_HOME/bin:$KAFKA_HOME/bin
1.5 加载环境变量
source /etc/profile
验证环境变量是否生效:
env | grep HOME
env | grep PATH
2、修改配置文件
vi /opt/module/kafka/config/server.properties
#broker的全局唯一编号,不能重复,只能是数字。
broker.id=0
#broker对外暴露的IP和端口 (每个节点单独配置)
advertised.listeners=PLAINTEXT://hadoop001:9092
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘IO的线程数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka运行日志(数据)存放的路径,路径不需要提前创建,kafka自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔
log.dirs=/opt/module/kafka/datas
#topic在当前broker上的分区个数
num.partitions=1
#用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1
# 每个topic创建时的副本数,默认时1个副本
offsets.topic.replication.factor=1
#segment文件保留的最长时间,超时将被删除
log.retention.hours=168
#每个segment文件的大小,默认最大1G
log.segment.bytes=1073741824
# 检查过期数据的时间,默认5分钟检查一次是否数据过期
log.retention.check.interval.ms=300000
#配置连接Zookeeper集群地址(在zk根目录下创建/kafka,方便管理)
zookeeper.connect=hadoop001:2181,hadoop002:2181,hadoop003:2181/kafka
4、分发文件
scp -r /etc/profile root@hadoop002:/etc/profile
scp -r /etc/profile root@hadoop003:/etc/profile
scp -r /opt/module/java root@hadoop002:/opt/module/java
scp -r /opt/module/java root@hadoop003:/opt/module/java
scp -r /opt/module/zookeeper root@hadoop002:/opt/module/zookeeper
scp -r /opt/module/zookeeper root@hadoop003:/opt/module/zookeeper
scp -r /opt/module/kafka root@hadoop002:/opt/module/kafka
scp -r /opt/module/kafka root@hadoop003:/opt/module/kafka
让三台机器文件生效
ssh hadoop001 "source /etc/profile"
ssh hadoop002 "source /etc/profile"
ssh hadoop003 "source /etc/profile"
5、修改其他节点broker.id及advertised.listeners
修改 hadoop002 的
vi /opt/module/kafka/config/server.properties
broker.id=1
#broker对外暴露的IP和端口 (每个节点单独配置)
advertised.listeners=PLAINTEXT://hadoop002:9092
修改 hadoop003 的
vi /opt/module/kafka/config/server.properties
broker.id=2
#broker对外暴露的IP和端口 (每个节点单独配置)
advertised.listeners=PLAINTEXT://hadoop003:9092
或者使用下面脚本
ssh hadoop002 "sed -i 's/^broker.id=0$/broker.id=1/' /opt/module/kafka/config/server.properties"
ssh hadoop003 "sed -i 's/^broker.id=0$/broker.id=2/' /opt/module/kafka/config/server.properties"ssh hadoop002 "sed -i 's/^advertised.listeners=PLAINTEXT:\/\/hadoop001:9092$/advertised.listeners=PLAINTEXT:\/\/hadoop002:9092/' /opt/module/kafka/config/server.properties"
ssh hadoop003 "sed -i 's/^advertised.listeners=PLAINTEXT:\/\/hadoop001:9092$/advertised.listeners=PLAINTEXT:\/\/hadoop003:9092/' /opt/module/kafka/config/server.properties"查看效果
ssh hadoop001 "grep -E 'broker.id|advertised.listeners|log.dirs' /opt/module/kafka/config/server.properties"
ssh hadoop002 "grep -E 'broker.id|advertised.listeners|log.dirs' /opt/module/kafka/config/server.properties"
ssh hadoop003 "grep -E 'broker.id|advertised.listeners|log.dirs' /opt/module/kafka/config/server.properties"
6、修改zookeeper中的 myid
修改 hadoop002 的 /opt/module/zookeeper/zkData/myid 内容为 2
vi /opt/module/zookeeper/zkData/myid
2
修改 hadoop003 的 /opt/module/zookeeper/zkData/myid 内容为 3
vi /opt/module/zookeeper/zkData/myid
3
或者使用下面脚本
ssh hadoop002 "echo '2' > /opt/module/zookeeper/zkData/myid"
ssh hadoop003 "echo '3' > /opt/module/zookeeper/zkData/myid"查看效果
ssh hadoop001 "cat /opt/module/zookeeper/zkData/myid"
ssh hadoop002 "cat /opt/module/zookeeper/zkData/myid"
ssh hadoop003 "cat /opt/module/zookeeper/zkData/myid"
7、启动ZK集群--启动停止脚本
touch /usr/bin/zkall.sh
chmod 777 /usr/bin/zkall.sh
vi /usr/bin/zkall.sh
#!/bin/bashcase $1 in "start"){for i in hadoop001 hadoop002 hadoop003doecho ---------- zookeeper $i 启动 ------------ssh $i "source /etc/profile; /opt/module/zookeeper/bin/zkServer.sh start"done };; "stop"){for i in hadoop001 hadoop002 hadoop003doecho ---------- zookeeper $i 停止 ------------ ssh $i "source /etc/profile; /opt/module/zookeeper/bin/zkServer.sh stop"done };; "status"){for i in hadoop001 hadoop002 hadoop003doecho ---------- zookeeper $i 状态 ------------ ssh $i "source /etc/profile; /opt/module/zookeeper/bin/zkServer.sh status"done };; esac
启动:/usr/bin/zkall.sh start
停止:/usr/bin/zkall.sh stop
状态:/usr/bin/zkall.sh status
8、启动Kafka集群--启动停止脚本
touch /usr/bin/kfall.sh
chmod 777 /usr/bin/kfall.sh
vi /usr/bin/kfall.sh
#! /bin/bashcase $1 in "start"){for i in hadoop001 hadoop002 hadoop003doecho " --------启动 $i Kafka-------"ssh $i "source /etc/profile; /opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"done };; "stop"){for i in hadoop001 hadoop002 hadoop003doecho " --------停止 $i Kafka-------"ssh $i "source /etc/profile; /opt/module/kafka/bin/kafka-server-stop.sh "done };; esac
启动:/usr/bin/kfall.sh start
停止:/usr/bin/kfall.sh stop
四、Kafka命令行操作
1、主题命令行操作
1.1.查看操作主题命令参数
bin/kafka-topics.sh
参数 | 描述 |
--bootstrap-server <String: server toconnect to> | 连接的Kafka Broker主机名称和端口号。 |
--topic <String: topic> | 操作的topic名称。 |
--create | 创建主题。 |
--delete | 删除主题。 |
--alter | 修改主题。 |
--list | 查看所有主题。 |
--describe | 查看主题详细描述。 |
--partitions <Integer: # of partitions> | 设置分区数。 |
--replication-factor<Integer: replication factor> | 设置分区副本。 |
--config <String: name=value> | 更新系统默认的配置。 |
1.2.查看当前服务器中的所有topic
在任意一台安装zk的机器上都可以启动客户端
cd /opt/module/kafka/
bin/kafka-topics.sh --bootstrap-server hadoop001:9092 --list
1.3.创建topic
cd /opt/module/kafka/
bin/kafka-topics.sh --bootstrap-server hadoop001:9092 --create --partitions 1 --replication-factor 3 --topic first
选项说明:
--topic 定义topic名
--replication-factor 定义副本数
--partitions 定义分区数
1.4.查看first主题的详情
bin/kafka-topics.sh --bootstrap-server hadoop001:9092 --describe --topic first
1.5.修改分区数(注意:分区数只能增加,不能减少)
bin/kafka-topics.sh --bootstrap-server hadoop001:9092 --alter --topic first --partitions 3
1.6.再次查看first主题的详情
bin/kafka-topics.sh --bootstrap-server hadoop001:9092 --describe --topic first
1.7.删除topic(学生自己演示)
bin/kafka-topics.sh --bootstrap-server hadoop001:9092 --delete --topic first
2、生产者命令行操作
2.1.查看操作生产者命令参数
bin/kafka-console-producer.sh
参数 | 描述 |
--bootstrap-server <String: server toconnect to> | 连接的Kafka Broker主机名称和端口号。 |
--topic <String: topic> | 操作的topic名称。 |
2.2.发送消息
bin/kafka-console-producer.sh --bootstrap-server hadoop001:9092 --topic first
>hello world
>hello beijing
>hello china
>welecome to china
3、消费者命令行操作
3.1.查看操作消费者命令参数
bin/kafka-console-consumer.sh
参数 | 描述 |
--bootstrap-server <String: server toconnect to> | 连接的Kafka Broker主机名称和端口号。 |
--topic <String: topic> | 操作的topic名称。 |
--from-beginning | 从头开始消费。 |
--group <String: consumer group id> | 指定消费者组名称。 |
3.2.消费消息
(1)消费first主题中的数据
bin/kafka-console-consumer.sh --bootstrap-server hadoop001:9092 --topic first
(2)把主题中所有的数据都读取出来(包括历史数据)
bin/kafka-console-consumer.sh --bootstrap-server hadoop001:9092 --from-beginning --topic first