【Python】线程池深度解析

一、线程池定义

线程池(Thread Pool)是一种高效管理多线程任务的机制,通过复用固定数量的线程来处理多个任务,避免频繁创建和销毁线程的开销。

线程池在系统启动时即创建大量空闲的线程,程序只要将一个函数提交给线程池,线程池就会启动一个空闲的线程来执行它。当该函数执行结束后,该线程并不会死亡,而是再次返回到线程池中变成空闲状态,等待执行下一个函数。

此外,使用线程池可以有效地控制系统中并发线程的数量。

二、线程池核心方法

1、使用线程池执行线程任务步骤

(1)调用 ThreadPoolExecutor 类的构造器创建一个线程池。

(2)定义一个普通函数作为线程任务。
(3)调用 ThreadPoolExecutor 对象的 submit() 方法提交线程任务。
(4)当不想提交任何任务时,调用 ThreadPoolExecutor 对象的 shutdown() 方法来关闭线程

2、示例

(1)创建线程池

from concurrent.futures import ThreadPoolExecutor

# 创建包含4个线程的线程池
# 使用 with ThreadPoolExecutor() as executor 时,会在代码块结束时自动调用 shutdown(wait=True),无需手动操作

with ThreadPoolExecutor(max_workers=4) as executor:
    # 提交任务到线程池...
    pass

        注:若不使用with语句,手动管理线程池,应如下示例:

        executor.shutdown(wait=True)‌
        用于关闭线程池并释放资源。参数 wait=True(默认)表示‌阻塞等待所有任务完成后再关闭‌; 

        若设置为 wait=False,则直接关闭线程池,但未完成的任务会继续执行(需手动管理后续等待);

from concurrent.futures import ThreadPoolExecutor

# 手动管理线程池(不使用 with 语句)
executor = ThreadPoolExecutor(max_workers=4)
future = executor.submit(lambda: print("Task running"))

# 阻塞等待所有任务完成,再关闭线程池
executor.shutdown(wait=True)  
print("All tasks done")

(2)提交任务

        ①==‌submit(func, *args, **‌==kwargs)
        提交单个任务,返回一个Future对象(代表异步操作的结果)

future = executor.submit(my_function, arg1, arg2)

        ②map(func, iterable)‌
        批量提交任务,按顺序返回结果

results = executor.map(my_function, [1, 2, 3, 4])

(3)获取结果

        ①future.result()‌
        阻塞等待任务完成并获取结果

result = future.result()

        ②as_completed(futures)‌
        按任务完成顺序获取结果

from concurrent.futures import as_completed
for future in as_completed(futures):
    print(future.result())

三、线程池的典型用法

示例1:基本任务提交

import time
from concurrent.futures import ThreadPoolExecutor

def task(n):
    time.sleep(1)
    return f"Task {n} completed"

with ThreadPoolExecutor(max_workers=3) as executor:
    # 提交任务
    futures = [executor.submit(task, i) for i in range(5)]
    
    # 获取结果
    for future in futures:
        print(future.result())

# 输出:
# Task 0 completed
# Task 1 completed
# Task 2 completed
# Task 3 completed
# Task 4 completed

‌示例2:使用map简化批量任务

with ThreadPoolExecutor() as executor:
    results = executor.map(task, range(5))
    for result in results:
        print(result)

示例3:异步回调

def callback(future):
    print(f"Callback received: {future.result()}")

future = executor.submit(task, 10)
future.add_done_callback(callback)  # 任务完成后触发回调

四、线程池的应用场景

1、‌I/O密集型任务‌

        如网络请求、文件读写、数据库查询等,线程在等待I/O时释放GIL,适合多线程。

‌2、并行处理多个独立任务‌

        例如批量下载文件、处理图片、发送邮件等。

‌3、控制并发数量‌

        避免一次性创建过多线程导致资源耗尽。

五、注意事项

‌1、GIL限制‌

Python的全局解释器锁(GIL)会限制CPU密集型任务的并行性能,此时建议使用多进程(ProcessPoolExecutor)。

‌2、线程安全‌

如果多个线程需要修改共享数据,需使用锁(threading.Lock)确保线程安全。

‌3、资源释放‌

使用with语句或手动调用executor.shutdown()关闭线程池,避免资源泄漏。

4、阻塞等待的两种方式

①‌通过 shutdown(wait=True) 阻塞主线程‌
调用 shutdown(wait=True) 会‌阻塞当前线程‌,直到所有任务完成。适用于需要确保所有任务结束后再继续主线程的场景。

executor = ThreadPoolExecutor(max_workers=2)
futures = [executor.submit(time.sleep, 1) for _ in range(5)]

# 主线程在此处阻塞,直到所有任务完成
executor.shutdown(wait=True)  
print("All tasks completed")  # 此代码在所有任务完成后执行

‌②通过 future.result() 或 as_completed() 按需阻塞‌
若希望主线程在等待任务结果时灵活控制(例如逐个处理完成的任务),可使用 future.result() 或 as_completed() 的阻塞特性:

with ThreadPoolExecutor() as executor:
    futures = [executor.submit(time.sleep, 1) for _ in range(3)]
    
    # 主线程在此处逐个等待任务完成
    for future in as_completed(futures):
        print(future.result())  # 每次循环阻塞直到一个任务完成

总结:

①何时需要 shutdown()‌

-如果使用 with 语句,无需手动调用 shutdown()。
-若手动创建线程池(无 with 语句),必须显式调用 shutdown() 或确保线程池被垃圾回收(不推荐依赖垃圾回收)。

②阻塞行为控制‌

shutdown(wait=True):全局阻塞,等待所有任务。
future.result() 或 as_completed():按需阻塞,逐个处理任务结果。

‌③资源释放‌

-无论是否设置 wait=True,调用 shutdown() 后线程池会拒绝新任务提交,并释放线程资源。确保在关闭后不再操作 executor 或 futures。

‌5、异常处理‌

在任务函数中捕获异常,或通过future.exception()获取异常信息。、

六、性能优化

‌1、合理设置线程数‌

        I/O密集型任务:线程数可设为 CPU核心数 * 5(经验值)。
        CPU密集型任务:避免使用线程池,改用多进程。

‌2、避免阻塞主线程‌

        使用as_completed或concurrent.futures.wait实现异步结果处理。

七、完整示例

1、自动关闭场景

import concurrent.futures
import requests

def download(url):
    response = requests.get(url)
    return f"Downloaded {url} (Status: {response.status_code})"

urls = [
    "https://www.example.com",
    "https://www.python.org",
    "https://www.github.com"
]

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    futures = {executor.submit(download, url): url for url in urls}
    for future in concurrent.futures.as_completed(futures):
        url = futures[future]
        try:
            print(future.result())
        except Exception as e:
            print(f"Error downloading {url}: {e}")

2、手动关闭场景

import concurrent.futures
import time

def task(n):
    time.sleep(1)
    return f"Task {n} done"

# 手动管理线程池
executor = concurrent.futures.ThreadPoolExecutor(max_workers=2)
futures = [executor.submit(task, i) for i in range(5)]

# 主线程继续执行其他操作...
print("Main thread is working...")

# 阻塞等待所有任务完成后再关闭线程池
executor.shutdown(wait=True)  
print("All tasks finished")

# 此时再访问 futures 的结果(已全部完成)
for future in futures:
    print(future.result())

作者:黏苞米

物联沃分享整理
物联沃-IOTWORD物联网 » 【Python】线程池深度解析

发表回复