目录

  • Joblib
  • Joblib 并行计算
  • 1. 核心功能
  • (1) 并行计算
  • (2) 缓存计算
  • (3)缓存优化
  • 2. 工作原理
  • (1) 并行机制
  • (2) 动态任务分配
  • (3) 内存管理
  • 3. 适用场景
  • (1) 循环并行化
  • (2) 参数搜索
  • (3) 重复计算缓存
  • 4. 优缺点
  • 优点:
  • 缺点:
  • 5. 与其他并行工具对比
  • Subprocess
  • 核心功能
  • 关键函数与类
  • 1. `subprocess.run`
  • 2. `subprocess.Popen`
  • 3. `subprocess.check_output`
  • 4. `subprocess.call`
  • 常用参数
  • 应用场景
  • 1. 执行系统命令
  • 2. 捕获命令输出
  • 3. 异步处理
  • 4. 传递输入给子进程
  • 子进程与主进程的交互
  • 等待子进程完成
  • 检查子进程状态
  • 错误处理
  • 捕获异常
  • 获取返回码
  • 高级用法
  • 1. 使用 `shell=True` 执行复杂命令
  • 2. 并行运行多个子进程
  • 性能优化
  • Multiprocessing
  • 1. 为什么使用 `multiprocessing`?
  • 线程与进程的区别
  • 适用场景
  • 2. 核心概念
  • 进程(Process)
  • 任务分配
  • 3. 常用类与方法
  • 1)`Process` 类
  • 示例
  • 2)`Pool` 类
  • 示例
  • 3)`Queue` 和 `Pipe`
  • 示例:`Queue`
  • 示例:`Pipe`
  • **4)`Lock` 和 `Manager`**
  • 示例:`Manager`
  • 4. 优势与限制
  • 优势
  • 限制
  • 5. 实际使用建议
  • 6. 多进程调试
  • 建议:
  • Python可以利用多种工具进行并行计算。可以使用一个核,也可以使用多个核。本文总结主要并行计算工具,并且详细介绍易用、常用的几种。

    在深入阅读之前,可以参考另一篇文章,了解python多进程和多线程的区别:python高级教程之多线程

    工具/框架 适用场景 特点 复杂度 多线程/多进程
    Joblib 简单 Python 数据并行 高层封装,支持结果缓存 简单 多进程
    Subprocess 调用外部程序或命令行工具 底层控制,可运行非 Python 程序 中等 独立子进程
    Multiprocessing 高度自定义并行任务 灵活性高,支持共享内存、复杂通信 中等 多进程
    ThreadPoolExecutor/ProcessPoolExecutor 简单任务并行 标准库内置,线程池/进程池实现 简单 多线程/多进程
    Dask 科学计算、大规模数据处理 自动调度、与 NumPy 等兼容 简单-中等 多线程/多进程
    Ray 大规模并行任务 分布式任务支持,Actor 模型 中等 多线程/多进程
    Celery 异步任务队列 专注任务队列,需依赖消息中间件 中等 多进程
    Numexpr 数值计算优化 面向 NumPy 的数组优化计算 简单 多线程
    PySpark 大数据分布式处理 支持 TB 级别大数据 多线程/多进程

    下面详细记录下常用的三种:Joblib, SubprocessMultiprocessing

    Joblib

    Joblib 并行计算

    Joblib 是一个专注于科学计算和机器学习优化的 Python 库,主要用于以下场景:

    1. 任务并行化: 利用多进程技术(默认)并行运行任务。
    2. 结果缓存: 支持中间结果的磁盘缓存,避免重复计算,适合参数调整和实验优化。
    3. 数组操作优化: 针对 NumPy 和 SciPy 的高效数组操作。

    以下是对 Joblib 并行计算的详细介绍:


    1. 核心功能

    (1) 并行计算

    Joblib 提供了高层抽象接口 Paralleldelayed,可以轻松并行化循环任务。例如,针对大数据集或 CPU 密集型任务的操作。

    from joblib import Parallel, delayed
    
    # 定义一个耗时函数
    def process_data(i):
        return i ** 2
    
    # 使用 joblib 并行计算
    results = Parallel(n_jobs=4)(delayed(process_data)(i) for i in range(10))
    print(results)  # 输出 [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
    

    参数解释:

  • n_jobs: 指定并行任务的线程或进程数。
  • -1: 使用所有可用的 CPU 核心。
  • 正整数:具体核心数。
  • delayed: 将函数包装成并行化任务。
  • Parallel: 管理任务的分发和结果的收集。
  • (2) 缓存计算

    Joblib 提供了 Memory 类,用于缓存计算结果。特别适用于参数不变的重复实验。

    from joblib import Memory
    
    memory = Memory("./cachedir", verbose=0)
    
    @memory.cache
    def expensive_function(x):
        print(f"Computing {x}...")
        return x ** 2
    
    # 第一次计算时会输出 "Computing..."
    result = expensive_function(10)
    
    # 第二次调用时直接从缓存加载
    result = expensive_function(10)
    

    优点:

  • 缓存结果保存在磁盘,程序重新启动也可加载。
  • 避免重复计算,提高效率。
  • Note:

  • 缓存的是结果,而不是函数;自动加载时,会检测参数是否一致,如果一致,则直接输出结果,如果不一致,则调用函数进行重新计算。
  • (3)缓存优化

    joblib在读取文件时,并不会直接把文件加载在缓存中,而是保留在磁盘中,可以按需求调取:

    import numpy as np
    from joblib import dump, load
    
    # 创建一个大数组并保存为内存映射文件
    large_array = np.random.rand(10000, 10000)  # 100M 数据
    dump(large_array, 'large_array.pkl')  # 保存为 .pkl 文件
    
    # 以内存映射模式加载大数组
    shared_array = load('large_array.pkl', mmap_mode='r')
    
    # 按需访问数组的一部分
    # 例如:只访问第100行到第200行,第500列到第600列
    subset = shared_array[100:200, 500:600]
    print(subset)
    
    

    2. 工作原理

    (1) 并行机制

    Joblib 使用 multiprocessingloky 后端实现多进程计算,同时支持线程和子进程:

  • 多进程(默认): 使用多个独立的进程并行计算,适合 CPU 密集型任务。
  • 多线程: 通过参数 backend="threading" 指定,适合 I/O 密集型任务。
  • 后端切换: 可在 Parallel 中指定 loky(默认)、multiprocessingthreading
  • from joblib import Parallel, delayed
    
    # 使用多线程
    results = Parallel(n_jobs=4, backend="threading")(
        delayed(process_data)(i) for i in range(10)
    )
    
    (2) 动态任务分配

    Joblib 会根据任务的耗时和复杂度动态分配计算资源。例如,任务较少时可能不使用所有核心。

    (3) 内存管理

    Joblib 使用内存映射(Memory-Mapped Files)优化大数组的共享。对于大数据集,不需要多次复制,提高性能。


    3. 适用场景

    (1) 循环并行化

    例如,大量文件处理或数据分析任务:

    import numpy as np
    
    data = np.random.rand(1000, 1000)
    
    # 并行计算每行的和
    row_sums = Parallel(n_jobs=-1)(delayed(np.sum)(row) for row in data)
    
    (2) 参数搜索

    在机器学习中,超参数搜索(如网格搜索)是常见任务。Joblib 能显著加速这类工作。

    from sklearn.model_selection import GridSearchCV
    from sklearn.ensemble import RandomForestClassifier
    
    # 使用 joblib 加速网格搜索
    param_grid = {'n_estimators': [10, 50, 100], 'max_depth': [3, 5, None]}
    clf = GridSearchCV(RandomForestClassifier(), param_grid, n_jobs=-1)
    clf.fit(X_train, y_train)
    
    (3) 重复计算缓存

    对于重复实验的计算结果缓存:

    @memory.cache
    def compute_features(data):
        return some_expensive_function(data)
    

    4. 优缺点

    优点:
    1. 易用性: 高层抽象接口,无需深入了解多进程。
    2. 跨平台: 支持 Windows、Linux 和 macOS。
    3. 结果缓存: 可磁盘缓存结果,提升重复任务效率。
    4. 兼容性: 与 NumPy、SciPy、Pandas 等科学计算库无缝兼容。
    缺点:
    1. 复杂任务限制: Joblib 适合独立任务,并不擅长需要复杂通信的任务。
    2. 内存占用: 多进程可能导致内存占用较高。
    3. 错误处理困难: 并行任务中出错可能导致难以调试。

    5. 与其他并行工具对比

    特性 Joblib Multiprocessing Threading Subprocess
    易用性
    适用场景 科学计算,独立任务 高度定制并行 I/O 密集任务 外部程序调用
    并行机制 多进程/多线程 多进程 多线程 独立进程
    结果缓存 支持 不支持 不支持 不支持
    性能调优 自动优化分配资源 完全手动控制 完全手动控制 完全手动控制

    Joblib 是一种高效且灵活的并行计算工具,特别适合科学计算任务。如果需要更多控制,可以结合其他并行工具使用。

    Subprocess

    subprocess 是 Python 中用于启动子进程并与其进行交互的模块。它提供了对操作系统中外部程序执行的强大控制功能,支持启动、输入输出通信以及错误处理等。(官网documentation)

    以下是对 subprocess 的详细介绍,包括使用场景、方法及示例。


    核心功能

    1. 启动子进程

    2. 启动外部命令或程序(如 shell 命令、脚本、二进制文件)。
    3. 支持同步和异步执行。
    4. 管理输入输出

    5. 捕获标准输入(stdin)、标准输出(stdout)和标准错误(stderr)。
    6. 实现与子进程的实时通信。
    7. 错误处理

    8. 处理子进程的错误输出。
    9. 获取子进程的返回码。
    10. 并发控制

    11. 支持同时启动多个子进程。

    关键函数与类

    1. subprocess.run

  • 功能:运行命令并等待其完成(同步)。
  • 返回值subprocess.CompletedProcess 对象。
  • 示例
  • import subprocess
    
    result = subprocess.run(['ls', '-l'], capture_output=True, text=True)
    print("Return code:", result.returncode)  # 子进程返回码
    print("Output:\n", result.stdout)        # 子进程标准输出
    print("Error:\n", result.stderr)         # 子进程错误输出
    

    2. subprocess.Popen

  • 功能:更灵活,允许启动命令并实时与其交互(异步)。
  • 返回值subprocess.Popen 对象。
  • 示例
  • import subprocess
    
    process = subprocess.Popen(
        ['ping', '-c', '4', 'google.com'],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True
    )
    stdout, stderr = process.communicate()  # 与子进程通信并获取输出
    print("Output:\n", stdout)
    print("Error:\n", stderr)
    

    3. subprocess.check_output

  • 功能:运行命令并直接返回标准输出内容(出错时抛异常)。
  • 返回值:标准输出。
  • 示例
  • import subprocess
    
    output = subprocess.check_output(['echo', 'Hello World'], text=True)
    print("Output:", output)
    

    4. subprocess.call

  • 功能:运行命令并返回其退出状态码。
  • 返回值:整数,表示子进程的退出状态。
  • 示例
  • import subprocess
    
    ret_code = subprocess.call(['ls', '-l'])
    print("Return code:", ret_code)
    

    常用参数

  • args:要运行的命令(列表或字符串)。
  • shell:是否通过 shell 执行命令(默认为 False)。
  • cwd:设置子进程的当前工作目录。
  • env:设置子进程的环境变量。
  • capture_output:是否捕获标准输出和错误输出。
  • text:是否将输入/输出作为文本处理(默认为字节)。
  • stdoutstderr:重定向标准输出或错误输出。
  • stdin:传递给子进程的输入。

  • 应用场景

    1. 执行系统命令

    subprocess.run(['ls', '-l'])
    

    2. 捕获命令输出

    result = subprocess.run(['cat', 'file.txt'], capture_output=True, text=True)
    print(result.stdout)
    

    3. 异步处理

    process = subprocess.Popen(['ping', '-c', '4', 'google.com'], stdout=subprocess.PIPE)
    for line in process.stdout:
        print(line.decode(), end='')
    

    4. 传递输入给子进程

    process = subprocess.Popen(['grep', 'hello'], stdin=subprocess.PIPE, text=True)
    process.communicate(input="hello world\nhi there")
    

    子进程与主进程的交互

    等待子进程完成

  • 使用 process.wait() 等待子进程完成。
  • process = subprocess.Popen(['sleep', '5'])
    process.wait()
    print("Subprocess finished")
    

    检查子进程状态

  • process.poll():检查子进程是否完成(非阻塞)。
  • process = subprocess.Popen(['sleep', '5'])
    while process.poll() is None:
        print("Subprocess is still running...")
    print("Subprocess finished")
    

    错误处理

    捕获异常

  • 使用 try-except 捕获运行异常。
  • try:
        subprocess.run(['false'], check=True)
    except subprocess.CalledProcessError as e:
        print("Error occurred:", e)
    

    获取返回码

  • 返回码通常为 0 表示成功,非 0 表示失败。
  • result = subprocess.run(['false'])
    print("Return code:", result.returncode)
    

    高级用法

    1. 使用 shell=True 执行复杂命令

  • 当需要执行管道或通配符等 shell 特性时,可以使用 shell=True
  • subprocess.run("ls | grep py", shell=True)
    

    注意:不建议在未处理用户输入的情况下使用 shell=True,可能存在安全风险(例如命令注入)。

    2. 并行运行多个子进程

  • 使用 subprocess.Popen 启动多个子进程。
  • processes = [
        subprocess.Popen(['ping', '-c', '2', 'google.com']),
        subprocess.Popen(['ping', '-c', '2', 'yahoo.com'])
    ]
    for process in processes:
        process.wait()
    

    性能优化

  • 避免阻塞
    使用 Popen 进行异步操作,而不是 run 等同步方法。

  • 减少复制
    重定向 stdin/stdout 或使用 os.devnull 忽略输出。

  • 管理资源
    通过 process.terminate()process.kill() 确保清理未完成的子进程。


  • subprocess 模块是 Python 提供的强大接口,用于与外部程序交互和控制子进程。理解其同步与异步方法、参数以及常见使用场景,可以有效提升任务的执行效率与灵活性。

    Multiprocessing

    multiprocessing 是 Python 提供的一个用于并行处理的模块,它允许用户轻松地创建多个进程,充分利用多核 CPU 的计算能力。以下是对 multiprocessing 的详细介绍,包括其核心概念、使用场景、常用类与函数、以及注意事项。


    1. 为什么使用 multiprocessing

    线程与进程的区别

  • 线程(thread): Python 的 threading 模块受制于 全局解释器锁(GIL),导致在多线程的情况下无法充分利用多核 CPU 进行并行计算。
  • 进程(process): multiprocessing 模块通过创建独立的 Python 解释器进程,绕过了 GIL,允许在多核 CPU 上真正实现并行计算。
  • 适用场景

  • CPU 密集型任务(如图像处理、矩阵运算):适合用 multiprocessing
  • I/O 密集型任务(如文件读写、网络请求):多线程可能更高效,但 multiprocessing 也能胜任。

  • 2. 核心概念

    进程(Process)

    每个进程有独立的内存空间和资源,multiprocessing 中的进程类似于操作系统中的进程。

    任务分配

    multiprocessing 支持将任务分配给多个进程,通常通过以下方式:

  • 进程池(Pool): 管理一组进程,自动分配任务。
  • 进程间通信(IPC): 使用队列、管道等方式在进程之间共享数据。

  • 3. 常用类与方法

    1)Process

    Process 类用于创建和管理单个进程。

    示例
    from multiprocessing import Process
    import os
    
    def worker(task_name):
        print(f"Task {task_name} is running in process {os.getpid()}")
    
    if __name__ == "__main__":
        p1 = Process(target=worker, args=("Task1",))
        p2 = Process(target=worker, args=("Task2",))
    
        p1.start()
        p2.start()
    
        p1.join()
        p2.join()
        print("All tasks are complete.")
    

    输出

    Task Task1 is running in process 12345
    Task Task2 is running in process 12346
    All tasks are complete.
    

    2)Pool

    Pool 用于创建一个进程池,允许批量执行任务,自动管理进程。

    示例
    from multiprocessing import Pool
    
    def square(x):
        return x ** 2
    
    if __name__ == "__main__":
        with Pool(4) as pool:  # 创建一个包含 4 个进程的进程池
            results = pool.map(square, range(10))
        print(results)
    

    输出

    [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
    

    3)QueuePipe

    用于在进程之间通信和共享数据。

    示例:Queue
    from multiprocessing import Process, Queue
    
    def producer(q):
        q.put("Hello from producer")
    
    def consumer(q):
        print(q.get())
    
    if __name__ == "__main__":
        q = Queue()
        p1 = Process(target=producer, args=(q,))
        p2 = Process(target=consumer, args=(q,))
    
        p1.start()
        p2.start()
    
        p1.join()
        p2.join()
    

    输出

    Hello from producer
    
    示例:Pipe
    from multiprocessing import Process, Pipe
    
    def worker(conn):
        conn.send("Hello from child process")
        conn.close()
    
    if __name__ == "__main__":
        parent_conn, child_conn = Pipe()
        p = Process(target=worker, args=(child_conn,))
        p.start()
        print(parent_conn.recv())
        p.join()
    

    输出

    Hello from child process
    

    4)LockManager

    用于管理进程间共享数据和避免数据竞争。

    示例:Manager
    from multiprocessing import Process, Manager
    
    def worker(shared_list):
        shared_list.append("Hello")
    
    if __name__ == "__main__":
        with Manager() as manager:
            shared_list = manager.list()  # 共享列表
            processes = [Process(target=worker, args=(shared_list,)) for _ in range(5)]
    
            for p in processes:
                p.start()
            for p in processes:
                p.join()
    
            print(shared_list)
    

    输出

    ['Hello', 'Hello', 'Hello', 'Hello', 'Hello']
    

    4. 优势与限制

    优势

    1. 突破 GIL: 真正实现多核并行。
    2. 简单易用: 提供高层接口,抽象复杂的底层实现。
    3. 灵活: 支持进程间通信、同步、共享数据等。

    限制

    1. 进程开销: 进程的创建和销毁比线程更昂贵。
    2. 数据复制: 在进程间传递数据需要序列化(如使用 pickle),数据量大时会有性能开销。
    3. 适用场景: 更适合 CPU 密集型任务,对于 I/O 密集型任务,threading 更高效。

    5. 实际使用建议

    1. 使用 Process
    2. 如果任务需要手动管理多个进程,可以使用 Process
    3. 使用 Pool
    4. 当任务数量较多且结构化时,使用 Pool 批量管理进程。
    5. 使用进程间通信
    6. 如果需要共享数据或进行复杂的同步,使用 QueueManager

    6. 多进程调试

    由于每个进程都有独立的运行环境,多进程调试可能较困难。

    建议:

    1. 使用 print 或日志记录。
    2. 在单进程中测试代码逻辑后,再扩展到多进程。
    3. 尽量避免在子进程中修改全局变量。

    multiprocessing 是 Python 中实现并行计算的强大工具,特别适合在多核 CPU 上执行高性能的计算密集型任务。合理使用可以显著提升程序的性能,但需要注意进程间通信和资源管理的开销。

    作者:CL.LIANG

    物联沃分享整理
    物联沃-IOTWORD物联网 » Python并行计算工具

    发表回复