当前位置: 首页 > news >正文

pyspark中的kafka的读和写案例操作

下面将详细讲解 PySpark 中操作 Kafka 进行数据读写的案例,包括必要的配置、代码实现和关键参数说明。

PySpark 与 Kafka 集成基础

PySpark 通过 Spark Streaming 或 Structured Streaming 与 Kafka 集成,需要引入特定的依赖包。通常使用spark-sql-kafka-0-10_2.12包,版本需要与 Spark 版本匹配。

读取 Kafka 数据(消费消息)

从 Kafka 读取数据可以分为批处理和流处理两种方式:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StringType, IntegerType# 初始化SparkSession
spark = SparkSession.builder \.appName("KafkaReaderExample") \.config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0") \.getOrCreate()# 1. 流处理方式读取Kafka数据
def stream_read_kafka():# 配置Kafka连接参数kafka_df = spark.readStream \.format("kafka") \.option("kafka.bootstrap.servers", "localhost:9092") \.option("subscribe", "test_topic")  # 订阅的主题,可以是多个用逗号分隔.option("startingOffsets", "earliest")  # 从最早的偏移量开始消费.load()# Kafka返回的数据包含多个字段,我们主要关注value字段(实际消息内容)# 将二进制的value转换为字符串kafka_df = kafka_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")# 如果消息是JSON格式,可以进一步解析schema = StructType() \.add("id", IntegerType()) \.add("name", StringType()) \.add("age", IntegerType())parsed_df = kafka_df.select(from_json(col("value"), schema).alias("data")).select("data.*")# 输出到控制台(调试用)query = parsed_df.writeStream \.outputMode("append") \.format("console") \.start()query.awaitTermination()# 2. 批处理方式读取Kafka数据
def batch_read_kafka():kafka_df = spark.read \.format("kafka") \.option("kafka.bootstrap.servers", "localhost:9092") \.option("subscribe", "test_topic") \.option("startingOffsets", """{"test_topic":{"0":0}}""")  # 指定分区和偏移量.option("endingOffsets", """{"test_topic":{"0":100}}""") \.load()# 转换为字符串并展示result_df = kafka_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")result_df.show(truncate=False)if __name__ == "__main__":# 选择运行流处理或批处理# stream_read_kafka()batch_read_kafka()

写入 Kafka 数据(生产消息)

同样,写入 Kafka 也支持流处理和批处理两种方式:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_json, struct# 初始化SparkSession
spark = SparkSession.builder \.appName("KafkaWriterExample") \.config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0") \.getOrCreate()# 1. 流处理方式写入Kafka
def stream_write_kafka():# 创建测试数据data = [("1", "Alice", 25), ("2", "Bob", 30), ("3", "Charlie", 35)]df = spark.createDataFrame(data, ["id", "name", "age"])# 转换为Kafka所需的格式(必须包含key和value字段)kafka_df = df.select(col("id").alias("key"),  # key字段to_json(struct("id", "name", "age")).alias("value")  # value字段转为JSON)# 写入Kafkaquery = kafka_df.writeStream \.format("kafka") \.option("kafka.bootstrap.servers", "localhost:9092") \.option("topic", "test_topic") \.option("checkpointLocation", "/tmp/kafka_checkpoint")  # 流处理必须设置检查点.start()query.awaitTermination()# 2. 批处理方式写入Kafka
def batch_write_kafka():# 创建测试数据data = [("4", "David", 40), ("5", "Eve", 45)]df = spark.createDataFrame(data, ["id", "name", "age"])# 转换为Kafka所需格式kafka_df = df.select(col("id").cast("string").alias("key"),to_json(struct("id", "name", "age")).alias("value"))# 写入Kafkakafka_df.write \.format("kafka") \.option("kafka.bootstrap.servers", "localhost:9092") \.option("topic", "test_topic") \.save()if __name__ == "__main__":# 选择运行流处理或批处理# stream_write_kafka()batch_write_kafka()

关键参数说明

  1. 连接参数

    • kafka.bootstrap.servers:Kafka 集群的地址列表,格式为host:port
    • subscribe:要订阅的主题,多个主题用逗号分隔
    • topic:写入时指定的目标主题
  2. 偏移量设置

    • startingOffsets:读取的起始偏移量,earliest(最早)或latest(最新)
    • endingOffsets:批处理时的结束偏移量
  3. 数据格式

    • Kafka 中的数据以二进制形式存储,需要转换为字符串:CAST(key AS STRING)CAST(value AS STRING)
    • 写入时需要将数据转换为包含keyvalue字段的 DataFrame
  4. 流处理特殊参数

    • checkpointLocation:必须设置,用于保存流处理的状态信息
    • outputMode:输出模式,常用append(追加)

运行注意事项

  1. 确保 Kafka 服务已启动并正常运行
  2. 主题需要提前创建:kafka-topics.sh --create --topic test_topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
  3. 依赖包版本需要与 Spark 版本匹配,例如 Spark 3.3.0 对应spark-sql-kafka-0-10_2.12:3.3.0
  4. 流处理程序需要手动停止,可通过query.stop()或 Ctrl+C 终止

通过以上示例,你可以实现 PySpark 与 Kafka 之间的数据交互,根据实际需求选择批处理或流处理方式。

http://www.dtcms.com/a/315452.html

相关文章:

  • Goby 漏洞安全通告| NestJS DevTools /inspector/graph/interact 命令执行漏洞(CVE-2025-54782)
  • libpq库使用
  • PDF转图片工具技术文档(命令行版本)
  • 【taro react】 ---- useModel 数据双向绑定 hook 实现
  • vue和react的框架原理
  • 基于PD控制器的四旋翼无人机群飞行控制系统simulink建模与仿真
  • SpringBoot原理揭秘--BeanFactory和ApplicationContext
  • day 46 神经网络-简版
  • 2025年渗透测试面试题总结-01(题目+回答)
  • 什么是压接孔?压接孔PCB制造流程
  • Zabbix 企业级高级应用
  • AI赋能复合材料与智能增材制造:前沿技术研修重磅
  • 【MATLAB】(八)矩阵
  • 盟接之桥说制造:价格战与品质:制造企业可持续发展的战略思考
  • 智能融合:增材制造多物理场AI建模与工业应用实战
  • PHP:历经岁月仍熠熠生辉的服务器端脚本语言
  • Spring 的 ioc 控制反转
  • 无人设备遥控器之信号切换技术篇
  • Guava 与 Caffeine 本地缓存系统详解
  • jQuery DOM节点操作详解
  • stm32F407 硬件COM事件触发六步换相
  • AI医疗革命:十大应用场景如何重塑未来医疗
  • 手绘风格制图新选择:如何用Excalidraw+cpolar构建你的视觉化工作流?
  • windos10 安装CentOS7 虚拟机笔记
  • Datawhale AI夏令营 第三期 task2
  • 基于ZYNQ ARM+FPGA的声呐数据采集系统设计
  • 01数据结构-平衡二叉树
  • Prometheus监控学习-安装
  • 【Git】实现使用SSH方式连接远程仓库时的免密操作
  • 计算机网络:目的网络在路由表项中的作用