如何向 PySpark DataFrame 添加新列?2025 年 1 月 7 日 | 阅读 9 分钟 PySpark 是用于 Apache Spark 的 Python API。Apache Spark 是一个开源的云计算系统,用于解决大数据问题。PySpark 是一个支持 Python 与 Apache Spark 集成的 Python API。PySpark 提供了一个 Py4j 库,可以轻松与 Apache Spark 集成。PySpark 在处理和分析大型数据集方面发挥着重要作用。 PySpark 的特点
PySpark 的其他不同特点如下所示:
Apache Spark 是由 Apache 软件基金会设计的开源分布式云计算框架。Apache Spark 用于分析、处理和计算大数据。它提供高速易用性,简单性,流式分析,并且几乎可以在任何地方运行。Apache Spark 可以实时分析数据。Apache Spark 还提供大数据快速计算。Apache Spark 可以用于多种用途,例如运行分布式 SQL、创建数据管道、将数据摄取到数据库中;它还可以用于运行机器学习算法、处理图或数据流等等。 在 PySpark 中创建 DataFramePySpark DataFrame collect() 是一种用于将数据集的所有元素检索到驱动程序节点的操作。collect() 方法应该在 filter()、group() 等操作之后对较小的数据使用。检索大型数据可能会导致 OutOfMemory 错误。我们首先创建一个 DataFrame。 代码 输出 +---------+-------+ |dept_name|dept_id| +---------+-------+ |Finance |10 | |Marketing|20 | |Sales |30 | |IT |40 | +---------+-------+ 解释: 在上面的代码中,导入了 pyspark 库,并从 pyspark 导入了 sql。使用 getOrCreate() 方法创建了 spark 会话。数据作为类型列表输入,其中输入了部门名称和部门 ID,并且部门列作为列表输入。使用 createDataFrame 方法创建了数据框,并打印了数据。默认情况下,它只返回 20 行。 让我们使用 collect() 方法在 Pyspark 中检索数据。 代码 输出 [Row(dept_name='Finance', dept_id=10), Row(dept_name='Marketing', dept_id=20), Row(dept_name='Sales', dept_id=30), Row(dept_name='IT', dept_id=40)] 说明 在上面的代码中,deptDF.collect() 方法用于将 DataFrame 中的所有数据元素作为 Row 类型的数组获取到驱动程序节点。collect() 方法是一个动作;因此,它不打印 DataFrame;它以数组形式返回数据。您可以使用 for 循环打印数据。 代码 输出 Finance,10 Marketing,20 Sales,30 IT,40 说明 在上面的代码中,for 循环用于迭代数据帧的数据。 如果要从 DataFrame 中检索第一行和第一列,请使用 代码 输出 Finance 说明 在上面的代码行中,从数据框中检索了第一行和第一列的数据,返回了 finance。deptDF.collect() 方法返回一个 Row 类型的数组。deptDF.collect()[0] 方法返回数组中的第一个元素,而 deptDF.collect()[0][0] 返回第一行和第一列的值。 collect() 方法用于在处理小型数据集时检索输出,在具有大型数据集的 DataFrame 上调用 collect() 可能会导致内存不足错误。因此,应避免对大型数据集调用 collect()。 select() 可用于返回一个新的 DataFrame 并保存选定的列,而 collect() 是一个操作,它以数组形式返回整个数据集。 向 PySpark DataFrame 添加新列我们首先创建一个 DataFrame,以便在 PySpark 中添加列。 代码 输出 +---+-------+---------+ | ID| Name| Company| +---+-------+---------+ | 1| sravan|company 1| | 2| ojaswi|company 1| | 3| rohith|company 2| | 4|sridevi|company 1| | 5| bobby|company 1| +---+-------+---------+ 说明 在上面的代码中,导入了 pyspark 模块,创建了一个名为 spakdf 的会话,并调用了 getorcreate 函数来创建会话。定义了一个员工数据列表,列名为 ID、Name 和 Company。借助 createDataFrame 函数,创建了数据框,并传递了数据和列。借助 show 方法打印了数据框。 方法 1:添加具有常量值的新列此方法通过 withColumn 函数中的 lit 函数添加一个具有常量值的新列,并将所需的参数传递给这些函数。 语法 其中,
代码 输出 +---+-------+---------+------+ | ID| Name| Company|salary| +---+-------+---------+------+ | 1| sravan|company 1| 34000| | 2| ojaswi|company 1| 34000| | 3| rohith|company 2| 34000| | 4|sridevi|company 1| 34000| | 5| bobby|company 1| 34000| +---+-------+---------+------+ 说明 在上面的代码中,导入了 pyspark 模块,还导入了其他函数,例如用于创建会话的 SparkSession 和用于向新列添加值的 lit 函数。借助 getOrCreate 函数创建了一个会话。定义了一个包含 ID、Name 和 Company 列的数据列表。借助 createDataFrame 函数创建了数据框。借助 withColumn 函数创建了名为 salary 的新列,并添加了一个常量值。借助 show 函数打印了数据框。 根据 DataFrame 的另一列添加列此方法根据给定数据框中的列添加新列。在此方法中,根据数据框中现有列创建新列。 语法 其中,
代码 输出 +---+-------+---------+-------+ | ID| Name| Company| salary| +---+-------+---------+-------+ | 1| sravan|company 1| 2300.0| | 2| ojaswi|company 1| 4600.0| | 3| rohith|company 2| 6900.0| | 4|sridevi|company 1| 9200.0| | 5| bobby|company 1|11500.0| +---+-------+---------+-------+ 说明 在上面的代码中,添加了一个名为 salary 的新列,该列通过将 ID 列乘以 2300 得到。 使用 concat_ws() 此函数用于连接两个已经存在的列,并从 pyspark.sql.functions 导入此方法以创建新列。 语法 其中,
代码 输出 +---+-------+---------+-----------------+ | ID| Name| Company| Details| +---+-------+---------+-----------------+ | 1| sravan|company 1| sravan-company 1| | 2| ojaswi|company 1| ojaswi-company 1| | 3| rohith|company 2| rohith-company 2| | 4|sridevi|company 1|sridevi-company 1| | 5| bobby|company 1| bobby-company 1| +---+-------+---------+-----------------+ 说明 在上面的代码中,创建了一个名为 details 的新列,它由 Name 和 Company 列组成。 当 DataFrame 中不存在列时添加列此方法用于在列不存在于数据框中时添加列,它借助 lit() 函数和 if 条件的检查来创建。 语法 代码 输出 +---+-------+---------+------+ | ID| Name| Company|salary| +---+-------+---------+------+ | 1| sravan|company 1| 34000| | 2| ojaswi|company 1| 34000| | 3| rohith|company 2| 34000| | 4|sridevi|company 1| 34000| | 5| bobby|company 1| 34000| +---+-------+---------+------+ 说明 在上面的代码中,导入了必要的模块,并创建了一个 Spark 会话。员工数据作为列表输入,包含 ID、NAME 和 Company 列。数据框是借助 createDataFrame 函数创建的。通过检查 salary 列是否存在于数据框中,当它不存在时,将其添加到数据框中。 使用 select() 向 DataFrame 添加列为了在数据框中添加列,使用 lit() 函数调用 select() 函数。此方法用于显示选定的列。 语法 代码 输出 +------+ |salary| +------+ | 34000| | 34000| | 34000| | 34000| | 34000| +------+ 说明 在上面的代码中,添加了一个名为 salary 的列,其常量值为 34000,并通过 select 函数选择了此列。 使用 SQL 表达式向 DataFrame 添加列此方法借助 SQL 表达式和 SQL 函数添加列。为此,首先创建一个临时视图。对于该视图,添加并选择了列。 语法 代码 输出 +------+ |salary| +------+ | 34000| | 34000| | 34000| | 34000| | 34000| +------+ 说明 在上面的代码中,导入了所需的模块并创建了一个会话。输入了数据列表,并借助 createDataFrame 函数创建了数据框,列名为 ID、NAME、COMPANY。创建了一个视图,并借助 SQL 函数运行查询以选择 salary 列。 根据条件添加列值在此方法中,检查条件并结合 withColumn() 函数,并根据现有列值添加列。 语法 代码 输出 +---+-------+---------+------+ | ID| Name| Company|salary| +---+-------+---------+------+ | 1| sravan|company 1| 34000| | 2| ojaswi|company 1| 78000| | 3| rohith|company 2| 78000| | 4|sridevi|company 1| 78000| | 5| bobby|company 1| 31000| +---+-------+---------+------+ 说明 在上面的代码中,通过检查其他列(即 name)的条件来创建新的 salary 列,并根据条件添加 salary。 下一主题数据科学中有用的 PIP 命令 |
我们请求您订阅我们的新闻通讯以获取最新更新。