Python中的ProcessPoolExecutor模块详解
ProcessPoolExecutor是python中的作用:用来创建和管理进程池。
本文将详细介绍python中的ProcessPoolExecutor,包括它的工作原理、如何使用它、常见问题和最佳实践。
Python进程简单介绍
python中,通常我们创建一个用进程,可以使用类Process,如图1,一个简单的例子,创建进程实例p1、p2的时候通过target和 args传递函数名(func1)和func1的实参。进程p1、p2调用start方法开启进程。
from multiprocessing import Process
import os
from time import sleep
def func1(name):
print("当前进程ID:",os.getpid())
print("父进程ID:",os.getppid())
print(f"Process:{name} start")
sleep(3)
print(f"Process:{name} end")
if __name__ =="__main__":
print("当前进程ID:",os.getpid())
# 创建进程
p1 = Process(target=func1, args=('p1',))
p2 = Process(target=func1, args=('p2',))
p1.start()
p2.start()
图1
Process实例方法还有join、kill、is_alive等方法,这些在这里不在赘述,大家自行翻阅资料查看。
什么是进程池?
进程池是用于自动管理工作进程池的编程模式,负责固定数量的进程,控制何时创建它们,例如何时需要它们,控制它们不被使用时应该做什么,比如让它们在不消耗计算资源的情况下等待。
进程池提供了一个通用接口,用于执行具有可变参数数量的临时任务,与Process对象上的属性非常相似,但不需要我们选择一个进程来运行任务、启动进程或等待任务完成。
Python可以通过ProcessPoolExecutor类实现进程池。
ProcessPoolExecutor简介
ProcessPoolExecutor继承Executor 类,并在调用时返回Future对象。
类Executor
Executor类定义了三种用于控制进程池的方法:submit()、map()和shutdown()。
Executor在创建类时启动,并且必须通过调用shutdown()显式关闭,这将释放Executtor持有的所有资源,当然也可以自动关闭。
submit()和map()函数用于将任务提交给Executor进行异步执行。
map()函数用于将函数应用于可迭代对象(如列表)中的每个元素,该函数对应于元素的每个进程都将异步运行。
submit()函数接受一个函数以及对应的参数,并将异步执行,调用会立即返回Future对象。
对象Futures
我们只用知道他是Executor和ProcessPoolExecutor返回的对象就行了,对象Futures的方法:
这些都先作为了解,后面结合例子,说说具体的作用。
下面,正片开始!
ProcessPoolExecutor的工作流程:
第一步 创建进程池:
我将进程池的容量设为6,容量大小是根据cpu核数来确定的,最好不要超过你电脑的核数。
# 创建进程池,并设置进程池容量
executor = ProcessPoolExecutor(6)
第二步 往进程池里添加任务:
上一步创建的进程池有6个“空位置”,相当于我工厂里预留了6个生产线,但是这六个生产线还没接到任务,都在空闲状态。现在我们往进程池里添加任务。
比如,我现在有个函数my_task,这个函数执行时需要二个入参。
def my_task(a,b):
sleep(2)
return a+b
一次提交一个任务:可以通过调用executor的submit方法,传入函数my_task和其入参来提交任务。
# 计算88加66
future = executor.submit(my_task, 88,66)
也可以一次提交多个任务:可以通过调用executor的map方法,传入两个等长的列表或者迭代器。
#计算11加1、22加2、33加3、44加4、55加5
futures = executor.map(my_task, [11,22,33,44,55],[1,2,3,4,5])
第三步 获取结果:
单个任务的话可以直接调用future的result()方法,result()可以加timeout参数,超过timeout指定的时间就会抛出异常。
#88加66的结果154,result = 154
result = future.result()
#result = future.result(timeout=5)
多个任务获取结果。
futures = executor.map(my_task, [11, 22, 33, 44, 55], [1, 2, 3, 4, 5])
# 打印所有进程的运行结果
for i in futures:
print(i)
第四步 关闭进程池:
executor.shutdown()
提交多次任务完整代码:
最好使用__name__=='__main__'的方法执行main()函数,不然会报RuntimeError错误。
from time import sleep
from concurrent.futures import ProcessPoolExecutor
def my_task(a, b):
sleep(2)
return a + b
def main():
executor = ProcessPoolExecutor(4)
futures = executor.map(my_task, [11, 22, 33, 44, 55], [1, 2, 3, 4, 5])
# 打印所有进程的运行结果
for i in futures:
print(i)
if __name__ == '__main__':
main()
ProcessPoolExecutor进阶方法:
#提交多次任务的方法
futures = [executor.submit(task, i) for i in range(10)]
# 获取所有任务数量
num = len(executor._pending_work_items)
# 获取某个任务的状态,'RUNNING','PENDING','FINISHED','CANCELLED'
# 分别表示正在运行,排队,结束,被取消执行
state = future._state
# 取消进程,无法取消正在运行的进程
futures.cancel()
# 如果任务已完成或被取消,则返回 True。
futures.done()
作者:调包人