multiprocessing包详解【Python】
multiprocessing包详解【Python】
简介
multiprocessing
是 Python 标准库中的一个包,它支持进程间的并发执行,提供了一个与 threading 模块相似的API。通过使用 multiprocessing
,开发者可以在 Python 程序中创建新的进程,每个进程都拥有自己的 Python 解释器和内存空间,从而能够并行执行任务。这在进行CPU密集型任务时尤其有用,因为它可以绕过全局解释器锁(GIL),让多核或多CPU的系统可以被充分利用.
主要功能
Process
类:用于表示一个进程对象。它允许你启动、停止、等待进程结束等。可以通过传递目标函数和参数来创建 Process 实例。Queue
:提供了一种安全的方式来在进程之间进行数据交换。队列内部使用锁来保证数据的一致性。Pipe
:提供了一种进程间双向通信的方法。管道可以是双向的(双工)或单向的(半双工)。Lock
:用于同步进程间的操作,确保一次只有一个进程可以访问共享资源。Event
:允许进程等待某些事件的发生。进程可以通过调用 set() 方法来触发事件。Semaphore
:用于限制对共享资源的访问,它初始化一个计数器,该计数器表示可以同时访问资源的进程数量。Condition
:允许一个或多个进程等待某个条件的发生,同时提供了通知机制。Pool
类:允许创建一组进程池,可以将任务分配给池中的进程执行。支持同步和异步的任务执行方式,如 map、apply、apply_async 和 map_async。Value
和 Array
类允许在不同进程之间共享数据。这些数据存储在共享内存中,可以由多个进程并发访问。Manager
:提供了一种更灵活的方式来共享数据。它可以创建一个服务器进程,其他进程通过代理的方式访问服务器进程中的对象,如列表、字典等。cpu_count
:返回可用的 CPU 数量,这对于决定进程池的大小很有用。current_process
:返回当前进程的信息。active_children
:返回当前活跃子进程的列表。功能详解
进程管理
Process
类
构造方法
group
:目前为保留参数,用于未来扩展,应始终为 None。target
:表示这个进程实例所调用对象,你可以传入一个可调用对象或函数,这个对象会在新进程启动时执行。name
:为进程指定一个名称,主要用于调试和跟踪。args
:是 target 调用时的位置参数,传递给函数的参数元组。kwargs
:是 target 调用时的关键字参数,传递给函数的字典。daemon
:如果设置为 True,则这个进程将被标记为守护进程,主程序结束时会自动杀死守护进程。默认值 None 表示继承父进程的守护标志。实例方法
start()
join(timeout=None)
is_alive()
terminate()
kill()
close()
属性
exitcode
name
daemon
pid
示例
from multiprocessing import Process
import time
def worker(name, sleep_time):
print(f"Started worker {name}")
time.sleep(sleep_time)
print(f"Sleep time :{sleep_time}")
if __name__ == "__main__":
for i in range(3):
process = Process(target=worker, args=(f'Bob-{i}',i), name=f'Worker-{i}')
process.start() # 启动进程
print(f"进程是否存活:{process.is_alive()}")
print(time.time())
>>>
进程是否存活:True
1734319313.823571
进程是否存活:True
1734319313.824584
进程是否存活:True
1734319313.825512
Started worker Bob-1
Started worker Bob-2
Started worker Bob-0
Sleep time :0
Sleep time :1
Sleep time :2
Process
类的方法和属性可以让你更精确地控制进程的生命周期和行为join()
方法,则各个进程则会被阻塞进程间通信(IPC)
Queue
类
Queue
类是 multiprocessing
模块中提供的一个重要组件,用于实现进程间通信(IPC)。通过队列,不同的进程可以安全地交换信息或数据。
构造方法
Queue(maxsize=0)
实例方法
put(item, block=True, timeout=None)
get(block=True, timeout=None)
如果 block 为 False,但队列为空,则立即抛出 queue.Empty 异常。
empty()
full()
qsize()
close()
join_thread()
cancel_join_thread()
示例
from multiprocessing import Process, Queue
def worker(q, msg):
q.put(msg)
if __name__ == "__main__":
msg = "Hello from parent process"
q = Queue()
p_1 = Process(target=worker, args=(q, f"{msg}-1"))
p_2 = Process(target=worker, args=(q, f"{msg}-2"))
p_1.start()
p_2.start()
p_1.join()
p_2.join()
while not q.empty():
print(q.get())
>>>
Hello from parent process-1
Hello from parent process-2
注意事项
Pipe
函数
Pipe 函数是 multiprocessing 模块中用于进程间通信的另一种主要机制。它创建了一对连接对象,默认情况下是双向的(双工),但也可以配置为单向(半双工)。通过管道,进程可以相互发送和接收 Python 对象。Pipe 通常用于两个进程间的通信。
Pipe() 函数
multiprocessing.Pipe(duplex=True)
Pipe() 函数返回一对 Connection 对象,代表管道的两端。
Connection
对象的方法
管道两端的 Connection 对象提供了一组用于通信的方法:
send(obj)
recv()
fileno()
close()
poll(timeout=None)
示例
from multiprocessing import Process, Pipe
def worker(conn):
conn.send([42, None, 'hello from child'])
conn.close()
if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=worker, args=(child_conn,))
p.start()
print(parent_conn.recv()) # 输出: [42, None, 'hello from child']
parent_conn.close()
p.join()
>>>
[42, None, 'hello from child']
注意事项
Queue
。同步原语
Lock
类
Lock
类是 multiprocessing
模块中提供的一种简单的同步原语。它用于控制多个进程对共享资源的访问,以防止资源的并发访问导致数据错乱或损坏。Lock 类似于线程模块中的锁,但它是为进程间同步而设计的。
构造方法
Lock 类没有特定的构造参数,你可以直接创建一个 Lock 实例
from multiprocessing import Lock
lock = Lock()
实例方法
acquire(block=True, timeout=-1)
release()
释放锁。只有在当前进程持有锁时才能调用此方法;否则,将抛出 RuntimeError。释放锁后,其他阻塞在 acquire() 调用中等待获取锁的进程将能够获取锁。
locked()
示例
from multiprocessing import Process, Lock
import time
# 定义一个简单的共享资源访问函数
def worker_with(lock, num):
with lock:
print(f"Worker {num} has acquired the lock")
time.sleep(1)
print(f"Worker {num} is releasing the lock")
if __name__ == "__main__":
lock = Lock()
processes = [Process(target=worker_with, args=(lock, n)) for n in range(5)]
for p in processes:
p.start()
for p in processes:
p.join()
print("Processing complete.")
>>>
确保了在任何时刻只有一个进程能够访问共享资源,从而避免了并发访问的问题
注意事项
Lock
应该谨慎使用,以避免复杂的同步问题。Event
类
Event
类是 Python multiprocessing
模块中提供的一个同步原语,用于在进程之间通信和协调。它模拟了线程模块中的 Event 对象,允许一个进程向其他多个进程发出某个事件的发生,这对于在多个进程之间同步操作非常有用
构造方法
创建一个 Event
对象很简单,不需要任何参数
from multiprocessing import Event
event = Event()
实例方法
set()
clear()
is_set()
wait(timeout=None)
示例
from multiprocessing import Process, Event
import time
def worker(event, id):
print(f"Worker {id} waiting for event.")
event.wait() # 阻塞,直到事件被设置。
print(f"Worker {id} received event.")
if __name__ == "__main__":
event = Event()
# 创建并启动三个工作进程
workers = [Process(target=worker, args=(event, i)) for i in range(3)]
for w in workers:
w.start()
print("Main process doing some work.")
time.sleep(2) # 模拟主进程工作一段时间
event.set() # 设置事件,唤醒所有等待的进程
print("Main process triggered the event.")
for w in workers:
w.join() # 等待所有工作进程完成
print("Main process exiting.")
>>>
Main process doing some work.
Worker 0 waiting for event.
Worker 1 waiting for event.
Worker 2 waiting for event.
Main process triggered the event.
Worker 0 received event.
Worker 1 received event.
Worker 2 received event.
Main process exiting.
注意事项
Event
可以实现进程间的简单通信,但它不提供保护共享资源的机制。如果需要对共享资源进行访问控制,应考虑使用锁(如 Lock
、RLock
)。Event
对象适用于当一个进程必须等待一个或多个进程发出特定信号才能继续执行的场景。Event
进行同步时,要注意可能的死锁情况,特别是在复杂的进程间通信和协调场景下。Semaphore
类
Semaphore 类是 multiprocessing 模块中提供的一种同步原语,用于控制对共享资源的访问数量。它可以被视为一个可用资源的计数器,是一种更为通用的同步机制,可以用来解决多个进程访问有限数量的资源问题。
构造方法
Semaphore
对象在创建时可以接受一个可选的整数值,用于指定信号量的初始值,即同时可以访问共享资源的进程数量。如果不指定,则默认值为1,此时它的行为类似于 Lock
。
from multiprocessing import Semaphore
sem = Semaphore(value=3)
实例方法
acquire(block=True, timeout=None)
release()
get_value()
(在某些Python版本中不推荐使用)
示例
from multiprocessing import Process, Semaphore
import time
sem = Semaphore(2) # 最多允许2个进程同时访问共享资源
def worker(sem, num):
with sem:
print(f"Worker {num} is working.")
time.sleep(num) # 模拟耗时操作
print(f"Worker {num} finished.")
if __name__ == "__main__":
workers = [Process(target=worker, args=(sem, i)) for i in range(5)]
for w in workers:
w.start()
for w in workers:
w.join()
print("All workers completed.")
>>>
Worker 0 is working.
Worker 0 finished.
Worker 4 is working.
Worker 3 is working.
Worker 3 finished.
Worker 2 is working.
Worker 4 finished.
Worker 1 is working.
Worker 1 finished.
Worker 2 finished.
All workers completed.
注意事项
Semaphore
时应当注意,acquire
和 release
方法调用需要成对出现,以避免信号量计数器的值被错误地修改,导致资源访问控制混乱。Semaphore
可用于实现各种同步模式,包括限制对资源的并发访问、实现生产者消费者问题等。Semaphore
可以有效地控制资源访问,防止资源竞争和冲突,但也需要注意避免死锁和其他同步问题。Condition
类
Condition
类是 multiprocessing
模块中提供的一个同步原语,用于在进程之间等待某些条件的满足。它允许一个或多个进程等待某个条件变为真,而在条件满足时能够通知一个或多个等待的进程继续执行。Condition
常常与共享资源或状态变更相关的场景一起使用,提供了一种更加灵活的进程同步机制
构造方法
在创建 Condition 对象时,可以选择传递一个 Lock 或 RLock 对象用于内部使用。如果不传递,则 Condition 对象会自动创建一个新的 RLock 对象
from multiprocessing import Condition
cond = Condition()
实例方法
acquire(*args, **kwargs)
release()
wait(timeout=None)
notify(n=1)
notify_all()
或 notifyAll()
示例
from multiprocessing import Process, Condition, Array
import time
def producer(cond, shared_array):
with cond:
print("Producer adding item.")
shared_array[0] += 1
print("Producer done, item added.")
cond.notify()
def consumer(cond, shared_array):
with cond:
print("Consumer waiting for item.")
cond.wait()
print("Consumer consumed item.")
shared_array[0] -= 1
if __name__ == "__main__":
condition = Condition()
shared_array = Array('i', [0])
p = Process(target=producer, args=(condition, shared_array))
c = Process(target=consumer, args=(condition, shared_array))
c.start()
time.sleep(2) # 确保消费者先运行
p.start()
p.join()
c.join()
print("Final shared value:", shared_array[0])
>>>
Consumer waiting for item.
Producer adding item.
Producer done, item added.
Consumer consumed item.
Final shared value: 0
注意事项
Condition
对象时,必须确保在调用 wait()
、notify()
和 notify_all()
方法前已经获得了与条件关联的锁。wait()
方法在返回前会重新获得锁,这意味着调用 wait()
后不需要再次显式调用 acquire()
。notify()
和 notify_all()
不会立即释放锁,而是在当前进程的锁释放操作(release()
)执行后,被通知的进程才能继续执行。Condition
可以解决复杂的同步问题,但设计不当可能导致死锁或性能问题,因此需要谨慎使用。进程池
Pool
类
Pool
类是 Python 的 multiprocessing
模块提供的一个高级接口,用于并行执行多个进程。Pool
类可以自动管理进程池中的进程数量,让你轻松地将任务分配给进程池执行,而无需手动管理每个进程的创建和终止。这对于执行 CPU 密集型任务特别有用
构造方法
创建 Pool 对象时,可以指定几个参数,最常用的是 processes 参数,它指定了进程池中进程的数量。如果不指定,默认为机器的 CPU 核心数
from multiprocessing import Pool
pool = Pool(processes=4)
实例方法
apply(func, args=(), kwds={})
apply_async(func, args=(), kwds={}, callback=None, error_callback=None)
map(func, iterable, chunksize=None)
map_async(func, iterable, chunksize=None, callback=None, error_callback=None)
close()
join()
示例
from multiprocessing import Pool
import time
import os
def square(x):
time.sleep(1)
return x * x
if __name__ == '__main__':
print(time.time())
# 创建一个包含4个进程的进程池
with Pool(processes=os.cpu_count()) as p:
# 使用 map 函数分配任务
results = p.map(square, range(10))
print(results)
# 使用 apply_async 异步执行任务
async_result = p.apply_async(square, (10,))
print(async_result.get()) # 使用 get 方法获取异步任务的结果
print(time.time())
>>>
1734330507.251192
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
100
1734330510.379658
注意事项
Pool
时,确保只在 if __name__ == '__main__'
: 块中创建 Pool
对象和执行任务。这可以防止在子进程中无意中创建新的子进rocesses
,导致程序崩溃或行为异常。Pool
的 map
和 apply
方法会阻塞主进程,直到所有任务完成。如果需要非阻塞操作,应使用 map_async
和 apply_async
方法。Pool
后,应调用 close()
方法关闭进程池,然后调用 join()
方法等待所有进程完成。这是良好的资源管理习惯,可以防止资源泄露。terminate()
方法会立即停止所有进程,应谨慎使用,以免丢失未完成的工作共享状态
Value
和 Array
类
在 Python 的 multiprocessing
模块中,Value
和 Array
是两个用于进程间共享数据的类。由于进程间共享内存并非像线程间共享全局变量那样简单,这两个类提供了一种在不同进程间安全共享数据的方法。
Value
Value
类用于在进程之间共享一个存储在共享内存中的单一数据值。它适用于当你只需要共享一个简单的数据元素,如一个整数或浮点数时。
使用方法:创建 Value
对象时,需要指定数据类型和初始值。数据类型是使用类似于 C 语言类型声明的字符串来指定的,例如 ‘i’ 表示整数,‘d’ 表示双精度浮点数。
from multiprocessing import Value
# 创建一个共享的整数变量,初始值为 0
num = Value('i', 0)
访问和修改值:可以通过 .value 属性来访问和修改存储在 Value 中的数据。
# 读取值
print(num.value)
# 修改值
num.value = 10
Array
Array
类用于在进程之间共享一个存储在共享内存中的数组。它适用于当你需要共享一组数据(如一系列整数或浮点数)时。
使用方法:创建 Array
对象时,同样需要指定数据类型和数组的大小。数据类型的指定方式与 Value 类似,数组的大小则通过一个整数来指定。
from multiprocessing import Array
# 创建一个共享的整数数组,包含 10 个元素,初始值都为 0
arr = Array('i', 10)
访问和修改数组元素:Array 支持类似于列表的索引和切片操作来访问和修改数组中的元素。
# 读取第一个元素
print(arr[0])
# 修改第一个元素
arr[0] = 1
# 获取数组的长度
print(len(arr))
# 使用切片
arr[1:3] = [2, 3]
Manager
类
Manager 类在 Python multiprocessing 模块中提供了一种方式,允许你在不同进程间共享数据。与 Value 和 Array 直接在共享内存中存储数据不同,Manager 类创建的数据结构是通过一个服务器进程管理的。这意味着它允许更加灵活的数据共享方式,不仅限于简单的数值或数组,也支持列表、字典、Namespace 等更复杂的数据结构。
基本使用
使用 Manager 类时,首先需要创建一个 Manager 对象,然后使用这个对象来创建共享的数据结构。
from multiprocessing import Manager
with Manager() as manager:
# 创建一个共享的列表
shared_list = manager.list([0, 1, 2])
# 创建一个共享的字典
shared_dict = manager.dict({'a': 1, 'b': 2})
创建的共享数据结构可以像普通的数据结构那样被访问和修改,但是它们实际上是在一个单独的服务器进程中维护的。这意味着你可以安全地在多个进程间共享和修改这些数据结构,而无需担心进程安全问题。
Manager 支持的类型
Manager 类支持多种类型的共享数据结构,包括但不限于:
示例代码
以下示例展示了如何使用 Manager 类在进程间共享列表和字典:
from multiprocessing import Process, Manager
def worker(shared_list, shared_dict):
shared_list.append('new item')
shared_dict['new_key'] = 'new value'
if __name__ == "__main__":
with Manager() as manager:
shared_list = manager.list([1, 2, 3])
shared_dict = manager.dict({'key1': 'value1', 'key2': 'value2'})
p = Process(target=worker, args=(shared_list, shared_dict))
p.start()
p.join()
print(f"Shared list: {shared_list}")
print(f"Shared dict: {shared_dict}")
其他组件
multiprocessing 模块提供了一些用于查询系统和进程状态的函数,这些函数对于编写并发程序时了解资源利用情况和进程管理非常有用。以下是对 cpu_count、current_process 和 active_children 三个方法的详细解释:
cpu_count()
cpu_count
函数返回机器上可用的 CPU 核心数。这个信息对于决定并行程序中进程池(Pool)的大小非常有用,因为你可能想要根据可用的处理器数量来优化你的程序性能。
使用场景:在创建进程池时,可以根据 cpu_count 的返回值来设置进程池的大小。例如,如果你的机器有 4 个 CPU 核心,那么创建一个包含 4 个进程的进程池可能是一个合理的选择。
from multiprocessing import cpu_count
print(f"Number of CPUs: {cpu_count()}")
current_process()
current_process
函数返回当前进程的信息。这个函数返回一个 Process 对象,其中包含有关当前进程的详细信息,如进程的名称、PID(进程ID)等。
使用场景:这个函数在调试并发程序时特别有用,因为你可以用它来识别当前正在执行的进程。例如,在多进程环境中打印日志时,可能会包含进程的名称或 PID 来区分日志消息是从哪个进程生成的。
from multiprocessing import current_process
process = current_process()
print(f"Current process name: {process.name}")
print(f"Current process ID: {process.pid}")
active_children()
active_children
函数返回一个列表,包含当前活跃的子进程对象。每个子进程对象都包含有关该进程的信息,如名称、PID 等。这个函数可以在父进程中调用,以获取当前所有活跃的子进程列表。
使用场景:在管理和监控基于 multiprocessing 的并发程序时,active_children 函数可以帮助你了解有多少子进程正在运行。这对于确保所有预期内的子进程都已启动,或者在程序结束时检查是否还有未完成的子进程非常有用。
from multiprocessing import Process, active_children
import time
def worker():
print("Worker sleeping...")
time.sleep(2)
print("Worker done.")
if __name__ == "__main__":
for _ in range(2):
p = Process(target=worker)
p.start()
time.sleep(1) # 等待一段时间,让子进程启动
for child in active_children():
print(f"Active child: PID={child.pid}, Name={child.name}")
作者:BBluster