Python 备忘

代码风格

Python社区在风格上,普遍喜欢使用下划线

todo: 函数名使用驼峰,与js一致

debug

1
2
3
4
# 临时输出到文件
# https://www.guru99.com/reading-and-writing-files-in-python.html
with open("/tmp/log.txt", "a+") as f:
    print("test1", file=f)

异常

创建自定义异常

创建自定义异常

创建新的异常很简单——定义新的类,让它继承自 Exception

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
class NetworkError(Exception):
    pass

class HostnameError(NetworkError):
    pass

class TimeoutError(NetworkError):
    pass

class ProtocolError(NetworkError):
    pass

try:
    msg = s.recv()
except TimeoutError as e:
    ...
except ProtocolError as e:
    ...

# 抛出异常 raise NetworkError('It failed')

Future

元编程

积累操控代码的能力,晚绑定

定义上下文管理器的简单方法

定义上下文管理器的简单方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
import time
from contextlib import contextmanager

@contextmanager
def timethis(label):
    start = time.time()
    try:
        yield
    finally:
        end = time.time()
        print('{}: {}'.format(label, end - start))

# Example use
with timethis('counting'):
    n = 10000000
    while n > 0:
        n -= 1

yield 之前的代码会在上下文管理器中作为 enter() 方法执行, 所有在 yield 之后的代码会作为 exit() 方法执行。 如果出现了异常,异常会在yield语句那里抛出。

多任务

推荐使用 future

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import concurrent.futures
import time

def do_something(x):
    time.sleep(x)
    time.sleep(x)
    return x

'''
thing1 = do_something(2)
print(thing1)
thing2 = do_something(2)
print(thing2)
'''

# with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# ProcessPoolExecutor 可以 cancel,而ThreadPoolExecutor不行
# callback, 非阻塞
with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
    future1 = executor.submit(do_something, 10)
    time.sleep(1)
    print(future1.cancel())
    # print(future1.result())
    print("end")
    '''
    # print(future1.result())
    print(1)
    future2 = executor.submit(do_something, 2)
    print(2)
    # print(future2.result())
    '''

线程和进程中的 future

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
from concurrent.futures import Future, ThreadPoolExecutor
import time

def test(f):
    result = f.result(timeout=None) # block
    print(result) # timeout=2.1
    return result
f = Future()

with ThreadPoolExecutor(max_workers=5) as executor:
    f_test = executor.submit(test, f)
    time.sleep(2)
    f.set_result("finish")
    print(f_test.result())
    # print(future.result())

asyncio

参考 /post/%E7%BC%96%E7%A8%8B/async-msg-sync-cmd/

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
import asyncio

f = asyncio.Future()
f.set_result("hi")
try:
    result = await asyncio.wait_for(f, timeout=1)
    print(result)
except asyncio.TimeoutError:
    # print('timeout!')
    raise asyncio.TimeoutError(f'timeout: 1; message_id: 123')

参考