当前位置: 首页 > news >正文

Kafka的下载安装

目录

 一、前期准备

1、查看网卡:

2、配置静态IP

3、设置主机名

4、配置IP与主机名映射

5、关闭防火墙

6、配置免密登录

二、JDK的安装

三、Zookeeper的安装

四、Kafka的安装

1、Kafka的下载安装 

2、修改配置文件

4、分发文件

5、修改其他节点broker.id及advertised.listeners

6、修改zookeeper中的 myid

7、启动ZK集群--启动停止脚本

8、启动Kafka集群--启动停止脚本

四、Kafka命令行操作

1、主题命令行操作

1.1.查看操作主题命令参数

1.2.查看当前服务器中的所有topic

1.3.创建topic

1.4.查看first主题的详情

1.5.修改分区数(注意:分区数只能增加,不能减少)

1.6.再次查看first主题的详情

1.7.删除topic(学生自己演示)

2、生产者命令行操作

2.1.查看操作生产者命令参数

2.2.发送消息

3、消费者命令行操作

3.1.查看操作消费者命令参数

3.2.消费消息


       Kafka 是基于 Java 的,必须先安装 JDK。  此次我们以 kafka_2.12-3.3.1.tgz  即 Kafka3.3.1 为案例,安装jdk1.8。

⚠️ 注意:Kafka 3.9.0 要求本地必须安装 JDK 17 或以上版本。JDK 8 和 11 已不再被官方支持。
     

 一、前期准备

1、查看网卡:

2、配置静态IP

vi /etc/sysconfig/network-scripts/ifcfg-ens32  ----  根据自己网卡设置。

3、设置主机名

hostnamectl --static set-hostname  主机名

例如:

hostnamectl --static set-hostname  hadoop001

4、配置IP与主机名映射

vi /etc/hosts

5、关闭防火墙

systemctl stop firewalld

systemctl disable firewalld

6、配置免密登录

传送门

二、JDK的安装

传送门

三、Zookeeper的安装

传送门

四、Kafka的安装

1、Kafka的下载安装 

1.1. 下载

 https://archive.apache.org/dist/kafka/3.3.1/
​下载  kafka_2.12-3.3.1.tgz 安装包

1.2 上传
使用xshell上传到指定安装路径

此处是安装路径是 /opt/module

​​

1.3 解压重命名

tar -zxvf kafka_2.12-3.3.1.tgz

mv kafka_2.12-3.3.1 kafka

​​

1.4 配置环境变量

vi  /etc/profile

export JAVA_HOME=/opt/module/java

export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar

export KAFKA_HOME=/opt/module/kafka

export PATH=$PATH:$JAVA_HOME/bin:$KAFKA_HOME/bin

1.5 加载环境变量

source  /etc/profile

验证环境变量是否生效:

env | grep HOME

env | grep PATH

2、修改配置文件

vi /opt/module/kafka/config/server.properties

#broker的全局唯一编号,不能重复,只能是数字。

broker.id=0

#broker对外暴露的IP和端口 (每个节点单独配置)

advertised.listeners=PLAINTEXT://hadoop001:9092

#处理网络请求的线程数量

num.network.threads=3

#用来处理磁盘IO的线程数量

num.io.threads=8

#发送套接字的缓冲区大小

socket.send.buffer.bytes=102400

#接收套接字的缓冲区大小

socket.receive.buffer.bytes=102400

#请求套接字的缓冲区大小

socket.request.max.bytes=104857600

#kafka运行日志(数据)存放的路径,路径不需要提前创建,kafka自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分隔

log.dirs=/opt/module/kafka/datas

#topic在当前broker上的分区个数

num.partitions=1

#用来恢复和清理data下数据的线程数量

num.recovery.threads.per.data.dir=1

# 每个topic创建时的副本数,默认时1个副本

offsets.topic.replication.factor=1

#segment文件保留的最长时间,超时将被删除

log.retention.hours=168

#每个segment文件的大小,默认最大1G

log.segment.bytes=1073741824

# 检查过期数据的时间,默认5分钟检查一次是否数据过期

log.retention.check.interval.ms=300000

#配置连接Zookeeper集群地址(在zk根目录下创建/kafka,方便管理)

zookeeper.connect=hadoop001:2181,hadoop002:2181,hadoop003:2181/kafka

4、分发文件

scp -r  /etc/profile  root@hadoop002:/etc/profile

scp -r  /etc/profile  root@hadoop003:/etc/profile

scp -r  /opt/module/java root@hadoop002:/opt/module/java 

scp -r  /opt/module/java root@hadoop003:/opt/module/java 

scp -r  /opt/module/zookeeper  root@hadoop002:/opt/module/zookeeper

scp -r  /opt/module/zookeeper  root@hadoop003:/opt/module/zookeeper

scp -r  /opt/module/kafka root@hadoop002:/opt/module/kafka

scp -r  /opt/module/kafka root@hadoop003:/opt/module/kafka

让三台机器文件生效

ssh hadoop001 "source /etc/profile"
ssh hadoop002 "source /etc/profile"
ssh hadoop003 "source /etc/profile"

5、修改其他节点broker.idadvertised.listeners

修改 hadoop002 的

vi /opt/module/kafka/config/server.properties

broker.id=1

#broker对外暴露的IP和端口 (每个节点单独配置)

advertised.listeners=PLAINTEXT://hadoop002:9092

修改 hadoop003 的

vi /opt/module/kafka/config/server.properties

broker.id=2

#broker对外暴露的IP和端口 (每个节点单独配置)

advertised.listeners=PLAINTEXT://hadoop003:9092

或者使用下面脚本

ssh hadoop002 "sed -i 's/^broker.id=0$/broker.id=1/' /opt/module/kafka/config/server.properties"
ssh hadoop003 "sed -i 's/^broker.id=0$/broker.id=2/' /opt/module/kafka/config/server.properties"

ssh hadoop002 "sed -i 's/^advertised.listeners=PLAINTEXT:\/\/hadoop001:9092$/advertised.listeners=PLAINTEXT:\/\/hadoop002:9092/' /opt/module/kafka/config/server.properties"
ssh hadoop003 "sed -i 's/^advertised.listeners=PLAINTEXT:\/\/hadoop001:9092$/advertised.listeners=PLAINTEXT:\/\/hadoop003:9092/' /opt/module/kafka/config/server.properties"

查看效果

ssh hadoop001 "grep -E 'broker.id|advertised.listeners|log.dirs' /opt/module/kafka/config/server.properties"

ssh hadoop002 "grep -E 'broker.id|advertised.listeners|log.dirs' /opt/module/kafka/config/server.properties"

ssh hadoop003 "grep -E 'broker.id|advertised.listeners|log.dirs' /opt/module/kafka/config/server.properties"

6、修改zookeeper中的 myid

修改 hadoop002 的 /opt/module/zookeeper/zkData/myid 内容为 2

vi /opt/module/zookeeper/zkData/myid

2

修改 hadoop003 的 /opt/module/zookeeper/zkData/myid 内容为 3

vi /opt/module/zookeeper/zkData/myid

3

或者使用下面脚本

ssh hadoop002 "echo '2' > /opt/module/zookeeper/zkData/myid"
ssh hadoop003 "echo '3' > /opt/module/zookeeper/zkData/myid"

查看效果

ssh hadoop001 "cat /opt/module/zookeeper/zkData/myid"

ssh hadoop002 "cat /opt/module/zookeeper/zkData/myid"

ssh hadoop003 "cat /opt/module/zookeeper/zkData/myid"

7、启动ZK集群--启动停止脚本

touch /usr/bin/zkall.sh

chmod 777 /usr/bin/zkall.sh

vi /usr/bin/zkall.sh

#!/bin/bashcase $1 in
"start"){for i in hadoop001 hadoop002 hadoop003doecho ---------- zookeeper $i 启动 ------------ssh $i "source /etc/profile; /opt/module/zookeeper/bin/zkServer.sh start"done
};;
"stop"){for i in hadoop001 hadoop002 hadoop003doecho ---------- zookeeper $i 停止 ------------    ssh $i "source /etc/profile; /opt/module/zookeeper/bin/zkServer.sh stop"done
};;
"status"){for i in hadoop001 hadoop002 hadoop003doecho ---------- zookeeper $i 状态 ------------    ssh $i "source /etc/profile; /opt/module/zookeeper/bin/zkServer.sh status"done
};;
esac

启动:/usr/bin/zkall.sh start

停止:/usr/bin/zkall.sh stop

状态:/usr/bin/zkall.sh status

8、启动Kafka集群--启动停止脚本

touch /usr/bin/kfall.sh

chmod 777 /usr/bin/kfall.sh

vi /usr/bin/kfall.sh

#! /bin/bashcase $1 in
"start"){for i in hadoop001 hadoop002 hadoop003doecho " --------启动 $i Kafka-------"ssh $i "source /etc/profile; /opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"done
};;
"stop"){for i in hadoop001 hadoop002 hadoop003doecho " --------停止 $i Kafka-------"ssh $i "source /etc/profile; /opt/module/kafka/bin/kafka-server-stop.sh "done
};;
esac

启动:/usr/bin/kfall.sh start

停止:/usr/bin/kfall.sh stop

四、Kafka命令行操作

1、主题命令行操作

1.1.查看操作主题命令参数

        bin/kafka-topics.sh

参数

描述

--bootstrap-server <String: server toconnect to>

连接的Kafka Broker主机名称和端口号。

--topic <String: topic>

操作的topic名称。

--create

创建主题。

--delete

删除主题。

--alter

修改主题。

--list

查看所有主题。

--describe

查看主题详细描述。

--partitions <Integer: # of partitions>

设置分区数。

--replication-factor<Integer: replication factor>

设置分区副本。

--config <String: name=value>

更新系统默认的配置。

1.2.查看当前服务器中的所有topic

在任意一台安装zk的机器上都可以启动客户端

cd /opt/module/kafka/

bin/kafka-topics.sh --bootstrap-server hadoop001:9092 --list

1.3.创建topic

cd /opt/module/kafka/

bin/kafka-topics.sh --bootstrap-server hadoop001:9092 --create --partitions 1 --replication-factor 3 --topic first

选项说明:

--topic 定义topic名

--replication-factor  定义副本数

--partitions  定义分区数

1.4.查看first主题的详情

bin/kafka-topics.sh --bootstrap-server hadoop001:9092 --describe --topic first

1.5.修改分区数(注意:分区数只能增加,不能减少)

bin/kafka-topics.sh --bootstrap-server hadoop001:9092 --alter --topic first --partitions 3

1.6.再次查看first主题的详情

bin/kafka-topics.sh --bootstrap-server hadoop001:9092 --describe --topic first

1.7.删除topic(学生自己演示)

bin/kafka-topics.sh --bootstrap-server hadoop001:9092 --delete --topic first

2、生产者命令行操作

2.1.查看操作生产者命令参数

bin/kafka-console-producer.sh

参数

描述

--bootstrap-server <String: server toconnect to>

连接的Kafka Broker主机名称和端口号。

--topic <String: topic>

操作的topic名称。

2.2.发送消息

bin/kafka-console-producer.sh --bootstrap-server hadoop001:9092 --topic first

>hello world

>hello beijing

>hello china

>welecome to china

3、消费者命令行操作

3.1.查看操作消费者命令参数

bin/kafka-console-consumer.sh

参数

描述

--bootstrap-server <String: server toconnect to>

连接的Kafka Broker主机名称和端口号。

--topic <String: topic>

操作的topic名称。

--from-beginning

从头开始消费。

--group <String: consumer group id>

指定消费者组名称。

3.2.消费消息

(1)消费first主题中的数据

bin/kafka-console-consumer.sh --bootstrap-server hadoop001:9092 --topic first

2把主题中所有的数据都读取出来(包括历史数据)

bin/kafka-console-consumer.sh --bootstrap-server hadoop001:9092 --from-beginning --topic first

相关文章:

  • Matlab自学笔记六十一:快速上手解方程
  • 用户行为序列建模(篇九)-【阿里】BERT4Rec
  • 在 Spring Boot 中使用 MyBatis-Plus 的详细教程
  • 实战篇----利用 LangChain 和 BERT 用于命名实体识别-----完整代码
  • Java爬虫实战指南:按关键字搜索京东商品
  • rabbitmq springboot 有哪些配置参数
  • Leetcode 3482. 分析组织层级
  • 状态模式 - Flutter中的状态变身术,让对象随“状态“自由切换行为!
  • 对于“随机种子”的作用的理解
  • 71. 简化路径 —day94
  • 【网络】:DNS协议、ICMP协议、NAT技术
  • Cursor1.1.6安装c++插件
  • .netcore 一个mvc到静态html实现
  • 【数据分析】Python+Tushare实现均线金叉死叉交易策略回测
  • 黑马JVM解析笔记(六):深入理解JVM类加载机制与运行时优化
  • 【JS-6.2-模板字符串】ES6 模板字符串:现代JavaScript的字符串处理利器
  • 可达性分析算法Test
  • 如何将Excel表的内容转化为json格式呢?
  • 深入理解Mysql索引底层数据结构和算法
  • InnoDB的redo日志涉及文件及结构