Python协程并发编程详解

文章目录

  • 1 协程
  • 1.1 简介
  • 1.2 协程优势&分类
  • 1.2.1 优势
  • 1.2.2 分类
  • 1.3 generator协程
  • 1.4 gevent协程
  • 1.5 asyncio
  • 1.5.1 简介
  • 1.5.2 asyncio函数
  • 1.5.3 async\await
  • 1.5.4 asyncio基本操作
  • 1.5.4.1 asyncio协程对象
  • 1.5.4.2 task对象
  • 1.5.4.3 future对象
  • 1.5.4.4 绑定回调
  • 1.5.4.5 异步多任务
  • 1.5.4.6 asyncio.gather和asyncio.wait 区别
  • 1 协程

    1.1 简介

    协程,又称微线程,纤程。英文名Coroutine
    协程的概念很早就提出来了,但直到最近几年才在某些语言(如Lua)中得到广泛应用。
    子程序,或者称为函数,在所有语言中都是层级调用,比如A调用B,B在执行过程中又调用了C,C执行完毕返回,B执行完毕返回,最后是A执行完毕。
    所以子程序调用是通过栈实现的,一个线程就是执行一个子程序。
    子程序调用总是一个入口,一次返回,调用顺序是明确的。而协程的调用和子程序不同。
    协程看上去也是子程序,但执行过程中,在子程序内部可中断,然后转而执行别的子程序,在适当的时候再返回来接着执行。

    注意,在一个子程序中中断,去执行其他子程序,不是函数调用,有点类似CPU的中断。比如子程序A、B:

    def A():
        print '1'
        print '2'
        print '3'
    
    def B():
        print 'x'
        print 'y'
        print 'z'
    
    假设由协程执行,在执行A的过程中,可以随时中断,去执行B,B也可能在执行过程中中断再去执行A,结果可能是:
    
    1
    2
    x
    y
    3
    z
    

    但是在A中是没有调用B的,所以协程的调用比函数调用理解起来要难一些。

    看起来A、B的执行有点像多线程,但协程的特点在于是一个线程执行,那和多线程比,协程有何优势?

    1.2 协程优势&分类

    1.2.1 优势

  • 最大的优势就是协程极高的执行效率。因为子程序切换不是线程切换,而是由程序自身控制,因此,没有线程切换的开销,和多线程比,线程数量越多,协程的性能优势就越明显。
  • 不需要多线程的锁机制,因为只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态就好了,所以执行效率比多线程高很多。
  • 因为协程是一个线程执行,那怎么利用多核CPU呢,最简单的方法是多进程+协程,既充分利用多核,又充分发挥协程的高效率,可获得极高的性能。
  • 1.2.2 分类

    协程有两种,一种无栈协程,python中以 asyncio 为代表, 一种有栈协程,python 中 以 gevent 为代表

    有栈线程 无栈线程 备注
    示例 lua thread
    python gevent
    C# yield return
    C# async\await
    python asyncio
    是否拥有单独的上下文 上下文包括寄存器、栈帧
    局部变量保存位置 无栈协程的局部变量保存在堆上,比如generator的数据成员
    优点 1. 每个协程有单独的上下文,可以在任意的嵌套函数中任何地方挂起此协程。
    2. 不需要编译器做语法支持,通过汇编指令即可实现
    1. 不需要为每个协程保存单独的上下文,内存占用低。
    2. 切换成本低,性能更高
    缺点 1. 需要提前分配一定大小的堆内存保存每个协程上下文,所以会出现内存浪费或者栈溢出。
    2. 上下文拷贝和切换成本高,性能低于无栈协程
    1. 需要编译器提供语义支持,比如C# yield return语法糖。
    2. 只能在这个生成器内挂起此协程,无法在嵌套函数中挂起此协程。
    3. 关键字有一定传染性,异步代码必须都有对应的关键字。作为对比,有栈协程只需要做对应的函数调用
    无栈协程无法在嵌套函数中挂起此协程,有栈协程由于是通过保存和切换上下文包括寄存器和执行栈实现,可以在协程函数的嵌套函数内部yield这个协程并唤醒。

    1.3 generator协程

    Python对协程的支持还非常有限,用在generator中的yield可以一定程度上实现协程。虽然支持不完全,但已经可以发挥相当大的威力了。

    来看例子:
    传统的生产者-消费者模型是一个线程写消息,一个线程取消息,通过锁机制控制队列和等待,但一不小心就可能死锁。
    如果改用协程,生产者生产消息后,直接通过yield跳转到消费者开始执行,待消费者执行完毕后,切换回生产者继续生产,效率极高:

    import time
    
    def consumer():
        r = ''
        while True:
            n = yield r
            if not n:
                return
            print('[CONSUMER] Consuming %s...' % n)
            time.sleep(1)
            r = '200 OK'
    
    def produce(c):
        next(c)
        n = 0
        while n < 5:
            n = n + 1
            print('[PRODUCER] Producing %s...' % n)
            r = c.send(n)
            print('[PRODUCER] Consumer return: %s' % r)
        c.close()
    
    if __name__=='__main__':
        c = consumer()
        produce(c)
    
    
    执行结果:
    [PRODUCER] Producing 1...
    [CONSUMER] Consuming 1...
    [PRODUCER] Consumer return: 200 OK
    [PRODUCER] Producing 2...
    [CONSUMER] Consuming 2...
    [PRODUCER] Consumer return: 200 OK
    [PRODUCER] Producing 3...
    [CONSUMER] Consuming 3...
    [PRODUCER] Consumer return: 200 OK
    [PRODUCER] Producing 4...
    [CONSUMER] Consuming 4...
    [PRODUCER] Consumer return: 200 OK
    [PRODUCER] Producing 5...
    [CONSUMER] Consuming 5...
    [PRODUCER] Consumer return: 200 OK
    

    注意到consumer函数是一个generator(生成器),把一个consumer传入produce后:

  • 首先调用c.next()启动生成器;
  • 然后,一旦生产了东西,通过c.send(n)切换到consumer执行;
  • consumer通过yield拿到消息,处理,又通过yield把结果传回;
  • produce拿到consumer处理的结果,继续生产下一条消息;
  • produce决定不生产了,通过c.close()关闭consumer,整个过程结束。
  • 整个流程无锁,由一个线程执行,produce和consumer协作完成任务,所以称为协程,而非线程的抢占式多任务。

    1.4 gevent协程

    Python通过yield提供了对协程的基本支持,但是不完全。而第三方的gevent为Python提供了比较完善的协程支持。
    gevent是第三方库,通过greenlet实现协程,其基本思想是:

    当一个greenlet遇到IO操作时,比如访问网络,就自动切换到其他的greenlet,等到IO操作完成,再在适当的时候切换回来继续执行。由于IO操作非常耗时,经常使程序处于等待状态,有了gevent为我们自动切换协程,就保证总有greenlet在运行,而不是等待IO。

    由于切换是在IO操作时自动完成,所以gevent需要修改Python自带的一些标准库,这一过程在启动时通过monkey patch完成:

    from gevent import monkey; spawn
    monkey.patch_all()  # 替换标准库以使用 gevent 兼容的版本
    
    def f(n):
        for i in range(n):
            print(gevent.getcurrent(), i)
    
    g1 = spawn(f, 5)
    g2 = spawn(f, 5)
    g3 = spawn(f, 5)
    g1.join()
    g2.join()
    g3.join()
    
    运行结果:
    <Greenlet at 0x10e49f550: f(5)> 0
    <Greenlet at 0x10e49f550: f(5)> 1
    <Greenlet at 0x10e49f550: f(5)> 2
    <Greenlet at 0x10e49f550: f(5)> 3
    <Greenlet at 0x10e49f550: f(5)> 4
    <Greenlet at 0x10e49f910: f(5)> 0
    <Greenlet at 0x10e49f910: f(5)> 1
    <Greenlet at 0x10e49f910: f(5)> 2
    <Greenlet at 0x10e49f910: f(5)> 3
    <Greenlet at 0x10e49f910: f(5)> 4
    <Greenlet at 0x10e49f4b0: f(5)> 0
    <Greenlet at 0x10e49f4b0: f(5)> 1
    <Greenlet at 0x10e49f4b0: f(5)> 2
    <Greenlet at 0x10e49f4b0: f(5)> 3
    <Greenlet at 0x10e49f4b0: f(5)> 4
    

    可以看到,3个greenlet是依次运行而不是交替运行。

    要让greenlet交替运行,可以通过gevent.sleep()交出控制权:

    def f(n):
        for i in range(n):
            print (gevent.getcurrent(), i)
            gevent.sleep(0)
    
    执行结果:
    
    <Greenlet at 0x10cd58550: f(5)> 0
    <Greenlet at 0x10cd58910: f(5)> 0
    <Greenlet at 0x10cd584b0: f(5)> 0
    <Greenlet at 0x10cd58550: f(5)> 1
    <Greenlet at 0x10cd584b0: f(5)> 1
    <Greenlet at 0x10cd58910: f(5)> 1
    <Greenlet at 0x10cd58550: f(5)> 2
    <Greenlet at 0x10cd58910: f(5)> 2
    <Greenlet at 0x10cd584b0: f(5)> 2
    <Greenlet at 0x10cd58550: f(5)> 3
    <Greenlet at 0x10cd584b0: f(5)> 3
    <Greenlet at 0x10cd58910: f(5)> 3
    <Greenlet at 0x10cd58550: f(5)> 4
    <Greenlet at 0x10cd58910: f(5)> 4
    <Greenlet at 0x10cd584b0: f(5)> 4
    

    3个greenlet交替运行,把循环次数改为500000,让它们的运行时间长一点,然后在操作系统的进程管理器中看,线程数只有1个。
    当然,实际代码里,我们不会用gevent.sleep()去切换协程,而是在执行到IO操作时,gevent自动切换,代码如下:

    from gevent import monkey
    monkey.patch_all()
    
    import gevent
    import urllib.request
    
    
    def f(url):
        print('GET: {}'.format(url))
        resp = urllib.request.urlopen(url)
        data = resp.read()
        print('{} bytes received from {}.'.format(len(data), url))
    
    gevent.joinall([
        gevent.spawn(f, 'https://www.python.org/'),
        gevent.spawn(f, 'https://www.yahoo.com/'),
        gevent.spawn(f, 'https://github.com/'),
    ])
    
    运行结果:
    
    GET: https://www.python.org/
    GET: https://www.yahoo.com/
    GET: https://github.com/
    45661 bytes received from https://www.python.org/.
    14823 bytes received from https://github.com/.
    304034 bytes received from https://www.yahoo.com/.
    

    从结果看,3个网络操作是并发执行的,而且结束顺序不同,但只有一个线程。

    注意gevent只能在Unix/Linux下运行,在Windows下不保证正常安装和运行。

    1.5 asyncio

    1.5.1 简介

    asyncio 是用来编写并发代码的库,使用 async/await 语法。
    asyncio 被用作多个提供高性能 Python 异步框架的基础,包括网络和网站服务,数据库连接库,分布式任务队列等等。
    asyncio 往往是构建 IO 密集型和高层级 结构化 网络代码的最佳选择。

    1.5.2 asyncio函数

    使用协程中的一般概念:

  • event_loop:事件循环,相当于一个无限循环,我们可以把一些函数注册到这个事件循环上,当满足某些条件的时候,函数就会被循环执行
  • coroutine:协程对象,我们可以将协程对象注册到事件循环中,它会被事件循环调用。可以使用async关键字来定义一个方法,这个方法在调用时不会立即被执行,而是返回一个协程对象
  • task:任务,它是对协程对象的进一步封装,包含了任务的各个状态
  • future:代表将来执行或还没有执行的任务,实际上和task没有本质区别
  • async:定义一个协程,不会立即执行
  • await:用来挂起阻塞方法的执行
  • 事件循环函数(包括循环的创建、运行和停止)

  • asyncio.get_running_loop():函数返回当前 OS 线程中正在运行的事件循环。
  • asyncio.get_event_loop():函数获取当前事件循环。
  • asyncio.set_event_loop(loop):函数将 loop 设置为当前 OS 线程的当前事件循环。
  • asyncio.new_event_loop():函数创建一个新的事件循环。
  • loop.run_until_complete(future):函数运行直到 future (Future 的实例) 被完成。
  • loop.run_forever():函数运行事件循环直到 stop() 被调用。
  • loop.stop():函数停止事件循环。
  • loop.is_running():函数返回 True 如果事件循环当前正在运行。
  • loop.is_closed():函数如果事件循环已经被关闭,返回 True 。
  • loop.close():函数关闭事件循环。
  • loop.create_future():函数创建一个附加到事件循环中的 asyncio.Future 对象。
  • loop.create_task(coro, *, name=None):函数安排一个 协程 的执行。返回一个 Task 对象。
  • loop.set_task_factory(factory):函数设置一个 task 工厂 , 被用于 loop.create_task()
  • loop.get_task_factory():函数返回一个任务工厂,或者如果是使用默认值则返回 None。
  • 1.5.3 async\await

    async 关键字:

  • async 用于声明一个函数为异步函数。这意味着该函数在执行过程中可能会遇到需要等待的操作(如I/O操作、网络请求、等待用户输入等),而不会阻塞整个程序的执行。
  • 异步函数内部可以包含await表达式,用于等待其他异步操作完成。
  • await 关键字:

  • await用于等待一个异步操作(即一个FutureTask、或其他可等待对象,如另一个异步函数)的完成。它只能在异步函数内部使用。
  • await表达式被调用时,它会暂停当前异步函数的执行,直到等待的异步操作完成。在等待期间,事件循环会继续运行,并可能执行其他异步任务。
  • 一旦等待的异步操作完成,await表达式会获取其结果(或捕获其抛出的异常),然后异步函数会从await表达式之后继续执行。
  • 挂起与恢复:

  • await一个异步操作时,当前协程(即异步函数)会被挂起,控制权交还给事件循环。事件循环可以调度其他协程运行,包括正在等待的异步操作本身。
  • 当异步操作完成时,事件循环会恢复挂起的协程,并从await表达式之后继续执行。
  • 关于await后面跟的函数的性质:

  • await后面必须跟一个可等待对象,这通常是一个异步函数(用async def定义)的调用结果,因为它会返回一个Future或Task对象,这些对象是可等待的。
  • 如果await后面跟的不是异步函数或可等待对象,那么Python解释器会抛出一个TypeError,因为非异步函数或不可等待对象不能被await。
  • 调用非异步函数时,如果该函数执行时间较长,那么它会阻塞当前线程(在asyncio中,这通常意味着阻塞当前协程的事件循环),直到函数执行完成。因此,在异步编程中,应该尽量避免在异步函数内部调用非异步的、可能阻塞的函数。
  • 1.5.4 asyncio基本操作

    1.5.4.1 asyncio协程对象

    使用async定义一个协程对象,并创建一个事件循环对象

    import asyncio
    #定义协程对象
    async def get_request(url):
        print("正在请求的url是:",url)
        print('请求成功的url:',url)
        return url
    #得到协程对象
    coroutine_obj=get_request('www.baidu.com')
    #创建一个事件循环对象
    loop=asyncio.get_event_loop()
    #将协程对象注册到loop中,并启动loop
    loop.run_until_complete(coroutine_obj)
    loop.close()
    
    1.5.4.2 task对象

    task对象需要loop对象基础上建立起来

    import asyncio
    #定义协程对象
    async def get_request(url):
        print("正在请求的url是:",url)
        print('请求成功的url:',url)
        return url
    #得到协程对象
    coroutine_obj=get_request('www.baidu.com')
    
    #创建一个事件循环对象
    loop=asyncio.get_event_loop()
    #基于loop创建了一个task对象
    task=loop.create_task(coroutine_obj)
    print(task)
    #基于loop注册任务
    loop.run_until_complete(task)
    print(task)
    loop.close()
    
    1.5.4.3 future对象

    主要函数:

  • asyncio.Future(*, loop=None):函数是一个 Future 代表一个异步运算的最终结果。线程不安全。
  • asyncio.isfuture(obj):函数用来判断如果 obj 为一个 asyncio.Future 类的示例、 asyncio.Task 类的实例或者一个具有 _asyncio_future_blocking 属性的对象,返回 True。
  • asyncio.ensure_future(obj, *, loop=None):函数创建新任务。
  • asyncio.wrap_future(future, *, loop=None):函数将一个 concurrent.futures.Future 对象封装到 asyncio.Future 对象中。
  • Future 对象相关函数:

  • fut.result():函数返回 Future 的结果。
  • fut.set_result(result):函数将 Future 标记为 完成 并设置结果。
  • fut.set_exception(exception):函数将 Future 标记为 完成 并设置一个异常。
  • fut.done():函数如果 Future 为已 完成 则返回 True 。
  • fut.cancelled():函数是如果 Future 已取消则返回 True
  • fut.add_done_callback(callback, *, context=None):函数添加一个在 Future 完成 时运行的回调函数。
  • fut.remove_done_callback(callback):函数从回调列表中移除 callback 。
  • fut.cancel():函数取消 Future 并调度回调函数。
  • fut.exception():函数返回 Future 已设置的异常。
  • fut.get_loop():函数返回 Future 对象已绑定的事件循环。
  • future对象与task对象不同的是创建基于asyncio空间来创建的

    import asyncio
    #定义协程对象
    async def get_request(url):
        print("正在请求的url是:",url)
        print('请求成功的url:',url)
        return url
    #得到协程对象
    coroutine_obj=get_request('www.baidu.com')
    
    #创建一个事件循环对象
    loop=asyncio.get_event_loop()
    #基于loop创建了一个task对象
    future=asyncio.ensure_future(coroutine_obj)
    print(future)
    loop.run_until_complete(future)
    print(future)
    loop.close()
    

    或者示例:

    import sys
    import asyncio
    import time
     
    # 一个对future进行赋值的函数
    async def slow_operation(future, num):
        await asyncio.sleep(1)
        # 给future赋值
        future.set_result('Future'+ str(num) +' is done!')
     
    def main():
        loop = asyncio.get_event_loop()
        # 创建一个future
        future1 = loop.create_future()
        # 使用ensure_future 创建Task
        asyncio.ensure_future(slow_operation(future1, 1))
     
        future2 = loop.create_future()
        asyncio.ensure_future(slow_operation(future2, 2))
     
        # gather Tasks,并通过run_uniti_complete来启动、终止loop
        loop.run_until_complete(asyncio.gather(future1, future2))
     
        print(future1.result())
        print(future2.result())
     
        loop.close()
     
    if __name__ == "__main__":
        main()
    
    1.5.4.4 绑定回调

    在使用task或者future绑定回调时,需要先定义回调函数
    回调函数中返回的result方法就是任务对象中封装的协程对象对应的函数返回值
    注意:回调函数必须有返回值,不然result方法就没有值

    def callback_func(task):
        print(task.result())
    

    在使用task或者future绑定回调时,都可以使用方法绑定task.add_done_callback(callback_func)

    import asyncio
    #定义协程对象
    async def get_request(url):
        print("正在请求的url是:",url)
        print('请求成功的url:',url)
        return url
    #得到协程对象
    coroutine_obj=get_request('www.baidu.com')
    loop=asyncio.get_event_loop()
    future=asyncio.ensure_future(coroutine_obj)
    #把回调函数绑定到任务对象中
    future.add_done_callback(callback_func)
    loop.run_until_complete(future)
    loop.close()
    
    1.5.4.5 异步多任务

    在一个异步函数中,可以不止一次挂起,也就是可以用多个await
    多任务时,对于run_until_complete方法需要这样用asyncio.wait()方法处理:loop.run_until_complete(asyncio.wait(task_list))
    代码示例:

    import time
    import asyncio
    async def get_request(url):
        print("正在请求的url是:",url)
        #在异步协程中如果出现了同步模块相关代码,那么就无法实现异步
        # time.sleep(2)
        #当在asyncio中遇到阻塞操作就必须进行手动挂起
        await asyncio.sleep(2)
        print('请求成功的url:',url)    
    start_time=time.time()
    urls=['www.baidu.com','www.sogou.com','www.goubanjia.com']
    
    #任务列表
    task_list=[]
    for url in urls:
        coroutine_obj=get_request(url)
        future=asyncio.ensure_future(coroutine_obj)
        task_list.append(future)
    loop=asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(task_list))
    loop.close()
    print(time.time()-start_time)
    
    1.5.4.6 asyncio.gather和asyncio.wait 区别

    asyncio.gatherasyncio.wait都是asyncio库中用于处理异步任务(协程)的重要函数,但它们之间存在一些关键的区别。以下是这两个函数之间的主要差异:

  • 返回值
  • asyncio.gather:该函数并发运行多个协程,并等待它们全部完成。它返回一个Future对象,当所有协程都完成时,这个Future对象的结果是一个包含所有协程返回值的列表(列表的顺序与传入的协程列表相同)。如果任何一个协程抛出异常,则gather会立即取消所有其他仍在运行的协程,并重新抛出那个异常。
  • asyncio.wait:该函数也是用于并发运行多个协程,但它返回的是一个包含两个集合的元组:done和pending。done是一个已完成的协程(Future或Task)列表,而pending是一个尚未完成的协程列表。这意味着需要从done列表中的每个协程中调用result()方法来获取结果,而pending列表中的协程则需要你进一步处理或等待。
  • 使用场景
  • asyncio.gather:当需要同时运行多个协程并立即获取它们的结果时,gather是一个很好的选择。它简化了结果收集的过程,并允许同步的方式处理异步操作的结果。
  • asyncio.wait:当需要更细粒度的控制时,比如在某个超时后取消未完成的协程,或者要分别处理已完成和未完成的协程时,wait是一个更合适的选择。然而,使用wait需要更多的手动操作来收集结果或处理未完成的协程。
  • 额外功能
  • asyncio.gather:除了基本的并发执行和结果收集功能外,gather还支持一些额外的参数,如return_exceptions(默认为False),当设置为True时,如果协程抛出异常,异常会被捕获并作为结果列表中的一项返回,而不是中断整个gather操作。
  • asyncio.waitwait函数也接受一些参数,如timeout(指定等待时间,默认为None,表示无限期等待)和return_when(指定在什么条件下返回,如FIRST_COMPLETED、ALL_COMPLETED等)。这些参数提供了对协程执行过程的更多控制。
  • 以下是使用asyncio.gather和asyncio.wait的示例代码:

    import asyncio  
      
    async def task(name, delay):  
        print(f"{name} started")  
        await asyncio.sleep(delay)  
        return f"{name} finished"  
      
    async def main():  
        # 使用 asyncio.gather  
        tasks = [task("A", 1), task("B", 2), task("C", 3)]  
        results = await asyncio.gather(*tasks)  
        print(results)  # 输出: ['A finished', 'B finished', 'C finished']  
      
        # 使用 asyncio.wait  
        tasks = [task("D", 1), task("E", 2), task("F", 3)]  
        done, pending = await asyncio.wait(tasks)  
        for d in done:  
            print(d.result())  # 分别打印每个已完成协程的结果  
        # 处理 pending 列表中的协程(这里只是打印出来)  
        for p in pending:  
            print(f"Pending: {p}")  
      
    asyncio.run(main())
    

    作者:爱吃牛肉的大老虎

    物联沃分享整理
    物联沃-IOTWORD物联网 » Python协程并发编程详解

    发表回复