PySpark RDD(弹性分布式数据集)

2025年3月17日 | 阅读 7 分钟

在本教程中,我们将学习 PySpark 的构建块,称为弹性分布式数据集,通常称为 PySpark RDD。

正如我们在 PySpark 简介中讨论的那样,Apache Spark 是大数据分析的最佳框架之一。 当与 Python 集成时,该技术变得更加有效和容易。 它为我们提供了非常方便和易于使用的 API,称为 PySpark。

什么是 PySpark RDD?

RDD 是 PySpark 最重要的部分,或者我们可以说是 PySpark 的支柱。 它是最基本的无模式数据结构之一,可以处理结构化和非结构化数据。 与网络和磁盘共享相比,它可以使内存数据共享速度提高 10 - 100 倍。

PySpark RDD

“弹性分布式数据集 (RDD) 是一种分布式内存抽象,可帮助程序员对大型集群执行内存计算。” RDD 的一个重要优势是容错性,这意味着如果发生任何故障,它会自动恢复。

RDD 根据键将数据分成较小的部分。 将数据分成更小的块的好处是,如果一个执行器节点发生故障,另一个节点仍然会处理数据。 由于相同的数据块在多个执行器节点上复制,因此它们能够从任何问题中快速恢复。

它提供了通过绑定多个节点来针对数据集非常快速地执行功能计算的功能。

RDD 在创建时变为不可变。 不可变意味着我们无法修改已创建的对象,但可以肯定地对其进行转换。

RDD 的特点

PySpark RDD 的各种特点如下

PySpark RDD
  • 内存计算

PySpark 提供了内存计算的功能。 计算结果存储在分布式内存 (RAM) 而不是稳定存储 (disk) 中。 它提供非常快速的计算

  • 延迟执行

PySpark RDD 中的转换是延迟的。 它不会立即计算结果,这意味着在触发操作之前不会开始执行。 当我们在 RDD 中调用一些操作进行转换时,它不会立即执行。 延迟执行在节省计算开销方面起着重要作用。 它通过减少查询数量来提供优化。

  • 容错

RDD 跟踪数据沿袭信息以自动重建丢失的数据。 如果 RDD 的任何分区中发生故障,则可以从原始的容错输入数据集重新计算该分区以创建它。

  • 不可变性

可以随时检索创建的数据,但其值无法更改。 RDD 只能通过确定性操作创建。

  • 分割

RDD 是各种数据项的集合,这些数据项的尺寸非常大。 由于其大小,它们无法放入单个节点,并且必须在各个节点上进行分区。

  • 持久化

它是一种优化技术,我们可以在其中保存 RDD 评估的结果。 它存储中间结果,以便我们可以在需要时进一步使用它。 它降低了计算复杂度。

  • 粗粒度操作

粗粒度操作意味着我们可以转换整个数据集,而不是数据集上的单个元素。 另一方面,细粒度意味着我们可以转换数据集上的单个元素。

创建 RDD

PySpark 提供了两种创建 RDD 的方法:加载外部数据集或分发一组对象的集合。 我们可以使用 parallelize() 函数创建 RDD,该函数接受程序中已存在的集合并将相同集合传递给 Spark Context。 这是创建 RDD 的最简单方法。 考虑以下代码

  • 使用 parallelize()

输出

+----+----+----+-----+
|col1|col2|col3| col4|
+----+----+----+-----+
|  12|  20|  35|a b c|
|  41|  58|  64|d e f|
|  70|  85|  90|g h i|
+----+----+----+-----+
  • 使用 createDataFrame() 函数

输出

上面的代码将给出以下 RDD 数据。

+--------+------+----------+----------+
|Roll_Num|  Name|Percentage|Department|
+--------+------+----------+----------+
|  009001|  Anuj|       70%|B.tech(cs)|
|  009002|Sachin|       80%|B.tech(cs)|
|  008005|Yogesh|       94%|       MCA|
|  007014|Ananya|       98%|       MCA|
+--------+------+----------+----------+
  • 使用读取和加载函数

在这里,我们使用 read() 函数从 .csv 文件中读取数据集。

输出

+---+--------------------+-------------+--------------+----------------+------+------------+--------------+--------+--------+-------+--------------+------------+----------+
|_c0|          Track.Name|  Artist.Name|         Genre|Beats.Per.Minute|Energy|Danceability|Loudness..dB..|Liveness|Valence.|Length.|Acousticness..|Speechiness.|Popularity|
+---+--------------------+-------------+--------------+----------------+------+------------+--------------+--------+--------+-------+--------------+------------+----------+
|  1|            Se?orita| Shawn Mendes|  canadian pop|             117|    55|          76|            -6|       8|      75|    191|             4|           3|        79|
|  2|               China|     Anuel AA|reggaeton flow|             105|    81|          79|            -4|       8|      61|    302|             8|           9|        92|
|  3|boyfriend (with S...|Ariana Grande|     dance pop|             190|    80|          40|            -4|      16|      70|    186|            12|          46|        85|
|  4|Beautiful People ...|   Ed Sheeran|           pop|              93|    65|          64|            -8|       8|      55|    198|            12|          19|        86|
|  5|Goodbyes (Feat. Y...|  Post Malone|       dfw rap|             150|    65|          58|            -4|      11|      18|    175|            45|           7|        94|
+---+--------------------+-------------+--------------+----------------+------+------------+--------------+--------+--------+-------+--------------+------------+----------+
only showing top 5 rows
root
 |-- _c0: integer (nullable = true)
 |-- Track.Name: string (nullable = true)
 |-- Artist.Name: string (nullable = true)
 |-- Genre: string (nullable = true)
 |-- Beats.Per.Minute: integer (nullable = true)
 |-- Energy: integer (nullable = true)
 |-- Danceability: integer (nullable = true)
 |-- Loudness..dB..: integer (nullable = true)
 |-- Liveness: integer (nullable = true)
 |-- Valence.: integer (nullable = true)
 |-- Length.: integer (nullable = true)
 |-- Acousticness..: integer (nullable = true)
 |-- Speechiness.: integer (nullable = true)
 |-- Popularity: integer (nullable = true)

PySpark 中的 RDD 操作

RDD 支持两种类型的操作

1. 转换

转换是用于创建新 RDD 的过程。 它遵循的原则是延迟执行(在触发操作之前不会开始执行)。 下面给出了一些转换

  • map
  • flatMap
  • filter
  • distinct
  • reduceByKey
  • mapPartitions
  • sortBy

2. 操作

操作是在 RDD 上应用的过程,用于启动 Apache Spark 以应用计算并将结果传递回驱动程序。 以下是一些操作

  • collect(收集)
  • collectAsMap
  • reduce
  • countByKey/countByValue
  • first

RDD 中的各种操作

应用于 RDD 的操作如下

PySpark RDD
  • count()

它返回 RDD 中可用的元素数量。 考虑以下程序。

输出

Number of elements present in RDD : 7
  • collect()

此函数返回 RDD 中的整个元素。

输出

['python', 'java', 'hadoop', 'c', 'C++', 'spark vs hadoop', 'pyspark and spark']
  • foreach(f)

foreach(f) 函数仅返回那些匹配 foreach 内部函数条件的元素。

输出

python
java
hadoop 
C
 C++
spark vs hadoop 
pyspark and spark
  • cc

cc 操作返回一个新的 RDD,其中包含满足过滤器内部函数的元素。 在以下示例中,我们过滤掉包含 "spark" 的字符串。

输出

Filtered RDD : ['spark', 'spark vs hadoop', 'pyspark', 'pyspark and spark']
  • map(f, presevesPartitioning = False)

它以键值对的形式返回新的 RDD,并将每个字符串映射为值 1。考虑以下示例

输出

Key value pair -> [('python', 1), ('java', 1), ('hadoop', 1), ('c', 1), ('C++', 1), ('spark vs hadoop', 1), ('pyspark and spark', 1)]
  • reduce(f)

它在 RDD 中执行指定的交换和关联二元运算。 考虑以下示例

输出

Adding all the elements : 15
  • cache()

我们可以使用 cache() 函数检查 RDD 是否已缓存。

输出

Words got chached > True
  • join(other, numPartition = None)

它返回具有匹配键的 RDD,并以配对形式返回其值。 我们将获得两个不同的 RDD,用于两对元素。 考虑以下代码

输出

Join RDD -> [('hadoop', (3, 4)), ('pyspark', (1, 2))]

来自 RDD 的 DataFrame

PySpark 提供了两种将 RDD 转换为 DF 的方法。 这些方法如下

  • toDF()

当我们通过 parallelize 函数创建 RDD 时,我们应该在 DataFrame 中标识相同的行元素,并用括号将这些元素括起来。 row() 可以接受 **kwargs 参数。

  • createDataFrame(rdd, schema)

我们还可以使用 createDataFrame 将 RDD 转换为 DataFrame。 考虑以下示例