Python中的ProcessPoolExecutor模块详解

        ProcessPoolExecutor是python中的作用:用来创建和管理进程池。

        本文将详细介绍python中的ProcessPoolExecutor,包括它的工作原理、如何使用它、常见问题和最佳实践。

Python进程简单介绍

        python中,通常我们创建一个用进程,可以使用类Process,如图1,一个简单的例子,创建进程实例p1、p2的时候通过target和 args传递函数名(func1)和func1的实参。进程p1、p2调用start方法开启进程。

from multiprocessing import Process
import os
from time import sleep
​
def func1(name):
  print("当前进程ID:",os.getpid())
  print("父进程ID:",os.getppid())
  print(f"Process:{name} start")
  sleep(3)
  print(f"Process:{name} end")
​
if __name__ =="__main__":
  print("当前进程ID:",os.getpid())
  # 创建进程
  p1 = Process(target=func1, args=('p1',))
  p2 = Process(target=func1, args=('p2',))
  p1.start()
  p2.start()

图1

        Process实例方法还有join、kill、is_alive等方法,这些在这里不在赘述,大家自行翻阅资料查看。

什么是进程池?

        进程池是用于自动管理工作进程池的编程模式,负责固定数量的进程,控制何时创建它们,例如何时需要它们,控制它们不被使用时应该做什么,比如让它们在不消耗计算资源的情况下等待。
        进程池提供了一个通用接口,用于执行具有可变参数数量的临时任务,与Process对象上的属性非常相似,但不需要我们选择一个进程来运行任务、启动进程或等待任务完成。
        Python可以通过ProcessPoolExecutor类实现进程池。

ProcessPoolExecutor简介

        ProcessPoolExecutor继承Executor 类,并在调用时返回Future对象。

  •         Executor:ProcessPoolExecutor的父类,用于定义进程池的基本生命周期操作。
  •         Future:将任务提交到进程池时返回的对象。
  • 类Executor

            Executor类定义了三种用于控制进程池的方法:submit()、map()和shutdown()。

  • submit():分派一个要执行的函数并返回一个Future对象。
  • map():将函数应用于可迭代的元素。
  • shutdown():关闭执行器。
  •         Executor在创建类时启动,并且必须通过调用shutdown()显式关闭,这将释放Executtor持有的所有资源,当然也可以自动关闭。

            submit()和map()函数用于将任务提交给Executor进行异步执行。

            map()函数用于将函数应用于可迭代对象(如列表)中的每个元素,该函数对应于元素的每个进程都将异步运行。

            submit()函数接受一个函数以及对应的参数,并将异步执行,调用会立即返回Future对象。

    对象Futures

            我们只用知道他是Executor和ProcessPoolExecutor返回的对象就行了,对象Futures的方法:

  • cancelled(): Returns True if the task was cancelled before being executed.
  • running(): Returns True if the task is currently running.
  • done(): Returns True if the task has completed or was cancelled.
  • result(): Access the result from running the task.
  • exception(): Access any exception raised while running the task.
  • add_done_callback(): Add a callback function to the task to be executed by the process pool once the task is completed.
  • 这些都先作为了解,后面结合例子,说说具体的作用。

    下面,正片开始!

    ProcessPoolExecutor的工作流程:

    第一步 创建进程池:

            我将进程池的容量设为6,容量大小是根据cpu核数来确定的,最好不要超过你电脑的核数。

    # 创建进程池,并设置进程池容量
    executor = ProcessPoolExecutor(6)

    第二步 往进程池里添加任务:

            上一步创建的进程池有6个“空位置”,相当于我工厂里预留了6个生产线,但是这六个生产线还没接到任务,都在空闲状态。现在我们往进程池里添加任务。

            比如,我现在有个函数my_task,这个函数执行时需要二个入参。

    def my_task(a,b):
        sleep(2)
        return a+b

            一次提交一个任务:可以通过调用executor的submit方法,传入函数my_task和其入参来提交任务。

    # 计算88加66
    future = executor.submit(my_task, 88,66)

            也可以一次提交多个任务:可以通过调用executor的map方法,传入两个等长的列表或者迭代器。

    #计算11加1、22加2、33加3、44加4、55加5
    futures = executor.map(my_task, [11,22,33,44,55],[1,2,3,4,5])

    第三步 获取结果:

            单个任务的话可以直接调用future的result()方法,result()可以加timeout参数,超过timeout指定的时间就会抛出异常。

    #88加66的结果154,result = 154
    result = future.result()
    #result = future.result(timeout=5)

            多个任务获取结果。

    futures = executor.map(my_task, [11, 22, 33, 44, 55], [1, 2, 3, 4, 5])
    # 打印所有进程的运行结果
    for i in futures:
        print(i)

    第四步 关闭进程池:

    executor.shutdown()

    提交多次任务完整代码:

            最好使用__name__=='__main__'的方法执行main()函数,不然会报RuntimeError错误。

    from time import sleep
    from concurrent.futures import ProcessPoolExecutor
    
    
    def my_task(a, b):
        sleep(2)
        return a + b
    
    
    def main():
        executor = ProcessPoolExecutor(4)
        futures = executor.map(my_task, [11, 22, 33, 44, 55], [1, 2, 3, 4, 5])
        # 打印所有进程的运行结果
        for i in futures:
            print(i)
    
    
    if __name__ == '__main__':
        main()
    

    ProcessPoolExecutor进阶方法:

    #提交多次任务的方法
    futures = [executor.submit(task, i) for i in range(10)]
    
    
    # 获取所有任务数量
    num = len(executor._pending_work_items)
    
    
    # 获取某个任务的状态,'RUNNING','PENDING','FINISHED','CANCELLED'
    # 分别表示正在运行,排队,结束,被取消执行
    state = future._state
    
    # 取消进程,无法取消正在运行的进程
    futures.cancel()
    
    # 如果任务已完成或被取消,则返回 True。
    futures.done()
    
    

    作者:调包人

    物联沃分享整理
    物联沃-IOTWORD物联网 » Python中的ProcessPoolExecutor模块详解

    发表回复