"""Task results/state and results for groups of tasks."""importdatetimeimporttimefromcollectionsimportdequefromcontextlibimportcontextmanagerfromweakrefimportproxyfromdateutil.parserimportisoparsefromkombu.utils.objectsimportcached_propertyfromvineimportThenable,barrier,promisefrom.importcurrent_app,statesfrom._stateimport_set_task_join_will_block,task_join_will_blockfrom.appimportapp_or_defaultfrom.exceptionsimportImproperlyConfigured,IncompleteStream,TimeoutErrorfrom.utils.graphimportDependencyGraph,GraphFormattertry:importtblibexceptImportError:tblib=None__all__=('ResultBase','AsyncResult','ResultSet','GroupResult','EagerResult','result_from_tuple',)E_WOULDBLOCK="""\Never call result.get() within a task!See https://docs.celeryq.dev/en/latest/userguide/tasks.html\#avoid-launching-synchronous-subtasks"""defassert_will_not_block():iftask_join_will_block():raiseRuntimeError(E_WOULDBLOCK)@contextmanagerdefallow_join_result():reset_value=task_join_will_block()_set_task_join_will_block(False)try:yieldfinally:_set_task_join_will_block(reset_value)@contextmanagerdefdenied_join_result():reset_value=task_join_will_block()_set_task_join_will_block(True)try:yieldfinally:_set_task_join_will_block(reset_value)
[文档]classResultBase:"""Base class for results."""#: Parent result (if part of a chain)parent=None
[文档]@Thenable.registerclassAsyncResult(ResultBase):"""Query task state. Arguments: id (str): See :attr:`id`. backend (Backend): See :attr:`backend`. """app=None#: Error raised for timeouts.TimeoutError=TimeoutError#: The task's UUID.id=None#: The task result backend to use.backend=Nonedef__init__(self,id,backend=None,task_name=None,# deprecatedapp=None,parent=None):ifidisNone:raiseValueError(f'AsyncResult requires valid id, not {type(id)}')self.app=app_or_default(apporself.app)self.id=idself.backend=backendorself.app.backendself.parent=parentself.on_ready=promise(self._on_fulfilled,weak=True)self._cache=Noneself._ignored=False@propertydefignored(self):"""If True, task result retrieval is disabled."""ifhasattr(self,'_ignored'):returnself._ignoredreturnFalse@ignored.setterdefignored(self,value):"""Enable/disable task result retrieval."""self._ignored=value
[文档]defas_list(self):"""Return as a list of task IDs."""results=[]parent=self.parentresults.append(self.id)ifparentisnotNone:results.extend(parent.as_list())returnresults
[文档]defforget(self):"""Forget the result of this task and its parents."""self._cache=Noneifself.parent:self.parent.forget()self.backend.forget(self.id)
[文档]defrevoke(self,connection=None,terminate=False,signal=None,wait=False,timeout=None):"""Send revoke signal to all workers. Any worker receiving the task, or having reserved the task, *must* ignore it. Arguments: terminate (bool): Also terminate the process currently working on the task (if any). signal (str): Name of signal to send to process if terminate. Default is TERM. wait (bool): Wait for replies from workers. The ``timeout`` argument specifies the seconds to wait. Disabled by default. timeout (float): Time in seconds to wait for replies when ``wait`` is enabled. """self.app.control.revoke(self.id,connection=connection,terminate=terminate,signal=signal,reply=wait,timeout=timeout)
[文档]defrevoke_by_stamped_headers(self,headers,connection=None,terminate=False,signal=None,wait=False,timeout=None):"""Send revoke signal to all workers only for tasks with matching headers values. Any worker receiving the task, or having reserved the task, *must* ignore it. All header fields *must* match. Arguments: headers (dict[str, Union(str, list)]): Headers to match when revoking tasks. terminate (bool): Also terminate the process currently working on the task (if any). signal (str): Name of signal to send to process if terminate. Default is TERM. wait (bool): Wait for replies from workers. The ``timeout`` argument specifies the seconds to wait. Disabled by default. timeout (float): Time in seconds to wait for replies when ``wait`` is enabled. """self.app.control.revoke_by_stamped_headers(headers,connection=connection,terminate=terminate,signal=signal,reply=wait,timeout=timeout)
[文档]defget(self,timeout=None,propagate=True,interval=0.5,no_ack=True,follow_parents=True,callback=None,on_message=None,on_interval=None,disable_sync_subtasks=True,EXCEPTION_STATES=states.EXCEPTION_STATES,PROPAGATE_STATES=states.PROPAGATE_STATES):"""Wait until task is ready, and return its result. Warning: Waiting for tasks within a task may lead to deadlocks. Please read :ref:`task-synchronous-subtasks`. Warning: Backends use resources to store and transmit results. To ensure that resources are released, you must eventually call :meth:`~@AsyncResult.get` or :meth:`~@AsyncResult.forget` on EVERY :class:`~@AsyncResult` instance returned after calling a task. Arguments: timeout (float): How long to wait, in seconds, before the operation times out. This is the setting for the publisher (celery client) and is different from `timeout` parameter of `@app.task`, which is the setting for the worker. The task isn't terminated even if timeout occurs. propagate (bool): Re-raise exception if the task failed. interval (float): Time to wait (in seconds) before retrying to retrieve the result. Note that this does not have any effect when using the RPC/redis result store backends, as they don't use polling. no_ack (bool): Enable amqp no ack (automatically acknowledge message). If this is :const:`False` then the message will **not be acked**. follow_parents (bool): Re-raise any exception raised by parent tasks. disable_sync_subtasks (bool): Disable tasks to wait for sub tasks this is the default configuration. CAUTION do not enable this unless you must. Raises: celery.exceptions.TimeoutError: if `timeout` isn't :const:`None` and the result does not arrive within `timeout` seconds. Exception: If the remote call raised an exception then that exception will be re-raised in the caller process. """ifself.ignored:returnifdisable_sync_subtasks:assert_will_not_block()_on_interval=promise()iffollow_parentsandpropagateandself.parent:_on_interval=promise(self._maybe_reraise_parent_error,weak=True)self._maybe_reraise_parent_error()ifon_interval:_on_interval.then(on_interval)ifself._cache:ifpropagate:self.maybe_throw(callback=callback)returnself.resultself.backend.add_pending_result(self)returnself.backend.wait_for_pending(self,timeout=timeout,interval=interval,on_interval=_on_interval,no_ack=no_ack,propagate=propagate,callback=callback,on_message=on_message,)
wait=get# deprecated alias to :meth:`get`.def_maybe_reraise_parent_error(self):fornodeinreversed(list(self._parents())):node.maybe_throw()def_parents(self):node=self.parentwhilenode:yieldnodenode=node.parent
[文档]defcollect(self,intermediate=False,**kwargs):"""Collect results as they return. Iterator, like :meth:`get` will wait for the task to complete, but will also follow :class:`AsyncResult` and :class:`ResultSet` returned by the task, yielding ``(result, value)`` tuples for each result in the tree. An example would be having the following tasks: .. code-block:: python from celery import group from proj.celery import app @app.task(trail=True) def A(how_many): return group(B.s(i) for i in range(how_many))() @app.task(trail=True) def B(i): return pow2.delay(i) @app.task(trail=True) def pow2(i): return i ** 2 .. code-block:: pycon >>> from celery.result import ResultBase >>> from proj.tasks import A >>> result = A.delay(10) >>> [v for v in result.collect() ... if not isinstance(v, (ResultBase, tuple))] [0, 1, 4, 9, 16, 25, 36, 49, 64, 81] Note: The ``Task.trail`` option must be enabled so that the list of children is stored in ``result.children``. This is the default but enabled explicitly for illustration. Yields: Tuple[AsyncResult, Any]: tuples containing the result instance of the child task, and the return value of that task. """for_,Rinself.iterdeps(intermediate=intermediate):yieldR,R.get(**kwargs)
[文档]defready(self):"""Return :const:`True` if the task has executed. If the task is still running, pending, or is waiting for retry then :const:`False` is returned. """returnself.stateinself.backend.READY_STATES
[文档]defsuccessful(self):"""Return :const:`True` if the task executed successfully."""returnself.state==states.SUCCESS
[文档]deffailed(self):"""Return :const:`True` if the task failed."""returnself.state==states.FAILURE
def__str__(self):"""`str(self) -> self.id`."""returnstr(self.id)def__hash__(self):"""`hash(self) -> hash(self.id)`."""returnhash(self.id)def__repr__(self):returnf'<{type(self).__name__}: {self.id}>'def__eq__(self,other):ifisinstance(other,AsyncResult):returnother.id==self.idelifisinstance(other,str):returnother==self.idreturnNotImplementeddef__copy__(self):returnself.__class__(self.id,self.backend,None,self.app,self.parent,)def__reduce__(self):returnself.__class__,self.__reduce_args__()def__reduce_args__(self):returnself.id,self.backend,None,None,self.parentdef__del__(self):"""Cancel pending operations when the instance is destroyed."""ifself.backendisnotNone:self.backend.remove_pending_result(self)@cached_propertydefgraph(self):returnself.build_graph()@propertydefsupports_native_join(self):returnself.backend.supports_native_join@propertydefchildren(self):returnself._get_task_meta().get('children')def_maybe_set_cache(self,meta):ifmeta:state=meta['status']ifstateinstates.READY_STATES:d=self._set_cache(self.backend.meta_from_decoded(meta))self.on_ready(self)returndreturnmetadef_get_task_meta(self):ifself._cacheisNone:returnself._maybe_set_cache(self.backend.get_task_meta(self.id))returnself._cachedef_iter_meta(self,**kwargs):returniter([self._get_task_meta()])def_set_cache(self,d):children=d.get('children')ifchildren:d['children']=[result_from_tuple(child,self.app)forchildinchildren]self._cache=dreturnd@propertydefresult(self):"""Task return value. Note: When the task has been executed, this contains the return value. If the task raised an exception, this will be the exception instance. """returnself._get_task_meta()['result']info=result@propertydeftraceback(self):"""Get the traceback of a failed task."""returnself._get_task_meta().get('traceback')@propertydefstate(self):"""The tasks current state. Possible values includes: *PENDING* The task is waiting for execution. *STARTED* The task has been started. *RETRY* The task is to be retried, possibly because of failure. *FAILURE* The task raised an exception, or has exceeded the retry limit. The :attr:`result` attribute then contains the exception raised by the task. *SUCCESS* The task executed successfully. The :attr:`result` attribute then contains the tasks return value. """returnself._get_task_meta()['status']status=state# XXX compat@propertydeftask_id(self):"""Compat. alias to :attr:`id`."""returnself.id@task_id.setterdeftask_id(self,id):self.id=id@propertydefname(self):returnself._get_task_meta().get('name')@propertydefargs(self):returnself._get_task_meta().get('args')@propertydefkwargs(self):returnself._get_task_meta().get('kwargs')@propertydefworker(self):returnself._get_task_meta().get('worker')@propertydefdate_done(self):"""UTC date and time."""date_done=self._get_task_meta().get('date_done')ifdate_doneandnotisinstance(date_done,datetime.datetime):returnisoparse(date_done)returndate_done@propertydefretries(self):returnself._get_task_meta().get('retries')@propertydefqueue(self):returnself._get_task_meta().get('queue')
[文档]@Thenable.registerclassResultSet(ResultBase):"""A collection of results. Arguments: results (Sequence[AsyncResult]): List of result instances. """_app=None#: List of results in in the set.results=Nonedef__init__(self,results,app=None,ready_barrier=None,**kwargs):self._app=appself.results=resultsself.on_ready=promise(args=(proxy(self),))self._on_full=ready_barrierorbarrier(results)ifself._on_full:self._on_full.then(promise(self._on_ready,weak=True))
[文档]defadd(self,result):"""Add :class:`AsyncResult` as a new member of the set. Does nothing if the result is already a member. """ifresultnotinself.results:self.results.append(result)ifself._on_full:self._on_full.add(result)
[文档]defremove(self,result):"""Remove result from the set; it must be a member. Raises: KeyError: if the result isn't a member. """ifisinstance(result,str):result=self.app.AsyncResult(result)try:self.results.remove(result)exceptValueError:raiseKeyError(result)
[文档]defdiscard(self,result):"""Remove result from the set if it is a member. Does nothing if it's not a member. """try:self.remove(result)exceptKeyError:pass
[文档]defupdate(self,results):"""Extend from iterable of results."""self.results.extend(rforrinresultsifrnotinself.results)
[文档]defclear(self):"""Remove all results from this set."""self.results[:]=[]# don't create new list.
[文档]defsuccessful(self):"""Return true if all tasks successful. Returns: bool: true if all of the tasks finished successfully (i.e. didn't raise an exception). """returnall(result.successful()forresultinself.results)
[文档]deffailed(self):"""Return true if any of the tasks failed. Returns: bool: true if one of the tasks failed. (i.e., raised an exception) """returnany(result.failed()forresultinself.results)
[文档]defwaiting(self):"""Return true if any of the tasks are incomplete. Returns: bool: true if one of the tasks are still waiting for execution. """returnany(notresult.ready()forresultinself.results)
[文档]defready(self):"""Did all of the tasks complete? (either by success of failure). Returns: bool: true if all of the tasks have been executed. """returnall(result.ready()forresultinself.results)
[文档]defcompleted_count(self):"""Task completion count. Note that `complete` means `successful` in this context. In other words, the return value of this method is the number of ``successful`` tasks. Returns: int: the number of complete (i.e. successful) tasks. """returnsum(int(result.successful())forresultinself.results)
[文档]defforget(self):"""Forget about (and possible remove the result of) all the tasks."""forresultinself.results:result.forget()
[文档]defrevoke(self,connection=None,terminate=False,signal=None,wait=False,timeout=None):"""Send revoke signal to all workers for all tasks in the set. Arguments: terminate (bool): Also terminate the process currently working on the task (if any). signal (str): Name of signal to send to process if terminate. Default is TERM. wait (bool): Wait for replies from worker. The ``timeout`` argument specifies the number of seconds to wait. Disabled by default. timeout (float): Time in seconds to wait for replies when the ``wait`` argument is enabled. """self.app.control.revoke([r.idforrinself.results],connection=connection,timeout=timeout,terminate=terminate,signal=signal,reply=wait)
[文档]defget(self,timeout=None,propagate=True,interval=0.5,callback=None,no_ack=True,on_message=None,disable_sync_subtasks=True,on_interval=None):"""See :meth:`join`. This is here for API compatibility with :class:`AsyncResult`, in addition it uses :meth:`join_native` if available for the current result backend. """return(self.join_nativeifself.supports_native_joinelseself.join)(timeout=timeout,propagate=propagate,interval=interval,callback=callback,no_ack=no_ack,on_message=on_message,disable_sync_subtasks=disable_sync_subtasks,on_interval=on_interval,)
[文档]defjoin(self,timeout=None,propagate=True,interval=0.5,callback=None,no_ack=True,on_message=None,disable_sync_subtasks=True,on_interval=None):"""Gather the results of all tasks as a list in order. Note: This can be an expensive operation for result store backends that must resort to polling (e.g., database). You should consider using :meth:`join_native` if your backend supports it. Warning: Waiting for tasks within a task may lead to deadlocks. Please see :ref:`task-synchronous-subtasks`. Arguments: timeout (float): The number of seconds to wait for results before the operation times out. propagate (bool): If any of the tasks raises an exception, the exception will be re-raised when this flag is set. interval (float): Time to wait (in seconds) before retrying to retrieve a result from the set. Note that this does not have any effect when using the amqp result store backend, as it does not use polling. callback (Callable): Optional callback to be called for every result received. Must have signature ``(task_id, value)`` No results will be returned by this function if a callback is specified. The order of results is also arbitrary when a callback is used. To get access to the result object for a particular id you'll have to generate an index first: ``index = {r.id: r for r in gres.results.values()}`` Or you can create new result objects on the fly: ``result = app.AsyncResult(task_id)`` (both will take advantage of the backend cache anyway). no_ack (bool): Automatic message acknowledgment (Note that if this is set to :const:`False` then the messages *will not be acknowledged*). disable_sync_subtasks (bool): Disable tasks to wait for sub tasks this is the default configuration. CAUTION do not enable this unless you must. Raises: celery.exceptions.TimeoutError: if ``timeout`` isn't :const:`None` and the operation takes longer than ``timeout`` seconds. """ifdisable_sync_subtasks:assert_will_not_block()time_start=time.monotonic()remaining=Noneifon_messageisnotNone:raiseImproperlyConfigured('Backend does not support on_message callback')results=[]forresultinself.results:remaining=Noneiftimeout:remaining=timeout-(time.monotonic()-time_start)ifremaining<=0.0:raiseTimeoutError('join operation timed out')value=result.get(timeout=remaining,propagate=propagate,interval=interval,no_ack=no_ack,on_interval=on_interval,disable_sync_subtasks=disable_sync_subtasks,)ifcallback:callback(result.id,value)else:results.append(value)returnresults
[文档]defiter_native(self,timeout=None,interval=0.5,no_ack=True,on_message=None,on_interval=None):"""Backend optimized version of :meth:`iterate`. .. versionadded:: 2.2 Note that this does not support collecting the results for different task types using different backends. This is currently only supported by the amqp, Redis and cache result backends. """returnself.backend.iter_native(self,timeout=timeout,interval=interval,no_ack=no_ack,on_message=on_message,on_interval=on_interval,)
[文档]defjoin_native(self,timeout=None,propagate=True,interval=0.5,callback=None,no_ack=True,on_message=None,on_interval=None,disable_sync_subtasks=True):"""Backend optimized version of :meth:`join`. .. versionadded:: 2.2 Note that this does not support collecting the results for different task types using different backends. This is currently only supported by the amqp, Redis and cache result backends. """ifdisable_sync_subtasks:assert_will_not_block()order_index=Noneifcallbackelse{result.id:ifori,resultinenumerate(self.results)}acc=Noneifcallbackelse[Nonefor_inrange(len(self))]fortask_id,metainself.iter_native(timeout,interval,no_ack,on_message,on_interval):ifisinstance(meta,list):value=[]forchildren_resultinmeta:value.append(children_result.get())else:value=meta['result']ifpropagateandmeta['status']instates.PROPAGATE_STATES:raisevalueifcallback:callback(task_id,value)else:acc[order_index[task_id]]=valuereturnacc
[文档]@Thenable.registerclassGroupResult(ResultSet):"""Like :class:`ResultSet`, but with an associated id. This type is returned by :class:`~celery.group`. It enables inspection of the tasks state and return values as a single entity. Arguments: id (str): The id of the group. results (Sequence[AsyncResult]): List of result instances. parent (ResultBase): Parent result of this group. """#: The UUID of the group.id=None#: List/iterator of results in the groupresults=Nonedef__init__(self,id=None,results=None,parent=None,**kwargs):self.id=idself.parent=parentsuper().__init__(results,**kwargs)def_on_ready(self):self.backend.remove_pending_result(self)super()._on_ready()
[文档]defsave(self,backend=None):"""Save group-result for later retrieval using :meth:`restore`. Example: >>> def save_and_restore(result): ... result.save() ... result = GroupResult.restore(result.id) """return(backendorself.app.backend).save_group(self.id,self)
[文档]defdelete(self,backend=None):"""Remove this result if it was previously saved."""(backendorself.app.backend).delete_group(self.id)
def__reduce__(self):returnself.__class__,self.__reduce_args__()def__reduce_args__(self):returnself.id,self.resultsdef__bool__(self):returnbool(self.idorself.results)__nonzero__=__bool__# Included for Py2 backwards compatibilitydef__eq__(self,other):ifisinstance(other,GroupResult):return(other.id==self.idandother.results==self.resultsandother.parent==self.parent)elifisinstance(other,str):returnother==self.idreturnNotImplementeddef__repr__(self):returnf'<{type(self).__name__}: {self.id} [{", ".join(r.idforrinself.results)}]>'def__str__(self):"""`str(self) -> self.id`."""returnstr(self.id)def__hash__(self):"""`hash(self) -> hash(self.id)`."""returnhash(self.id)
[文档]@classmethoddefrestore(cls,id,backend=None,app=None):"""Restore previously saved group result."""app=appor(cls.appifnotisinstance(cls.app,property)elsecurrent_app)backend=backendorapp.backendreturnbackend.restore_group(id)
[文档]@Thenable.registerclassEagerResult(AsyncResult):"""Result that we know has already been executed."""def__init__(self,id,ret_value,state,traceback=None,name=None):# pylint: disable=super-init-not-called# XXX should really not be inheriting from AsyncResultself.id=idself._result=ret_valueself._state=stateself._traceback=tracebackself._name=nameself.on_ready=promise()self.on_ready(self)
def__repr__(self):returnf'<EagerResult: {self.id}>'@propertydef_cache(self):return{'task_id':self.id,'result':self._result,'status':self._state,'traceback':self._traceback,'name':self._name,}@propertydefresult(self):"""The tasks return value."""returnself._result@propertydefstate(self):"""The tasks state."""returnself._statestatus=state@propertydeftraceback(self):"""The traceback if the task failed."""returnself._traceback@propertydefsupports_native_join(self):returnFalse
[文档]defresult_from_tuple(r,app=None):"""Deserialize result from tuple."""# earlier backends may just pickle, so check if# result is already prepared.app=app_or_default(app)Result=app.AsyncResultifnotisinstance(r,ResultBase):res,nodes=rid,parent=resifisinstance(res,(list,tuple))else(res,None)ifparent:parent=result_from_tuple(parent,app)ifnodesisnotNone:returnapp.GroupResult(id,[result_from_tuple(child,app)forchildinnodes],parent=parent,)returnResult(id,parent=parent)returnr