介绍完StackContext、gen.engine、gen.coroutine之后Tornado3.0以前的核心内容基本已经完结了。本篇介绍一下gen.coroutine中Future的应用,在tornado中生成一个队列的数据结构。如果经常写web的情况,单单只处理用户的输入给出输出,那么要求不会太多,也不会用到队列。可是如果你去写一个爬虫,肯定需要限速啥的,此时队列就是一个很有用的东西,很容易实现生产者消费者模型。Tornado官方的demo中也有一个[爬虫代码(https://github.com/tornadoweb/tornado/blob/master/demos/webspider/webspider.py)]演示如何使用队列(本篇基于v5.0.0)

目标

理解Queue模块给出的例子就可以了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
from tornado import gen
from tornado.ioloop import IOLoop
from tornado.queues import Queue
q = Queue(maxsize=2)
@gen.coroutine
def consumer():
while True:
item = yield q.get()
try:
print('Doing work on %s' % item)
yield gen.sleep(0.01)
finally:
q.task_done()
@gen.coroutine
def producer():
for item in range(5):
yield q.put(item)
print('Put %s' % item)
@gen.coroutine
def main():
# Start consumer without waiting (since it never finishes).
IOLoop.current().spawn_callback(consumer)
yield producer() # Wait for producer to put all tasks.
yield q.join() # Wait for consumer to finish all tasks.
print('Done')
IOLoop.current().run_sync(main)

控制权转移

在python自带模块里面的queue是给多线程使用的.意味着当队列里面没有元素的时候会阻塞主线程。阻塞主线程这种操作在协程里面当然是不允许的。协程里面需要的是当某事需要进行等待的时候主动让出控制权限,让其他的事件执行。具体来说,如何让出控制权呢。理解上一篇的coroutine就可以发现。yield 一个暂时还没有结果的Future就会导致控制权转移,等Future结果到来yield就会被恢复

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from tornado.gen import coroutine,sleep
from tornado.ioloop import IOLoop
@coroutine
def xx():
yield sleep(1)
print("xx")
@coroutine
def x():
xx()
print("x")
# x()
# IOLoop.instance().start()
IOLoop.instance().run_sync(x)

观察这个例子。正常的写法是yield xx(),然后会调用xx(),过一秒之后打印xx再打印x。如果直接使用xx,其实这段代码也没有什么问题。xx返回了一个Future对象。因为没有使用yield,所以它不必等待xx函数执行完再进行下面的操作。xx返回的Future已经开始了运行(它并不会什么都不发生),它依旧在一秒钟之后被执行然后打印xx。

Queue的实现思路

核心思路是

  • get、put操作均返回Future对象
  • 没有被设置值的Future执行yield会有阻塞效果

创建三个队列分别为正常队列_queue、未来get队列_getters,未来put队列_putters。
执行get操作的时候创建一个Future对象。优先从_putters里面获取.获得的是一个值和一个Future对象,执行Future.set_result。此时代表put被阻塞,执行set_result取消阻塞。再考虑从_queue里面取,代表正常取值。如果都没有则放入_getters队列(此时因为Futures没有值,取值的地方被阻塞)。

执行put就是相反的操作,优先从_getters里面取(有值则执行Future.set_result,取消获取过程的阻塞)。再检查正常队列的元素是否超过最大值。否则加入未来队列

那么队列还有task_done和join操作,前者每一个任务执行完毕都会被调用表示该任务完成,后者表示等待加入到队列的所有任务都完成,否则阻塞。顺便说一下tornado的锁是怎么实现的

依旧是依靠没有被设置值的Future被yield会造成阻塞。该Future被设置值则会继续执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class Event:
def __init__(self):
self.f = Future()
def set(self):
if not self.f.done():
self.f.set_result(None)
def clear(self):
if self.f.done():
self.f = Future()
def wait(self):
return self.f

注意。这个地方调用set和clear都是不带yield的,只有当最后调用wait才使用yield。

Queue的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
class Queue:
def __init__(self, maxsize=None):
self.max_size = maxsize or 2 << 16
self._queue = collections.deque([])
self._getters = collections.deque([])
self._putters = collections.deque([])
self._unfinished_tasks = 0
self._finished = Event()
self._finished.set()
def get(self):
f = Future()
if len(self._queue) > 0:
value = self._queue.popleft()
f.set_result(value)
elif len(self._putters) > 0:
item, future = self._putters.popleft()
f.set_result(item)
future.set_result(None)
else:
self._getters.append(f)
return f
def put(self, value):
self._unfinished_tasks += 1
self._finished.clear()
f = Future()
if len(self._getters) > 0:
future = self._getters.pop()
f.set_result(None)
future.set_result(value)
elif len(self._queue) < self.max_size:
self._queue.append(value)
f.set_result(None)
else:
self._putters.append((value, f))
return f
def task_done(self):
self._unfinished_tasks -= 1
if self._unfinished_tasks == 0:
self._finished.set()
def join(self):
return self._finished.wait()

源码做了更多的适配将_get和_put独立出来。然后使用优先级队列,先进后出队列满足不同的出入顺序要求