"""Creating events, and event exchange definition."""importtimefromcopyimportcopyfromkombuimportExchange__all__=('Event','event_exchange','get_exchange','group_from',)EVENT_EXCHANGE_NAME='celeryev'#: Exchange used to send events on.#: Note: Use :func:`get_exchange` instead, as the type of#: exchange will vary depending on the broker connection.event_exchange=Exchange(EVENT_EXCHANGE_NAME,type='topic')
def Event(type, _fields=None, __dict__=dict, __now__=time.time, **fields):
    """Build an event mapping.

    Notes:
        An event is just a dictionary whose only mandatory key is
        ``type``.  If no ``timestamp`` key is supplied, one is added
        using the value returned by ``__now__()``.

    Arguments:
        type (str): Event type name, always stored under ``'type'``
            (replacing any ``'type'`` entry coming from ``_fields``).
        _fields (Mapping): Optional base fields.  When truthy, a new
            mapping is created from it and ``**fields`` is merged on top.
        __dict__ (Callable): Factory used to create that new mapping.
        __now__ (Callable): Zero-argument clock used for the default
            timestamp.
        **fields: Additional event fields.

    Returns:
        The event mapping.  When ``_fields`` is falsy this is the
        ``fields`` keyword dictionary itself, not a copy.
    """
    if _fields:
        payload = __dict__(_fields, **fields)
    else:
        payload = fields

    if 'timestamp' in payload:
        # Caller supplied a timestamp: only stamp the type.
        payload['type'] = type
        return payload

    payload.update(timestamp=__now__(), type=type)
    return payload
def group_from(type):
    """Return the group prefix of an event type name.

    The group is everything before the first ``-``; a name without a
    dash is returned unchanged.

    Example:
        >>> group_from('task-sent')
        'task'

        >>> group_from('custom-my-event')
        'custom'
    """
    prefix, *_remainder = type.split('-', 1)
    return prefix
def get_exchange(conn, name=EVENT_EXCHANGE_NAME):
    """Return the exchange to use for sending events over ``conn``.

    A shallow copy of :data:`event_exchange` is returned, so the
    module-level exchange object is never modified.

    Arguments:
        conn (kombu.Connection): Connection used for sending/receiving
            events.
        name (str): Name of the exchange.  Default is ``celeryev``.

    Note:
        The exchange type is switched from ``topic`` to ``fanout`` when
        the transport's ``driver_type`` is ``redis`` or ``gcpubsub``.
    """
    exchange = copy(event_exchange)
    fanout_drivers = {'redis', 'gcpubsub'}

    if conn.transport.driver_type in fanout_drivers:
        # quick hack for Issue #436
        exchange.type = 'fanout'

    # Only rename the copy when a non-default name was requested.
    if name != exchange.name:
        exchange.name = name
    return exchange