Python 备忘
代码风格
Python社区在风格上,普遍喜欢使用下划线
todo: 函数名使用驼峰,与js一致
debug
1
2
3
4
|
# 临时输出到文件
# https://www.guru99.com/reading-and-writing-files-in-python.html
with open("/tmp/log.txt", "a+") as f:
print("test1", file=f)
|
异常
创建自定义异常
创建自定义异常
创建新的异常很简单——定义新的类,让它继承自 Exception
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
class NetworkError(Exception):
pass
class HostnameError(NetworkError):
pass
class TimeoutError(NetworkError):
pass
class ProtocolError(NetworkError):
pass
try:
msg = s.recv()
except TimeoutError as e:
...
except ProtocolError as e:
...
# 抛出异常 raise NetworkError('It failed')
|
Future
元编程
积累操控代码的能力,晚绑定
定义上下文管理器的简单方法
定义上下文管理器的简单方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
import time
from contextlib import contextmanager
@contextmanager
def timethis(label):
start = time.time()
try:
yield
finally:
end = time.time()
print('{}: {}'.format(label, end - start))
# Example use
with timethis('counting'):
n = 10000000
while n > 0:
n -= 1
|
yield 之前的代码会在上下文管理器中作为 enter() 方法执行, 所有在 yield 之后的代码会作为 exit() 方法执行。 如果出现了异常,异常会在yield语句那里抛出。
多任务
推荐使用 future
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
import concurrent.futures
import time
def do_something(x):
time.sleep(x)
time.sleep(x)
return x
'''
thing1 = do_something(2)
print(thing1)
thing2 = do_something(2)
print(thing2)
'''
# with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# ProcessPoolExecutor 可以 cancel,而ThreadPoolExecutor不行
# callback, 非阻塞
with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
future1 = executor.submit(do_something, 10)
time.sleep(1)
print(future1.cancel())
# print(future1.result())
print("end")
'''
# print(future1.result())
print(1)
future2 = executor.submit(do_something, 2)
print(2)
# print(future2.result())
'''
|
线程和进程中的 future
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
from concurrent.futures import Future, ThreadPoolExecutor
import time
def test(f):
result = f.result(timeout=None) # block
print(result) # timeout=2.1
return result
f = Future()
with ThreadPoolExecutor(max_workers=5) as executor:
f_test = executor.submit(test, f)
time.sleep(2)
f.set_result("finish")
print(f_test.result())
# print(future.result())
|
asyncio
参考 /post/%E7%BC%96%E7%A8%8B/async-msg-sync-cmd/
1
2
3
4
5
6
7
8
9
10
|
import asyncio
f = asyncio.Future()
f.set_result("hi")
try:
result = await asyncio.wait_for(f, timeout=1)
print(result)
except asyncio.TimeoutError:
# print('timeout!')
raise asyncio.TimeoutError(f'timeout: 1; message_id: 123')
|
参考