一步步理解 Python 异步生成器(AsyncGenerator)——从入门到实践

一步步理解 Python 异步生成器(AsyncGenerator)——从入门到实践

  • 前言
  • 1. 什么是生成器?
  • 2. 什么是异步生成器?
  • 3. 代码示例:异步生成器的实际应用
  • 完整代码
  • 代码详解
  • 运行结果
  • 4. 关键点回顾
  • 结语
  • 前言

    在 Python 中,生成器(Generator)是一种非常强大的工具,能够在函数执行的过程中多次返回值,而不需要一次性返回所有结果。异步生成器(AsyncGenerator)则是结合了生成器和异步编程(asyncio)的概念,能够在异步环境中按需生成数据。这篇文章将通过一个简单的例子,帮助初学者理解异步生成器的工作原理,以及 yield 关键字在异步生成器中的作用。

    1. 什么是生成器?

    首先,我们来回顾一下生成器的基本概念。一个生成器函数类似于一个普通函数,不同的是它使用了 yield 关键字而不是 return。每当生成器的 yield 被执行时,函数会暂停执行,并返回一个值。下次调用这个生成器时,函数会从上次暂停的地方继续执行。

    来看一个简单的例子:

    def simple_generator():
        yield 1
        yield 2
        yield 3
    
    gen = simple_generator()
    
    for value in gen:
        print(value)
    

    运行结果:

    1
    2
    3
    

    在这个例子中,每次循环时,生成器都会返回一个新的值,直到所有 yield 都被执行完毕。

    2. 什么是异步生成器?

    异步生成器结合了异步编程和生成器的优点。它可以在异步环境中使用,允许我们在等待某些操作(如网络请求、文件操作)完成时,按需返回数据。

    使用 async def 定义异步生成器,使用 yield 来返回数据,同时使用 await 来执行异步操作。

    3. 代码示例:异步生成器的实际应用

    接下来,我们通过一个示例代码来深入理解异步生成器的使用方式。这段代码模拟了一个异步搜索的过程,其中包括了上下文生成、回调函数处理、以及异步任务的执行。

    完整代码
    import asyncio
    from typing import AsyncGenerator, List, Tuple
    
    # 模拟的上下文生成器类
    class ContextBuilder:
        def build_context(self, conversation_history=None, **kwargs) -> Tuple[List[str], str]:
            context_chunks = ["chunk1", "chunk2", "chunk3"]
            context_records = "Initial Context Record"
            return context_chunks, context_records
    
    # 回调函数模拟类
    class Callback:
        def on_map_response_start(self, context_chunks):
            print("Callback: Starting map response with context chunks:", context_chunks)
    
        def on_map_response_end(self, map_responses):
            print("Callback: Finished map response with results:", map_responses)
    
    class AsyncSearch:
        def __init__(self):
            self.context_builder = ContextBuilder()
            self.context_builder_params = {}
            self.callbacks = [Callback()]
            self.map_llm_params = {}
            self.reduce_llm_params = {}
    
        async def _map_response_single_batch(self, context_data: str, query: str, **kwargs) -> str:
            # 模拟异步处理单个批次数据
            await asyncio.sleep(1)
            return f"Processed {context_data} with query '{query}'"
    
        async def _stream_reduce_response(self, map_responses: List[str], query: str, **kwargs) -> AsyncGenerator[str, None]:
            # 模拟异步流式减少响应
            for response in map_responses:
                await asyncio.sleep(1)
                yield f"Reduced result: {response}"
    
        async def astream_search(self, query: str) -> AsyncGenerator[str, None]:
            """Stream the global search response."""
            context_chunks, context_records = self.context_builder.build_context()
    
            # 如果有回调函数,执行开始回调
            if self.callbacks:
                for callback in self.callbacks:
                    callback.on_map_response_start(context_chunks)
    
            # 异步处理每个上下文块
            map_responses = await asyncio.gather(*[
                self._map_response_single_batch(
                    context_data=data, query=query, **self.map_llm_params
                )
                for data in context_chunks
            ])
    
            # 如果有回调函数,执行结束回调
            if self.callbacks:
                for callback in self.callbacks:
                    callback.on_map_response_end(map_responses)
    
            # 先返回上下文记录
            yield context_records
    
            # 返回减少后的响应
            async for response in self._stream_reduce_response(
                map_responses=map_responses,
                query=query,
                **self.reduce_llm_params,
            ):
                yield response
    
    # 测试代码
    async def main():
        search = AsyncSearch()
        async for result in search.astream_search("example query"):
            print("Received:", result)
    
    # 执行异步主函数
    asyncio.run(main())
    
    代码详解
    1. ContextBuilder 类:这个类负责生成上下文数据,包括 context_chunkscontext_records。在实际应用中,这些数据可能是从数据库或其他服务中获取的。

    2. Callback 类:模拟回调函数,在执行前后打印信息。这展示了如何在复杂操作中使用回调函数。

    3. AsyncSearch 类:核心类,包含了异步生成器的实现。

    4. _map_response_single_batch 方法:这是一个模拟的异步函数,假装在处理某个数据块。实际应用中,这里可能是一些耗时的计算或网络请求。
    5. _stream_reduce_response 方法:这是另一个异步生成器,用于逐步处理和返回数据。
    6. astream_search 方法:这是我们要重点关注的异步生成器。它首先调用了 ContextBuilder,然后执行了一些回调函数,并异步处理数据块。最后,它先返回了 context_records,然后返回了异步处理的结果。
    7. main 函数:测试代码,通过 async for 循环来获取 astream_search 生成器的返回值,并打印出来。

    运行结果

    运行上面的代码,你会看到类似以下的输出:

    Callback: Starting map response with context chunks: ['chunk1', 'chunk2', 'chunk3']
    Callback: Finished map response with results: ['Processed chunk1 with query \'example query\'', 'Processed chunk2 with query \'example query\'', 'Processed chunk3 with query \'example query\'']
    Received: Initial Context Record
    Received: Reduced result: Processed chunk1 with query 'example query'
    Received: Reduced result: Processed chunk2 with query 'example query'
    Received: Reduced result: Processed chunk3 with query 'example query'
    

    4. 关键点回顾

  • 异步生成器的定义:通过 async defyield 实现,可以在异步任务中按需生成数据。
  • yield 之后的代码继续执行:在生成器中,yield 会暂停函数的执行,但下次继续调用时,代码会从上次暂停的地方继续执行。
  • 结合异步编程:异步生成器允许我们在处理复杂任务(如网络请求、数据库查询)时,流式返回部分结果,优化资源的利用。
  • 结语

    这篇文章通过一个简单的示例,向大家介绍了 Python 中的异步生成器,以及如何在实际应用中使用它们。通过理解 yield 和异步编程的结合,大家可以更好地编写高效、可扩展的异步代码。

    如果你觉得这篇文章对你有帮助,欢迎点赞、收藏和分享!如果有任何问题,也欢迎在评论区留言讨论。

    作者:engchina

    物联沃分享整理
    物联沃-IOTWORD物联网 » 一步步理解 Python 异步生成器(AsyncGenerator)——从入门到实践

    发表回复