Linux上kafka部署和使用
文章目录
- 低版本
- 高版本
- Linux命令
背景:测试超过kafka的设置的最大连接数后,是否会对之前的连接产生影响。
结论:查过最大连接数后,拒绝连接
[root@pulsartest1 kafka_2.13-4.1.0]# bin/kafka-console-consumer.sh --bootstrap-server localhost:8092 --topic C --from-beginning
[2025-11-05 15:43:25,399] WARN [Consumer clientId=console-consumer, groupId=console-consumer-38745] Bootstrap broker localhost:8092 (id: -1 rack: null isFenced: false) disconnected (org.apache.kafka.clients.NetworkClient)
目的:记录kafka的部署和一些linux命令
低版本
kafka版本2.3.0
注意使用java8,java17可能会报dns解析错误。
问题参考:JDK导致ActiveMQ、Kafka连接zookeeper失败:Session 0x0 for server 10.1.21.244/:2181, unexpected error, closing socket connection and attempting reconnect - 没有星星的夏季 - 博客园
部署参考:https://baimoz.me/2679/
部署
#下载安装包
wget https://archive.apache.org/dist/kafka/2.3.0/kafka_2.12-2.3.0.tgz
#找一个目录,解压
tar -xvzf kafka_2.12-2.3.0.tgz
启动
强制使用java8启动,JAVA8_HOME是设置的Java8的环境变量
JAVA_HOME=$JAVA8_HOME bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
JAVA_HOME=$JAVA8_HOME bin/kafka-server-start.sh config/server.properties
kafka命令
#创建topic
bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --create --topic A --partitions 1 --replication-factor 1
#查询topic
bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list#生产者(低版本broker-list,高版本bootstrap-server)“A”是topic名称
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic A
#消费者
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic A --from-beginning
配置
broker.id=0
listeners=PLAINTEXT://:9092
zookeeper.connect=127.0.0.1:2181
#最大连接数
max.connections=10
log.dirs=/tmp/kafka-logs
zookeeper.connection.timeout.ms=6000
#副本因子(默认是3,这里只启动了一个单节点)
offsets.topic.replication.factor=1
高版本
kafka所有版本:https://archive.apache.org/dist/kafka/
Kafka 4.x 要求 Java 11+
部署
wget https://archive.apache.org/dist/kafka/4.1.0/kafka_2.13-4.1.0.tgz
解压...
配置
# 创建 kraft 配置目录(若不存在)
mkdir -p config/kraftcat > config/kraft/server.properties << EOF
# 核心角色配置(Kafka 4.x 必填)
process.roles=broker,controller
node.id=1
# 控制器集群配置:端口改为 8093(原 9093)
controller.quorum.voters=1@localhost:8093
# 监听地址:业务端口 8092(原 9092),控制器端口 8093(原 9093)
listeners=PLAINTEXT://:8092,CONTROLLER://:8093
inter.broker.listener.name=PLAINTEXT
controller.listener.names=CONTROLLER
log.dirs=/tmp/kafka-logs-8092
max.connections=10
# 关闭 SASL 认证(测试环境避坑)
sasl.enabled.mechanisms=PLAIN
security.protocol=PLAINTEXT
# 1. 设置消费者偏移量主题(__consumer_offsets)的默认副本因子为 1
offsets.topic.replication.factor=1# 2. 设置所有系统主题(包括 __consumer_offsets、__transaction_state 等)的默认副本因子为 1
default.replication.factor=1max.connections.per.ip=10
max.connections.per.listener=10
EOF
# 1. 生成集群 ID(复制输出的 UUID,如:abcdef12-3456-7890-abcd-ef1234567890)
bin/kafka-storage.sh random-uuid# 2. 初始化数据目录(替换 <集群ID> 为上面生成的 UUID)
bin/kafka-storage.sh format -t <集群ID> -c config/kraft/server.properties
- 成功提示:
Formatting complete。
启动 Kafka 4.x(KRaft 模式,无需 ZK)
bin/kafka-server-start.sh config/kraft/server.properties
kafka命令
#生产者命令有所不同
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic A
配置错误,更改配置重启后
bin/kafka-storage.sh format -t <集群ID> -c config/kraft/server.properties
# 删除之前可能残留的 9092/8092 数据目录
rm -rf /tmp/kafka-logs /tmp/kafka-logs-8092
Linux命令
监视连接数
#查pid
ps -ef | grep kafka.Kafka | grep -v grep | awk '{print $2}'
#查询连接数
netstat -anp | grep $3339446 | grep ESTABLISHED | grep ':8092' | wc -l
watch -n 1 "netstat -anp | grep 614052 | grep ESTABLISHED | wc -l"
