对 Python 协程的理解

同步比较直观,编码和维护都比较容易,但是效率低;异步效率高,对于 callback 这种形式逻辑容易被代码割裂,代码可读性差,而异步协程的方式既看起来直观,又在效率上有保证。所以想在此谈谈网络请求中的异步表现形式——协程,并通过 python 代码展示各自的优缺点。

我的新书《LangChain编程从入门到实践》 已经开售!推荐正在学习AI应用开发的朋友购买阅读!
LangChain编程从入门到实践

基础概念

同步、异步与阻塞、非阻塞

当提到同步与异步,肯定会想到阻塞与非阻塞,网络 I/O 里所讨论的同步与异步,是指对于请求的发起者,是否需要等到请求的结果(同步),还是说请求完毕的时候以某种方式通知请求发起者(异步)。在这个语义环境下,阻塞与非阻塞,是指请求的受理者在处理某个请求的状态,如果在处理这个请求的时候不能做其它事情(请求处理时间不确定),那么称之为阻塞,否则为非阻塞。

简单理解

举个例子,同步的操作如下:浏览器首先发送第一个请求,等待服务器回复后,再发送第二个请求,依次类推,直到所有请求完成。异步的操作如下:浏览器发送第一个请求,可以不用等待服务器返回,可以继续发送第二个请求。

阻塞与非阻塞属于进程的 API 执行动作的方式,例如进行需要 read 数据,阻塞方式操作流程是:如果没有数据,则read会一直等着数据到来,才能进行后续的动作;而非阻塞则是read没有到数据后,则可以进行后续的动作,当有数据的时候再回来读取。通常 Linux 网络API默认都是阻塞的,例如connect、send、recv等。

协程演化

实现了两个同步IO任务taskIO_1()和taskIO_2(),则最后总耗时就是5秒。计算机中CPU的运算速率要远远大于IO速率,如果再要闲置很长时间去等待IO任务完成才能进行下一个任务的CPU计算,这样的任务执行效率很低。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import time
def taskIO_1(t):
print(f'开始运行{taskIO_1.__name__}')
time.sleep(t)
print(f'{taskIO_1.__name__}耗时{t}s')
def taskIO_2(t):
print(f'开始运行{taskIO_2.__name__}')
time.sleep(t)
print(f'{taskIO_2.__name__}耗时{t}s')
if __name__ == "__main__":
start=time.time()
taskIO_1(2)
taskIO_2(3)
print(f'总耗时{time.time()-start}s')

改进策略很容易想到的:能否在上述IO任务执行前暂且中断当前IO任务,进行另一个任务,当该IO任务完成后再唤醒该任务。在Python中生成器中的关键字yield可以实现中断功能,所以刚开始协程是基于生成器的变形进行实现的。

yield from 和 yield

yield在生成器中有中断的功能,可以传出值,也可以从函数外部接收值,而yield from的实现就是简化了yield操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
def generator_1(toc: true
titles):
yield toc: true
titles
def generator_2(toc: true
titles):
yield from toc: true
titles

toc: true
titles = ['Python','Java','C++']
for toc: true
title in generator_1(toc: true
titles):
print('生成器1:',toc: true
title)
for toc: true
title in generator_2(toc: true
titles):
print('生成器2:',toc: true
title)

# 等价于yield from toc: true
titles
for toc: true
title in toc: true
titles:
yield toc: true
title 

yield from省去了很多异常的处理,不再需要我们手动编写,其内部已经实现大部分异常处理。详见PEP 380

  • 子生成器:yield from后的generator_1()生成器函数是子生成器
  • 委托生成器:generator_2()是程序中的委托生成器,它负责委托子生成器完成具体任务。
  • 调用方:main()是程序中的调用方,负责调用委托生成器。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    def generator_1(): # 子生成器
    total = 0
    while True:
    x = yield
    print('加',x)
    if not x:
    break
    total += x
    return total
    def generator_2(): # 委托生成器
    while True:
    total = yield from generator_1()
    def main(): # 调用方
    g1 = generator_1()
    g1.send(None)
    g1.send(2)
    g1.send(3)
    try:
    g1.send(None)
    except StopIteration as e:
    print(e.value)
    if __name__ == "__main__":
    main()
    yield from 建立调用方和子生成器的通道,在上述代码中main()每一次在调用send(value)时,value不是传递给了委托生成器generator_2(),而是借助yield from传递给了子生成器generator_1()中的yield,同理子生成器中的数据也是通过yield直接发送到调用方main()中。

    结合@asyncio.coroutine实现协程

    将同步IO任务的代码中修改成协程的用法,在同步IO任务的代码中使用的time.sleep(2)来假设任务执行了2秒,但在协程中yield from后面必须是子生成器函数,而time.sleep()并不是生成器,所以这里需要使用内置模块提供的生成器函数asyncio.sleep()。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    import time
    import asyncio
    @asyncio.coroutine
    def taskIO_1(t):
    print(f'开始运行{taskIO_1.__name__}')
    yield from asyncio.sleep(t)
    print(f'{taskIO_1.__name__}耗时{t}s')
    return taskIO_1.__name__

    @asyncio.coroutine
    def taskIO_2(t):
    print(f'开始运行{taskIO_2.__name__}')
    yield from asyncio.sleep(t)
    print(f'{taskIO_2.__name__}耗时{t}s')
    return taskIO_2.__name__

    @asyncio.coroutine
    def main():
    # 打包任务
    tasks=[taskIO_1(2),taskIO_2(3)]
    # done表示已完成的任务列表,pending表示未完成的任务列表
    done,pending=yield from asyncio.wait(tasks)
    # 遍历已完成任务并调用result()取出结果
    for work in done:
    print(f'协程无序返回值:{work.result()}')

    if __name__ == "__main__":
    start=time.time()
    # get_event_loop()获取了一个标准事件循环loop
    loop=asyncio.get_event_loop()
    try:
    # 运行协程,直到循环事件的所有事件都处理完才能完整结束
    loop.run_until_complete(main())
    finally:
    loop.close()
    print(f'总耗时{time.time()-start}s')
    当轮询到某个事件时(如taskIO_1()),直到遇到该任务中的yield from中断,开始处理下一个事件(如taskIO_2())),当yield from后面的子生成器完成任务时,该事件才再次被唤醒

    使用async和await实现协程

    Python 3.5开始引入了新的语法async和await,并更好地标识异步IO编码
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    import time
    import asyncio

    async def taskIO_1(t):
    print(f'开始运行{taskIO_1.__name__}')
    await asyncio.sleep(t)
    print(f'{taskIO_1.__name__}耗时{t}s')
    return taskIO_1.__name__

    async def taskIO_2(t):
    print(f'开始运行{taskIO_2.__name__}')
    await asyncio.sleep(t)
    print(f'{taskIO_2.__name__}耗时{t}s')
    return taskIO_2.__name__

    async def main():
    # 打包任务
    tasks=[taskIO_1(2),taskIO_2(3)]
    # done表示已完成的任务列表,pending表示未完成的任务列表
    done,pending=await asyncio.wait(tasks)
    # 遍历已完成任务并调用result()取出结果
    for work in done:
    print(f'协程无序返回值:{work.result()}')

    if __name__ == "__main__":
    start=time.time()
    # get_event_loop()获取了一个标准事件循环loop
    loop=asyncio.get_event_loop()
    try:
    # 运行协程,直到循环事件的所有事件都处理完才能完整结束
    loop.run_until_complete(main())
    finally:
    loop.close()
    print(f'总耗时{time.time()-start}s')
    显然,协程让我们可以用同步的方式写异步的代码,代码可读性提高,减少采用线程方式之间的切换消耗,提高了并发效率,在 IO 密集型任务中很有用。

    参考链接

  • 怎样理解阻塞非阻塞与同步异步的区别?

对 Python 协程的理解

https://liduos.com/asyn-coroutine.html

作者

莫尔索

发布于

2022-02-04

更新于

2024-05-19

许可协议

评论