"""Sending/Receiving Messages (Kombu integration)."""importnumbersfromcollectionsimportnamedtuplefromcollections.abcimportMappingfromdatetimeimporttimedeltafromweakrefimportWeakValueDictionaryfromkombuimportConnection,Consumer,Exchange,Producer,Queue,poolsfromkombu.commonimportBroadcastfromkombu.utils.functionalimportmaybe_listfromkombu.utils.objectsimportcached_propertyfromceleryimportsignalsfromcelery.utils.nodenamesimportanon_nodenamefromcelery.utils.safereprimportsafereprfromcelery.utils.textimportindentastextindentfromcelery.utils.timeimportmaybe_make_awarefrom.importroutesas_routes__all__=('AMQP','Queues','task_message')#: earliest date supported by time.mktime.INT_MIN=-2147483648#: Human readable queue declaration.QUEUE_FORMAT=""".> {0.name:<16} exchange={0.exchange.name}({0.exchange.type}) \key={0.routing_key}"""task_message=namedtuple('task_message',('headers','properties','body','sent_event'))defutf8dict(d,encoding='utf-8'):return{k.decode(encoding)ifisinstance(k,bytes)elsek:vfork,vind.items()}
class Queues(dict):
    """Queue name⇒ declaration mapping.

    Arguments:
        queues (Iterable): Initial list/tuple or dict of queues.
        default_exchange: Exchange assigned to queues stored without one.
        create_missing (bool): By default any unknown queues will be
            added automatically, but if this flag is disabled the occurrence
            of unknown queues in `wanted` will raise :exc:`KeyError`.
        autoexchange: Stored on the instance; defaults to
            :class:`kombu.Exchange` when not given.
        max_priority (int): Default x-max-priority for queues with none set.
        default_routing_key: Stored on the instance as the default
            routing key.
    """

    # NOTE(review): ``self._add``, ``self.new_missing`` and
    # ``self.consume_from`` are used below but their definitions are not
    # part of this section -- confirm they are defined on this class.

    #: If set, this is a subset of queues to consume from.
    #: The rest of the queues are then used for routing only.
    _consume_from = None

    def __init__(self, queues=None, default_exchange=None,
                 create_missing=True, autoexchange=None,
                 max_priority=None, default_routing_key=None):
        super().__init__()
        # Alias -> queue lookups; weak values so an alias never keeps a
        # queue object alive on its own.
        self.aliases = WeakValueDictionary()
        self.default_exchange = default_exchange
        self.default_routing_key = default_routing_key
        self.create_missing = create_missing
        self.autoexchange = Exchange if autoexchange is None else autoexchange
        self.max_priority = max_priority
        # A non-mapping iterable is taken to hold objects with a ``name``.
        if queues is not None and not isinstance(queues, Mapping):
            queues = {q.name: q for q in queues}
        queues = queues or {}
        for name, q in queues.items():
            if isinstance(q, Queue):
                self.add(q)
            else:
                # Anything else is treated as a dict of declaration options.
                self.add_compat(name, **q)

    def __getitem__(self, name):
        """Look up ``name`` as an alias first, then as a queue name."""
        try:
            return self.aliases[name]
        except KeyError:
            # dict.__getitem__ falls back to __missing__ for unknown keys.
            return super().__getitem__(name)

    def __setitem__(self, name, queue):
        """Store ``queue`` under ``name`` and register its alias, if any."""
        if self.default_exchange and not queue.exchange:
            queue.exchange = self.default_exchange
        super().__setitem__(name, queue)
        if queue.alias:
            self.aliases[queue.alias] = queue

    def __missing__(self, name):
        """Create and add an unknown queue, or raise :exc:`KeyError`.

        A new queue is only created when ``create_missing`` is enabled.
        """
        if self.create_missing:
            return self.add(self.new_missing(name))
        raise KeyError(name)

    def add(self, queue, **kwargs):
        """Add new queue.

        The first argument can either be a :class:`kombu.Queue` instance,
        or the name of a queue.  If the former the rest of the keyword
        arguments are ignored, and options are simply taken from the queue
        instance.

        Arguments:
            queue (kombu.Queue, str): Queue to add.
            exchange (kombu.Exchange, str):
                if queue is str, specifies exchange name.
            routing_key (str): if queue is str, specifies binding key.
            exchange_type (str): if queue is str, specifies type of exchange.
            **options (Any): Additional declaration options used when
                queue is a str.
        """
        if not isinstance(queue, Queue):
            return self.add_compat(queue, **kwargs)
        return self._add(queue)

    def add_compat(self, name, **options):
        """Add a queue described by a name and declaration options.

        The routing key falls back to ``binding_key`` and then to ``name``.
        """
        # docs used to use binding_key as routing key
        options.setdefault('routing_key', options.get('binding_key'))
        if options['routing_key'] is None:
            options['routing_key'] = name
        return self._add(Queue.from_dict(name, **options))

    def format(self, indent=0, indent_first=True):
        """Format routing table into string for log dumps."""
        active = self.consume_from
        if not active:
            return ''
        info = [QUEUE_FORMAT.strip().format(q)
                for _, q in sorted(active.items())]
        if indent_first:
            return textindent('\n'.join(info), indent)
        # Leave the first entry unindented and indent only the rest.
        return info[0] + '\n' + textindent('\n'.join(info[1:]), indent)

    def select_add(self, queue, **kwargs):
        """Add new task queue that'll be consumed from.

        The queue will be active even when a subset has been selected
        using the :option:`celery worker -Q` option.
        """
        q = self.add(queue, **kwargs)
        if self._consume_from is not None:
            self._consume_from[q.name] = q
        return q

    def select(self, include):
        """Select a subset of currently defined queues to consume from.

        Arguments:
            include (Sequence[str], str): Names of queues to consume from.
        """
        if include:
            self._consume_from = {
                name: self[name] for name in maybe_list(include)
            }

    def deselect(self, exclude):
        """Deselect queues so that they won't be consumed from.

        Arguments:
            exclude (Sequence[str], str): Names of queues to avoid
                consuming from.
        """
        if exclude:
            exclude = maybe_list(exclude)
            if self._consume_from is None:
                # using all queues
                # Keep this a generator: it is always truthy, so select()
                # still records a (possibly empty) selection when every
                # queue is excluded.  A list would make that case a no-op.
                return self.select(k for k in self if k not in exclude)
            # using selection
            for queue in exclude:
                self._consume_from.pop(queue, None)
class AMQP:
    """App AMQP API: app.amqp."""

    Connection = Connection
    Consumer = Consumer
    Producer = Producer

    #: compat alias to Connection
    BrokerConnection = Connection

    # Must stay above the ``Queues`` method defined below: at this point
    # the name still refers to the module-level ``Queues`` class.
    queues_cls = Queues

    #: Cached and prepared routing table.
    _rtable = None

    #: Underlying producer pool instance automatically
    #: set by the :attr:`producer_pool`.
    _producer_pool = None

    # Exchange class/function used when defining automatic queues.
    # For example, you can use ``autoexchange = lambda n: None`` to use the
    # AMQP default exchange: a shortcut to bypass routing
    # and instead send directly to the queue named in the routing key.
    autoexchange = None

    #: Max size of positional argument representation used for
    #: logging purposes.
    argsrepr_maxsize = 1024

    #: Max size of keyword argument representation used for logging purposes.
    kwargsrepr_maxsize = 1024

    def __init__(self, app):
        self.app = app
        # Maps the ``task_protocol`` setting to the message builder.
        self.task_protocols = {
            1: self.as_task_v1,
            2: self.as_task_v2,
        }
        self.app._conf.bind_to(self._handle_conf_update)

    @cached_property
    def create_task_message(self):
        """Message builder selected by the ``task_protocol`` setting."""
        return self.task_protocols[self.app.conf.task_protocol]

    @cached_property
    def send_task_message(self):
        """Publishing function built by :meth:`_create_task_sender`."""
        return self._create_task_sender()

    def Queues(self, queues, create_missing=None,
               autoexchange=None, max_priority=None):
        # Create new :class:`Queues` instance, using queue defaults
        # from the current configuration.
        conf = self.app.conf
        default_routing_key = conf.task_default_routing_key
        if create_missing is None:
            create_missing = conf.task_create_missing_queues
        if max_priority is None:
            max_priority = conf.task_queue_max_priority
        if not queues and conf.task_default_queue:
            # No queues configured: fall back to a single default queue.
            queue_arguments = None
            if conf.task_default_queue_type == 'quorum':
                queue_arguments = {'x-queue-type': 'quorum'}
            queues = (Queue(conf.task_default_queue,
                            exchange=self.default_exchange,
                            routing_key=default_routing_key,
                            queue_arguments=queue_arguments),)
        autoexchange = (self.autoexchange
                        if autoexchange is None else autoexchange)
        return self.queues_cls(
            queues, self.default_exchange, create_missing,
            autoexchange, max_priority, default_routing_key,
        )

    def Router(self, queues=None, create_missing=None):
        """Return the current task router."""
        return _routes.Router(
            self.routes,
            queues or self.queues,
            self.app.either('task_create_missing_queues', create_missing),
            app=self.app,
        )

    def TaskConsumer(self, channel, queues=None, accept=None, **kw):
        """Return a :attr:`Consumer` for task messages on ``channel``.

        ``accept`` defaults to the ``accept_content`` setting and ``queues``
        to the values of ``self.queues.consume_from``.
        """
        if accept is None:
            accept = self.app.conf.accept_content
        return self.Consumer(
            channel, accept=accept,
            queues=queues or list(self.queues.consume_from.values()),
            **kw
        )

    def as_task_v2(self, task_id, name, args=None, kwargs=None,
                   countdown=None, eta=None, group_id=None, group_index=None,
                   expires=None, retries=0, chord=None,
                   callbacks=None, errbacks=None, reply_to=None,
                   time_limit=None, soft_time_limit=None,
                   create_sent_event=False, root_id=None, parent_id=None,
                   shadow=None, chain=None, now=None, timezone=None,
                   origin=None, ignore_result=False, argsrepr=None,
                   kwargsrepr=None, stamped_headers=None,
                   replaced_task_nesting=0, **options):
        """Build a protocol 2 :data:`task_message`.

        Task metadata goes in the headers; the body is the tuple
        ``(args, kwargs, embed)`` where ``embed`` holds callbacks,
        errbacks, chain and chord.

        Raises:
            TypeError: If ``args`` is not a list/tuple or ``kwargs`` is not
                a mapping.
            ValueError: If ``countdown`` or a numeric ``expires`` is below
                ``INT_MIN``.
            KeyError: If a name in ``stamped_headers`` is not in ``options``.
        """
        args = args or ()
        kwargs = kwargs or {}
        if not isinstance(args, (list, tuple)):
            raise TypeError('task args must be a list or tuple')
        if not isinstance(kwargs, Mapping):
            raise TypeError('task keyword arguments must be a mapping')
        if countdown:  # convert countdown to ETA
            self._verify_seconds(countdown, 'countdown')
            now = now or self.app.now()
            timezone = timezone or self.app.timezone
            eta = maybe_make_aware(
                now + timedelta(seconds=countdown), tz=timezone,
            )
        if isinstance(expires, numbers.Real):
            # Numeric expires is an offset in seconds from ``now``.
            self._verify_seconds(expires, 'expires')
            now = now or self.app.now()
            timezone = timezone or self.app.timezone
            expires = maybe_make_aware(
                now + timedelta(seconds=expires), tz=timezone,
            )

        if not isinstance(eta, str):
            eta = eta and eta.isoformat()
        # If we retry a task `expires` will already be ISO8601-formatted.
        if not isinstance(expires, str):
            expires = expires and expires.isoformat()

        if argsrepr is None:
            argsrepr = saferepr(args, self.argsrepr_maxsize)
        if kwargsrepr is None:
            kwargsrepr = saferepr(kwargs, self.kwargsrepr_maxsize)

        if not root_id:  # empty root_id defaults to task_id
            root_id = task_id

        stamps = {header: options[header] for header in stamped_headers or []}
        headers = {
            'lang': 'py',
            'task': name,
            'id': task_id,
            'shadow': shadow,
            'eta': eta,
            'expires': expires,
            'group': group_id,
            'group_index': group_index,
            'retries': retries,
            'timelimit': [time_limit, soft_time_limit],
            'root_id': root_id,
            'parent_id': parent_id,
            'argsrepr': argsrepr,
            'kwargsrepr': kwargsrepr,
            'origin': origin or anon_nodename(),
            'ignore_result': ignore_result,
            'replaced_task_nesting': replaced_task_nesting,
            'stamped_headers': stamped_headers,
            'stamps': stamps,
        }

        return task_message(
            headers=headers,
            properties={
                'correlation_id': task_id,
                'reply_to': reply_to or '',
            },
            body=(
                args, kwargs, {
                    'callbacks': callbacks,
                    'errbacks': errbacks,
                    'chain': chain,
                    'chord': chord,
                },
            ),
            sent_event={
                'uuid': task_id,
                'root_id': root_id,
                'parent_id': parent_id,
                'name': name,
                'args': argsrepr,
                'kwargs': kwargsrepr,
                'retries': retries,
                'eta': eta,
                'expires': expires,
            } if create_sent_event else None,
        )

    def as_task_v1(self, task_id, name, args=None, kwargs=None,
                   countdown=None, eta=None, group_id=None, group_index=None,
                   expires=None, retries=0,
                   chord=None, callbacks=None, errbacks=None, reply_to=None,
                   time_limit=None, soft_time_limit=None,
                   create_sent_event=False, root_id=None, parent_id=None,
                   shadow=None, now=None, timezone=None,
                   **compat_kwargs):
        """Build a protocol 1 :data:`task_message`.

        Headers are empty; all task metadata is carried in the body dict.

        Raises:
            TypeError: If ``args`` is not a list/tuple or ``kwargs`` is not
                a mapping.
            ValueError: If ``countdown`` or a numeric ``expires`` is below
                ``INT_MIN``.
        """
        args = args or ()
        kwargs = kwargs or {}
        utc = self.utc
        if not isinstance(args, (list, tuple)):
            raise TypeError('task args must be a list or tuple')
        if not isinstance(kwargs, Mapping):
            raise TypeError('task keyword arguments must be a mapping')
        if countdown:  # convert countdown to ETA
            self._verify_seconds(countdown, 'countdown')
            now = now or self.app.now()
            eta = now + timedelta(seconds=countdown)
        if isinstance(expires, numbers.Real):
            self._verify_seconds(expires, 'expires')
            now = now or self.app.now()
            expires = now + timedelta(seconds=expires)
        # Same guard as as_task_v2: a value that is already a string
        # (e.g. ISO8601 from a retried task) is passed through instead of
        # failing on ``str.isoformat``.
        if not isinstance(eta, str):
            eta = eta and eta.isoformat()
        if not isinstance(expires, str):
            expires = expires and expires.isoformat()

        return task_message(
            headers={},
            properties={
                'correlation_id': task_id,
                'reply_to': reply_to or '',
            },
            body={
                'task': name,
                'id': task_id,
                'args': args,
                'kwargs': kwargs,
                'group': group_id,
                'group_index': group_index,
                'retries': retries,
                'eta': eta,
                'expires': expires,
                'utc': utc,
                'callbacks': callbacks,
                'errbacks': errbacks,
                'timelimit': (time_limit, soft_time_limit),
                'taskset': group_id,
                'chord': chord,
            },
            sent_event={
                'uuid': task_id,
                'name': name,
                'args': saferepr(args),
                'kwargs': saferepr(kwargs),
                'retries': retries,
                'eta': eta,
                'expires': expires,
            } if create_sent_event else None,
        )

    def _verify_seconds(self, s, what):
        """Return ``s``, raising :exc:`ValueError` if it is below INT_MIN.

        ``what`` names the value in the error message.
        """
        if s < INT_MIN:
            raise ValueError(f'{what} is out of range: {s!r}')
        return s

    def _create_task_sender(self):
        """Return a ``send_task_message`` closure bound to current config.

        Settings, signal senders/receivers and defaults are read once here
        and captured as locals of the returned function.
        """
        default_retry = self.app.conf.task_publish_retry
        default_policy = self.app.conf.task_publish_retry_policy
        default_delivery_mode = self.app.conf.task_default_delivery_mode
        default_queue = self.default_queue
        queues = self.queues
        send_before_publish = signals.before_task_publish.send
        before_receivers = signals.before_task_publish.receivers
        send_after_publish = signals.after_task_publish.send
        after_receivers = signals.after_task_publish.receivers

        send_task_sent = signals.task_sent.send   # XXX compat
        sent_receivers = signals.task_sent.receivers

        default_evd = self._event_dispatcher
        default_exchange = self.default_exchange

        default_rkey = self.app.conf.task_default_routing_key
        default_serializer = self.app.conf.task_serializer
        default_compressor = self.app.conf.task_compression

        def send_task_message(producer, name, message,
                              exchange=None, routing_key=None, queue=None,
                              event_dispatcher=None,
                              retry=None, retry_policy=None,
                              serializer=None, delivery_mode=None,
                              compression=None, declare=None,
                              headers=None, exchange_type=None,
                              timeout=None, confirm_timeout=None, **kwargs):
            retry = default_retry if retry is None else retry
            headers2, properties, body, sent_event = message
            # Caller-supplied headers/properties are merged into the
            # message's own dicts in place.
            if headers:
                headers2.update(headers)
            if kwargs:
                properties.update(kwargs)

            qname = queue
            if queue is None and exchange is None:
                queue = default_queue
            if queue is not None:
                if isinstance(queue, str):
                    qname, queue = queue, queues[queue]
                else:
                    qname = queue.name

            if delivery_mode is None:
                try:
                    delivery_mode = queue.exchange.delivery_mode
                except AttributeError:
                    # queue may be None or have no exchange.
                    pass
                delivery_mode = delivery_mode or default_delivery_mode

            if exchange_type is None:
                try:
                    exchange_type = queue.exchange.type
                except AttributeError:
                    exchange_type = 'direct'

            # convert to anon-exchange, when exchange not set and direct ex.
            if (not exchange or not routing_key) and exchange_type == 'direct':
                exchange, routing_key = '', qname
            elif exchange is None:
                # not topic exchange, and exchange not undefined
                exchange = queue.exchange.name or default_exchange
                routing_key = routing_key or queue.routing_key or default_rkey
            if declare is None and queue and not isinstance(queue, Broadcast):
                declare = [queue]

            # merge default and custom policy
            _rp = (dict(default_policy, **retry_policy) if retry_policy
                   else default_policy)

            if before_receivers:
                send_before_publish(
                    sender=name, body=body,
                    exchange=exchange, routing_key=routing_key,
                    declare=declare, headers=headers2,
                    properties=properties, retry_policy=retry_policy,
                )
            ret = producer.publish(
                body,
                exchange=exchange,
                routing_key=routing_key,
                serializer=serializer or default_serializer,
                compression=compression or default_compressor,
                retry=retry, retry_policy=_rp,
                delivery_mode=delivery_mode, declare=declare,
                headers=headers2,
                timeout=timeout, confirm_timeout=confirm_timeout,
                **properties
            )
            if after_receivers:
                send_after_publish(sender=name, body=body, headers=headers2,
                                   exchange=exchange, routing_key=routing_key)
            if sent_receivers:  # XXX deprecated
                if isinstance(body, tuple):  # protocol version 2
                    send_task_sent(
                        sender=name, task_id=headers2['id'], task=name,
                        args=body[0], kwargs=body[1],
                        eta=headers2['eta'], taskset=headers2['group'],
                    )
                else:  # protocol version 1
                    send_task_sent(
                        sender=name, task_id=body['id'], task=name,
                        args=body['args'], kwargs=body['kwargs'],
                        eta=body['eta'], taskset=body['taskset'],
                    )
            if sent_event:
                evd = event_dispatcher or default_evd
                exname = exchange
                if isinstance(exname, Exchange):
                    exname = exname.name
                sent_event.update({
                    'queue': qname,
                    'exchange': exname,
                    'routing_key': routing_key,
                })
                evd.publish('task-sent', sent_event,
                            producer, retry=retry, retry_policy=retry_policy)
            return ret
        return send_task_message

    @cached_property
    def default_queue(self):
        """Entry of :attr:`queues` named by ``task_default_queue``."""
        return self.queues[self.app.conf.task_default_queue]

    @cached_property
    def queues(self):
        """Queue name⇒ declaration mapping."""
        return self.Queues(self.app.conf.task_queues)

    @queues.setter
    def queues(self, queues):
        # NOTE(review): presumably kombu's cached_property stores the value
        # returned by this setter as the new cached value -- confirm.
        return self.Queues(queues)

    @property
    def routes(self):
        """Routing table, prepared on first access."""
        # NOTE(review): ``flush_routes`` is not defined in this section;
        # it is expected to populate ``self._rtable`` -- confirm.
        if self._rtable is None:
            self.flush_routes()
        return self._rtable

    @cached_property
    def router(self):
        """Task router created by :meth:`Router` with default arguments."""
        return self.Router()

    @router.setter
    def router(self, value):
        return value

    @property
    def producer_pool(self):
        """Producer pool for the app's write connection, created lazily."""
        if self._producer_pool is None:
            self._producer_pool = pools.producers[
                self.app.connection_for_write()]
            self._producer_pool.limit = self.app.pool.limit
        return self._producer_pool
    publisher_pool = producer_pool  # compat alias

    @cached_property
    def default_exchange(self):
        """Exchange built from the ``task_default_exchange*`` settings."""
        return Exchange(self.app.conf.task_default_exchange,
                        self.app.conf.task_default_exchange_type)

    @cached_property
    def utc(self):
        """Value of the ``enable_utc`` setting."""
        return self.app.conf.enable_utc

    @cached_property
    def _event_dispatcher(self):
        # We call Dispatcher.publish with a custom producer
        # so don't need the dispatcher to be enabled.
        return self.app.events.Dispatcher(enabled=False)

    def _handle_conf_update(self, *args, **kwargs):
        """Rebuild routes and router when ``task_routes`` is updated."""
        if ('task_routes' in kwargs or 'task_routes' in args):
            self.flush_routes()
            self.router = self.Router()
        return