• Tornado IOLoop start()里的核心调度

    理解IOLoop
    服务器君一共花费 11.556 ms 进行了 2 次数据库查询,努力地为您提供了这个页面。
    广告很萌的

    IOLoop的初始化

    初始化过程中选择 epoll 的实现方式,Linux 平台为 epoll,BSD 平台为 kqueue,其他平台如果安装有C模块扩展的 epoll 则使用 tornado对 epoll 的封装,否则退化为 select。

    def __init__(self, impl=None):
        self._impl = impl or _poll()
        #省略部分代码
        self._waker = Waker()
        self.add_handler(self._waker.fileno(),
                         lambda fd, events: self._waker.consume(),
                         self.READ)
    
    def add_handler(self, fd, handler, events):
        """Registers the given handler to receive the given events for fd."""
        self._handlers[fd] = stack_context.wrap(handler)
        self._impl.register(fd, events | self.ERROR)
    

    在 IOLoop 初始化的过程中创建了一个 Waker 对象,将 Waker 对象 fd 的读端注册到事件循环中并设定相应的回调函数(这样做的好处是当事件循环阻塞而没有响应描述符出现,需要在最大 timeout 时间之前返回,就可以向这个管道发送一个字符)。

    Waker 的使用:一种是在其他线程向 IOLoop 添加 callback 时使用,唤醒 IOLoop 同时会将控制权转移给 IOLoop 线程并完成特定请求。唤醒的方法向管道中写入一个字符'x'。另外,在 IOLoop的stop 函数中会调用self._waker.wake(),通过向管道写入'x'停止事件循环。

    add_handler 函数使用了stack_context 提供的 wrap 方法。wrap 返回了一个可以直接调用的对象并且保存了传入之前的堆栈信息,在执行时可以恢复,这样就保证了函数的异步调用时具有正确的运行环境。

    IOLoop的start方法

    IOLoop 的核心调度集中在 start() 方法中,IOLoop 实例对象调用 start 后开始 epoll 事件循环机制,该方法会一直运行直到 IOLoop 对象调用 stop 函数、当前所有事件循环完成。start 方法中主要分三个部分:一个部分是对超时的相关处理;一部分是 epoll 事件通知阻塞、接收;一部分是对 epoll 返回I/O事件的处理。

    • 为防止 IO event starvation,将回调函数延迟到下一轮事件循环中执行。
    • 超时的处理 heapq 维护一个最小堆,记录每个回调函数的超时时间(deadline)。每次取出 deadline 最早的回调函数,如果callback标志位为 True 并且已经超时,通过 _run_callback 调用函数;如果没有超时需要重新设定 poll_timeout 的值。
    • 通过 self._impl.poll(poll_timeout) 进行事件阻塞,当有事件通知或超时时 poll 返回特定的 event_pairs。
    • epoll 返回通知事件后将新事件加入待处理队列,将就绪事件逐个弹出,通过stack_context.wrap(handler)保存的可执行对象调用事件处理。
    while True:
        poll_timeout = 3600.0
    
        with self._callback_lock:
            callbacks = self._callbacks
            self._callbacks = []
        for callback in callbacks:
            self._run_callback(callback)
    
        # 超时处理
        if self._timeouts:
            now = time.time()
            while self._timeouts:
                if self._timeouts[0].callback is None:
                    # the timeout was cancelled
                    heapq.heappop(self._timeouts)
                elif self._timeouts[0].deadline <= now:
                    timeout = heapq.heappop(self._timeouts)
                    self._run_callback(timeout.callback)
                else:
                    seconds = self._timeouts[0].deadline - now
                    poll_timeout = min(seconds, poll_timeout)
                    break
    
        if self._callbacks:
            # If any callbacks or timeouts called add_callback,
            # we don't want to wait in poll() before we run them.
            poll_timeout = 0.0
    
        if not self._running:
            break
    
        if self._blocking_signal_threshold is not None:
            # clear alarm so it doesn't fire while poll is waiting for events.
            signal.setitimer(signal.ITIMER_REAL, 0, 0)
    
        # epoll阻塞,当有事件通知或超时返回event_pairs
        try:
            event_pairs = self._impl.poll(poll_timeout)
        except Exception, e:
            # 异常处理,省略
    
        # 对epoll返回event_pairs事件的处理
        self._events.update(event_pairs)
        while self._events:
            fd, events = self._events.popitem()
            try:
                self._handlers[fd](fd, events)
            except Exception e:
                # 异常处理,省略
    

    3.0后的一些改动

    Tornado3.0以后 IOLoop 模块的一些改动。

    IOLoop 成为 util.Configurable 的子类,IOLoop 中绝大多数成员方法都作为抽象接口,具体实现由派生类 PollIOLoop 完成。IOLoop 实现了 Configurable 中的 configurable_base 和 configurable_default 这两个抽象接口,用于初始化过程中获取类类型和类的实现方法(即 IOLoop 中 poller 的实现方式)。

    在 Tornado3.0+ 中针对不同平台,单独出 poller 相应的实现,EPollIOLoop、KQueueIOLoop、SelectIOLoop 均继承于 PollIOLoop。下边的代码是 configurable_default 方法根据平台选择相应的 epoll 实现。初始化 IOLoop 的过程中会自动根据平台选择合适的 poller 的实现方法。

    @classmethod
    def configurable_default(cls):
    	if hasattr(select, "epoll"):
    		from tornado.platform.epoll import EPollIOLoop
    		return EPollIOLoop
    	if hasattr(select, "kqueue"):
    		# Python 2.6+ on BSD or Mac
    		from tornado.platform.kqueue import KQueueIOLoop
    		return KQueueIOLoop
    	from tornado.platform.select import SelectIOLoop
    	return SelectIOLoop
    

    其他有很多细节上的改动,详细可参见官方文档What’s new in Tornado 3.0

更多 推荐条目

Welcome to NowaMagic Academy!

现代魔法 推荐于 2013-02-27 10:23   

本章最新发布
随机专题
  1. [PHP程序设计] 命令式编程范式 6 个条目
  2. [C语言程序设计] C语言里的全局变量 2 个条目
  3. [C语言程序设计] 结构体基本知识 1 个条目
  4. [Python程序设计] Django后台管理系统 2 个条目
  5. [PHP程序设计] PHP中的Hash算法 3 个条目
  6. [Linux操作系统] CentOS上使用EPEL Repository 2 个条目
  7. [Python程序设计] urls.py设置技巧 8 个条目
  8. [JavaScript程序设计] jQuery与表单操作 2 个条目
  9. [PHP程序设计] PHP数组探索 4 个条目
  10. [数据库技术] 数据库范式篇 5 个条目
  11. [软件工程与项目管理] 浏览器的HTML解析器 8 个条目
  12. [数据库技术] MySQL中英文混合排序 4 个条目
窗口 -- [八点]