1. 什么是异步编程

异步编程是一种编程范式,允许程序在等待某些操作(如 I/O 操作、网络请求)完成时,不阻塞整个程序的执行流程。通过异步编程,程序可以在等待任务完成的同时,继续执行其他任务,从而提高资源利用率和响应性。


2. 同步 vs. 异步

同步编程

  • 按顺序执行:一个任务完成后,才执行下一个任务。
  • 阻塞:等待某个任务完成期间,程序无法执行其他任务。
  • 示例:同步 HTTP 请求

    import requests
    
    def fetch_url(url):
        response = requests.get(url)
        print(f"Fetched {url} with status {response.status_code}")
    
    def main():
        urls = [
            "https://www.example.com",
            "https://www.python.org",
            "https://www.openai.com"
        ]
        for url in urls:
            fetch_url(url)
    
    if __name__ == "__main__":
        main()
    

    异步编程

  • 并发执行:多个任务可以同时进行,不互相阻塞。
  • 非阻塞:程序在等待某个任务时,可以执行其他任务。
  • 示例:异步 HTTP 请求

    import asyncio
    import aiohttp
    
    async def fetch_url(session, url):
        async with session.get(url) as response:
            print(f"Fetched {url} with status {response.status}")
    
    async def main():
        urls = [
            "https://www.example.com",
            "https://www.python.org",
            "https://www.openai.com"
        ]
        async with aiohttp.ClientSession() as session:
            tasks = [fetch_url(session, url) for url in urls]
            await asyncio.gather(*tasks)
    
    if __name__ == "__main__":
        asyncio.run(main())
    

    3. Python 异步编程的关键组件

    3.1 协程(Coroutines)

  • 定义:使用 async def 定义的函数。
  • 特性:可以暂停和恢复执行,使用 await 关键字。
  • 示例:简单协程

    import asyncio
    
    async def greet(name):
        print(f"Hello, {name}!")
        await asyncio.sleep(1)
        print(f"Goodbye, {name}!")
    

    3.2 事件循环(Event Loop)

  • 作用:管理和调度协程的执行。
  • 获取方式:使用 asyncio.get_event_loop()asyncio.run() 自动管理。
  • 3.3 任务(Tasks)

  • 定义:协程的封装,允许并发执行。
  • 创建方式asyncio.create_task(coroutine)asyncio.ensure_future(coroutine)
  • 示例:创建任务

    async def main():
        task1 = asyncio.create_task(greet("Alice"))
        task2 = asyncio.create_task(greet("Bob"))
        await task1
        await task2
    

    3.4 Future 对象

  • 定义:表示一个将来会完成的操作。
  • 用途:协程之间的通信和结果传递。
  • 创建方式asyncio.Future()
  • 示例:使用 Future

    # 这里的fut是一个asyncio.Future对象,用于存储结果
    async def set_future(fut, delay, value):
        await asyncio.sleep(delay)
        fut.set_result(value) # 设置Future的结果,通知等待它的协程任务继续执行,调用这个函数,状态就从pending变成done,等待它的所有协程会被事件循环恢复执行
    
    async def main():
        fut = asyncio.Future() # 创建一个Future对象,用于表示一个尚未完成的操作,初始状态下,fut是”挂起“pending状态
        asyncio.create_task(set_future(fut, 2, "Future Result"))# 启动一个并发任务,运行set_future协程,任务将在2秒后调用set_result设置结果
        result = await fut # 挂起当前协程main,直到fut的结果被设置
        print(result)
    

    4. 关键语法:asyncawait

    async

  • 用途:定义一个协程函数。
  • 示例
  • async def my_coroutine():
        pass
    

    await

  • 用途:等待一个协程或可等待对象完成。
  • 限制:只能在协程内部使用。
  • 示例
  • async def my_coroutine():
        await asyncio.sleep(1)
    

    5. asyncio.run() 的作用

  • 功能:
  • 创建并运行事件循环。
  • 执行顶层协程。
  • 关闭事件循环。
  • 语法
  • asyncio.run(main())
    
  • 示例
  • async def main():
        print("Hello, Async World!")
    
    asyncio.run(main())
    

    6. 常用异步库

    6.1 aiohttp

  • 用途:异步 HTTP 客户端和服务器。
  • 安装
  • pip install aiohttp
    
  • 示例:异步 HTTP 客户端
  • import asyncio
    import aiohttp
    
    async def fetch(session, url):
        async with session.get(url) as response:
            return await response.text()
    
    async def main():
        async with aiohttp.ClientSession() as session:
            html = await fetch(session, 'https://www.example.com')
            print(html)
    
    asyncio.run(main()) # 创建并运行事件循环,执行顶层协程main(),直到协程完成
    

    6.2 aiofiles

  • 用途:异步文件操作。
  • 安装
  • pip install aiofiles
    
  • 示例:异步读取文件
  • import aiofiles
    import asyncio
    
    async def read_file(filepath):
        async with aiofiles.open(filepath, mode='r') as f:
            contents = await f.read()
            print(contents)
    
    asyncio.run(read_file('example.txt'))
    

    7. 示例代码

    7.1 简单的协程

    import asyncio
    
    async def greet(name):
        print(f"Hello, {name}!")
        await asyncio.sleep(1)
        print(f"Goodbye, {name}!")
    
    async def main():
        await greet("Alice")
        await greet("Bob")
    
    asyncio.run(main())
    

    输出

    Hello, Alice!
    Goodbye, Alice!
    Hello, Bob!
    Goodbye, Bob!
    

    7.2 并发执行多个协程

    import asyncio
    
    async def say_after(delay, message):
        await asyncio.sleep(delay)
        print(message)
    
    async def main():
        task1 = asyncio.create_task(say_after(2, "Hello after 2 seconds"))
        task2 = asyncio.create_task(say_after(1, "Hello after 1 second"))
        task3 = asyncio.create_task(say_after(3, "Hello after 3 seconds"))
    
        print("Tasks started")
        await task1
        await task2
        await task3
        print("All tasks completed")
    
    asyncio.run(main())
    

    输出

    Tasks started
    Hello after 1 second
    Hello after 2 seconds
    Hello after 3 seconds
    All tasks completed
    

    7.3 异步 HTTP 请求

    import asyncio
    import aiohttp
    
    async def fetch(session, url):
        async with session.get(url) as response:
            status = response.status
            text = await response.text()
            print(f"Fetched {url} with status {status}")
            return text
    
    async def main():
        urls = [
            "https://www.example.com",
            "https://www.python.org",
            "https://www.openai.com"
        ]
        async with aiohttp.ClientSession() as session:
            tasks = [fetch(session, url) for url in urls]
            results = await asyncio.gather(*tasks)
            print("All fetches completed")
    
    asyncio.run(main())
    

    输出(部分示例):

    Fetched https://www.example.com with status 200
    Fetched https://www.python.org with status 200
    Fetched https://www.openai.com with status 200
    All fetches completed
    

    8. 最佳实践

    8.1 避免阻塞操作

  • 不要在协程中使用同步阻塞代码(如 time.sleep())。
  • 使用异步版本(如 await asyncio.sleep())。
  • 错误示例

    import asyncio
    import time
    
    async def blocking_task():
        time.sleep(2)  # 阻塞事件循环
        print("Blocking task completed")
    
    async def main():
        await blocking_task()
    
    asyncio.run(main())
    

    修正示例

    import asyncio
    
    async def non_blocking_task():
        await asyncio.sleep(2)  # 非阻塞
        print("Non-blocking task completed")
    
    async def main():
        await non_blocking_task()
    
    asyncio.run(main())
    

    8.2 正确处理异常

  • 使用 try-except 捕获协程中的异常
  • 在 Future 和 Task 中设置异常
  • 示例

    import asyncio
    
    async def faulty_task():
        await asyncio.sleep(1)
        raise ValueError("An error occurred in the task")
    
    async def main():
        try:
            await faulty_task()
        except ValueError as e:
            print(f"Caught exception: {e}")
    
    asyncio.run(main())
    

    输出

    Caught exception: An error occurred in the task
    

    8.3 资源管理

  • 使用异步上下文管理器async with)管理资源,如文件和网络连接。
  • 确保资源被正确关闭
  • 示例

    import aiofiles
    import asyncio
    
    async def write_file(filepath, content):
        async with aiofiles.open(filepath, mode='w') as f:
            await f.write(content)
        print(f"Wrote to {filepath}")
    
    asyncio.run(write_file('async_example.txt', 'Hello, Async World!'))
    

    在 Python 中,关键字 with 主要用于简化资源管理和上下文管理(context management),帮助我们在处理文件、网络连接、锁、事务等时,确保资源能够被正确地初始化和释放。以下是对 with 关键字及其背后运行机制的详细讲解。


    一、with 关键字的核心作用

    1. 上下文管理(Context Management)
      with 语句能让代码块在进入时和离开时执行特定的初始化和清理操作。典型场景包括:
    2. 文件操作:自动关闭文件
    3. 锁管理:自动加锁和解锁
    4. 网络或数据库连接:自动打开和关闭连接
    5. 事务管理:自动提交或回滚
    6. 简化资源释放
      没有 with 语句时,通常需要在 try-finally 块中手动调用 close()release() 等方法。with 帮助我们省去手动管理的繁琐并避免遗漏,减少因异常或流程复杂而导致资源未被释放的风险。

    二、with 的基本用法

    1. 以文件操作为例

    with open('example.txt', 'r') as f:
        data = f.read()
        print(data)
    
  • 逻辑解析
    1. open('example.txt', 'r') 返回一个文件对象。
    2. with 语句调用文件对象的上下文管理协议,进入 with 块时会执行文件对象的 __enter__ 方法,将其返回值赋给变量 f
    3. with 语句块中执行 data = f.read() 等操作。
    4. 无论是否发生异常,离开 with 块时都会自动执行文件对象的 __exit__ 方法,负责关闭文件。
  • 优点:即使在读取文件时出现错误或抛出异常,文件也能被正确关闭,避免资源泄露。
  • 2. 并列管理多个资源

    with open('file1.txt', 'r') as f1, open('file2.txt', 'r') as f2:
        data1 = f1.read()
        data2 = f2.read()
    
  • 逻辑解析:
    1. 同时打开两个文件,并以 f1f2 分别引用。
    2. 无论后续操作是否出错,离开 with 块时都会自动关闭 f1f2

  • 三、上下文管理协议:__enter____exit__

    要让一个对象能在 with 中使用,需要实现上下文管理协议,即定义 __enter____exit__ 方法。Python 通过这两个方法来完成进入/离开上下文时的资源初始化与释放。

    1. __enter__ 方法

  • 调用时机:在进入 with 块之前自动调用。
  • 返回值:通常是需要管理的资源对象本身,或者一个与资源交互的代理对象。该返回值会被赋给 as 后面的变量。
  • 2. __exit__ 方法

  • 调用时机:在离开 with 块时自动调用,无论是正常完成代码块还是发生异常都会执行。

  • 参数

  • exc_type:异常类型,如果没有异常则为 None
  • exc_val:异常值,如果没有异常则为 None
  • exc_tb:异常追踪信息(traceback),如果没有异常则为 None
  • 返回值:如果返回 True,表示异常已被处理,不会再往上抛出;如果返回 False(或不返回),异常会继续向外传播。

  • 3. 自定义上下文管理器示例

    class MyResource:
        def __enter__(self):
            print("Entering context...")
            return self  # 返回资源或相关对象
    
        def __exit__(self, exc_type, exc_val, exc_tb):
            if exc_type:
                print(f"An exception occurred: {exc_val}")
            print("Exiting context... Cleaning up resources.")
            # 如果需要吞掉异常,可以返回 True,否则返回 None 或 False
            return False
    
    # 使用示例
    with MyResource() as resource:
        print("In the with-block")
        # 这里可以做资源相关的操作
    
  • 输出

    Entering context...
    In the with-block
    Exiting context... Cleaning up resources.
    
  • 如果在 with 块内部发生异常,会被传递给 __exit__exc_typeexc_valexc_tb 参数;你可以选择处理或抛出。


  • 四、常见使用场景

    1. 文件操作

    2. 自动打开和关闭文件,避免资源泄漏。
    3. 锁(线程、进程同步)

    4. 在多线程或多进程环境下使用锁时,可以用 with 确保在锁定后自动解锁。
    5. from threading import Lock
      
      lock = Lock()
      
      with lock:
          # 在锁保护的代码块中操作共享数据
          pass
      
    6. 数据库事务

    7. 当执行数据库操作时,可以将事务封装在 with 块中,自动提交或回滚。
    8. class Transaction:
          def __enter__(self):
              # 开启事务
              pass
      
          def __exit__(self, exc_type, exc_val, exc_tb):
              if exc_type:
                  # 回滚
                  pass
              else:
                  # 提交
                  pass
      
    9. 网络连接

    10. 在网络通信时,使用 with 可以确保连接打开后在离开时自动关闭。
    11. 异步上下文管理

    12. Python 3.5+ 支持 async with 用于异步上下文管理,比如在 aiohttp 库中使用 ClientSession
    13. import aiohttp
      import asyncio
      
      async def main():
          async with aiohttp.ClientSession() as session:
              async with session.get("https://www.example.com") as response:
                  print(await response.text())
      
      asyncio.run(main())
      

    五、与 try/finally 的对比

    使用 with 语句可以被视为是 try/finally 块的一种语法糖和扩展,用于自动处理进入和退出时的逻辑。

  • try/finally

    file = open('example.txt', 'r')
    try:
        data = file.read()
    finally:
        file.close()
    
  • with

    with open('example.txt', 'r') as file:
        data = file.read()
    
  • 两者都能确保资源被关闭或释放,但 with 更加简洁,不需要显式地调用 close()release() 等方法,而且可以更好地处理异常情况。


    六、常见问题

    1. with 后面可以写多个上下文吗?
      可以,例如:

      with open('file1.txt') as f1, open('file2.txt') as f2:
          # 同时管理两个文件
          pass
      
    2. 能否在自定义类中使用 with
      可以,只需在类中实现 __enter____exit__ 方法。

    3. with 会处理所有异常吗?
      with 不会吞掉异常,除非 __exit__ 方法显式返回 True。否则异常会继续向上抛出。

    4. __exit__ 方法不返回 True 会怎样?
      不返回 True 或返回 False 意味着该异常没有被 with 内部处理,继续往外传播。

    5. with 能自动 retry(重试)吗?
      不能,with 只负责资源的初始化和释放,并不包含重试逻辑。如果需要重试,需要你自己在代码逻辑中实现。


    七、总结

  • with 是 Python 中管理资源的优雅方式
    用于在进入代码块前完成资源的初始化,离开时自动进行清理操作。
  • 借助上下文管理协议
    只需要定义 __enter____exit__,就能让自定义类在 with 中使用,实现自动管理资源的功能。
  • 常见应用场景
  • 文件读写:自动关闭文件
  • 锁管理:加锁解锁
  • 数据库事务:自动提交或回滚
  • 网络连接:打开连接后自动关闭
  • 异步上下文:async with 在异步代码中同样能安全地管理资源
  • 总体而言,with 语句能大幅简化资源管理的代码,减少错误和泄露风险,使 Python 的代码更加简洁可读。在日常开发中,凡是涉及到“获取资源-使用资源-释放资源”模式的逻辑,都可以考虑使用 with 来确保资源的安全释放和正确管理。

    作者:xnuscd

    物联沃分享整理
    物联沃-IOTWORD物联网 » python异步

    发表回复