Python concurrent.futures 模块中的 ThreadPoolExecutor(线程池)与 ProcessPoolExecutor(进程池)详解
concurrent.futures
是 Python 标准库中的模块,它提供了高级接口来异步执行调用。这个模块可以非常方便地用于并行处理任务,支持 ThreadPoolExecutor(线程池)和 ProcessPoolExecutor(进程池)。
以下是一个使用 concurrent.futures
模块 ThreadPoolExecutor
和 ProcessPoolExecutor
的示例,展示了如何并行地下载多个网页的内容。
示例:使用 ThreadPoolExecutor 并行下载网页
首先,我们需要一个模拟的网页下载函数(在真实场景中,你会使用如 requests
库来下载网页)。然后,我们将使用 ThreadPoolExecutor
来并行执行这些下载任务
import concurrent.futures
import time
import random
# 模拟的网页下载函数
def download_url(url, timeout):
print(f'Downloading {url} ...')
# 模拟下载时间
time.sleep(random.uniform(0.5, 1.5)) # 假设下载时间随机在0.5到1.5秒之间
return f'Done downloading {url}'
# 要下载的URL列表
urls = ['http://example.com/1', 'http://example.com/2', 'http://example.com/3']
# 使用ThreadPoolExecutor并行下载
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
future_to_url = {executor.submit(download_url, url, 60): url for url in urls}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result() # 获取结果,如果任务抛出异常,这里会重新抛出
except Exception as exc:
print(f'{url} generated an exception: {exc}')
else:
print(f'{data}')
# 输出将显示下载任务按完成顺序进行的情况
这段程序展示了如何使用 Python 的 concurrent.futures
模块中的 ThreadPoolExecutor
来并行地执行多个任务,这里模拟的是下载多个网页的场景。下面是详细的解释:
- 导入必要的模块:
concurrent.futures
:用于异步执行调用。time
:用于模拟下载过程中的时间延迟。random
:用于生成随机的下载时间。- 定义模拟的网页下载函数:
download_url(url, timeout)
:这个函数接收一个 URL 和一个超时时间(尽管在这个例子中超时时间没有被使用)作为参数。它首先打印出正在下载的 URL,然后使用time.sleep()
函数模拟下载过程,下载时间随机在 0.5 到 1.5 秒之间。最后,它返回一个字符串,表示下载完成。- 定义要下载的 URL 列表:
urls
列表包含了三个模拟的 URL。- 使用
ThreadPoolExecutor
并行下载: - 使用
with
语句创建一个ThreadPoolExecutor
实例,max_workers=5
指定了线程池中的最大线程数。这意味着最多可以同时有 5 个任务在执行。 - 使用字典推导式,将每个 URL 提交给
executor.submit()
方法,该方法会返回一个Future
对象。这个对象代表了异步执行的操作。future_to_url
字典将每个Future
对象映射到其对应的 URL 上,以便后续可以根据Future
对象找到对应的 URL。 - 使用
concurrent.futures.as_completed(future_to_url)
迭代器,它会按照Future
对象完成的顺序返回它们。这意味着循环将首先处理最快完成的任务。 - 在循环中,对于每个完成的
Future
对象,通过future_to_url
字典找到对应的 URL。然后,尝试调用future.result()
来获取任务的结果。如果任务执行过程中抛出了异常,future.result()
会重新抛出这个异常,这里通过try-except
语句捕获并打印异常信息。如果没有异常,就打印出下载完成的信息。 - 输出:
- 输出将显示下载任务按完成顺序进行的情况。由于下载时间是随机的,所以每次运行程序时,输出中各个 URL 的顺序都可能不同。但是,它们都将按照实际完成的时间顺序被打印出来。
示例:使用 ProcessPoolExecutor 并行处理
如果你需要执行的是CPU密集型任务,或者因为Python全局解释器锁(GIL)的限制,线程之间的并行执行没有带来明显的性能提升,那么可以使用 ProcessPoolExecutor
。以下是修改后的示例,使用 ProcessPoolExecutor
来并行执行相同的下载任务(尽管这里仍然是模拟的,因为实际的网络请求通常不会因为GIL而受限):
# 使用ProcessPoolExecutor并行下载(虽然对于网络请求来说不是最佳选择)
with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
future_to_url = {executor.submit(download_url, url, 60): url for url in urls}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
except Exception as exc:
print(f'{url} generated an exception: {exc}')
else:
print(f'{data}')
# 注意:ProcessPoolExecutor 可能会因为序列化/反序列化开销而在执行非常轻量级的任务时表现不佳
在上面的两个示例中,我们展示了如何使用 ThreadPoolExecutor 和 ProcessPoolExecutor 来并行执行多个任务。ThreadPoolExecutor 适用于I/O密集型任务(如网络请求、文件读写等),而 ProcessPoolExecutor 更适合CPU密集型任务。然而,对于网络请求,由于GIL的存在对线程的影响较小,并且进程间通信的开销通常比线程间通信要大,因此在实际应用中,ThreadPoolExecutor 可能是更合适的选择。
补充:Future 对象
上述示例中都提到Future 对象,Future 对象是一个表示异步执行的操作的对象。当你通过 ThreadPoolExecutor 或 ProcessPoolExecutor 提交一个可调用对象(如函数)给执行器(executor)时,执行器会返回一个 Future 对象。这个 Future 对象与提交的可调用对象相关联,并提供了检查调用完成情况和获取其结果的方法。
Future 对象提供了一系列方法和属性来与异步执行的任务交互。以下是一些 Future 对象的主要方法和属性:
done(): 返回一个布尔值,指示调用是否完成。
running(): 返回一个布尔值,如果调用正在执行,则返回 True。但请注意,这个方法的实现可能会因执行器的不同而有所不同,并且在某些执行器中可能不可用。
cancelled(): 返回一个布尔值,表示调用是否被取消。
cancel(): 尝试取消调用。如果调用尚未开始,则取消成功并返回 True。如果调用已经开始,则此方法将不会停止调用,但会立即返回 False。
result(timeout=None): 获取调用的结果。如果调用尚未完成,此方法将阻塞调用者,直到调用完成或直到超时(如果提供了超时时间)。如果调用被取消或引发了异常,则此方法将相应地重新抛出 CancelledError 或异常。
exception(timeout=None): 获取调用所引发的异常。如果调用成功完成或尚未完成,则返回 None。
下面是一个简单的例子,展示了如何提交一个任务并获取其 Future 对象:
import concurrent.futures
import time
# 一个简单的函数,模拟一些工作
def work(n):
time.sleep(n)
return f'Done after {n}s'
# 创建一个 ThreadPoolExecutor
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
# 提交一个任务给执行器,并获取返回的 Future 对象
future = executor.submit(work, 2)
# 现在我们有一个 Future 对象
print(f'Future object: {future}') # 这将打印出一个 Future 对象的表示
# 我们可以检查任务是否完成
print(f'Is done: {future.done()}') # 初始时应该是 False
# 等待任务完成(或超时,但这里没有设置超时)
time.sleep(3) # 假设我们知道这个任务大约需要2秒完成
# 再次检查任务是否完成
print(f'Is done now: {future.done()}') # 现在应该是 True
# 获取结果
result = future.result()
print(f'Result: {result}') # 打印出 'Done after 2s'
提交任务给执行器执行:executor.submit(),
会即时返回一个 Future
对象(虽然任务还没完成),print(f'Future object: {future}') 这行代码将打印出一个 Future 对象的字符串表示,但这通常不会包含太多有用的信息(除了对象的类型和可能的内存地址)。要获取有关异步执行的任务的实际信息(如是否完成、结果或异常),你需要使用 Future 对象提供的方法。
补充:列表推导与集合推导
1、列表推导(List Comprehension)
tasks = [executor.submit(run, item) for item in data_to_run]
这行代码使用列表推导来创建一个列表tasks,其中包含了对每个item在data_to_run中调用run函数的Future对象。列表推导保留了元素的插入顺序,因此如果你需要按照特定顺序处理这些异步任务的结果,或者需要多次迭代这些任务(例如,检查完成情况),使用列表推导是一个好选择。
2、集合推导(Set Comprehension)
tasks = {executor.submit(run, item) for item in data_to_run}
这行代码使用集合推导来创建一个集合tasks
,其中包含了对每个item
在data_to_run
中调用run
函数的Future
对象。然而,需要注意的是,集合是一个无序的、不包含重复元素的数据结构。因此,如果你不关心任务执行的顺序,或者你的data_to_run
中可能包含重复项(尽管在大多数情况下,提交给执行器的任务不应是重复的),并且你希望自动去除任何潜在的重复任务,那么使用集合推导可能是合适的。但是,由于Future
对象通常被视为唯一的(即使它们代表相同的操作),所以在这个场景中,使用集合推导主要是为了去重,这在大多数异步编程场景中并不常见。
作者:人工干智能