【Python】深入理解 Python 的 asyncio 库

深入理解 Python 的 asyncio 库

  • 1、引言
  • 2、进程,线程,协程
  • 多进程
  • 多线程
  • 协程
  • 总结
  • 3、asyncio 的基本用法
  • 3.1 异步函数和 await
  • 3.2 任务 (Tasks)
  • 4、本质
  • 4.1 协程
  • 4.2 事件循环
  • 4.3 任务和 Future
  • 5、应用场景
  • 5.1 网络 I/O
  • 5.2 并发任务
  • 5.3 事件驱动的程序
  • 6、高级用法
  • 6.1 超时控制
  • 6.2 取消任务
  • 6.3 信号量 (Semaphore)
  • 6.4 使用 asyncio.Queue
  • 7、实战一下
  • 8、设计模式
  • 9. 注意事项
  • 9.1 线程与协程
  • 9.2 阻塞代码
  • 9.3 异常处理
  • 1、引言

    asyncio 是 Python 标准库中的一个库,提供了对异步 I/O、事件循环、协程和任务等异步编程模型的支持。它使得编写高效、可扩展的并发代码变得更加容易。本文将详细讲解 asyncio 的用法、本质、应用场景、注意事项以及一些高级技巧

    2、进程,线程,协程

    1. 线程
      线程是操作系统调度的基本单位,同一个进程中的多个线程共享相同的内存空间。线程之间的切换由操作系统内核负责。

    2. 协程是由程序自身调度的函数,可以在执行过程中暂停和恢复,协程的切换由程序自身完成,而不是依赖操作系统。

    3. 进程是操作系统资源分配的基本单位,每个进程有独立的内存空间,进程之间不能直接共享内存,需要通过进程间通信(IPC)。

      举个例子,一个经典案例:
      小明在家需要完成以下事情:
      电饭锅煮饭大约30分钟
      洗衣机洗衣服大约40分钟
      写作业大约50分钟

      多进程

      多个小明分别完成任务
      多进程的情况就像是有三个小明,每个小明独立负责一项任务:
      小明1:负责煮饭
      小明2:负责洗衣服
      小明3:负责写作业
      每个小明独立完成自己的任务,互不干扰,各自有自己的独立空间和工具(资源)。
      特点:
      任务完全独立,互不干扰
      各自有自己的资源,切换时开销较大
      适合CPU密集型任务

      多线程

      一个小明在不同任务之间切换

      多线程的情况就像是只有一个小明,但他可以在不同任务之间快速切换:

      小明启动电饭锅煮饭,然后去启动洗衣机洗衣服
      在等待饭煮好和衣服洗好期间,小明开始写作业
      小明可以在煮饭和洗衣服期间继续写作业,但需要来回切换注意力
      特点:

      共享同一资源空间,切换时有开销,但比进程小
      适合I/O密集型任务
      需要管理资源竞争问题

      协程

      一个小明按计划有序地完成任务

      协程的情况就像是只有一个小明,但他能非常有计划地按顺序完成各个任务,并且知道什么时候可以暂时停下一个任务去做另一个任务:

      小明启动电饭锅煮饭,知道这需要30分钟,于是他暂时搁置煮饭任务
      然后小明启动洗衣机洗衣服,知道这需要40分钟,于是他暂时搁置洗衣服任务
      在等待煮饭和洗衣服的过程中,小明开始写作业
      小明按照一个时间表来有序地切换任务,而不会同时做多件事,但看起来像是同时进行的
      特点:
      单线程内管理多任务,没有线程切换的开销
      适合I/O密集型任务
      需要程序自己管理任务的切换

      总结

      多进程:像有多个小明独立完成任务,各自分配独立的资源。
      多线程:像一个小明在多个任务之间快速切换,但需要管理资源共享和竞争,区别协程的主要地方在电饭锅会主动把小明叫过去(~)。
      协程:像一个小明有序地、按计划地完成任务,自己管理切换,没有线程切换的开销。

    3、asyncio 的基本用法

    3.1 异步函数和 await

    异步函数使用 async def 声明,await 关键字用于等待一个异步操作完成

    import asyncio
    
    async def say_hello():
        print("Hello")
        await asyncio.sleep(1)
        print("World")
    
    # 运行异步函数
    asyncio.run(say_hello())
    

    PS: 1、async声明后,不能直接say_hello运行函数
    2、现在运行还是会等待1s后执行print(“World”)
    3、协程本质上是事件循环。不是说你用了async await就会成异步,是需要程序员自己定义任务有哪些协程的。

    3.2 任务 (Tasks)

    任务用于调度和管理协程的执行。

    async def greet(name):
        print(f"Hello, {name}")
        await asyncio.sleep(1)
        print(f"Goodbye, {name}")
    
    async def main():
        task1 = asyncio.create_task(greet("Alice"))
        task2 = asyncio.create_task(greet("Bob"))
    
        await task1
        await task2
    
    asyncio.run(main())
    

    4、本质

    4.1 协程

    协程是可以暂停和恢复的函数。与传统的函数不同,协程可以在执行过程中暂停,以便其他协程可以运行。Python 使用 async def 声明协程函数,使用 await 暂停协程的执行。

    4.2 事件循环

    事件循环是 asyncio 的核心,用于调度和执行协程。事件循环负责处理异步任务、I/O 事件、定时器等。

    4.3 任务和 Future

    任务是协程的高级抽象,用于调度协程的执行。Future 是表示异步操作结果的低级抽象,可以与任务一起使用。

    5、应用场景

    5.1 网络 I/O

    异步编程非常适合处理网络 I/O 操作,如 HTTP 请求、WebSocket、数据库查询等。

    import aiohttp
    
    async def fetch_url(url):
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                return await response.text()
    
    async def main():
        urls = ["http://example.com", "http://example.org"]
        results = await asyncio.gather(*[fetch_url(url) for url in urls])
        for result in results:
            print(result)
    
    asyncio.run(main())
    

    5.2 并发任务

    asyncio 可以用于并发执行多个 I/O 密集型任务,如爬虫、数据处理等。

    async def task1():
        await asyncio.sleep(1)
        print("Task 1 done")
    
    async def task2():
        await asyncio.sleep(2)
        print("Task 2 done")
    
    async def main():
        await asyncio.gather(task1(), task2())
    
    asyncio.run(main())
    

    5.3 事件驱动的程序

    asyncio 适合编写需要响应多个事件的程序,如 GUI 应用、实时聊天服务器等。

    6、高级用法

    6.1 超时控制

    使用 asyncio.wait_for 可以设置协程的超时时间

    async def long_running_task():
        await asyncio.sleep(10)
    
    async def main():
        try:
            await asyncio.wait_for(long_running_task(), timeout=5)
        except asyncio.TimeoutError:
            print("The task took too long!")
    
    asyncio.run(main())
    

    6.2 取消任务

    可以取消正在运行的任务。

    async def task():
        try:
            while True:
                print("Running...")
                await asyncio.sleep(1)
        except asyncio.CancelledError:
            print("Task was cancelled")
    
    async def main():
        t = asyncio.create_task(task())
        await asyncio.sleep(3)
        t.cancel()
        await t
    
    asyncio.run(main())
    

    6.3 信号量 (Semaphore)

    async def limited_task(semaphore, name):
        async with semaphore:
            print(f"Running {name}")
            await asyncio.sleep(2)
    
    async def main():
        semaphore = asyncio.Semaphore(2)
        tasks = [limited_task(semaphore, f"Task {i}") for i in range(5)]
        await asyncio.gather(*tasks)
    
    asyncio.run(main())
    

    6.4 使用 asyncio.Queue

    使用队列在协程之间传递数据。

    async def producer(queue):
        for i in range(5):
            await asyncio.sleep(1)
            await queue.put(i)
            print(f"Produced {i}")
    
    async def consumer(queue):
        while True:
            item = await queue.get()
            if item is None:
                break
            print(f"Consumed {item}")
            queue.task_done()
    
    async def main():
        queue = asyncio.Queue()
        producer_task = asyncio.create_task(producer(queue))
        consumer_task = asyncio.create_task(consumer(queue))
    
        await producer_task
        await queue.put(None)  # 用于通知消费者结束
        await consumer_task
    
    asyncio.run(main())
    
    

    7、实战一下

  • 爬取一组网页的内容
  • 并发地进行爬取操作
  • 统计每个网页的单词数量
  • 使用信号量限制并发请求数量
  • 使用队列在任务之间传递数据
    PS:asyncio只支持aiohttp,不支持requests
  • import asyncio
    import aiohttp
    from aiohttp import ClientSession
    from collections import Counter
    from typing import List
    
    # 要爬取的URL列表
    # 笔趣阁链接:它搞盗版 我来练习 我们都有光明的未来
    URLS = [
        'https://m.xbiqugew.com/book/53099/40832438.html',
        'https://m.xbiqugew.com/book/53099/40832438_2.html',
        'https://m.xbiqugew.com/book/53099/40822138.html',
        'https://m.xbiqugew.com/book/53099/40818457.html',
        'https://m.xbiqugew.com/book/53099/40808676.html',
        'https://m.xbiqugew.com/book/53099/40806884.html'
    ]
    
    # 限制并发请求数量
    
    async def fetch(url: str, session: ClientSession, sem: asyncio.Semaphore) -> str:
        async with sem:
            async with session.get(url) as response:
                return await response.text()
    
    async def count_words(url: str, session: ClientSession, sem: asyncio.Semaphore) -> None:
        content = await fetch(url, session, sem)
        words = content.split()
        word_count = Counter(words)
        print(f"URL: {url}, Word count: {len(words)}, Most common words: {word_count.most_common(5)}")
    
    async def main(urls: List[str]) -> None:
        sem = asyncio.Semaphore(3)
        async with aiohttp.ClientSession() as session:
            tasks = [count_words(url, session, sem) for url in urls]
            await asyncio.gather(*tasks)
    
    if __name__ == '__main__':
        asyncio.run(main(URLS))
    

    解释:
    1、什么时候该用async,什么时候该用await?设计理念是什么?我在代码里该怎么设计和使用?
    (1)async: 用于定义一个异步函数(协程)。异步函数在执行过程中可能会暂停,等待其他异步操作完成,但不会阻塞事件循环。
    await: 用于暂停异步函数的执行,等待另一个协程或异步操作完成后再继续执行。
    (2)await后面必须跟一个协程(future), 就可以阻塞当前协程, 切换到这个新协程里执行。可以理解为在这里调用函数
    (3)async 不等于 async with。async with 语句用于创建一个异步上下文管理器,aiohttp.ClientSession() 返回的是一个异步上下文管理器,因此需要在其前面加上 async with 关键字。这样做的目的是确保在异步环境中正确创建和管理 aiohttp.ClientSession() 对象。
    (4)asyncio.gather() 本身是一个协程 所以前面要加await
    (5)fetch这个函数就是耗时函数 所以添加async来定义为异步。
    async with sem,async with session.get(url) as response 这两个地方用async with是因为后面跟着的函数返回的都是异步上下文管理器
    (6)await response.text() 用await是因为 这里需要等待才能需要结果。相当于我告诉程序这里可以先执行其他函数。
    2、got Future attached to a different loop 这个报错的逻辑是什么?
    我们要限制一个协程的并发数的时候,可以在调用协程之前,先初始化一个Semaphore对象。然后把这个对象传到需要限制并发的协程里面,在协程里面,使用异步上下文管理器包住你的正式代码。
    报错的逻辑就是你初始化了但没有传到协程里面。导致不在同一个协程里。

    8、设计模式

    七中的练习适用于一些脚本,日常需求,生产的话将其改造的面向对象一点。

    我们将把代码分为多个类,每个类负责不同的功能。主要包括:

    Fetcher 类:负责发送 HTTP 请求并获取网页内容。
    WordCounter 类:负责统计网页的单词数量。
    Crawler 类:协调整个爬虫过程。
    Logger 类:负责日志记录。

    import asyncio
    import aiohttp
    from aiohttp import ClientSession
    from collections import Counter
    from typing import List, Dict
    
    class Logger:
        _instance = None
    
        def __new__(cls, *args, **kwargs):
            if not cls._instance:
                cls._instance = super(Logger, cls).__new__(cls, *args, **kwargs)
            return cls._instance
    
        def log(self, message: str):
            print(message)
    
    class Fetcher:
    
        async def fetch(self, url: str, session: ClientSession, sem: asyncio.Semaphore) -> str:
            async with sem:
                async with session.get(url) as response:
                    Logger().log(f"Fetching URL: {url}")
                    return await response.text()
    
    class WordCounter:
        @staticmethod
        def count(content: str) -> Dict[str, int]:
            words = content.split()
            word_count = Counter(words)
            return word_count
    
    class Crawler:
        def __init__(self, urls: List[str], concurrency: int):
            self.urls = urls
            self.concurrency = concurrency
            self.fetcher = Fetcher()
            self.results = []
    
        async def count_words_in_url(self, url: str, session: ClientSession, sem: asyncio.Semaphore):
            content = await self.fetcher.fetch(url, session, sem)
            word_count = WordCounter.count(content)
            Logger().log(f"URL: {url}, Word count: {len(content.split())}, Most common words: {word_count.most_common(5)}")
            self.results.append((url, word_count))
    
        async def run(self):
            sem = asyncio.Semaphore(self.concurrency)
            async with aiohttp.ClientSession() as session:
                tasks = [self.count_words_in_url(url, session, sem) for url in self.urls]
                await asyncio.gather(*tasks)
    
    if __name__ == '__main__':
        URLS = [
            'https://m.xbiqugew.com/book/53099/40832438.html',
            'https://m.xbiqugew.com/book/53099/40832438_2.html',
            'https://m.xbiqugew.com/book/53099/40822138.html',
            'https://m.xbiqugew.com/book/53099/40818457.html',
            'https://m.xbiqugew.com/book/53099/40808676.html',
            'https://m.xbiqugew.com/book/53099/40806884.html'
        ]
        crawler = Crawler(URLS, concurrency=5)
        asyncio.run(crawler.run())
    
    

    9. 注意事项

    9.1 线程与协程

    协程是单线程的,不会并行运行。在 I/O 密集型任务中,协程表现出色,但在 CPU 密集型任务中,需要配合线程或进程。

    9.2 阻塞代码

    在异步函数中避免使用阻塞操作,如 time.sleep,应使用 await asyncio.sleep。

    9.3 异常处理

    在异步代码中需要特别注意异常处理,避免未捕获的异常导致程序崩溃。

    async def risky_operation():
        try:
            # 执行可能引发异常的异步操作
            await some_async_function()
        except Exception as e:
            print(f"An error occurred: {e}")
    
    asyncio.run(risky_operation())
    

    作者:qq_40375355

    物联沃分享整理
    物联沃-IOTWORD物联网 » 【Python】深入理解 Python 的 asyncio 库

    发表回复