PySpark SQL

2024年8月29日 | 阅读 7 分钟

Apache Spark 是 Apache 软件基金会最成功的软件,专为快速计算而设计。 许多行业都在使用 Apache Spark 来寻找他们的解决方案。 PySpark SQL 是 Spark 中的一个模块,它将关系处理与 Spark 的函数式编程 API 集成在一起。 我们可以使用 SQL 查询语言提取数据。 我们可以使用与 SQL 语言相同的查询。

如果您对 RDBMS 有基本的了解,PySpark SQL 将很容易使用,您可以在其中扩展传统关系数据处理的局限性。 Spark 还支持 Hive 查询语言,但 Hive 数据库存在局限性。 开发 Spark SQL 是为了消除 Hive 数据库的缺点。 让我们看一下 Hive 的以下缺点

Hive 的缺点

  • 它无法恢复处理,这意味着如果在工作流程中间执行失败,您无法从卡住的地方恢复。
  • 启用回收站后,我们无法以级联方式删除加密数据库。 这会导致执行错误。 为了删除这种类型的数据库,用户必须使用 Purge 选项。
  • 即席查询是使用 MapReduce 执行的,MapReduce 由 Hive 启动,但是当我们分析中等大小的数据库时,它会延迟性能。
  • Hive 不支持更新或删除操作。
  • 它仅限于子查询支持。

这些缺点是开发 Apache SQL 的原因。

PySpark SQL 简要介绍

PySpark 支持将关系处理与 Spark 的函数式编程集成。 它为各种数据源提供支持,从而可以将 SQL 查询与代码转换交织在一起,从而产生非常强大的工具。

PySpark SQL 在 RDD 和关系表之间建立连接。 它通过声明式 Dataframe API 提供了关系处理和过程处理之间更紧密的集成,该 API 与 Spark 代码集成在一起。

使用 SQL,它可以轻松地供更多用户访问,并改善当前用户的优化。 它还支持大数据中的各种数据源和算法。

PySpark SQL 的功能

PySpark SQL 的功能如下

1) 一致的数据访问

它提供了一致的数据访问方式,意味着 SQL 支持一种共享方式来访问各种数据源,如 Hive、Avro、Parquet、JSON 和 JDBC。 它在将所有现有用户容纳到 Spark SQL 中起着重要作用。

2) 与 Spark 集成

PySpark SQL 查询与 Spark 程序集成。 我们可以在 Spark 程序中使用查询。

它的最大优点之一是开发人员不必手动管理状态故障或使应用程序与批处理作业同步。

3) 标准连接

它通过 JDBC 或 ODBC 提供连接,这两个是商业智能工具连接的行业标准。

4) 用户自定义函数

PySpark SQL 具有语言组合的用户定义函数 (UDF)。 UDF 用于定义新的基于列的函数,该函数扩展了 Spark SQL 的 DSL 词汇表,用于转换 DataFrame。

5) Hive 兼容性

PySpark SQL 在当前数据上运行未经修改的 Hive 查询。 它允许与当前 Hive 数据完全兼容。

PySpark SQL 模块

Spark SQL 和 DataFrames 的一些重要类如下

  • pyspark.sql.SparkSession: 它表示 DataFrame 和 SQL 功能的主要入口点。
  • pyspark.sql.DataFrame: 它表示分组到命名列中的数据的分布式集合。
  • pyspark.sql.Column: 它表示 DataFrame 中的列表达式。
  • pyspark.sql.Row: 它表示 DataFrame 中的一行数据。
  • pyspark.sql.GroupedData: 聚合方法,由 DataFrame.groupBy() 返回。
  • pyspark.sql.DataFrameNaFunctions: 它表示处理丢失数据(空值)的方法。
  • pyspark.sql.DataFrameStatFunctions: 它表示统计功能的方法。
  • pysark.sql.functions: 它表示可用于 DataFrame 的内置函数列表。
  • pyspark.sql.types: 它表示可用数据类型的列表。
  • pyspark.sql.Window: 它用于处理 Window 函数。

考虑以下 PySpark SQL 示例。

输出

 +-----+
|hello|
+-----+
|spark|
+-----+

代码解释

在上面的代码中,我们导入了 findspark 模块并调用了 findspark.init() 构造函数; 然后,我们导入 SparkSession 模块来创建 spark 会话。

from pyspark.sql import SparkSession

Spark 会话可用于创建 Dataset 和 DataFrame API。 SparkSession 还可用于创建 DataFrame、将 DataFrame 注册为表、在表上执行 SQL、缓存表和读取 parquet 文件。

class builder

它是 Spark 会话的构建器。

getOrCreate()

它用于获取现有的 SparkSession, 或者如果不存在,则根据构建器中设置的选项创建一个新的。

其他一些方法

PySpark SQL 的一些方法如下

1. appName(name)

它用于设置应用程序的名称,该名称将显示在 Spark Web UI 中。 参数 name 接受参数的名称。

2. config(key=None, value = None, conf = None)

它用于设置配置选项。 使用此方法设置的选项会自动传播到 SparkConfSparkSession 的配置。

参数

  • key- 配置属性的键名字符串。
  • value- 它表示配置属性的值。
  • conf - SparkConf 的一个实例。

3. master(master)

它设置 Spark master url 以连接到,例如“local”以在本地运行,“local[4]”以在本地运行,使用 4 个内核。

参数

  • master:spark master 的 url。

4. SparkSession.catalog

它是一个接口,用户可以创建、删除、更改或查询底层数据库、表、函数等。

5. SparkSession.conf

它是 spark 的运行时配置接口。 用户可以通过该接口获取和设置与 Spark SQL 相关的所有 Spark 和 Hadoop 配置。

class pyspark.sql.DataFrame

它是分组到命名列中的数据的分布式集合。 DataFrame 类似于 Spark SQL 中的关系表,可以使用 SQLContext 中的各种函数创建。

创建 dataframe 后,我们可以使用 DataFrame 的几个域特定语言 (DSL) 来操作它。 考虑以下示例。

让我们看下面的例子

使用 Spark SQL 查询

在以下代码中,首先,我们创建一个 DataFrame 并执行 SQL 查询以检索数据。 考虑以下代码

输出

+----------------+
|           Genre|
+----------------+
|    canadian pop|
|  reggaeton flow|
|       dance pop|
|             pop|
|         dfw rap|
|             pop|
|      trap music|
|             pop|
|     country rap|
|      electropop|
|       reggaeton|
|       dance pop|
|             pop|
|  panamanian pop|
|canadian hip hop|
|       dance pop|
|           latin|
|         dfw rap|
|canadian hip hop|
|     escape room|
+----------------+
only showing top 20 rows

+---+--------------------+-------------+-----+----------------+------+------------+--------------+--------+--------+-------+--------------+------------+----------+
|_c0|          Track.Name|  Artist.Name|Genre|Beats.Per.Minute|Energy|Danceability|Loudness..dB..|Liveness|Valence.|Length.|Acousticness..|Speechiness.|Popularity|
+---+--------------------+-------------+-----+----------------+------+------------+--------------+--------+--------+-------+--------------+------------+----------+
|  4|Beautiful People ...|   Ed Sheeran|  pop|              93|    65|          64|            -8|       8|      55|    198|            12|          19|        86|
|  6|I Don't Care (wit...|   Ed Sheeran|  pop|             102|    68|          80|            -5|       9|      84|    220|             9|           4|        84|
|  8|   How Do You Sleep?|    Sam Smith|  pop|             111|    68|          48|            -5|       8|      35|    202|            15|           9|        90|
| 13|   Someone You Loved|Lewis Capaldi|  pop|             110|    41|          50|            -6|      11|      45|    182|            75|           3|        88|
| 38|Antisocial (with ...|   Ed Sheeran|  pop|             152|    82|          72|            -5|      36|      91|    162|            13|           5|        87|
| 44|                Talk|       Khalid|  pop|             136|    40|          90|            -9|       6|      35|    198|             5|          13|        84|
| 50|Cross Me (feat. C...|   Ed Sheeran|  pop|              95|    79|          75|            -6|       7|      61|    206|            21|          12|        82|
+---+--------------------+-------------+-----+----------------+------+------------+--------------+--------+--------+-------+--------------+------------+----------+

使用 groupBy() 函数

groupBy() 函数收集相似类别的数据。

输出

+----------------+-----+
|           Genre|count|
+----------------+-----+
|        boy band|    1|
|      electropop|    2|
|             pop|    7|
|         brostep|    2|
|        big room|    1|
|       pop house|    1|
|  australian pop|    1|
|             edm|    3|
|  r&b en espanol|    1|
|       dance pop|    8|
|       reggaeton|    2|
|    canadian pop|    2|
|      trap music|    1|
|     escape room|    1|
|  reggaeton flow|    2|
|  panamanian pop|    2|
|     atl hip hop|    1|
|     country rap|    2|
|canadian hip hop|    3|
|         dfw rap|    2|
+----------------+-----+

repartition(numpartitions, *cols)

repartition() 返回一个新的 DataFrame,它是一个分区表达式。 该函数接受两个参数 numpartitions*col。 numpartitions 参数指定目标列数。

输出

+---+--------------------+-----------+-------+----------------+------+------------+--------------+--------+--------+-------+--------------+------------+----------+
|_c0|          Track.Name|Artist.Name|  Genre|Beats.Per.Minute|Energy|Danceability|Loudness..dB..|Liveness|Valence.|Length.|Acousticness..|Speechiness.|Popularity|
+---+--------------------+-----------+-------+----------------+------+------------+--------------+--------+--------+-------+--------------+------------+----------+
|  4|Beautiful People ...| Ed Sheeran|    pop|              93|    65|          64|            -8|       8|      55|    198|            12|          19|        86|
|  5|Goodbyes (Feat. Y...|Post Malone|dfw rap|             150|    65|          58|            -4|      11|      18|    175|            45|           7|        94|
| 17|          LA CANCI?N|   J Balvin|  latin|             176|    65|          75|            -6|      11|      43|    243|            15|          32|        90|
|  4|Beautiful People ...| Ed Sheeran|    pop|              93|    65|          64|            -8|       8|      55|    198|            12|          19|        86|
|  5|Goodbyes (Feat. Y...|Post Malone|dfw rap|             150|    65|          58|            -4|      11|      18|    175|            45|           7|        94|
+---+--------------------+-----------+-------+----------------+------+------------+--------------+--------+--------+-------+--------------+------------+----------+
only showing top 5 rows

下一个主题PySpark UDF