【Python】多线程(线程安全和锁)

线程安全和锁

一、全局解释器锁

首先需要明确的一点是GIL并不是Python的特性,它是在实现Python解析器(CPython)时所引入的一个概念。

GIL全称global interpreter lock,全局解释器锁。

每个线程在执行的时候都需要先获取GIL,保证同一时刻只有一个线程可以执行代码,即同一时刻只有一个线程使用CPU。在CPython中,每一个Python线程执行前都需要去获得GIL锁 ,获得该锁的线程才可以执行,没有获得的只能等待 ,当具有GIL锁的线程运行完成后,其他等待的线程就会去争夺GIL锁,这就造成了,在Python中使用多线程,但同一时刻下依旧只有一个线程在运行 ,所以Python多线程其实并不是「并行」的,而是「并发」 。

看到下图,图中是Python中GIL的工作实例,其中有3个线程,线程与线程之间是顺序执行的 ,每个线程开始执行时都会去获得GIL,防止其他线程线程运行 ,每执行完一段时间后,就会释放GIL,让别的线程可以去争夺执行权限,如果自己本身也没有执行完,则本身也会参与这次争夺 。

image.png

# 多线程的代码
import threading, time


def add(n):
    sum = 0
    while sum < n:
        sum += 1
    print(f'sum:{sum}')


if __name__ == '__main__':
    start = time.time()
    n = 500000000
    t1 = threading.Thread(target=add, args=[n // 2])
    t2 = threading.Thread(target=add, args=[n // 2])
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print('run time: %s' % str(time.time() - start))

image.png

# 单线程的代码
import time


def add(n):
    sum = 0
    while sum < n:
        sum += 1
    print(f'sum:{sum}')


if __name__ == '__main__':
    start = time.time()
    add(500000000)
    print('run time: %s' % str(time.time() - start))

image.png

总结

  • GIL解决方法:

  • 使用其他语言写的python解释器(不推荐,还是用官方CPython好)
    eg:Jython(java);IronPython(.net);pypy(Python)
  • 不使用多线程,使用多进程-进程里加协程实现多任务来充分利用多核CPU (推荐)
  • 即使存在GIL 在有IO等待操作的程序中,还是多线程快,当然没有资源等待的还是单线程快(科学计算,累加等等)

  • 但需要注意的是线程有了GIL后并不意味着使用Python多线程时不需要考虑线程安全 ,「GIL的存在是为了方便使用C语言编写CPython解释器的编写者,而顶层使用Python时依旧要考虑线程安全」 。

    二、线程安全

    当多个线程同时访问一个对象时,不管如何计算,如果调用这个对象的行为都可以获得正确的结果,那就称这个对象时线程安全的。 如果出现了“脏数据”。则线程不安全。

    脏数据 :产生脏数据的原因是,当一个线程在对数据进行修改时,修改到一半时另一个线程读取了未经修改的数据并进行修改。如何避免脏数据的产生呢?一个办法就是用join方法,即先让一个线程执行完毕再执行另一个线程。但这样的本质是把多线程变成了单线程,失去了多线程的意义。另一个办法就是用线程锁。

    import threading
    
    g_number = 0
    
    
    def hello():
        global g_number
        for i in range(1000000):  # 加的次数越大越容易出现资源竞争问题
            g_number += 1
        print(f'thd1运行的结果为:{g_number}')
    
    
    def world():
        global g_number
        for i in range(1000000):
            g_number += 1
        print(f'thd2运行的结果为:{g_number}')
    
    
    if __name__ == '__main__':
        thd1 = threading.Thread(target=hello)
        thd2 = threading.Thread(target=world)
    
        thd1.start()
        thd2.start()
    
        # 阻塞等待
        thd1.join()
        thd2.join()
        print(g_number)  # 结果随机 可能小于等于2000000
    
    

    三、锁

    锁是Python提供给我们能够自行操控线程切换的一种手段,使用锁可以让线程的切换变的有序。

    一旦线程的切换变的有序后,各个线程之间对数据的访问、修改就变的可控,所以若要保证线程安全,就必须使用锁。

    threading模块中提供了5种最常见的锁,下面是按照功能进行划分:

  • 同步锁:lock(一次只能放行一个)
  • 递归锁:rlock(一次只能放行一个)
  • 条件锁:condition(一次可以放行任意个)
  • 事件锁:event(一次全部放行)
  • 信号量锁:semaphore(一次可以放行特定个)
  • 1、同步锁

    同一时刻的一个进程下的一个线程只能使用一个cpu,要确保这个线程下的程序在一段时间内被cpu执,那么就要用到同步锁。只需要在对公共数据的操作前后加上上锁和释放锁的操作即可。

    死锁: 指两个或两个以上的线程或进程在执行程序的过程中,因争夺资源而相互等待的一个现象。

    import threading
    
    g_number = 0
    lock = threading.Lock()
    
    def hello():
        global g_number
        for i in range(1000000):  # 加的次数越大越容易出现资源竞争问题
            with lock:
                g_number += 1
        print(f'thd1运行的结果为:{g_number}')
    
    
    def world():
        global g_number
        for i in range(1000000):
            with lock:
                g_number += 1
        print(f'thd2运行的结果为:{g_number}')
    
    
    if __name__ == '__main__':
        thd1 = threading.Thread(target=hello)
        thd2 = threading.Thread(target=world)
    
        thd1.start()
        thd2.start()
    
        # 阻塞等待
        thd1.join()
        thd2.join()
        print(g_number)  # 结果随机 可能小于等于2000000
    
    

    2、递归同步锁

    在同步锁的基础上可以做到连续重复使用多次acquire()后再重复使用多次release()的操作,但是一定要注意加锁次数和解锁次数必须一致,否则也将引发死锁现象。

    递归锁RLock:它内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。

    3、条件锁

    条件锁是在递归锁的基础上增加了能够暂停线程运行的功能。并且我们可以使用wait()与notify()来控制线程执行的个数。

    注意:条件锁可以自由设定一次放行几个线程。

    import threading
    
    currentRunThreadNumber = 0
    maxSubThreadNumber = 10
    
    
    def task():
        global currentRunThreadNumber
        thread_name = threading.currentThread().name
    
        with condLock:
            print("线程开始启动,并马上进入等待状态 : %s" % thread_name)
            condLock.wait()  # 暂停线程运行、等待唤醒
            print("线程唤醒了,开始运行后面的代码 : %s" % thread_name)
            currentRunThreadNumber += 1
    
    
    if __name__ == "__main__":
        condLock = threading.Condition()
    
        for i in range(maxSubThreadNumber):
            subThreadIns = threading.Thread(target=task)
            subThreadIns.start()
    
        while currentRunThreadNumber < maxSubThreadNumber:
            notifyNumber = int(
                input("请输入要唤醒几个线程:"))
    
            with condLock:
                condLock.notify(notifyNumber)  # 放行
    
        print("main thread run end")
    
    

    4、事件锁

    事件锁是基于条件锁来做的,它与条件锁的区别在于一次只能放行全部,不能放行任意个数量的子线程继续运行。

    我们可以将事件锁看为红绿灯,当红灯时所有子线程都暂停运行,并进入“等待”状态,当绿灯时所有子线程都恢复“运行”。

    image.png

    import threading
    
    maxSubThreadNumber = 3
    
    
    def task():
        thread_name = threading.currentThread().name
        print("线程开始启动,并马上进入等待状态 : %s" % thread_name)
        eventLock.wait()  # 暂停运行,等待绿灯
        print("第一次绿灯打开,线程往下走:%s" % thread_name)
        eventLock.wait()  # 暂停运行,等待绿灯
        print("第二次绿灯打开,线程往下走:%s" % thread_name)
    
    
    if __name__ == "__main__":
        eventLock = threading.Event()
        for i in range(maxSubThreadNumber):
            subThreadIns = threading.Thread(target=task)
            subThreadIns.start()
        eventLock.set()  # 设置为绿灯
        eventLock.clear()  # 设置为红灯
        eventLock.set()
    
    
    

    5、信号量锁

    Semaphore()

    信号量锁也是根据条件锁来做的,它与条件锁和事件锁的区别如下:

  • 条件锁:一次可以放行任意个处于“等待”状态的线程
  • 事件锁:一次可以放行全部的处于“等待”状态的线程
  • 信号量锁:通过规定,成批的放行特定(指定)个处于“上锁”状态的线程
  • import threading
    import time
    
    maxSubThreadNumber = 6
    
    
    def task():
        thread_name = threading.currentThread().name
        with semaLock:
            print("线程获得锁,开始运行: %s" % thread_name)
            time.sleep(3)
    
    
    if __name__ == "__main__":
    
        semaLock = threading.Semaphore(2)
    
        for i in range(maxSubThreadNumber):
            subThreadIns = threading.Thread(target=task)
            subThreadIns.start()
    

    作者:道友老李

    物联沃分享整理
    物联沃-IOTWORD物联网 » 【Python】多线程(线程安全和锁)

    发表回复