Python threading 并发编程详解

一、引言

在现代编程中,提升程序效率和响应速度至关重要,而并发编程正是实现高效程序的一项重要技术。并发编程可以帮助我们在一个时间段内完成多个任务,尤其在高 I/O 密集型任务的场景中。Python 语言通过 threading 模块为并发编程提供了简洁的实现方法。本文将深入解析 Python 中的 threading 模块,包括其核心概念、使用方法、常见问题及实际场景中的应用。


二、并发与并行的区别

在深入探讨 threading 之前,先了解两个基本概念:并发并行

  • 并发(Concurrency):指的是在同一时间段内处理多个任务,可以是任务之间交替进行,也可以是分阶段处理,特别适合 I/O 密集型操作。
  • 并行(Parallelism):指的是在同一时刻同时执行多个任务,要求硬件具备多核或多处理器的支持,主要适合 CPU 密集型任务。
  • Python 的 threading 模块允许在单个进程内通过多线程实现并发,但由于 GIL(全局解释器锁) 的存在,同一时间只有一个线程在执行 Python 字节码,线程更适合 I/O 密集型任务。


    三、Python 中的 threading 模块简介

    threading 模块提供了创建和管理线程的基础功能。线程的概念类似于轻量级进程,多个线程共享同一内存空间,创建和切换线程的代价比进程更小。

  • 线程创建threading.Thread(target=...) 用于创建一个线程。
  • 线程控制:线程启动(start())、暂停、停止等控制。
  • 线程同步:为保证多线程访问共享资源的安全性,可以使用锁(Lock)、信号量(Semaphore)等。
  • 四、基础示例:创建线程

    以下是一个简单的例子,通过 threading.Thread 创建和启动一个线程来执行一个打印任务。

    import threading
    
    # 定义一个任务函数
    def print_numbers():
        for i in range(5):
            print(f"Number: {i}")
    
    # 创建一个线程
    thread = threading.Thread(target=print_numbers)
    
    # 启动线程
    thread.start()
    
    # 等待线程结束
    thread.join()
    print("Main thread finished.")
    

    在上面的代码中:

  • 我们定义了一个简单的任务 print_numbers
  • 使用 threading.Thread 创建了一个线程,并指定目标函数为 print_numbers
  • thread.start() 启动线程,thread.join() 等待线程执行完毕再继续主线程。

  • 五、线程同步与共享资源

    在多线程编程中,如果多个线程共享同一个资源(如变量或文件),可能会出现数据竞争问题。Python 提供了锁(Lock)、条件变量(Condition)等工具来解决资源竞争。

    1. 使用 Lock 保护共享资源

    锁是确保某段代码在同一时刻只能被一个线程执行的机制。下面是一个简单的示例,展示如何使用 Lock 保护共享变量:

    import threading
    
    # 定义共享资源和锁
    balance = 0
    balance_lock = threading.Lock()
    
    def update_balance(n):
        global balance
        for _ in range(1000):
            # 获取锁
            with balance_lock:
                balance += n
                balance -= n
    
    # 创建多个线程并启动
    threads = []
    for i in range(5):
        t = threading.Thread(target=update_balance, args=(i,))
        threads.append(t)
        t.start()
    
    # 等待所有线程完成
    for t in threads:
        t.join()
    
    print(f"Final balance: {balance}")
    

    在这个示例中:

  • balance_lock 用于保护共享变量 balance,避免多个线程同时修改。
  • with balance_lock 确保在同一时刻,只有一个线程可以进入临界区(更新 balance 的代码块)。

  • 六、常见的线程同步工具

    除了 Lockthreading 模块还提供了其他同步工具,如 信号量(Semaphore)条件变量(Condition)事件(Event)

    1. 信号量(Semaphore)

    信号量用于控制线程的数量,常用于限制同时访问资源的线程数量。例如,有限数量的数据库连接可以通过信号量管理。

    import threading
    import time
    
    # 定义信号量,允许的最大线程数量为3
    semaphore = threading.Semaphore(3)
    
    def limited_task(task_num):
        with semaphore:
            print(f"Task {task_num} is running...")
            time.sleep(2)
            print(f"Task {task_num} is finished.")
    
    # 创建并启动多个线程
    for i in range(5):
        threading.Thread(target=limited_task, args=(i,)).start()
    

    在此例中,信号量 semaphore 控制着最多3个线程可以同时执行任务。

    2. 条件变量(Condition)

    条件变量用于在特定条件满足后通知线程继续执行。它常用于实现“生产者-消费者”模型。

    import threading
    import time
    
    # 共享数据和条件变量
    data = []
    condition = threading.Condition()
    
    # 生产者
    def producer():
        for i in range(5):
            with condition:
                data.append(i)
                print(f"Produced: {i}")
                condition.notify()  # 通知消费者
    
    # 消费者
    def consumer():
        while True:
            with condition:
                condition.wait()  # 等待生产者通知
                item = data.pop(0)
                print(f"Consumed: {item}")
                if item == 4:
                    break
    
    # 创建并启动线程
    producer_thread = threading.Thread(target=producer)
    consumer_thread = threading.Thread(target=consumer)
    producer_thread.start()
    consumer_thread.start()
    producer_thread.join()
    consumer_thread.join()
    

    在该例中,生产者向 data 中添加数据,并通知消费者线程,消费者线程接收通知后处理数据。通过 condition.wait()condition.notify() 来协调生产和消费的时机。


    七、实际场景应用:实现一个多线程下载器

    假设我们要开发一个文件下载器,能够同时下载多个文件,每个文件下载任务占用一个线程。我们可以利用 threading 模块实现一个多线程下载器。

    import threading
    import requests
    
    # 下载文件函数
    def download_file(url, file_name):
        response = requests.get(url)
        with open(file_name, 'wb') as file:
            file.write(response.content)
        print(f"{file_name} downloaded.")
    
    # 文件列表
    file_urls = [
        ("https://example.com/file1.jpg", "file1.jpg"),
        ("https://example.com/file2.jpg", "file2.jpg"),
    ]
    
    # 创建并启动线程
    threads = []
    for url, file_name in file_urls:
        thread = threading.Thread(target=download_file, args=(url, file_name))
        threads.append(thread)
        thread.start()
    
    # 等待所有下载任务完成
    for thread in threads:
        thread.join()
    
    print("All files downloaded.")
    

    在该示例中,每个下载任务都由一个线程完成,多个文件可以同时下载,大大加快了下载速度。


    八、线程池(ThreadPoolExecutor)

    对于大量需要并发的任务,线程池(ThreadPoolExecutor)是一种更加高效的方式。线程池预先创建了一组线程,任务可以提交到线程池,线程池会自动调度线程来执行任务。

    from concurrent.futures import ThreadPoolExecutor
    import requests
    
    # 下载文件函数
    def download_file(url):
        response = requests.get(url)
        file_name = url.split('/')[-1]
        with open(file_name, 'wb') as file:
            file.write(response.content)
        print(f"{file_name} downloaded.")
    
    # 文件 URL 列表
    file_urls = ["https://example.com/file1.jpg", "https://example.com/file2.jpg"]
    
    # 使用线程池下载文件
    with ThreadPoolExecutor(max_workers=4) as executor:
        executor.map(download_file, file_urls)
    

    在此示例中,我们使用 ThreadPoolExecutor 管理线程,executor.map 方法会将 file_urls 中的每个 URL 提交给 download_file 函数,并由线程池处理。


    九、总结

    在本文中,我们详细介绍了 Python threading 模块的基本概念和使用方法,包括线程的创建、线程同步工具的应用和线程池的使用。在实际编程中,选择合适的线程管理方式至关重要,可以大大提高程序的效率和响应速度。

    作者:繁依Fanyi

    物联沃分享整理
    物联沃-IOTWORD物联网 » Python threading 并发编程详解

    发表回复