引言:由于都是在工作当中抽出时间看源代码,所以更新速度比较慢,但是还是希望通过对好的源码的分析和探讨,大家相互学习,发现不好的地方共同讨论。
上次讲了IOLoop中的几个重要的方法,inistance() 和 add_handler() .. 今天看看Demo中一个最重要的方法,start(),顺带用stop()收尾
def start(self):"""Starts the I/O loop.The loop will run until one of the I/O handlers calls stop(), whichwill make the loop stop after the current event iteration completes."""if self._stopped:self._stopped = Falsereturnself._running = Truewhile True:# Never use an infinite timeout here - it can stall epollpoll_timeout = 0.2# Prevent IO event starvation by delaying new callbacks# to the next iteration of the event loop.callbacks = self._callbacksself._callbacks = []#先运行注册了的回调函数for callback in callbacks:self._run_callback(callback)if self._callbacks:poll_timeout = 0.0#检查超时事件#方法是,在timeout这个bisect的排序的列表,每次取出头部最小的一个#将deadline与当前时间比较,如果 <= 当前时间,就认为超时,然后调用相应的超时处理的回调函数#这里不好理解的是deadline <= 当前时间 , 如果说deadline 大于当前时间,就代表还没有到#超时条件#循环检查,直到超时事件处理完成#值得一说的是在libevent中是使用了最小堆每次取出当前的最小deadline#由于最小堆的特性,每次从头取出的都是最小的#Nginx的网络模块是用的红黑树来做,原理也是一样的if self._timeouts:now = time.time()while self._timeouts and self._timeouts[0].deadline <= now:timeout = self._timeouts.pop(0)self._run_callback(timeout.callback)#处理完了超时时间之后,需要将epoll最大阻塞时间改为小于当前最小超时时间的绝对值#不然可能在epoll返回后,本来不属于超时事件的事件被超时if self._timeouts:milliseconds = self._timeouts[0].deadline - nowpoll_timeout = min(milliseconds, poll_timeout)#判断“反应堆”是否结束#结束有两个方式,一个是设置_running 标志位,第二个就是往写管道写入"x"if not self._running:break#从注释中可以看出,每次进入epoll等待事件之前都需要把sigalrm清空,以免在#epoll阻塞期间收到信号,在epoll完成后重新设置if self._blocking_signal_threshold is not None:# clear alarm so it doesn't fire while poll is waiting for# events.signal.setitimer(signal.ITIMER_REAL, 0, 0)#进入epoll循环try:event_pairs = self._impl.poll(poll_timeout)except Exception, e:#在 epoll和 select 阻塞过程当中,经常会收到系统或者其他方式发过来的信号,这#时候系统的 errno 会被设置为 EINTR ,如果将遇到这样的情况,直接重启epoll就可以#如果不是这样的错误,则看做是致命错误# Depending on python version and IOLoop implementation,# different exception types may be thrown and there are# two ways EINTR might be signaled:# * e.errno == errno.EINTR# * e.args is like (errno.EINTR, 'Interrupted system call')if (getattr(e, 'errno', None) == errno.EINTR or(isinstance(getattr(e, 'args', None), tuple) andlen(e.args) == 2 and e.args[0] == errno.EINTR)):continueelse:raise#将被阻塞的sigalarm 还原 , 第二个参数是最大阻塞阈值if self._blocking_signal_threshold is not None:signal.setitimer(signal.ITIMER_REAL,self._blocking_signal_threshold, 0)# Pop one fd at a time from the set of pending fds and run# its handler. Since that handler may perform actions on# other file descriptors, there may be reentrant calls to# this IOLoop that update self._events#将新的事件加入到待处理队列中,现代非阻塞的网络库都使用的是这种方式self._events.update(event_pairs)#作者在写这段代码的过程当中不是使用的简单的顺序遍历这个队列,而使用的方式是#将就绪事件逐个弹出,以防止在处理过程当中就绪事件发生改变while self._events:fd, events = self._events.popitem()#在处理过程当中,常常会遇到客户端异常终止的情况#一般情况下如果读取错误,服务端会产生一个 sigpipe信号#这时候需要忽略这个信号#这里我有一个疑问就是为什么在add_handler 的时候 handler是经过 context.wrap包装过的#而在这里是直接调用,按道理应该是通过_running_callback调用,不过这里显然处理了异常情况了try:self._handlers[fd](fd, events)except (KeyboardInterrupt, SystemExit):raiseexcept (OSError, IOError), e:if e.args[0] == errno.EPIPE:# Happens when the client closes the connectionpasselse:logging.error("Exception in I/O handler for fd %d",fd, exc_info=True)except:logging.error("Exception in I/O handler for fd %d",fd, exc_info=True)# reset the stopped flag so another start/stop pair can be issuedself._stopped = False#将定时事件清空if self._blocking_signal_threshold is not None:signal.setitimer(signal.ITIMER_REAL, 0, 0)
这段代码中值得注意的部分就是在几个方面:
1.超时事件的处理,timeout是一个排序后的列表,每次都是取得最前面最小的一个
2.在开始epoll循环的过程当中,设置阻塞sigalarm
3.在处理事件过程当中忽略sigpipe信号
4.在处理就绪事件过程当中,是通过每次pop一个来处理,而不是一次遍历
stop()函数
def stop(self):"""Stop the loop after the current event loop iteration is complete.If the event loop is not currently running, the next call to start()will return immediately.To use asynchronous methods from otherwise-synchronous code (such asunit tests), you can start and stop the event loop like this:ioloop = IOLoop()async_method(ioloop=ioloop, callback=ioloop.stop)ioloop.start()ioloop.start() will return after async_method has run its callback,whether that callback was invoked before or after ioloop.start."""self._running = Falseself._stopped = Trueself._wake()
简单的设置标志位后,向管道发送"x"停止事件循环
总结:IOLoop差不多就是这些内容,利用python简单和高可读性,看网络模块的实现会让我们更加的专注于
实现,而不是繁琐的基础代码的使用过程。
后面将看看IOStream类,是建立在IOLoop的一个上层封装,实现了基本的buffer事件