当前位置: 首页 > news >正文

Spark大数据分析案例(pycharm)

所需文件(将文件放在路径下,自己记住后面要用):

通过百度网盘分享的文件:beauty_p....csv等4个文件
链接:https://pan.baidu.com/s/1pBAus1yRgefveOc7NXRD-g?pwd=22dj 
提取码:22dj 
复制这段内容打开「百度网盘APP 即可获取」

工具:Spark下安装的pycharm

5.20
2.窗口操作(Spark SQL)
在处理数据时,经常会遇到数据的分类问题,比如“按照性别,统计班级的男生和女生的 数量”,这种
问题通过SQL语句的GROUP BY和聚合函数就可以解决,比如“SELECT gender, count(‘gender’) AS 
stu_count FROM stu GROUP BY gender”。不过有时候我们关注的并不是分组统计问题,而是类似“有来
自多个班的学生成绩,分别对每个班的学生进行成绩排名”这样的问题,不是对所有班的学生进行排
名,而是针对每个班的学生进行单独的排名
显然,首先这是一个分组问题,但不能通过GROUP BY来解决。也就是说,这里的分组是一个大前提,
分组之后再对单独每一组的数据进行排序,Spark SQL实现的一个高级特性,即“窗口函数”或“开窗操
作”正是针对这类问题的

2.窗口操作(Spark SQL)
那么什么是窗口函数呢?实际上,窗口可以理解为“指定记录的集合”,即每条记录都有其对应的窗口,
窗口的功能与GROUP BY类似,它们都能将满足条件的记录划分出来。所谓窗口函数,就是对窗口中的
记录(满足某种条件的记录集合)进行执行的一类特殊函数。窗口函数和普通SQL聚合函数的区别主要
有如下3点
(1)聚合函数是将多条记录聚合运算为一条,代表的是一个数据的汇总过程,而窗口函数是对每条
记录在对应的窗口内进行运算,不会改变记录的条数
(2)当窗口函数和聚合函数一起使用时,窗口函数是基于聚合后的数据执行的,也就是说,先执行
聚合操作,再执行窗口函数
(3)窗口函数的执行是在SQL中的FROM、JOIN、WHERE、GROUP BY、HAVING之后,且在ORDER
BY、LIMIT、SELECT、DISTINCT之前完成。当窗口函数执行时,因为GROUP BY的聚合过程已经结束,
所以不会再产生数据的聚合操作


一、启动PyCharm集成开发环境
在Ubuntu 20.04的应用程序列表中找到这个图标启动PyCharm集成开发环境,选择New Project…命令创建一个新的Python项目
名字为BeautyProduct
编译器为python3.6
位置(主要看自己的位置):/home/spark/PycharmProjects/BeautyProduct(文件名)

main.py编写代码(删除原有的配置)
#导入库模块
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_replace,col,lit
from pyspark.sql import functions as F

#创建Spark对象
spark = SparkSession.builder.appName('SparkBeautyProduct').getOrCreate()
sc = spark.sparkContext

#创建两个df
# 从csv文件读取数据,header代表标题行,inferSchema代表是否自动推断字段类型
df1 = spark.read.csv('hdfs://localhost:9000/datas/beauty_prod_info.csv',
    encoding='UTF-8', header=True, sep=',', inferSchema=False,
    #商品编号prod_id,商品名称product,商品小类cataB,商品大类cataA,销售单价price
    schema='prod_id string, product string, cataB string, cataA string, price float')
df2 = spark.read.csv('hdfs://localhost:9000/datas/beauty_prod_sales.csv',
    encoding='UTF-8', header=True, sep=',', inferSchema=False,
    #订单编码order_id,订单日期od_date,客户编码cust_id,所在区域cust_region,
    #所在省份cust_province,所在地市cust_city,商品编号prod_id,
    #订购数量od_quantity,订购单价od_price,金额od_amount
    schema='''order_id string, od_date string, cust_id string,
              cust_region string, cust_province string, cust_city string,
              prod_id string, od_quantity string, od_price string, od_amount float''')


启动HDFS

spark@vm01:~$ start-all.sh
spark@vm01:~$ jps
3216 SecondaryNameNode
3793 Jps
3346 ResourceManager
1860 Main
3481 NodeManager
3020 DataNode
2846 NameNode

订单数据处理分析
美妆商品订单数据存储在CSV文件中,它们都是结构化的数据,Spark SQL要做的工作包括数据清洗和数据预处理两个方面。为了将数据文件存放到HDFS文件系统上,要先确保HDFS服务已经在运行,然后在HDFS上创建一个datas文件夹,并将两个CSV文件上传至datas文件夹中

将数据文件传送到hdfs里的datas文件夹里
spark@vm01:~$ hdfs dfs -mkdir -p /datas
spark@vm01:~$ hdfs dfs -ls
Found 1 items
drwxr-xr-x   - spark supergroup          0 2025-05-06 22:25 .sparkStaging
spark@vm01:~$ hdfs dfs -ls /datas
spark@vm01:~$ hdfs dfs -put ~/mydata/beauty/beauty_prod_*.csv /datas
spark@vm01:~$ hdfs dfs -ls /datas
Found 2 items
-rw-r--r--   1 spark supergroup       4462 2025-05-19 22:52 /datas/beauty_prod_info.csv
-rw-r--r--   1 spark supergroup    2281550 2025-05-19 22:52 /datas/beauty_prod_sales.csv

使用Spark SQL把CSV文件转换成DataFrame对象,重新设定Schema的字段信息,一是为数据清洗做准
备,二是尽量避免在SQL语句中硬编码中文的字段名称。接着在PyCharm的main.py已有代码之后输入
下面的代码

# 数据清洗:删除包含空字段的数据行、商品编号相同的数据行
df_prod_info = df1.dropna() \
                  .dropDuplicates(['prod_id'])
# 数据清洗:处理订单日期、订购数量、订购单价字段的非法字符,过滤包含错误日期的订单
# 新增一个od_month字段代表订单月份,在后面统计使用
df_prod_sales = df2 \
    .dropna() \
    .distinct() \
    .withColumn("od_date", regexp_replace('od_date', '#', '-')) \
    .withColumn("od_quantity", regexp_replace('od_quantity','个','')) \
    .withColumn("od_price", regexp_replace('od_price','元','')) \
    .withColumn('cust_province', regexp_replace('cust_province','自治区|维吾尔|回族|壮族|省|市','')) \
    .withColumn("od_date", col('od_date').cast('date')) \
    .withColumn("od_quantity", col('od_quantity').cast('integer')) \
    .withColumn("od_price", col('od_price').cast('float')) \
    .withColumn("od_month", F.month('od_date')) \
    .filter("od_date<'2025-12-31'")          #日期改为今年年底

df1.show(10)
df2.show(10)

运行结果:
+-------+-------+-----+------+-----+
|prod_id|product|cataB| cataA|price|
+-------+-------+-----+------+-----+
|   X001|  商品1| 面膜|护肤品|121.0|
|   X002|  商品2| 面膜|护肤品|141.0|
|   X003|  商品3| 面膜|护肤品|168.0|
|   X004|  商品4| 面膜|护肤品|211.0|
|   X005|  商品5| 面膜|护肤品|185.0|
|   X006|  商品6| 面膜|护肤品|159.0|
|   X007|  商品7| 面膜|护肤品|213.0|
|   X008|  商品8| 面膜|护肤品|119.0|
|   X009|  商品9| 面膜|护肤品| 92.0|
|   X010| 商品10| 面膜|护肤品| 58.0|
+-------+-------+-----+------+-----+

+--------+---------+-------+-----------+--------------+--------------------+-------+-----------+--------+---------+
|order_id|  od_date|cust_id|cust_region| cust_province|           cust_city|prod_id|od_quantity|od_price|od_amount|
+--------+---------+-------+-----------+--------------+--------------------+-------+-----------+--------+---------+
|  D31313|2019-5-16| S22796|       东区|        浙江省|              台州市|   X091|        892|     214| 190888.0|
|  D21329|2019-5-14| S11460|       东区|        安徽省|              宿州市|   X005|        276|     185|  51060.0|
|  D22372|2019-8-26| S11101|       北区|        山西省|              忻州市|   X078|       1450|     116| 168200.0|
|  D31078| 2019-4-8| S10902|       北区|        吉林省|    延边朝鲜族自治州|   X025|       1834|     102| 187068.0|
|  D32470|2019-4-11| S18696|       北区|        北京市|              北京市|   X010|        887|      58|  51446.0|
|  D28336| 2019-2-8| S17681|       西区|        云南省|红河哈尼族彝族自治州|   X039|       1102|     108| 119016.0|
|  D29820|2019#3#11| S13780|       南区|        广东省|              江门市|   X011|       1180|     226| 266680.0|
|  D31449| 2019-5-1| S12455|       南区|广西壮族自治区|              百色市|   X051|       1236|     203| 250908.0|
|  D22542|2019-9-18| S10865|       北区|  内蒙古自治区|          呼和浩特市|   X089|       1101|     104| 114504.0|
|  D31859|2019-1-17| S17785|       西区|        云南省|              曲靖市|   X001|        235|     121|  28435.0|
+--------+---------+-------+-----------+--------------+--------------------+-------+-----------+--------+---------+

#1.统计每个“商品小类”中价格最高的前5个商品
df_top5_product = spark.sql(
    'SELECT *,dense_rank() over (partition by cataB order by price DESC) as rank FROM prod')\
    .where('rank<=5')
df_top5_product.show()

结果:
+-------+-------+------+------+-----+----+
|prod_id|product| cataB| cataA|price|rank|
+-------+-------+------+------+-----+----+
|   X071| 商品71|防晒霜|护肤品|253.0|   1|
|   X069| 商品69|防晒霜|护肤品|251.0|   2|
|   X070| 商品70|防晒霜|护肤品|242.0|   3|
|   X068| 商品68|防晒霜|护肤品|225.0|   4|
|   X073| 商品73|防晒霜|护肤品|209.0|   5|
|   X062| 商品62|隔离霜|护肤品|239.0|   1|
|   X067| 商品67|隔离霜|护肤品|212.0|   2|
|   X061| 商品61|隔离霜|护肤品|203.0|   3|
|   X063| 商品63|隔离霜|护肤品|162.0|   4|
|   X064| 商品64|隔离霜|护肤品|150.0|   5|
|   X024| 商品24|  眼霜|护肤品|235.0|   1|
|   X023| 商品23|  眼霜|护肤品|235.0|   1|
|   X017| 商品17|  眼霜|护肤品|213.0|   2|
|   X018| 商品18|  眼霜|护肤品|212.0|   3|
|   X019| 商品19|  眼霜|护肤品|210.0|   4|
|   X022| 商品22|  眼霜|护肤品|158.0|   5|
|   X057| 商品57|爽肤水|护肤品|245.0|   1|
|   X058| 商品58|爽肤水|护肤品|239.0|   2|
|   X056| 商品56|爽肤水|护肤品|233.0|   3|
|   X049| 商品49|爽肤水|护肤品|226.0|   4|
+-------+-------+------+------+-----+----+
only showing top 20 rows

#2.统计每月订购情况,数量和金额
df_month_sale = spark.sql(
    '''SELECT od_month, 
              sum(od_quantity) as total_quantity,
              sum(od_amount) as total_amount
       FROM sales 
       GROUP BY od_month 
       ORDER BY od_month''')
#df_month_sale.show()

结果:
+--------+--------------+------------+
|od_month|total_quantity|total_amount|
+--------+--------------+------------+
|       1|       1947407|3.13620183E8|
|       2|       2145102|3.43201847E8|
|       3|       2568902|4.13361291E8|
|       4|       3184543|5.05195997E8|
|       5|       3590871|5.75270541E8|
|       6|       3553759|5.74130121E8|
|       7|       3884817|6.21952477E8|
|       8|       3833576|6.07935472E8|
|       9|       3113495|4.98623384E8|
+--------+--------------+------------+

#3.统计所在地市的订购数量排行TOP20
df_city_sale = spark.sql(
    '''SELECT cust_city, 
              sum(od_quantity) as total_quantity 
       FROM sales
       GROUP BY cust_city 
       ORDER BY total_quantity DESC 
       LIMIT 20''')
#df_city_sale.show()

结果:
+---------+--------------+
|cust_city|total_quantity|
+---------+--------------+
|   上海市|       1125312|
|   苏州市|       1010450|
|   重庆市|        805434|
|   深圳市|        778322|
|   泉州市|        738150|
|   杭州市|        563136|
|   北京市|        561432|
|   广州市|        523038|
|   南通市|        452168|
|   宁波市|        425098|
|   泰州市|        413528|
|   长沙市|        398747|
|   南京市|        395471|
|   无锡市|        381211|
|   常州市|        361425|
|   温州市|        342305|
|   东莞市|        341821|
|   扬州市|        313732|
|   福州市|        275812|
|   南昌市|        275555|
+---------+--------------+

#4.分析什么类型的美妆需求量最大
# df = spark.sql(
#     '''SELECT s.*,
#               product,cataB,cataA, price
#        FROM sales s,prod p
#        WHERE s.prod_id=p.prod_id''')
# df.show()
df_best_seller = spark.sql(
    '''SELECT cataA,cataB,
              sum(od_quantity) as total_quantity 
       FROM sales s,prod p
       WHERE s.prod_id=p.prod_id 
       GROUP BY cataA,cataB
       ORDER BY cataA ASC, total_quantity DESC''')
df_best_seller.show()

结果:
+------+------+--------------+
| cataA| cataB|total_quantity|
+------+------+--------------+
|  彩妆|  口红|       2013024|
|  彩妆|  粉底|       1188621|
|  彩妆|睫毛膏|        586332|
|  彩妆|  眼影|        295795|
|  彩妆|  蜜粉|         45534|
|护肤品|  面膜|       5450216|
|护肤品|  面霜|       4566905|
|护肤品|爽肤水|       3523687|
|护肤品|  眼霜|       3346554|
|护肤品|隔离霜|       2488124|
|护肤品|防晒霜|       2388610|
|护肤品|洁面乳|       1927482|
+------+------+--------------+


Process finished with exit code 0

#5.分析哪些省份的美妆需求量最大
df_province_sale = spark.sql(
    '''SELECT cust_province, 
              sum(od_quantity) as total_quantity 
       FROM sales
       GROUP BY cust_province''')
#df_province_sale.show()

结果:
+-------------+--------------+
|cust_province|total_quantity|
+-------------+--------------+
|         广东|       2951377|
|         云南|        706882|
|       内蒙古|        120813|
|         湖北|        909538|
|         新疆|        202183|
|         海南|        134901|
|         陕西|        647066|
|         天津|        155298|
|         广西|        914145|
|         河南|        552203|
|         贵州|        483007|
|         江苏|       4180558|
|         宁夏|        102254|
|         福建|       1569114|
|       黑龙江|        481754|
|         辽宁|        564647|
|         重庆|        805434|
|         安徽|       1353512|
|         山东|       1234058|
|         湖南|       1314400|
+-------------+--------------+
only showing top 20 rows


Process finished with exit code 0

#6.统计每个客户的最近一次购买时间、消费频率、总消费金额
#增加一个用于SparkSQL窗口操作的字段cust_all设定整列值为1,相当于所有数据在同一个操作窗口中
df = spark.sql(
    '''SELECT cust_id,
              max(od_date) as od_latest,
              count(order_id) as total_count,
              sum(od_amount) as total_amount
       FROM sales
       GROUP BY cust_id''') \
    .withColumn('cust_all', lit(1))
#df.show()

结果:
+-------+----------+-----------+------------+--------+
|cust_id| od_latest|total_count|total_amount|cust_all|
+-------+----------+-----------+------------+--------+
| S14776|2019-09-20|         17|   2596176.0|       1|
| S17873|2019-09-04|         23|   2962041.0|       1|
| S15320|2019-09-20|         55|   8755617.0|       1|
| S22155|2019-08-08|         19|   2446681.0|       1|
| S14412|2019-06-12|         33|   4358357.0|       1|
| S23207|2019-08-23|         19|   2629213.0|       1|
| S19304|2019-09-26|         27|   3698868.0|       1|
| S16573|2019-09-26|         54|   7226044.0|       1|
| S21521|2019-09-24|         34|   5238461.0|       1|
| S19779|2019-09-11|         53|   8737872.0|       1|
| S20992|2019-07-25|         15|   1944612.0|       1|
| S21120|2019-09-27|         16|   1793628.0|       1|
| S12814|2019-09-23|         47|   6543973.0|       1|
| S11602|2019-09-30|         29|   4430713.0|       1|
| S19346|2019-09-26|          9|    997019.0|       1|
| S15443|2019-08-21|         20|   2652965.0|       1|
| S10226|2019-09-23|         17|   2381690.0|       1|
| S23247|2019-07-09|         18|   1935552.0|       1|
| S11773|2019-09-30|         26|   3009549.0|       1|
| S12196|2019-07-29|         10|   1528659.0|       1|
+-------+----------+-----------+------------+--------+
only showing top 20 rows


Process finished with exit code 0

#为生成的客户DataFrame映射为一张临时视图表
df.createOrReplaceTempView('customer')
#通过归一化方式计算 R-Recency  F-Frequency  M-Money 对应的量化值
df = spark.sql(
    '''SELECT cust_id, od_latest,total_count,total_amount,
          percent_rank() over (partition by cust_all order by od_latest) as R,
          percent_rank() over (partition by cust_all order by total_count) as F,
          percent_rank() over (partition by cust_all order by total_amount) as M
       FROM customer''')
#df.show()
#按权重比 R-20% F-30% M-50% 计算客户价值得分,保留1位小数,从大到小排序
df_customerRFM = df\
    .withColumn('score', col('R')*20+col('F')*30+col('M')*50) \
    .withColumn('score', F.round(col('score'),1)) \
    .orderBy(F.desc('score'))
#df_customerRFM.where("score>98").show()

结果:
+-------+----------+-----------+------------+------------------+-----------------+-----------------+-----+
|cust_id| od_latest|total_count|total_amount|                 R|                F|                M|score|
+-------+----------+-----------+------------+------------------+-----------------+-----------------+-----+
| S17476|2019-09-30|         68| 1.0258002E7|0.9602954755309326|0.984302862419206|0.987072945521699| 98.1|
+-------+----------+-----------+------------+------------------+-----------------+-----------------+-----+


#为保存CSV文件准备的中文列名字典
colmap = {'prod_id':'商品编号', 'product':'商品名称',
          'cataB'  :'商品小类', 'cataA'  :'商品大类',
          'price'  :'销售单价',

          'order_id'     :'订单编码','od_date'    :'订单日期',
          'cust_id'      :'客户编码','cust_region':'所在区域',
          'cust_province':'所在省份','cust_city'  :'所在地市',
          'od_quantity'  :'订购数量','od_price'   :'订购单价',
          'od_amount'    :'金额',

          'total_quantity':'订购数量','od_month':'订单月份',
          'total_amount'  :'金额',

          'od_latest'  :'最近一次购买时间',
          'total_count':'消费频率','score':'综合分数',
          }

#每个商品小类中价格最高的前5个商品、每月订购情况、地市订购数量排行
#美妆商品需求量排行、省份美妆需求量排行、RFM模型客户价值
#以上分析结果保存至HDFS存储,便于后续的数据可视化
df_top5_product\
    .drop('rank')\
    .coalesce(1)\
    .withColumnRenamed('prod_id', colmap['prod_id'])\
    .withColumnRenamed('product', colmap['product'])\
    .withColumnRenamed('cataB', colmap['cataB'])\
    .withColumnRenamed('cataA', colmap['cataA'])\
    .withColumnRenamed('price', colmap['price'])\
    .write\
    .option('header', True)\
    .option('sep', ',')\
    .mode('overwrite')\
    .csv('hdfs://localhost:9000/datas/result.top5_product')
df_month_sale.coalesce(1)\
    .withColumnRenamed('od_month', colmap['od_month'])\
    .withColumnRenamed('total_quantity', colmap['total_quantity'])\
    .withColumnRenamed('total_amount', colmap['total_amount'])\
    .write.csv('hdfs://localhost:9000/datas/result.month_sale',
               header=True, mode="overwrite")
df_city_sale.coalesce(1)\
    .withColumnRenamed('cust_city', colmap['cust_city'])\
    .withColumnRenamed('total_quantity', colmap['total_quantity'])\
    .write.csv('hdfs://localhost:9000/datas/result.city_sale',
               header=True, mode="overwrite")
df_best_seller.coalesce(1)\
    .withColumnRenamed('cataA', colmap['cataA'])\
    .withColumnRenamed('cataB', colmap['cataB'])\
    .withColumnRenamed('total_quantity', colmap['total_quantity'])\
    .write.csv('hdfs://localhost:9000/datas/result.best_seller',
               header=True, mode="overwrite")
df_province_sale.coalesce(1)\
    .withColumnRenamed('cust_province', colmap['cust_province'])\
    .withColumnRenamed('total_quantity', colmap['total_quantity'])\
    .write.csv('hdfs://localhost:9000/datas/result.province_sale',
               header=True, mode="overwrite")
df_customerRFM.coalesce(1)\
    .withColumnRenamed('cust_id', colmap['cust_id'])\
    .withColumnRenamed('od_latest', colmap['od_latest'])\
    .withColumnRenamed('total_count', colmap['total_count'])\
    .withColumnRenamed('total_amount', colmap['total_amount'])\
    .withColumnRenamed('score', colmap['score'])\
    .write.csv('hdfs://localhost:9000/datas/result.customerRFM',
               header=True, mode="overwrite")


将hdfs里面生成的文件传送到本地(下载到本地):
spark@vm01:~$ hdfs dfs -get /datas/result.month_sale/part-* ~/mydata/month_sale.csv
spark@vm01:~$ hdfs dfs -get /datas/result.city_sale/part-* ~/mydata/city_sale.csv

将上面两个文件可视化
新建名为:data_visual.py 的python文件

#导入库模块
import pandas as pd
from pyecharts import options as opts
from pyecharts.charts import Bar

#读取文件数据
#从csv文件读取数据,转换为Pandas中的DataFrame二维表
month_sale = pd.read_csv('/home/spark/mydata/month_sale.csv')

#根据实际需要处理每一列的值,分别得到list数组
x = [f'{v}月' for v in month_sale['订单月份'].tolist()]
y1 = [round(v/10000, 2) for v in month_sale['订购数量'].tolist()]
y2 = [round(v/10000/10000, 2) for v in month_sale['金额'].tolist()]
print(x)
print(y1)
print(y2)

结果:
/usr/bin/python3.6 /home/spark/PycharmProjects/BeautyProduct/data_visual.py 
['1月', '2月', '3月', '4月', '5月', '6月', '7月', '8月', '9月']
[194.74, 214.51, 256.89, 318.45, 359.09, 355.38, 388.48, 383.36, 311.35]
[3.14, 3.43, 4.13, 5.05, 5.75, 5.74, 6.22, 6.08, 4.99]

去文件保存的目录里,查看可视化文件,如果Spark上的浏览器看不到,可以将文件复制在本地电脑上查看

相关文章:

  • pycharm无法正常调试问题
  • 山东大学软件学院项目实训-基于大模型的模拟面试系统-Vditor编辑器上传图片
  • C++学习:六个月从基础到就业——多线程编程:std::thread基础
  • std::chrono类的简单使用实例及分析
  • JavaScript性能优化实战(13):性能测试与持续优化
  • 后期:daplink
  • 可编辑PPT | 华为安全架构设计方法指南华为数字化转型架构解决方案
  • npm vs npx 终极指南:从原理到实战的深度对比 全面解析包管理器与包执行器的核心差异,助你精准选择工具
  • 完善网络安全等级保护,企业需注意:
  • kotlin 将一个list按条件分为两个list(partition )
  • centos 9 Kickstart + Ansible自动化部署 —— 筑梦之路
  • 阅读笔记---城市计算中用于预测学习的时空图神经网络研究综述
  • JVM的面试相关问题
  • List优雅分组
  • Python打卡DAY31
  • STM32+ESP8266+ONENET+微信小程序上传数据下发指令避坑指南
  • .NET 10 - 尝试一下Minimal Api的Validation新特性
  • LangChain4j入门(六)整合提示词(Prompt)
  • RK3588 ArmNN CPU/GPU ResNet50 FP32/FP16/INT8 推理测试
  • .NET外挂系列:3. 了解 harmony 中灵活的纯手工注入方式
  • 焦点访谈丨售假手段又翻新,警惕化肥“忽悠团”的坑农套路
  • 山西持续高温:阳城地表温度72.9℃破纪录,明日局部地区仍将超40℃
  • F4方程式上海站引擎轰鸣,见证中国赛车运动不断成长
  • 特朗普与泽连斯基通话
  • 高温最强时段来了!北方局地高温有明显极端性
  • 体坛联播|水晶宫队史首夺足总杯,CBA总决赛爆发赛后冲突