PySpark MapType 的 UDF

2024 年 8 月 29 日 | 4 分钟阅读

什么是 PySpark DataFrame?

PySpark DataFrame 是一个数据集合,它被组织成列。DataFrame 类似于 SparkSQL 中的关系。我们可以使用 SparkSession 中的不同函数来创建 pyspark DataFrame。

PySpark MapType

PySpark 中的 MapType 是一个用于定义字典的数据类型,它可以存储键值对、Map 类型对象。它包含三个组成部分:键类型 (数据类型)、值类型 (数据类型) 和 valueContainsNull (布尔类型)。MapType 还可以用于定义 Map 的键值对。

PySpark 允许您使用用户自定义函数 (UDF) 构建自定义函数来更改 Spark DataFrame。PySpark 支持原始数据类型的 UDF,但处理 MapType 这种具有混合值类型的复杂数据结构需要定制的方法。

PySpark MapType 语法

其中 keyType 是 Map 中键的数据类型 (它是一个非空值)

valueType 是 Map 中值的数据类型

valueContainsNull 是一个布尔类型,用于检查值是否包含 null 值。

PySpark MapType 的 UDF

pyspark.sql.function 提供了 UDF 函数,用于定义自定义函数。它接受两个参数:函数和返回类型。

PySpark UDF 函数语法

MapType 列指定一个 Map 或字典样式的结构,它将键映射到值。它是一组键值对,其中键和值可能是各种数据类型。

PySpark UDF、Spark UDF 或 Spark 中的用户自定义函数允许我们根据需要定义唯一的函数或操作。这使我们能够开发 Spark 提供的内置函数中未包含的方法。

Spark UDF 是独特且高效的,因为用户可以使用任何编程语言,如 Scala、Java、Python 或 R 来构建这些函数。Pyspark 或 Spark 中的 UDF 是逐行执行的。

PySpark UDF 架构

在 Python 中创建 Spark UDF 需要几个步骤

  • 该函数被压缩并分发到工作节点。
  • 然后,Spark 在工作节点上启动一个 Python 进程,并发送数据。
  • 该过程是逐行进行的。
  • Python 中的进程完成后,结果会返回给 Spark。

在 PySpark 中注册 UDF

让我们使用不同的方法在 PySpark 中创建 UDF。

首先,我们将导入所有必需的库,包括 PySpark 中的 UDF 方法。

然后,我们将创建一个包含不同整数的 DataFrame。

输出

root
 |-- id: map (nullable = false)

现在,我们将创建一个 UDF 来计算整数的平方。我们正在使用不同的方法创建 UDF。

1. 我们将使用装饰器模式创建 UDF。这是创建 UDF 的简单方法。

代码

输出

+------------------+
|square_integer(id)|
+------------------+
|                 1|
|                 4|
|                 9|
|                16|
|                25|
+------------------+
only showing top 5 row

说明

我们使用此 UDF 打印了整数的平方。我们使用Map 类型的数据类型调用了 UDF 装饰器。然后,我们创建了一个返回整数平方的函数。然后,使用 show 函数,我们打印了它。

2. 我们将通过 UDF 方法创建 UDF,并将参数(函数及其返回类型)传递给它。

代码

输出

+------------------+
|square_integer(id)|
+------------------+
|                 1|
|                 4|
|                 9|
|                16|
|                25|
+------------------+
only showing top 5 row

说明

我们使用此 UDF 打印了整数的平方。首先,我们创建了一个返回整数平方的函数。然后,我们使用 Map 类型的数据类型调用了 UDF 函数。然后,使用 show 函数,我们打印了它。

现在,我们将使用不同的函数修改 UDF。

在 UDF 中过滤和访问 Map 值。

在此,我们将访问 Map 值并使用 getItem() 过滤行,它将从 Map 中获取值,并使用 filter() 方法过滤行。在这里,我们将创建一个包含不同水果的 DataFrame。

代码

输出

+----+---------------------------+-------------+
|id  |fruit_count                | Apple_count |
+--------------------------------+-------------+
|1   | {Orange -> 3, Apple -> 2} | 2           |
+--------------------------------+-------------+

说明

我们将 DataFrame 中的值映射并过滤了行。然后,我们打印了水果及其计数的列表。