【Python】线程池深度解析
一、线程池定义
线程池(Thread Pool)是一种高效管理多线程任务的机制,通过复用固定数量的线程来处理多个任务,避免频繁创建和销毁线程的开销。
线程池在系统启动时即创建大量空闲的线程,程序只要将一个函数提交给线程池,线程池就会启动一个空闲的线程来执行它。当该函数执行结束后,该线程并不会死亡,而是再次返回到线程池中变成空闲状态,等待执行下一个函数。
此外,使用线程池可以有效地控制系统中并发线程的数量。
二、线程池核心方法
1、使用线程池执行线程任务步骤
(1)调用 ThreadPoolExecutor 类的构造器创建一个线程池。
(2)定义一个普通函数作为线程任务。
(3)调用 ThreadPoolExecutor 对象的 submit() 方法提交线程任务。
(4)当不想提交任何任务时,调用 ThreadPoolExecutor 对象的 shutdown() 方法来关闭线程
2、示例
(1)创建线程池
from concurrent.futures import ThreadPoolExecutor
# 创建包含4个线程的线程池
# 使用 with ThreadPoolExecutor() as executor 时,会在代码块结束时自动调用 shutdown(wait=True),无需手动操作
with ThreadPoolExecutor(max_workers=4) as executor:
# 提交任务到线程池...
pass
注:若不使用with语句,手动管理线程池,应如下示例:
executor.shutdown(wait=True)
用于关闭线程池并释放资源。参数 wait=True(默认)表示阻塞等待所有任务完成后再关闭;
若设置为 wait=False,则直接关闭线程池,但未完成的任务会继续执行(需手动管理后续等待);
from concurrent.futures import ThreadPoolExecutor
# 手动管理线程池(不使用 with 语句)
executor = ThreadPoolExecutor(max_workers=4)
future = executor.submit(lambda: print("Task running"))
# 阻塞等待所有任务完成,再关闭线程池
executor.shutdown(wait=True)
print("All tasks done")
(2)提交任务
①==submit(func, *args, **‌==kwargs)
提交单个任务,返回一个Future对象(代表异步操作的结果)
future = executor.submit(my_function, arg1, arg2)
②map(func, iterable)
批量提交任务,按顺序返回结果
results = executor.map(my_function, [1, 2, 3, 4])
(3)获取结果
①future.result()
阻塞等待任务完成并获取结果
result = future.result()
②as_completed(futures)
按任务完成顺序获取结果
from concurrent.futures import as_completed
for future in as_completed(futures):
print(future.result())
三、线程池的典型用法
示例1:基本任务提交
import time
from concurrent.futures import ThreadPoolExecutor
def task(n):
time.sleep(1)
return f"Task {n} completed"
with ThreadPoolExecutor(max_workers=3) as executor:
# 提交任务
futures = [executor.submit(task, i) for i in range(5)]
# 获取结果
for future in futures:
print(future.result())
# 输出:
# Task 0 completed
# Task 1 completed
# Task 2 completed
# Task 3 completed
# Task 4 completed
示例2:使用map简化批量任务
with ThreadPoolExecutor() as executor:
results = executor.map(task, range(5))
for result in results:
print(result)
示例3:异步回调
def callback(future):
print(f"Callback received: {future.result()}")
future = executor.submit(task, 10)
future.add_done_callback(callback) # 任务完成后触发回调
四、线程池的应用场景
1、I/O密集型任务
如网络请求、文件读写、数据库查询等,线程在等待I/O时释放GIL,适合多线程。
2、并行处理多个独立任务
例如批量下载文件、处理图片、发送邮件等。
3、控制并发数量
避免一次性创建过多线程导致资源耗尽。
五、注意事项
1、GIL限制
Python的全局解释器锁(GIL)会限制CPU密集型任务的并行性能,此时建议使用多进程(ProcessPoolExecutor)。
2、线程安全
如果多个线程需要修改共享数据,需使用锁(threading.Lock)确保线程安全。
3、资源释放
使用with语句或手动调用executor.shutdown()关闭线程池,避免资源泄漏。
4、阻塞等待的两种方式
①通过 shutdown(wait=True) 阻塞主线程
调用 shutdown(wait=True) 会阻塞当前线程,直到所有任务完成。适用于需要确保所有任务结束后再继续主线程的场景。
executor = ThreadPoolExecutor(max_workers=2)
futures = [executor.submit(time.sleep, 1) for _ in range(5)]
# 主线程在此处阻塞,直到所有任务完成
executor.shutdown(wait=True)
print("All tasks completed") # 此代码在所有任务完成后执行
②通过 future.result() 或 as_completed() 按需阻塞
若希望主线程在等待任务结果时灵活控制(例如逐个处理完成的任务),可使用 future.result() 或 as_completed() 的阻塞特性:
with ThreadPoolExecutor() as executor:
futures = [executor.submit(time.sleep, 1) for _ in range(3)]
# 主线程在此处逐个等待任务完成
for future in as_completed(futures):
print(future.result()) # 每次循环阻塞直到一个任务完成
总结:
①何时需要 shutdown()
-如果使用 with 语句,无需手动调用 shutdown()。
-若手动创建线程池(无 with 语句),必须显式调用 shutdown() 或确保线程池被垃圾回收(不推荐依赖垃圾回收)。
②阻塞行为控制
shutdown(wait=True):全局阻塞,等待所有任务。
future.result() 或 as_completed():按需阻塞,逐个处理任务结果。
③资源释放
-无论是否设置 wait=True,调用 shutdown() 后线程池会拒绝新任务提交,并释放线程资源。确保在关闭后不再操作 executor 或 futures。
5、异常处理
在任务函数中捕获异常,或通过future.exception()获取异常信息。、
六、性能优化
1、合理设置线程数
I/O密集型任务:线程数可设为 CPU核心数 * 5(经验值)。
CPU密集型任务:避免使用线程池,改用多进程。
2、避免阻塞主线程
使用as_completed或concurrent.futures.wait实现异步结果处理。
七、完整示例
1、自动关闭场景
import concurrent.futures
import requests
def download(url):
response = requests.get(url)
return f"Downloaded {url} (Status: {response.status_code})"
urls = [
"https://www.example.com",
"https://www.python.org",
"https://www.github.com"
]
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
futures = {executor.submit(download, url): url for url in urls}
for future in concurrent.futures.as_completed(futures):
url = futures[future]
try:
print(future.result())
except Exception as e:
print(f"Error downloading {url}: {e}")
2、手动关闭场景
import concurrent.futures
import time
def task(n):
time.sleep(1)
return f"Task {n} done"
# 手动管理线程池
executor = concurrent.futures.ThreadPoolExecutor(max_workers=2)
futures = [executor.submit(task, i) for i in range(5)]
# 主线程继续执行其他操作...
print("Main thread is working...")
# 阻塞等待所有任务完成后再关闭线程池
executor.shutdown(wait=True)
print("All tasks finished")
# 此时再访问 futures 的结果(已全部完成)
for future in futures:
print(future.result())
作者:黏苞米