【Python】多进程并行计算:multiprocessing模块详解

Python multiprocessing 模块

Python 的 multiprocessing 模块用于 多进程并行计算,可以充分利用 多核 CPU 进行任务加速,突破 Python GIL(全局解释器锁) 的限制,提高程序执行效率。


1. 为什么使用 multiprocessing

Python 默认的 threading 模块使用 线程 进行并发,但由于 GIL(全局解释器锁)的存在,多线程无法真正实现 CPU 级别的并行计算,适用于 I/O 密集型任务(如文件读写、网络请求)。

multiprocessing 创建多个进程,每个进程有独立的 Python 解释器,能够真正实现并行计算,适用于 CPU 密集型任务(如数学计算、数据处理、图像渲染)。


2. 基本用法:创建进程

2.1 multiprocessing.Process

import multiprocessing
import time

def worker(name):
    print(f"Process {name} is running")
    time.sleep(2)
    print(f"Process {name} is done")

if __name__ == "__main__":
    # 创建进程
    p1 = multiprocessing.Process(target=worker, args=("A",))
    p2 = multiprocessing.Process(target=worker, args=("B",))

    # 启动进程
    p1.start()
    p2.start()

    # 等待子进程完成
    p1.join()
    p2.join()

    print("All processes finished.")

📌 说明

  • multiprocessing.Process(target=worker, args=("A",)) 创建进程。
  • start() 启动进程,join() 等待进程结束。
  • 多个进程可以真正并行运行

  • 3. multiprocessing.Pool:进程池

    当需要 大量进程 时,直接创建多个 Process 可能导致 资源消耗过大Pool 允许我们 控制进程的数量,避免创建过多进程。

    3.1 Pool.map()

    import multiprocessing
    import time
    
    def square(n):
        time.sleep(1)
        return n * n
    
    if __name__ == "__main__":
        with multiprocessing.Pool(processes=4) as pool:
            results = pool.map(square, [1, 2, 3, 4, 5])
            print(results)  # [1, 4, 9, 16, 25]
    

    📌 说明

  • Pool(processes=4) 创建一个最多 4 个进程的进程池。
  • pool.map(square, [1, 2, 3, 4, 5]) 将任务分配到不同进程并行执行

  • 3.2 Pool.apply_async()(异步)

    import multiprocessing
    import time
    
    def worker(n):
        time.sleep(1)
        print(f"Worker {n} finished")
        return n * n
    
    if __name__ == "__main__":
        with multiprocessing.Pool(processes=4) as pool:
            results = [pool.apply_async(worker, args=(i,)) for i in range(5)]
            
            # 获取结果
            outputs = [r.get() for r in results]
            print(outputs)  # [0, 1, 4, 9, 16]
    

    📌 区别

  • apply_async() 异步执行任务,返回 AsyncResult 对象,需要使用 get() 获取结果。
  • map() 适用于返回列表结果,apply_async() 更灵活。

  • 4. 进程间通信

    Python 提供了 QueuePipeManager 进行 进程间数据共享

    4.1 multiprocessing.Queue

    Queue进程安全 的队列,适用于 生产者-消费者模型

    import multiprocessing
    import time
    
    def producer(queue):
        for i in range(5):
            time.sleep(1)
            queue.put(i)
            print(f"Produced: {i}")
    
    def consumer(queue):
        while not queue.empty():
            item = queue.get()
            print(f"Consumed: {item}")
    
    if __name__ == "__main__":
        q = multiprocessing.Queue()
        
        p1 = multiprocessing.Process(target=producer, args=(q,))
        p2 = multiprocessing.Process(target=consumer, args=(q,))
    
        p1.start()
        p1.join()  # 生产完成后再消费
        p2.start()
        p2.join()
    

    📌 说明

  • queue.put(i) 生产数据,queue.get() 取出数据。
  • Queue 进程安全,适用于数据交换。

  • 4.2 multiprocessing.Manager(共享变量)

    普通 list/dict 不能 在多个进程间共享,需要使用 Manager

    import multiprocessing
    
    def worker(shared_list):
        for i in range(5):
            shared_list.append(i)
        print("Updated list:", shared_list)
    
    if __name__ == "__main__":
        with multiprocessing.Manager() as manager:
            shared_list = manager.list()  # 共享列表
            p1 = multiprocessing.Process(target=worker, args=(shared_list,))
            p2 = multiprocessing.Process(target=worker, args=(shared_list,))
    
            p1.start()
            p2.start()
    
            p1.join()
            p2.join()
    
            print("Final list:", shared_list)
    

    📌 说明

  • manager.list() 使列表可在多个进程间共享。
  • manager.dict() 可共享字典。

  • 5. multiprocessing.Lock 处理进程同步

    多个进程 同时访问共享资源 可能会发生 数据竞争Lock 允许一次只允许一个进程访问资源,防止数据不一致。

    import multiprocessing
    import time
    
    def worker(lock, n):
        with lock:  # 获取锁
            print(f"Process {n} is working")
            time.sleep(1)
        # 释放锁
    
    if __name__ == "__main__":
        lock = multiprocessing.Lock()
        processes = [multiprocessing.Process(target=worker, args=(lock, i)) for i in range(5)]
    
        for p in processes:
            p.start()
    
        for p in processes:
            p.join()
    

    📌 说明

  • with lock: 保证同时只有 一个进程 访问临界区。
  • 避免数据竞争,保证数据安全。

  • 6. multiprocessing.ValueArray(共享内存)

    如果多个进程需要共享 简单数值int/float)或 数组,可以使用 ValueArray

    import multiprocessing
    import time
    
    def worker(value):
        with value.get_lock():  # 进程锁,防止竞争
            value.value += 1
    
    if __name__ == "__main__":
        v = multiprocessing.Value("i", 0)  # "i" 代表整数
    
        processes = [multiprocessing.Process(target=worker, args=(v,)) for _ in range(5)]
    
        for p in processes:
            p.start()
    
        for p in processes:
            p.join()
    
        print("Final value:", v.value)  # 结果: 5
    

    📌 说明

  • Value("i", 0) 创建共享整数("i" 代表 int)。
  • Array("i", [1,2,3]) 共享数组。
  • value.get_lock() 保证线程安全。

  • 7. multiprocessing vs threading

    特性 multiprocessing threading
    是否真正并行 ✅ 是(多核 CPU 计算) ❌ 否(受 GIL 限制)
    适用于 CPU 密集型(计算任务) I/O 密集型(网络、文件)
    进程开销
    数据共享 需要 QueueManager 共享内存

    8. 总结

  • multiprocessing.Process 创建进程 以实现并行计算。
  • Pool.map() 批量处理任务,更高效。
  • QueueManager 进行进程间通信
  • Lock 同步进程,防止竞争
  • 适用于 CPU 密集型任务,如科学计算、数据处理、机器学习等。
  • 作者:彬彬侠

    物联沃分享整理
    物联沃-IOTWORD物联网 » 【Python】多进程并行计算:multiprocessing模块详解

    发表回复