Python多进程深度解析:代码实战助你轻松应对面试官挑战
本篇文章结合大量可运行代码详细且简洁地介绍了 Python 当中使用多进程的常用方法、进程间的通信方式及每种方式的适用场景、进程池的使用方法。最后,通过多进程实现了统计大文件行数的 demo 程序。
本篇文章内容的思维导图如下:
文章目录
1. 进程的基本概念
进程(Process)是操作系统资源分配的基本单位,是程序的一次执行过程。每个进程都有自己独立的内存空间和系统资源,进程之间相互隔离,一个进程的崩溃不会影响其他进程。
与线程相比,进程具有以下特点:
在Python中,由于全局解释器锁(GIL)的存在,多线程不适合 CPU 密集型任务,而多进程可以绕过 GIL 限制,真正实现并行计算。
什么是全局解释器锁?打算另写一遍文章介绍。
2. Python 进程的使用方法
Python 可以使用 multiprocessing.Process
创建进程。
2.1 进程创建方式:Process类详解
2.1.1 Process 创建进程的签名
multiprocessing.Process
函数签名:
multiprocessing.Process(
group=None,
target=None,
name=None,
args=(),
kwargs={},
*,
daemon=None
)
参数详解:
-
group
始终为None
,保留给未来扩展使用 -
target
调用对象(函数),表示子进程要执行的任务
示例:target=worker_function
-
name
进程名称(字符串),默认自动生成如"Process-1"
示例:name="DataProcessing"
-
args
传递给target函数的位置参数元组
注意:单个参数需写成(value,)
形式
示例:args=("param1", 123)
-
kwargs
传递给target函数的关键字参数字典
示例:kwargs={'param': 'value'}
-
daemon
守护进程标志(布尔值): True
:父进程终止时自动终止子进程False
(默认):子进程会独立运行
必须在start()
前设置
2.1.2 Process类的常用方法
常用方法:
start()
: 启动进程,调用run()方法run()
: 进程启动时运行的方法(可重写)join([timeout])
: 阻塞主进程直到子进程结束terminate()
: 强制终止进程is_alive()
: 检查进程是否在运行常用属性:
pid
: 进程IDname
: 进程名称daemon
: 是否为守护进程(需在start()前设置)exitcode
: 进程退出代码(运行时为None)2.2 代码示例
2.2.1 创建单个进程
from multiprocessing import Process
def worker(name, count):
print(f"{name} processing {count} items")
if __name__ == '__main__':
p = Process(
target=worker,
name="WorkerProcess",
args=("Child",),
kwargs={'count': 100},
daemon=False
)
p.start()
p.join()
执行结果:
Child processing 100 items
2.2.2 创建多个进程,使用列表存放进程
from multiprocessing import Process
import os
def worker(name):
print(f"子进程 {name} PID: {os.getpid()}")
result = sum([i*i for i in range(1000000)]) # 模拟计算密集型任务
print(f"子进程 {name} 计算结果: {result}")
def main():
print(f"主进程 PID: {os.getpid()}")
processes = []
for i in range(4): # 创建4个子进程
p = Process(target=worker, name=f"Process{i}", args=(f"Process{i}",))
processes.append(p)
p.start()
print(f"子进程 {p.name} 是否还活着?{p.is_alive()}")
for p in processes:
p.join() # 等待所有子进程完成
print("所有子进程已完成")
if __name__ == "__main__":
main()
执行结果:
主进程 PID: 42912
子进程 Process0 是否还活着?True
子进程 Process1 是否还活着?True
子进程 Process2 是否还活着?True
子进程 Process3 是否还活着?True
子进程 Process0 PID: 180
子进程 Process2 PID: 37640
子进程 Process0 计算结果: 333332833333500000
子进程 Process1 PID: 33416
子进程 Process2 计算结果: 333332833333500000
子进程 Process3 PID: 40660
子进程 Process1 计算结果: 333332833333500000
子进程 Process3 计算结果: 333332833333500000
所有子进程已完成
2.2.3 继承 Process 类定制化进程的创建过程
当进程需要维护自身状态或实现复杂控制逻辑、需要重写 run()
、terminate()
等方法时,通常需要定制化进程的创建、执行逻辑。这时,可以通过继承 Process
类实现。
下面我们继承 Process
类实现模拟下载文件的功能:
from multiprocessing import Process
import time
class DownloadProcess(Process):
def __init__(self, filename):
super().__init__()
self.filename = filename
def run(self):
print(f"开始下载 {self.filename}")
time.sleep(0.5) # 模拟下载耗时
print(f"{self.filename} 下载完成")
if __name__ == '__main__':
files = ["file1.zip", "file2.mp4", "file3.pdf"]
processes = []
for f in files:
p = DownloadProcess(f)
processes.append(p)
p.start()
for p in processes:
p.join()
print("所有文件下载完成")
输出结果:
开始下载 file1.zip
开始下载 file2.mp4
开始下载 file3.pdf
file1.zip 下载完成
file2.mp4 下载完成
file3.pdf 下载完成
所有文件下载完成
3. 进程间通信方式(含代码示例)
在操作系统的概念中,实现进程间通信方式有几种:
为了减少文章篇幅,我们只讲解 Python 当中前三种进程间通信方式。
3.1 队列
Python 当中 Queue
数据结构是多进程/线程安全的,不需要使用额外的同步原语。因为 Queue
内置了锁机制实现同步:
- 互斥锁:对
put()
和get()
操作自动加锁,防止多进程同时修改队列导致数据竞争 - 信号量:控制队列容量,当队列满时
put()
自动阻塞,空时get()
自动阻塞 - 原子操作:队列的
qsize()
、empty()
等方法也是线程/进程安全的。
因为队列是先进先出,因此使用场景是:
# queue_process.py 文件
from multiprocessing import Process, Queue
def producer(q):
for i in range(5):
q.put(f"产品{i}")
print(f"生产产品{i}")
def consumer(q):
while True:
item = q.get()
if item is None: break
print(f"消费{item}")
if __name__ == '__main__':
q = Queue()
p1 = Process(target=producer, args=(q,))
p2 = Process(target=consumer, args=(q,))
p1.start()
p2.start()
p1.join()
q.put(None) # 结束信号
p2.join()
运行结果:
生产产品0
生产产品1
生产产品2
生产产品3
生产产品4
消费产品0
消费产品1
消费产品2
消费产品3
消费产品4
3.2 管道
管道的特点:
- 双向通信:
multiprocessing.Pipe()
返回两个连接对象,支持双向数据流 - 高效传输:比队列更轻量级,适合少量数据快速传递
- 半双工/全双工:匿名管道通常是半双工,命名管道支持全双工
- 直接连接:建立两个进程间的直接通道,无需中间存储
使用场景:
# pipe_process.py
from multiprocessing import Process, Pipe
import time
import random
def sender(conn):
"""发送者进程:通过管道发送数据"""
for i in range(5):
data = f"消息 {i}"
print(f"发送者进程: 发送 {data}")
conn.send(data)
time.sleep(random.random()) # 模拟处理时间
# 接收响应
response = conn.recv()
print(f"发送者进程: 收到响应 {response}")
# 发送结束信号
conn.send("END")
conn.close()
def receiver(conn):
"""接收者进程:通过管道接收数据并发送响应"""
while True:
data = conn.recv()
if data == "END":
break
print(f"接收者进程: 收到 {data}")
# 发送响应
response = f"已处理 {data}"
conn.send(response)
time.sleep(random.random()) # 模拟处理时间
conn.close()
if __name__ == "__main__":
# 创建管道
parent_conn, child_conn = Pipe()
# 创建进程
sender_process = Process(target=sender, args=(parent_conn,))
receiver_process = Process(target=receiver, args=(child_conn,))
# 启动进程
sender_process.start()
receiver_process.start()
# 等待进程结束
sender_process.join()
receiver_process.join()
print("所有进程已完成")
执行结果:
发送者进程: 发送 消息 0
接收者进程: 收到 消息 0
发送者进程: 收到响应 已处理 消息 0
发送者进程: 发送 消息 1
接收者进程: 收到 消息 1
发送者进程: 收到响应 已处理 消息 1
发送者进程: 发送 消息 2
接收者进程: 收到 消息 2
发送者进程: 收到响应 已处理 消息 2
发送者进程: 发送 消息 3
接收者进程: 收到 消息 3
发送者进程: 收到响应 已处理 消息 3
发送者进程: 发送 消息 4
接收者进程: 收到 消息 4
发送者进程: 收到响应 已处理 消息 4
所有进程已完成
3.3 共享内存
特点:
- 最高效IPC:直接访问同一内存区域,避免数据拷贝
- 零拷贝:相比管道/队列的4次拷贝,共享内存只需2次
- 需手动同步:需要配合锁/信号量等机制保证数据一致性
- 复杂数据结构支持:可通过
Value/Array
或shared_memory
模块实现
使用场景:
- 大规模数据交换(如图像/视频处理)
- 高性能计算场景
- 实时数据处理系统
- 需要频繁读写的大型数据结构共享
from multiprocessing import Process, Value, Array, Lock
import time
import random
def writer(shared_value, shared_array, lock):
"""写入进程:修改共享内存中的数据"""
for i in range(5):
# 使用锁保护共享内存的访问
with lock:
# 修改共享值
shared_value.value += 1
print(f"写入进程: 修改共享值为 {shared_value.value}")
# 修改共享数组
for j in range(len(shared_array)):
shared_array[j] = shared_value.value * (j + 1)
print(f"写入进程: 修改共享数组为 {list(shared_array)}")
time.sleep(random.random()) # 模拟处理时间
def reader(shared_value, shared_array, lock):
"""读取进程:读取共享内存中的数据"""
for i in range(5):
# 使用锁保护共享内存的访问
with lock:
print(f"读取进程: 当前共享值为 {shared_value.value}")
print(f"读取进程: 当前共享数组为 {list(shared_array)}")
time.sleep(random.random()) # 模拟处理时间
if __name__ == "__main__":
# 创建共享内存
# Value('i', 0) 创建一个整型共享值,初始值为0
# Array('i', 5) 创建一个长度为5的整型共享数组
shared_value = Value('i', 0)
shared_array = Array('i', 5)
# 创建锁用于同步
lock = Lock()
# 创建进程
writer_process = Process(target=writer, args=(shared_value, shared_array, lock))
reader_process = Process(target=reader, args=(shared_value, shared_array, lock))
# 启动进程
writer_process.start()
reader_process.start()
# 等待进程结束
writer_process.join()
reader_process.join()
print("所有进程已完成")
执行结果:
写入进程: 修改共享值为 1
写入进程: 修改共享数组为 [1, 2, 3, 4, 5]
读取进程: 当前共享值为 1
读取进程: 当前共享数组为 [1, 2, 3, 4, 5]
写入进程: 修改共享值为 2
写入进程: 修改共享数组为 [2, 4, 6, 8, 10]
读取进程: 当前共享值为 2
读取进程: 当前共享数组为 [2, 4, 6, 8, 10]
写入进程: 修改共享值为 3
写入进程: 修改共享数组为 [3, 6, 9, 12, 15]
读取进程: 当前共享值为 3
读取进程: 当前共享数组为 [3, 6, 9, 12, 15]
写入进程: 修改共享值为 4
写入进程: 修改共享数组为 [4, 8, 12, 16, 20]
读取进程: 当前共享值为 4
读取进程: 当前共享数组为 [4, 8, 12, 16, 20]
写入进程: 修改共享值为 5
写入进程: 修改共享数组为 [5, 10, 15, 20, 25]
读取进程: 当前共享值为 5
读取进程: 当前共享数组为 [5, 10, 15, 20, 25]
所有进程已完成
multiprocessing
当中的 Value
和 Array
用来共享简单的数据,两者支持的数据类型为整型和 double 类型。如果想要共享其它数据类型,可以使用 multiprocessing.sharedctypes
。
下面的代码展示了如何在多进程环境中安全地共享和修改各种类型的数据,包括基本类型、字符串和自定义结构体。
from multiprocessing import Process, Lock
from multiprocessing.sharedctypes import Value, Array
from ctypes import Structure, c_double
class Point(Structure):
_fields_ = [('x', c_double), ('y', c_double)]
def modify(n, x, s, A):
n.value **= 2
x.value **= 2
s.value = s.value.upper()
for a in A:
a.x **= 2
a.y **= 2
if __name__ == '__main__':
lock = Lock()
n = Value('i', 7)
x = Value(c_double, 1.0/3.0, lock=False)
s = Array('c', b'hello world', lock=lock)
A = Array(Point, [(1.875,-6.25), (-5.75,2.0), (2.375,9.5)], lock=lock)
p = Process(target=modify, args=(n, x, s, A))
p.start()
p.join()
print(n.value)
print(x.value)
print(s.value)
print([(a.x, a.y) for a in A])
执行结果:
4. 进程池
4.1 进程池基本概念
进程池用来管理已创建的进程,通过提前创建并复用一组固定数量的子进程来执行任务,从而提高程序的并发量。其核心目标是减少频繁创建/销毁进程的开销,并控制并发规模以优化系统资源利用率。
进程池的工作机制:
- 任务调度机制:采用生产者-消费者模式,主进程提交任务到任务队列,空闲子进程自动获取任务执行,结果返回至结果队列。
- 进程复用:子进程执行完任务后不会销毁,而是等待新任务,避免重复创建的开销
使用进程池的优点:
1. 性能提升
2. 资源管理
3. 开发便捷性
4.2 创建和使用方法
通过 multiprocessing.Pool
创建进程池。使用 with
语句管理线程池,确保自动调用 close()
和 join()
函数。
from multiprocessing import Pool
# 创建含4个进程的池(默认数量为CPU核心数)
with Pool(processes=4) as pool:
# 执行任务...
有 4 种常见的任务提交方式:
代码示例:
from multiprocessing import Pool
import os
import time
def square(x):
print(f"Process {os.getpid()} 计算 {x} 的平方")
time.sleep(1) # 模拟计算
return x * x
if __name__ == "__main__":
# 创建进程池,设置最大进程数为4
with Pool(processes=4) as pool:
# 1. map方法 - 同步阻塞方式
print("\n1. 使用map方法(同步阻塞):")
start_time = time.time()
results_map = pool.map(square, range(10))
end_time = time.time()
print(f"map结果: {results_map}")
print(f"map执行时间: {end_time - start_time:.2f}秒")
# 2. map_async方法 - 异步非阻塞方式
print("\n2. 使用map_async方法(异步非阻塞):")
start_time = time.time()
result_map_async = pool.map_async(square, range(10))
# 等待所有任务完成
result_map_async.wait()
end_time = time.time()
print(f"map_async结果: {result_map_async.get()}")
print(f"map_async执行时间: {end_time - start_time:.2f}秒")
# 3. apply方法 - 同步阻塞方式,一次提交一个任务
print("\n3. 使用apply方法(同步阻塞,单任务):")
start_time = time.time()
results_apply = [pool.apply(square, args=(x,)) for x in range(10)]
end_time = time.time()
print(f"apply结果: {results_apply}")
print(f"apply执行时间: {end_time - start_time:.2f}秒")
# 4. apply_async方法 - 异步非阻塞方式,一次提交一个任务
print("\n4. 使用apply_async方法(异步非阻塞,单任务):")
start_time = time.time()
results_apply_async = [pool.apply_async(square, args=(x,)) for x in range(10)]
# 获取所有结果
results_apply_async = [r.get() for r in results_apply_async]
end_time = time.time()
print(f"apply_async结果: {results_apply_async}")
print(f"apply_async执行时间: {end_time - start_time:.2f}秒")
print("\n四种方法的区别说明:")
print("1. map: 同步阻塞,等待所有任务完成才返回结果")
print("2. map_async: 异步非阻塞,立即返回AsyncResult对象,可以后续获取结果")
print("3. apply: 同步阻塞,一次只能提交一个任务")
print("4. apply_async: 异步非阻塞,一次提交一个任务,立即返回AsyncResult对象")
执行结果:
1. 使用map方法(同步阻塞):
Process 43784 计算 0 的平方
Process 45232 计算 1 的平方
Process 29036 计算 2 的平方
map结果: [0, 1, 4]
map执行时间: 1.16秒
2. 使用map_async方法(异步非阻塞):
Process 17284 计算 0 的平方
Process 43784 计算 1 的平方
Process 45232 计算 2 的平方
map_async结果: [0, 1, 4]
map_async执行时间: 1.00秒
3. 使用apply方法(同步阻塞,单任务):
Process 29036 计算 0 的平方
Process 17284 计算 1 的平方
Process 43784 计算 2 的平方
apply结果: [0, 1, 4]
apply执行时间: 3.01秒
4. 使用apply_async方法(异步非阻塞,单任务):
Process 45232 计算 0 的平方
Process 29036 计算 1 的平方
Process 17284 计算 2 的平方
apply_async结果: [0, 1, 4]
apply_async执行时间: 1.01秒
四种方法的区别说明:
1. map: 同步阻塞,等待所有任务完成才返回结果
2. map_async: 异步非阻塞,立即返回AsyncResult对象,可以后续获取结果
3. apply: 同步阻塞,一次只能提交一个任务
4. apply_async: 异步非阻塞,一次提交一个任务,立即返回AsyncResult对象
可以看到异步非阻塞的任务提交方式的执行速度更快。
5. 项目实战:多进程实现大文件的行数统计
未完成。
各位道友,记得一键三连啊。
作者:核桃AI编程