深入解析 Python 异步编程中的 `gather`、`as_completed` 和 `wait`

在现代 Python 开发中,异步编程已经成为了构建高性能、可扩展应用的关键技术之一。通过 asyncio 库,Python 提供了强大的异步功能,帮助开发者更好地管理并发任务,提升程序的效率和响应能力。asyncio 中的三个重要函数——gatheras_completedwait,是处理多个异步任务时非常有用的工具。

本文将深入介绍这三个函数,帮助你理解它们的工作原理、用法及适用场景,提升你在异步编程中的能力。

什么是异步编程?

异步编程是一种通过非阻塞方式处理并发任务的编程模型。在 Python 中,异步编程的核心是协程(coroutine)。协程允许你编写并发代码,但不需要像传统线程那样消耗额外的系统资源。asyncio 库是 Python 提供的官方库,用于处理异步编程,它使用事件循环(event loop)来调度和管理协程的执行。

在并发任务的处理中,如何有效地管理多个任务、等待任务的结果以及控制任务的执行顺序是非常重要的。asyncio 提供了多个工具来帮助开发者实现这一目标,其中 gatheras_completedwait 是最常用的三个函数。

1. asyncio.gather: 并行执行多个任务

什么是 gather

asyncio.gather 是一个非常强大的工具,它可以并行执行多个异步任务,并在所有任务完成后收集它们的结果。它接受多个协程任务作为参数,执行所有任务并等待它们完成。当所有任务都执行完毕后,gather 会返回一个包含所有任务结果的列表。

使用场景

  • 适用于并行执行多个任务,并在所有任务完成后收集结果。
  • 如果任务之间没有依赖关系,gather 是并行执行任务时的理想选择。
  • 示例代码

    import asyncio
    
    async def task_1():
        await asyncio.sleep(1)
        return "Task 1 completed"
    
    async def task_2():
        await asyncio.sleep(2)
        return "Task 2 completed"
    
    async def task_3():
        await asyncio.sleep(3)
        return "Task 3 completed"
    
    async def main():
        # 并行执行多个任务
        results = await asyncio.gather(task_1(), task_2(), task_3())
        print(results)  # ['Task 1 completed', 'Task 2 completed', 'Task 3 completed']
    
    # 执行
    asyncio.run(main())
    

    输出结果:

    ['Task 1 completed', 'Task 2 completed', 'Task 3 completed']
    

    在此示例中,asyncio.gather 并行执行了三个任务,并且最终返回了所有任务的执行结果。注意,gather 会等待所有任务完成才返回结果。

    异常处理

    如果某个任务在执行过程中抛出异常,gather 会立即抛出该异常。你可以通过 return_exceptions=True 参数使 gather 返回异常,而不是直接抛出。

    async def task_with_error():
        raise ValueError("An error occurred!")
    
    async def main_with_error():
        try:
            await asyncio.gather(task_1(), task_with_error())
        except Exception as e:
            print(f"Caught an exception: {e}")
    
    # 执行
    asyncio.run(main_with_error())
    

    输出结果:

    Caught an exception: An error occurred!
    

    通过 return_exceptions=True,你可以捕获到异常并继续执行其他任务。

    2. asyncio.as_completed: 按任务完成顺序获取结果

    什么是 as_completed

    asyncio.as_completed 是一个生成器函数,它允许你按任务完成的顺序获取任务的结果,而不是等待所有任务完成后才返回。这意味着它会在任务完成时逐个返回任务的结果。这个特性对于需要处理任务执行顺序的场景非常有用。

    使用场景

  • 适用于需要按任务完成的顺序逐个处理任务结果的场景。
  • 当任务的执行时间不确定且任务间没有明确的顺序时,使用 as_completed 可以更有效地处理结果。
  • 示例代码

    import asyncio
    
    async def task_1():
        await asyncio.sleep(2)
        return "Task 1 completed"
    
    async def task_2():
        await asyncio.sleep(1)
        return "Task 2 completed"
    
    async def main():
        tasks = [task_1(), task_2()]
        # 使用 as_completed 按任务完成顺序获取结果
        for result in asyncio.as_completed(tasks):
            print(await result)
    
    # 执行
    asyncio.run(main())
    

    输出结果:

    Task 2 completed
    Task 1 completed
    

    在此示例中,虽然 task_1task_2 后开始执行,但是由于 task_2 执行时间更短,因此它首先完成,并且 as_completed 会首先返回它的结果。

    异常处理

    gather 类似,as_completed 也可以在任务抛出异常时捕获并抛出。你可以在使用 async for 循环时捕获异常。

    async def task_with_error():
        await asyncio.sleep(1)
        raise ValueError("An error occurred!")
    
    async def main_with_error():
        tasks = [task_1(), task_with_error()]
        for result in asyncio.as_completed(tasks):
            try:
                print(await result)
            except Exception as e:
                print(f"Caught an exception: {e}")
    
    # 执行
    asyncio.run(main_with_error())
    

    输出结果:

    Task 2 completed
    Caught an exception: An error occurred!
    

    3. asyncio.wait: 等待任务完成并进行批量处理

    什么是 wait

    asyncio.wait 是一个更灵活的函数,它允许你等待多个任务,并返回已完成的任务和未完成的任务。你可以通过设置 return_when 参数来控制何时返回结果。例如,设置为 asyncio.ALL_COMPLETED 表示等所有任务完成后返回,而 asyncio.FIRST_COMPLETED 则表示一旦有任务完成就返回。

    使用场景

  • 适用于等待多个任务,并根据任务的完成情况进行批量处理的场景。
  • 你可以根据不同的条件(如任务完成或抛出异常)来灵活控制任务的执行。
  • 示例代码

    import asyncio
    
    async def task_1():
        await asyncio.sleep(1)
        return "Task 1 completed"
    
    async def task_2():
        await asyncio.sleep(2)
        return "Task 2 completed"
    
    async def main():
        tasks = [task_1(), task_2()]
        done, pending = await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)
        for task in done:
            print(await task)
    
    # 执行
    asyncio.run(main())
    

    输出结果:

    Task 1 completed
    Task 2 completed
    

    使用 return_when 参数

    asyncio.wait 允许你根据不同的条件返回已完成的任务。例如,设置 return_when=asyncio.FIRST_COMPLETED 可以让你在第一个任务完成时返回结果。

    async def main():
        tasks = [task_1(), task_2()]
        done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            print(await task)
    
    # 执行
    asyncio.run(main())
    

    输出结果:

    Task 1 completed
    

    总结

  • asyncio.gather:适用于并行执行多个任务,等待所有任务完成并返回它们的结果。
  • asyncio.as_completed:适用于按任务完成顺序逐个处理任务的结果,尤其是在任务完成时间不确定的场景下。
  • asyncio.wait:适用于等待多个任务,并灵活地控制返回已完成的任务,支持不同的条件和超时处理。
  • 通过合理选择这三个函数,你可以有效地管理和协调异步任务的执行,控制并发任务的数量,优化程序性能,并处理不同任务的执行顺序。这些工具对于构建高性能的异步应用是非常有帮助的。

    希望本文能帮助你更好地理解和应用 asyncio 中的这三个重要函数。如果你有任何问题或想法,欢迎在评论区交流!

    作者:蜗牛沐雨

    物联沃分享整理
    物联沃-IOTWORD物联网 » 深入解析 Python 异步编程中的 `gather`、`as_completed` 和 `wait`

    发表回复