class EventReceiver(ConsumerMixin):
    """Capture events.

    Arguments:
        channel: Channel used to reach the broker.  The value is passed
            through ``maybe_channel`` before being stored.
        handlers (Mapping[Callable]): Event handlers.
            This is a map of event type names and their handlers.
            The special handler `"*"` captures all events that don't have a
            handler.
        routing_key (str): Routing key the event queue is bound with.
        node_id (str): Last component of the queue name.  A new uuid is
            generated when omitted.
        app: App instance, resolved through ``app_or_default``.
        queue_prefix (str): First component of the queue name.  Defaults
            to ``app.conf.event_queue_prefix``.
        accept (Set[str]): Stored as :attr:`accept`.  Defaults to the
            configured event serializer plus ``'json'``.
        queue_ttl: Passed to the queue as ``message_ttl``.  Defaults to
            ``app.conf.event_queue_ttl``.
        queue_expires: Passed to the queue as ``expires``.  Defaults to
            ``app.conf.event_queue_expires``.
    """

    app = None

    def __init__(self, channel, handlers=None, routing_key='#',
                 node_id=None, app=None, queue_prefix=None,
                 accept=None, queue_ttl=None, queue_expires=None):
        self.app = app_or_default(app or self.app)
        self.channel = maybe_channel(channel)
        self.handlers = {} if handlers is None else handlers
        self.routing_key = routing_key
        self.node_id = node_id or uuid()
        self.queue_prefix = queue_prefix or self.app.conf.event_queue_prefix
        # Reuse the connection behind the channel when there is one,
        # otherwise ask the app for a write connection.
        self.exchange = get_exchange(
            self.connection or self.app.connection_for_write(),
            name=self.app.conf.event_exchange,
        )
        if queue_ttl is None:
            queue_ttl = self.app.conf.event_queue_ttl
        if queue_expires is None:
            queue_expires = self.app.conf.event_queue_expires
        self.queue = Queue(
            '.'.join([self.queue_prefix, self.node_id]),
            exchange=self.exchange,
            routing_key=self.routing_key,
            auto_delete=True, durable=False,
            message_ttl=queue_ttl,
            expires=queue_expires,
        )
        self.clock = self.app.clock
        # Bound-method shortcuts used by event_from_message().
        self.adjust_clock = self.clock.adjust
        self.forward_clock = self.clock.forward
        if accept is None:
            accept = {self.app.conf.event_serializer, 'json'}
        self.accept = accept

    def process(self, type, event):
        """Process event by dispatching to configured handler.

        The handler registered for ``type`` is used, falling back to the
        ``'*'`` handler.  Nothing happens when neither is set.
        """
        handler = self.handlers.get(type) or self.handlers.get('*')
        if handler:
            handler(event)

    def capture(self, limit=None, timeout=None, wakeup=True):
        """Open up a consumer capturing events.

        This has to run in the main process, and it will never stop
        unless :attr:`should_stop` is set to True, or forced via
        :exc:`KeyboardInterrupt` or :exc:`SystemExit`.
        """
        # Drain the consume() iterator; its yielded values are not needed.
        for _ in self.consume(limit=limit, timeout=timeout, wakeup=wakeup):
            pass

    def event_from_message(self, body, localize=True,
                           now=time.time, tzfields=_TZGETTER,
                           adjust_timestamp=adjust_timestamp,
                           CLIENT_CLOCK_SKEW=CLIENT_CLOCK_SKEW):
        """Prepare a raw event body and return ``(type, body)``.

        ``body`` is modified in place:

        * ``body['clock']`` is set for ``task-sent`` events and for events
          that carry no clock value; for all other events the receiver's
          clock is adjusted with the value found in the event.
        * With ``localize`` enabled, ``body['timestamp']`` is replaced by
          ``adjust_timestamp(timestamp, offset)``, where both values come
          from ``tzfields(body)``.  A :exc:`KeyError` raised by
          ``tzfields`` leaves the timestamp untouched.
        * ``body['local_received']`` is set to ``now()``.

        Raises:
            KeyError: If ``body`` has no ``'type'`` key.
        """
        type = body['type']
        if type == 'task-sent':
            # clients never sync so cannot use their clock value
            _c = body['clock'] = (self.clock.value or 1) + CLIENT_CLOCK_SKEW
            self.adjust_clock(_c)
        else:
            try:
                clock = body['clock']
            except KeyError:
                body['clock'] = self.forward_clock()
            else:
                self.adjust_clock(clock)

        if localize:
            try:
                offset, timestamp = tzfields(body)
            except KeyError:
                pass
            else:
                body['timestamp'] = adjust_timestamp(timestamp, offset)
        body['local_received'] = now()
        return type, body

    def _receive(self, body, message, list=list, isinstance=isinstance):
        """Dispatch a message body holding one event or a list of events.

        Each event goes through :meth:`event_from_message` and is then
        handed to :meth:`process`.  ``message`` is not used here.
        """
        if isinstance(body, list):  # celery 4.0+: List of events
            process, from_message = self.process, self.event_from_message
            # A plain loop: the calls are made for their side effects only,
            # so there is no reason to collect the results in a list.
            for event in body:
                process(*from_message(event))
        else:
            self.process(*self.event_from_message(body))

    @property
    def connection(self):
        """Client of the channel's connection, or None without a channel."""
        return self.channel.connection.client if self.channel else None