python asyncio (协程、异步编程)

文章目录

  • 1. 简介
  • 2. 什么是协程
  • 3. 为何异步
  • 4. 如何异步
  • 4.1 简单示例
  • 4.2 事件循环
  • 4.3 协程函数和协程对象
  • 4.4 await 关键字
  • 4.5 Task 对象
  • 4.6 asyncio 的 Future 对象
  • 4.7 concurrent.futures 的 Future 对象
  • 4.8 异步迭代器
  • 4.9 异步上下文管理器
  • 4.10 uvloop
  • 4.11 异步库
  • 1. 简介

    asyncio (Asynchronous I/O)模块是一个异步代码库,它提供了一种基于协程(coroutine)和事件循环(event loop)的编程模型。是很多python异步架构的基础,多用于处理高并发网络请求方面的问题

    2. 什么是协程

    协程(coroutine) 也被称为轻量级线程,是一种可以在单线程中实现多任务的编程方式(在用户态切换上下文)。它是一种特殊的函数或者方法,可以在执行过程中暂停,将控制权交还给调用者。
    这使得协程能够在执行过程中等待某些事件发生,比如等待 IO 操作完成,而不会阻塞其他任务的执行

    3. 为何异步

    1. 提高程序性能:异步编程可以充分利用计算资源,减少因为等待I/O操作而导致的阻塞
    2. 增强用户体验:在设计网络操作,用户界面或任何涉及等待耗时操作的情况下,异步编程可以使程序响应更加灵敏
    3. 节省资源:相对于创建大量线程或进程来处理并发任务,异步编程使用的资源更少

    4. 如何异步

    注意:下面使用的例子都是基于 Python3.5+ 来实现的,不适用于之前的 Python 版本。

    4.1 简单示例

    在学习异步之前,先看如下两个例子的对比,可以更直观的看出使用异步的优势。
    例子一:

    import time
    
    def func1():
        print('1')
        time.sleep(1)
        print('2')
    
    def func2():
        print('3')
        time.sleep(1)
        print('4')
    
    func1()
    func2()
    

    输出如下:这里立即输出了 1,等待了 1s 后,又输出了 2 和 3,然后又等待了 1s, 最后输出了 4

    1  # 这里立马输出 1,等待 1s 后才输出 2
    2  
    3  # 2 和 3 基本同时输出,然后等待 1s 后才输出 4
    4
    

    例子二:

    import asyncio
    
    
    async def func1():
        print('1')
        await asyncio.sleep(1)
        print('2')
    
    async def func2():
        print('3')
        await asyncio.sleep(1)
        print('4')
    
    task_list = [func1(), func2()]
    asyncio.run( asyncio.wait(task_list))
    

    输出如下:这里立即输出了 3 和 1,等待了 1s 后,输出了 4 和 2

    3 # 这里 3 和 1基本同时输出,等待 1s 后,4 和 2
    1
    4
    2
    

    第一个例子想必都能看懂,函数依次调用,遇到 time.sleep() 函数,则等待指定时间,然后继续执行,第一个例子大概用时 2s

    第二个例子则是使用了 asyncio 异步库,遇到 asyncio.sleep() 函数,则切换任务执行(先暂停执行这个函数,切换到别的函数执行,当指定等待的时间间隔过去后,在继续执行这个函数),第二个例子大概用时 1s

    4.2 事件循环

    可以简单的理解为一个 while True 循环(死循环),它会循环检测执行某些代码。
    只有当任务列表中的所有任务全部执行完,才会退出循环.

    import asyncio
    
    async def func():
        print("123456")
    
    # 获取一个事件循环的函数,异步事件循环可以管理和调度异步任务的执行
    loop = asyncio.get_event_loop()
    # 将任务放到 '任务列表',它会一直运行事件循环,直到 result 被完成为止
    # 这里的 result 应该是一个可等待对象(协程对象 或者 Future对象)
    # loop.run_until_complete(result)  # 这行是伪代码,真正的代码示例如下
    loop.run_until_complete(func())
    
    # 输出如下:
    123456
    

    4.3 协程函数和协程对象

    协程函数:使用 async def 语法定义,例如:async def func():
    协程对象:调用 协程函数 返回一个协程对象.

    # 这是一个协程函数
    async def func():
        pass
    
    # result 是一个协程对象 (注意:函数内部代码不会执行,只是返回一个协程对象)
    result = func()
    

    如果想要运行协程函数内部代码,必须要将协程对象交给事件循环来处理

    import asyncio
    
    async def func():
        print("执行协程函数 func")
    
    result = func()
    
    """
    loop = asyncio.get_event_loop()    # 获取事件循环
    loop.run_until_comlete( result )   # 运行事件循环
    """
    
    # 在 python3.7 之后,可以省略上面两行,直接执行下面一行函数即可
    asyncio.run(result)
    

    4.4 await 关键字

    await 关键字会暂停当前协程的执行(这个关键字只能在协程函数中使用),让出事件循环的控制权,以便其它协程或异步任务可以执行,一旦被等待的对象完成,await 表达式将返回对象的结果,并且当前协程将继续执行。

    await + 可等待的对象(协程对象、Task对象、Future对象)

    同一个协程任务中,多个await, 会依次等待可等待对象执行完成;不同协程任务中,遇到await会交替执行。

    例子一:

    import asyncio
    
    
    async def func():
        print("func() start")
        await asyncio.sleep(2)   # 这里模拟IO操作
        print("func() end")
        return "我是 func()"
    
    
    async def main():
        print("执行协程函数 main()")
    
        # 遇到IO操作挂起当前协程(任务),等待IO操作完成之后在继续往下执行,当前协程挂起,事件循环可以去执行其他的协程(任务)
        result = await func()
        print("io请求结果:", result)
    
    asyncio.run(main())
    

    执行结果如下:

    执行协程函数 main()
    func() start
    func() end
    io请求结果: 我是 func()
    

    例子二:
    一个协程函数中可以有多个 await,但是 await 是等待对应的函数得到结果后,才继续向下执行(同一协程任务中)

    import asyncio
    
    
    async def func(flags):
        print("func() start")
        await asyncio.sleep(2)   # 这里模拟IO操作
        print("func() end")
        return "我是 func(" + flags + ")"
    
    
    async def main():
        print("执行协程函数 main()")
    
        # 遇到IO操作挂起当前协程(任务),等待IO操作完成之后在继续往下执行,当前协程挂起,事件循环可以去执行其他的协程(任务)
        result = await func("1")
        print("IO第一次的请求结果:", result)
    
        result = await func("2")
        print("IO第二次的请求结果:", result)
    
    asyncio.run(main())
    

    执行结果如下:

    执行协程函数 main()
    func() start
    func() end
    IO第一次的请求结果: 我是 func(1)
    func() start
    func() end
    IO第二次的请求结果: 我是 func(2)
    

    4.5 Task 对象

    Task 对象用于表示一个可并发执行的异步任务。它是 asyncio 中的一个重要概念,用于管理协程的执行。Task对象用于包装协程。

    说简单点就是让协程加入事件循环中等待被调度执行,或者说在事件循环中添加任务等待被调度执行。事件循环会在适当的时机执行协程,并在协程遇到阻塞操作时挂起,转而执行其他可运行的任务,当阻塞操作完成,事件循环会恢复挂起的协程执行。

    asyncio.create_task() 函数是将协程对象包装成Task对象,该任务将被添加到默认的事件循环中,如果当前没有活动的事件循环,则会引发 RuntimeError 异常。

    除了asyncio.create_task() 函数,还可以用低层级的 loop.create_task() 或 asyncio.ensure_future() 函数。
    ● asyncio.create_task() 是一个全局函数,不依赖特定的事件循环。且它在Python3.7 中才被引入
    ● loop.create_task() 是事件循环的方法,需要先获取事件循环的引用
    ● asyncio.ensure_future() 虽然也是一个全局函数,但是它返回的是Future对象。python3.7 之前使用这个函数
    例子一:
    这里和 3.4 await 关键字: 例子二 中不同的是,这里通过 asyncio.create_task() 函数将多个协程任务添加到事件循环中,这里事件循环中是不同的协程任务,所以这里会交替执行。

    import asyncio
    
    async def func(arg):
        print(1)
        await asyncio.sleep(2)
        print(2)
        return "返回值: " + arg
    async def main():
        print("开始创建Task对象")
    
        # 创建 Task 对象, 将当前执行 func 函数任务添加到事件循环
        task1 = asyncio.create_task(func("task1"))
        task2 = asyncio.create_task(func("task2"))
    
        print("创建Task对象结束")
    
        # 当执行某协程遇到IO操作时, 会自动切换到其他任务
        # 此处的 await 是等待Task对象全都执行完毕并获取结果
        ret1 = await task1
        ret2 = await task2
        print(ret1, ret2)
    
    asyncio.run(main())
    

    执行结果如下:

    开始创建Task对象
    创建Task对象结束
    1
    1
    2
    2
    返回值: task1 返回值: task2
    

    例子二:
    asyncio.wait()asyncio模块中的一个函数,用于等待一组可等待对象完成,它返回一个Future对象,表示等待任务完成的结果。可以将任务列表(或者其他可迭代对象)作为参数传递给asyncio.wait() 函数。

    import asyncio
    
    async def func(arg):
        print(1)
        await asyncio.sleep(2)
        print(2)
        return "返回值" + arg
    
    async def main():
        print("开始创建Task对象")
    
        # 创建任务列表 task_list,列表中的每个元素都是一个Task对象
        task_list = [
            asyncio.create_task(func("task1")),
            asyncio.create_task(func("task2"))
        ]
    
        print("创建Task对象结束")
    
        # done: 是完成的 Task 对象,
        # pending: 是未完成的 Task 对象
        done, pending = await asyncio.wait(task_list, timeout=None)
        print(done)
        print(pending)
    
    asyncio.run( main() )
    

    执行结果如下

    开始创建Task对象
    创建Task对象结束
    1
    1
    2
    2
    # 这是已经完成的Task对象
    {<Task finished name='Task-2' coro=<func() done, defined at D:\PycharmProjects\asyncio\main.py:3> result='返回值task1'>, <Task finished name='Task-3' coro=<func() done, defined at D:\PycharmProjects\asyncio\main.py:3> result='返回值task2'>}
    # 这是未完成的Task对象,set() 表示集合为空(没有未完成的任务)
    set()
    

    asyncio.create_task() 函数有一个 name 的字段可以为创建的任务指定一个可选的名称,这样可以更容易和的识别和追踪特定的任务。

    import asyncio
    
    async def func():
        print(1)
        await asyncio.sleep(2)
        print(2)
        return "返回值"
    
    async def main():
        print("开始创建Task对象")
    
        # 创建 Task 对象, 将当前执行 func 函数任务添加到事件循环
        # name 给对象任务列表起名字,方便区分哪个任务结束
        task_list = [
            asyncio.create_task(func(), name="n1"),
            asyncio.create_task(func(), name="n2")
        ]
    
        print("创建Task对象结束")
    
        # done 是完成的 Task 对象,
        # pending 是未完成的 Task 对象
        done, pending = await asyncio.wait(task_list, timeout=None)
        print(done)
    
    
    asyncio.run( main() )
    
    # 输出如下:
    开始创建Task对象
    创建Task对象结束
    1
    1
    2
    2
    {<Task finished name='n1' coro=<func() done, defined at D:\PycharmProjects\asyncio\main.py:3> result='返回值'>, <Task finished name='n2' coro=<func() done, defined at D:\PycharmProjects\asyncio\main.py:3> result='返回值'>}
    

    例子三:
    例子三例子二 相比,task_list 中直接写任务名称,没有通过 asyncio.create_task() 创建Task对象,这是因为create_task() 函数创建任务之后会自动添加到事件循环中。这里定义的 task_list 列表还没有创建事件循环。并且 task_list 列表中的任务也并不需要手动调用 asyncio.create_task() 函数,因为 asyncio.task() 函数会隐式的将协程对象转换为任务(Task对象)。

    asyncio.wait() 的参数中,可以直接包含协程对象,而不必显示的将其转换为任务,它会自动的将协程对象封装成任务,并进行等待和管理。

    import asyncio
    
    async def func(arg):
        print(1)
        await asyncio.sleep(2)
        print(2)
        return "返回值" + arg
    
    task_list = [
        func("task1"),
        func("task2")
    ]
    
    """
    这里这样写是错误的,因为asyncio.create_task()是将协程对象包装成任务,并添加到
    事件循环中,但是这里事件循环还没有创建(asyncio.run() 才会创建事件循环),所以会报错
    task_list = [
        asyncio.create_task(func("task1")),
        asyncio.create_task(func("task2"))
    ]
    """
    done, pending = asyncio.run( asyncio.wait(task_list))
    print(done)
    

    执行结果如下:

    1
    1
    2
    2
    {<Task finished name='Task-2' coro=<func() done, defined at D:\PycharmProjects\asyncio\main.py:3> result='返回值task2'>, <Task finished name='Task-3' coro=<func() done, defined at D:\PycharmProjects\asyncio\main.py:3> result='返回值task1'>}
    

    4.6 asyncio 的 Future 对象

    asyncio 中的 Future 对象是一个更偏向底层的可等待对象,代表异步任务的最终结果。通常不会直接用到这个对象,而是直接使用Task对象来完成任务的创建和状态的追踪。

    它是 Task 对象的基类,Task 继承 Future, Task 对象内部 await 结果的处理基 于Future对象来的。

    import asyncio
    
    async def main():
        # 获取当前事件循环
        loop = asyncio.get_running_loop()
    
        # 创建一个任务(Future对象),这个任务什么都不做
        fut = loop.create_future()
    
        # 等待任务结果(Future 对象),没结果会一直等待下去
        await fut
    
    asyncio.run( main() )
    
    

    例子一:

    import asyncio
    
    async def set_after(fut: asyncio.Future):
        await asyncio.sleep(2)
        #
        fut.set_result("success")
    
    
    async def main():
        # 获取当前事件循环
        loop = asyncio.get_running_loop()
    
        # 创建一个任务(Future对象),没有绑定任何行为,这个任务永远不知道什么时候结束
        fut = loop.create_future()
    
        # 创建一个任务(Task 对象),绑定了 set_after 函数,函数内部在等待 2s 后,会给 fut 赋值
        # 即手动设置 Future 任务的最终结果,那么 fut 就可以结束了
        await loop.create_task(set_after(fut))
    
        # 等待 Future 对象获取最终结果,否则一直等待下去
        # 这里等待 Future 对象的时候,遇到阻塞操作(因为 Future 对象没有结果会一直等待下去), 所以会切换到 set_after 任务去执行
        # 而 set_after 任务在等待 2s 后,会给 Future 对象赋值,所以此时 Future 对象返回, 代码执行结束
        data = await fut
        print(data)
    asyncio.run( main() )
    

    例子二:
    asyncio 模块中,还可以使用 asyncio.Future()函数来创建一个 Future 对象。它与 loop.create_future() 创建的 Future 对象不同的是,loop.create_future() 创建的对象与特定的事件循环相关联,这意味着,该 Future 对象只能在创建它的事件循环中使用,而 asyncio.Future() 创建的 Future 对象没有特定事件循环想关联,所以可以在任何事件循环中使用。

    import asyncio
    
    async def task1(future):
        await asyncio.sleep(2)
        future.set_result('Task 1 completed')
    
    async def task2(future):
        await asyncio.sleep(3)
        future.set_result('Task 2 completed')
    
    async def main():
        loop = asyncio.get_running_loop()
        fut = loop.create_future()
    
        # 启动两个任务,并共享同一个 Future 对象
        asyncio.create_task(task1(fut))
        asyncio.create_task(task2(fut))
    
        # 等待 Future 对象的结果
        result = await fut
        print(result)
    
    asyncio.run(main())
    

    在上述示例中,我们定义了两个任务函数 task1() 和 task2(),它们分别会在一段时间后设置共享的Future对象的结果。在main()函数中,我们获取当前的事件循环对象,并使用loop.create_future()创建了一个 Future 对象 fut。然后,通过 asyncio.create_task() 函数启动了两个任务,它们共享同一个 Future 对象 fut。最后,使用 await 关键字等待 fut 的结果,并输出结果。

    在这个示例中,loop.create_future() 的优势是可以将一个 Future 对象传递给多个协程,使得多个协程可以共享同一个 Future 对象,并在不同的时间点设置该对象的结果。这样可以实现更灵活的协同处理,例如在一个协程中等待多个任务完成后再继续执行。

    4.7 concurrent.futures 的 Future 对象

    首先,这个对象和 asyncio.Future 对象没有任何关系,它是使用线程池、进程池实现异步操作时用到的对象.

    import time
    from concurrent.futures import Future
    from concurrent.futures.thread import ThreadPoolExecutor
    from concurrent.futures.process import ProcessPoolExecutor
    
    
    def func(value):
        time.sleep(1)
        print(value)
    
    # 创建线程池
    pool = ThreadPoolExecutor(max_workers=5)
    
    # 创建进程池
    # pool = ProcessPoolExecutor(max_workers=5)
    
    for i in range(10):
        fut = pool.submit(func, i)
        print(fut)
    

    他们为不同的应用场景设计,但是 Python 提供了一个将 futures.Future 对象包装成 asyncio.Future 对象的函数 asyncio.warp_future

    import time
    import asyncio
    import concurrent.futures
    
    def func1():
        # 某个耗时操作
        time.sleep(2)
        return "123"
    
    async def main():
        loop = asyncio.get_running_loop()
    
        # 1. Run in the default loop's executor ( 默认ThreadPoolExecutor )
        # 第一步:内部会先调用 ThreadPoolExecutor 的 submit 方法去线程池中申请一个线程去执行func1函数,并返回一个concurrent.futures.Future对象
        # 第二步:调用asyncio.wrap_future将concurrent.futures.Future对象包装为asycio.Future对象。
        # 因为concurrent.futures.Future对象不支持await语法,所以需要包装为 asycio.Future对象 才能使用。
        fut = loop.run_in_executor(None, func1)
        result = await fut
        print('default thread pool', result)
    
        # 2. Run in a custom thread pool:
        # with concurrent.futures.ThreadPoolExecutor() as pool:
        #     result = await loop.run_in_executor(
        #         pool, func1)
        #     print('custom thread pool', result)
    
        # 3. Run in a custom process pool:
        # with concurrent.futures.ProcessPoolExecutor() as pool:
        #     result = await loop.run_in_executor(
        #         pool, func1)
        #     print('custom process pool', result)
    
    asyncio.run(main())
    

    loop.run_in_executor() 函数上面注释已经说清楚了,其实就是内部调用了 线程池或者进程池的submit 方法,然后调用了 asyncio.wrap_future(),将起包装成了一个 asyncio 的 Future 对象。

    loop.run_in_executor() 的作用就是将阻塞的函数委托给线程或进程池,在异步环境中运行它,以避免阻塞事件循环。它返回一个asyncio.Future对象,表示在执行器中运行的函数的结果。

    注意:我通过查看代码,发现里面调用的是 futures.wrap_future() 函数,而不是 asyncio.wrap_future()函数,可能不同版本之间不一样吧,但是这两个函数的功能是一样的,都是将concurrent.futures.Future对象包装为asyncio.Future对象,以便在 asyncio 的事件循环中进行异步处理,两者之间的主要区别在于模块的不同,一个是concurrent.futures模块中的函数,另一个是asyncio模块中的函数。

    下面这个例子是 asyncio 模块加上不支持异步模块的混合使用。

    import requests
    import asyncio
    import os
    
    
    async def download_images(url):
        # 发送网络请求,下载图片,遇到网络IO,自动切换其他任务
        print("开始下载:", url)
    
        loop = asyncio.get_event_loop()
        # requests 模块默认不支持异步操作,所以使用线程池来配合实现
        future = loop.run_in_executor(None, requests.get, url)
    
        response = await future
        print("下载完成")
    
        file_name = url.rsplit("_")[-1]
        with open(file_name, mode = 'wb') as f:
            f.write(response.content)
    
    
    if __name__ == '__main__':
        url_list = [
            "https://www3.autoimg.cn/newsdfs/g26/M04/B7/07/1488x0_1_autohomecar__CjIFVmR2EjmAV2uOAAJsOzOueIg341.jpg",
            "https://car2.autoimg.cn/cardfs/product/g24/M06/76/C9/1488x0_1_autohomecar__Chtk3WQQgK-AA6DDACIn_NIMSLw990.jpg",
            "https://www2.autoimg.cn/newsdfs/g26/M08/82/C6/1488x0_1_autohomecar__ChxkjmR2BUuADxtRAAVgtryBmXQ532.jpg"
        ]
    
        tasks = [ download_images(url) for url in url_list]
    
        loop = asyncio.get_event_loop()
        loop.run_until_complete(asyncio.wait(tasks))
    

    4.8 异步迭代器

    异步迭代器: 实现了 __aiter__()__anext__() 方法的对象。__anext__ 必须返回一个 awaitable 对象。async for 会处理异步迭代器的__anext__()方法所返回的可等待对象,直到其引发一个 StopAsyncIteration 异常。由 PEP 492 引入。

    awaitable 对象 是指在异步编程中可以使用 await 关键字进行等待操作的对象

    异步可迭代对象: 可在 async for 语句中被使用的对象。必须通过它的 __aiter__() 方法返回一个 asynchronous iterator。由 PEP 492 引入。

    以上的概念从该链接复制:python 术语对照表

    import asyncio
    
    
    class Reader(object):
        """ 自定义异步迭代器, 同时也是异步可迭代对象"""
    
        def __init__(self):
            self.count = 0
    
        async def readline(self):
            self.count += 1
            if self.count == 100:
                return None
            return self.count
    
        def __aiter__(self):
            return self
    
        async def __anext__(self):
            val = await self.readline()
            if val is None:
                raise StopAsyncIteration
            return val
    
    
    async def func():
        obj = Reader()
        # async for 语句必须写在协程函数里面
        async for item in obj:
            print(item)
    
    
    asyncio.run(func())
    

    4.9 异步上下文管理器

    上下文管理器 是Python 中用于管理资源的一种机制,它提供了一种方便的方式来管理资源的获取和释放,无论是在正常情况下还是发生异常的情况。它可以使用 with 语句来创建一个上下文,并确保在离开该上下文时正确处理资源.

    异步上下文管理器 它通过定义 __aenter__()__aexit__() 方法来对 async with 语句中的环境进行控制。由 PEP 492 引入。

    import asyncio
    
    class AsyncContextManger:
        def __init__(self, conn):
            self.conn = conn
    
        async def do_something(self):
            # 异步操作数据库
            return 666
    
        async def __aenter__(self):
            # 异步链接数据库
            self.conn = await asyncio.sleep(1)
            return self
        
        async def __aexit__(self, exc_type, exc, tb):
            # 异步关闭数据库链接
            await asyncio.sleep(1)
            
    # async with 方法需要在异步函数内进行使用
    # 当使用是会执行当前类中得__aenter__ 这个方法返回什么那么f就是什么[可以进行设置数据库链接]
    # 当上下文完成后 就会自动使用__aexit__方法[关闭数据库链接]
    async def func():
        async with AsyncContextManger() as f:
            result = await f.do_something()
            print (result)
    
    asyncio.run(func())
    

    4.10 uvloop

    uvloopasyncio 的事件循环的替代方案。是第三方的人员写的。它的事件循环效率是大于默认 asyncio 的事件循环的, 性能更高.

    注意:windows 下面并没有 uvloop,在 Linux 上面是可以安装成功的

    pip3 install uvloop
    
    import asyncio
    import uvloop
    
    # TODO:将 asyncio 里面的事件循环替换为 uvloop
    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
    
    # 编写 asyncio 的代码
    
    # 内部循环事件会自动化变为 uvloop
    asyncio.run( ... )
    

    4.11 异步库

    在一些情况下,我们可能需要将 asyncio与其他异步框架一起使用,asyncio 支持与其它库一起使用,例如aioredis、aiomysql、aiohttp 等,这些库都实现了 asyncio 的协议,并且能够与 asyncio 无缝的协作。

    pip3 install aioredis     # 异步操作redis的库
    pip3 install aiomysql     # 异步操作mysql的库
    pip3 install aiohttp      # 异步编写http的库
    

    作者:_Rabbit_

    物联沃分享整理
    物联沃-IOTWORD物联网 » python asyncio (协程、异步编程)

    发表回复