前言

前些日子花了不少时间重构codelab-adapter和增加新特性。

codelab-adapter核心部分是异步的。

陆续写了一些异步代码,读了不少异步编程相关的技术文章,踩了一些坑,也学了一些技巧,在此做个梳理。

Python与异步编程

正如Python中的异步IO:完整的演练一文所言,Async IO是一种并发编程设计,在Python中得到了专门的支持,从Python 3.4到3.7在迅速发展着,目前还在不断变化中。所以抛掉那些老旧的教程,扔掉那些谈论yeild from的教程就对了。

如果你是初学者,专注在新的异步原语(async/await)上, 别去管它的底层是不是yeild实现的,对于实现原理你应该完全抛开。正如你在Python中写下print("hello world")时,不必理解Python(CPython)中的print函数在C语言层面是怎么实现的,专注在你当下正在学习的概念上,别让无关紧要的复杂度压垮你。

并行与并发

前头提到Async IO是一种并发编程设计,我们先来解释下并发

并发并行是常常被同时提及的概念。

按维基百科说法:

并行计算(英语:parallel computing)一般是指许多指令得以同时进行的计算模式。 并发性(英语:Concurrency)是指在一个系统中,拥有多个计算,这些计算有同时执行的特性,而且他们之间有着潜在的交互。

并行性指同时执行多个操作。并发性是一个比并行性更广泛的术语。它表明多个任务能够以重叠的方式运行。

async IO是一种并发编程风格(单线程),它不具有并行性,与js默认采用的编程模型相似。

以下内容假设在你本地 Python>=3.7

hello world

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
# python3.7
import asyncio

async def main():
    print('hello')
    await asyncio.sleep(1)
    print('world')

asyncio.run(main())
# hello
# world

可等待对象(Awaitables)

如果一个对象可以在 await 语句中使用,那么它就是 可等待 对象。许多 asyncio API 都被设计为接受可等待对象。

可等待 对象有三种主要类型: 协程(coroutines), 任务(Tasks) 和 Future.

协程

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
import asyncio

async def nested():
    return 42

async def main():
    # Nothing happens if we just call "nested()".
    # A coroutine object is created but not awaited,
    # so it *won't run at all*.
    nested()

    # Let's do it differently now and await it:
    print(await nested())  # will print "42".

asyncio.run(main())

在官方文档中, “协程” 可用来表示两个紧密关联的概念:

  • 协程函数: 定义形式为 async def 的函数; (async def nested())
  • 协程对象: 调用 协程函数 所返回的对象。(nested())

任务(Tasks)

Tasks用于并发执行协程。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
import asyncio

async def nested():
    return 42

async def main():
    # Schedule nested() to run soon concurrently
    # with "main()".
    task = asyncio.create_task(nested()) 
    # 在 Python 3.7 之前,可以改用低层级的 asyncio.ensure_future() 函数。
    # task = asyncio.ensure_future(coro())
    

    # "task" can now be used to cancel "nested()", or
    # can simply be awaited to wait until it is complete:
    # 可以类比为thread start,携程api与线程api有很多相似之处
    await task

asyncio.run(main())

任务会在 get_running_loop() 返回的循环中执行,如果当前线程没有在运行的循环则会引发 RuntimeError

官方例子很糟糕,没有演示并发, 给出一个并发演示。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
import asyncio
import time

async def coro(seq) -> list:
    """'IO' wait time is proportional to the max element."""
    await asyncio.sleep(max(seq))
    return list(reversed(seq))

async def main():
    t = asyncio.create_task(coro([3, 2, 1]))
    t2 = asyncio.create_task(coro([10, 5, 0]))  # Python 3.7+
    print('Start:', time.strftime('%X'))
    a = await asyncio.gather(t, t2)
    print('End:', time.strftime('%X'))  # Should be 10 seconds
    print(f'Both tasks done: {all((t.done(), t2.done()))}')
    return a

运行结果

1
2
3
4
5
6
>>> a = asyncio.run(main())
Start: 16:20:11
End: 16:20:21
Both tasks done: True
>>> a
[[1, 2, 3], [0, 5, 10]]

并发执行协程还可用asyncio.gather。 如果所有可等待对象都成功完成,结果将是一个由所有返回值聚合而成的列表。结果值的顺序与 aws 中可等待对象的顺序一致。

请求取消 Task 对象。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
async def cancel_me():
    print('cancel_me(): before sleep')

    try:
        # Wait for 1 hour
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        print('cancel_me(): cancel sleep')
        raise
    finally:
        print('cancel_me(): after sleep')

async def main():
    # Create a "cancel_me" Task
    task = asyncio.create_task(cancel_me())

    # Wait for 1 second
    await asyncio.sleep(1)

    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("main(): cancel_me is cancelled now")

asyncio.run(main())

# Expected output:
#
#     cancel_me(): before sleep
#     cancel_me(): cancel sleep
#     cancel_me(): after sleep
#     main(): cancel_me is cancelled now

Future

Future 表示一个异步操作的 最终结果。

在 asyncio 中需要 Future 对象以便允许通过 async/await 使用基于回调风格的代码。需要注意大多数asyncio调度函数不允许传递关键字参数。为此,请使用functools.partial()

通常情况下 没有必要 在应用层级的代码中创建 Future 对象。Future 对象有时会由库和某些 asyncio API 暴露给用户。用作可等待对象。

Future 和 Promise 很像.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# 这个例子并不好,引入了不必要的复杂性(线程)
from threading import Thread

async def fetch(url):
    response = await aiohttp.request('GET', url)
    return await response.text()

def start_background_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()
# Create a new loop
new_loop = asyncio.new_event_loop()
# Assign the loop to another thread
t = Thread(target=start_background_loop, args=(new_loop,))
t.start()
# Give it some async work
future = asyncio.run_coroutine_threadsafe(
    fetch("http://www.google.com"), 
    new_loop
)
# Wait for the result
print(future.result())
# Do it again but with a callback
asyncio.run_coroutine_threadsafe(
    fetch("http://www.github.com"),
    new_loop
).add_done_callback(lambda future: print(future.result()))

超时问题: 来自其他线程的日程安排

Task对象与Future对象

asyncio.Task 从 Future 继承了其除 Future.set_result() 和 Future.set_exception() 以外的所有 API。

Queue

为协程设计的queue,与用于线程的queue,两个类在设计上相似。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import asyncio
import random
import time


async def worker(name, queue):
    while True:
        # Get a "work item" out of the queue.
        sleep_for = await queue.get()

        # Sleep for the "sleep_for" seconds.
        await asyncio.sleep(sleep_for)

        # Notify the queue that the "work item" has been processed.
        queue.task_done()

        print(f'{name} has slept for {sleep_for:.2f} seconds')


async def main():
    # Create a queue that we will use to store our "workload".
    queue = asyncio.Queue()

    # Generate random timings and put them into the queue.
    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)

    # Create three worker tasks to process the queue concurrently.
    tasks = []
    for i in range(3):
        task = asyncio.create_task(worker(f'worker-{i}', queue))
        tasks.append(task)

    # Wait until the queue is fully processed.
    started_at = time.monotonic()
    await queue.join()
    total_slept_for = time.monotonic() - started_at

    # Cancel our worker tasks.
    for task in tasks:
        task.cancel()
    # Wait until all worker tasks are cancelled.
    await asyncio.gather(*tasks, return_exceptions=True)

    print('====')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')


asyncio.run(main())

坑与技巧

Controlling Python Async Creep一文给出了很多精彩的总结。

如文中所言,许多情况下,我们可能被迫与同步代码交互。尽管异步编程在Python存在多时,但正式被纳入语言中的时间还不长,所以大量第三方库都是同步风格的代码。于是我们会面临文中所说的异步蠕变(Async Creep)的问题: 异步代码会要求调用它的代码也是异步的。

这个过渡阶段可能会长期存在,让同步代码与异步代码一同工作的技巧,就显得有实际价值了。以下是Cristian Medina总结的一些技巧

等待异步代码块

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
import asyncio
import aiohttp
async def fetch(url):
    response = await aiohttp.request('GET', url)
    return await response.text()
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(
    asyncio.ensure_future(fetch("http://www.google.com")),
    asyncio.ensure_future(fetch("http://www.github.com")),
    asyncio.ensure_future(fetch("http://www.reddit.com"))
))

asyncio.ensure_future()将异步函数转换为协程,asyncio.gather()将它们打包在一起,并发执行。同时loop.run_until_complete()阻塞直到所有调用完成。

使用线程

创建一个作为worker的独立线程。它运行自己的事件循环,并使用线程安全的asyncio方法来使其工作。

我们在前头举过这个例子,当时用于解释Future的回调。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# 这个例子并不好,引入了不必要的复杂性(线程)
from threading import Thread

async def fetch(url):
    response = await aiohttp.request('GET', url)
    return await response.text()

def start_background_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()
# Create a new loop
new_loop = asyncio.new_event_loop()
# Assign the loop to another thread
t = Thread(target=start_background_loop, args=(new_loop,))
t.start()
# Give it some async work
future = asyncio.run_coroutine_threadsafe(
    fetch("http://www.google.com"), 
    new_loop
)
# Wait for the result
print(future.result())
# Do it again but with a callback
asyncio.run_coroutine_threadsafe(
    fetch("http://www.github.com"),
    new_loop
).add_done_callback(lambda future: print(future.result()))

run_coroutine_threadsafe返回Future对象。我们可以使用result(timeout)方法wait它。

使用回调风格时,add_done_callback(function)中的回调函数(function)收到future作为参数。

同时支持异步和同步调用的函数

这个技巧最早我在cozmo-python-sdk读到。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import inspect
import requests

def is_async_caller():
    """Figure out who's calling."""
    # Get the calling frame
    caller = inspect.currentframe().f_back.f_back
    # Pull the function name from FrameInfo
    func_name = inspect.getframeinfo(caller)[2]
    # Get the function object
    f = caller.f_locals.get(
        func_name, 
        caller.f_globals.get(func_name)
    )
    # If there's any indication that the function object is a 
    # coroutine, return True. inspect.iscoroutinefunction() should
    # be all we need, the rest are here to illustrate.
    if any([inspect.iscoroutinefunction(f),
            inspect.isgeneratorfunction(f),
            inspect.iscoroutine(f), inspect.isawaitable(f),
            inspect.isasyncgenfunction(f) , inspect.isasyncgen(f)]):
        return True
    else:
        return False
def fetch(url):
    """GET the URL, do it asynchronously if the caller is async"""

    # Figure out which function is calling us
    if is_async_caller():
        print("Calling ASYNC method")
        # Run the async version of this method and
        # print the result with a callback
        asyncio.run_coroutine_threadsafe(
            _async_fetch(url), 
            new_loop
        ).add_done_callback(lambda f: print(f.result()))
    else:
        print("Calling BLOCKING method")
        # Run the synchronous version and print the result
        print(_sync_fetch(url))

def _sync_fetch(url):
    """Blocking GET"""
    return requests.get(url).content

async def _async_fetch(url):
    """Async GET"""
    resp = await aiohttp.request('GET', url)
    return await resp.text()

def call_sync_fetch():
    """Blocking fetch call"""
    fetch("http://www.github.com")

async def call_async_fetch():
    """Asynchronous fetch call (no different from sync call 
       except this function is defined async)"""
    fetch("http://www.github.com")

# Perform a blocking GET
call_sync_fetch()
# Perform an async GET
loop = asyncio.get_event_loop()
loop.run_until_complete(call_async_fetch())

这里的技巧是在is_async_caller()中使用inspect判断函数的调用者是不是协程。这是自省的应用

调试工具

IPython

IPython 7.0, Async REPL

补遗

async/await 与 promise 语义等价

eim 的同步积木(subpub rpc)核心是在Promise外部修改resolve, 在python中对应修改Future。(Future.set_result())

参考