前言
前些日子花了不少时间重构codelab-adapter和增加新特性。
codelab-adapter核心部分是异步的。
陆续写了一些异步代码,读了不少异步编程相关的技术文章,踩了一些坑,也学了一些技巧,在此做个梳理。
Python与异步编程
正如Python中的异步IO:完整的演练一文所言,Async IO是一种并发编程设计,在Python中得到了专门的支持,从Python 3.4到3.7在迅速发展着,目前还在不断变化中。所以抛掉那些老旧的教程,扔掉那些谈论yeild from
的教程就对了。
如果你是初学者,专注在新的异步原语(async/await)上, 别去管它的底层是不是yeild
实现的,对于实现原理你应该完全抛开。正如你在Python中写下print("hello world")
时,不必理解Python(CPython)中的print
函数在C语言层面是怎么实现的,专注在你当下正在学习的概念上,别让无关紧要的复杂度压垮你。
并行与并发
前头提到Async IO是一种并发
编程设计,我们先来解释下并发。
并发与并行是常常被同时提及的概念。
按维基百科说法:
并行计算(英语:parallel computing)一般是指许多指令得以同时进行的计算模式。
并发性(英语:Concurrency)是指在一个系统中,拥有多个计算,这些计算有同时执行的特性,而且他们之间有着潜在的交互。
并行性指同时
执行多个操作。并发性是一个比并行性更广泛的术语。它表明多个任务能够
以重叠的方式运行。
async IO是一种并发编程风格(单线程),它不具有并行性,与js默认采用的编程模型相似。
以下内容假设在你本地 Python>=3.7
hello world
1
2
3
4
5
6
7
8
9
10
11
|
# python3.7
import asyncio
async def main():
print('hello')
await asyncio.sleep(1)
print('world')
asyncio.run(main())
# hello
# world
|
如果一个对象可以在 await 语句中使用,那么它就是 可等待 对象。许多 asyncio API 都被设计为接受可等待对象。
可等待 对象有三种主要类型: 协程(coroutines), 任务(Tasks) 和 Future.
协程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
import asyncio
async def nested():
return 42
async def main():
# Nothing happens if we just call "nested()".
# A coroutine object is created but not awaited,
# so it *won't run at all*.
nested()
# Let's do it differently now and await it:
print(await nested()) # will print "42".
asyncio.run(main())
|
在官方文档中, “协程” 可用来表示两个紧密关联的概念:
- 协程函数: 定义形式为
async def
的函数; (async def nested()
)
- 协程对象: 调用 协程函数 所返回的对象。(
nested()
)
任务(Tasks)
Tasks用于并发
执行协程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
import asyncio
async def nested():
return 42
async def main():
# Schedule nested() to run soon concurrently
# with "main()".
task = asyncio.create_task(nested())
# 在 Python 3.7 之前,可以改用低层级的 asyncio.ensure_future() 函数。
# task = asyncio.ensure_future(coro())
# "task" can now be used to cancel "nested()", or
# can simply be awaited to wait until it is complete:
# 可以类比为thread start,携程api与线程api有很多相似之处
await task
asyncio.run(main())
|
任务会在 get_running_loop()
返回的循环中执行,如果当前线程没有在运行的循环则会引发 RuntimeError
官方例子很糟糕,没有演示并发, 给出一个并发演示。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
import asyncio
import time
async def coro(seq) -> list:
"""'IO' wait time is proportional to the max element."""
await asyncio.sleep(max(seq))
return list(reversed(seq))
async def main():
t = asyncio.create_task(coro([3, 2, 1]))
t2 = asyncio.create_task(coro([10, 5, 0])) # Python 3.7+
print('Start:', time.strftime('%X'))
a = await asyncio.gather(t, t2)
print('End:', time.strftime('%X')) # Should be 10 seconds
print(f'Both tasks done: {all((t.done(), t2.done()))}')
return a
|
运行结果
1
2
3
4
5
6
|
>>> a = asyncio.run(main())
Start: 16:20:11
End: 16:20:21
Both tasks done: True
>>> a
[[1, 2, 3], [0, 5, 10]]
|
并发
执行协程还可用asyncio.gather。 如果所有可等待对象都成功完成,结果将是一个由所有返回值聚合而成的列表。结果值的顺序与 aws 中可等待对象的顺序一致。
请求取消 Task 对象。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
async def cancel_me():
print('cancel_me(): before sleep')
try:
# Wait for 1 hour
await asyncio.sleep(3600)
except asyncio.CancelledError:
print('cancel_me(): cancel sleep')
raise
finally:
print('cancel_me(): after sleep')
async def main():
# Create a "cancel_me" Task
task = asyncio.create_task(cancel_me())
# Wait for 1 second
await asyncio.sleep(1)
task.cancel()
try:
await task
except asyncio.CancelledError:
print("main(): cancel_me is cancelled now")
asyncio.run(main())
# Expected output:
#
# cancel_me(): before sleep
# cancel_me(): cancel sleep
# cancel_me(): after sleep
# main(): cancel_me is cancelled now
|
Future
Future 表示一个异步操作的 最终结果。
在 asyncio 中需要 Future 对象以便允许通过 async/await 使用基于回调
风格的代码。需要注意大多数asyncio调度函数不允许传递关键字参数。为此,请使用functools.partial()
通常情况下 没有必要 在应用层级的代码中创建 Future 对象。Future 对象有时会由库和某些 asyncio API 暴露给用户。用作可等待对象。
Future 和 Promise 很像.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
# 这个例子并不好,引入了不必要的复杂性(线程)
from threading import Thread
async def fetch(url):
response = await aiohttp.request('GET', url)
return await response.text()
def start_background_loop(loop):
asyncio.set_event_loop(loop)
loop.run_forever()
# Create a new loop
new_loop = asyncio.new_event_loop()
# Assign the loop to another thread
t = Thread(target=start_background_loop, args=(new_loop,))
t.start()
# Give it some async work
future = asyncio.run_coroutine_threadsafe(
fetch("http://www.google.com"),
new_loop
)
# Wait for the result
print(future.result())
# Do it again but with a callback
asyncio.run_coroutine_threadsafe(
fetch("http://www.github.com"),
new_loop
).add_done_callback(lambda future: print(future.result()))
|
超时问题: 来自其他线程的日程安排
Task对象与Future对象
asyncio.Task 从 Future 继承了其除 Future.set_result() 和 Future.set_exception() 以外的所有 API。
Queue
为协程设计的queue,与用于线程的queue,两个类在设计上相似。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
|
import asyncio
import random
import time
async def worker(name, queue):
while True:
# Get a "work item" out of the queue.
sleep_for = await queue.get()
# Sleep for the "sleep_for" seconds.
await asyncio.sleep(sleep_for)
# Notify the queue that the "work item" has been processed.
queue.task_done()
print(f'{name} has slept for {sleep_for:.2f} seconds')
async def main():
# Create a queue that we will use to store our "workload".
queue = asyncio.Queue()
# Generate random timings and put them into the queue.
total_sleep_time = 0
for _ in range(20):
sleep_for = random.uniform(0.05, 1.0)
total_sleep_time += sleep_for
queue.put_nowait(sleep_for)
# Create three worker tasks to process the queue concurrently.
tasks = []
for i in range(3):
task = asyncio.create_task(worker(f'worker-{i}', queue))
tasks.append(task)
# Wait until the queue is fully processed.
started_at = time.monotonic()
await queue.join()
total_slept_for = time.monotonic() - started_at
# Cancel our worker tasks.
for task in tasks:
task.cancel()
# Wait until all worker tasks are cancelled.
await asyncio.gather(*tasks, return_exceptions=True)
print('====')
print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
print(f'total expected sleep time: {total_sleep_time:.2f} seconds')
asyncio.run(main())
|
坑与技巧
Controlling Python Async Creep一文给出了很多精彩的总结。
如文中所言,许多情况下,我们可能被迫与同步代码交互。尽管异步编程在Python存在多时,但正式被纳入语言中的时间还不长,所以大量第三方库都是同步风格的代码。于是我们会面临文中所说的异步蠕变
(Async Creep)的问题: 异步代码会要求调用它的代码也是异步的。
这个过渡阶段可能会长期存在,让同步代码与异步代码一同工作的技巧,就显得有实际价值了。以下是Cristian Medina总结的一些技巧
等待异步代码块
1
2
3
4
5
6
7
8
9
10
11
|
import asyncio
import aiohttp
async def fetch(url):
response = await aiohttp.request('GET', url)
return await response.text()
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(
asyncio.ensure_future(fetch("http://www.google.com")),
asyncio.ensure_future(fetch("http://www.github.com")),
asyncio.ensure_future(fetch("http://www.reddit.com"))
))
|
asyncio.ensure_future()
将异步函数转换为协程,asyncio.gather()
将它们打包在一起,并发执行。同时loop.run_until_complete()
阻塞直到所有调用完成。
使用线程
创建一个作为worker的独立线程。它运行自己的事件循环,并使用线程安全的asyncio方法来使其工作。
我们在前头举过这个例子,当时用于解释Future的回调。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
# 这个例子并不好,引入了不必要的复杂性(线程)
from threading import Thread
async def fetch(url):
response = await aiohttp.request('GET', url)
return await response.text()
def start_background_loop(loop):
asyncio.set_event_loop(loop)
loop.run_forever()
# Create a new loop
new_loop = asyncio.new_event_loop()
# Assign the loop to another thread
t = Thread(target=start_background_loop, args=(new_loop,))
t.start()
# Give it some async work
future = asyncio.run_coroutine_threadsafe(
fetch("http://www.google.com"),
new_loop
)
# Wait for the result
print(future.result())
# Do it again but with a callback
asyncio.run_coroutine_threadsafe(
fetch("http://www.github.com"),
new_loop
).add_done_callback(lambda future: print(future.result()))
|
run_coroutine_threadsafe
返回Future对象。我们可以使用result(timeout)
方法wait
它。
使用回调风格时,add_done_callback(function)
中的回调函数(function
)收到future作为参数。
同时支持异步和同步调用的函数
这个技巧最早我在cozmo-python-sdk读到。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
|
import inspect
import requests
def is_async_caller():
"""Figure out who's calling."""
# Get the calling frame
caller = inspect.currentframe().f_back.f_back
# Pull the function name from FrameInfo
func_name = inspect.getframeinfo(caller)[2]
# Get the function object
f = caller.f_locals.get(
func_name,
caller.f_globals.get(func_name)
)
# If there's any indication that the function object is a
# coroutine, return True. inspect.iscoroutinefunction() should
# be all we need, the rest are here to illustrate.
if any([inspect.iscoroutinefunction(f),
inspect.isgeneratorfunction(f),
inspect.iscoroutine(f), inspect.isawaitable(f),
inspect.isasyncgenfunction(f) , inspect.isasyncgen(f)]):
return True
else:
return False
def fetch(url):
"""GET the URL, do it asynchronously if the caller is async"""
# Figure out which function is calling us
if is_async_caller():
print("Calling ASYNC method")
# Run the async version of this method and
# print the result with a callback
asyncio.run_coroutine_threadsafe(
_async_fetch(url),
new_loop
).add_done_callback(lambda f: print(f.result()))
else:
print("Calling BLOCKING method")
# Run the synchronous version and print the result
print(_sync_fetch(url))
def _sync_fetch(url):
"""Blocking GET"""
return requests.get(url).content
async def _async_fetch(url):
"""Async GET"""
resp = await aiohttp.request('GET', url)
return await resp.text()
def call_sync_fetch():
"""Blocking fetch call"""
fetch("http://www.github.com")
async def call_async_fetch():
"""Asynchronous fetch call (no different from sync call
except this function is defined async)"""
fetch("http://www.github.com")
# Perform a blocking GET
call_sync_fetch()
# Perform an async GET
loop = asyncio.get_event_loop()
loop.run_until_complete(call_async_fetch())
|
这里的技巧是在is_async_caller()
中使用inspect
判断函数的调用者是不是协程。这是自省
的应用
调试工具
IPython
IPython 7.0, Async REPL
补遗
async/await 与 promise 语义等价
eim 的同步积木(subpub rpc)核心是在Promise外部修改resolve, 在python中对应修改Future。(Future.set_result()
)
参考