"""The ``RPC`` result backend for AMQP brokers.RPC-style result backend, using reply-to and one queue per client."""importtimeimportkombufromkombu.commonimportmaybe_declarefromkombu.utils.compatimportregister_after_forkfromkombu.utils.objectsimportcached_propertyfromceleryimportstatesfromcelery._stateimportcurrent_task,task_join_will_blockfrom.importbasefrom.asynchronousimportAsyncBackendMixin,BaseResultConsumer__all__=('BacklogLimitExceeded','RPCBackend')E_NO_CHORD_SUPPORT="""The "rpc" result backend does not support chords!Note that a group chained with a task is also upgraded to be a chord,as this pattern requires synchronization.Result backends that supports chords: Redis, Database, Memcached, and more."""
class BacklogLimitExceeded(Exception):
    """Too much state history to fast-forward."""


def _on_after_fork_cleanup_backend(backend):
    backend._after_fork()


class ResultConsumer(BaseResultConsumer):
    Consumer = None

    _connection = None
    _consumer = None

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._create_binding = self.backend._create_binding

    def start(self, initial_task_id, no_ack=True, **kwargs):
        self._connection = self.app.connection()
        initial_queue = self._create_binding(initial_task_id)
        self._consumer = self.Consumer(
            self._connection.default_channel, [initial_queue],
            callbacks=[self.on_state_change], no_ack=no_ack,
            accept=self.accept)
        self._consumer.consume()

    def drain_events(self, timeout=None):
        if self._connection:
            return self._connection.drain_events(timeout=timeout)
        elif timeout:
            time.sleep(timeout)

    def stop(self):
        try:
            self._consumer.cancel()
        finally:
            self._connection.close()

    def on_after_fork(self):
        self._consumer = None
        if self._connection is not None:
            self._connection.collect()
            self._connection = None

    def consume_from(self, queue):
        if self._consumer is None:
            return self.start(queue)
        if not self._consumer.consuming_from(queue):
            self._consumer.add_queue(queue)
            self._consumer.consume()

    def cancel_for(self, queue):
        if self._consumer:
            self._consumer.cancel_by_queue(queue.name)
class RPCBackend(base.Backend, AsyncBackendMixin):
    """Base class for the RPC result backend."""

    Exchange = kombu.Exchange
    Producer = kombu.Producer
    ResultConsumer = ResultConsumer

    #: Exception raised when there are too many messages for a task id.
    BacklogLimitExceeded = BacklogLimitExceeded

    persistent = False
    supports_autoexpire = True
    supports_native_join = True

    retry_policy = {
        'max_retries': 20,
        'interval_start': 0,
        'interval_step': 1,
        'interval_max': 1,
    }
    class Consumer(kombu.Consumer):
        """Consumer that requires manual declaration of queues."""

        auto_declare = False
    class Queue(kombu.Queue):
        """Queue that never caches declaration."""

        can_cache_declaration = False
    def __init__(self, app, connection=None, exchange=None, exchange_type=None,
                 persistent=None, serializer=None, auto_delete=True, **kwargs):
        super().__init__(app, **kwargs)
        conf = self.app.conf
        self._connection = connection
        self._out_of_band = {}
        self.persistent = self.prepare_persistent(persistent)
        self.delivery_mode = 2 if self.persistent else 1
        exchange = exchange or conf.result_exchange
        exchange_type = exchange_type or conf.result_exchange_type
        self.exchange = self._create_exchange(
            exchange, exchange_type, self.delivery_mode,
        )
        self.serializer = serializer or conf.result_serializer
        self.auto_delete = auto_delete
        self.result_consumer = self.ResultConsumer(
            self, self.app, self.accept,
            self._pending_results, self._pending_messages,
        )
        if register_after_fork is not None:
            register_after_fork(self, _on_after_fork_cleanup_backend)

    def _after_fork(self):
        # clear state for child processes.
        self._pending_results.clear()
        self.result_consumer._after_fork()

    def _create_exchange(self, name, type='direct', delivery_mode=2):
        # uses direct to queue routing (anon exchange).
        return self.Exchange(None)

    def _create_binding(self, task_id):
        """Create new binding for task with id."""
        # RPC backend caches the binding, as one queue is used for all tasks.
        return self.binding

    def ensure_chords_allowed(self):
        raise NotImplementedError(E_NO_CHORD_SUPPORT.strip())
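    # Illustrative sketch (not part of the upstream module): the anonymous
    # exchange created above is AMQP's default direct exchange, so a message
    # published with routing_key equal to a queue name lands in that queue
    # without any explicit binding. The connection URL and queue name below
    # are assumptions.
    #
    #     import kombu
    #
    #     with kombu.Connection('pyamqp://guest@localhost//') as conn:
    #         producer = kombu.Producer(conn.default_channel)
    #         producer.publish(
    #             {'hello': 'world'},
    #             exchange='',               # default (anonymous) exchange
    #             routing_key='some.queue',  # delivered to queue 'some.queue'
    #         )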
    def on_task_call(self, producer, task_id):
        # Called every time a task is sent when using this backend.
        # We declare the queue we receive replies on in advance of sending
        # the message, but we skip this if running in the prefork pool
        # (task_join_will_block), as we know the queue is already declared.
        if not task_join_will_block():
            maybe_declare(self.binding(producer.channel), retry=True)
    def destination_for(self, task_id, request):
        """Get the destination for result by task id.

        Returns:
            Tuple[str, str]: tuple of ``(reply_to, correlation_id)``.
        """
        # Backends didn't always receive the `request`, so we must still
        # support old code that relies on current_task.
        try:
            request = request or current_task.request
        except AttributeError:
            raise RuntimeError(
                f'RPC backend missing task request for {task_id!r}')
        return request.reply_to, request.correlation_id or task_id
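    # Illustrative sketch (not part of the upstream module): given a task
    # request carrying ``reply_to`` and ``correlation_id``, the destination
    # resolves to a (routing_key, correlation_id) pair; ``backend`` and
    # ``request`` below are stand-in assumptions.
    #
    #     reply_to, correlation_id = backend.destination_for(task_id, request)
    #     # reply_to       -> name of the caller's reply queue (its oid)
    #     # correlation_id -> task_id unless the request overrides it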
    def on_reply_declare(self, task_id):
        # Return value here is used as the `declare=` argument
        # for Producer.publish.
        # By default we don't have to declare anything when sending a result.
        pass
    def on_result_fulfilled(self, result):
        # This usually cancels the queue after the result is received,
        # but we don't have to cancel since we have one queue per process.
        pass
    def store_result(self, task_id, result, state,
                     traceback=None, request=None, **kwargs):
        """Send task return value and state."""
        routing_key, correlation_id = self.destination_for(task_id, request)
        if not routing_key:
            return
        with self.app.amqp.producer_pool.acquire(block=True) as producer:
            producer.publish(
                self._to_result(task_id, state, result, traceback, request),
                exchange=self.exchange,
                routing_key=routing_key,
                correlation_id=correlation_id,
                serializer=self.serializer,
                retry=True, retry_policy=self.retry_policy,
                declare=self.on_reply_declare(task_id),
                delivery_mode=self.delivery_mode,
            )
        return result
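    # Illustrative sketch (not part of the upstream module): the payload
    # built by ``_to_result`` is the usual result-meta mapping, roughly:
    #
    #     {
    #         'task_id': task_id,     # doubles as the correlation id
    #         'status': state,        # e.g. 'SUCCESS' or 'FAILURE'
    #         'result': result,       # return value or encoded exception
    #         'traceback': traceback,
    #         'children': [...],      # results of tasks spawned by this one
    #     }
    #
    # It is published to the anonymous exchange with ``routing_key`` set to
    # the caller's reply queue, so only the sending client sees it.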
    def on_out_of_band_result(self, task_id, message):
        # Callback called when a reply for a task is received,
        # but we have no idea what to do with it.
        # Since the result is not pending, we put it in a separate
        # buffer: probably it will become pending later.
        if self.result_consumer:
            self.result_consumer.on_out_of_band_result(message)
        self._out_of_band[task_id] = message
    def get_task_meta(self, task_id, backlog_limit=1000):
        buffered = self._out_of_band.pop(task_id, None)
        if buffered:
            return self._set_cache_by_message(task_id, buffered)

        # Polling and using basic_get
        latest_by_id = {}
        prev = None
        for acc in self._slurp_from_queue(task_id, self.accept,
                                          backlog_limit):
            tid = self._get_message_task_id(acc)
            prev, latest_by_id[tid] = latest_by_id.get(tid), acc
            if prev:
                # backends aren't expected to keep history,
                # so we delete everything except the most recent state.
                prev.ack()
                prev = None

        latest = latest_by_id.pop(task_id, None)
        for tid, msg in latest_by_id.items():
            self.on_out_of_band_result(tid, msg)

        if latest:
            latest.requeue()
            return self._set_cache_by_message(task_id, latest)
        else:
            # no new state, use previous
            try:
                return self._cache[task_id]
            except KeyError:
                # result probably pending.
                return {'status': states.PENDING, 'result': None}
    poll = get_task_meta  # XXX compat

    def _set_cache_by_message(self, task_id, message):
        payload = self._cache[task_id] = self.meta_from_decoded(
            message.payload)
        return payload

    def _slurp_from_queue(self, task_id, accept,
                          limit=1000, no_ack=False):
        with self.app.pool.acquire_channel(block=True) as (_, channel):
            binding = self._create_binding(task_id)(channel)
            binding.declare()

            for _ in range(limit):
                msg = binding.get(accept=accept, no_ack=no_ack)
                if not msg:
                    break
                yield msg
            else:
                raise self.BacklogLimitExceeded(task_id)

    def _get_message_task_id(self, message):
        try:
            # try property first so we don't have to deserialize
            # the payload.
            return message.properties['correlation_id']
        except (AttributeError, KeyError):
            # message sent by old Celery version, need to deserialize.
            return message.payload['task_id']
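    # Illustrative sketch (not part of the upstream module): this polling
    # path backs ordinary result inspection, e.g. with the hypothetical
    # ``add`` task from the sketch at the top of the module:
    #
    #     res = add.delay(2, 2)
    #     res.state              # -> polls get_task_meta(res.id)
    #     res.get(timeout=10)    # -> 4, once a reply has arrived
    #
    # Each poll drains the reply queue with basic_get, keeps only the newest
    # message per task id, and requeues it so a later poll still sees it.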
    def reload_task_result(self, task_id):
        raise NotImplementedError(
            'reload_task_result is not supported by this backend.')

    def reload_group_result(self, task_id):
        """Reload group result, even if it has been previously fetched."""
        raise NotImplementedError(
            'reload_group_result is not supported by this backend.')

    def save_group(self, group_id, result):
        raise NotImplementedError(
            'save_group is not supported by this backend.')

    def restore_group(self, group_id, cache=True):
        raise NotImplementedError(
            'restore_group is not supported by this backend.')

    def delete_group(self, group_id):
        raise NotImplementedError(
            'delete_group is not supported by this backend.')
    def __reduce__(self, args=(), kwargs=None):
        kwargs = {} if not kwargs else kwargs
        return super().__reduce__(args, dict(
            kwargs,
            connection=self._connection,
            exchange=self.exchange.name,
            exchange_type=self.exchange.type,
            persistent=self.persistent,
            serializer=self.serializer,
            auto_delete=self.auto_delete,
            expires=self.expires,
        ))

    @property
    def binding(self):
        return self.Queue(
            self.oid, self.exchange, self.oid,
            durable=False,
            auto_delete=True,
            expires=self.expires,
        )

    @cached_property
    def oid(self):
        # cached here is the app thread OID: name of queue we receive results on.
        return self.app.thread_oid
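    # Illustrative note (not part of the upstream module): the reply queue is
    # named after the calling thread's oid, so results can only be collected
    # by the client that sent the task; another process (with a different
    # oid) gets its own queue and never sees this client's replies.
    #
    #     queue = app.backend.binding   # Queue named after app.thread_oid
    #     queue.name == app.thread_oid  # -> True (in the same thread)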