Dask Python

2025年03月17日 | 阅读 9 分钟

在机器学习和数据科学的现代世界中,使用独特的 Python 工具出奇地容易。这些包包括 scikit-learn、NumPyPandas,它们无法在内存使用或处理时间方面适当地扩展数据。

转向分布式计算工具(传统上是 Apache Spark)是预料之中的。但是,这可能意味着要为一个全新的系统重新调整工作流程,在熟悉的 Python 生态系统和不同的 Java 虚拟机 (JVM) 世界之间导航,并显著增加开发工作流程的复杂性。

Dask 库用于将分布式计算能力与 Python 开发的灵活性相结合,实现数据科学,并与 Python 的标准数据工具无缝集成。

理解分布式计算

让我们考虑一个场景:我们有一个数据集,可能是一组非常大的文本文件,无法完全放入机器内存中。我们可以利用 Python 中的文件流和其他生成器工具来迭代数据集,而无需将其加载到内存中。但是,会引发另一个问题,因为程序仍然在单个线程上工作,即使在内存管理之后,这最终也会限制速度。

因此,Python 提供了一个称为全局解释器锁(换句话说,大多数开发人员使用 CPython)的安全功能,用于在 Python 中编写并行代码,但这可能有点棘手。

因此,有几种好的解决方案可用。这些解决方案包括使用 GIL 之外的低级工具(例如 NumPy 在已编译而非 Python 代码中执行多线程繁重工作),或利用 Python 代码包(例如 multiprocessingjoblib)中的多个进程/线程。

然而,尝试并行化以加速代码变得困难,结果是,即使过程正确完成,代码的可读性也较差,需要开发人员完全重新架构该过程,但系统上的资源可能有限。

对于像上面这样真正大规模的困难,分布式计算可以被认为是一个主要的关键。在分布式系统中,工作被分配到多台独立的 worker 机器上,而不是仅仅尝试在单个设备上使用多个线程工作。

这些自主的 worker 机器在其处理器上以及在其磁盘空间或内存中处理数据集的块。这些 worker 机器只通过相对简单的消息传递相互通信或与中央调度器通信,而不是像多线程代码那样共享磁盘空间和内存。

分布式计算中的挑战

  • 复杂性:设计和管理分布式系统比传统的集中式系统更复杂。
  • 安全性:确保跨多个节点的安全通信和数据完整性更具挑战性。
  • 延迟和带宽:通信延迟和有限的网络带宽会影响性能。

分布式计算系统还允许开发人员在非常大的数据集上扩展代码,以便在任意数量的 worker 上并行运行,以换取设置集中式调度器并使 worker 彼此完全分离的设计复杂性。

分布式计算的优势

  1. 效率:通过分配任务,可以更快地解决大问题。
  2. 成本效益:可以根据需要扩展资源,与维护大型、强大的单机相比,降低了成本。
  3. 冗余和可靠性:如果一个节点出现故障,其他节点可以接管,使系统更具容错性。
  4. 灵活性:分布式系统可以由不同类型的机器组成,从强大的服务器到低成本、低功耗的设备。

让我们了解 Dask 是什么以及它是如何工作的。

理解 Dask

Dask 是一个开源的 Python 库,专为并行计算和处理大型数据集而设计。它使用户能够跨多个核心或机器无缝地扩展计算,从而能够处理不适合内存的数据。Dask 与流行的 Python 库(如 NumPy、pandas 和 scikit-learn)很好地集成,允许用户以最少的代码更改并行化现有工作流程。它对于数据处理、机器学习以及其他需要高效处理大规模计算的任务特别有用。

Dask 通过其低级调度器和高级集合提供不同的实用程序。

  1. 低级调度器:Dask 提供动态任务调度器,并行处理任务图。这些执行引擎控制 高级集合。但是,我们可以使用它们来为用户定义的自定义和工作负载提供支持。这些调度器具有较低的延迟(大约 1 毫秒),并努力以较小的内存占用处理计算。Dask 中的调度器是替代复杂情况下或IPython parallelLuigi 等其他任务调度系统中多处理和线程库的直接利用的方法。
  2. 高级集合:Dask 提供高级数组、DataFrame 和 Bag 集合,它们模仿 Pandas、列表和 NumPy。但是,我们可以在不适合内存的数据集上并行操作这些集合。Dask 的高级集合是用于大数据集的 Pandas 和 NumPy 的替代品。

Dask 的用例提供了几个示例工作流,其中 Dask 可以被视为完美的选择。

Dask 调度器类型

Dask 主要提供两种类型的调度器:单机调度器和分布式调度器

  1. 单机调度器:单机调度器针对管理超出机器内存容量的任务进行了优化。它易于使用、简单且经济高效,是较小任务或开发的不错选择。但是,它不能很好地扩展,因为它在单机上运行。
  2. 分布式调度器:分布式调度器更高级,并且完全异步,允许在多台机器上持续、非阻塞地执行任务。由于其可伸缩性和功能(包括带有实时表格和绘图的交互式仪表板),因此建议在大多数情况下使用。当集群初始化时,此仪表板默认在端口 8787 上可访问,提供对任务进度和性能的有价值的见解。

建议在大多数情况下使用分布式调度器,因为它提供了一个包含多个表格和实时信息绘图的方便且交互式的仪表板。默认情况下,它在初始化集群时可在端口 8787 上使用。

在进入安装部分之前,让我们了解 Dask 集群

理解 Dask 集群

集群是一个分布式或并行处理系统,包含一组相互连接的独立计算机,它们共同作为一个单一的集成计算资源协同工作。集群中的节点可以被视为一个单处理器或多处理器系统,例如 个人计算机 (PC)、工作站,甚至是 SMP。

Dask Python

在集群世界中,有各种体系结构形式可用,以决定我们如何在计算机之间精确地分配工作。让我们了解 Dask 中集群的组织方式。

Dask 网络由三部分组成

  1. 集中式调度器:集中式调度器管理 workers 并分配需要完成的任务。
  2. 许多 Workers:许多 Workers 执行计算,保留结果,并与其他 Workers 通信结果。
  3. 一个或多个客户端:一个或多个客户端可以与 Jupyter Notebooks 或脚本的用户交互。这些客户端还将工作提交给调度器,以便在 workers 上进行处理。

客户端将向调度器发送描述计算代码类型的请求。一旦收到请求,调度器就会在 workers 之间分配工作以满足请求,最后 workers 完成计算工作。

Dask Python

正如我们所观察到的,Dask 将这些大量的数据计算分解为多个小的计算。

值得注意的是,Dask 也可以部署在各种基于集群的技术上,例如

  1. Kubernetes 集群
  2. 处理作业管理器的 HPC 集群,例如 LSF、PBS、SGE、SLURM 或科学和学术实验室中常见的任何其他管理器。
  3. 处理 YARN 的 Spark 或 Hadoop 集群。

如何安装 Dask Python

我们可以使用 Anaconda 或 pip 来安装 Dask

通过 Anaconda 安装 Dask 的语法如下

我们可以简单地在终端或命令提示符中使用以下命令通过 pip 安装 Dask

成功安装 Dask 库后,让我们了解 Dask 接口

理解 Dask 接口

Dask 提供不同的用户界面。这些界面包含一套不同的并行算法,用于分布式计算。以下是一些重要的用户界面,适用于寻求扩展 NumPy、Pandas 和 scikit-learn 的数据科学从业者。

  1. 数组:并行 NumPy
  2. 数据帧:并行 Pandas
  3. 机器学习:并行 Scikit-Learn

Dask 数组

Dask 中的数组通过分块算法提供大于内存的、并行的、n 维数组。换句话说,它是 NumPy 数组的分布式形式。

这是一张图片,可以帮助我们了解 Dask 数组的外观

Dask Python

正如我们所观察到的,多个 NumPy 数组被组织成网格以形成 Dask 数组。当我们创建一个 Dask 数组时,我们可以规定块的大小,它定义了 NumPy 数组的大小。例如,如果一个数组中有十个值,并且将块大小设置为五,它将返回两个 NumPy 数组,每个数组包含五个值。

Dask 数组提供以下一些重要功能

  1. 大于内存:Dask 数组允许我们处理大于可用内存大小的数据集。Dask 有助于将数组分解成许多小的片段,对这些片段进行操作以减少计算的内存占用,并有效地从磁盘流式传输数据。
  2. 并行:Dask 数组利用所有核心进行并行计算。
  3. 分块算法:Dask 数组还提供分块算法,用于对块或子矩阵进行操作,而不是对数组的整个行或列进行操作。此功能有助于通过处理许多小的计算来执行大型计算。

以下是一些使用 Dask 创建数组的简单案例。

示例 1:使用 Dask 数组创建随机数组

输出

[ 0  1  2  3  4  5  6  7  8  9 10 11 12 13 14 15]
((5, 5, 5, 1),)

说明

在上面的程序中,我们从 dask 库中导入了 array 模块,并使用 arange() 方法创建了一个包含 16 个值的数组,并分别将块大小定义为 5。然后我们使用 compute() 方法打印数组。我们还使用 chunks 函数检查了每个块的大小。结果,我们得到了结果数组,我们还可以观察到数组分布在四个块中,其中第一个、第二个和第三个块各包含五个值,第四个块只有一个值。

示例 2:将 NumPy 数组转换为 Dask 数组

输出

[ 0  1  2  3  4  5  6  7  8  9 10 11 12 13 14]

说明

在上面的示例中,我们导入了 NumPy 库和 dask 库的 array 模块。然后我们使用 arange() 方法创建了一个包含 15 个值的 NumPy 数组作为 first_array。然后我们使用 from_array() 方法将 first_array 转换为 Dask 数组作为 second_array,分别将块定义为 5。然后我们使用 compute() 函数打印数组。

此外,Dask 数组支持 NumPy 数组的大多数函数。例如,我们可以使用 mean()sum() 等。

示例 3:计算前 100 个数字的和

输出

4950

说明

在上面的示例中,我们导入了 NumPy 库和 Dask 库的 array 模块,并使用 arange 函数创建了一个从 1 到 100 的 NumPy 数组。然后我们将 NumPy 数组转换为 Dask 数组,并使用 sum() 函数打印 Dask 数组值的和。结果,我们得到了前 100 个数字的总和。

我们已经讨论了 Dask Python 的基本介绍,但仍有一些重要概念有待讨论。本教程的其余部分将在第二部分中介绍。