"""Task request.This module defines the :class:`Request` class, that specifieshow tasks are executed."""importloggingimportsysfromdatetimeimportdatetimefromtimeimportmonotonic,timefromweakrefimportreffrombilliard.commonimportTERM_SIGNAMEfrombilliard.einfoimportExceptionWithTracebackfromkombu.utils.encodingimportsafe_repr,safe_strfromkombu.utils.objectsimportcached_propertyfromceleryimportcurrent_app,signalsfromcelery.app.taskimportContextfromcelery.app.traceimportfast_trace_task,trace_task,trace_task_retfromcelery.concurrency.baseimportBasePoolfromcelery.exceptionsimport(Ignore,InvalidTaskError,Reject,Retry,TaskRevokedError,Terminated,TimeLimitExceeded,WorkerLostError)fromcelery.platformsimportsignalsas_signalsfromcelery.utils.functionalimportmaybe,maybe_list,noopfromcelery.utils.logimportget_loggerfromcelery.utils.nodenamesimportgethostnamefromcelery.utils.serializationimportget_pickled_exceptionfromcelery.utils.timeimportmaybe_iso8601,maybe_make_aware,timezonefrom.importstate__all__=('Request',)# pylint: disable=redefined-outer-name# We cache globals and attribute lookups, so disable this warning.IS_PYPY=hasattr(sys,'pypy_version_info')logger=get_logger(__name__)debug,info,warn,error=(logger.debug,logger.info,logger.warning,logger.error)_does_info=False_does_debug=Falsedef__optimize__():# this is also called by celery.app.trace.setup_worker_optimizationsglobal_does_debugglobal_does_info_does_debug=logger.isEnabledFor(logging.DEBUG)_does_info=logger.isEnabledFor(logging.INFO)__optimize__()# Localizetz_or_local=timezone.tz_or_localsend_revoked=signals.task_revoked.sendsend_retry=signals.task_retry.sendtask_accepted=state.task_acceptedtask_ready=state.task_readyrevoked_tasks=state.revokedrevoked_stamps=state.revoked_stamps
[文档]classRequest:"""A request for task execution."""acknowledged=Falsetime_start=Noneworker_pid=Nonetime_limits=(None,None)_already_revoked=False_already_cancelled=False_terminate_on_ack=None_apply_result=None_tzlocal=NoneifnotIS_PYPY:# pragma: no cover__slots__=('_app','_type','name','id','_root_id','_parent_id','_on_ack','_body','_hostname','_eventer','_connection_errors','_task','_eta','_expires','_request_dict','_on_reject','_utc','_content_type','_content_encoding','_argsrepr','_kwargsrepr','_args','_kwargs','_decoded','__payload','__weakref__','__dict__',)def__init__(self,message,on_ack=noop,hostname=None,eventer=None,app=None,connection_errors=None,request_dict=None,task=None,on_reject=noop,body=None,headers=None,decoded=False,utc=True,maybe_make_aware=maybe_make_aware,maybe_iso8601=maybe_iso8601,**opts):self._message=messageself._request_dict=(message.headers.copy()ifheadersisNoneelseheaders.copy())self._body=message.bodyifbodyisNoneelsebodyself._app=appself._utc=utcself._decoded=decodedifdecoded:self._content_type=self._content_encoding=Noneelse:self._content_type,self._content_encoding=(message.content_type,message.content_encoding,)self.__payload=self._bodyifself._decodedelsemessage.payloadself.id=self._request_dict['id']self._type=self.name=self._request_dict['task']if'shadow'inself._request_dict:self.name=self._request_dict['shadow']orself.nameself._root_id=self._request_dict.get('root_id')self._parent_id=self._request_dict.get('parent_id')timelimit=self._request_dict.get('timelimit',None)iftimelimit:self.time_limits=timelimitself._argsrepr=self._request_dict.get('argsrepr','')self._kwargsrepr=self._request_dict.get('kwargsrepr','')self._on_ack=on_ackself._on_reject=on_rejectself._hostname=hostnameorgethostname()self._eventer=eventerself._connection_errors=connection_errorsor()self._task=taskorself._app.tasks[self._type]self._ignore_result=self._request_dict.get('ignore_result',False)# timezone means the message is timezone-aware, and the only timezone# supported at this point is UTC.eta=self._request_dict.get('eta')ifetaisnotNone:try:eta=maybe_iso8601(eta)except(AttributeError,ValueError,TypeError)asexc:raiseInvalidTaskError(f'invalid ETA value {eta!r}: {exc}')self._eta=maybe_make_aware(eta,self.tzlocal)else:self._eta=Noneexpires=self._request_dict.get('expires')ifexpiresisnotNone:try:expires=maybe_iso8601(expires)except(AttributeError,ValueError,TypeError)asexc:raiseInvalidTaskError(f'invalid expires value {expires!r}: {exc}')self._expires=maybe_make_aware(expires,self.tzlocal)else:self._expires=Nonedelivery_info=message.delivery_infoor{}properties=message.propertiesor{}self._delivery_info={'exchange':delivery_info.get('exchange'),'routing_key':delivery_info.get('routing_key'),'priority':properties.get('priority'),'redelivered':delivery_info.get('redelivered',False),}self._request_dict.update({'properties':properties,'reply_to':properties.get('reply_to'),'correlation_id':properties.get('correlation_id'),'hostname':self._hostname,'delivery_info':self._delivery_info})# this is a reference pass to avoid memory usage burstself._request_dict['args'],self._request_dict['kwargs'],_=self.__payloadself._args=self._request_dict['args']self._kwargs=self._request_dict['kwargs']@propertydefdelivery_info(self):returnself._delivery_info@propertydefmessage(self):returnself._message@propertydefrequest_dict(self):returnself._request_dict@propertydefbody(self):returnself._body@propertydefapp(self):returnself._app@propertydefutc(self):returnself._utc@propertydefcontent_type(self):returnself._content_type@propertydefcontent_encoding(self):returnself._content_encoding@propertydeftype(self):returnself._type@propertydefroot_id(self):returnself._root_id@propertydefparent_id(self):returnself._parent_id@propertydefargsrepr(self):returnself._argsrepr@propertydefargs(self):returnself._args@propertydefkwargs(self):returnself._kwargs@propertydefkwargsrepr(self):returnself._kwargsrepr@propertydefon_ack(self):returnself._on_ack@propertydefon_reject(self):returnself._on_reject@on_reject.setterdefon_reject(self,value):self._on_reject=value@propertydefhostname(self):returnself._hostname@propertydefignore_result(self):returnself._ignore_result@propertydefeventer(self):returnself._eventer@eventer.setterdefeventer(self,eventer):self._eventer=eventer@propertydefconnection_errors(self):returnself._connection_errors@propertydeftask(self):returnself._task@propertydefeta(self):returnself._eta@propertydefexpires(self):returnself._expires@expires.setterdefexpires(self,value):self._expires=value@propertydeftzlocal(self):ifself._tzlocalisNone:self._tzlocal=self._app.conf.timezonereturnself._tzlocal@propertydefstore_errors(self):return(notself.task.ignore_resultorself.task.store_errors_even_if_ignored)@propertydeftask_id(self):# XXX compatreturnself.id@task_id.setterdeftask_id(self,value):self.id=value@propertydeftask_name(self):# XXX compatreturnself.name@task_name.setterdeftask_name(self,value):self.name=value@propertydefreply_to(self):# used by rpc backend when failures reported by parent processreturnself._request_dict['reply_to']@propertydefreplaced_task_nesting(self):returnself._request_dict.get('replaced_task_nesting',0)@propertydefgroups(self):returnself._request_dict.get('groups',[])@propertydefstamped_headers(self)->list:returnself._request_dict.get('stamped_headers')or[]@propertydefstamps(self)->dict:stamps=self._request_dict.get('stamps')or{}return{header:stamps.get(header)forheaderinself.stamped_headers}@propertydefcorrelation_id(self):# used similarly to reply_toreturnself._request_dict['correlation_id']
[文档]defexecute_using_pool(self,pool:BasePool,**kwargs):"""Used by the worker to send this task to the pool. Arguments: pool (~celery.concurrency.base.TaskPool): The execution pool used to execute this request. Raises: celery.exceptions.TaskRevokedError: if the task was revoked. """task_id=self.idtask=self._taskifself.revoked():raiseTaskRevokedError(task_id)time_limit,soft_time_limit=self.time_limitstrace=fast_trace_taskifself._app.use_fast_trace_taskelsetrace_task_retresult=pool.apply_async(trace,args=(self._type,task_id,self._request_dict,self._body,self._content_type,self._content_encoding),accept_callback=self.on_accepted,timeout_callback=self.on_timeout,callback=self.on_success,error_callback=self.on_failure,soft_timeout=soft_time_limitortask.soft_time_limit,timeout=time_limitortask.time_limit,correlation_id=task_id,)# cannot create weakref to Noneself._apply_result=maybe(ref,result)returnresult
[文档]defexecute(self,loglevel=None,logfile=None):"""Execute the task in a :func:`~celery.app.trace.trace_task`. Arguments: loglevel (int): The loglevel used by the task. logfile (str): The logfile used by the task. """ifself.revoked():return# acknowledge task as being processed.ifnotself.task.acks_late:self.acknowledge()_,_,embed=self._payloadrequest=self._request_dict# pylint: disable=unpacking-non-sequence# payload is a property, so pylint doesn't think it's a tuple.request.update({'loglevel':loglevel,'logfile':logfile,'is_eager':False,},**embedor{})retval,I,_,_=trace_task(self.task,self.id,self._args,self._kwargs,request,hostname=self._hostname,loader=self._app.loader,app=self._app)ifI:self.reject(requeue=False)else:self.acknowledge()returnretval
[文档]defmaybe_expire(self):"""If expired, mark the task as revoked."""ifself.expires:now=datetime.now(self.expires.tzinfo)ifnow>self.expires:revoked_tasks.add(self.id)returnTrue
[文档]defterminate(self,pool,signal=None):signal=_signals.signum(signalorTERM_SIGNAME)ifself.time_start:pool.terminate_job(self.worker_pid,signal)self._announce_revoked('terminated',True,signal,False)else:self._terminate_on_ack=pool,signalifself._apply_resultisnotNone:obj=self._apply_result()# is a weakrefifobjisnotNone:obj.terminate(signal)
[文档]defcancel(self,pool,signal=None):signal=_signals.signum(signalorTERM_SIGNAME)ifself.time_start:pool.terminate_job(self.worker_pid,signal)self._announce_cancelled()ifself._apply_resultisnotNone:obj=self._apply_result()# is a weakrefifobjisnotNone:obj.terminate(signal)
def_announce_cancelled(self):task_ready(self)self.send_event('task-cancelled')reason='cancelled by Celery'exc=Retry(message=reason)self.task.backend.mark_as_retry(self.id,exc,request=self._context)self.task.on_retry(exc,self.id,self.args,self.kwargs,None)self._already_cancelled=Truesend_retry(self.task,request=self._context,einfo=None)def_announce_revoked(self,reason,terminated,signum,expired):task_ready(self)self.send_event('task-revoked',terminated=terminated,signum=signum,expired=expired)self.task.backend.mark_as_revoked(self.id,reason,request=self._context,store_result=self.store_errors,)self.acknowledge()self._already_revoked=Truesend_revoked(self.task,request=self._context,terminated=terminated,signum=signum,expired=expired)
[文档]defrevoked(self):"""If revoked, skip task and mark state."""expired=Falseifself._already_revoked:returnTrueifself.expires:expired=self.maybe_expire()revoked_by_id=self.idinrevoked_tasksrevoked_by_header,revoking_header=False,Noneifnotrevoked_by_idandself.stamped_headers:forstampinself.stamped_headers:ifstampinrevoked_stamps:revoked_header=revoked_stamps[stamp]stamped_header=self._message.headers['stamps'][stamp]ifisinstance(stamped_header,(list,tuple)):forstamped_valueinstamped_header:ifstamped_valueinmaybe_list(revoked_header):revoked_by_header=Truerevoking_header={stamp:stamped_value}breakelse:revoked_by_header=any([stamped_headerinmaybe_list(revoked_header),stamped_header==revoked_header,# When the header is a single set value])revoking_header={stamp:stamped_header}breakifany((expired,revoked_by_id,revoked_by_header)):log_msg='Discarding revoked task: %s[%s]'ifrevoked_by_header:log_msg+=' (revoked by header: %s)'%revoking_headerinfo(log_msg,self.name,self.id)self._announce_revoked('expired'ifexpiredelse'revoked',False,None,expired,)returnTruereturnFalse
[文档]defon_accepted(self,pid,time_accepted):"""Handler called when task is accepted by worker pool."""self.worker_pid=pid# Convert monotonic time_accepted to absolute timeself.time_start=time()-(monotonic()-time_accepted)task_accepted(self)ifnotself.task.acks_late:self.acknowledge()self.send_event('task-started')if_does_debug:debug('Task accepted: %s[%s] pid:%r',self.name,self.id,pid)ifself._terminate_on_ackisnotNone:self.terminate(*self._terminate_on_ack)
[文档]defon_timeout(self,soft,timeout):"""Handler called if the task times out."""ifsoft:warn('Soft time limit (%ss) exceeded for %s[%s]',timeout,self.name,self.id)else:task_ready(self)error('Hard time limit (%ss) exceeded for %s[%s]',timeout,self.name,self.id)exc=TimeLimitExceeded(timeout)self.task.backend.mark_as_failure(self.id,exc,request=self._context,store_result=self.store_errors,)ifself.task.acks_lateandself.task.acks_on_failure_or_timeout:self.acknowledge()
[文档]defon_success(self,failed__retval__runtime,**kwargs):"""Handler called if the task was successfully processed."""failed,retval,runtime=failed__retval__runtimeiffailed:exc=retval.exceptionifisinstance(exc,ExceptionWithTraceback):exc=exc.excifisinstance(exc,(SystemExit,KeyboardInterrupt)):raiseexcreturnself.on_failure(retval,return_ok=True)task_ready(self,successful=True)ifself.task.acks_late:self.acknowledge()self.send_event('task-succeeded',result=retval,runtime=runtime)
[文档]defon_retry(self,exc_info):"""Handler called if the task should be retried."""ifself.task.acks_late:self.acknowledge()self.send_event('task-retried',exception=safe_repr(exc_info.exception.exc),traceback=safe_str(exc_info.traceback))
[文档]defon_failure(self,exc_info,send_failed_event=True,return_ok=False):"""Handler called if the task raised an exception."""task_ready(self)exc=exc_info.exceptionifisinstance(exc,ExceptionWithTraceback):exc=exc.excis_terminated=isinstance(exc,Terminated)ifis_terminated:# If the task was terminated and the task was not cancelled due# to a connection loss, it is revoked.# We always cancel the tasks inside the master process.# If the request was cancelled, it was not revoked and there's# nothing to be done.# According to the comment below, we need to check if the task# is already revoked and if it wasn't, we should announce that# it was.ifnotself._already_cancelledandnotself._already_revoked:# This is a special case where the process# would not have had time to write the result.self._announce_revoked('terminated',True,str(exc),False)returnelifisinstance(exc,MemoryError):raiseMemoryError(f'Process got: {exc}')elifisinstance(exc,Reject):returnself.reject(requeue=exc.requeue)elifisinstance(exc,Ignore):returnself.acknowledge()elifisinstance(exc,Retry):returnself.on_retry(exc_info)# (acks_late) acknowledge after result stored.requeue=Falseis_worker_lost=isinstance(exc,WorkerLostError)ifself.task.acks_late:reject=((self.task.reject_on_worker_lostandis_worker_lost)or(isinstance(exc,TimeLimitExceeded)andnotself.task.acks_on_failure_or_timeout))ack=self.task.acks_on_failure_or_timeoutifreject:requeue=Trueself.reject(requeue=requeue)send_failed_event=Falseelifack:self.acknowledge()else:# supporting the behaviour where a task failed and# need to be removed from prefetched local queueself.reject(requeue=False)# This is a special case where the process would not have had time# to write the result.ifnotrequeueand(is_worker_lostornotreturn_ok):# only mark as failure if task has not been requeuedself.task.backend.mark_as_failure(self.id,exc,request=self._context,store_result=self.store_errors,)signals.task_failure.send(sender=self.task,task_id=self.id,exception=exc,args=self.args,kwargs=self.kwargs,traceback=exc_info.traceback,einfo=exc_info)ifsend_failed_event:self.send_event('task-failed',exception=safe_repr(get_pickled_exception(exc_info.exception)),traceback=exc_info.traceback,)ifnotreturn_ok:error('Task handler raised error: %r',exc,exc_info=exc_info.exc_info)
def__str__(self):"""``str(self)``."""return' '.join([self.humaninfo(),f' ETA:[{self._eta}]'ifself._etaelse'',f' expires:[{self._expires}]'ifself._expireselse'',]).strip()def__repr__(self):"""``repr(self)``."""return'<{}: {}{}{}>'.format(type(self).__name__,self.humaninfo(),self._argsrepr,self._kwargsrepr,)@cached_propertydef_payload(self):returnself.__payload@cached_propertydefchord(self):# used by backend.mark_as_failure when failure is reported# by parent process# pylint: disable=unpacking-non-sequence# payload is a property, so pylint doesn't think it's a tuple._,_,embed=self._payloadreturnembed.get('chord')@cached_propertydeferrbacks(self):# used by backend.mark_as_failure when failure is reported# by parent process# pylint: disable=unpacking-non-sequence# payload is a property, so pylint doesn't think it's a tuple._,_,embed=self._payloadreturnembed.get('errbacks')@cached_propertydefgroup(self):# used by backend.on_chord_part_return when failures reported# by parent processreturnself._request_dict.get('group')@cached_propertydef_context(self):"""Context (:class:`~celery.app.task.Context`) of this task."""request=self._request_dict# pylint: disable=unpacking-non-sequence# payload is a property, so pylint doesn't think it's a tuple._,_,embed=self._payloadrequest.update(**embedor{})returnContext(request)@cached_propertydefgroup_index(self):# used by backend.on_chord_part_return to order return values in groupreturnself._request_dict.get('group_index')
defcreate_request_cls(base,task,pool,hostname,eventer,ref=ref,revoked_tasks=revoked_tasks,task_ready=task_ready,trace=None,app=current_app):default_time_limit=task.time_limitdefault_soft_time_limit=task.soft_time_limitapply_async=pool.apply_asyncacks_late=task.acks_lateevents=eventerandeventer.enablediftraceisNone:trace=fast_trace_taskifapp.use_fast_trace_taskelsetrace_task_retclassRequest(base):defexecute_using_pool(self,pool,**kwargs):task_id=self.task_idifself.revoked():raiseTaskRevokedError(task_id)time_limit,soft_time_limit=self.time_limitsresult=apply_async(trace,args=(self.type,task_id,self.request_dict,self.body,self.content_type,self.content_encoding),accept_callback=self.on_accepted,timeout_callback=self.on_timeout,callback=self.on_success,error_callback=self.on_failure,soft_timeout=soft_time_limitordefault_soft_time_limit,timeout=time_limitordefault_time_limit,correlation_id=task_id,)# cannot create weakref to None# pylint: disable=attribute-defined-outside-initself._apply_result=maybe(ref,result)returnresultdefon_success(self,failed__retval__runtime,**kwargs):failed,retval,runtime=failed__retval__runtimeiffailed:exc=retval.exceptionifisinstance(exc,ExceptionWithTraceback):exc=exc.excifisinstance(exc,(SystemExit,KeyboardInterrupt)):raiseexcreturnself.on_failure(retval,return_ok=True)task_ready(self,successful=True)ifacks_late:self.acknowledge()ifevents:self.send_event('task-succeeded',result=retval,runtime=runtime,)returnRequest