Tornado在2011年的2.1版本加入了gen.engine模块,该模块主要为了解决异步程序编写不够优雅的问题。力图让使用者离callback更远,这也是Tornado厉害的地方。本来内部各种事件处理,callback满天飞,可是在用户眼里,它那个class Handler(web.RequestHandler)下面get的写法和同步写法差不多嘛。和同步的写法一样,获得了更高的性能,我想这也是为什么Tornado能出彩的地方吧(本文代码基于v2.3.0)

起因

主要起因就是不够优雅的callback,假设有一个需求,我们需要向第三方api请求数据,然后返回给客户端。此时写法是这样的

1
2
3
4
5
6
7
8
9
10
class AsyncHandler(RequestHandler):
@asynchronous
def get(self):
http_client = AsyncHTTPClient()
http_client.fetch("http://example.com",
callback=self.on_fetch)
def on_fetch(self, response):
do_something_with_response(response)
self.render("template.html")

这种写法会有什么问题?假设我现在需要对返回的数据进行处理,那么我必须要写到回调函数里面。另外,如果在回调函数里面继续要使用其他的回调逻辑,那么肯定也是需要继续在回调函数里面编写,最后就形成了著名的回调地狱。明显这种方式是很让人恶心的,Tornado在这一版本上改动成了这样
1
2
3
4
5
6
7
8
class GenAsyncHandler(RequestHandler):
@asynchronous
@gen.engine
def get(self):
http_client = AsyncHTTPClient()
response = yield gen.Task(http_client.fetch, "http://example.com")
do_something_with_response(response)
self.render("template.html")

这样就好很多了嘛,更符合人们同步编写代码的直觉。观察改变有三个地方

  • 整个函数添加了gen.engine装饰器
  • 添加了yield语句
  • http_client.fetch被gen.Task封装起来

实现

先说一下这个地方的改动。v2.1.0版本并没有改动任何已有的代码,仅仅是添加了gen.py模块该commitz在这里。我觉得这算是比较厉害的地方了

回顾这里用到的知识点,毫无疑问,Tornado里面最核心的yield终于在这个版本上场了。原来http_client.fetch是需要传入一个callback回调函数进行回调的。那么被gen.Task封装之后,这个回调函数不存在了,可以认为这里是gen.Task自己内部传入了一个回调函数。而只要是函数用到了yield关键字,那么它就是一个生成器对象。生成器对象有两个基本的特性,执行get()并不会立即开始执行、执行send后遇到yield会被暂停

考虑到上面说的gen.py是一个很独立的函数,并没有改动任何已有的代码。那么gen.engine装饰器肯定对get()执行了初始化并执行了send(None)让它开始运行起来。可是遇到yield会停止。同时上面的语义是再次恢复后以前http_client.fetch传递给回调函数的值需要赋值给response变量,生成器的send方法是可以做到这一点的,这里可能并不太好理解。

现在对功能进行划分

  • gen.engine装饰器的主要作用是让生成器运行起来(调用后再执行send(None))
  • gen.Task的作用是将http_client.fetch 封装成Task对象,并传入一个回调,该回调使得send被再次调用
  • Runner 这是一个对用户无感知的类。连接gen.engine和gen.Task

基础实现代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class Runner():
def __init__(self, gen):
self.gen = gen()
self.yielded = self.gen.send(None)
self.yielded.start(self)
def result_callback(self, value):
self.result = value
self.run()
def run(self):
try:
self.yielded = self.gen.send(self.result)
except StopIteration:
return
else:
self.yielded.start(self)
class Task():
def __init__(self, func):
self.func = func
def start(self, runner):
self.func(callback=runner.result_callback)
def engine(func):
def inner():
return Runner(func)
return inner

可以看到gen.engine是一个非常简单的装饰器,将生成器传递给Runner并返回。在Runner里面它被执行send(None)。这个时候send(None)返回的是Task(http_client.fetch, “http://example.com")。

Task的封装也可以很简单,它仅仅存在一个start。传入runner并将http_client.fetch的回调设置为runner.result_callback

Runner呢,它负责了让生成器开始运行,并拥有result_callback函数,在callback里面,它调用了自身的run函数。在run里面再次调用send,此时send的值是http_client.fetch返回的值。如果没有触发StopIteration异常则表明再一次返回了Task对象。重复过程执行Task.start

这样它很巧妙的将callback驱动变成了yield驱动,要知道Task必定会调用callback函数,当callback被调用的时候就等同于赋值给了yield左边的变量

改进

上面的代码并没有考虑异常情况,另外可能存在这种情况

1
2
3
4
def get(self):
http_client = AsyncHTTPClient()
response1, response2 = yield [gen.Task(http_client.fetch, url1),
gen.Task(http_client.fetch, url2)]

我们希望两个Task同时执行,而不是等一个得到结果后,再去请求第二个结果。因此需要稍微复杂一些。我们给每一个Task标记一个独立的key,并修改Task的callback部分,让回调的时候知道属于哪一个Task对象。于此同时Task多了两个方法get_resultis_ready方法。在首次得到yielded对象后。如果判断是list对象,那么对该list再次进行封装得到Multi对象,它的is_ready会检查多个Task是不是都得到结果。在被执行回调后执行到Runner.run(),会先检查是否满足is_ready。如果不满足说明还有的Task并没有返回结果,直接返回。等待下一次被回调。均满足才得到全部结果执行send操作

总结

这段代码非常独立,还是很有看头很精彩的。被gen.engine修饰一次意味着存在一个Runner,这个Runner配合着多个Task,主要可以看做Task和Runner互相调用的过程。Task每被回调一次则Runner.run()被调用一次,执行一次send操作,yield往下走一次返回下一个Task对象。另外这段代码里面还提供了很坑爹的Callback、Wait控制方式。思维有点奇葩,一般人不太会去用,实现倒是并不复杂