[文档]classEventDispatcher:"""Dispatches event messages. Arguments: connection (kombu.Connection): Connection to the broker. hostname (str): Hostname to identify ourselves as, by default uses the hostname returned by :func:`~celery.utils.anon_nodename`. groups (Sequence[str]): List of groups to send events for. :meth:`send` will ignore send requests to groups not in this list. If this is :const:`None`, all events will be sent. Example groups include ``"task"`` and ``"worker"``. enabled (bool): Set to :const:`False` to not actually publish any events, making :meth:`send` a no-op. channel (kombu.Channel): Can be used instead of `connection` to specify an exact channel to use when sending events. buffer_while_offline (bool): If enabled events will be buffered while the connection is down. :meth:`flush` must be called as soon as the connection is re-established. Note: You need to :meth:`close` this after use. """DISABLED_TRANSPORTS={'sql'}app=None# set of callbacks to be called when :meth:`enabled`.on_enabled=None# set of callbacks to be called when :meth:`disabled`.on_disabled=Nonedef__init__(self,connection=None,hostname=None,enabled=True,channel=None,buffer_while_offline=True,app=None,serializer=None,groups=None,delivery_mode=1,buffer_group=None,buffer_limit=24,on_send_buffered=None):self.app=app_or_default(apporself.app)self.connection=connectionself.channel=channelself.hostname=hostnameoranon_nodename()self.buffer_while_offline=buffer_while_offlineself.buffer_group=buffer_grouporfrozenset()self.buffer_limit=buffer_limitself.on_send_buffered=on_send_bufferedself._group_buffer=defaultdict(list)self.mutex=threading.Lock()self.producer=Noneself._outbound_buffer=deque()self.serializer=serializerorself.app.conf.event_serializerself.on_enabled=set()self.on_disabled=set()self.groups=set(groupsor[])self.tzoffset=[-time.timezone,-time.altzone]self.clock=self.app.clockself.delivery_mode=delivery_modeifnotconnectionandchannel:self.connection=channel.connection.clientself.enabled=enabledconninfo=self.connectionorself.app.connection_for_write()self.exchange=get_exchange(conninfo,name=self.app.conf.event_exchange)ifconninfo.transport.driver_typeinself.DISABLED_TRANSPORTS:self.enabled=Falseifself.enabled:self.enable()self.headers={'hostname':self.hostname}self.pid=os.getpid()def__enter__(self):returnselfdef__exit__(self,*exc_info):self.close()
[文档]defpublish(self,type,fields,producer,blind=False,Event=Event,**kwargs):"""Publish event using custom :class:`~kombu.Producer`. Arguments: type (str): Event type name, with group separated by dash (`-`). fields: Dictionary of event fields, must be json serializable. producer (kombu.Producer): Producer instance to use: only the ``publish`` method will be called. retry (bool): Retry in the event of connection failure. retry_policy (Mapping): Map of custom retry policy options. See :meth:`~kombu.Connection.ensure`. blind (bool): Don't set logical clock value (also don't forward the internal logical clock). Event (Callable): Event type used to create event. Defaults to :func:`Event`. utcoffset (Callable): Function returning the current utc offset in hours. """clock=Noneifblindelseself.clock.forward()event=Event(type,hostname=self.hostname,utcoffset=utcoffset(),pid=self.pid,clock=clock,**fields)withself.mutex:returnself._publish(event,producer,routing_key=type.replace('-','.'),**kwargs)
[文档]defsend(self,type,blind=False,utcoffset=utcoffset,retry=False,retry_policy=None,Event=Event,**fields):"""Send event. Arguments: type (str): Event type name, with group separated by dash (`-`). retry (bool): Retry in the event of connection failure. retry_policy (Mapping): Map of custom retry policy options. See :meth:`~kombu.Connection.ensure`. blind (bool): Don't set logical clock value (also don't forward the internal logical clock). Event (Callable): Event type used to create event, defaults to :func:`Event`. utcoffset (Callable): unction returning the current utc offset in hours. **fields (Any): Event fields -- must be json serializable. """ifself.enabled:groups,group=self.groups,group_from(type)ifgroupsandgroupnotingroups:returnifgroupinself.buffer_group:clock=self.clock.forward()event=Event(type,hostname=self.hostname,utcoffset=utcoffset(),pid=self.pid,clock=clock,**fields)buf=self._group_buffer[group]buf.append(event)iflen(buf)>=self.buffer_limit:self.flush()elifself.on_send_buffered:self.on_send_buffered()else:returnself.publish(type,fields,self.producer,blind=blind,Event=Event,retry=retry,retry_policy=retry_policy)
[文档]defflush(self,errors=True,groups=True):"""Flush the outbound buffer."""iferrors:buf=list(self._outbound_buffer)try:withself.mutex:forevent,routing_key,_inbuf:self._publish(event,self.producer,routing_key)finally:self._outbound_buffer.clear()ifgroups:withself.mutex:forgroup,eventsinself._group_buffer.items():self._publish(events,self.producer,'%s.multi'%group)events[:]=[]# list.clear
[文档]defextend_buffer(self,other):"""Copy the outbound buffer of another instance."""self._outbound_buffer.extend(other._outbound_buffer)
[文档]defclose(self):"""Close the event dispatcher."""self.mutex.locked()andself.mutex.release()self.producer=None