Python脚本(Kafka生产者+消费者)
以mac为例
1)安装kafka : brew install kafka
2)检查安装路径:
brew --prefix kafka
brew --prefix zookeeper
3)启动服务(目录为本人电脑目录,可能与读者目录不一致)
3.1)启动zookeeper:zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties
3.2)启动kafka: kafka-server-start /usr/local/etc/kafka/server.properties
4)topic
4.1)创建topic:
# 创建名为 test_topic 的 topic,1个分区,1个副本
kafka-topics --create \
--bootstrap-server localhost:9092 \
--replication-factor 1 \
--partitions 1 \
--topic test_topic
注意:
根据版本选择参数:
Kafka 版本 | 推荐创建 Topic 的方式 | 参数使用 |
---|---|---|
Kafka ≥ 2.2 | 推荐使用 --bootstrap-server (更现代,不依赖 ZK) | --bootstrap-server localhost:9092 |
Kafka < 2.2(比如 2.1.x、1.x) | 必须使用 --zookeeper | --zookeeper localhost:2181 |
4.2)查看topic:kafka-topics --list --bootstrap-server localhost:9092
旧版本:kafka-topics --list --zookeeper localhost:2181
5)python3代码(以kafka-python为例):
工具 | 适用场景 | 特点 |
---|---|---|
confluent-kafka | 生产环境,高性能 | 需安装 librdkafka,API 更底层,速度快 |
kafka-python | 学习、测试、开发 | 纯 Python,无需额外依赖,简单易用 |
5.1)安装依赖:pip install kafka-python
5.2)生产者(producer_kafka_python.py)
from kafka import KafkaProducer
import timebootstrap_servers = 'localhost:9092'
topic = 'test_topic'producer = KafkaProducer(bootstrap_servers=bootstrap_servers,value_serializer=lambda v: v.encode('utf-8')
)for i in range(5):msg = f'Hello from kafka-python #{i}'print(f'发送: {msg}')future = producer.send(topic, value=msg)metadata = future.get(timeout=10)print(f"发送到 Topic: {metadata.topic}, Partition: {metadata.partition}, Offset: {metadata.offset}")time.sleep(1)producer.close()
生产者运行结果如下:
5.3)消费者(consumer_kafka_python.py)
from kafka import KafkaConsumerbootstrap_servers = 'localhost:9092'
topic = 'test_topic'
group_id = 'my-group-python'consumer = KafkaConsumer(topic,bootstrap_servers=bootstrap_servers,group_id=group_id,auto_offset_reset='earliest',value_deserializer=lambda x: x.decode('utf-8')
)print(f"开始消费主题 '{topic}' ...")for message in consumer:print(f"收到消息 -> Topic: {message.topic}, Partition: {message.partition}, Offset: {message.offset}, Value: {message.value}")
消费者运行结果如下:
6)运行步骤总结:
- 安装 Kafka(通过 Homebrew)
- 启动 Zookeeper
- 启动 Kafka Broker
- **(可选)创建 Topic:
test_topic
- 运行 Python 生产者:
python
producer_kafka_python.py
- 运行 Python 消费者:
python
consumer_kafka_python.py
7)
confluent-kafka版python代码:
7.1)安装librdkafka : brew install librdkafka
7.2)安装confluent-kafka : pip3 install confluent-kafka
7.3)生产者
7.4)消费者