# 协程

# import asyncio

:Python 的标准库,用于实现异步 IO 。

# 定义

  • 什么是异步 IO ?

    • 很多程序需要调用操作系统的 API ,执行 IO 操作,比如从磁盘读取一个文件。
      • 普通 IO 通信时,每次调用 API ,都会让程序阻塞一段时间。等到操作系统执行完 IO 操作,API 才会返回结果,程序才能开始做其它事。
      • 异步 IO 通信时,每次调用 API ,程序不必阻塞等待 API 返回结果,而是可以趁机做其它事。然后每隔一定时间(比如 100ms ),程序会检查操作系统是否执行完 IO 操作。如果执行完了,程序才回来检查 IO 操作的结果。
    • 使用异步 IO 通信,可以减少程序等待 IO 的耗时,腾出时间做其它事。比如在等待一个 IO 操作结束的期间,开始其它 IO 操作。
    • Python 中,基于协程来实现异步 IO 。创建 n 个协程,就可以并发执行 n 个 IO 操作。
  • 什么是协程?

    • 协程(coroutine)是指协同程序,是一种特殊的函数。
    • 原理:
      • 执行 n 个普通函数时,必须按顺序逐个执行。前一个函数执行完毕,才开始执行下一个函数。
      • 执行 n 个协程函数时,可以在执行一个函数时暂停,转去执行另一个函数。看起来像同时执行多个函数,实际上依然是先后执行,属于同一个线程。
      • 一个协程函数,可以在执行途中暂停,等会再继续执行。这种特性像 Python 的生成器函数。
    • 优点:
      • 创建协程的开销、协程之间切换执行的开销,比线程更小。因此处理 IO 密集型任务时,多协程的性能更高,支持更大并发量。
      • 创建多线程时,CPU 随时可能从一个线程切换执行另一个线程,用户不能控制这些线程的执行顺序,除非使用线程锁。而创建多个协程时,它们实际上是顺序执行的,用户可以控制执行顺序。
    • 缺点:
      • 创建多个协程时,依然属于同一个线程,因此不适合处理 CPU 密集型任务。
  • 定义函数时,添加关键字 async ,就会将该函数声明为协程。

    >>> import asyncio
    >>> async def main():
    ...    print('hello')
    ...
    >>> main()              # 直接调用 async 函数,并不会运行它,而是返回一个 coroutine 对象
    <coroutine object main at 0x000001D2E6385EC0>
    >>> asyncio.run(main()) # 使用 asyncio.run() 运行一个协程
    hello
    
  • 使用关键字 await 可以运行一个协程。

    >>> async def test():
    ...     return 0
    ...
    >>> async def main():
    ...     result = await test()
    ...     print(result)
    ...
    >>> asyncio.run(main())
    0
    
    • await 只能在 async 函数中使用,否则抛出 SyntaxError 异常。
    • await 只能调用实现了 __await__() 方法的对象,称为可等待对象,比如 coroutine、task 对象。

# 并行

  • 上例演示了如何运行单个协程。实际上,并发运行大量协程,才能体现协程的性能优势。

  • 使用 asyncio.gather() 可以并发运行多个协程。

    >>> async def test(name, num):
    ...     for i in range(num):
    ...         print(name, i)
    ...
    >>> async def main():
    ...     coroutine_list = [test('A', 3), test('B', 3)]   # 本例创建了 2 个协程。如果追求性能,则可以几万个协程
    ...     result = await asyncio.gather(*coroutine_list)  # 这一步是并发运行多个协程,等它们全部运行完毕,才继续执行后续代码
    ...     print(result)
    ...
    >>> asyncio.run(main())
    A 0
    A 1
    A 2
    B 0   # 虽然 asyncio.gather() 能够并发运行多个协程,但这里并没有并发运行。协程 A 运行完毕,才开始运行协程 B
    B 1
    B 2
    [None, None]
    
  • 执行一个协程函数时,如果遇到 await ,才允许 Python 解释器切换运行其它协程。因此上例应该改为:

    >>> async def test(name, num):
    ...     for i in range(num):
    ...         print(name, i)
    ...         await asyncio.sleep(0)  # 让当前协程睡眠几秒,从而允许切换运行其它协程
    ...
    >>> async def main():
    ...     coroutine_list = [test('A', 3), test('B', 3)]
    ...     result = await asyncio.gather(*coroutine_list)
    ...     print(result)
    ...
    >>> asyncio.run(main())
    A 0
    B 0   # 此时协程 A 尚未运行完毕,就切换运行协程 B
    A 1
    B 1
    A 2
    B 2
    [None, None]
    
  • Python3.11 增加了 asyncio.TaskGroup() ,作为 asyncio.gather() 的替代方案。

    async def main():
        async with asyncio.TaskGroup() as group:  # 创建一个 TaskGroup ,等其中所有协程运行完毕,才继续执行后续代码
            task1 = group.create_task(test(1))    # 将 coroutine 对象封装为一个 task 对象。需要保留对该 task 对象的引用,以免它尚未运行完毕,就被垃圾回收
            task2 = group.create_task(test(2))
    
    • asyncio.gather() 运行的任一协程抛出异常时,它依然会继续运行其它协程。
    • asyncio.TaskGroup() 运行的任一协程抛出异常时,它会终止运行其它协程。
  • 可以限制并行运行的协程数量:

    >>> async def test(semaphore):
    ...     async with semaphore:   # 在 semaphore 限制下运行协程
    ...         print('hello')
    ...         await asyncio.sleep(1)
    ...         print('world')
    ...
    >>> async def main():
    ...     semaphore = asyncio.Semaphore(3)   # 限制最大并行数量
    ...     coroutine_list = [test(semaphore) for i in range(5)]
    ...     return await asyncio.gather(*coroutine_list)
    ...
    >>> asyncio.run(main())
    hello
    hello
    hello
    world
    world
    world
    hello
    hello
    world
    world
    [None, None, None, None, None]
    

# import aiohttp

:Python 的第三方库,基于 asyncio 进行异步的 HTTP 通信。

  • 官方文档 (opens new window)

  • 安装:pip install aiohttp

  • 例:运行 HTTP 服务器

    from aiohttp import web
    
    async def index(request):
        uri = request.match_info.get('uri', '')
        return web.Response(text='location /{}'.format(uri), content_type='text/html')
    
    app = web.Application()
    app.add_routes([web.get('/', index),
                    web.get('/{uri}', index)])
    web.run_app(app)
    
  • 例:运行 HTTP 客户端

    >>> import aiohttp
    >>> import asyncio
    >>> async def test(session):
    ...     async with session.get('http://baidu.com') as response:
    ...         print(response.status)
    ...         print(await response.text())
    ...
    >>> async def main():
    ...     async with aiohttp.ClientSession() as session:          # 打开一个客户端连接池
    ...         coroutine_list = [test(session) for i in range(3)]  # 创建 3 个协程,并发运行
    ...         result = await asyncio.gather(*coroutine_list)
    ...         print(result)
    ...
    >>> asyncio.run(main())
    200
    <html>
    <meta http-equiv="refresh" content="0;url=http://www.baidu.com/">
    </html>
    
    200
    <html>
    <meta http-equiv="refresh" content="0;url=http://www.baidu.com/">
    </html>
    
    200
    <html>
    <meta http-equiv="refresh" content="0;url=http://www.baidu.com/">
    </html>
    
    [None, None, None]