Python多进程解析:探索Multiprocessing的高效并行处理之道

Python 多进程解析:Multiprocessing 高效并行处理的奥秘

文章目录

  • Python 多进程解析:Multiprocessing 高效并行处理的奥秘
  • 一 多进程
  • 1 导入进程标准模块
  • 2 定义调用函数
  • 3 创建和启动进程
  • 二 存储进程结果 Queue
  • 三 threading & multiprocessing 对比
  • 1 创建多进程 multiprocessing
  • 2 创建多线程 multithread
  • 3 创建普通函数
  • 4 创建对比时间函数
  • 5 运行结果
  • 四 进程池 Pool
  • 1 进程池 Pool() 和 map()
  • 2 自定义核数量
  • 3 apply_async 单结果返回
  • 4 apply_async 多结果返回
  • 5 划重点
  • 五 共享内存 shared memory
  • 六 进程锁 Lock
  • 1 不加进程锁
  • 2 加进程锁
  • 七 完整代码示例
  • 八 源码地址
  • 在 Python 编程中,多进程(Multiprocessing)是一种提高程序执行效率的重要手段。本文深入解析了多进程的概念与应用,帮助开发者充分利用多核处理器的计算能力。我们从基本的进程创建与启动开始,讲解了如何通过 Queue 实现进程间的数据传递,并通过对比多进程与多线程的性能差异,揭示了多进程在处理 CPU 密集型任务时的显著优势。文章还详细介绍了进程池(Pool)的使用方法,包括 mapapply_async 的不同应用场景。最后,我们探讨了共享内存和进程锁的使用,确保多进程在并发操作中的数据安全性。本文为希望掌握多进程编程的读者提供了全面且易懂的实践指导。

    一 多进程

    Multiprocessing 是一种编程和执行模式,它允许多个进程同时运行,以此提高应用程序的效率和性能。在 Python 中,multiprocessing 模块可以帮助你创建多个进程,使得每个进程都可以并行处理任务,从而有效利用多核处理器的能力。

    1 导入进程标准模块
    import multiprocessing as mp
    
    2 定义调用函数
    def job(a, d):
        print('你好 世界')
    
    3 创建和启动进程
    # 创建进程
    p1 = mp.Process(target=job, args=(1, 2))
    # 启动进程
    p1.start()
    # 连接进程
    p1.join()
    

    二 存储进程结果 Queue

    1 存入输出到 Queue

    # 该函数没有返回值!!!
    def job02(q):
        res = 0
        for i in range(1000):
            res += i + i ** 2 + i ** 3
        q.put(res)  #
    
    def my_result_process02():
        q = mp.Queue()
        p1 = mp.Process(target=job02, args=(q,))
        p2 = mp.Process(target=job02, args=(q,))
        p1.start()
        p2.start()
        p1.join()
        p2.join()
        res1 = q.get()
        res2 = q.get()
        print(res1)
        print(res2)
        print(res1 + res2)
    

    三 threading & multiprocessing 对比

    1 创建多进程 multiprocessing
    def job03(q):
        res = 0
        for i in range(1000000):
            res += i + i ** 2 + i ** 3
        # 结果加 queue
        q.put(res)
    
    
    # 多核运算多进程
    def multicore03():
        q = mp.Queue()
        p1 = mp.Process(target=job03, args=(q,))
        p2 = mp.Process(target=job03, args=(q,))
        p1.start()
        p2.start()
        p1.join()
        p2.join()
        res1 = q.get()
        res2 = q.get()
        print('multicore:', res1 + res2)
    
    2 创建多线程 multithread
    # 单核运算多线程
    def multithread03():
        # thread可放入process同样的queue中
        q = mp.Queue()
        t1 = td.Thread(target=job03, args=(q,))
        t2 = td.Thread(target=job03, args=(q,))
        t1.start()
        t2.start()
        t1.join()
        t2.join()
        res1 = q.get()
        res2 = q.get()
        print('multithread:', res1 + res2)
    
    3 创建普通函数
    def normal03():
        res = 0
        for _ in range(2):
            for i in range(1000000):
                res += i + i ** 2 + i ** 3
        print('normal:', res)
    
    4 创建对比时间函数
    def time_result03():
        st = time.time()
        normal03()
        st1 = time.time()
        print('normal time:', st1 - st)
        multithread03()
        st2 = time.time()
        print('multithread time:', st2 - st1)
        multicore03()
        print('multicore time:', time.time() - st2)
    
    5 运行结果
    normal03: 499999666667166666000000
    normal03 time: 0.6855959892272949
    multithread03: 499999666667166666000000
    multithread03 time: 0.6804449558258057
    multicore03: 499999666667166666000000
    multicore03 time: 0.38849496841430664
    

    我运行的是 normal03 > multithread03 > multicore03normal03multithread03 相差不大,multicore03normal03multithread03 快将近一倍。

    四 进程池 Pool

    使用进程池 Pool ,Python 会自行解决多进程问题。

    1 进程池 Pool() 和 map()

    map() 返回的是多结果。

    def job04(x):
        # Pool的函数有返回值
        return x * x
    
    def multicore04():
        # Pool的函数有返回值
        pool = mp.Pool()
        # 自分配 CPU 计算
        res = pool.map(job04, range(10))
        print(res)
    
    2 自定义核数量

    Pool 默认大小是 CPU的核数,传入 processes 参数自定义需要的核数量。

    def multicore05():
      	# 定义CPU核数量为3
        pool = mp.Pool(processes=3)  
        res = pool.map(job04, range(10))
        print(res)
    
    3 apply_async 单结果返回

    apply_async() 中只能传递一个值,它只会放入一个核进行运算,但是传入值时要注意是可迭代的, 所以在传入值后需要加逗号, 同时需要用 get() 方法获取返回值。

    def multicore06():
        pool = mp.Pool()
        res = pool.apply_async(job04, (2,))
        # 用get获得结果
        print(res.get())
    
    4 apply_async 多结果返回
    def multicore07():
        pool = mp.Pool()
        multi_res = [pool.apply_async(job04, (i,)) for i in range(10)]
        # 用get获得结果
        print([res.get() for res in multi_res])
    
    5 划重点
  • Pool 默认调用是 CPU 的核数,传入 processes 参数可自定义CPU核数。
  • map() 放入迭代参数,返回多个结果。
  • apply_async() 只能放入一组参数,并返回一个结果,如果想得到 map() 的效果需要通过迭代。
  • 五 共享内存 shared memory

    1 定义 Shared Value

    value1 = mp.Value('i', 0)
    value2 = mp.Value('d', 3.14)
    

    2 定义 Shared Array

    它只能是一维数组

    array = mp.Array('i', [1, 2, 3, 4])
    

    其中 d 和 i 参数用来设置数据类型的,d 表示一个双精浮点类型,i 表示一个带符号的整型,参考数据类型如下:

    Type code C Type Python Type Minimum size in bytes Notes
    'b' signed char int 1
    'B' unsigned char int 1
    'u' wchar_t Unicode character 2 (1)
    'h' signed short int 2
    'H' unsigned short int 2
    'i' signed int int 2
    'I' unsigned int int 2
    'l' signed long int 4
    'L' unsigned long int 4
    'q' signed long long int 8
    'Q' unsigned long long int 8
    'f' float float 4
    'd' double float 8

    具体链接:Efficient arrays of numeric values

    六 进程锁 Lock

    1 不加进程锁

    争抢共享内存

    def job08(v, num):
        for _ in range(5):
            time.sleep(0.1)  # 暂停0.1秒,让输出效果更明显
            v.value += num  # v.value获取共享变量值
            print(v.value, end="\n")
    
    
    def multicore08():
        v = mp.Value('i', 0)  # 定义共享变量
        p1 = mp.Process(target=job08, args=(v, 1))
        p2 = mp.Process(target=job08, args=(v, 3))  # 设定不同的number看如何抢夺内存
        p1.start()
        p2.start()
        p1.join()
        p2.join()
    
    2 加进程锁
    def job09(v, num, l):
        l.acquire()  # 锁住
        for _ in range(5):
            # print(v.value, num)
            time.sleep(0.1)
            v.value = v.value + num  # 获取共享内存
            print(v.value)
        l.release()  # 释放
    
    
    def multicore09():
        l = mp.Lock()  # 定义一个进程锁
        v = mp.Value('i', 0)  # 定义共享内存
        
        p1 = mp.Process(target=job09, args=(v, 1, l))  # 需要将lock传入
        p1.start()
        p1.join()
    
        p2 = mp.Process(target=job09, args=(v, 3, l))
        p2.start()
        p2.join()
    
    
    # def multicore10():
    #     l = mp.Lock()  # 定义一个进程锁
    #     v = mp.Value('i', 0)  # 定义共享内存
    #     p1 = mp.Process(target=job09, args=(v, 1, l))  # 需要将lock传入
    #     p2 = mp.Process(target=job09, args=(v, 3, l))
    #     p1.start()
    #     p2.start()
    #     p1.join()
    #     p2.join()
    

    在这个示例中,必须先执行 p1 以达到预期效果。分别运行 multicore09multicore10 会发现一些有意思的情况。

    七 完整代码示例

    :建议在运行 main.py 对应的代码功能时,逐行使用注释进行操作。

    # This is a sample Python script.
    
    # Press ⌃R to execute it or replace it with your code.
    # Press Double ⇧ to search everywhere for classes, files, tool windows, actions, and settings.
    
    import multiprocessing as mp
    import threading as td
    import time as time
    
    
    def print_hi(name):
        # Use a breakpoint in the code line below to debug your script.
        print(f'Hi, {name}')  # Press ⌘F8 to toggle the breakpoint.
    
        # 创建进程
        p1 = mp.Process(target=job, args=(1, 2))
        # 启动进程
        p1.start()
    
        # Shared Value
        value1 = mp.Value('i', 0)
        value2 = mp.Value('d', 3.14)
        # Shared Array,只能是一维数组
        array = mp.Array('i', [1, 2, 3, 4])
    
    
    def job(a, d):
        print('你好 世界')
    
    
    # 该函数没有返回值!!!
    def job02(q):
        res = 0
        for i in range(1000):
            res += i + i ** 2 + i ** 3
        q.put(res)  #
    
    
    def my_result_process02():
        q = mp.Queue()
        p1 = mp.Process(target=job02, args=(q,))
        p2 = mp.Process(target=job02, args=(q,))
        p1.start()
        p2.start()
        p1.join()
        p2.join()
        res1 = q.get()
        res2 = q.get()
        print(res1)
        print(res2)
        print(res1 + res2)
    
    
    def job03(q):
        res = 0
        for i in range(1000000):
            res += i + i ** 2 + i ** 3
        # 结果加 queue
        q.put(res)
    
    
    # 多核运算多进程
    def multicore03():
        q = mp.Queue()
        p1 = mp.Process(target=job03, args=(q,))
        p2 = mp.Process(target=job03, args=(q,))
        p1.start()
        p2.start()
        p1.join()
        p2.join()
        res1 = q.get()
        res2 = q.get()
        print('multicore03:', res1 + res2)
    
    
    # 单核运算多线程
    def multithread03():
        # thread可放入process同样的queue中
        q = mp.Queue()
        t1 = td.Thread(target=job03, args=(q,))
        t2 = td.Thread(target=job03, args=(q,))
        t1.start()
        t2.start()
        t1.join()
        t2.join()
        res1 = q.get()
        res2 = q.get()
        print('multithread03:', res1 + res2)
    
    
    def normal03():
        res = 0
        for _ in range(2):
            for i in range(1000000):
                res += i + i ** 2 + i ** 3
        print('normal03:', res)
    
    
    def time_result03():
        st = time.time()
        normal03()
        st1 = time.time()
        print('normal03 time:', st1 - st)
        multithread03()
        st2 = time.time()
        print('multithread03 time:', st2 - st1)
        multicore03()
        print('multicore03 time:', time.time() - st2)
    
    
    def job04(x):
        # Pool的函数有返回值
        return x * x
    
    
    def multicore04():
        # Pool的函数有返回值
        pool = mp.Pool()
        # 自分配 CPU 计算
        res = pool.map(job04, range(10))
        print(res)
    
    
    def multicore05():
        pool = mp.Pool(processes=3)  # 定义CPU核数量为3
        res = pool.map(job04, range(10))
        print(res)
    
    
    def multicore06():
        pool = mp.Pool()
        # apply_async() 中只能传递一个值,它只会放入一个核进行运算,但是传入值时要注意是可迭代的,
        # 所以在传入值后需要加逗号, 同时需要用get()方法获取返回值
        res = pool.apply_async(job04, (2,))
        # 用get获得结果
        print(res.get())
    
    
    def multicore07():
        pool = mp.Pool()
        multi_res = [pool.apply_async(job04, (i,)) for i in range(10)]
        # 用get获得结果
        print([res.get() for res in multi_res])
    
    
    def job08(v, num):
        for _ in range(5):
            time.sleep(0.1)  # 暂停0.1秒,让输出效果更明显
            v.value += num  # v.value获取共享变量值
            print(v.value, end="\n")
    
    
    def multicore08():
        v = mp.Value('i', 0)  # 定义共享变量
        p1 = mp.Process(target=job08, args=(v, 1))
        p2 = mp.Process(target=job08, args=(v, 3))  # 设定不同的number看如何抢夺内存
        p1.start()
        p2.start()
        p1.join()
        p2.join()
    
    
    def job09(v, num, l):
        l.acquire()  # 锁住
        for _ in range(5):
            # print(v.value, num)
            time.sleep(0.1)
            v.value = v.value + num  # 获取共享内存
            print(v.value)
        l.release()  # 释放
    
    
    def multicore09():
        l = mp.Lock()  # 定义一个进程锁
        v = mp.Value('i', 0)  # 定义共享内存
    
        p1 = mp.Process(target=job09, args=(v, 1, l))  # 需要将lock传入
        p1.start()
        p1.join()
    
        p2 = mp.Process(target=job09, args=(v, 3, l))
        p2.start()
        p2.join()
    
    
    def multicore10():
        l = mp.Lock()  # 定义一个进程锁
        v = mp.Value('i', 0)  # 定义共享内存
        p1 = mp.Process(target=job09, args=(v, 1, l))  # 需要将lock传入
        p2 = mp.Process(target=job09, args=(v, 3, l))
        p1.start()
        p2.start()
        p1.join()
        p2.join()
    
    
    # Press the green button in the gutter to run the script.
    if __name__ == '__main__':
        print_hi('什么是 Multiprocessing')
        my_result_process02()
        time_result03()
        multicore04()
        multicore05()
        multicore06()
        multicore07()
        multicore08()
        multicore09()
        # multicore10()
    
    # See PyCharm help at https://www.jetbrains.com/help/pycharm/
    
    

    复制粘贴并覆盖到你的 main.py 中运行,运行结果如下。

    Hi, 什么是 Multiprocessing
    你好 世界
    249833583000
    249833583000
    499667166000
    normal03: 499999666667166666000000
    normal03 time: 0.7139420509338379
    multithread03: 499999666667166666000000
    multithread03 time: 0.6696178913116455
    multicore03: 499999666667166666000000
    multicore03 time: 0.3917398452758789
    [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
    [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
    4
    [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
    3
    4
    7
    8
    11
    12
    1515
    
    16
    19
    1
    2
    3
    4
    5
    8
    11
    14
    17
    20
    

    八 源码地址

    代码地址:

    国内看 Gitee 之 什么是 Multiprocessing.py

    国外看 GitHub 之 什么是 Multiprocessing.py

    引用 莫烦 Python

    作者:敲代码不忘补水

    物联沃分享整理
    物联沃-IOTWORD物联网 » Python多进程解析:探索Multiprocessing的高效并行处理之道

    发表回复