如何在 Python 3 中使用 ThreadPoolExecutor?2025年3月5日 | 17 分钟阅读 线程是一种通过同时执行多项任务来加速代码的技术。在 Python 中,这可以通过两种不同的方式实现:使用 multiprocessing 模块或 multithreading 模块。 多线程对于需要大量等待的操作非常有用,例如通过 HTTP 发送请求、处理数据库或在文件之间传输信息。这些被称为 I/O 密集型任务。多线程 在这种情况下表现出色,因为它允许你的软件在等待这些活动返回信息的同时完成其他任务。 另一方面,当你的代码在进行大量估算时,例如解决复杂的数学问题,多进程是正确的选择。这是因为 Python 有一个名为全局解释器锁 (GIL) 的特性,它可能会在 CPU 密集型应用中减慢多线程的速度。多进程通过在自己的 CPU 核心上运行每个任务来解决这个问题,从而实现更快的计算。 在 Python 3.2 中,'concurrent.futures' 包引入了一个名为 ThreadPoolExecutor 的新类。这个类旨在更有效地管理和创建线程。你可能会想,既然 Python 已经有了一个内置的 threading 模块,为什么还需要这个。让我来解释一下。 在处理少量线程时,动态创建新线程不是问题。然而,跟踪大量线程可能会有问题。此外,生成过多的线程可能会导致速度下降,因为它在计算上变得低效。 为了持续保持卓越的性能,最好预先创建一个线程池,这些线程在需要之前保持空闲。当任务出现时,可以使用这些预先存在的线程之一,而不是启动一个新线程。这种方法减少了与不断创建新线程相关的开销。 此外,线程池管理线程的生命周期,并根据需求进行调度,从而简化你的代码并降低出错的风险。Python 中的 ThreadPoolExecutor 是一个实用工具,它允许你同时处理和运行多个进程,使你编写的代码在处理可以并发完成的任务时更有效率。 语法 参数 - max_workers: 这个选项指定了可以同时执行的线程数或工作者数。如果你不提供任何值,Python 将根据机器的 CPU 数量选择一个默认值,从 Python 3.8 开始,最小容量为 32 个线程。其中五个线程专门用于涉及输入/输出操作的任务,例如从文件中接收或发布到文件。
- thread_name_prefix: 这个特性在 Python 3.6 中引入,允许你为每个线程指定自己的名称。在调试代码时,识别线程非常有用,因为它使追踪哪个线程在做什么变得容易。
- initializer: 这个参数接受一个函数(或任何可调用对象),在新工作线程启动时执行。当你需要为每个线程在开始处理作业之前创建初始状态或环境时,这是必不可少的。
- initargs: 这是一个你希望传递给 initializer 过程的参数元组。它允许用户使用适当的数据配置每个线程的启动。
ThreadPoolExecutor 方法ThreadPoolExecutor 类提供了三种在后台运行线程的方法。下面是一个简要概述: submit(fn, *args, kwargs)这个技术允许你并发地执行一个算法或方法。它会产生一个 Future 对象,该对象作为执行结果的临时占位符。你可以使用这个对象来验证任务的状态或在任务完成后检索结果。 ThreadPoolExecutor 类的发布技术提供了一个关键特性,允许例程或过程并发执行。这意味着,该技术不是等待一个函数完成后才继续下一个任务,而是让函数在后台运行,从而释放主线程来执行其他代码。 它的工作原理如下 - 函数执行: 你将一个函数 (fn) 以及任何必要的参数 (*args) 和关键字参数 (**kwargs) 传递给 submit 方法。然后,该函数被安排在由线程池管理的单独线程中运行。
- Future 对象: 当你调用 submit 时,它会返回一个 Future 对象。这个对象代表了函数的最终结果。它就像一个占位符,你可以用它来检查函数是否已经运行完成并检索其结果。
- 检查状态: 使用 Future 对象,你可以查询函数执行的状态。例如,你可以查看函数是否仍在进行中或是否已经完成。
- 获取结果: 当一个函数执行完毕后,你可以利用 Future 对象来检索函数的结果。如果被调用的函数抛出了一个错误,那么当你尝试获取结果时,Future 对象将抛出相同的错误。
总之,submit 使你能够在后台进程中执行活动,同时保持主应用程序的可访问性和效率。它还允许你监控和找到它们的结果。 map(fn, *iterables, timeout = None, chunksize = 1)函数 map 可以用来同时对多个可迭代对象应用一个方法。如果该过程未在指定的时间限制内完成,将引发一个 concurrent.futures.TimeoutError 错误。 在处理大型可迭代对象时,使用 ProcessPoolExecutor 并设置大于 1 的 chunksize 可以提高效率。然而,如果你使用的是 ThreadPoolExecutor,chunksize 对性能没有影响,可以保持默认值 1。 Python 中的 map 函数是一个工具,它将给定的函数(或方法)同时应用于一组输入参数(可迭代对象)。这是一种在单个操作中处理多个数据项的强大方法,可以使你的代码更高效、更易读。 参数 - fn: 这是你想要应用于可迭代对象中每个项目的函数。
- *iterables: 这些是您要处理的项目集合。如果需要,您可以传递多个可迭代对象。
- timeout (可选): 设置功能执行其操作所需的最长时间。如果过程花费的时间超过指定限制,则认为它是并发的。将报告一个 TimeoutError。
- chunksize (可选,默认为 1): 这指定了一次处理的项目数量。
处理超时 如果 map 函数未能在指定的超时时间内完成处理,它将引发一个 concurrent.futures.TimeoutError。这个异常会通知你操作耗时太长并已被终止。 使用 chunksize 优化性能 - 使用 ProcessPoolExecutor: 当使用专为跨多个进程并行处理而设计的 ProcessPoolExecutor 时,调整 chunksize 可以提高性能。更大的 chunksize 允许执行器一次处理更大的数据批次,减少开销并可能加速操作。
- 使用 ThreadPoolExecutor: 对于专为跨多个线程并行处理而设计的 ThreadPoolExecutor,更改 chunksize 并不能带来性能优势。在这种情况下,你可以坚持使用默认的 chunksize 为 1。
shutdown(wait = True, *, cancel_futures = False)在 Python 中与监视器(例如通过 concurrent.futures 模块的 ThreadPoolExecutor 或 ProcessPoolExecutor)交互时,shutdown() 函数对于控制能源和保证任务正确完成至关重要。 以下是其工作原理的解释: - shutdown() 的目的: shutdown() 函数指示执行器停止接收新任务,并在所有现有任务(future)完成后释放所有资源。这是避免资源泄漏并确保执行器得到适当清理所必需的。
- 使用场景
- 何时调用: 你应该在使用 executor.submit() 或 executor.map() 提交完任务后调用 shutdown()。如果在所有任务提交之前调用 shutdown(),或者根本不调用它,你可能会遇到 RuntimeError 或使资源处于不一致的状态。
参数 - wait=True: 当 wait=True 时,该方法将阻塞并等待,直到当前正在处理的所有任务都完成后才返回。这确保了执行器在所有事情都得到妥善清理并且所有任务都已完成之前不会返回。这是一种确保关闭过程彻底且没有任务被挂起的方法。
- cancel_futures=False: 这个参数决定是否取消任何尚未开始的任务。如果你设置 cancel_futures=True,执行器将尝试取消这些待处理的任务。如果你需要停止已排队但尚未开始的任务,这可能很有用。默认情况下,此参数设置为 False,这意味着只处理当前正在运行或已完成的任务,任何待处理的任务都不会被取消。
想象一下,你有一个工作线程或进程池来处理多个任务。在提交完所有任务后,你决定是时候关闭执行器了。你调用 shutdown(wait=True),该方法将等待直到所有任务都完成。如果你还想取消任何尚未开始的任务,你会调用 shutdown(wait=True, cancel_futures=True)。这样,你就能确保你的应用程序正确清理资源,并有效地管理任务的完成和取消。 示例 1 下面的代码片段展示了 ThreadPoolExecutor 的功能。与传统的 threading 模块不同,ThreadPoolExecutor 极大地简化了线程操作。使用 ThreadPoolExecutor,你不需要手动使用循环创建和管理线程。你也不必用列表跟踪线程,用 join 等待它们完成,或在线程结束后处理资源清理的麻烦。 相反,ThreadPoolExecutor 在幕后处理了所有这些细节。这意味着你可以编写更干净、更紧凑的代码,并且不易出错。ThreadPoolExecutor 的构造函数会自动处理线程管理、同步和资源释放。这不仅简化了你的代码,还减少了与手动线程处理相关的错误可能性。 总而言之,ThreadPoolExecutor 抽象了线程的复杂性,让你能够更多地关注应用程序的核心逻辑,而不是线程管理的 intricacies。 代码 输出
square of 2:4
square of 7:49
square of 8:64
square of 9:81
square of 10:100
None
None
None
None
代码解释 - ThreadPoolExecutor: 这个类是 concurrent.futures 模块的一部分,该模块为异步执行可调用对象提供了一个高级接口。ThreadPoolExecutor 允许你管理一个线程池来并发执行任务。
- sleep: 虽然导入了,但 sleep 在此代码片段中并未使用。
- square(x): 这个函数接受一个整数 x 并打印其平方。它不返回任何值;它只是直接打印结果。
- if __name__ == '__main__': 这确保了此块内的代码仅在脚本直接执行时运行,而不是作为模块导入到另一个脚本中时运行。
- res = []: 初始化一个空列表 res 来存储 map 操作的结果。
- ThreadPoolExecutor(max_workers=5): 创建一个最多有 5 个工作线程的线程池。这意味着最多可以有 5 个线程并发运行。
- as exe: 将 ThreadPoolExecutor 实例分配给变量 exe。with 语句确保在退出块时资源得到正确清理。
- submit(square, 2): 将 square 函数以参数 2 提交给线程池。这个函数将在一个单独的线程中执行,但在这种情况下,其结果没有被存储或处理。
- map(square, val): 将 square 函数映射到列表 val ([7, 8, 9, 10]) 上。这意味着 square 将在单独的线程中以 val 的每个元素作为参数被调用。exe.map 返回一个结果的迭代器。
- 迭代由 exe.map 产生的结果。每个结果对应于用 val 的一个元素调用 square 的结果。
- 打印迭代器 res 中的每个结果。然而,由于 square 函数不返回值,只是打印,所以这里打印的值是 None。
- 线程池: ThreadPoolExecutor 管理一个线程池,允许你并发执行任务。max_workers 参数指定了要使用的最大线程数。
- submit 方法: 向线程池提交一个任务,但不等待结果。
- map 方法: 将一个函数映射到一个参数列表,并返回一个迭代器。该函数会在单独的线程中应用于每个参数。
示例 2 下面的代码片段旨在使用 requests 库发出的 HTTP 请求 从网络上收集照片。脚本的第一部分一次执行一个 API 调用,这可能相当缓慢。而脚本的第二部分则使用线程同时进行多个调用,从而加快了过程。 你可以测试不同的因素,看看它们如何影响速度。例如,将线程数从三增加到六,很可能会在下载性能上带来更显著的提升。 代码 输出
Downloading...
Downloading...
Downloading...
Downloading...
Downloading...
Downloading...
Single Threaded Code Took: 0.38163670600010846 seconds
**************************************************
Downloading...
Downloading...
Downloading...
Downloading...
Downloading...
Downloading...
MultiThreaded Code Took: 0.1118598049999946 seconds
代码解释 - requests: 用于发出 HTTP 请求以获取图像数据。
- time: 用于测量操作所花费的时间。
- concurrent.futures: 用于代码的并行执行。
- image_urls 是一个指向你想要下载的图像的 URL 列表。
- start_time = time.perf_counter(): 在开始下载图像前记录开始时间。
- fetch_image(url): 一个函数,接收一个 URL,发出 GET 请求获取图像,并打印 "Downloading..."。它将图像数据存储在 image_data 中,但这些数据没有被进一步使用。
- 一个 for 循环 遍历 image_urls 中的每个 URL,调用 fetch_image(url) 来顺序下载每张图片。
- end_time = time.perf_counter(): 在所有图像下载完成后记录结束时间。
- 打印单线程下载过程所花费的时间。
- start_time = time.perf_counter(): 记录多线程下载的开始时间。
- fetch_image_concurrently(url): 与 fetch_image(url) 类似,这个函数获取图像数据并打印 "Downloading..."。它有相同的目的,但专为并发执行而设计。
- 使用 concurrent.futures.ThreadPoolExecutor(3) 作为执行器: 创建一个最多有三个线程的线程池。with 声明保证了资产得到妥善处理和清理。
- executor.map(fetch_image_concurrently, image_urls): fetch_image_concurrently 调用被分配给 image_urls 中的每个 URL,允许使用最多三个线程同时检索多张图片。
- end_time = time.perf_counter(): 在所有图像并发下载完成后记录结束时间。
- 打印多线程下载过程所花费的时间。
- 单线程策略一次上传一张照片,而多线程方法利用线程池同时上传多张照片。
该程序的目标是评估单线程和多线程图片下载的效率。预期多线程方法由于并发执行会更快。 - fetch_image 和 fetch_image_concurrently 函数中的 print("Downloading...") 语句用于指示下载何时开始,但它们不提供有关完成或下载了多少数据的信息。
- 在代码的两个版本中,image_data 被获取但未使用或存储。这通常是您处理或保存下载图像的地方。
Python 3 中 ThreadPoolExecutor 的优点Python 的 concurrent.futures 模块中的 ThreadPoolExecutor 在管理和执行线程方面提供了几个优势。以下是一些关键好处: 1. 简化的线程管理 手动管理线程可能很麻烦。你通常需要: - 创建线程: 管理它们的生命周期(开始、暂停、恢复、停止)。
- 处理同步问题,如竞争条件和死锁。
ThreadPoolExecutor 将这些细节抽象化了。你不必单独创建和管理每个线程。相反,你向执行器提交任务,它会自动处理线程的创建、调度和执行。 2. 线程池 线程池是一组预先实例化、处于空闲状态的线程,随时准备执行任务。这种池化提供了几个好处: - 减少开销: 创建和销毁线程是资源密集型的。通过重用一个线程池,你减少了这种开销。
- 线程是免费可用的;因此,任务可以更快地完成。
例如,如果你拥有一个包含 10 个进程的池并提交 100 个任务,ThreadPoolExecutor 将利用这 10 个线程来同时处理任务,并在新线程可用时回收线程。 3. 任务调度 使用 ThreadPoolExecutor,你可以使用 submit() 方法提交任务,该方法会安排它们由线程池执行。这种调度包括: - 排队: 任务被放入一个队列中,并在线程可用时执行。
- 优先级: 任务按照它们被提供的顺序处理,尽管如果需要可以实现更复杂的排序。
这种抽象使你能够专注于任务定义,而不是费心思考哪个线程将执行它们以及何时执行。
4. 并发控制 在创建执行器时,你可以通过指定 max_workers 参数来控制池中的最大线程数。这使你能够: - 优化资源利用: 将线程数量限制在应用程序或相关系统能够处理的范围内。
- 防止过载: 不要让你的计算机因过多的并发线程而过载,这可能会损害性能。
例如,设置 max_workers 为 5 可确保同时运行的线程不超过五个,即使提交了 100 个任务。 5. Future 对象 当你向 ThreadPoolExecutor 提交任务时,它会返回一个 Future 对象。这个对象代表了任务的最终结果,并提供了几种方法: - result(): 阻塞直到任务完成并返回结果。
- exception(): 返回任务执行期间引发的任何异常。
- done(): 检查任务是否完成。
您可以使用这些方法以干净、有组织的方式处理结果和异常。 6. 异常处理 如果某事引发异常,ThreadPoolExecutor 会捕获它,并让你通过使用 Future 对象的 exception() 函数来恢复它。这可以防止异常导致你的软件突然崩溃,并使以受控方式处理问题变得更容易。 7. 优雅关闭 当你提交完任务后,可以调用执行器的 shutdown() 方法。这个方法: - 阻止新任务: 它阻止任何新任务被提交。
- 等待完成: 它会等待当前正在执行的任务完成后再关闭。
你也可以使用 shutdown(wait=False) 立即返回,同时允许当前正在执行的任务完成。 8. 灵活的任务提交 ThreadPoolExecutor 允许你提交各种可调用对象: - 函数: 常规 Python 函数。
- 方法: 类的实例方法。
- Lambda 函数: 内联匿名函数。
这种灵活性意味着你可以轻松地将执行器适应不同类型的任务和场景。 Python 3 中 ThreadPoolExecutor 的缺点虽然 Python 中的 ThreadPoolExecutor 提供了许多优点,但它也有一些缺点和限制。以下是一些主要缺点: - 全局解释器锁 (GIL) 的影响
- GIL 约束: Python 的全局解释器锁 (GIL) 可能会限制多线程的效率。GIL 保证一次只有一个循环运行 Python 字节码,这可能会影响 CPU 密集型操作的速度。ThreadPoolExecutor 对于 I/O 密集型过程仍然有效,因为线程经常等待 I/O 操作完成,但其优势对于 CPU 密集型工作负载不太明显。
- 有限的并行性
- CPU 密集型限制: 由于 GIL 的存在,ThreadPoolExecutor 可能无法为需要大量计算的 CPU 密集型操作提供显著的性能提升。对于此类任务,使用 ProcessPoolExecutor(使用进程而非线程)可能更合适,因为它通过使用独立的内存空间绕过了 GIL。
- 资源管理
- 线程开销: 每个线程都会消耗系统资源。如果管理不当,拥有大量线程可能导致过度的内存使用和增加的上下文切换开销。设置过高的线程数可能会降低性能而不是提高性能。
- 任务依赖的复杂性
- 任务依赖: 管理具有复杂依赖关系的任务可能会变得繁琐。ThreadPoolExecutor 独立处理任务,不提供任务依赖或工作流的内置机制。你可能需要实现额外的逻辑来处理任务之间的依赖和同步。
- 错误处理
- 异常传播: 虽然可以使用 Future 对象捕获来自线程的异常,但在涉及多个线程的情况下,错误处理可能会变得复杂。你需要明确检查并处理异常,这会增加代码的复杂性。
- 死锁和竞争条件
- 并发问题: 当你使用任何多线程策略时,都应警惕死锁、竞争条件以及各种其他并发问题。线程安全需要适当的同步方法(如锁),这增加了代码的复杂性。
- 阻塞操作
- 阻塞调用: 如果一个线程在一个长时间运行或阻塞的操作上被阻塞,可能会导致线程池的低效使用。如果任务没有被设计成能很好地处理阻塞操作,这可能是一个问题,因为它可能导致线程饥饿或利用率不足。
- 大规模系统的可扩展性有限
- 可扩展性问题: 对于需要数千个并发线程的超大规模系统,ThreadPoolExecutor 可能无法有效扩展。在这种情况下,其他并行处理方法或框架可能更合适。
- 测试和调试
- 多线程程序的调试可能比单线程程序更困难。像竞争条件、死锁和线程交互这样的难题可能难以重现和识别。
Python 3 中 ThreadPoolExecutor 的应用Python 中的 ThreadPoolExecutor 是一个多功能工具,可应用于各种需要并发执行的场景。以下是一些常见的应用: - I/O 密集型任务
ThreadPoolExecutor 在涉及等待输入/输出操作(例如读取或写入文件、执行网络查询或查询数据库)的情况下非常有用。它允许多个进程同时运行。这可能是有利的,因为当一个线程等待读或写操作完成时,其他线程可以继续执行,从而提高整体效率并减少等待时间。 - 并行计算
ThreadPoolExecutor 允许将计算操作划分为更小、独立的工件并并行处理。这对于需要复杂计算或数据处理的活动很方便。虽然 Python 的全局解释器锁 (GIL) 会降低线程对 CPU 密集型操作的有效性,但 ThreadPoolExecutor 对于 I/O 密集型工作负载或在 GIL 影响较小的情况下仍然具有优势。 - 异步任务
ThreadPoolExecutor 适用于管理不干扰主应用程序流程的异步活动。ThreadPoolExecutor 有助于管理诸如发送电子邮件、后台更新以及在后台运行长时间运行的程序等任务。这使得主应用程序能够保持响应并执行其他操作,同时处理这些任务。 - Web 服务器和网络服务
ThreadPoolExecutor 在网络和 Web 服务器中非常有用,当需要同时处理来自客户端的多个请求时。这通过将工作分配给多个线程,提高了服务器管理大量入站连接和响应的能力。它确保服务器保持及时响应,并能妥善处理多个客户端。 - GUI 应用
在图形用户界面(GUI)中,耗时或阻塞的操作,例如数据加载或处理,如果在主线程上执行,可能会导致用户界面变得无法使用。ThreadPoolExecutor 通过将这些活动卸载到基础线程来提供帮助,从而使 GUI 保持可访问性。 - 测试与仿真
对于测试框架或仿真环境,当需要执行多个独立的测试或仿真时,ThreadPoolExecutor 可以并发运行这些任务。这减少了测试套件或仿真完成所需的总时间,从而实现更快的反馈和迭代。通过并行化执行,你可以更好地利用可用的 CPU 资源,并加速整个测试或仿真过程。
结论Python 中的 ThreadPoolExecutor 通过线程提供了一种管理并发任务的有效方式,简化了同时执行多个任务的过程。它通过维护一个工作线程池来抽象线程管理的复杂性,从而减少了重复创建和销毁线程所带来的开销。这使得它对于 I/O 密集型任务特别有效,例如网络爬虫、文件处理或网络通信,这些任务通常涉及等待外部资源,并且可以从并发执行中受益。 指定最大线程数的能力允许对并发性进行微调控制,优化资源使用并提高应用程序的响应能力。例如,它可以在 Web 服务器中处理多个并发连接,或在 GUI 应用程序中执行后台处理而不会冻结用户界面。 然而,由于 Python 的全局解释器锁 (GIL),ThreadPoolExecutor 在处理 CPU 密集型任务时面临限制,该锁限制了多个线程在 CPU 密集型操作中的执行。在这种情况下,像 ProcessPoolExecutor 这样的替代方案可能更合适,因为它们通过使用独立的进程来绕过 GIL。 总的来说,ThreadPoolExecutor 是一个多功能且强大的工具,在并发任务执行有益的场景中能提升性能和效率,使其成为 Python 并发工具箱中一个有价值的补充。
|