python-20-理解多进程多线程,进程池,线程池,进程间通信最详细整理!
python-20-多进程多线程详解
一.说明
在python中的基础系列我们终于来到了多进程和多线程的知识点了,在学习这个知识点前,我想问问都是用来进行并发处理的,为啥要区分多进程 多线程?或者说什么是多进程 什么是多线程?
多进程:
1.多进程是通过创建多个独立的进程来并行执行任务
举个例子:你电脑是8核心的,那么可以创建超过8个进程,但是超过8个进程会导致上下文切换开销增加,从而导致降低性能!
说句人话,当进程数小于cpu核心数,每个进程都能分配到自己的核心进行运行,不必进行进程切换,从而没有切换上下文的开销!!
所以,多进程并不是越多越好!!
多线程:
-
线程 是程序中执行流的最小单元,一个进程可以包含多个线程;
-
多线程是通过在同一进程中创建多个线程来并发执行任务,如果任务是 I/O 密集型的(例如网络请求、文件操作),由于 GIL 释放给了 I/O 操作,其他线程可以在 GIL 被释放时执行,因此多线程仍然能够在 I/O 密集型任务中提高并发性能。
-
多个线程共享同一进程的内存空间和资源,线程之间的创建和切换比进程更加轻量级;
-
对于 CPU 密集型任务,Python 的 全局解释器锁(GIL) 会限制线程的并行执行,这就使得多线程在 CPU 密集型任务上无法提供预期的性能提升;
-
Python 的 全局解释器锁(GIL):
在 Python 中,全局解释器锁(GIL) 会影响多线程的并发执行。GIL 只允许一个线程在任何时刻执行 Python 字节码,这意味着对于 CPU 密集型任务,Python 的多线程不能利用多核 CPU 的计算能力,只能在单个核心上执行。
-
线程池的大小不可随便确定
CPU 核心数:对于 CPU 密集型任务,线程池的大小通常不会超过 CPU 核心数,因为超出核心数的线程会增加上下文切换的开销,反而降低性能。
I/O 密集型任务:对于 I/O 密集型任务,线程池的大小可以更大,因为线程在等待 I/O 操作完成时不会占用 CPU 资源,操作系统会将 CPU 分配给其他线程。
内存和系统资源:每个线程都需要一定的内存和系统资源,过多的线程会导致内存消耗过大,甚至会导致系统资源耗尽。
-
线程可以跨多个 CPU 核心运行,操作系统会管理线程在核心之间的调度。
二.多进程
1. os.fork() 函数 (不推荐,不能跨平台,只能Linux、Unix )
它在当前进程的上下文中创建一个子进程。子进程是当前进程(称为父进程)的副本,但拥有自己的进程ID(PID)
fork()
的基本原理
fork()
返回新创建的子进程的进程 ID。fork()
返回 0。fork()
返回 -1,并抛出OSError。示例
import os
import time
def main():
pid = os.fork() # 创建子进程
if pid < 0:
# fork() 失败
print("Fork failed")
elif pid == 0:
# 子进程执行的代码
print(f"Child process: My PID is {os.getpid()}")
time.sleep(2) # 模拟子进程工作
print("Child process: Work done!")
else:
# 父进程执行的代码
print(f"Parent process: My PID is {os.getpid()} and my child's PID is {pid}")
os.wait() # 等待子进程结束
print("Parent process: Child has terminated.")
if __name__ == "__main__":
main()
2.multiprocessing.Process()函数
- multiprocessing模块提供Process类实现新建进程
import multiprocessing
import time
def worker(num):
print(f'Worker {num} starting')
time.sleep(2)
print(f'Worker {num} finished')
if __name__ == '__main__':
processes = []
for i in range(5):
p = multiprocessing.Process(target=worker, args=(i,))
processes.append(p)
p.start()
for p in processes:
p.join()
'''
Worker 0 starting
Worker 1 starting
Worker 2 starting
Worker 3 starting
Worker 4 starting
Worker 0 finished
Worker 1 finished
Worker 2 finished
Worker 3 finished
Worker 4 finished
'''
-
pool = multiprocessing.Pool(processes=5) # 创建进程池实现多进程
pool.apply()
和pool.apply_async()
: apply()
:类似于顺序调用函数,它会阻塞,直到任务执行完成。apply_async()
:异步执行任务,不会阻塞主程序。map()
:将函数func
应用到iterable
中的每一个元素,返回结果列表,并且是同步执行的。map_async()
:map()
的异步版本,返回一个AsyncResult
对象,允许你在执行期间继续做其他工作。close()
:# 关闭进程池,表示不能在往进程池中添加进程join()
:# 等待进程池中的所有进程执行完毕,必须在close()之后调用-
多进程获取进程返回结果
multiprocessing.Process 本身并不能直接返回值,我们需要Queue (队列)来实现收集结果
import multiprocessing import time def worker(num, queue): print(f'Worker {num} starting') time.sleep(2) print(f'Worker {num} finished') queue.put(num * 2) # 将计算结果放入队列中 if __name__ == '__main__': processes = [] queue = multiprocessing.Queue() # 创建一个队列用于进程间通信 # 创建并启动 5 个进程 for i in range(5): p = multiprocessing.Process(target=worker, args=(i, queue)) processes.append(p) p.start() # 等待所有进程完成 for p in processes: p.join() # 获取并打印每个进程的返回值 while not queue.empty(): # 确保队列中所有结果都已获取 result = queue.get() print(f'Result from worker: {result}')
-
进程池获取返回结果
Pool.map()
:是并行计算并按顺序返回结果,适合任务较简单且结果需要按顺序返回的场景;Pool.apply_async()
:是异步调用任务,适合任务较复杂或需要更灵活控制的场景。需要使用get()
来获取任务结果。Pool.map_async()
:是map()
的异步版本,返回一个AsyncResult
对象。#利用map import multiprocessing import time def worker(num): print(f'Worker {num} starting') time.sleep(2) print(f'Worker {num} finished') return num * 2 # 返回任务的结果 if __name__ == '__main__': # 创建一个进程池,最多同时运行 5 个进程 with multiprocessing.Pool(processes=5) as pool: # 使用 map 并行地执行任务 results = pool.map(worker, range(5)) # 返回每个进程的返回值列表 # 打印所有结果 print("Results:", results) ######利用get方法 import multiprocessing import time def worker(num): print(f'Worker {num} starting') time.sleep(2) print(f'Worker {num} finished') return num * 2 if __name__ == '__main__': # 创建进程池 with multiprocessing.Pool(processes=5) as pool: # 使用 apply_async 异步提交任务 results = [pool.apply_async(worker, (i,)) for i in range(5)] # 获取每个任务的返回值 output = [result.get() for result in results] # 使用 get() 等待任务完成并获取结果 print("Results:", output) ''' Worker 0 starting Worker 1 starting Worker 2 starting Worker 3 starting Worker 4 starting Worker 0 finished Worker 2 finished Worker 1 finished Worker 3 finished Worker 4 finished Results: [0, 2, 4, 6, 8] ''' ################错误map_async用法 import multiprocessing import time def worker(num): print(f'Worker {num} starting') time.sleep(2) print(f'Worker {num} finished') return num * 2 if __name__ == '__main__': # 创建进程池 with multiprocessing.Pool(processes=5) as pool: # 使用 map_async 异步提交任务 results = [pool.map_async(worker, (i,)) for i in range(5)] # 获取每个任务的返回值 output = [result.get() for result in results] # 使用 get() 等待任务完成并获取结果 print("Results:", output) ''' Worker 0 starting Worker 2 starting Worker 1 starting Worker 3 starting Worker 4 starting Worker 2 finished Worker 0 finished Worker 1 finished Worker 3 finished Worker 4 finished Results: [[0], [2], [4], [6], [8]] ''' #########正确map_async用法 import multiprocessing import time def worker(num): print(f'Worker {num} starting') time.sleep(2) print(f'Worker {num} finished') return num * 2 if __name__ == '__main__': # 创建进程池 with multiprocessing.Pool(processes=5) as pool: # 使用 map_async 异步提交任务 result = pool.map_async(worker, range(5)) # 获取每个任务的返回值 output = result.get() # 使用 get() 等待任务完成并获取结果 print("Results:", output) ''' Worker 0 starting Worker 1 starting Worker 2 starting Worker 3 starting Worker 4 starting Worker 0 finished Worker 2 finished Worker 1 finished Worker 3 finished Worker 4 finished Results: [0, 2, 4, 6, 8] '''
注意:
上面的例子是同样的写法,但结果却不一样:
apply_async获取的结果是[0, 2, 4, 6, 8]
map_async获取的结果是[[0], [2], [4], [6], [8]]
为什么2个代码基本结构一样,但是执行的结构却不一样?
因为
apply_async
是异步产生5个独立的任务,每个任务都在独立的进程中运行,并且apply_async()
返回的是AsyncResult
对象;map_async()
提供了一个 包含单个元素的元组(i,)
,这意味着它会按顺序执行 5 次map_async()
,每次提交 单个任务,这与apply_async()
的行为不同,实际上是将任务映射到整个可迭代对象的所有元素,它会在内部处理多个任务,而返回的是一个包含所有结果的AsyncResult
对象,因此输出和任务提交的方式不一样;通过这个例子能更好的理解
apply_async
和map_async()
的区别 -
进程间通讯方式
进程间通信(IPC, Inter-Process Communication) 是指多个进程之间交换数据和信息的机制。python中进程间通信方式,主要包括以下几种;
Queue(队列):允许多个进程之间以生产者-消费者模式进行数据交换。
Pipe(管道):通过管道的两端进行数据交换。
Manager(共享内存):提供进程间共享对象(如共享字典、列表等)。
Value 和 Array(共享内存):允许多个进程共享单一的变量或数组。
Lock(锁):确保多个进程对共享资源的同步访问
-
Queue(队列)
import multiprocessing import time def worker(num, queue): print(f'Worker {num} starting') time.sleep(2) print(f'Worker {num} finished') queue.put(num * 2) # 将计算结果放入队列中 if __name__ == '__main__': processes = [] queue = multiprocessing.Queue() # 创建一个队列用于进程间通信 # 创建并启动 5 个进程 for i in range(5): p = multiprocessing.Process(target=worker, args=(i, queue)) processes.append(p) p.start() # 等待所有进程完成 for p in processes: p.join() # 获取并打印每个进程的返回值 while not queue.empty(): # 确保队列中所有结果都已获取 result = queue.get() print(f'Result from worker: {result}')
-
Pipe(管道)
Pipe
是multiprocessing
模块提供的另一种进程间通信方式,它通过管道的两端进行通信。管道有两个端口,一个用于发送数据(send
),一个用于接收数据(recv
)。import multiprocessing import time def sender(conn): for i in range(5): print(f"Sender sending {i}") conn.send(i) # 发送数据 time.sleep(1) def receiver(conn): while True: data = conn.recv() # 接收数据 if data == "DONE": break print(f"Receiver received {data}") if __name__ == "__main__": # 创建管道,返回两个连接对象 parent_conn, child_conn = multiprocessing.Pipe() # 创建进程 p1 = multiprocessing.Process(target=sender, args=(parent_conn,)) p2 = multiprocessing.Process(target=receiver, args=(child_conn,)) p1.start() p2.start() p1.join() parent_conn.send("DONE") # 结束信号 p2.join() ''' Sender sending 0 Receiver received 0 Sender sending 1 Receiver received 1 Sender sending 2 Receiver received 2 Sender sending 3 Receiver received 3 Sender sending 4 Receiver received 4 '''
-
Manager(共享内存)
Manager
是multiprocessing
模块中提供的一个特殊对象,它允许不同进程共享数据。Manager
可以创建共享对象,比如共享的列表、字典等,这些对象会在进程间同步。import multiprocessing import time def worker(shared_dict): for i in range(5): shared_dict[i] = f"Task {i}" print(f"Worker added: {shared_dict[i]}") time.sleep(1) if __name__ == "__main__": with multiprocessing.Manager() as manager: shared_dict = manager.dict() # 创建一个共享字典 p = multiprocessing.Process(target=worker, args=(shared_dict,)) p.start() p.join() print(f"Shared dict: {shared_dict}") ''' Worker added: Task 0 Worker added: Task 1 Worker added: Task 2 Worker added: Task 3 Worker added: Task 4 Shared dict: {0: 'Task 0', 1: 'Task 1', 2: 'Task 2', 3: 'Task 3', 4: 'Task 4'} '''
-
Value 和 Array(共享内存)
Value
和Array
是multiprocessing
模块中的两种共享内存对象,可以用于在多个进程之间共享数据。Value
用于存储单一数据类型的值,而Array
用于存储数组(类似于列表)。import multiprocessing import time def worker(shared_value): for i in range(5): shared_value.value += 1 # 修改共享变量 print(f"Worker incremented: {shared_value.value}") time.sleep(1) if __name__ == "__main__": # 创建一个共享变量 shared_value = multiprocessing.Value('i', 0) # 'i' 表示整型 p = multiprocessing.Process(target=worker, args=(shared_value,)) p.start() p.join() print(f"Final shared value: {shared_value.value}") ''' Worker incremented: 1 Worker incremented: 2 Worker incremented: 3 Worker incremented: 4 Worker incremented: 5 Final shared value: 5 '''
-
Lock(锁)
在多个进程共享数据时,往往需要确保数据的一致性和线程安全。
Lock
是 Python 提供的一种同步原语,它用于确保同一时刻只有一个进程能够访问某个共享资源。import multiprocessing import time def worker(lock, shared_value): for _ in range(5): with lock:# 使用锁来保证线程安全 shared_value.value += 1 print(f"Worker incremented: {shared_value.value}") time.sleep(1) if __name__ == "__main__": shared_value = multiprocessing.Value('i', 0) # 创建一个共享变量 lock = multiprocessing.Lock() # 创建一个锁 processes = [multiprocessing.Process(target=worker, args=(lock, shared_value)) for _ in range(3)] for p in processes: p.start() for p in processes: p.join() print(f"Final shared value: {shared_value.value}")
-
pool.map()
和 pool.map_async()
:
pool.close()
和 pool.join()
:
import multiprocessing
import time
def worker(num):
print(f'Worker {num} starting')
time.sleep(2)
print(f'Worker {num} finished')
if __name__ == '__main__':
# 创建一个进程池,最多同时运行 5 个进程
with multiprocessing.Pool(processes=5) as pool:
# 使用 map 方法将任务分配给进程池,map 会阻塞,直到所有任务完成
pool.map(worker, range(5))
###########利用.get()方法来阻塞和等待结果
import multiprocessing
import time
def worker(num):
print(f'Worker {num} starting')
time.sleep(2)
print(f'Worker {num} finished')
return num * 2
if __name__ == '__main__':
# 使用进程池
with multiprocessing.Pool(processes=5) as pool:
# 使用 apply_async 异步执行任务
async_result = pool.apply_async(worker, (1,))
print(f'Result of apply_async: {async_result.get()}') # 阻塞等待结果
# 使用 map_async 异步执行任务
async_results = pool.map_async(worker, range(5))
print(f'Result of map_async: {async_results.get()}') # 阻塞等待所有结果
####################利用 pool.close()和pool.join()
import multiprocessing
import time
def worker(num):
print(f'Worker {num} starting')
time.sleep(2)
print(f'Worker {num} finished')
return num
if __name__ == '__main__':
# 创建进程池,最多同时运行 5 个进程
resList = []
with multiprocessing.Pool(processes=5) as pool:
# 使用 apply_async 异步提交任务
results = []
for i in range(5):
result = pool.apply_async(worker, (i,))
results.append(result) # 存储每个任务的 AsyncResult 对象
# 关闭进程池,不再接受新任务
pool.close()
# 等待所有任务完成并获取结果
for result in results:
resList.append(result.get()) # 阻塞,等待每个任务的完成
# 等待所有子进程结束
pool.join()
print(resList)
'''
Worker 0 starting
Worker 1 starting
Worker 2 starting
Worker 3 starting
Worker 4 starting
Worker 2 finished
Worker 0 finished
Worker 1 finished
Worker 3 finished
Worker 4 finished
[0, 1, 2, 3, 4]
'''
三.多线程
多线程是通过在同一进程中创建多个线程来并发执行任务。线程共享进程的内存空间,因此它们之间可以更容易地共享数据,但也容易引发线程安全问题。由于 GIL 的存在,Python 的多线程在 CPU 密集型任务上通常不能提高性能,但在 I/O 密集型任务上表现良好。
- 基本用法
Python 中可以使用 threading
模块来实现多线程。以下是一个简单的示例:
import threading
import time
def worker(num):
print(f'Worker {num} starting')
time.sleep(2)
print(f'Worker {num} finished')
threads = []
for i in range(5):
t = threading.Thread(target=worker, args=(i,))
threads.append(t)
t.start()
for t in threads:
t.join()
-
线程池
线程池是一种设计模式,用于管理一定数量的线程来处理多个任务。线程池的目的是避免频繁地创建和销毁线程带来的开销,同时通过复用线程来提高程序的效率。
在 Python 中,
concurrent.futures.ThreadPoolExecutor
提供了线程池的功能,它会管理一定数量的线程,并按需调度这些线程来执行任务。import concurrent.futures import time def worker(num): print(f"Worker {num} starting") time.sleep(2) print(f"Worker {num} finished") return num * 2 # 返回结果 # 使用线程池执行任务 with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: results = [executor.submit(worker, i) for i in range(5)] # 获取每个任务的结果 output = [result.result() for result in results] print("Results:", output) ''' Worker 0 starting Worker 1 starting Worker 2 starting Worker 2 finished Worker 0 finished Worker 3 starting Worker 4 starting Worker 1 finished Worker 4 finished Worker 3 finished Results: [0, 2, 4, 6, 8] '''
-
线程锁
import threading import time # 共享资源(全局变量) shared_value = 0 # 创建一个锁对象 lock = threading.Lock() # 线程工作函数 def increment(): global shared_value # 获取锁,防止其他线程访问共享资源 lock.acquire() # 获取锁 try: print(f"Thread {threading.current_thread().name} is incrementing shared_value") current_value = shared_value time.sleep(0.5) # 模拟一些工作 shared_value = current_value + 1 print(f"Thread {threading.current_thread().name} updated shared_value to {shared_value}") finally: # 无论是否发生异常,都要释放锁 lock.release() # 释放锁 if __name__ == "__main__": # 创建多个线程 threads = [threading.Thread(target=increment, name=f"Thread-{i}") for i in range(5)] # 启动线程 for t in threads: t.start() # 等待所有线程完成 for t in threads: t.join() print(f"Final shared_value: {shared_value}")
五.总结
Python 多线程、多进程初学编程的人可能觉得很复杂 其实静下来,理解他 就知道原来就这么回事,没啥!重点是要理解什么是进程什么是线程!!理解了根据真实项目 照着写就行!切记学编程不是死记硬背!如果你采用死记硬背的方式来学编程,那么你完蛋了!!!你永远背不完!!chatgpt也有错误。。何况人。。人与chatgpt的区别是理解,是思想。。
创作整理不易,请大家多多关注 多多点赞,有写的不对的地方欢迎大家补充,我来整理,再次感谢!
作者:SEEONTIME