Python:multiprocessing多进程简单使用——Process、进程通信

Python中的多进程编程主要使用multiprocessing模块。多进程是指在同一个程序中同时运行多个进程,每个进程拥有自己的内存空间,因此不会像多线程那样存在全局解释器锁(GIL)的限制。多进程适用于CPU密集型任务,而多线程更适合I/O密集型任务。

1、多进程的基本使用

multiprocessing模块提供了类似于threading模块的API,创建和管理进程非常简单。

from multiprocessing import Process

def worker():
    print("Worker process running")

if __name__ == '__main__':
    p = Process(target=worker)
    p.start()  
    p.join()  

2、多进程间的数据通信

1、队列

multiprocessing.Queue是进程安全的队列,多个进程可以通过它来实现数据共享。

from multiprocessing import Process, Queue

def worker(q):
    q.put("Data from worker")

if __name__ == '__main__':
    q = Queue()
    p = Process(target=worker, args=(q,))
    p.start()
    print(q.get())
    p.join()

2、管道

multiprocessing.Pipe提供了双向通信的管道,它返回一对连接对象,进程可以通过它们进行通信。

from multiprocessing import Process, Pipe

def worker(conn):
    conn.send("Message from worker")
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=worker, args=(child_conn,))
    p.start()
    print(parent_conn.recv())
    p.join()

3、共享内存(Value 和 Array)

multiprocessing.Valuemultiprocessing.Array允许在进程间共享一些基本数据类型。虽然每个进程有自己独立的内存空间,但这些共享变量存储在共享内存中,所有进程都可以访问。

from multiprocessing import Process, Value

def worker(val):
    val.value = 100  

if __name__ == '__main__':
    shared_value = Value('i', 0) 
    p = Process(target=worker, args=(shared_value,))
    p.start()
    p.join()
    print(shared_value.value) 

4、管理器

multiprocessing.Manager可以创建共享的对象,如listdict等。这些对象可以在多个进程之间共享和修改。

from multiprocessing import Process, Manager

def worker(shared_dict):
    shared_dict['data'] = 42

if __name__ == '__main__':
    manager = Manager()
    shared_dict = manager.dict()
    p = Process(target=worker, args=(shared_dict,))
    p.start()
    p.join()
    print(shared_dict) 

总结

  • Queue: 适合进程间的消息传递,先进先出。
  • Pipe: 提供双向通信。
  • ValueArray: 适合共享基本数据类型。
  • Manager: 提供复杂对象的共享,如列表、字典等。
  • 3、子进程之间的通信

    子进程之间的通信可以通过相同的进程间通信(IPC)机制来实现,比如使用队列、管道、共享内存、管理器等。你可以让两个子进程都访问同一个通信对象,来进行数据的交互。具体实现方式如下:

    1. 通过Queue进行子进程之间的通信

    可以让两个子进程共享同一个Queue对象来实现通信:

    from multiprocessing import Process, Queue
    
    def producer(queue):
        queue.put("Data from producer")
    
    def consumer(queue):
        data = queue.get()
        print(f"Consumer got: {data}")
    
    if __name__ == '__main__':
        q = Queue()
    
        p1 = Process(target=producer, args=(q,))
        p2 = Process(target=consumer, args=(q,))
    
        p1.start()
        p2.start()
        p1.join()
        p2.join()
    

    2. 通过Manager共享复杂对象

    也可以使用Manager来创建共享的数据结构(如字典或列表),然后多个子进程都可以访问和修改这些共享数据:

    from multiprocessing import Process, Manager
    
    def worker1(shared_list):
        shared_list.append("Data from worker 1")
    
    def worker2(shared_list):
        shared_list.append("Data from worker 2")
    
    if __name__ == '__main__':
        manager = Manager()
        shared_list = manager.list()  # 共享列表
    
        p1 = Process(target=worker1, args=(shared_list,))
        p2 = Process(target=worker2, args=(shared_list,))
    
        p1.start()
        p2.start()
    
        p1.join()
        p2.join()
    
        print(shared_list)
    

    4、进程运行时获取结果

    进程运行时同样可以获取结果,不必等到进程结束后再获取。可以通过轮询或事件驱动的方式,实时地从子进程中获取数据。

    1. 通过Queue实时获取数据

    可以在主进程中轮询Queue,持续获取子进程传来的数据。例如,子进程可以在其执行过程中持续往队列中发送数据,而主进程实时地从队列中读取。

    from multiprocessing import Process, Queue
    import time
    
    def worker(queue):
        for i in range(5):
            time.sleep(1)
            queue.put(f"Data {i} from worker")
    
    if __name__ == '__main__':
        q = Queue()
        p = Process(target=worker, args=(q,))
        p.start()
    
        while p.is_alive() or not q.empty():
            while not q.empty():
                data = q.get()
                print(f"Main process got: {data}")
            time.sleep(0.5)
    
        p.join()
    

    2. 通过Pipe实时获取数据

    Pipe也可以用来实现实时通信,主进程可以不断从管道中读取子进程传来的数据。

    from multiprocessing import Process, Pipe
    import time
    
    def worker(conn):
        for i in range(5):
            time.sleep(1)
            conn.send(f"Data {i} from worker")
        conn.close()
    
    if __name__ == '__main__':
        parent_conn, child_conn = Pipe()
        p = Process(target=worker, args=(child_conn,))
        p.start()
    
        while p.is_alive() or parent_conn.poll():
            if parent_conn.poll():
                data = parent_conn.recv()
                print(f"Main process got: {data}")
            time.sleep(0.5)
    
        p.join()
    
    

    5、说明

    1. 主进程和子进程的资源关系

    在Python的multiprocessing模块中,每个子进程都是独立的,和主进程之间的资源关系如下:

  • 独立内存空间: 子进程有自己独立的内存空间。主进程和子进程之间的数据不共享,彼此的变量和对象在各自的进程中是独立的。即使主进程修改了某个变量,子进程并不会看到这个变化,反之亦然。
  • 资源继承: 子进程会继承主进程启动时的一些资源(比如文件描述符、环境变量、当前的工作目录等)。但是,子进程运行后,它们对这些资源的更改不会影响主进程。
  • 独立执行: 每个子进程都有自己的代码执行路径和生命周期,可以独立于主进程运行。
  • 此外:

    Python 的多进程模块在创建进程时,底层可能采用不同的启动方式,主要有三种:

    1. fork(仅 Unix):子进程是通过 os.fork() 创建的,它会复制父进程的内存空间(写时复制)。初始时,子进程和父进程共享相同的内存内容,但当其中一个进程对内存进行写操作时,才会真正复制对应的内存页。
    2. spawn(默认在 Windows 上):子进程会启动一个新的 Python 解释器,主进程中的资源不会被继承。需要通过显式传递来共享数据。
    3. forkserver(仅 Unix):会启动一个单独的进程作为服务来管理新进程的创建。

    2. 子进程之间的通信注意事项

    1. 同步与锁:当多个进程同时访问共享资源时,可能需要使用锁(Lock)来避免竞争条件。

    2. 死锁风险:需要小心处理进程间的等待,避免出现死锁情况。

    3. join和terminate

    join等待进程自然结束,terminate强制结束进程。

    例如无线循环的进程,使用join会无限等待,则需要使用terminate强制停止,或传入Event优雅停止。

    4. 全局解释锁

    全局解释器锁(Global Interpreter Lock,简称 GIL)是 Python 解释器中的一种机制,主要用于同步线程执行。在 CPython(Python 的标准实现)中,由于 GIL 的存在,任何时刻只有一个线程能够在解释器中执行 Python 字节码,即使是在多核或多 CPU 系统上也是如此。

    GIL 的主要目的是简化内存管理,并防止多线程同时修改数据而导致冲突。尽管 GIL 对于保护 CPython 中共享的内存资源是有用的,但它也限制了多线程程序在多核处理器上的并行执行能力。这意味着即使是高度并行化的应用,在 CPython 上也可能无法充分利用所有可用的处理器核心来加速计算密集型任务。

    然而,对于 I/O 密集型或需要频繁切换任务的应用来说,GIL 的影响可能不太显著,因为当线程等待 I/O 操作完成时(如磁盘访问、网络请求等),GIL 会被释放,允许其他线程运行。此外,一些 Python 替代实现,如 Jython 或 IronPython,并没有 GIL,因此可以更好地支持真正的并行执行。对于希望绕过 GIL 的 CPython 用户,通常会采用多进程而不是多线程的方法,或者使用像 NumPy 这样的库来加速数值计算。

    作者:Dark_Y3

    物联沃分享整理
    物联沃-IOTWORD物联网 » Python:multiprocessing多进程简单使用——Process、进程通信

    发表回复