【Python】多进程并行计算:multiprocessing模块详解
Python multiprocessing
模块
Python 的 multiprocessing
模块用于 多进程并行计算,可以充分利用 多核 CPU 进行任务加速,突破 Python GIL(全局解释器锁) 的限制,提高程序执行效率。
1. 为什么使用 multiprocessing
?
Python 默认的 threading
模块使用 线程 进行并发,但由于 GIL(全局解释器锁)的存在,多线程无法真正实现 CPU 级别的并行计算,适用于 I/O 密集型任务(如文件读写、网络请求)。
✅ multiprocessing
创建多个进程,每个进程有独立的 Python 解释器,能够真正实现并行计算,适用于 CPU 密集型任务(如数学计算、数据处理、图像渲染)。
2. 基本用法:创建进程
2.1 multiprocessing.Process
import multiprocessing
import time
def worker(name):
print(f"Process {name} is running")
time.sleep(2)
print(f"Process {name} is done")
if __name__ == "__main__":
# 创建进程
p1 = multiprocessing.Process(target=worker, args=("A",))
p2 = multiprocessing.Process(target=worker, args=("B",))
# 启动进程
p1.start()
p2.start()
# 等待子进程完成
p1.join()
p2.join()
print("All processes finished.")
📌 说明
multiprocessing.Process(target=worker, args=("A",))
创建进程。start()
启动进程,join()
等待进程结束。3. multiprocessing.Pool
:进程池
当需要 大量进程 时,直接创建多个 Process
可能导致 资源消耗过大。Pool
允许我们 控制进程的数量,避免创建过多进程。
3.1 Pool.map()
import multiprocessing
import time
def square(n):
time.sleep(1)
return n * n
if __name__ == "__main__":
with multiprocessing.Pool(processes=4) as pool:
results = pool.map(square, [1, 2, 3, 4, 5])
print(results) # [1, 4, 9, 16, 25]
📌 说明
Pool(processes=4)
创建一个最多 4
个进程的进程池。pool.map(square, [1, 2, 3, 4, 5])
将任务分配到不同进程并行执行。3.2 Pool.apply_async()
(异步)
import multiprocessing
import time
def worker(n):
time.sleep(1)
print(f"Worker {n} finished")
return n * n
if __name__ == "__main__":
with multiprocessing.Pool(processes=4) as pool:
results = [pool.apply_async(worker, args=(i,)) for i in range(5)]
# 获取结果
outputs = [r.get() for r in results]
print(outputs) # [0, 1, 4, 9, 16]
📌 区别
apply_async()
异步执行任务,返回 AsyncResult
对象,需要使用 get()
获取结果。map()
适用于返回列表结果,apply_async()
更灵活。4. 进程间通信
Python 提供了 Queue
、Pipe
、Manager
进行 进程间数据共享。
4.1 multiprocessing.Queue
Queue
是 进程安全 的队列,适用于 生产者-消费者模型。
import multiprocessing
import time
def producer(queue):
for i in range(5):
time.sleep(1)
queue.put(i)
print(f"Produced: {i}")
def consumer(queue):
while not queue.empty():
item = queue.get()
print(f"Consumed: {item}")
if __name__ == "__main__":
q = multiprocessing.Queue()
p1 = multiprocessing.Process(target=producer, args=(q,))
p2 = multiprocessing.Process(target=consumer, args=(q,))
p1.start()
p1.join() # 生产完成后再消费
p2.start()
p2.join()
📌 说明
queue.put(i)
生产数据,queue.get()
取出数据。Queue
进程安全,适用于数据交换。4.2 multiprocessing.Manager
(共享变量)
普通 list/dict
不能 在多个进程间共享,需要使用 Manager
。
import multiprocessing
def worker(shared_list):
for i in range(5):
shared_list.append(i)
print("Updated list:", shared_list)
if __name__ == "__main__":
with multiprocessing.Manager() as manager:
shared_list = manager.list() # 共享列表
p1 = multiprocessing.Process(target=worker, args=(shared_list,))
p2 = multiprocessing.Process(target=worker, args=(shared_list,))
p1.start()
p2.start()
p1.join()
p2.join()
print("Final list:", shared_list)
📌 说明
manager.list()
使列表可在多个进程间共享。manager.dict()
可共享字典。5. multiprocessing.Lock
处理进程同步
多个进程 同时访问共享资源 可能会发生 数据竞争。Lock
允许一次只允许一个进程访问资源,防止数据不一致。
import multiprocessing
import time
def worker(lock, n):
with lock: # 获取锁
print(f"Process {n} is working")
time.sleep(1)
# 释放锁
if __name__ == "__main__":
lock = multiprocessing.Lock()
processes = [multiprocessing.Process(target=worker, args=(lock, i)) for i in range(5)]
for p in processes:
p.start()
for p in processes:
p.join()
📌 说明
with lock:
保证同时只有 一个进程 访问临界区。6. multiprocessing.Value
和 Array
(共享内存)
如果多个进程需要共享 简单数值(int/float
)或 数组,可以使用 Value
或 Array
。
import multiprocessing
import time
def worker(value):
with value.get_lock(): # 进程锁,防止竞争
value.value += 1
if __name__ == "__main__":
v = multiprocessing.Value("i", 0) # "i" 代表整数
processes = [multiprocessing.Process(target=worker, args=(v,)) for _ in range(5)]
for p in processes:
p.start()
for p in processes:
p.join()
print("Final value:", v.value) # 结果: 5
📌 说明
Value("i", 0)
创建共享整数("i"
代表 int
)。Array("i", [1,2,3])
共享数组。value.get_lock()
保证线程安全。7. multiprocessing
vs threading
特性 | multiprocessing |
threading |
---|---|---|
是否真正并行 | ✅ 是(多核 CPU 计算) | ❌ 否(受 GIL 限制) |
适用于 | CPU 密集型(计算任务) | I/O 密集型(网络、文件) |
进程开销 | 高 | 低 |
数据共享 | 需要 Queue 或 Manager |
共享内存 |
8. 总结
multiprocessing.Process
创建进程 以实现并行计算。Pool.map()
批量处理任务,更高效。Queue
、Manager
进行进程间通信。Lock
同步进程,防止竞争。作者:彬彬侠