(Python)Multiprocessing子进程实战教程:Process类的深度解析与总结

Python提供了multiprocessing模块来开启子进程,并在子进程中执行我们定制的任务。

(python)multiprocessing子进程(Process类的使用)

  • 两种使用方法
  • 第一种方法:使用Process类
  • 第二种方法:使用multiprocessing.Pool
  • 以下是两个实例,分别演示如何使用Process类和multiprocessing.Pool来创建和管理子进程。
  • 实例一:使用Process类
  • 实例二:使用multiprocessing.Pool
  • self.pool.starmap和pool.apply_async区别
  • self.pool.starmap
  • pool.apply_async
  • 异步执行期间,继续执行主进程代码
  • 进程池完成结束判断
  • 选择进程启动方法
  • 两种使用方法

    第一种方法:使用Process类

    self.process = Process(target=Server, args=(self.zmqQThread.ipc_url0, self.zmqQThread.ipc_url1))
    # self.process.daemon = True
    self.process.start()
    
    

    解释:这段代码直接创建一个Process对象,并启动一个新的进程来运行Server函数,传入self.zmqQThread.ipc_url0和self.zmqQThread.ipc_url1作为参数。
    适用场景:这种方法适用于需要直接管理单个子进程的情况,尤其是当你需要在特定点启动和控制这个进程时。比如界面中需要调用一个子进程

    第二种方法:使用multiprocessing.Pool

    mp_context = multiprocessing.get_context('spawn')
    pool = mp_context.Pool(processes=1)
    
    

    解释:这段代码使用multiprocessing.get_context(‘spawn’)获取一个特定的上下文(这里是’spawn’),并基于这个上下文创建一个进程池(Pool)。
    适用场景:这种方法适用于需要管理多个子进程的情况,尤其是在需要并行处理多个任务时。进程池可以有效地管理和分配多个任务给多个进程。这里需要程序需要处理多个任务时使用

    以下是两个实例,分别演示如何使用Process类和multiprocessing.Pool来创建和管理子进程。

    实例一:使用Process类

    展示了如何直接使用Process类来启动一个单独的服务器进程。

    import time
    from multiprocessing import Process
    
    def Server(ipc_url0, ipc_url1):
        print(f"Server started with IPC URLs: {ipc_url0}, {ipc_url1}")
        for _ in range(5):
            print("Server is running...")
            time.sleep(1)
        print("Server has finished running.")
    
    class MyServer:
        def __init__(self, ipc_url0, ipc_url1):
            self.ipc_url0 = ipc_url0
            self.ipc_url1 = ipc_url1
            self.process = Process(target=Server, args=(self.ipc_url0, self.ipc_url1))
    
        def start(self):
            self.process.start()
            self.process.join()
    
    if __name__ == '__main__':
        my_server = MyServer('ipc://localhost:5555', 'ipc://localhost:5556')
        my_server.start()
        print("Main process has finished.")
    
    

    同步执行:
    self.process.join()的作用是阻塞主进程,直到self.process进程结束。这确保了主进程会等待子进程完成后再继续执行后续代码。

    实例二:使用multiprocessing.Pool

    展示了如何使用multiprocessing.Pool来管理多个并行任务。在这个例子中,我们创建一个进程池,并使用它来并行执行多个任务。

    import time
    import multiprocessing
    
    def worker(number):
        print(f"Worker {number} started")
        time.sleep(2)
        print(f"Worker {number} finished")
    
    if __name__ == '__main__':
        # 使用 'spawn' 上下文创建进程池
        mp_context = multiprocessing.get_context('spawn')
        pool = mp_context.Pool(processes=3)  # 创建一个包含3个进程的池
    
        # 提交任务到进程池
        results = []
        for i in range(5):
            result = pool.apply_async(worker, args=(i,))
            results.append(result)
    
        # 等待所有任务完成
        for result in results:
            result.wait()
    
        pool.close()
        pool.join()
        print("All tasks have finished.")
    
    

    异步执行

    self.pool.starmap和pool.apply_async区别

    self.pool.starmap

    starmap方法类似于map,但它接受一个参数列表的列表,将这些参数解包传递给目标函数。这使得starmap非常适合需要传递多个参数的函数。

    import multiprocessing
    import time
    
    def worker(x, y):
        print(f"Worker started with args: ({x}, {y})")
        time.sleep(2)
        return x + y
    
    if __name__ == '__main__':
        with multiprocessing.Pool(processes=3) as pool:
            # 参数列表的列表
            args = [(1, 2), (3, 4), (5, 6), (7, 8)]
            # 使用 starmap 执行任务
            results = pool.starmap(worker, args)
            print("Results:", results)
    
    

    pool.apply_async

    apply_async方法允许异步地调度单个任务,并且可以立即返回一个AsyncResult对象。这个对象可以用于获取任务结果、检查任务状态或者等待任务完成。

    import multiprocessing
    import time
    
    
    def worker(x):
        print(f"Worker started with arg: {x}")
        time.sleep(2)
        return x * x
    
    
    if __name__ == '__main__':
        # 创建一个进程池
        with multiprocessing.Pool(processes=3) as pool:
            # 提交多个异步任务
            results = [pool.apply_async(worker, args=(i,)) for i in range(15)]
    
            # 可以在这里执行其他操作,不需要等待任务完成
            print("Main process continues to run while workers are processing.")
    
            # 获取任务结果
            for result in results:
                print("Result:", result.get())
    
    
        print("Main process has finished.")
    
    

    异步执行期间,继续执行主进程代码

    import multiprocessing
    import time
    
    
    def worker(x):
        print(f"Worker started with arg: {x}")
    
    
    if __name__ == '__main__':
        pool = multiprocessing.Pool(processes=4)
    
        # 提交第一批任务
        results1 = [pool.apply_async(worker, args=(i,)) for i in range(15)]
    
        # 提交第二批任务
        results2 = [pool.apply_async(worker, args=(i + 5,)) for i in range(15)]
    
        # 可以在这里执行其他操作,不需要等待任务完成
        print("Main process continues to run while workers are processing.")
    
        # 继续执行主进程的其他操作
        for i in range(5):
            print(f"Main process doing other work {i}")
            time.sleep(1)
    
        print("Main process has finished.")
    
    


    上面进程池里的代码全部运行完,但是如果主进程比子进程快,那么子进程就会提前结束,那么为了避免进程池的任务提前结束未完成,则需要判断:

    进程池完成结束判断

    import multiprocessing
    import time
    
    
    def worker(x):
        print(f"Worker started with arg: {x}")
    
    
    def init_pool():
        pool = multiprocessing.Pool(processes=4)
        return pool
    
    
    def check_pool(results):
        # 检查任务状态
        while True:
            # 检查所有任务是否完成
            if all(result.ready() for result in results):
                print("All tasks are completed.")
                break
            else:
                print("Some tasks are still running...")
                time.sleep(0.2)  # 等待一段时间后再检查
    
    
    if __name__ == '__main__':
        pool = init_pool()
        # 提交第一批任务
        results1 = [pool.apply_async(worker, args=(i,)) for i in range(15)]
    
        # 提交第二批任务
        results2 = [pool.apply_async(worker, args=(i + 5,)) for i in range(15)]
    
        # 可以在这里执行其他操作,不需要等待任务完成
        print("Main process continues to run while workers are processing.")
        results = results1 + results2
        # 继续执行主进程的其他操作
        for i in range(5):
            print(f"Main process doing other work {i}")
            pool.apply_async(worker, args=(100 + i,))
    
            # time.sleep(0.1)
        check_pool(results)
    
        print("Main process has finished.")
    

    将其模块化:

  • init_pool初始化进程池,返回pool
  • check_pool检查进程池状态
  • results为组合进程池中的结果,没有返回结果一样可以用,如果worker中有return,则for result in all_results:result.get()
  • 选择进程启动方法

    通过 multiprocessing.get_context() 方法,你可以显式地选择进程启动方法,保证代码在不同平台上的行为一致。常见的启动方法有:

    fork: 父进程被复制,子进程继承父进程的资源。这是 Unix 系统默认的启动方法。
    spawn: 父进程启动一个全新的 Python 解释器,并在这个新解释器中导入必要的资源。这是 Windows 系统默认的启动方法,也是 Unix 系统的可选方法。
    forkserver: 父进程启动一个单独的服务器进程,后续子进程通过与这个服务器通信来创建。这在某些情况下比 fork 更安全。

    import multiprocessing
    import time
    
    
    def worker(x):
        print(f"Worker started with arg: {x}")
        time.sleep(2)
        print(f"Worker finished with arg: {x}")
    
    
    if __name__ == '__main__':
        # 获取 spawn 启动方法的上下文
        mp_context = multiprocessing.get_context('spawn')
        
        # 使用 spawn 上下文创建进程池
        pool = mp_context.Pool(processes=4)
        
        # 提交任务
        results = [pool.apply_async(worker, args=(i,)) for i in range(5)]
        
        # 可以在这里执行其他操作,不需要等待任务完成
        print("Main process continues to run while workers are processing.")
        
        # 获取所有任务结果
        for result in results:
            result.wait()  # 等待任务完成
        
        # 关闭进程池
        pool.close()
        pool.join()
    
        print("Main process has finished.")
    
    

    作者:一休哥※

    物联沃分享整理
    物联沃-IOTWORD物联网 » (Python)Multiprocessing子进程实战教程:Process类的深度解析与总结

    发表回复