Dask Python (第二部分)

2025年03月17日 | 阅读 9 分钟

在之前的教程中,我们了解了分布式计算的概念和 Dask 简介。我们还了解了什么是 Dask 集群以及如何安装 Dask,此外还介绍了 Dask 接口。

Dask 接口

正如我们已经讨论过的,Dask 接口具有各种用于分布式计算的并行算法集。数据科学从业者正在使用一些重要的用户接口来扩展 NumPy、Pandas 和 scikit-learn。

  1. 数组: 并行 NumPy
  2. 数据帧: 并行 Pandas
  3. 机器学习: 并行 Scikit-Learn

我们已经在之前的教程中介绍了 Dask 数组;现在让我们直接进入 Dask 数据帧。

Dask 数据帧

我们已经观察到,它需要将多个 NumPy 数组分组才能形成 Dask 数组。类似地,一个 Dask 数据帧包含许多较小的 Pandas 数据帧。一个大的 Pandas 数据帧按行分割以形成多个较小的数据帧。这些较小的数据帧可以在单个系统或多个系统上可用(因此,允许我们存储比内存更大的数据集)。Dask 数据帧的每次计算都并行化了现有 Pandas 数据帧上的函数。

下面显示了表示 Dask 数据帧结构的图像

Dask Python Part 2

Dask 数据帧还提供了与 Pandas 数据帧非常相似的应用程序编程接口 (API)。

现在,让我们考虑一些使用 Dask 数据帧执行基本函数的示例。

示例 1:读取 CSV 文件

借助 Pandas 读取文件

输出

       Sno      Date     Time State/UnionTerritory ConfirmedIndianNational ConfirmedForeignNational   Cured  Deaths  Confirmed
0        1  30/01/20  6:00 PM               Kerala                       1                        0       0       0          1
1        2  31/01/20  6:00 PM               Kerala                       1                        0       0       0          1
2        3  01/02/20  6:00 PM               Kerala                       2                        0       0       0          2
3        4  02/02/20  6:00 PM               Kerala                       3                        0       0       0          3
4        5  03/02/20  6:00 PM               Kerala                       3                        0       0       0          3
...    ...       ...      ...                  ...                     ...                      ...     ...     ...        ...
9286  9287  09/12/20  8:00 AM            Telengana                       -                        -  266120    1480     275261
9287  9288  09/12/20  8:00 AM              Tripura                       -                        -   32169     373      32945
9288  9289  09/12/20  8:00 AM          Uttarakhand                       -                        -   72435    1307      79141
9289  9290  09/12/20  8:00 AM        Uttar Pradesh                       -                        -  528832    7967     558173
9290  9291  09/12/20  8:00 AM          West Bengal                       -                        -  475425    8820     507995

[9291 rows x 9 columns]

借助 Pandas 读取文件

输出

Sno      Date     Time State/UnionTerritory ConfirmedIndianNational ConfirmedForeignNational   Cured  Deaths  Confirmed
0        1  30/01/20  6:00 PM               Kerala                       1                        0       0       0          1
1        2  31/01/20  6:00 PM               Kerala                       1                        0       0       0          1
2        3  01/02/20  6:00 PM               Kerala                       2                        0       0       0          2
3        4  02/02/20  6:00 PM               Kerala                       3                        0       0       0          3
4        5  03/02/20  6:00 PM               Kerala                       3                        0       0       0          3
...    ...       ...      ...                  ...                     ...                      ...     ...     ...        ...
9286  9287  09/12/20  8:00 AM            Telengana                       -                        -  266120    1480     275261
9287  9288  09/12/20  8:00 AM              Tripura                       -                        -   32169     373      32945
9288  9289  09/12/20  8:00 AM          Uttarakhand                       -                        -   72435    1307      79141
9289  9290  09/12/20  8:00 AM        Uttar Pradesh                       -                        -  528832    7967     558173
9290  9291  09/12/20  8:00 AM          West Bengal                       -                        -  475425    8820     507995

[9291 rows x 9 columns]

说明

在上面的示例中,我们创建了两个不同的程序。在第一个程序中,我们导入了 pandas 库并使用 read_csv() 函数读取 CSV 文件。相比之下,我们导入了 dask 库的 dataframe 模块并使用 read_csv() 函数读取 CSV 文件。

两个程序的结果将相同,但处理时间不同。与 Pandas 相比,Dask 数据帧执行函数的速度更快。一旦实际使用,这一点就会很明显。

示例 2:查找特定列的值计数

输出

Kerala                                      315
Delhi                                       283
Rajasthan                                   282
Haryana                                     281
Uttar Pradesh                               281
Tamil Nadu                                  278
Ladakh                                      278
Jammu and Kashmir                           276
Karnataka                                   276
Punjab                                      275
Maharashtra                                 275
Andhra Pradesh                              273
Uttarakhand                                 270
Odisha                                      269
West Bengal                                 267
Puducherry                                  267
Chhattisgarh                                266
Gujarat                                     265
Chandigarh                                  265
Madhya Pradesh                              264
Himachal Pradesh                            264
Bihar                                       263
Manipur                                     261
Mizoram                                     260
Andaman and Nicobar Islands                 259
Goa                                         259
Assam                                       253
Jharkhand                                   253
Arunachal Pradesh                           251
Tripura                                     247
Meghalaya                                   240
Telengana                                   236
Nagaland                                    207
Sikkim                                      200
Dadra and Nagar Haveli and Daman and Diu    181
Cases being reassigned to states             60
Telangana                                    45
Dadar Nagar Haveli                           37
Unassigned                                    3
Telangana***                                  1
Maharashtra***                                1
Telengana***                                  1
Chandigarh***                                 1
Daman & Diu                                   1
Punjab***                                     1
Name: State, dtype: int64

说明

在上面的示例中,我们导入了 dask 库的 dataframe 模块,并使用 read_csv() 函数读取 CSV 文件的内容。然后,我们使用了列名“States”,后跟 value_counts() 方法,以计算该特定列中每个值的总数。结果,我们得到了该列中存在的所有州名及其出现的总次数。

示例 3:在 Dask 数据帧上使用 groupby 函数

输出

State
Andaman and Nicobar Islands                    4647
Andhra Pradesh                               860368
Arunachal Pradesh                             15690
Assam                                        209447
Bihar                                        232563
Cases being reassigned to states                  0
Chandigarh                                    16981
Chandigarh***                                 14381
Chhattisgarh                                 227158
Dadar Nagar Haveli                                2
Dadra and Nagar Haveli and Daman and Diu       3330
Daman & Diu                                       0
Delhi                                        565039
Goa                                           46924
Gujarat                                      203111
Haryana                                      232108
Himachal Pradesh                              37871
Jammu and Kashmir                            107282
Jharkhand                                    107898
Karnataka                                    858370
Kerala                                       582351
Ladakh                                         8056
Madhya Pradesh                               200664
Maharashtra                                 1737080
Maharashtra***                              1581373
Manipur                                       23166
Meghalaya                                     11686
Mizoram                                        3772
Nagaland                                      10781
Odisha                                       316970
Puducherry                                    36308
Punjab                                       145093
Punjab***                                    130406
Rajasthan                                    260773
Sikkim                                         4735
Tamil Nadu                                   770378
Telangana                                     41332
Telangana***                                  40334
Telengana                                    266120
Telengana***                                  42909
Tripura                                       32169
Unassigned                                        0
Uttar Pradesh                                528832
Uttarakhand                                   72435
West Bengal                                  475425
Name: Cured, dtype: int64

说明

在上面的程序中,我们再次导入了 dask 库的 dataframe 模块,并使用 read_csv 从指定的 CSV 文件中读取。然后,我们使用了 dask 数据帧的 groupby 函数和 max() 函数来查找每个州治愈人数的最大值。

现在,让我们了解另一个 Dask 接口,即 Dask 机器学习。

Dask 机器学习

Dask 机器学习为可扩展的 Python 机器学习 提供算法,它与 scikit-learn 兼容。让我们首先了解如何使用 scikit-learn 处理计算,然后仔细研究 Dask 如何以不同的方式执行这些功能。

Dask Python Part 2

用户可以通过设置参数 njobs = -1,借助 scikit-learn(在单个系统上)执行并行计算。Scikit-learn 利用 Joblib 来执行这些并行计算。Joblib 是一个 Python 库,为并行化提供支持。当我们调用 fit() 函数时,根据要执行的任务(无论是超参数搜索还是拟合模型),Joblib 将任务分发到可用的核心。

Dask Python Part 2

但是,我们可以将借助 scikit-learn 库执行的并行计算扩展到多台机器。而 Dask 在单个系统上表现良好,也可以轻松扩展到系统集群。

Dask 提供一个中央任务调度器和一组工作器。调度器将任务分配给每个工作器。然后这些工作器被分配多个核心,它们可以在这些核心上执行计算。工作器提供两个功能

  1. 计算分配的任务
  2. 根据请求向其他工作器提供结果。

让我们考虑一个示例来演示调度器和工作器之间的对话方式(此示例由 Dask 的开发人员 Matthew Rocklin 提供)

中央任务调度器以 Python 函数的形式将工作发送给工作器,以便在同一系统或集群上执行。

  1. 工作器 A,请计算 x = f(1),工作器 B 请计算 y = g(2)
  2. 工作器 A,一旦 g(2) 函数完成,请从工作器 B 获取 y 并执行 z = h(x, y)

以上示例应该能清楚地演示 Dask 的工作原理。现在让我们了解机器学习模型和 Dask-search CV。

机器学习模型

Dask 机器学习(也称为 Dask-ML)提供 Python 中可扩展的机器学习。但在我们开始之前,让我们按照下面给出的 Dask-ML 安装步骤进行操作

使用 conda 安装

使用 pip 安装

让我们继续了解直接并行化 Scikit-Learn 并使用 Dask Array 重新实现算法。

1. 直接并行化 Scikit-Learn

正如我们已经讨论过的,Scikit-Learn(也称为 sklearn)借助 Joblib 提供并行计算(在单个 CPU 上)。我们可以直接利用 Dask 通过插入几行代码(甚至无需修改当前代码)来并行化多个 sklearn 估计器。

主要步骤是从 dask 库的 distributed 模块导入客户端。此命令将在系统上生成一个本地调度程序和工作程序。

下一步是在后端实例化 Dask 的 Joblib。我们必须从 sklearn 库的 joblib 中导入 parallel_backend,如下语法所示

2. 使用 Dask 数组重新实现算法

Dask-ML 重新实现了简单的机器学习算法以使用 NumPy 数组。Dask 使用 Dask 数组替换 NumPy 数组,以实现可扩展算法。此替换有助于实现

  1. 线性模型(例如,线性回归、泊松回归、逻辑回归等)
  2. 预处理(例如,缩放器、转换等)
  3. 聚类(例如,K 均值、谱聚类等)

A. 线性模型示例

B. 预处理示例

C. 聚类示例

Dask-Search CV

超参数调优 被认为是构建模型的重要一步,可以极大地改变模型的实现。机器学习模型具有各种超参数,很难理解在特定情况下哪个参数会表现更好。手动执行此任务相当繁琐。然而,Scikit-Learn 库提供了 Gridsearch 来简化超参数调优任务。用户必须提供参数,Gridsearch 将提供这些参数的最佳组合。

让我们考虑一个示例,我们需要选择随机森林技术来拟合数据集。该模型有三个重要的可调参数——分别为第一个参数、第二个参数和第三个参数。

现在,让我们设置这些参数的值如下

第一个参数 - Bootstrap = True

第二个参数 - max_depth - [8, 9]

第三个参数 - n_estimators : [50, 100, 200]

1. sklearn Gridsearch: 对于每个参数组合,Scikit-learn Gridsearch 将执行任务,有时会导致单个任务多次迭代。下图显示,这并不是最有效的方法

Dask Python Part 2

2. Dask-Search CV: 与 sklearn 的 Gridsearch CV 相比,Dask 提供了一个名为 Dask-Search CV 的库。Dask-Search CV 合并步骤以减少重复。我们可以使用下面显示的步骤安装 Dask-search

使用 conda 安装 Dask-Search CV

使用 pip 安装 Dask-Search CV

下面显示了演示 Dask-Search CV 工作原理的图表

Dask Python Part 2

Spark 与 Dask 的区别

以下是 Spark 和 Dask 之间的主要区别

序号。SparkDask
1Spark 用 Scala 编程语言编写。Dask 用 Python 编程语言编写。
2Spark 支持 R 和 Python。Dask 只支持 Python。
3Spark 提供自己的生态系统。Dask 是 Python 生态系统的一个组件。
4Spark 提供自己的应用程序编程接口 (API)。Dask 重复利用 Pandas 的应用程序编程接口 (API)
5对于 Scala 和 SQL 用户来说,Spark 易于理解和实现。Dask 通常受到 Python 从业者的青睐。
6Spark 不原生支持多维数组。Dask 完全支持用于可扩展多维数组的 NumPy 模型。