Python并行计算工具
目录
Python可以利用多种工具进行并行计算。可以使用一个核,也可以使用多个核。本文总结主要并行计算工具,并且详细介绍易用、常用的几种。
在深入阅读之前,可以参考另一篇文章,了解python多进程和多线程的区别:python高级教程之多线程
工具/框架 | 适用场景 | 特点 | 复杂度 | 多线程/多进程 |
---|---|---|---|---|
Joblib | 简单 Python 数据并行 | 高层封装,支持结果缓存 | 简单 | 多进程 |
Subprocess | 调用外部程序或命令行工具 | 底层控制,可运行非 Python 程序 | 中等 | 独立子进程 |
Multiprocessing | 高度自定义并行任务 | 灵活性高,支持共享内存、复杂通信 | 中等 | 多进程 |
ThreadPoolExecutor/ProcessPoolExecutor | 简单任务并行 | 标准库内置,线程池/进程池实现 | 简单 | 多线程/多进程 |
Dask | 科学计算、大规模数据处理 | 自动调度、与 NumPy 等兼容 | 简单-中等 | 多线程/多进程 |
Ray | 大规模并行任务 | 分布式任务支持,Actor 模型 | 中等 | 多线程/多进程 |
Celery | 异步任务队列 | 专注任务队列,需依赖消息中间件 | 中等 | 多进程 |
Numexpr | 数值计算优化 | 面向 NumPy 的数组优化计算 | 简单 | 多线程 |
PySpark | 大数据分布式处理 | 支持 TB 级别大数据 | 高 | 多线程/多进程 |
下面详细记录下常用的三种:Joblib
, Subprocess
和Multiprocessing
Joblib
Joblib 并行计算
Joblib 是一个专注于科学计算和机器学习优化的 Python 库,主要用于以下场景:
- 任务并行化: 利用多进程技术(默认)并行运行任务。
- 结果缓存: 支持中间结果的磁盘缓存,避免重复计算,适合参数调整和实验优化。
- 数组操作优化: 针对 NumPy 和 SciPy 的高效数组操作。
以下是对 Joblib 并行计算的详细介绍:
1. 核心功能
(1) 并行计算
Joblib 提供了高层抽象接口 Parallel
和 delayed
,可以轻松并行化循环任务。例如,针对大数据集或 CPU 密集型任务的操作。
from joblib import Parallel, delayed
# 定义一个耗时函数
def process_data(i):
return i ** 2
# 使用 joblib 并行计算
results = Parallel(n_jobs=4)(delayed(process_data)(i) for i in range(10))
print(results) # 输出 [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
参数解释:
n_jobs
: 指定并行任务的线程或进程数。
-1
: 使用所有可用的 CPU 核心。delayed
: 将函数包装成并行化任务。Parallel
: 管理任务的分发和结果的收集。(2) 缓存计算
Joblib 提供了 Memory
类,用于缓存计算结果。特别适用于参数不变的重复实验。
from joblib import Memory
memory = Memory("./cachedir", verbose=0)
@memory.cache
def expensive_function(x):
print(f"Computing {x}...")
return x ** 2
# 第一次计算时会输出 "Computing..."
result = expensive_function(10)
# 第二次调用时直接从缓存加载
result = expensive_function(10)
优点:
Note:
(3)缓存优化
joblib
在读取文件时,并不会直接把文件加载在缓存中,而是保留在磁盘中,可以按需求调取:
import numpy as np
from joblib import dump, load
# 创建一个大数组并保存为内存映射文件
large_array = np.random.rand(10000, 10000) # 100M 数据
dump(large_array, 'large_array.pkl') # 保存为 .pkl 文件
# 以内存映射模式加载大数组
shared_array = load('large_array.pkl', mmap_mode='r')
# 按需访问数组的一部分
# 例如:只访问第100行到第200行,第500列到第600列
subset = shared_array[100:200, 500:600]
print(subset)
2. 工作原理
(1) 并行机制
Joblib 使用 multiprocessing
或 loky
后端实现多进程计算,同时支持线程和子进程:
backend="threading"
指定,适合 I/O 密集型任务。Parallel
中指定 loky
(默认)、multiprocessing
或 threading
。from joblib import Parallel, delayed
# 使用多线程
results = Parallel(n_jobs=4, backend="threading")(
delayed(process_data)(i) for i in range(10)
)
(2) 动态任务分配
Joblib 会根据任务的耗时和复杂度动态分配计算资源。例如,任务较少时可能不使用所有核心。
(3) 内存管理
Joblib 使用内存映射(Memory-Mapped Files)优化大数组的共享。对于大数据集,不需要多次复制,提高性能。
3. 适用场景
(1) 循环并行化
例如,大量文件处理或数据分析任务:
import numpy as np
data = np.random.rand(1000, 1000)
# 并行计算每行的和
row_sums = Parallel(n_jobs=-1)(delayed(np.sum)(row) for row in data)
(2) 参数搜索
在机器学习中,超参数搜索(如网格搜索)是常见任务。Joblib 能显著加速这类工作。
from sklearn.model_selection import GridSearchCV
from sklearn.ensemble import RandomForestClassifier
# 使用 joblib 加速网格搜索
param_grid = {'n_estimators': [10, 50, 100], 'max_depth': [3, 5, None]}
clf = GridSearchCV(RandomForestClassifier(), param_grid, n_jobs=-1)
clf.fit(X_train, y_train)
(3) 重复计算缓存
对于重复实验的计算结果缓存:
@memory.cache
def compute_features(data):
return some_expensive_function(data)
4. 优缺点
优点:
- 易用性: 高层抽象接口,无需深入了解多进程。
- 跨平台: 支持 Windows、Linux 和 macOS。
- 结果缓存: 可磁盘缓存结果,提升重复任务效率。
- 兼容性: 与 NumPy、SciPy、Pandas 等科学计算库无缝兼容。
缺点:
- 复杂任务限制: Joblib 适合独立任务,并不擅长需要复杂通信的任务。
- 内存占用: 多进程可能导致内存占用较高。
- 错误处理困难: 并行任务中出错可能导致难以调试。
5. 与其他并行工具对比
特性 | Joblib | Multiprocessing | Threading | Subprocess |
---|---|---|---|---|
易用性 | 高 | 中 | 高 | 低 |
适用场景 | 科学计算,独立任务 | 高度定制并行 | I/O 密集任务 | 外部程序调用 |
并行机制 | 多进程/多线程 | 多进程 | 多线程 | 独立进程 |
结果缓存 | 支持 | 不支持 | 不支持 | 不支持 |
性能调优 | 自动优化分配资源 | 完全手动控制 | 完全手动控制 | 完全手动控制 |
Joblib 是一种高效且灵活的并行计算工具,特别适合科学计算任务。如果需要更多控制,可以结合其他并行工具使用。
Subprocess
subprocess
是 Python 中用于启动子进程并与其进行交互的模块。它提供了对操作系统中外部程序执行的强大控制功能,支持启动、输入输出通信以及错误处理等。(官网documentation)
以下是对 subprocess
的详细介绍,包括使用场景、方法及示例。
核心功能
-
启动子进程:
- 启动外部命令或程序(如 shell 命令、脚本、二进制文件)。
- 支持同步和异步执行。
-
管理输入输出:
- 捕获标准输入(stdin)、标准输出(stdout)和标准错误(stderr)。
- 实现与子进程的实时通信。
-
错误处理:
- 处理子进程的错误输出。
- 获取子进程的返回码。
-
并发控制:
- 支持同时启动多个子进程。
关键函数与类
1. subprocess.run
subprocess.CompletedProcess
对象。import subprocess
result = subprocess.run(['ls', '-l'], capture_output=True, text=True)
print("Return code:", result.returncode) # 子进程返回码
print("Output:\n", result.stdout) # 子进程标准输出
print("Error:\n", result.stderr) # 子进程错误输出
2. subprocess.Popen
subprocess.Popen
对象。import subprocess
process = subprocess.Popen(
['ping', '-c', '4', 'google.com'],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
stdout, stderr = process.communicate() # 与子进程通信并获取输出
print("Output:\n", stdout)
print("Error:\n", stderr)
3. subprocess.check_output
import subprocess
output = subprocess.check_output(['echo', 'Hello World'], text=True)
print("Output:", output)
4. subprocess.call
import subprocess
ret_code = subprocess.call(['ls', '-l'])
print("Return code:", ret_code)
常用参数
args
:要运行的命令(列表或字符串)。shell
:是否通过 shell 执行命令(默认为 False
)。cwd
:设置子进程的当前工作目录。env
:设置子进程的环境变量。capture_output
:是否捕获标准输出和错误输出。text
:是否将输入/输出作为文本处理(默认为字节)。stdout
、stderr
:重定向标准输出或错误输出。stdin
:传递给子进程的输入。应用场景
1. 执行系统命令
subprocess.run(['ls', '-l'])
2. 捕获命令输出
result = subprocess.run(['cat', 'file.txt'], capture_output=True, text=True)
print(result.stdout)
3. 异步处理
process = subprocess.Popen(['ping', '-c', '4', 'google.com'], stdout=subprocess.PIPE)
for line in process.stdout:
print(line.decode(), end='')
4. 传递输入给子进程
process = subprocess.Popen(['grep', 'hello'], stdin=subprocess.PIPE, text=True)
process.communicate(input="hello world\nhi there")
子进程与主进程的交互
等待子进程完成
process.wait()
等待子进程完成。process = subprocess.Popen(['sleep', '5'])
process.wait()
print("Subprocess finished")
检查子进程状态
process.poll()
:检查子进程是否完成(非阻塞)。process = subprocess.Popen(['sleep', '5'])
while process.poll() is None:
print("Subprocess is still running...")
print("Subprocess finished")
错误处理
捕获异常
try-except
捕获运行异常。try:
subprocess.run(['false'], check=True)
except subprocess.CalledProcessError as e:
print("Error occurred:", e)
获取返回码
0
表示成功,非 0
表示失败。result = subprocess.run(['false'])
print("Return code:", result.returncode)
高级用法
1. 使用 shell=True
执行复杂命令
shell=True
。subprocess.run("ls | grep py", shell=True)
注意:不建议在未处理用户输入的情况下使用 shell=True
,可能存在安全风险(例如命令注入)。
2. 并行运行多个子进程
subprocess.Popen
启动多个子进程。processes = [
subprocess.Popen(['ping', '-c', '2', 'google.com']),
subprocess.Popen(['ping', '-c', '2', 'yahoo.com'])
]
for process in processes:
process.wait()
性能优化
避免阻塞:
使用 Popen
进行异步操作,而不是 run
等同步方法。
减少复制:
重定向 stdin/stdout
或使用 os.devnull
忽略输出。
管理资源:
通过 process.terminate()
或 process.kill()
确保清理未完成的子进程。
subprocess
模块是 Python 提供的强大接口,用于与外部程序交互和控制子进程。理解其同步与异步方法、参数以及常见使用场景,可以有效提升任务的执行效率与灵活性。
Multiprocessing
multiprocessing
是 Python 提供的一个用于并行处理的模块,它允许用户轻松地创建多个进程,充分利用多核 CPU 的计算能力。以下是对 multiprocessing
的详细介绍,包括其核心概念、使用场景、常用类与函数、以及注意事项。
1. 为什么使用 multiprocessing
?
线程与进程的区别
threading
模块受制于 全局解释器锁(GIL),导致在多线程的情况下无法充分利用多核 CPU 进行并行计算。multiprocessing
模块通过创建独立的 Python 解释器进程,绕过了 GIL,允许在多核 CPU 上真正实现并行计算。适用场景
multiprocessing
。multiprocessing
也能胜任。2. 核心概念
进程(Process)
每个进程有独立的内存空间和资源,multiprocessing
中的进程类似于操作系统中的进程。
任务分配
multiprocessing
支持将任务分配给多个进程,通常通过以下方式:
3. 常用类与方法
1)Process
类
Process
类用于创建和管理单个进程。
示例
from multiprocessing import Process
import os
def worker(task_name):
print(f"Task {task_name} is running in process {os.getpid()}")
if __name__ == "__main__":
p1 = Process(target=worker, args=("Task1",))
p2 = Process(target=worker, args=("Task2",))
p1.start()
p2.start()
p1.join()
p2.join()
print("All tasks are complete.")
输出
Task Task1 is running in process 12345
Task Task2 is running in process 12346
All tasks are complete.
2)Pool
类
Pool
用于创建一个进程池,允许批量执行任务,自动管理进程。
示例
from multiprocessing import Pool
def square(x):
return x ** 2
if __name__ == "__main__":
with Pool(4) as pool: # 创建一个包含 4 个进程的进程池
results = pool.map(square, range(10))
print(results)
输出
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
3)Queue
和 Pipe
用于在进程之间通信和共享数据。
示例:Queue
from multiprocessing import Process, Queue
def producer(q):
q.put("Hello from producer")
def consumer(q):
print(q.get())
if __name__ == "__main__":
q = Queue()
p1 = Process(target=producer, args=(q,))
p2 = Process(target=consumer, args=(q,))
p1.start()
p2.start()
p1.join()
p2.join()
输出
Hello from producer
示例:Pipe
from multiprocessing import Process, Pipe
def worker(conn):
conn.send("Hello from child process")
conn.close()
if __name__ == "__main__":
parent_conn, child_conn = Pipe()
p = Process(target=worker, args=(child_conn,))
p.start()
print(parent_conn.recv())
p.join()
输出
Hello from child process
4)Lock
和 Manager
用于管理进程间共享数据和避免数据竞争。
示例:Manager
from multiprocessing import Process, Manager
def worker(shared_list):
shared_list.append("Hello")
if __name__ == "__main__":
with Manager() as manager:
shared_list = manager.list() # 共享列表
processes = [Process(target=worker, args=(shared_list,)) for _ in range(5)]
for p in processes:
p.start()
for p in processes:
p.join()
print(shared_list)
输出
['Hello', 'Hello', 'Hello', 'Hello', 'Hello']
4. 优势与限制
优势
- 突破 GIL: 真正实现多核并行。
- 简单易用: 提供高层接口,抽象复杂的底层实现。
- 灵活: 支持进程间通信、同步、共享数据等。
限制
- 进程开销: 进程的创建和销毁比线程更昂贵。
- 数据复制: 在进程间传递数据需要序列化(如使用
pickle
),数据量大时会有性能开销。 - 适用场景: 更适合 CPU 密集型任务,对于 I/O 密集型任务,
threading
更高效。
5. 实际使用建议
- 使用
Process
类: - 如果任务需要手动管理多个进程,可以使用
Process
。 - 使用
Pool
类: - 当任务数量较多且结构化时,使用
Pool
批量管理进程。 - 使用进程间通信:
- 如果需要共享数据或进行复杂的同步,使用
Queue
或Manager
。
6. 多进程调试
由于每个进程都有独立的运行环境,多进程调试可能较困难。
建议:
- 使用
print
或日志记录。 - 在单进程中测试代码逻辑后,再扩展到多进程。
- 尽量避免在子进程中修改全局变量。
multiprocessing
是 Python 中实现并行计算的强大工具,特别适合在多核 CPU 上执行高性能的计算密集型任务。合理使用可以显著提升程序的性能,但需要注意进程间通信和资源管理的开销。
作者:CL.LIANG