当前位置: 首页 > news >正文

解锁 PySpark SQL 的强大功能:有关 App Store 数据的端到端教程

解锁 PySpark SQL 的强大功能:有关 App Store 数据的端到端教程

参考链接:

  • Unlocking the Power of PySpark SQL: An end-to-end tutorial on App Store data:https://datascience.fm/pyspark-sql-end-to-end-tutorial-app-store-dataset/
  • androidapps.csv数据源:https://cdn.getmidnight.com/171293841d3fdd4af2e12426ce202ac9/files/2024/02/androidapps.csv?ref=datascience.fm
  • Anaconda3网址:https://www.anaconda.com/download
  • Spark下载网址:https://spark.apache.org/downloads.html
  • google浏览器下载网址:https://www.google.cn/chrome/

任务要求:

  • 环境搭建:在本学期内,你们要完成集成 Anaconda3 与 Spark 的 PySpark notebook 环境设置。确保安装好 Anaconda3 和 Spark,配置好相关环境变量,为后续的学习和实践做好准备。
  • 基本 SQL 操作:深入学习 PySpark SQL 的 select 查询、group by 和 order by 等基本操作。通过实际操作,熟练运用这些操作对数据进行处理和分析,能够解决实际问题。
  • 函数学习:全面掌握 PySpark SQL 中的聚合函数、标量函数和窗口函数。通过具体案例分析,深入理解这些函数在复杂数据分析中的作用。
  • 用户定义函数(UDF):研究并掌握用户定义函数(UDF)的实用程序和实现方法。

验收条件:

  • 环境设置:PySpark notebook 环境设置正确且能正常运行。
  • 操作执行:准确运用 PySpark SQL 进行基本操作,操作结果正确。
  • 函数应用:熟练运用 PySpark SQL 中的函数进行数据分析,结果准确。
  • UDF 实现:成功实现用户定义函数(UDF),能够运用 UDF 解决实际问题。

希望同学们通过本次实验,能够掌握 PySpark SQL 的基本操作,为大数据处理和分析提供有力支持。

1. 虚拟机ubuntu中设置 PySpark notebook

1.1 Anacoda3的安装

1.1.1 下载Anacoda3安装包
  1. 下载安装包到Anaconda3下载网址下载最新的发行版。选择 Linux 的安装包。如,本文章下载得到文件是:Anaconda3-2024.10-1-Linux-x86_64.sh将这个文件放到共享文件夹,虚拟机ubuntuvm可用通过文件夹/media/sf_vmshare/访问。
1.1.2 安装Anacoda3
  1. 安装以hadoop用户登录,尽量都使用默认的设置参数进行安装:
    bash /media/sf_vmshare/Anaconda3-2024.10-1-Linux-x86_64.sh
    注意:这里有很多内容出现(其最后一行是more这个词),需要你一直按回车键知道最后一行不在是more。

  2. 然后根据以下步骤执行操作:

    Do you accept the license terms? [yes|no]
    >>>  yes  # 👈 接受版权条款
    Anaconda3 will now be installed into this location:
    /home/hadoop/anaconda3
    - Press ENTER to confirm the location
    - Press CTRL-C to abort the installation
    - Or specify a different location below
    [/home/hadoop/anaconda3] >>>  # 👈 直接回车,接受默认的安装路径
    PREFIX=/home/hadoop/anaconda3
    Unpacking payload ...
    Installing base environment...
    Downloading and Extracting Packages:
    Downloading and Extracting Packages:
    Preparing transaction: done
    Executing transaction: done
    installation finished.
    Do you wish to update your shell profile to automatically initialize conda?
    This will activate conda on startup and change the command prompt when activated.
    If you'd prefer that conda's base environment not be activated on startup,
    run the following command when conda is activated:conda config --set auto_activate_base falseYou can undo this by running `conda init --reverse $SHELL`? [yes|no]
    [no] >>>  # 👈 直接回车,接受默认的[no]选项
    You have chosen to not have conda modify your shell scripts at all.
    To activate conda's base environment in your current shell session:
    eval "$(/home/hadoop/anaconda3/bin/conda shell.YOUR_SHELL_NAME hook)"
    To install conda's shell functions for easier access, first activate, then:
    conda init
    Thank you for installing Anaconda3!
    

    至此Anacoda3已安装完毕!

1.1.3 安装完成后的一些设置与检查
  1. 设置安装程序执行的最后,屏幕输出了一些提示信息。其中提到,要执行conda init来进行shell功能的初始化。执行下面的命令:

     cd ~/anaconda3/bin./conda init
    
  2. 在shell功能的初始化过程中将会自动在~/.bashrc配置文件尾部加入conda的初始化脚本
    注意,建议不要人为去修改conda 的初始化脚本,这部分初始化脚本将由 conda 自行管理。退出当前的 SSH 连接并重新登录生效。 重新以 Hadoop 用户登录进入虚拟机后,我们发现它的提示符变为这样,前面带有 “(base)” 的标记,它表示当前在 conda 的默认环境 base 中。

  3. 查看全部环境列表。当前只有一个 base 环境,且是激活的环境(用 * 号标记):
    conda env list

  4. 发行版安装后,随着时间的推移,可能会有一些更新。我们可以在命令行窗口,将它更新到最新。

    #在当前的环境下,更新全部的包。
    conda update --all
    

    至此,基于 Anaconda 的 Python 开发环境已经就绪。

1.2 Spark的安装

1.2.1 下载最新版的 Spark
  1. 在Spark下载网址下载安装包spark-3.5.3-bin-hadoop3.tgz并将其复制到共享文件夹内。
1.2.2 安装Spark
  1. 以 hadoop 用户登录虚拟机,手动解压缩安装:

    # 进入目标文件夹。
    cd /opt/app
    # 解压缩。
    tar zxf /media/sf_vmshare/spark-3.5.3-bin-hadoop3.tgz
    # 进入文件夹,检查一下文件。
    cd spark-3.5.3-bin-hadoop3
    
  2. 修改hadoop用户的配置文件 ~/.profile ,在尾部添加环境变量:
    export SPARK_HOME=/opt/app/spark-3.5.3-bin-hadoop3
    退出当前的 SSH 连接并重新登录生效。

  3. 检查一下。 Spark 提供命令行的交互工具“spark-shell”。有 Scala 和 Python 两种。 我们先看基于 Scala 语言的 Shell:

    cd $SPARK_HOME
    # 启动交互式 Scala Shell(按 Ctrl+D 退出)。
    ./bin/spark-shell
    

    当看到提示符“scala>”,它表示 spark-shell 已经就绪。

  4. 我们再来看看基于Pyhon语言的Shell。启动交互式Python Shell(按Ctrl+D,或输入”exit()“退出):

    # 启动交互式 Python Shell。
    ./bin/pyspark
    
  5. spark-shell启动(一定要先启动,否则打不开网址)“Spark context Web UI”,地址在:http://10.0.2.15:4040 。注意这里“10.0.2.15”是虚拟机 ubuntuvm1的内网IP地址。因虚拟机是以 NAT 方式来联网,它的 IP 是 “10.0.x” 这样的一个内网 IP,从 Windows 宿主机上不能直接访问,只能通过端口转发。 例如,下面使用虚拟机上的 Google Chrome 浏览器,直接访问本机 HBase 的 Web 界面(访问 http://localhost:4040 )。

  6. PySpark是Spark的Python开发环境,下面将它与Jupyter Notebook集成。在当前hadoop用户的配置文件~/.profile尾部新添加下面3行:

    export PATH=$SPARK_HOME/bin:$PATH
    export PYSPARK_DRIVER_PYTHON=jupyter
    export PYSPARK_DRIVER_PYTHON_OPTS='notebook'
    

    退出当前的 SSH 连接并重新登录生效

  7. 启动 pyspark。以hadoop用户登录,在任意位置启动pyspark(首先要进入到bash环境):

    #启动bash环境
    conda activate
    #启动pyspark
    pyspark
    

    注意上面的提示信息,给出了一个 URL 链接,它是在本机访问 Jupyter Notebook 所需的链接,若启动pyspark不能直接跳转到jupyterLab页面,可通过复制该链接到Google Chrome浏览器中打开jupyterLab。

  8. 在虚拟机本机的Google Chrome浏览器中,访问之前Jupyter Notebook在屏幕输出中给出的链接,即可打开使用Jupyter Notebook。新建一个 Notebook,选择Kernel,如下图所示:
    pAX9yNT.png
    至此,PySpark 基于 Jupyter Notebook 的开发环境已经就绪。

1.3 安装Google Chrome浏览器

1.3.1 下载Google Chrome 浏览器
  1. 下载 deb 安装包 到 Google 的官网下载 Google Chrome 浏览器。在国内不能访问主站 google.com,只能访问国内的二级域名站点 google.cn。我们在 Windows 宿主机上打开 Edge 浏览器,访问网址google浏览器下载网址 。网页默认是下载 Windows 的安装包,但这里需要的是 Ubuntu 的 deb 安装包,请翻到页面底部,选择“Other Platforms”,再选择 “Linux”。例如,这里下载得到文件: google-chrome-stable_current_amd64.deb 将这个文件放到共享文件夹,虚拟机 ubuntuvm1 可用通过文件夹 /mediasf_vmshare/ 访问。
1.3.2 准备环境
  1. 将操作系统更新到最新
    sudo apt update -y
    sudo apt upgrade -y
    
  2. 安装依赖包
    sudo apt install -y fonts-liberation libgbm1 xdg-utils
    
1.3.3 安装
  1. 安装
    sudo dpkg -i /media/sf_vmshare/google-chrome-stable_current_amd64.deb
    
  2. 卸载(供参考)
    sudo dpkg --remove google-chrome-stable
    
1.3.4 后台运行Google Chrome 浏览器
  1. 运行浏览器
    google-chrome &
    
    当浏览器窗口打开时,任意访问一个网站,应该能够看到内容正常显示。

1.4 访问 Jupyter Notebook

  1. 将查询应用于实际的Android应用程序数据集,打开Notebook后,通过在单元格中编写和执行代码来开始浏览Android应用程序数据集,androidapps.csv数据源在参考链接部分,请将该链接下载到虚拟机的/home/hadoop/datafile/目录下。我们从数据集的前20行开始,以快速了解其内容。
    from pyspark.sql import SparkSession# Spark session & context
    spark = SparkSession.builder.master("local").getOrCreate()
    sc = spark.sparkContext# Reading CSV file from /sparkdata folder
    csv_path = "/home/hadoop/datafile/androidapps.csv"  # This will read any CSV file in the /sparkdata folder
    df = spark.read.csv(csv_path, header=True, inferSchema=True)  # Assuming the CSV has a header# Show the DataFrame
    df.show()# If you want to perform any action on DataFrame
    # For instance, to get the count of rows:
    print("Number of rows:", df.count())# Create a temporary view to query the DataFrame using SQL
    df.createOrReplaceTempView("androidapps")
    
    在此代码中,创建了SparkSession和SparkContext以与Apache Spark交互。SparkSession(spark)充当Spark功能的入口点,同时,SparkContext (sc) 管理 Spark 任务和资源的执行,特别是对于较低级别的操作。它们共同支持数据处理任务,例如读取 CSV 文件(spark.read.csv)、显示DataFrame内容 (df.show()) 以及创建临时视图 (df.createOrReplaceTempView) 以使用 SQL 语法进行查询,以下只展示部分输出结果。
    输出:
    +--------------------+--------------+------+-------+----+---------+----+-----+
    |                 App|      Category|Rating|Reviews|Size| Installs|Type|Price|
    +--------------------+--------------+------+-------+----+---------+----+-----+
    |Photo Editor & Ca...|ART_AND_DESIGN|   4.1|    159|  19|  10000.0|Free|  0.0
    | Coloring book moana|ART_AND_DESIGN|   3.9|    967|  14| 500000.0|Free|  0.0|
    |U Launcher Lite –...|ART_AND_DESIGN|   4.7|  87510| 8.7|5000000.0|Free|  0.0|
    |Sketch - Draw & P...|ART_AND_DESIGN|   4.5| 215644|  25|    5.0E7|Free|  0.0|
    |Pixel Draw - Numb...|ART_AND_DESIGN|   4.3|    967| 2.8| 100000.0|Free|  0.0|
    +--------------------+--------------+------+-------+----+---------+----+-----+
    

2. 解锁PySpark SQL的强大功能

2.1 显示特定列

# Selecting specific columns 'App', 'Category', and 'Rating'
selected_columns_df = df.select('App', 'Category', 'Rating')# Show the resulting DataFrame
selected_columns_df.show()

使用select关键字,将显示DataFrame df中的特定列 “App”、“Category” 和 “Rating”,以下只展示部分输出结果。
输出:

+--------------------+---------+------+
|                 App| Installs|Rating|
+--------------------+---------+------+
|U Launcher Lite –...|5000000.0|   4.7|
|Sketch - Draw & P...|    5.0E7|   4.5|
|Tattoo Name On My...|    1.0E7|   4.2|
|FlipaClip - Carto...|5000000.0|   4.3|
|        ibis Paint X|    1.0E7|   4.6|
+--------------------+---------+------+

2.2 安装量超过 100 万且评分高于 4 的应用

from pyspark.sql.functions import col# Applying filter conditions using DataFrame functions
query_result = df.select('App', 'Installs', 'Rating') .filter((col('Installs') > 1000000) & (col('Rating') > 4.0))# Show the query result
query_result.show()

在此代码中,我们根据两个条件筛选(使用filter关键字)DataFrame 行:“安装”大于1,000,000和“评级”大于4.0& 是一个条件运算符,它结合了两个条件,只有当两个条件同时为 true时才计算为true。必须满足这两个条件,才能在查询结果中展示一行!以下只展示部分输出结果。
输出:

+--------------------+---------+------+
|                 App| Installs|Rating|
+--------------------+---------+------+
|U Launcher Lite –...|5000000.0|   4.7|
|Sketch - Draw & P...|    5.0E7|   4.5|
|Tattoo Name On My...|    1.0E7|   4.2|
|FlipaClip - Carto...|5000000.0|   4.3|
|        ibis Paint X|    1.0E7|   4.6|
|  Floor Plan Creator|5000000.0|   4.1|
+--------------------+---------+------+

2.3 显示不同的类别和流派

# Selecting distinct values from the 'Category' and 'Genres' columns
distinct_categories_genres_df = df.select('Category', 'Genres').distinct()# Show the distinct categories and genres
distinct_categories_genres_df.show()

此代码使用distinct关键字,该关键字从DataFrame的 “Category” 和 “Genres” 列中选择值的唯一组合并显示它们,以下只展示部分输出结果。
输出:

+-------------------+--------------------+
|           Category|              Genres|
+-------------------+--------------------+
|              TOOLS|               Tools|
|          EDUCATION|Education;Music &...|
|            FINANCE|             Finance|
|             FAMILY|  Casual;Brain Games|
|BOOKS_AND_REFERENCE|   Books & Reference|
+-------------------+--------------------+

GROUP BYORDER BY查询
GROUP BY语句通过根据您在查询中指定的列对数据进行分组来对数据进行排序,它主要用于聚合函数。ORDER BY允许您按字母或数字以及升序或降序组织结果集。

2.4 对Type列分组并获取免费和付费应用总数

# Simple group by 'Type' and count the number of apps in each type
type_counts_df = df.groupBy('Type').count()# Show the resulting DataFrame
type_counts_df.show()

此代码在 “Type” 列上执行一个简单的group by操作,并计算每种类型的应用程序数量,我们将其与count聚合函数一起使用,我们很快就会了解更多!
输出:

+------+-----+
|  Type|count|
+------+-----+
|102248|    1|
|  NULL|    1|
|  Free| 8900|
|  Paid|  756|
|  2509|    1|
+------+-----+

2.5 获取评分最高的应用

from pyspark.sql.functions import desc
# Simple order by 'Rating' in descending order
sorted_apps_df = df.orderBy(desc('Rating'))
# Show the resulting DataFrame
sorted_apps_df.show()

该代码按称为“Rating”列的特定列降序对 DataFrame 进行排序,从而根据应用程序的评级从最高到最低排序,以下只展示部分输出结果。
输出:

|                 App|          Category|Rating|      Reviews|
+--------------------+------------------+------+-------------+
|"Women""s Health ...|              Face|  Body|weight lose)"|
|Mindvalley U Tall...|            EVENTS|     5|            1|
|NCLEX Multi-topic...|           MEDICAL|     5|            1|
|      FHR 5-Tier 2.0|           MEDICAL|     5|            2|
|Anatomy & Physiol...|           MEDICAL|     5|            1|
|        Sway Medical|           MEDICAL|     5|            3|
+--------------------+------------------+------+-------------+

2.6 以递减方式获取每个类别的应用数量

# Group by 'Category' and calculate the count of apps in each category, then order by the count in descending order
category_counts_df = df.groupBy('Category').count().orderBy(desc('count'))# Show the resulting DataFrame
category_counts_df.show()

我们按 “Category” 列进行分组,以计算每个类别中的应用数量。其次,我们按计数降序对结果进行排序,首先显示应用程序数量最多的类别,以下只展示部分输出结果。
输出:

+-------------------+-----+
|           Category|count|
+-------------------+-----+
|             FAMILY| 1832|
|               GAME|  959|
|              TOOLS|  827|
|           BUSINESS|  420|
|            MEDICAL|  395|
+-------------------+-----+

聚合函数:
聚合函数用于对 DataFrame 中的数据组执行计算。这些函数汇总或计算数据集的统计数据,例如查找列的总和、计数、平均值、最小值或最大值。它们对于有效地汇总和分析大型数据集至关重要。

2.7 计算安装总数

from pyspark.sql.functions import sum# Calculate the total number of installs
total_installs = df.select(sum('Installs')).collect()[0][0]
print("Total number of installs:", total_installs)

我们通过将sum聚合函数应用于 “Installs” 列来获取DataFrame中所有应用的安装总数。
输出:

Total number of installs: 75116937535.6

2.8 查找应用平均评分

from pyspark.sql.functions import avg# Find the average rating of all apps
average_rating = df.select(avg('Rating')).collect()[0][0]
print("Average rating of all apps:", average_rating)

与前面的查询类似,我们使用average( avg)聚合函数。
输出:

Average rating of all apps: 4.173212106419335

2.9 列出应用的最高和最低价格

from pyspark.sql.functions import max, min# Find the maximum and minimum values in the 'Price' column
max_price = df.select(max('Price')).collect()[0][0]
min_price = df.select(min('Price')).collect()[0][0]print("Maximum Price:", max_price)
print("Minimum Price:", min_price)

我们利用maxmin聚合函数来查找“Price”列中的最大值和最小值。
输出:

Maximum Price: 400.0
Minimum Price: 0.0

继续进行稍微复杂的查询。

2.10 计算平均评分最高的类别

from pyspark.sql.functions import avg# Calculate the average rating for each category
avg_rating_df = df.groupBy('Category').agg(avg('Rating').alias('AvgRating'))# Find the maximum average rating
max_avg_rating = avg_rating_df.select(max('AvgRating')).collect()[0][0]# Find the category with the highest average rating
max_avg_rating_category = avg_rating_df.filter(avg_rating_df['AvgRating'] == max_avg_rating) .select('Category').first()[0]print("Category with the highest average rating:", max_avg_rating_category)

我们首先使用avg聚合函数获得每个类别的平均评分,然后我们通过查找最高平均评分并过滤以检索相应的类别来确定具有最高平均评分的类别。
输出:

Category with the highest average rating: EVENTS

2.11 计算安装总数最高的类别

from pyspark.sql.functions import sum, max# Calculate the total number of installs for each category
total_installs_df = df.groupBy('Category').agg(sum('Installs').alias('TotalInstalls'))# Find the maximum total number of installs
max_installs = total_installs_df.select(max('TotalInstalls')).collect()[0][0]# Find the category with the highest total number of installs
max_installs_category = total_installs_df.filter(total_installs_df['TotalInstalls'] == max_installs) .select('Category').first()[0]print("Category with the highest total number of installs:", max_installs_category)

此代码使用sum聚合函数计算每个类别的安装总数,然后通过查找最大总安装量来确定安装总数最高的类别。
输出:

Category with the highest total number of installs: GAME

标量函数
标量函数对 DataFrame 列中的单个值进行操作,为每个输入值生成单个输出值。这些函数可用于逐行操作、转换或计算 DataFrame 列中的值。

2.12 将应用名称转换为大写

from pyspark.sql.functions import upper# Convert the 'App' column values to uppercase
uppercase_apps_df = df.select('App', upper('App').alias('UppercaseApp'))# Show the result
uppercase_apps_df.show(truncate=False)

此代码段利用upper()标量函数将 DataFrame 的 “App” 列中的值转换为大写,创建名为 “UppercaseApp” 的新列,以下只展示部分输出结果。
输出:

+----------------------------------------+----------------------------------------+
|App                                     |UppercaseApp                            |
+----------------------------------------+----------------------------------------+
|Coloring book moana                     |COLORING BOOK MOANA                     |
|Sketch - Draw & Paint                   |SKETCH - DRAW & PAINT                   |
|Pixel Draw - Number Art Coloring Book   |PIXEL DRAW - NUMBER ART COLORING BOOK   |
+----------------------------------------+----------------------------------------+

2.13 将应用名称与其类型连接

from pyspark.sql.functions import concat# Concatenate the 'App' and 'Genres' columns
concat_apps_df = df.select('App', 'Genres', concat('App', 'Genres').alias('AppWithGenres'))# Show the result
concat_apps_df.show(truncate=False)

我们使用concat() 标量函数来连接(连接)“App” 和 “Genres” 列中的值,创建一个名为 “AppWithGenres”的新列,以下只展示部分输出结果。
输出:

+--------------------------------------------------+-------------------------+
|App                                               |Genres                   |
+--------------------------------------------------+-------------------------+
|Photo Editor & Candy Camera & Grid & ScrapBook    |Art & Design             |
|Coloring book moana                               |Art & Design;Pretend Play|
|U Launcher Lite – FREE Live Cool Themes, Hide Apps|Art & Design             |
|Sketch - Draw & Paint                             |Art & Design             |
|Pixel Draw - Number Art Coloring Book             |Art & Design;Creativity  |
+--------------------------------------------------+-------------------------+

窗口函数:
窗口函数用于跨与当前行相关的一组行执行计算。可以在定义的行窗口上计算运行总计、排名和其他分析函数。这些函数通常与 Window 规范结合使用,Window 规范定义函数操作的窗口框架,包括框架内行的分区和排序。

2.14 为每行分配行号

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number# Define a window specification
window_spec = Window.orderBy('App')# Assign a unique number to each row based on the order of the 'App' column
row_number_df = df.withColumn('RowNumber', row_number().over(window_spec))# Show the result
row_number_df.show()

PySpark 中的 window 函数用于根据 “App” 列的顺序为 DataFrame 的每一行分配唯一的行号,从而允许在不改变原始顺序的情况下对行进行顺序编号。正如我们在输出中看到的,最后一列包含 Row Number,以下只展示部分输出结果。

输出:

+--------------------+-------------------+-----------+-------------+---------+
|                 App|           Category|     Rating|      Reviews|RowNumber|
+--------------------+-------------------+-----------+-------------+---------+
|"""i DT"" Fútbol....|             SPORTS|       NULL|           27|        1|
|"Alphabet ""H"" P...|    PERSONALIZATION|        4.5|            2|        2|
|"Eat Fast Prepare...|     FOOD_AND_DRINK|        4.6|         4925|        3|
|"Official QR Code...|       PRODUCTIVITY|        4.4|         3031|        4|
|"The FN ""Baby"" ...|BOOKS_AND_REFERENCE|       NULL|            1|        5|
+--------------------+-------------------+-----------+-------------+---------+

2.15 根据 Rating 分配密集 Rank

from pyspark.sql.functions import dense_rank# Define a window specification
window_spec = Window.partitionBy('Category').orderBy('Rating')# Assign a dense rank to each row within each category based on the 'Rating' column
dense_rank_df = df.withColumn('DenseRank', dense_rank().over(window_spec))# Show the result
dense_rank_df.show()

我们使用窗口函数根据 “Rating” 列为每个类别中的每一行分配一个密集的排名(将连续的整数分配给具有相同排名顺序的项目,而不会在排名之间留下间隙),从而能够对特定类别中的数据进行排名,同时保持每个分区内的原始排序,以下只展示部分输出结果。
输出:

+--------------------+--------------+-----------+--------+---------+
|                 App|      Category|     Rating| Reviews|DenseRank|
+--------------------+--------------+-----------+--------+---------+
|"Yanosik: ""antyr...|  traffic jams| navigation| camera"|        1|
|Mcqueen Coloring ...|ART_AND_DESIGN|       NULL|      61|        1|
|Easy DIY CD Craft...|ART_AND_DESIGN|       NULL|       7|        1|
|Test Application ...|ART_AND_DESIGN|       NULL|       0|        1|
|Learn To Draw Kaw...|ART_AND_DESIGN|        3.2|      55|        2|
+--------------------+--------------+-----------+--------+---------+

2.16 根据评级对应用排名并显示每类前3个应用

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col# Define a window specification partitioned by 'Category' and ordered by 'Rating' in descending order
window_spec = Window.partitionBy('Category').orderBy(col('Rating').desc())# Use row_number() to assign a unique row number within each category based on the rating
ranked_apps_df = df.withColumn('Rank', row_number().over(window_spec))# Filter only the top 3 apps within each category based on their rating
top_3_apps_df = ranked_apps_df.filter(col('Rank') <= 3)# Show the result
top_3_apps_df.select('App', 'Category', 'Rating', 'Rank').show()

这段代码使用窗口函数按降序排列每个类别中的应用程序,然后过滤以仅保留每个类别中排名前3的应用程序,显示结果及其各自的排名,以下只展示部分输出结果。
输出:

+--------------------+-------------------+-----------+----+
|                 App|           Category|     Rating|Rank|
+--------------------+-------------------+-----------+----+
|"Yanosik: ""antyr...|       traffic jams| navigation|   1|
|Spring flowers th...|     ART_AND_DESIGN|          5|   1|
|Harley Quinn wall...|     ART_AND_DESIGN|        4.8|   2|
|   Cardi B Wallpaper|     ART_AND_DESIGN|        4.8|   3|
|Tickets SDA 2018 ...|  AUTO_AND_VEHICLES|        4.9|   1|
+--------------------+-------------------+-----------+----+

2.17 将应用大小从MB转换为KB

from pyspark.sql.types import FloatType
from pyspark.sql.functions import udf# Define the UDF to convert size from MB to KB
def mb_to_kb(size_mb):try:return float(size_mb) * 1024.0except ValueError:return None# Register the UDF
mb_to_kb_udf = udf(mb_to_kb, FloatType())# Apply the UDF to the 'Size' column
android_apps_df = df.withColumn('Size_KB', mb_to_kb_udf('Size'))# Show the result
android_apps_df.select('App', 'Size', 'Size_KB').show()

这段代码定义了一个名为mb_to_kb的用户定义函数(UDF),将大小从兆字节(mb)转换为千字节(kb)。该函数接受以MB为单位的大小作为输入,将其转换为KB,然后返回结果。使用PySpark的udf()函数注册UDF,指定输入和输出类型。然后,它被应用到“Size”列,最后我们创建一个名为“Size_KB”的新列,以下只展示输出部分结果。
输出:

+--------------------+----+-------+
|                 App|Size|Size_KB|
+--------------------+----+-------+
|Photo Editor & Ca...|  19|19456.0|
| Coloring book moana|  14|14336.0|
|U Launcher Lite –...| 8.7| 8908.8|
|Sketch - Draw & P...|  25|25600.0|
|Pixel Draw - Numb...| 2.8| 2867.2|
+--------------------+----+-------+

2.18 将应用分为各个类型

from pyspark.sql.types import FloatType
from pyspark.sql.functions import udf# Define the UDF to categorize apps by size
def categorize_size(size):try:size = float(size)  # Convert size to floatif size <= 10:return 'Small'elif 10 < size <= 100:return 'Medium'else:return 'Large'except ValueError:return None  # Return None for non-numeric values# Register the UDF
categorize_size_udf = udf(categorize_size)# Apply the UDF to the 'Size' column
android_apps_df = df.withColumn('Size_Category', categorize_size_udf('Size'))# Show the result
android_apps_df.select('App', 'Size', 'Size_Category').show()

我们创建了一个名为categorize _ size的UDF,它根据应用程序的大小将其分为三类:“小型”、“中型”和“大型”。该函数将应用程序的大小作为输入,将其转换为浮点型,并根据预定义的大小阈值分配类别。我们创建一个名为“Size_Category”的新列来保存类别,以下只展示部分输出结果。
输出:

+--------------------+----+-------------+
|                 App|Size|Size_Category|
+--------------------+----+-------------+
|Photo Editor & Ca...|  19|       Medium|
| Coloring book moana|  14|       Medium|
|U Launcher Lite –...| 8.7|        Small|
|Sketch - Draw & P...|  25|       Medium|
|Pixel Draw - Numb...| 2.8|        Small|
+--------------------+----+-------------+

2.19 提取应用的第一个单词

from pyspark.sql.functions import udf
from pyspark.sql.types import StringTypedef extract_first_word(app_name):try:return app_name.split()[0]except IndexError:return None# Register the UDF
extract_first_word_udf = udf(extract_first_word, StringType())# Apply the UDF to the 'App' column
android_apps_df = df.withColumn('First_Word', extract_first_word_udf('App'))# Show the result
android_apps_df.select('App', 'First_Word').show()

构建名为extract_first_word的 UDF,用于从每个应用程序名称中提取第一个单词。该函数将应用程序名称作为输入,按空格将其拆分,并返回第一个单词。我们创建名为 “First_Word” 的列。最后,显示 DataFrame!以下只展示部分输出结果。
输出:

+--------------------+----------+
|                 App|First_Word|
+--------------------+----------+
|Photo Editor & Ca...|     Photo|
| Coloring book moana|  Coloring|
|U Launcher Lite –...|         U|
|Sketch - Draw & P...|    Sketch|
|Pixel Draw - Numb...|     Pixel|
+--------------------+----------+

2.20 计算Optimized App Score

from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType
import math# Define the UDF to calculate the optimized score
def calculate_optimized_score(rating, reviews, installs):try:# Convert installs and reviews to numeric valuesinstalls_numeric = int(installs)  # No need for replace() as installs is already numericreviews_numeric = int(reviews)# Apply log transformation to reviews and installs to reduce skewnesslog_reviews = math.log1p(reviews_numeric)  # log1p is log(1+x) to handle 0 reviews gracefullylog_installs = math.log1p(installs_numeric)# Normalize rating to be in a 0-1 range if it's not already (assuming 5 is the max rating)normalized_rating = float(rating) / 5# Assign weights to each component based on desired importanceweight_rating = 0.5  # Example weight, adjust based on preferenceweight_reviews = 0.25weight_installs = 0.25# Calculate the weighted scorescore = (normalized_rating * weight_rating) + (log_reviews * weight_reviews) + (log_installs * weight_installs)return scoreexcept ValueError as e:# Handle cases where the data cannot be converted to numeric valuesprint(f"Error processing input: {e}")return None# Register the UDF with a FloatType return
optimized_score_udf = udf(calculate_optimized_score, FloatType())# Apply the UDF to calculate the optimized score for each app
android_apps_df = df.withColumn('Optimized_Score', optimized_score_udf('Rating', 'Reviews', 'Installs'))# Show the result
android_apps_df.select('App', 'Rating', 'Reviews', 'Installs', 'Optimized_Score').show()

calculate_optimized_score定义了一个用户定义的函数 (UDF),用于根据Android应用的评分、评价和安装量计算其优化分数。它将对reviews和installing应用对数变换以减少偏度,并为每个组件分配权重以调整它们对最终分数的影响。然后,注册 UDF 并将其应用于包含应用程序数据的DataFrame,从而添加“Optimized_Score”列。结果最终显示出来,以下只展示部分输出结果。
输出:

+--------------------+------+-------+---------+---------------+
|                 App|Rating|Reviews| Installs|Optimized_Score|
+--------------------+------+-------+---------+---------------+
|Photo Editor & Ca...|   4.1|    159|  10000.0|      3.9814036|
| Coloring book moana|   3.9|    967| 500000.0|      5.3893995|
|U Launcher Lite –...|   4.7|  87510|5000000.0|      7.1711173|
|Sketch - Draw & P...|   4.5| 215644|    5.0E7|      7.9522305|
|Pixel Draw - Numb...|   4.3|    967| 100000.0|       5.027042|
+--------------------+------+-------+---------+---------------+

3. 总结与展望

在本学期的学习中,PySpark SQL展现出强大的功能。环境搭建上,集成Anaconda3与Spark,为后续实验奠定基础。基本SQL操作方面,select查询、group by和order by等操作让数据处理与分析变得高效且精准。函数学习使我们能深入挖掘数据价值,聚合函数、标量函数和窗口函数在复杂数据分析中发挥关键作用。用户定义函数(UDF)更是拓展了应用场景。
通过这篇文章,我们希望大家都能了解这个工具功能的强大,它可以有效地处理大数据,解锁洞察力并推动明智的决策。无论您是数据科学家、分析师还是工程师,学习查询和使用 PySpark 进行数据分析都能让您掌握在当今数据驱动型环境中应对实际挑战的技能。


http://www.dtcms.com/a/540666.html

相关文章:

  • MousePlus(鼠标增强工具) 中文绿色版
  • 源码学习:MyBatis源码深度解析与实战
  • RAG项目中知识库的检索优化
  • Java IO 流之转换流:InputStreamReader/OutputStreamWriter(字节与字符的桥梁)
  • 熊掌号做网站推广的注意事项品牌网页
  • shell脚本curl命令发送钉钉通知(加签方式)——筑梦之路
  • [无人机sdk] AdvancedSensing | 获取实时视频流 | VGA分辨率
  • 海康相机通过透明通道控制串口收发数据
  • 建网站科技公司做校服的网站
  • 设计模式简介
  • PyTorch torch.unique() 基础与实战
  • 【图像处理基石】图像滤镜的算法原理:从基础到进阶的技术解析
  • 信宜网站建设网站开发配置表格
  • 提示词(Prompt)——指令型提示词在大模型中的调用(以 Qwen 模型为例)
  • python-88-实时消费kafka数据批量追加写入CSV文件
  • 提示词(Prompt)——链式思维提示词(Chain-of-Thought Prompting)在大模型中的调用(以 Qwen 模型为例)
  • 用三个面中心点求解长方体位姿:从几何直觉到线性代数实现
  • 网站备案ip查询网站做网站首页ps分辨率多少
  • 免费建一级域名网站千锋教育广州校区
  • CSS3属性(三)
  • 开源底盘+机械臂机器人:Lekiwi驱动链路分析
  • 通过 useEventBus 和 useEventCallBack 实现与原生 Android、鸿蒙、iOS 的事件交互
  • iOS 26 iPhone 使用记录分析 多工具组合构建全方位设备行为洞察体系
  • 【Unity】HTModuleManager(三)Markdown语法的Unity编辑器方言
  • 如何将安卓手机备份到电脑?7种方法
  • 基于SpringBoot+Vue的购物商城(支付宝沙盒支付、物流快递API、WebSocket及时通讯、协同过滤算法、Echarts图形化分析)
  • MYSQL-超全基础以及用法--仅个人的速记笔记(1)
  • 31、LangChain开发框架(八)-- LangChain 数据分析智能体实战
  • 建设局域网网站盐城市亭湖区城乡建设网站
  • 6.2 大数据方法论与实践指南-任务元数据