class Events(bootsteps.StartStopStep):
    """Service used for sending monitoring events."""

    requires = (Connection,)

    def __init__(self, c,
                 task_events=True,
                 without_heartbeat=False,
                 without_gossip=False,
                 **kwargs):
        """Configure event sending and reset the consumer's dispatcher.

        Arguments:
            c: The consumer object this step is attached to; its
                ``event_dispatcher`` attribute is reset to ``None`` here.
            task_events (bool): When true, no group restriction is applied
                (``groups`` is ``None``); otherwise only the ``'worker'``
                group is selected.
            without_heartbeat (bool): Heartbeat opt-out flag; events are
                still sent unless this, ``without_gossip`` and a false
                ``task_events`` all agree that nothing needs them.
            without_gossip (bool): Gossip opt-out flag, see above.
            **kwargs: Forwarded unchanged to the parent step initializer.
        """
        self.groups = None if task_events else ['worker']
        # Events are needed as soon as any one of the three features
        # (task events, gossip, heartbeat) is active.
        self.send_events = (
            task_events or
            not without_gossip or
            not without_heartbeat
        )
        self.enabled = self.send_events
        c.event_dispatcher = None
        super().__init__(c, **kwargs)

    def start(self, c):
        """Create a fresh event dispatcher on ``c``.

        Any dispatcher left over from a previous start is closed first.
        If there was one, its buffer is carried over to the new
        dispatcher, which is then flushed.

        Arguments:
            c: The consumer object that owns the dispatcher.
        """
        # flush events sent while connection was down.
        prev = self._close(c)
        dis = c.event_dispatcher = c.app.events.Dispatcher(
            c.connection_for_write(),
            hostname=c.hostname,
            enabled=self.send_events,
            groups=self.groups,
            # we currently only buffer events when the event loop is enabled
            # XXX This excludes eventlet/gevent, which should actually buffer.
            buffer_group=['task'] if c.hub else None,
            on_send_buffered=c.on_send_event_buffered if c.hub else None,
        )
        if prev:
            dis.extend_buffer(prev)
            dis.flush()

    def _close(self, c):
        """Close and detach the current event dispatcher of ``c``, if any.

        Arguments:
            c: The consumer object that owns the dispatcher.

        Returns:
            The dispatcher that was detached, or ``None`` when ``c`` had
            no (truthy) dispatcher.
        """
        dispatcher = c.event_dispatcher
        if not dispatcher:
            return None

        # remember changes from remote control commands:
        self.groups = dispatcher.groups

        # close custom connection
        # NOTE(review): ``ignore_errors`` is defined outside this block;
        # presumably it swallows connection-related errors -- confirm.
        if dispatcher.connection:
            ignore_errors(c, dispatcher.connection.close)
        ignore_errors(c, dispatcher.close)
        c.event_dispatcher = None
        return dispatcher