前言
2024-04-03更新: Dynatalk 实现了本文中的想法
在 CodeLab Scratch 有这样一个需求: Scratch 积木执行同步语义,积木背后的通信是采用异步的 pub/sub。
解决这件事的关键代码我们在下边有列出。
本文试图在 JavaScript 之外的语言中也实现这种想法。它看起来只是一个简易的 RPC client(实现上,确实也是参考了Scratch jsonrpc),如果你想寻找 RPC 库,市面上有大把实现,本文不值一读。
如果你也对消息感兴趣,对于按照自己的想法构建事物感兴趣,而不是沉迷于各类模式和技巧,那么可将本文视为海滩上的一粒鹅软石,它并不珍贵,也不精美,它只是希望引起你的兴趣:引起你对打磨它的那些潮水、沙粒、软石,甚至大海的兴趣。
当然它也是我自己的一个临时 playground/workspace,作为编程技能的一个小练习,在 JavaScript/Python/Squeak 之间切换,以熟悉它们的异同点。
请记住,我们的 view point 在消息上。在这种视角上,RPC 是利用消息的一种形式,如此一来,这种经典模式并不存在固态的结构,它只是对消息的一种看法。
思路
思路是这样的,执行同步指令的时候,发送(pub)一个指令出去(在远端执行),接着返回一个 Promise,直到 Promise 完成之后,指令才往下执行,如此一来,就完成了同步的语义。
那么如何知道 Promise 是否完成呢?
将这些 Promise 采集在一起,每当收到(sub)远端发来的消息(有些消息是发出去的指令运行的结果),就去解决对应的 Promise。
如何知道收到的消息与 Promise 的对应关系呢?
使用 message id!
JavaScript
我们首先在 JavaScript(CodeLab Scratch) 里实现了 建立在异步消息之上的同步指令
我们构建了一个 js object: this._promises = {}
, 用于存放指令 Promise 的 resolve。
并且设置了一个超时时间,如果超过这个时间,Promise 还没有被解决,则打印错误日志。
1
2
3
4
5
6
7
8
9
10
|
get_reply_message(messageID, timeout=5000) {
return new Promise((resolve, reject) => {
this._promises[messageID] = resolve;
setTimeout(() => {
if (this._promises[messageID]){
console.error(`timeout(${timeout}ms)`)
}
}, timeout);
});
}
|
在每次收到 Adapter 的消息的时候,都检查下是否有对应的 Promise,如果有,就解决它。
1
2
3
4
5
|
if (typeof message_id !== "undefined") {
this._promises[message_id] &&
this._promises[message_id](content) &&
delete this._promises[message_id];
}
|
完整的源码细节,参考:codelab_adapter_base.js
Python
Promise 在 Python 里的对应物是 Future。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
|
import asyncio
import uuid
_futures = {}
async def in_coming_message(msg):
'''
-> message
msg
content
message_id
in a task
'''
inCommingMessageId = msg['message_id']
future = _futures.get(inCommingMessageId)
if future:
future.set_result(f"{msg['content']}-{msg['message_id']}")
async def pub_cmd(message_id):
# message_id = uuid.uuid4().hex
future = asyncio.Future()
_futures[message_id] = future
# todo timeout
return await future
async def mock_sub():
'''
模拟sub接收消息
'''
await asyncio.sleep(2)
for i in [1, 2, 3]:
msg = {}
msg['content'] = "reply"
msg["message_id"] = i
await in_coming_message(msg)
await asyncio.sleep(1)
async def main():
# loop = asyncio.get_running_loop()
asyncio.create_task(mock_sub())
print(await pub_cmd(1))
print(await pub_cmd(2))
print(await pub_cmd(3))
asyncio.run(main())
|
可以看出,要实现我们的目标: 建立在异步消息之上的同步指令
, 并不一定需要使用 asyncio.Future
, 使用多线程应该也不难,进一步思考,似乎只要能够 发送消息,之后根据收到的信号(消息),解除阻塞
,就能实现我们的目标。(实际上,是自己 DIY 了 Future/Promise)
如果你打算自己在Python中实现 Future,可以参考 ROS 1.0 Python 工具箱。
2023-08-02 更新
以下是在线程中实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
# import threading
from concurrent.futures import Future, ThreadPoolExecutor
_futures = {}
def in_coming_message(msg):
'''
-> message
msg
content
message_id
in a task
'''
inCommingMessageId = msg['message_id']
future = _futures.get(inCommingMessageId)
if future:
future.set_result(f"{msg['content']}-{msg['message_id']}")
def _send_and_wait(timeout):
with ThreadPoolExecutor(max_workers=1) as executor:
message_id = random.randint(1, 100000000)
future = Future()
task = executor.submit(lambda f: f.result(timeout=timeout), future)
_futures[message_id] = future # todo: threadsafe
# message["message_id"] = message_id
# send the message
return task.result() # block and wait the result or TimeoutError
|
Squeak
Squeak 对并发有很好的支持。(并发的关键是处理多个任务的能力,并行的关键是同时处理多个任务的能力,Squeak 对多核心的利用目前似乎还不好(并行))
Squeak 支持多种并发模型:
我们在此使用Squeak Actors, 因其实现了 Promise 的概念, 这个库实现的Promises与 Promises/A+ 相似。
值得注意的是在Squeak Actors中, 每个Actors都是 Smalltalk Process,而 Squeak 的Process也只是一个对象,可以在环境里随时与之交互!Squeak 在tools里还提供了 Process Browser
!
我们暂不使用 Actors 编程模型,只使用它携带的原始 Promise
先展示Promise的简单用法:
1
2
3
|
p := Promise new.
[Transcript show: (p wait)] fork
p resolveWith: 1
|
接下来,构建一个类来实现我们的想法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
Object subclass: #SyncCmdPromise
instanceVariableNames: 'promise_resolves'
classVariableNames: ''
poolDictionaries: ''
category: 'mylab'.
"以下是方法"
initialize
promise_resolves := Dictionary new.
promise_resolves
^promise_resolves
pub_cmd: cmd
"to pub message: cmd"
| uuid p |
"uuid := UUID new asString."
uuid := cmd.
p := Promise new.
promise_resolves at:uuid put:p.
^p.
|
写一个方法,模拟接收消息,并 resolve promise
1
2
3
4
5
6
7
8
9
10
11
|
mock_sub: message_id_list
"使用线程处理外部消息,目前做法草率,存在线程安全问题,线程间不要共享状态,只做简单通信,用于控制时序(采纳ZeroMQ的观点)"
[message_id_list do: [:message_id | (Delay forSeconds: 2) wait. (promise_resolves includesKey: message_id) ifTrue: [
(promise_resolves at:message_id) resolveWith: 'reply ', message_id
].
].] fork.
Transcript show: (self pub_cmd: 'cmd1') wait.Beeper beep.
Transcript show: (self pub_cmd: 'cmd2') wait.Beeper beep.
Transcript show: (self pub_cmd: 'cmd3') wait.Beeper beep.
|
完整测试它:
1
2
|
s := SyncCmdPromise new.
s mock_sub: #('cmd1' 'cmd2' 'cmd3')
|
ps: 如果做实验时,UI 不小心被 wait 阻塞住了,可以按下 Interrupt Key 打破 UI 阻塞。
在不同系统上的规则为:
|
|
Macintosh |
Cmd-. |
Linux |
Alt-. |
Windows |
Alt-. |
Snap!
2023-08-02 更新
基本思路:
Demo
以上思路,也可以在 Scratch 里实现: 既可以在 Scratch 表层(积木层)实现(使用 CodeLab Scratch 中的 MQTT 或者 EIM 扩展都能实现),也可以在 Scratch 的下层(JavaScript)里实现。
总结
在不同语言中实现上述想法的时候,我观察到 消息
(异步) + Promise/Future
是一种非常强大的模式,灵活且简单。
消息
(尤其是pub/sub模式)对并发提供了很好的支持(时序无关),而 Promise/Future
很适合描述时序相关的任务,它们彼此没有耦合,可以随意组合使用。
这块我准备长期探索, 而 Squeak 的极端晚绑定是探索这些新想法的理想之地。
The Big Idea is Messaging. – Alan Kay
参考