Spark DataFrame 与非相同连接列的外连接

17 Mar 2025 | 6 分钟阅读

Apache Spark 是一个流行的分布式计算框架,用于大数据处理,它提供了丰富的 API 来处理结构化数据。Spark 使用数据帧(DataFrame)提供了强大的数据处理方式,数据帧类似于关系数据库中的表。处理数据帧时,将它们连接起来是一项重要的操作。在 Spark 中,我们可以使用不同类型的连接来连接两个数据帧,包括内连接(inner join)、左连接(left join)、右连接(right join)和外连接(outer join)。在本文中,我们将重点讨论 Spark 数据帧中连接列非标识时的外连接。

Spark 数据帧提供了一种结构化方式来组织和操作数据,类似于关系数据库中的表。Spark 数据帧高度优化,适用于分布式计算,并提供了丰富的数据操作、转换和分析 API。

Spark 数据帧中的一项关键操作是连接。连接允许我们根据公共列组合两个或多个数据帧,这类似于 SQL 的连接操作。Spark 数据帧支持多种类型的连接,包括内连接、外连接、左连接和右连接。

一种在非标识连接列上执行外连接的方法是重命名其中一个数据帧中的列。我们可以使用 `withColumnRenamed()` 方法重命名列,如前例所示。

在深入研究外连接操作之前,让我们先了解一下 Spark 中的连接操作。Spark 数据帧中的连接操作是通过公共列来组合两个数据帧的一种方式。Spark 支持不同的连接操作类型,如内连接、左连接、右连接和外连接。

在外连接操作中,输出将包含两个数据帧的所有行。如果某一行在另一个数据帧中没有匹配的记录,那么另一数据帧中该行的值将用 null 值填充。然而,当两个数据帧中的连接列非标识时,我们需要采取一些额外的步骤来执行外连接操作。

让我们通过一个例子来理解 Spark 数据帧中带有非标识连接列的外连接操作。假设我们有两个数据帧,“sales”和“customer”,如下所示:

“sales”数据帧包含有关不同客户销售的信息,包括客户 ID、产品 ID 和销售金额。“customer”数据帧包含有关客户的信息,包括他们的 ID、姓名和城市。

要对这两个数据帧执行外连接操作,我们需要有一个公共列来连接这两个数据帧。然而,在这种情况下,两个数据帧中的连接列是非标识的。“sales”数据帧有 `customerID` 列,而“customer”数据帧有 `ID` 列。

为了执行外连接操作,我们可以重命名其中一个数据帧中的列,使其标识。例如,我们可以使用 `withColumnRenamed()` 方法将“customer”数据帧中的 `ID` 列重命名为 `customerID`。

现在,我们可以使用 `join()` 方法执行外连接操作,并将连接类型指定为“outer”。

在上面的代码中,我们首先将连接条件指定为 `sales.customerID == customer.customerID`,这表明我们要基于 `customerID` 列连接这两个数据帧。然后,我们将连接类型指定为“outer”来执行外连接操作。

结果数据帧将包含两个数据帧的所有行,非匹配行的值将用 null 值填充。我们可以进一步处理结果数据帧,以删除任何 null 值或根据需要执行其他操作。

实施

join() 函数的语法

参数

other: DataFrame. 连接的右侧

on : str, list or Column, optional. 列名列表,连接表达式 (Column)。

how : str, optional

standard inner Inner, cross, outer, full, fullouter, full outer, left, leftouter, left outer, right, rightouter, right outer, semi, leftsemi, left semi, anti, leftanti, and left anti are the only options that can be used.

用于 PySpark 中外连接和合并连接列的数据帧

我们将生成两个带有非标识连接列的示例数据帧,以演示 PySpark 数据帧中外连接和合并连接列的概念。我们可以看到“Name”既是第一个数据帧的连接列,也是第二个数据帧的连接列。连接列中的值也不相同。

Outer join Spark dataframe with non-identical join column

使用 Join 函数进行外连接

我们将使用 PySpark 的 `join` 函数对两个数据帧进行外连接。“join”函数接受两个数据帧和连接列名作为输入。外连接操作会返回两个数据帧的所有行以及任何匹配的行。非匹配行的相应列将具有 null 值。

输出

在这里,我们可以看到“dataframe1”和“dataframe2”已被外部连接,在没有数据的地方已填充“null”。结果数据帧包含两个数据帧的所有列。

Outer join Spark dataframe with non-identical join column

通过一些小的调整,我们将使用 Name 列的一个列来提高可读性。如果你想要两个列,请遵循上面的说明。

输出

Outer join Spark dataframe with non-identical join column

使用外连接进行合并

在 PySpark 中,`merge` 方法不可用。但是,Pandas 可以使用它。如果你正在处理小型数据集并希望使用 `merge` 方法,可以将 PySpark 数据帧转换为 Pandas 数据帧,使用 `merge` 技术合并它们,然后将合并的 Pandas 数据帧转换回 PySpark 数据帧。

外连接是一种数据库连接,它允许我们在保留一个或两个表的所有行的情况下组合两个表,即使在另一个表中没有相应的匹配项。在 Python 中,我们可以使用 pandas 库中的 `merge()` 函数执行外连接。

让我们来看一个例子,其中我们有两个表,employees 和 departments,数据如下:

要根据 `department_id` 列在这两个表之间执行外连接,我们可以使用 `merge()` 函数,如下所示:

`how` 参数指定我们要执行的连接类型。在这种情况下,我们将 `outer` 指定为执行外连接。`on` 参数指定要连接的列。

结果合并表将如下所示:

   id_x  name_x    salary  department_id  id_y        name_y
0   1.0   Alice   50000.0            1.0   1.0         Sales
1   2.0     Bob   60000.0            2.0   2.0     Marketing
2   3.0  Charlie  70000.0            2.0   2.0     Marketing
3   4.0    Dave   80000.0            3.0   3.0  Engineering
4   5.0     Eve   90000.0            3.0   3.0  Engineering
5   NaN     NaN      NaN            4.0   NaN           NaN

请注意,`id_x`、`name_x` 列来自 employees 表,`id_y`、`name_y` 列来自 departments 表。由于没有 `department_id` 为 4 的员工,因此最后一行包含 NaN 值。

使用 `merge()` 进行外连接允许我们在保留一个或两个表的所有行的情况下组合两个表,即使在另一个表中没有相应的匹配项。

总之,Spark 数据帧中的外连接操作是一种根据公共列组合来自两个不同源的数据的强大方法。当两个数据帧中的连接列非标识时,我们可以使用 `withColumnRenamed()` 方法重命名其中一个列,然后像往常一样执行外连接操作。借助 Spark 数据帧,我们可以高效地处理大量数据并轻松执行复杂的数据操作。