Python PySpark collect() - 从DataFrame检索数据

2025 年 1 月 5 日 | 11 分钟阅读

引言

Apache Spark 已经证明自己是一个理想且有用的大数据处理框架。PySpark 作为 Apache Spark 的 Python API,为开发人员提供了无缝利用此处理工具的能力。PySpark 中可用的数据帧 API 与 Pandas 数据帧类似,前者还提供了高级分布式数据结构。用于从 PySpark 数据帧中提取数据的第二个核心功能是 collect()。在本教程中,我们将分析棘手的 collect() 函数,揭示其目的、使用场景、可能出现的问题以及正确使用它的技巧。

理解 PySpark 数据帧

在深入了解 PySpark 数据帧的机制之前,了解 collect() 函数的一些基本原理至关重要。PySpark 数据帧在结构上类似于存储在关系数据库中的表或 Pandas 中的数据帧,两者都具有分布在数据元素上的命名列。PySpark 数据帧为数据处理提供了卓越的解决方案,因为该技术能够高效地处理大规模分布式数据集,从而实现大数据分析。

collect() 函数

PySpark 的 'collect()' 函数读取分布式数据帧的所有记录并将其从站点传输回本地机器。它将所有数据从数据帧的所有分区中提取出来,作为列表或数组返回给驱动程序。

语法

然而,需要注意的是,当处理大型数据帧时,collect() 函数可能会非常耗费资源,因为所有数据都必须集中到一台机器上。这可能导致内存不足错误,因为驱动程序没有足够的内存来缓存来自数据帧的所有功能。

让我们看看 collect() 函数的基本用法

代码实现

输出

Row(Name='Alice', Age=25)
Row(Name='Bob', Age=30)
Row(Name='John', Age=22)

说明

  • PySpark 'collect()' 函数是数据帧上的一个动作。
  • 它从分布式系统上的数据帧中提取数据并将其全部传递到本地机器。
  • 发送的数据库以 Python 行对象列表的形式给出。
  • 语法很简单:'df.collect()',其中 'df' 是 PySpark 数据帧。
  • 它通常用于本地检查、验证和分发解释。
  • 在使用如此大的数据集时,应格外小心,以避免出现内存不足错误。
  • 对从所有分区收集数据的功能性能产生影响。
  • 为了提高效率,操作应分布式进行,并将单独的分区远程保存,而无需在本地收集完整数据。
  • 提供的代码片段显示了一个数据帧的创建,在其上应用了 'collect()',并将此操作的结果呈现在控制台上。

让我们看一些从数据帧中检索所有数据的示例

1. 使用 collect() 检索数据帧中的所有数据

PySpark 中的 'collect()' 函数指的是从数据帧中提取所有可用数据并将其带入本地内存。

让我们看看下面的代码实现

代码实现

输出

Original DataFrame:
+-----+---+
| Name|Age|
+-----+---+
|Alice| 25|
|  Bob| 30|
|David| 22|
+-----+---+


All Collected Data:
Row(Name='Alice', Age=25)
Row(Name='Bob', Age=30)
Row(Name='David', Age=22)

说明

  • 在此示例中,我们从一个更简单的数据帧开始,它只包含两列 'Name' 和 'Age'。首先进行初始表示和可视化,然后使用 collector() 等函数获取所有数据,一旦检索到,就逐行打印。
  • 值得一提的是,使用 'collect()' 处理大型数据帧时应谨慎,尤其是在生产环境中,因为可能会因收集而导致内存消耗过大。对于此类数据集,可以采用其他方法,例如采样或大规模分布式计算,以更有效地管理数据。

2. 使用 Collect() 检索特定行的数据

让我们看看使用 collect() 函数检索特定行数据的代码实现。

代码实现

输出

Original DataFrame:
+-------+---+
|   Name|Age|
+-------+---+
|  Alice| 25|
|    Bob| 30|
|Charlie| 22|
|  David| 35|
+-------+---+


Collected Data for Rows with Age > 30:
Row(Name='David', Age=35)

说明

  • 在此示例中,我们首先创建一个包含姓名和年龄对应列的数据帧。在显示原始数据帧后,我们指定一个条件(例如,大于 30)来过滤特定行。然后数据帧根据此条件进行过滤,并且 'collect()' 函数只检索满足给定条件的行。
  • 还需要注意的是,在大型数据集中,有时收集特定记录的数据比收集所有数据更容易;这种现象称为记录和危机的选择性。它允许您定位相关的数据子集,而不是将所有列都带回本地机器。

3. 使用 Collect() 检索多行数据

让我们看看使用 collect() 函数检索多行数据的代码实现。

代码实现

输出

Original DataFrame:
+-------+---+
|   Name|Age|
+-------+---+
|  Alice| 25|
|    Bob| 30|
|Charlie| 22|
|  David| 35|
|    Eva| 28|
+-------+---+


Collected Data for Selected Rows:
Row(Name='Alice', Age=25)
Row(Name='David', Age=35)
Row(Name='Eva', Age=28)

说明

  • 在这里,我们构建一个包含“姓名”和“年龄”列的数据帧,并列出将要选择的 'selected_names'。使用 'isin' 方法创建条件,根据给定的姓名列表过滤行。接下来,对数据帧进行过滤,并应用带有 'collect()' 的数据收集函数。
  • 然而,应该记住,实现收集特定行数据的过程可能比主要由大型数据集支持的所有累积过程更有效。它有助于处理信息的方便部分,因为在数据帧中,将所有值发送到本地机器似乎不合适。

4. 使用 Collect() 检索特定列的数据

让我们看看使用 collect() 函数检索特定列数据的代码实现。

代码实现

输出

Original DataFrame:
+-------+---+
|   Name|Age|
+-------+---+
|  Alice| 25|
|    Bob| 30|
|Charlie| 22|
|  David| 35|
|    Eva| 28|
+-------+---+


Collected Data from the 'Age' column:
25
30
22
35
28

说明

  • 在此示例中,我们将构建一个包含“姓名”列的数据帧,并特别指定我们要检索其数据的列名(“年龄”)。使用 'select()' 方法选择指定的列,然后应用 collect() 函数从选定的列中获取所有数据。
  • 应该说,当从某个列中检索数据时,结果将是行列表,因此分析师需要从这些行中提取实际值。在这种情况下,'row[0]' 允许访问从选定列中检索到的值。

5. 使用 Collect() 检索多列数据

让我们看看使用 collect() 函数检索特定列数据的代码实现。

代码实现

输出

Original DataFrame:
+-------+---+--------------+
|   Name|Age|    Occupation|
+-------+---+--------------+
|  Alice| 25|      Engineer|
|    Bob| 30|Data Scientist|
|Charlie| 22|       Analyst|
|  David| 35|       Manager|
|    Eva| 28|     Developer|
+-------+---+--------------+


Collected Data from the Selected Columns:
Alice Engineer
Bob Data Scientist
Charlie Analyst
David Manager
Eva Developer

说明

  • 在这种情况下,我们构建一个包含“姓名”、“年龄”和“职业”列的数据帧对象。我们只提及我们需要从其中获取相关字段“姓名”和“职业”的列名。调用 'select()' 方法以选择指示的列,然后通过应用 'collect()' 函数,从这些选定的列中收集数据。
  • 当您运行此代码片段时,将生成由行类型对象实现的列表,并且需要访问实际表示这些行的值。在给定情况下,从列 0 和 1 中分别检索两个值作为 row[0] 和 row[1]
  • 最后,当从多个列收集数据值时,您可能正在处理数据帧的一部分而不是所有列,因此如果处理大量数据集,则可以提取子集并节省时间。这使得人们能够只关注必要的信息,而无需将不必要的数据“拖到”本地机器。

用例

1. 本地数据探索

  • 当处理可以驻留在本地内存中的较小数据集时,最好使用 collect()。
  • 允许数据科学家和分析师使用常用的 Python 库在本地调查数据集的一部分。

2. 数据验证和测试

  • 在测试和验证阶段,collect() 只对一小部分数据集进行角度分析。
  • 在将代码大规模部署之前,有助于识别系统中的缺陷数据、异常或不稳定的趋势等问题。

3. 与 Python 生态系统集成

  • 它在帮助集成各种 Python 库进行机器学习、数据可视化和统计分析方面发挥着重要作用。
  • 使数据科学家能够轻松地在 PySpark 和其他工具之间传输。

4. 调试

  • 使用 collect() 调试数据片段,并使用 collect() 从相对本地的源调试 PySpark 代码。
  • 此外,它还提供了一种交互式和参与式的方法来查找数据转换或转换中的问题。

5. 重点分析

  • 为了处理数据子集,使用 collect() 方法收集特定行或列以执行详细分析。
  • 更重要的是,突出相关事实而无需将数据帧带到本地计算机至关重要。

6. 采样策略

  • 数据帧应该整体收集,但使用 collect() 的采样策略,您可以获得代表完整数据集的数据子集。
  • 它缩小了内存占用,并提供了一种更快地查看数据的方法。

7. 快速数据验证

  • 为了快速验证数据帧上某些操作的行为,使用 collect() 浏览和验证结果。
  • 支持正确转换数据和聚合的工作。

8. 交互式数据探索

  • 适用于数据科学家需要使用其 Python 工具快速检查和转换数据的交互式会话。
  • 使用 collect() 使收集和采样过程变得更容易。

最佳实践和注意事项

1. 内存限制

'collect()' 的主要威胁是内存不足问题,当处理大型数据帧时,这有时会成为一个真正的问题。因此,有必要评估数据帧中的数据量以及驱动程序可用的数据量,方法是使用 'collect()。

2. 性能影响

调用 'collect()' 会导致所有分区中的所有项目从远程位置转移到本地机器,从而带来性能损失。强烈建议适度使用 'collect()',尤其是在生产环境中发布时,并进行采样或诉诸并行处理。

3. 数据倾斜

数据倾斜,其中一个子集的数据量远大于其他子集,可能导致在 'collect()' 期间资源利用不均衡,导致不足。它会影响性能,在使用 'collect()' 处理大型数据时应作为行为之一。

4. 采样策略

与其收集整个数据帧,不如考虑使用合理的采样方法来获取部分信息。这不仅有助于节省内存占用,还可以轻松快速地检查和分析数据。

结论

本质上,PySpark 中的 'collect' 函数是一种从列表中检索数据的高效方法,它使得 Python 易于与任何正在处理的材料一起使用,并且更容易在本地检查错误。然而,可能会出现许多缺点,最主要的是与内存限制和性能下降相关的问题。但是,数据科学家和工程师在使用大型数据集的 'collect()' 时应谨慎处理,并进一步修复其他方案以解决这些问题。熟悉 'collect()' 工作背后的复杂性并集成最佳实践将有助于提高 PySpark 在处理大数据应用程序中的效率和有效性。