Python 多线程编程详解:用 Threading 优雅提升程序并发性能

Python 多线程编程详解:用 Threading 优雅提升程序并发性能

文章目录

  • Python 多线程编程详解:用 Threading 优雅提升程序并发性能
  • 一 多线程
  • 1 基本概念
  • 2 注意事项
  • 二 线程的常用功能
  • 1 线程模块
  • 2 常规操作
  • 3 创建线程
  • 三 join 功能
  • 1 不加 join 功能
  • 2 加 join 的具体示例
  • 四 储存线程结果 Queue
  • 1 导入线程队列的标准模块
  • 2 定义被线程调用的函数
  • 3 定义多线程任务函数
  • 五 GIL 全局解释器锁
  • 划重点
  • 六 线程锁 Lock
  • 1 不使用 Lock 的情况
  • 2 使用 Lock 的情况
  • 七 完整代码示例
  • 八 源码地址
  • 本文深入探讨了 Python 中多线程(Threading)的原理与应用,帮助读者更好地理解多线程在程序优化中的重要性。文章从多线程的基本概念入手,详细讲解了线程的创建与管理、join 方法的使用、线程之间的数据共享与同步、以及如何使用 Queue 实现线程间的数据传递。通过生动的代码示例,演示了如何在 Python 中有效利用多线程处理 I/O 密集型任务,以及如何通过线程锁(Lock)避免数据竞争问题。还解析了 GIL(全局解释器锁)对多线程性能的影响,为开发者提供了多线程编程的实战指导和性能优化的最佳实践。

    一 多线程

    Python 中的多线程(Threading)是一种允许程序在同一时间内执行多个操作的方式。它的核心思想是在程序中创建多个线程(Thread),每个线程可以独立地执行任务。

    1 基本概念
  • 线程(Thread):线程是操作系统能够进行运算调度的最小单位,它包含在进程中,是进程的实际运作单位。一个进程可以包含多个线程,这些线程共享进程的内存空间和资源。
  • 多线程(Multithreading):在一个程序中创建多个线程,使它们并行运行,从而提升程序的效率。多线程特别适用于 I/O 密集型操作(如网络请求、文件读写)或多个独立任务需要同时执行的场景。
  • 2 注意事项
  • 全局解释器锁(GIL):Python 的线程虽然可以实现并发,但由于 GIL 的存在,在 CPU 密集型任务中,多线程的性能提升可能不明显。对于 I/O 密集型任务,多线程仍然非常有用。
  • 数据竞争:多线程访问共享数据时,如果没有合适的同步机制(如锁),可能会出现数据竞争问题。
  • 多线程的核心目标是提高程序的并发性,但在 Python 中使用时需要特别注意线程安全问题和 GIL 的限制。

    二 线程的常用功能

    1 线程模块
    import threading
    
    2 常规操作
    # 获取已激活的线程数
    print("已激活的线程数 ", threading.active_count())
    # 查看所有线程信息
    print("所有线程信息 ", threading.enumerate())
    # 查看现在正在运行的线程
    print("现在正在运行的线程 ", threading.current_thread())
    
    3 创建线程
    def thread_job():
        print(' 1 This is a thread of %s' % threading.current_thread())
    
    def my_thread():
        # target 为目标函数,想要调用的任务方法。name 为当前线程名称。
        thread = threading.Thread(target=thread_job, name='我新建的线程')  # 定义线程
        thread.start()  # 让线程开始工作
        # time.sleep(10)
    

    三 join 功能

    1 不加 join 功能
    def thread_job02():
        print(' 2 This is a thread of %s' % threading.current_thread())
        print("thread_job02 T1 start\n")
        for i in range(10):
            time.sleep(0.1)  # 任务间隔0.1s
        print("thread_job02 T1 finish\n")
    
    def job02_thread():
        # join 功能 
        added_thread = threading.Thread(target=thread_job02, name='T1')
        added_thread.start()
        # 加 join 功能,放开注释就是加 join 的功能,和预想的结果一致。
        # added_thread.join()
        print("job02_thread all done\n")
        # join 控制多个线程的执行顺序
    

    结果对比

    # 预想的结果
    thread_job02 T1 start
    thread_job02 T1 finish
    job02_thread all done
    # 实际的结果
    thread_job02 T1 start
    job02_thread all done
    thread_job02 T1 finish
    
    2 加 join 的具体示例
    def T1_job():
        print("T1_job T1 start\n")
        for i in range(10):
            time.sleep(0.1)
        print("T1_job T1 finish\n")
    
    
    def T2_job():
        print("T2_job T2 start\n")
        print("T2_job T2 finish\n")
    
    
    def T1T2_job_thread():
        thread_1 = threading.Thread(target=T1_job, name='T1')
        thread_2 = threading.Thread(target=T2_job, name='T2')
        thread_1.start()  # 开启T1
        thread_2.start()  # 开启T2
        # 不加 join 打印信息的前后顺序,取决去线程处理数据的速度
        # 在写 join 的时候可以把处理数据量少的写在前面,主要目的是减少主线程或其他依赖线程的等待时间
        thread_2.join()
        thread_1.join()
        print("T1T2_job_thread all done\n")
    

    不加 join 打印信息的前后顺序,取决去线程处理数据的速度;在写 join 的时候可以把处理数据量少的写在前面,主要目的是减少主线程或其他依赖线程的等待时间

    四 储存线程结果 Queue

    1 导入线程队列的标准模块
    from queue import Queue
    
    2 定义被线程调用的函数
    # 参数 一个列表l和一个队列q
    def job(l, q):
        for i in range(len(l)):
            # 给列表元素作平方计算
            l[i] = l[i] ** 2
        q.put(l)  
    

    注:多线程调用的函数不能用return返回值

    3 定义多线程任务函数
    def multithreading():
        q = Queue()  # q中存放返回值,代替return的返回值
        threads = []
        data = [[1, 2, 3], [3, 4, 5], [4, 4, 4], [5, 5, 5]]
    
        for i in range(4):
            # 定义四个线程, Thread首字母要大写,被调用的job函数没有括号,只是一个索引,参数在后面
            t = threading.Thread(target=job, args=(data[i], q))
            # 开始线程
            t.start()
            # 把每个线程append到线程列表中
            threads.append(t)
    
        for thread in threads:
            # 阻塞当前线程,主线程暂停执行(或调用 join() 的线程)
            thread.join()
        results = []
        for _ in range(4):
            results.append(q.get())  # q.get()按顺序从q中拿出一个值
        print(results)
    

    五 GIL 全局解释器锁

    GIL(Global Interpreter Lock,全局解释器锁)是 Python 的一种机制,它限制了同一时刻只有一个线程可以执行 Python 字节码。这意味着在多线程的情况下,即使系统有多个 CPU 核心,也只能有一个线程在同一时间执行 Python 代码。

    def job02(l, q):
        res = sum(l)
        q.put(res)
    
    
    def multithreading02(l):
        q = Queue()
        threads = []
        for i in range(4):
            t = threading.Thread(target=job02, args=(copy.copy(l), q), name='T%i' % i)
            t.start()
            threads.append(t)
        [t.join() for t in threads]
        total = 0
        for _ in range(4):
            total += q.get()
        print(total)
    
    
    def normal02(l):
        total = sum(l)
        print(total)
    

    运行时间对比

    if __name__ == '__main__':
        l = list(range(1000000))
        s_t = time.time()
        normal02(l * 4)
        print('normal: ', time.time() - s_t)
        s_t = time.time()
        multithreading02(l)
        print('multithreading: ', time.time() - s_t)
    

    GIL 不一定有效率,multithreading02normal02 运行了一样多次的运算,multithreading02` 并没有快多少,甚至有时候会慢。运行结果如下

    1999998000000
    normal02:  0.03085780143737793
    1999998000000
    multithreading02:  0.033220767974853516
    
    划重点

    线程安全: GIL 的主要目的是在 CPython(Python 的主流解释器实现)中简化内存管理,避免多线程访问共享数据时的竞争问题。GIL 确保了同一时刻只有一个线程能够执行 Python 代码,从而避免了多线程操作内存时的冲突。

    六 线程锁 Lock

    1 不使用 Lock 的情况
    A = 0
    
    def job1_G():
        global A
        for i in range(10):
            A += 1
            print('job1', A)
    
    def job2_G():
        global A
        for i in range(10):
            A += 10
            print('job2', A)
    
    def job_1_2_G():
        t1 = threading.Thread(target=job1_G)
        t2 = threading.Thread(target=job2_G)
        t1.start()
        t2.start()
        t1.join()
        t2.join()
    

    运行结果无序资源竞争激烈,如下

    job1 1
    job1 job2 12
    job2 22
    job2 32
    job2 42
    job2 52
    2
    job1 53
    job1 54
    job1 55
    job1 56
    job1 57
    job1job2 68 
    58
    job1job2 79
    job2  69
    89
    job2 99
    job2 109
    job1 110
    
    2 使用 Lock 的情况

    划重点

    Lock 在同一时刻只用一个线程访问共享内存,防止 “竞态条件” 问题,它是把互斥锁。

  • 竞态条件: 当多个线程同时操作共享资源时,如果没有适当的同步机制,线程间的执行顺序可能会相互交错,导致数据竞争或错误。
  • 互斥锁(Lock): 互斥锁是一种同步原语,确保在同一时刻只有一个线程可以访问某个资源,其他线程在锁被占用时无法访问受保护的资源。
  • A = 0
    lock = threading.Lock()
    
    def job1_L():
        global A, lock
        lock.acquire()
        try:
            for i in range(10):
                A += 1
                print('job1', A)
        finally:
            lock.release()
    
    def job2_L():
        global A, lock
        # 使用 with 语句可以简化 acquire 和 release 的过程,使代码更简洁清晰。推荐
        with lock:
            for i in range(10):
                A += 10
                print('job2', A)
    
    def job_1_2_L():
        t1 = threading.Thread(target=job1_L)
        t2 = threading.Thread(target=job2_L)
        t1.start()
        t2.start()
        t1.join()
        t2.join()
    

    运行结果有序资源使用合理,如下

    job1 1
    job1 2
    job1 3
    job1 4
    job1 5
    job1 6
    job1 7
    job1 8
    job1 9
    job1 10
    job2 20
    job2 30
    job2 40
    job2 50
    job2 60
    job2 70
    job2 80
    job2 90
    job2 100
    job2 110
    

    七 完整代码示例

    :建议在运行 main.py 对应的代码功能时,逐行使用注释进行操作。

    # This is a sample Python script.
    
    # Press ⌃R to execute it or replace it with your code.
    # Press Double ⇧ to search everywhere for classes, files, tool windows, actions, and settings.
    
    import threading
    import time
    from queue import Queue
    import copy
    
    
    def print_hi(name):
        # Use a breakpoint in the code line below to debug your script.
        print(f'Hi, {name}')  # Press ⌘F8 to toggle the breakpoint.
    
        # 添加线程
        # 获取已激活的线程数
        print("已激活的线程数 ", threading.active_count())
        # 查看所有线程信息
        print("所有线程信息 ", threading.enumerate())
        # 查看现在正在运行的线程
        print("现在正在运行的线程 ", threading.current_thread())
    
    
    def thread_job():
        print(' 1 This is a thread of %s' % threading.current_thread())
    
    
    def my_thread():
        # target 为目标函数,想要调用的任务方法。name 为当前线程名称。
        thread = threading.Thread(target=thread_job, name='我新建的线程')  # 定义线程
        thread.start()  # 让线程开始工作
        # time.sleep(10)
    
    
    def thread_job02():
        print(' 2 This is a thread of %s' % threading.current_thread())
        print("thread_job02 T1 start\n")
        for i in range(10):
            time.sleep(0.1)  # 任务间隔0.1s
        print("thread_job02 T1 finish\n")
    
    
    def job02_thread():
        # join 功能
        # 不加 join 功能
        # 预想的结果
        # T1 start
        # T1 finish
        # all done
        # 实际的结果
        # T1 start
        # all done
        # T1 finish
        added_thread = threading.Thread(target=thread_job02, name='T1')
        added_thread.start()
        # 加 join 功能
        added_thread.join()
        print("job02_thread all done\n")
        # join 控制多个线程的执行顺序
    
    
    def T1_job():
        print("T1_job T1 start\n")
        for i in range(10):
            time.sleep(0.1)
        print("T1_job T1 finish\n")
    
    
    def T2_job():
        print("T2_job T2 start\n")
        print("T2_job T2 finish\n")
    
    
    def T1T2_job_thread():
        thread_1 = threading.Thread(target=T1_job, name='T1')
        thread_2 = threading.Thread(target=T2_job, name='T2')
        thread_1.start()  # 开启T1
        thread_2.start()  # 开启T2
        # 不加 join 打印信息的前后顺序,取决去线程处理数据的速度
        # 在写 join 的时候可以把处理数据量少的写在前面,主要目的是减少主线程或其他依赖线程的等待时间
        thread_2.join()
        thread_1.join()
        print("T1T2_job_thread all done\n")
    
    
    # 参数 一个列表l和一个队列q
    def job(l, q):
        for i in range(len(l)):
            # 给列表元素作平方计算
            l[i] = l[i] ** 2
        q.put(l)  # 多线程调用的函数不能用return返回值
    
    
    def multithreading():
        q = Queue()  # q中存放返回值,代替return的返回值
        threads = []
        data = [[1, 2, 3], [3, 4, 5], [4, 4, 4], [5, 5, 5]]
    
        for i in range(4):
            # 定义四个线程, Thread首字母要大写,被调用的job函数没有括号,只是一个索引,参数在后面
            t = threading.Thread(target=job, args=(data[i], q))
            # 开始线程
            t.start()
            # 把每个线程append到线程列表中
            threads.append(t)
    
        for thread in threads:
            # 阻塞当前线程,主线程暂停执行(或调用 join() 的线程)
            thread.join()
        results = []
        for _ in range(4):
            results.append(q.get())  # q.get()按顺序从q中拿出一个值
        print(results)
    
    
    def job02(l, q):
        res = sum(l)
        q.put(res)
    
    
    def multithreading02(l):
        q = Queue()
        threads = []
        for i in range(4):
            t = threading.Thread(target=job02, args=(copy.copy(l), q), name='T%i' % i)
            t.start()
            threads.append(t)
        [t.join() for t in threads]
        total = 0
        for _ in range(4):
            total += q.get()
        print(total)
    
    
    def normal02(l):
        total = sum(l)
        print(total)
    
    
    def job1_G():
        global A
        for i in range(10):
            A += 1
            print('job1', A)
    
    
    def job2_G():
        global A
        for i in range(10):
            A += 10
            print('job2', A)
    
    
    A = 0
    lock = threading.Lock()
    
    
    def job_1_2_G():
        t1 = threading.Thread(target=job1_G)
        t2 = threading.Thread(target=job2_G)
        t1.start()
        t2.start()
        t1.join()
        t2.join()
    
    
    def job1_L():
        global A, lock
        lock.acquire()
        try:
            for i in range(10):
                A += 1
                print('job1', A)
        finally:
            lock.release()
    
    
    def job2_L():
        global A, lock
        # 使用 with 语句可以简化 acquire 和 release 的过程,使代码更简洁清晰。推荐
        with lock:
            for i in range(10):
                A += 10
                print('job2', A)
    
    
    def job_1_2_L():
        t1 = threading.Thread(target=job1_L)
        t2 = threading.Thread(target=job2_L)
        t1.start()
        t2.start()
        t1.join()
        t2.join()
    
    
    if __name__ == '__main__':
        print_hi('什么是多线程 Threading')
        my_thread()
        job02_thread()
        T1T2_job_thread()
        multithreading()
        
        l = list(range(1000000))
        s_t = time.time()
        normal02(l * 4)
        print('normal02: ', time.time() - s_t)
        s_t = time.time()
        multithreading02(l)
        print('multithreading02: ', time.time() - s_t)
    
        job_1_2_G()
        job_1_2_L()
    
    # See PyCharm help at https://www.jetbrains.com/help/pycharm/
    

    复制粘贴并覆盖到你的 main.py 中运行,运行结果如下。

    Hi, 什么是多线程 Threading
    已激活的线程数  1
    所有线程信息  [<_MainThread(MainThread, started 8006736960)>]
    现在正在运行的线程  <_MainThread(MainThread, started 8006736960)>
     1 This is a thread of <Thread(我新建的线程, started 6150959104)>
     2 This is a thread of <Thread(T1, started 6150959104)>
    thread_job02 T1 start
    
    thread_job02 T1 finish
    
    job02_thread all done
    
    T1_job T1 start
    
    T2_job T2 start
    
    T2_job T2 finish
    
    T1_job T1 finish
    
    T1T2_job_thread all done
    
    [[1, 4, 9], [9, 16, 25], [16, 16, 16], [25, 25, 25]]
    1999998000000
    normal02:  0.042340993881225586
    1999998000000
    multithreading02:  0.027905941009521484
    job1 1
    job1 2
    job1 3
    job1 4
    job1 5
    job1job2 16
    job2 26
    job2 36
    job2 46
    job2 56
    job2 66
    job2 76
    job2 86
    job2 96
    job2 106
     6
    job1 107
    job1 108
    job1 109
    job1 110
    job1 111
    job1 112
    job1 113
    job1 114
    job1 115
    job1 116
    job1 117
    job1 118
    job1 119
    job1 120
    job2 130
    job2 140
    job2 150
    job2 160
    job2 170
    job2 180
    job2 190
    job2 200
    job2 210
    job2 220
    

    八 源码地址

    代码地址:

    国内看 Gitee 之 什么是多线程 Threading.py

    国外看 GitHub 之 什么是多线程 Threading.py

    引用 莫烦 Python

    作者:敲代码不忘补水

    物联沃分享整理
    物联沃-IOTWORD物联网 » Python 多线程编程详解:用 Threading 优雅提升程序并发性能

    发表回复