如何向 PySpark DataFrame 添加新列?

2025 年 1 月 7 日 | 阅读 9 分钟

PySpark 是用于 Apache Spark 的 Python API。Apache Spark 是一个开源的云计算系统,用于解决大数据问题。PySpark 是一个支持 Python 与 Apache Spark 集成的 Python API。PySpark 提供了一个 Py4j 库,可以轻松与 Apache Spark 集成。PySpark 在处理和分析大型数据集方面发挥着重要作用。

PySpark 的特点

  • 内存计算
  • 惰性求值
  • 容错性
  • 不可变性
  • 分割
  • 持久化
  • 粗粒度操作

PySpark 的其他不同特点如下所示:

  • 实时计算: PySpark 在处理大型数据集时提供实时计算,因为它专注于内存处理。
  • 支持多种语言: PySpark 框架支持多种编程语言,如 Scala、Java、Python 和 R。
  • 缓存和磁盘持久性: PySpark 框架提供了强大的缓存和良好的磁盘持久性。
  • 快速处理: PySpark 能够实现高速数据处理,内存中速度快 100 倍。

Apache Spark 是由 Apache 软件基金会设计的开源分布式云计算框架。Apache Spark 用于分析、处理和计算大数据。它提供高速易用性,简单性,流式分析,并且几乎可以在任何地方运行。Apache Spark 可以实时分析数据。Apache Spark 还提供大数据快速计算。Apache Spark 可以用于多种用途,例如运行分布式 SQL、创建数据管道、将数据摄取到数据库中;它还可以用于运行机器学习算法、处理图或数据流等等。

在 PySpark 中创建 DataFrame

PySpark DataFrame collect() 是一种用于将数据集的所有元素检索到驱动程序节点的操作。collect() 方法应该在 filter()、group() 等操作之后对较小的数据使用。检索大型数据可能会导致 OutOfMemory 错误。我们首先创建一个 DataFrame。

代码

输出

 
+---------+-------+
|dept_name|dept_id|
+---------+-------+
|Finance  |10     |
|Marketing|20     |
|Sales    |30     |
|IT       |40     |
+---------+-------+   

解释: 在上面的代码中,导入了 pyspark 库,并从 pyspark 导入了 sql。使用 getOrCreate() 方法创建了 spark 会话。数据作为类型列表输入,其中输入了部门名称和部门 ID,并且部门列作为列表输入。使用 createDataFrame 方法创建了数据框,并打印了数据。默认情况下,它只返回 20 行。

让我们使用 collect() 方法在 Pyspark 中检索数据。

代码

输出

 
[Row(dept_name='Finance', dept_id=10), 
Row(dept_name='Marketing', dept_id=20), 
Row(dept_name='Sales', dept_id=30), 
Row(dept_name='IT', dept_id=40)]   

说明

在上面的代码中,deptDF.collect() 方法用于将 DataFrame 中的所有数据元素作为 Row 类型的数组获取到驱动程序节点。collect() 方法是一个动作;因此,它不打印 DataFrame;它以数组形式返回数据。您可以使用 for 循环打印数据。

代码

输出

 
Finance,10
Marketing,20
Sales,30
IT,40   

说明

在上面的代码中,for 循环用于迭代数据帧的数据。

如果要从 DataFrame 中检索第一行和第一列,请使用

代码

输出

 
Finance   

说明

在上面的代码行中,从数据框中检索了第一行和第一列的数据,返回了 finance。deptDF.collect() 方法返回一个 Row 类型的数组。deptDF.collect()[0] 方法返回数组中的第一个元素,而 deptDF.collect()[0][0] 返回第一行和第一列的值。

collect() 方法用于在处理小型数据集时检索输出,在具有大型数据集的 DataFrame 上调用 collect() 可能会导致内存不足错误。因此,应避免对大型数据集调用 collect()。

select() 可用于返回一个新的 DataFrame 并保存选定的列,而 collect() 是一个操作,它以数组形式返回整个数据集。

向 PySpark DataFrame 添加新列

我们首先创建一个 DataFrame,以便在 PySpark 中添加列。

代码

输出

 
+---+-------+---------+
| ID|   Name|  Company|
+---+-------+---------+
|  1| sravan|company 1|
|  2| ojaswi|company 1|
|  3| rohith|company 2|
|  4|sridevi|company 1|
|  5|  bobby|company 1|
+---+-------+---------+   

说明

在上面的代码中,导入了 pyspark 模块,创建了一个名为 spakdf 的会话,并调用了 getorcreate 函数来创建会话。定义了一个员工数据列表,列名为 ID、Name 和 Company。借助 createDataFrame 函数,创建了数据框,并传递了数据和列。借助 show 方法打印了数据框。

方法 1:添加具有常量值的新列

此方法通过 withColumn 函数中的 lit 函数添加一个具有常量值的新列,并将所需的参数传递给这些函数。

语法

其中,

  • dataframe 是用户输入的数据。
  • Column_name 是要添加的列

代码

输出

 
+---+-------+---------+------+
| ID|   Name|  Company|salary|
+---+-------+---------+------+
|  1| sravan|company 1| 34000|
|  2| ojaswi|company 1| 34000|
|  3| rohith|company 2| 34000|
|  4|sridevi|company 1| 34000|
|  5|  bobby|company 1| 34000|
+---+-------+---------+------+   

说明

在上面的代码中,导入了 pyspark 模块,还导入了其他函数,例如用于创建会话的 SparkSession 和用于向新列添加值的 lit 函数。借助 getOrCreate 函数创建了一个会话。定义了一个包含 ID、Name 和 Company 列的数据列表。借助 createDataFrame 函数创建了数据框。借助 withColumn 函数创建了名为 salary 的新列,并添加了一个常量值。借助 show 函数打印了数据框。

根据 DataFrame 的另一列添加列

此方法根据给定数据框中的列添加新列。在此方法中,根据数据框中现有列创建新列。

语法

其中,

  • dataframe 是给定的数据框
  • column_name 是要创建的列。
  • existing_column 是数据框中已存在的列。

代码

输出

 
+---+-------+---------+-------+
| ID|   Name|  Company| salary|
+---+-------+---------+-------+
|  1| sravan|company 1| 2300.0|
|  2| ojaswi|company 1| 4600.0|
|  3| rohith|company 2| 6900.0|
|  4|sridevi|company 1| 9200.0|
|  5|  bobby|company 1|11500.0|
+---+-------+---------+-------+   

说明

在上面的代码中,添加了一个名为 salary 的新列,该列通过将 ID 列乘以 2300 得到。

使用 concat_ws()

此函数用于连接两个已经存在的列,并从 pyspark.sql.functions 导入此方法以创建新列。

语法

其中,

  • dataframe 是给定的数据框。
  • Column_name 是要创建的列。
  • Existing_column1 和 existing_column2 将连接成一个新列。
  • 分隔符是具有两列的一种运算符。

代码

输出

 
+---+-------+---------+-----------------+
| ID|   Name|  Company|          Details|
+---+-------+---------+-----------------+
|  1| sravan|company 1| sravan-company 1|
|  2| ojaswi|company 1| ojaswi-company 1|
|  3| rohith|company 2| rohith-company 2|
|  4|sridevi|company 1|sridevi-company 1|
|  5|  bobby|company 1|  bobby-company 1|
+---+-------+---------+-----------------+   

说明

在上面的代码中,创建了一个名为 details 的新列,它由 Name 和 Company 列组成。

当 DataFrame 中不存在列时添加列

此方法用于在列不存在于数据框中时添加列,它借助 lit() 函数和 if 条件的检查来创建。

语法

代码

输出

 
+---+-------+---------+------+
| ID|   Name|  Company|salary|
+---+-------+---------+------+
|  1| sravan|company 1| 34000|
|  2| ojaswi|company 1| 34000|
|  3| rohith|company 2| 34000|
|  4|sridevi|company 1| 34000|
|  5|  bobby|company 1| 34000|
+---+-------+---------+------+   

说明

在上面的代码中,导入了必要的模块,并创建了一个 Spark 会话。员工数据作为列表输入,包含 ID、NAME 和 Company 列。数据框是借助 createDataFrame 函数创建的。通过检查 salary 列是否存在于数据框中,当它不存在时,将其添加到数据框中。

使用 select() 向 DataFrame 添加列

为了在数据框中添加列,使用 lit() 函数调用 select() 函数。此方法用于显示选定的列。

语法

代码

输出

 
+------+
|salary|
+------+
| 34000|
| 34000|
| 34000|
| 34000|
| 34000|
+------+  

说明

在上面的代码中,添加了一个名为 salary 的列,其常量值为 34000,并通过 select 函数选择了此列。

使用 SQL 表达式向 DataFrame 添加列

此方法借助 SQL 表达式和 SQL 函数添加列。为此,首先创建一个临时视图。对于该视图,添加并选择了列。

语法

代码

输出

 
+------+
|salary|
+------+
| 34000|
| 34000|
| 34000|
| 34000|
| 34000|
+------+   

说明

在上面的代码中,导入了所需的模块并创建了一个会话。输入了数据列表,并借助 createDataFrame 函数创建了数据框,列名为 ID、NAME、COMPANY。创建了一个视图,并借助 SQL 函数运行查询以选择 salary 列。

根据条件添加列值

在此方法中,检查条件并结合 withColumn() 函数,并根据现有列值添加列。

语法

代码

输出

 
+---+-------+---------+------+
| ID|   Name|  Company|salary|
+---+-------+---------+------+
|  1| sravan|company 1| 34000|
|  2| ojaswi|company 1| 78000|
|  3| rohith|company 2| 78000|
|  4|sridevi|company 1| 78000|
|  5|  bobby|company 1| 31000|
+---+-------+---------+------+   

说明

在上面的代码中,通过检查其他列(即 name)的条件来创建新的 salary 列,并根据条件添加 salary。