这是Tornado系列的开篇,Tornado作为异步web框架2010年发布1.0版本。可以说Python社区搞了这么多年,为了获得比多线程多进程更高的性能,经历了Twisted、Tornado、yield、gevent、yield from、asyncio。也不知道什么时候才是一个尽头~~。Tornado过了这么多年还算开发活跃(一个大神扛起了一片天啊),不同于多线程模型的一潭死水,看它的代码你会发现它还是紧跟潮流的,而且有一点。它的内部虽然一直在变动,但是提供给用户的使用接口是很稳定的,后续就会发现这个坑有多大
截止目前最新版本的代码是4.5,排除测试总代码量大约为12000行。我个人喜欢看精简版本的代码,所以本文为1.0.0版本。总代码量为4200行,比较容易看懂。毕竟别人优化了八年,期待一个星期精读能搞明白各种工程优化是不现实的。
IOLoop
说到异步非阻塞大多数人都知道是异步IO模型(select、poll、epoll、kqueue)。没错在阻塞模型中比如使用socket.recv
它会主线程造成阻塞。这直接导致了在python中需要使用多线程或者多进程模型才能够同时处理多个请求。可是在异步模型中允许对文件描述符进行监听,操作系统提供功能,当文件的状态发生变化(可读、可写、异常)程序能够获得这一状态。对我们来说就是当明确知道某文件存在什么事件的时候去触发相应的处理函数。注意,这就是一个回调、回调、回调(重要的事情说三遍)
可以把IOLoop当做一个停不下来的死循环,类似下面这样
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
| import time import bisect from selectors import DefaultSelector
class IOLoop:
def __init__(self): self._callback = set() self._timeout = [] self.ioloop = DefaultSelector()
@classmethod def instance(cls): if not hasattr(cls, '_instance'): cls._instance = cls() return cls._instance
def add_handler(self, fd, event, func): self.ioloop.register(fd, event, func)
def update_handler(self, fd, event): temp = ioloop.get_key(fd) ioloop.unregister(fd) ioloop.register(fd, event, temp.data)
def add_callback(self, func): self._callback.add(func)
def add_timeout(self, t, func): class _T: def __init__(self, t, func): self.t = t self.callback = func
def __lt__(self, other): return self.t < other.t
def __gt__(self, other): return self.t > other.t
t = _T(t, func) bisect.insort(self._timeout, t)
def start(self):
while 1: for i in self._callback.copy(): i() self._callback.remove(i)
for i in self._timeout.copy(): if time.time() > i.t: i.callback() self._timeout.remove(i) else: break
for k, v in self.ioloop.select(timeout=0.2): k.data()
|
你只需要记住以下几点
- IOLoop是一个单例
- IOLoop提供了处理三种情况的方法,分别是回调形式(_callback)、定时器形式(_timeout)、网络IO(selectors)
- 最后是一个while 1的死循环
Callback
正因为IOLoop是一个单例。所以所有的函数最终都通过add_callback、add_timeout、add_handler放置到了IOLoop下面,最终被执行start依次调用,再调用的过程中,这些函数又使用ioloop.IOLoop.instance().add_callback等不断的加入一些事件处理。导致形成了一个callback链条,让我们最终得到结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| from selectors import DefaultSelector, EVENT_WRITE, EVENT_READ import socket from urllib.parse import urlparse
done = False selector = DefaultSelector()
def get(url): parse = urlparse(url) s = socket.socket() s.setblocking(False) try: s.connect((parse.hostname, 80)) except BlockingIOError: pass selector.register(s.fileno(), EVENT_WRITE, lambda: connected(s, parse.path, parse.hostname))
def connected(s, path, host): selector.unregister(s.fileno()) s.send(('GET {} HTTP/1.0\r\nHost:{}\r\n\r\n'.format(path, host)).encode()) buf = [] selector.register(s.fileno(), EVENT_READ, lambda: readable(s, buf))
def readable(s, buf): global done chunk = s.recv(1024) if chunk: buf.append(chunk) else: done = True selector.unregister(s.fileno()) print((b''.join(buf)).decode()) s.close()
get("http://httpbin.org/ip")
while not done: events = selector.select() for k, v in events: k.data()
|
如上例.当我们调用get()开始执行代码的时候它只是开启了第一步创建了一个socket链接并在selector中注册了一个事件,等待READ事件被触发,k.data()被调用,即connected函数被调用,发送数据之后再次注册READ事件。依然等待直到k.data()被调用,即readable被调用。最后直至数据读取完成。表示整个过程结束
优化
观察IOLoop例子中的start死循环可以知道。它其实也是伴随着阻塞的,在self.ioloop.select(timeout=0.2)
中如果持续0.2秒没有等待的IO事件发生,那么会阻塞0.2秒,看似0.2秒也不长,但其实非常大量的事件并不是IO事件,而是在self._callback集合中。因此从效率方面考虑添加_callback的时候触发IO事件,让select不再去阻塞那0.2秒,因此创建了一个管道对象来解决了这个问题,当添加的_callback的时候同时向管道写入一个字符。那么就不存在0.2秒的阻塞了
应用一 PeriodicCallback
ioloop.py下面有一个非常简单的PeriodicCallback类,它的作用是定期将callback加入到IOLoop的_timeout列表中。比如你需要每隔三十分钟去清理一次临时文件夹产生的垃圾文件等等。简易实现大概是这样
1 2 3 4 5 6 7 8 9 10 11 12
| class PeriodicCallback(object): def __init__(self, interval, func): self.interval = interval self.func = func
def start(self): timeout = time.time() + self.interval IOLoop.instance().add_timeout(timeout, self.callback)
def callback(self): self.func() self.start()
|
再执行start之后添加到了_timeout。待时间到达后执行一次然后再调用start,如此反复😳。最后达到了无限循环执行的效果
应用二 自动重启
对于web框架而言,在开发过程中当代码发生变更自动重启机制几乎是都必备的。在Flask中是主线程单独开了一个进程,在进程里面将主函数开一个线程运行。然后执行死循环进行代码变更检测。发现变更则进行进程退出操作。主线程捕获退出状态进行重启。示例代码可以看这里
可以看到自动重启基本由三个部分组成。
- 主代码正常执行
- 检测代码变更逻辑定期执行
- 检测到变更后重启程序
对于第一个条件而言Tornado没什么,就是主线程正常操作,第二个条件就利用了上面说的PeriodicCallback,将检测函数定期执行就好。第三个条件使用os.execv重载整个进程。bingo,没有单独的开启线程,开启进程。单线程完成了自动重启的操作。最简代码应该是这样的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| import functools import ioloop import os import sys import types
def start(io_loop=None, check_time=500): io_loop = io_loop or ioloop.IOLoop.instance() modify_times = {} callback = functools.partial(_reload_on_update, modify_times) scheduler = ioloop.PeriodicCallback(callback, check_time, io_loop=io_loop) scheduler.start()
def _reload_on_update(modify_times): for module in sys.modules.values(): if not isinstance(module, types.ModuleType): continue path = getattr(module, "__file__", None) if not path: continue if path.endswith(".pyc") or path.endswith(".pyo"): path = path[:-1] try: modified = os.stat(path).st_mtime except: continue if path not in modify_times: modify_times[path] = modified continue if modify_times[path] != modified: os.execv(sys.executable, [sys.executable] + sys.argv)
|
使用sys.modules查看所有已加载的模块。每500毫秒将文件改动时间和已有数据进行对比。如果不同则执行os.execv覆盖掉当前进程,后续该代码虽然为了跨平台逻辑上有变动。可是整体思路是没有改变的
总结
总结而言IOLoop的逻辑还是很好理解的。它可以被认为是一个独立性非常强的模块。一个单例,整体是一个死循环提供三种情况的处理。在运行过程中不断的调用单例的成员函数,完成对一个事务(比如获得一个网页的内容)的处理。至于为什么没有被阻塞。每当要IO阻塞的时候(比如connect)就注册一个事件回调,然后继续运行不会阻塞的部分。等到事件完成再进行IO操作就不会阻塞了。从而达到整体非阻塞的效果