Tornado ioloop

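The ioloop is Tornado's polling loop for handling I/O events.

This article starts from the following example program, taken from the official Tornado documentation: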

    import tornado.ioloop
    import tornado.web


    class MainHandler(tornado.web.RequestHandler):
        def get(self):
            self.write("Hello, world!")


    def make_app():
        return tornado.web.Application([
            (r"/", MainHandler),
        ])


    if __name__ == '__main__':
        app = make_app()
        app.listen(8879)
        tornado.ioloop.IOLoop().current().start()

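Taking tornado.ioloop.IOLoop().current().start() as the entry point, we step into how the ioloop processes events. (current is a static method, so it can also be called on the class directly, as IOLoop.current().) Execution first reaches the current method of the IOLoop class: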

    # tornado/ioloop.py
    class IOLoop(Configurable):
        """A level-triggered I/O loop."""

        # Global lock for creating global IOLoop instance
        _instance_lock = threading.Lock()

        # threading.local() makes _current a per-thread variable: each thread
        # sees its own value, and threads do not affect one another.
        _current = threading.local()

        @staticmethod
        def current(instance=True):
            """Returns the current thread's `IOLoop`.

            If an `IOLoop` is currently running or has been marked as
            current by `make_current`, returns that instance. If there is
            no current `IOLoop`, returns `IOLoop.instance()` (i.e. the
            main thread's `IOLoop`, creating one if necessary) if ``instance``
            is true.

            In general you should use `IOLoop.current` as the default when
            constructing an asynchronous object, and use `IOLoop.instance`
            when you mean to communicate to the main thread from a different
            one.

            .. versionchanged:: 4.1
               Added ``instance`` argument to control the fallback to
               `IOLoop.instance()`.
            """
            current = getattr(IOLoop._current, "instance", None)
            if current is None and instance:
                return IOLoop.instance()
            return current

        @staticmethod
        def instance():
            """Returns a global `IOLoop` instance.

            Most applications have a single, global `IOLoop` running on the
            main thread. Use this method to get this instance from
            another thread. In most other cases, it is better to use `current()`
            to get the current thread's `IOLoop`.
            """
            if not hasattr(IOLoop, "_instance"):
                with IOLoop._instance_lock:
                    if not hasattr(IOLoop, "_instance"):
                        # New instance after double check
                        IOLoop._instance = IOLoop()
            return IOLoop._instance

        @classmethod
        def configurable_default(cls):
            if hasattr(select, "epoll"):
                from tornado.platform.epoll import EPollIOLoop
                return EPollIOLoop
            if hasattr(select, "kqueue"):
                # Python 2.6+ on BSD or Mac
                from tornado.platform.kqueue import KQueueIOLoop
                return KQueueIOLoop
            from tornado.platform.select import SelectIOLoop
            return SelectIOLoop

        @classmethod
        def configurable_base(cls):
            return IOLoop
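
The interplay between the thread-local _current and the lock-protected global instance can be tried out in isolation. The following is only a minimal sketch, not Tornado code: the class Loop, its make_current method body, and the helper lookup_in_worker are hypothetical stand-ins that copy just the lookup logic of current() and instance() shown above.

```python
import threading


class Loop:
    """Hypothetical stand-in for IOLoop; only the lookup logic is sketched."""

    _instance_lock = threading.Lock()
    _current = threading.local()

    @staticmethod
    def instance():
        # Double-checked locking: the unlocked test keeps the common path
        # cheap, the locked test ensures only one thread creates the instance.
        if not hasattr(Loop, "_instance"):
            with Loop._instance_lock:
                if not hasattr(Loop, "_instance"):
                    Loop._instance = Loop()
        return Loop._instance

    @staticmethod
    def current(instance=True):
        # Look in this thread's private slot first, then fall back
        # to the global instance if the caller allows it.
        current = getattr(Loop._current, "instance", None)
        if current is None and instance:
            return Loop.instance()
        return current

    def make_current(self):
        # Only the calling thread sees this assignment.
        Loop._current.instance = self


def lookup_in_worker():
    """Return what a fresh thread sees before and after make_current()."""
    seen = []

    def worker():
        seen.append(Loop.current(instance=False))  # nothing set in this thread yet
        own = Loop()
        own.make_current()
        seen.append(Loop.current())                # now the thread's own loop
        seen.append(own)

    thread = threading.Thread(target=worker)
    thread.start()
    thread.join()
    return seen
```

In this sketch the worker thread first sees no current loop, then sees the loop it marked as current, while the main thread keeps falling back to the global instance.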

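The parent class of IOLoop is Configurable, which does its initialization in __new__(). __new__() is called before __init__() and decides whether __init__() runs at all: Python calls __init__() only when __new__() returns an instance of the class being constructed.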

    class Configurable(object):
        """Base class for configurable interfaces.

        A configurable interface is an (abstract) class whose constructor
        acts as a factory function for one of its implementation subclasses.
        The implementation subclass as well as optional keyword arguments to
        its initializer can be set globally at runtime with `configure`.

        By using the constructor as the factory method, the interface
        looks like a normal class, `isinstance` works as usual, etc. This
        pattern is most useful when the choice of implementation is likely
        to be a global decision (e.g. when `~select.epoll` is available,
        always use it instead of `~select.select`), or when a
        previously-monolithic class has been split into specialized
        subclasses.

        Configurable subclasses must define the class methods
        `configurable_base` and `configurable_default`, and use the instance
        method `initialize` instead of ``__init__``.
        """
        __impl_class = None  # type: type
        __impl_kwargs = None  # type: Dict[str, Any]

        def __new__(cls, *args, **kwargs):
            base = cls.configurable_base()
            init_kwargs = {}
            if cls is base:
                impl = cls.configured_class()
                if base.__impl_kwargs:
                    init_kwargs.update(base.__impl_kwargs)
            else:
                impl = cls
            init_kwargs.update(kwargs)
            instance = super(Configurable, cls).__new__(impl)
            # initialize vs __init__ chosen for compatibility with AsyncHTTPClient
            # singleton magic. If we get rid of that we can switch to __init__
            # here too.
            instance.initialize(*args, **init_kwargs)
            return instance
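
The effect of a constructor that acts as a factory can be seen in a much smaller example. This is a simplified sketch rather than Tornado's implementation: the classes Base, Fast, Slow and Unrelated are hypothetical, and the choice of Fast as the default is hard-coded here instead of being looked up through configured_class().

```python
class Base:
    """Hypothetical interface whose constructor acts as a factory."""

    def __new__(cls, *args, **kwargs):
        # Called on Base itself: pick an implementation subclass.
        # Called on a subclass: honour the caller's explicit choice.
        impl = Fast if cls is Base else cls
        instance = super().__new__(impl)
        # Like Configurable, set the object up through initialize().
        instance.initialize(*args, **kwargs)
        return instance

    def initialize(self, label="default"):
        self.label = label


class Fast(Base):
    pass


class Slow(Base):
    pass


class Unrelated:
    init_ran = False

    def __new__(cls):
        # The returned object is not an instance of cls,
        # so Python does not call __init__ on it.
        return object()

    def __init__(self):
        Unrelated.init_ran = True
```

Calling Base(label="x") yields a Fast object that still passes isinstance(obj, Base), while Slow() stays a Slow. Unrelated() shows the other half of the rule: its __init__ never runs, because __new__ returned a plain object.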

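The __new__() method of Configurable completes initialization by calling a few key methods that the concrete implementation class (the subclass) inherits or overrides, so __new__() is in effect a factory method. Within it, cls.configured_class() determines which polling mechanism is used in the current runtime environment (for example epoll, kqueue, or select):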

    class Configurable(object):
        ......

        @classmethod
        def configured_class(cls):
            # type: () -> type
            """Returns the currently configured class."""
            base = cls.configurable_base()
            if cls.__impl_class is None:
                base.__impl_class = cls.configurable_default()
            return base.__impl_class


    class IOLoop(Configurable):
        @classmethod
        def configurable_default(cls):
            if hasattr(select, "epoll"):
                from tornado.platform.epoll import EPollIOLoop
                return EPollIOLoop
            if hasattr(select, "kqueue"):
                # Python 2.6+ on BSD or Mac
                from tornado.platform.kqueue import KQueueIOLoop
                return KQueueIOLoop
            from tornado.platform.select import SelectIOLoop
            return SelectIOLoop
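
To see which branch of configurable_default a given machine would take, the same hasattr checks can be run against the standard select module. The sketch below is an illustration only: PollerChoice is a hypothetical class, it returns plain strings instead of IOLoop subclasses, and default_calls is an extra counter added here to show that the platform probe runs only once before the result is cached.

```python
import select


class PollerChoice:
    """Hypothetical helper mirroring configured_class/configurable_default."""

    _impl_name = None   # cached result, playing the role of __impl_class
    default_calls = 0   # counts how often the platform probe runs

    @classmethod
    def configurable_default(cls):
        cls.default_calls += 1
        # Same order of checks as IOLoop.configurable_default.
        if hasattr(select, "epoll"):
            return "epoll"
        if hasattr(select, "kqueue"):
            return "kqueue"
        return "select"

    @classmethod
    def configured_class(cls):
        # Probe the platform on first use, then reuse the cached answer.
        if cls._impl_name is None:
            cls._impl_name = cls.configurable_default()
        return cls._impl_name
```

Calling PollerChoice.configured_class() repeatedly returns the same name each time, and configurable_default is invoked only on the first call.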

The test environment is CentOS, so the return value is EPollIOLoop. Let's look at its implementation:

    class EPollIOLoop(PollIOLoop):
        def initialize(self, **kwargs):
            super(EPollIOLoop, self).initialize(impl=select.epoll(), **kwargs)

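EPollIOLoop inherits from PollIOLoop and passes select.epoll() as the impl argument. Because EPollIOLoop does not define start() itself, the tornado.ioloop.IOLoop().current().start() call in the example at the beginning of this article ends up in the start() method of PollIOLoop. Let's look at that class's initialize() and start() methods: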

    class PollIOLoop(IOLoop):
        """Base class for IOLoops built around a select-like function.

        For concrete implementations, see `tornado.platform.epoll.EPollIOLoop`
        (Linux), `tornado.platform.kqueue.KQueueIOLoop` (BSD and Mac), or
        `tornado.platform.select.SelectIOLoop` (all platforms).
        """
        def initialize(self, impl, time_func=None, **kwargs):
            super(PollIOLoop, self).initialize(**kwargs)
            self._impl = impl
            if hasattr(self._impl, 'fileno'):
                set_close_exec(self._impl.fileno())
            self.time_func = time_func or time.time
            self._handlers = {}
            self._events = {}
            self._callbacks = []
            self._callback_lock = threading.Lock()
            self._timeouts = []
            self._cancellations = 0
            self._running = False
            self._stopped = False
            self._closing = False
            self._thread_ident = None
            self._blocking_signal_threshold = None
            self._timeout_counter = itertools.count()

            # Create a pipe that we send bogus data to when we want to wake
            # the I/O loop when it is idle
            self._waker = Waker()
            self.add_handler(self._waker.fileno(),
                             lambda fd, events: self._waker.consume(),
                             self.READ)

        def start(self):
            if self._running:
                raise RuntimeError("IOLoop is already running")
            self._setup_logging()
            if self._stopped:
                self._stopped = False
                return
            old_current = getattr(IOLoop._current, "instance", None)
            IOLoop._current.instance = self
            self._thread_ident = thread.get_ident()
            self._running = True

            # signal.set_wakeup_fd closes a race condition in event loops:
            # a signal may arrive at the beginning of select/poll/etc
            # before it goes into its interruptible sleep, so the signal
            # will be consumed without waking the select. The solution is
            # for the (C, synchronous) signal handler to write to a pipe,
            # which will then be seen by select.
            #
            # In python's signal handling semantics, this only matters on the
            # main thread (fortunately, set_wakeup_fd only works on the main
            # thread and will raise a ValueError otherwise).
            #
            # If someone has already set a wakeup fd, we don't want to
            # disturb it. This is an issue for twisted, which does its
            # SIGCHLD processing in response to its own wakeup fd being
            # written to. As long as the wakeup fd is registered on the IOLoop,
            # the loop will still wake up and everything should work.
            old_wakeup_fd = None
            if hasattr(signal, 'set_wakeup_fd') and os.name == 'posix':
                # requires python 2.6+, unix. set_wakeup_fd exists but crashes
                # the python process on windows.
                try:
                    old_wakeup_fd = signal.set_wakeup_fd(self._waker.write_fileno())
                    if old_wakeup_fd != -1:
                        # Already set, restore previous value. This is a little racy,
                        # but there's no clean get_wakeup_fd and in real use the
                        # IOLoop is just started once at the beginning.
                        signal.set_wakeup_fd(old_wakeup_fd)
                        old_wakeup_fd = None
                except ValueError:
                    # Non-main thread, or the previous value of wakeup_fd
                    # is no longer valid.
                    old_wakeup_fd = None

            try:
                while True:
                    # Prevent IO event starvation by delaying new callbacks
                    # to the next iteration of the event loop.
                    with self._callback_lock:
                        callbacks = self._callbacks
                        self._callbacks = []

                    # Add any timeouts that have come due to the callback list.
                    # Do not run anything until we have determined which ones
                    # are ready, so timeouts that call add_timeout cannot
                    # schedule anything in this iteration.
                    due_timeouts = []
                    if self._timeouts:
                        now = self.time()
                        while self._timeouts:
                            if self._timeouts[0].callback is None:
                                # The timeout was cancelled. Note that the
                                # cancellation check is repeated below for timeouts
                                # that are cancelled by another timeout or callback.
                                heapq.heappop(self._timeouts)
                                self._cancellations -= 1
                            elif self._timeouts[0].deadline <= now:
                                due_timeouts.append(heapq.heappop(self._timeouts))
                            else:
                                break
                        if (self._cancellations > 512 and
                                self._cancellations > (len(self._timeouts) >> 1)):
                            # Clean up the timeout queue when it gets large and it's
                            # more than half cancellations.
                            self._cancellations = 0
                            self._timeouts = [x for x in self._timeouts
                                              if x.callback is not None]
                            heapq.heapify(self._timeouts)

                    for callback in callbacks:
                        self._run_callback(callback)
                    for timeout in due_timeouts:
                        if timeout.callback is not None:
                            self._run_callback(timeout.callback)
                    # Closures may be holding on to a lot of memory, so allow
                    # them to be freed before we go into our poll wait.
                    callbacks = callback = due_timeouts = timeout = None

                    if self._callbacks:
                        # If any callbacks or timeouts called add_callback,
                        # we don't want to wait in poll() before we run them.
                        poll_timeout = 0.0
                    elif self._timeouts:
                        # If there are any timeouts, schedule the first one.
                        # Use self.time() instead of 'now' to account for time
                        # spent running callbacks.
                        poll_timeout = self._timeouts[0].deadline - self.time()
                        poll_timeout = max(0, min(poll_timeout, _POLL_TIMEOUT))
                    else:
                        # No timeouts and no callbacks, so use the default.
                        poll_timeout = _POLL_TIMEOUT

                    if not self._running:
                        break

                    if self._blocking_signal_threshold is not None:
                        # clear alarm so it doesn't fire while poll is waiting for
                        # events.
                        signal.setitimer(signal.ITIMER_REAL, 0, 0)

                    try:
                        event_pairs = self._impl.poll(poll_timeout)
                    except Exception as e:
                        # Depending on python version and IOLoop implementation,
                        # different exception types may be thrown and there are
                        # two ways EINTR might be signaled:
                        # * e.errno == errno.EINTR
                        # * e.args is like (errno.EINTR, 'Interrupted system call')
                        if errno_from_exception(e) == errno.EINTR:
                            continue
                        else:
                            raise

                    if self._blocking_signal_threshold is not None:
                        signal.setitimer(signal.ITIMER_REAL,
                                         self._blocking_signal_threshold, 0)

                    # Pop one fd at a time from the set of pending fds and run
                    # its handler. Since that handler may perform actions on
                    # other file descriptors, there may be reentrant calls to
                    # this IOLoop that modify self._events
                    self._events.update(event_pairs)
                    while self._events:
                        fd, events = self._events.popitem()
                        try:
                            fd_obj, handler_func = self._handlers[fd]
                            handler_func(fd_obj, events)
                        except (OSError, IOError) as e:
                            if errno_from_exception(e) == errno.EPIPE:
                                # Happens when the client closes the connection
                                pass
                            else:
                                self.handle_callback_exception(self._handlers.get(fd))
                        except Exception:
                            self.handle_callback_exception(self._handlers.get(fd))
                    fd_obj = handler_func = None

            finally:
                # reset the stopped flag so another start/stop pair can be issued
                self._stopped = False
                if self._blocking_signal_threshold is not None:
                    signal.setitimer(signal.ITIMER_REAL, 0, 0)
                IOLoop._current.instance = old_current
                if old_wakeup_fd is not None:
                    signal.set_wakeup_fd(old_wakeup_fd)
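
The timeout handling at the top of each iteration in start() is easier to follow when isolated from the rest of the loop. The sketch below is a simplified illustration, not Tornado's code: the Timeout class, pop_due and poll_timeout are hypothetical names, and the default of 3600.0 seconds is only a placeholder for _POLL_TIMEOUT. It keeps the two ideas from the excerpt: cancelled entries (callback set to None) are discarded as they surface at the top of the heap, and the poll wait is bounded by the earliest remaining deadline.

```python
import heapq
import itertools

_counter = itertools.count()


class Timeout:
    """Hypothetical heap entry ordered by deadline, then by insertion order."""

    def __init__(self, deadline, callback):
        self.deadline = deadline
        self.callback = callback
        self.tiebreaker = next(_counter)

    def __lt__(self, other):
        return (self.deadline, self.tiebreaker) < (other.deadline, other.tiebreaker)


def pop_due(timeouts, now):
    """Pop cancelled and due entries off the heap; return the due ones."""
    due = []
    while timeouts:
        if timeouts[0].callback is None:
            # Cancelled: drop it without running anything.
            heapq.heappop(timeouts)
        elif timeouts[0].deadline <= now:
            due.append(heapq.heappop(timeouts))
        else:
            # The earliest remaining deadline is in the future.
            break
    return due


def poll_timeout(timeouts, now, default=3600.0):
    """How long the loop may block in poll() before the next deadline."""
    if not timeouts:
        return default
    return max(0, min(timeouts[0].deadline - now, default))
```

For example, with deadlines 1.0, 2.0 (cancelled), 5.0 and 10.0 on the heap and now equal to 6.0, pop_due returns the entries for 1.0 and 5.0, and poll_timeout then allows the loop to block for at most 4.0 seconds.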

The initialize() method does the actual initialization work. Its details are left for a later, more thorough study and are skipped for now.

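In the start() method, the line to focus on is event_pairs = self._impl.poll(poll_timeout), which puts epoll into a waiting state for at most poll_timeout seconds. When one or more I/O tasks become ready, the call returns event_pairs, a list of (fd, events) pairs. self._events.update(event_pairs) records the ready tasks in self._events. For each fd, the matching handler_func is then looked up in self._handlers and invoked as the callback for the ready event, and that handler performs the real I/O work.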
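
The wait-then-dispatch step described above can be reproduced with the standard library alone. The following is a minimal sketch under stated assumptions, not Tornado's implementation: it swaps self._impl for selectors.DefaultSelector (which picks epoll, kqueue or select depending on the platform), and run_once, demo and on_readable are hypothetical names introduced here for illustration.

```python
import selectors
import socket


def run_once(sel, handlers, timeout=1.0):
    """One loop iteration: wait for ready fds, then dispatch to handlers."""
    events = {}
    # Block until a registered fd is ready or the timeout expires.
    events.update((key.fd, mask) for key, mask in sel.select(timeout))
    while events:
        fd, mask = events.popitem()
        # Look up the handler registered for this fd and call it.
        fd_obj, handler_func = handlers[fd]
        handler_func(fd_obj, mask)


def demo():
    """Register one socket, make it readable, and run a single iteration."""
    sel = selectors.DefaultSelector()
    handlers = {}
    received = []
    reader, writer = socket.socketpair()
    try:
        def on_readable(sock, mask):
            received.append(sock.recv(1024))

        handlers[reader.fileno()] = (reader, on_readable)
        sel.register(reader, selectors.EVENT_READ)

        writer.send(b"ping")      # makes the reader end ready
        run_once(sel, handlers)   # poll returns it, handler reads the data
        return received
    finally:
        sel.close()
        reader.close()
        writer.close()
```

Here handlers plays the role of self._handlers, mapping each fd to a (file object, handler) pair, and the local events dictionary plays the role of self._events.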