Windows系统下使用Kafka和Zookeeper,Python运行kafka(二)
1.配置 Zookeeper
进入解压后的 Zookeeper 目录(例如 F:\zookeeper\conf
),复制 zoo_sample.cfg
文件并命名为 zoo.cfg
(如果 zoo.cfg
已经存在,则直接编辑该文件)。
打开 zoo.cfg
文件,配置相关参数:
# 数据存储目录
dataDir=F:/zookeeper/data
# 日志存储目录
dataLogDir=F:/zookeeper/logs
# 客户端连接端口,默认 2181
clientPort=2181
# 初始化领导者选举和同步的配置,一般保持默认即可
initLimit=5
syncLimit=2
# 配置 Zookeeper 集群(单机模式下无需配置)
# server.1=localhost:2888:3888
确保 dataDir
和 dataLogDir
目录存在,如果不存在则手动创建。
可以看到有dataDir
只改了这个地方
与此同时,创建文件夹
2.启动 Zookeeper
进入 Zookeeper 的 bin 目录(例如 F:\zookeeper\bin
),打开命令提示符,执行以下命令启动 Zookeeper:
.\zkServer.cmd
如果启动成功,会看到类似以下的日志信息:
3.配置 Kafka 使用外部 Zookeeper
打开 Kafka 的配置文件 config/server.properties
,修改 zookeeper.connect
参数,指定自己的 Zookeeper 地址和端口:
zookeeper.connect=localhost:2181
这里的 localhost:2181
是 Zookeeper 的默认地址和端口,如果你的 Zookeeper 配置了其他地址或端口,则需要相应修改。
4.启动 Kafka
进入 Kafka 的目录,执行以下命令启动 Kafka
.\bin\windows\kafka-server-start.bat .\config\server.properties
5.安装Kafka插件
安装完成点击这个kafka插件
如果没有topic
主题是连接不了的
使用以下代码创建一个名为test
的topic
主题,3个分区,2个副本,前提是先启动 Zookeeper
和 Kafka
from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import TopicAlreadyExistsError# 配置 Kafka 服务器地址
bootstrap_servers = ['localhost:9092']# 创建 Kafka 管理客户端实例
admin_client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)# 要创建的主题信息
topic_name = 'test'
num_partitions = 3 # 分区数量
replication_factor = 1 # 副本因子,调整为不超过可用 Broker 数量try:# 创建新主题topic = NewTopic(name=topic_name, num_partitions=num_partitions, replication_factor=replication_factor)admin_client.create_topics([topic])print(f"主题 {topic_name} 创建成功,分区数: {num_partitions},副本因子: {replication_factor}")
except TopicAlreadyExistsError:print(f"主题 {topic_name} 已存在,无需再次创建")
except Exception as e:print(f"创建主题 {topic_name} 时出错: {e}")
finally:# 关闭管理客户端admin_client.close()
有了一个test
主题,就可以连接了
接下来发送1000条消息
for i in range(1000):message = f"message{i+1}"# 发送消息future = producer.send(topic, message)