上一篇讲了“Tornado源码分析之概述”,本文将介绍IOLoop类的方法以及IOLoop提供了哪些功能。

tornado/ioloop.py 源码: https://github.com/tornadoweb/tornado/blob/master/tornado/ioloop.py

IOLoop 类组织结构

|---IOLoop
        ---__init__(self, impl=None)
        ---instance(cls)
        ---initialized(cls)
        ---add_handler(self, fd, handler, events)
        ---update_handler(self, fd, events)
        ---remove_handler(self, fd)
        ---set_blocking_signal_threshold(self, seconds, action)
        ---set_blocking_log_threshold(self, seconds)
        ---log_stack(self, signal, frame)
        ---start(self)
        ---stop(self)
        ---running(self)
        ---add_timeout(self, deadline, callback)
        ---remove_timeout(self, timeout)
        ---add_callback(self, callback)
        ---_wake(self)
        ---_run_callback(self, callback)
        ---handle_callback_exception(self, callback)
        ---_read_waker(self, fd, events)
        ---_set_nonblocking(self, fd)
        ---_set_close_exec(self, fd)
---|

从上一章的Demo里面可以看到最重要的对外提供的方法有

0)instance()                 // 静态方法 @classmethod

1)add_handler(...)       // 添加事件监听

2)start()                       // 启动

类似于传统的事件驱动方式,这里的使用方式也很简单,从IOLoop类中看起:

先是自己定义了几个EPOLL的宏,就是EPOLL的事件类型

#epoll 的事件类型,类似于这里的宏定义
_EPOLLIN = 0x001
_EPOLLPRI = 0x002
_EPOLLOUT = 0x004
_EPOLLERR = 0x008
_EPOLLHUP = 0x010
_EPOLLRDHUP = 0x2000
_EPOLLONESHOT = (1 << 30)
_EPOLLET = (1 << 31)

# Our events map exactly to the epoll events
#将这几个事件类型重定义一番
NONE = 0
READ = _EPOLLIN
WRITE = _EPOLLOUT
ERROR = _EPOLLERR | _EPOLLHUP | _EPOLLRDHUP

# Constants from the epoll module
_EPOLLIN = 0x001
_EPOLLPRI = 0x002
_EPOLLOUT = 0x004
_EPOLLERR = 0x008
_EPOLLHUP = 0x010
_EPOLLRDHUP = 0x2000
_EPOLLONESHOT = (1 << 30)
_EPOLLET = (1 << 31)

# Our events map exactly to the epoll events
NONE = 0
READ = _EPOLLIN
WRITE = _EPOLLOUT
ERROR = _EPOLLERR | _EPOLLHUP

常用的就是三种,READ,WRITE,ERROR

 

IOLoop 构造函数

# ioloop的构造函数
def __init__(self, impl=None):
    # 选择异步事件循环监听方式,默认是epoll,后面的_impl都是指的是epoll
    self._impl = impl or _poll()
    # 自省,查看 self._impl 中是否有 fileno
    # 如果有,就关闭起exec性质
    if hasattr(self._impl, 'fileno'):
        self._set_close_exec(self._impl.fileno())
    # _set_close_exec 是一个类方法,下面有定义
	# 当 FD_CLOEXEC 设置了以后,exec() 函数执行的时候会自动关闭描述符
	"""     
	def _set_close_exec(self, fd):
		flags = fcntl.fcntl(fd, fcntl.F_GETFD)
		fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)   
	"""
    # handlers 是一个函数集字典
    self._handlers = {}
    self._events = {}
    # 回调函数使用的是列表
    self._callbacks = []
    # 用来记录链接超时
    self._timeouts = []
    self._running = False
    self._stopped = False
    self._blocking_signal_threshold = None

    # Create a pipe that we send bogus data to when we want to wake
    # the I/O loop when it is idle
    # 判断是否是 NT 操作系统
    if os.name != 'nt':
        #创建一个管道 ,返回的为读写两端的文件描述符
        r, w = os.pipe()
        #设置为非阻塞
        self._set_nonblocking(r)
        self._set_nonblocking(w)
        
        self._set_close_exec(r)
        self._set_close_exec(w)
        #分别以读方式和写方式打开管道
        self._waker_reader = os.fdopen(r, "rb", 0)
        self._waker_writer = os.fdopen(w, "wb", 0)
    else:
        #如若不是 NT 系统,改用win32 支持的管道类型
        self._waker_reader = self._waker_writer = win32_support.Pipe()
        r = self._waker_writer.reader_fd
    # 将管道的 read 端与函数 _read_waker 关联,事件类型为 READ
    # 这里也是IO多路复用的一种机制,将管道的描述符也添加进多路复用的IO 管理
    self.add_handler(r, self._read_waker, self.READ)

注意最后的几点,将管道描述符的读端也加入事件循环检查,并设置相应的回调函数,这样做的好处是以便事件循环阻塞而没有相应描述符出现,需要在最大timeout时间之前返回,就可以向这个管道发送一个字符,用来终止阻塞在监听阶段的事件循环监听函数。

看看waker是这样定义的:

def _wake(self):
    try:
        self._waker_writer.write("x")
    except IOError:
        pass

需要唤醒阻塞中的事件循环监听函数的时候,只需要向管道写入一个字符,就可以提前结束循环

instance就是简单的返回一个实例:  

def instance(cls):
    """Returns a global IOLoop instance.

    Most single-threaded applications have a single, global IOLoop.
    Use this method instead of passing around IOLoop instances
    throughout your code.

    A common pattern for classes that depend on IOLoops is to use
    a default argument to enable programs with multiple IOLoops
    but not require the argument for simpler applications:

        class MyClass(object):
            def __init__(self, io_loop=None):
                self.io_loop = io_loop or IOLoop.instance()
    """
    if not hasattr(cls, "_instance"):
        cls._instance = cls()
    return cls._instance

@staticmethod
def instance():
    """Returns a global `IOLoop` instance.

    Most applications have a single, global `IOLoop` running on the
    main thread.  Use this method to get this instance from
    another thread.  To get the current thread's `IOLoop`, use `current()`.
    """
    if not hasattr(IOLoop, "_instance"):
        with IOLoop._instance_lock:
            if not hasattr(IOLoop, "_instance"):
                # New instance after double check
                IOLoop._instance = IOLoop()
    return IOLoop._instance

instance()是一个静态方法,代表此IOLoop是一个单实例方法,一个进程只有一个_instance

在add_handler()里面

#将文件描述符发生相应的事件时的回调函数对应
def add_handler(self, fd, handler, events):
    """Registers the given handler to receive the given events for fd."""
    self._handlers[fd] = stack_context.wrap(handler)
    #在 epoll 中注册对应事件
    #epoll_ctl
    self._impl.register(fd, events | self.ERROR)
#更新相应的事件类型

可以看到,使用字典的方式,每一个fd就对应一个handler,下次事件循环返回的时候按照返回后的fd列表,依次调用相应的callback

|------

在tornado中,函数是通过stack_context.wrap()包装过,可以用来记录上下文

如果需要调用被包装过的函数,需要调用方法 _run_callback(self, callback),这个函数将包装过的callback作为参数出入,然后执行函数

def _run_callback(self, callback):
    try:
        callback()
    except (KeyboardInterrupt, SystemExit):
        raise
    except:
        self.handle_callback_exception(callback)

当函数执行发生异常时,可以记录下函数执行状态  

-------|

_impl.register就是被封装过的epoll的epoll_ctl,参数是EPOLL_CTL_ADD

见同一个文件下的_EPoll类

class _EPoll(object):
    """An epoll-based event loop using our C module for Python 2.5 systems"""
    _EPOLL_CTL_ADD = 1
    _EPOLL_CTL_DEL = 2
    _EPOLL_CTL_MOD = 3

    def __init__(self):
        self._epoll_fd = epoll.epoll_create()

    def fileno(self):
        return self._epoll_fd

    def register(self, fd, events):
        epoll.epoll_ctl(self._epoll_fd, self._EPOLL_CTL_ADD, fd, events)

    def modify(self, fd, events):
        epoll.epoll_ctl(self._epoll_fd, self._EPOLL_CTL_MOD, fd, events)

    def unregister(self, fd):
        epoll.epoll_ctl(self._epoll_fd, self._EPOLL_CTL_DEL, fd, 0)

    def poll(self, timeout):
        return epoll.epoll_wait(self._epoll_fd, int(timeout * 1000))

总结:上面讲了IOLoop中的几个重要函数,后面依次会有分析其他方法,还有其中一些细节值得平常注意的。 

 

start() 和 stop() 函数

start() 函数

def start(self):
    """Starts the I/O loop.

    The loop will run until one of the I/O handlers calls stop(), which
    will make the loop stop after the current event iteration completes.
    """
    if self._stopped:
        self._stopped = False
        return
    self._running = True
    while True:
        # Never use an infinite timeout here - it can stall epoll
        poll_timeout = 0.2

        # Prevent IO event starvation by delaying new callbacks
        # to the next iteration of the event loop.
        callbacks = self._callbacks
        self._callbacks = []
        #先运行注册了的回调函数
        for callback in callbacks:
            self._run_callback(callback)

        if self._callbacks:
            poll_timeout = 0.0

#检查超时事件
#方法是,在timeout这个bisect的排序的列表,每次取出头部最小的一个
#将deadline与当前时间比较,如果 <= 当前时间,就认为超时,然后调用相应的超时处理的回调函数
#这里不好理解的是deadline <= 当前时间 , 如果说deadline 大于当前时间,就代表还没有到
#超时条件
#循环检查,直到超时事件处理完成
#值得一说的是在libevent中是使用了最小堆每次取出当前的最小deadline
#由于最小堆的特性,每次从头取出的都是最小的
#Nginx的网络模块是用的红黑树来做,原理也是一样的
        if self._timeouts:
            now = time.time()
            while self._timeouts and self._timeouts[0].deadline <= now:
                timeout = self._timeouts.pop(0)
                self._run_callback(timeout.callback)
    #处理完了超时时间之后,需要将epoll最大阻塞时间改为小于当前最小超时时间的绝对值
    #不然可能在epoll返回后,本来不属于超时事件的事件被超时
            if self._timeouts:
                milliseconds = self._timeouts[0].deadline - now
                poll_timeout = min(milliseconds, poll_timeout)
#判断“反应堆”是否结束
#结束有两个方式,一个是设置_running 标志位,第二个就是往写管道写入"x"
        if not self._running:
            break
#从注释中可以看出,每次进入epoll等待事件之前都需要把sigalrm清空,以免在
#epoll阻塞期间收到信号,在epoll完成后重新设置
        if self._blocking_signal_threshold is not None:
            # clear alarm so it doesn't fire while poll is waiting for
            # events.
            signal.setitimer(signal.ITIMER_REAL, 0, 0)
                     
#进入epoll循环
        try:
            event_pairs = self._impl.poll(poll_timeout)
        except Exception, e:
    #在 epoll和 select 阻塞过程当中,经常会收到系统或者其他方式发过来的信号,这
    #时候系统的 errno 会被设置为 EINTR ,如果将遇到这样的情况,直接重启epoll就可以
    #如果不是这样的错误,则看做是致命错误
            # Depending on python version and IOLoop implementation,
            # different exception types may be thrown and there are
            # two ways EINTR might be signaled:
            # * e.errno == errno.EINTR
            # * e.args is like (errno.EINTR, 'Interrupted system call')
            if (getattr(e, 'errno', None) == errno.EINTR or
                (isinstance(getattr(e, 'args', None), tuple) and
                 len(e.args) == 2 and e.args[0] == errno.EINTR)):
                continue
            else:
                raise
#将被阻塞的sigalarm 还原 , 第二个参数是最大阻塞阈值
        if self._blocking_signal_threshold is not None:
            signal.setitimer(signal.ITIMER_REAL,
                             self._blocking_signal_threshold, 0)
                     
        # Pop one fd at a time from the set of pending fds and run
        # its handler. Since that handler may perform actions on
        # other file descriptors, there may be reentrant calls to
        # this IOLoop that update self._events
        #将新的事件加入到待处理队列中,现代非阻塞的网络库都使用的是这种方式
        self._events.update(event_pairs)
        #作者在写这段代码的过程当中不是使用的简单的顺序遍历这个队列,而使用的方式是
        #将就绪事件逐个弹出,以防止在处理过程当中就绪事件发生改变
        while self._events:
            fd, events = self._events.popitem()
            #在处理过程当中,常常会遇到客户端异常终止的情况
            #一般情况下如果读取错误,服务端会产生一个 sigpipe信号
            #这时候需要忽略这个信号
            #这里我有一个疑问就是为什么在add_handler 的时候 handler是经过 context.wrap包装过的
            #而在这里是直接调用,按道理应该是通过_running_callback调用,不过这里显然处理了异常情况了
            try:
                self._handlers[fd](fd, events)
            except (KeyboardInterrupt, SystemExit):
                raise
            except (OSError, IOError), e:
                if e.args[0] == errno.EPIPE:
                    # Happens when the client closes the connection
                    pass
                else:
                    logging.error("Exception in I/O handler for fd %d",
                                  fd, exc_info=True)
            except:
                logging.error("Exception in I/O handler for fd %d",
                              fd, exc_info=True)
    # reset the stopped flag so another start/stop pair can be issued
     
    self._stopped = False
    #将定时事件清空
    if self._blocking_signal_threshold is not None:
        signal.setitimer(signal.ITIMER_REAL, 0, 0)

这段代码中值得注意的部分就是在几个方面:

1. 超时事件的处理,timeout是一个排序后的列表,每次都是取得最前面最小的一个

2. 在开始epoll循环的过程当中,设置阻塞sigalarm

3. 在处理事件过程当中忽略sigpipe信号

4. 在处理就绪事件过程当中,是通过每次pop一个来处理,而不是一次遍历

 

stop() 函数

def stop(self):
    """Stop the loop after the current event loop iteration is complete.
    If the event loop is not currently running, the next call to start()
    will return immediately.

    To use asynchronous methods from otherwise-synchronous code (such as
    unit tests), you can start and stop the event loop like this:
      ioloop = IOLoop()
      async_method(ioloop=ioloop, callback=ioloop.stop)
      ioloop.start()
    ioloop.start() will return after async_method has run its callback,
    whether that callback was invoked before or after ioloop.start.
    """
    self._running = False
    self._stopped = True
    self._wake()

简单的设置标志位后,向管道发送"x"停止事件循环

总结:IOLoop差不多就是这些内容,利用python简单和高可读性,看网络模块的实现会让我们更加的专注于实现,而不是繁琐的基础代码的使用过程。

后面将看看IOStream类,是建立在IOLoop的一个上层封装,实现了基本的buffer事件

 

原文破修电脑的