"""Trace task execution.This module defines how the task execution is traced:errors are recorded, handlers are applied and so on."""importloggingimportosimportsysimporttimefromcollectionsimportnamedtuplefromwarningsimportwarnfrombilliard.einfoimportExceptionInfo,ExceptionWithTracebackfromkombu.exceptionsimportEncodeErrorfromkombu.serializationimportloadsasloads_messagefromkombu.serializationimportprepare_accept_contentfromkombu.utils.encodingimportsafe_repr,safe_strfromceleryimportcurrent_app,group,signals,statesfromcelery._stateimport_task_stackfromcelery.app.taskimportContextfromcelery.app.taskimportTaskasBaseTaskfromcelery.exceptionsimportBackendGetMetaError,Ignore,InvalidTaskError,Reject,Retryfromcelery.resultimportAsyncResultfromcelery.utils.logimportget_loggerfromcelery.utils.nodenamesimportgethostnamefromcelery.utils.objectsimportmro_lookupfromcelery.utils.safereprimportsafereprfromcelery.utils.serializationimportget_pickleable_etype,get_pickleable_exception,get_pickled_exception# ## ---# This is the heart of the worker, the inner loop so to speak.# It used to be split up into nice little classes and methods,# but in the end it only resulted in bad performance and horrible tracebacks,# so instead we now use one closure per task class.# pylint: disable=redefined-outer-name# We cache globals and attribute lookups, so disable this warning.# pylint: disable=broad-except# We know what we're doing...__all__=('TraceInfo','build_tracer','trace_task','setup_worker_optimizations','reset_worker_optimizations',)fromcelery.worker.stateimportsuccessful_requestslogger=get_logger(__name__)#: Format string used to log task receipt.LOG_RECEIVED="""\Task %(name)s[%(id)s] received\"""#: Format string used to log task success.LOG_SUCCESS="""\Task %(name)s[%(id)s] succeeded in %(runtime)ss: %(return_value)s\"""#: Format string used to log task failure.LOG_FAILURE="""\Task %(name)s[%(id)s] %(description)s: %(exc)s\"""#: Format string used to log task internal 
error.LOG_INTERNAL_ERROR="""\Task %(name)s[%(id)s] %(description)s: %(exc)s\"""#: Format string used to log task ignored.LOG_IGNORED="""\Task %(name)s[%(id)s] %(description)s\"""#: Format string used to log task rejected.LOG_REJECTED="""\Task %(name)s[%(id)s] %(exc)s\"""#: Format string used to log task retry.LOG_RETRY="""\Task %(name)s[%(id)s] retry: %(exc)s\"""log_policy_t=namedtuple('log_policy_t',('format','description','severity','traceback','mail'),)log_policy_reject=log_policy_t(LOG_REJECTED,'rejected',logging.WARN,1,1)log_policy_ignore=log_policy_t(LOG_IGNORED,'ignored',logging.INFO,0,0)log_policy_internal=log_policy_t(LOG_INTERNAL_ERROR,'INTERNAL ERROR',logging.CRITICAL,1,1,)log_policy_expected=log_policy_t(LOG_FAILURE,'raised expected',logging.INFO,0,0,)log_policy_unexpected=log_policy_t(LOG_FAILURE,'raised unexpected',logging.ERROR,1,1,)send_prerun=signals.task_prerun.sendsend_postrun=signals.task_postrun.sendsend_success=signals.task_success.sendSTARTED=states.STARTEDSUCCESS=states.SUCCESSIGNORED=states.IGNOREDREJECTED=states.REJECTEDRETRY=states.RETRYFAILURE=states.FAILUREEXCEPTION_STATES=states.EXCEPTION_STATESIGNORE_STATES=frozenset({IGNORED,RETRY,REJECTED})#: set by :func:`setup_worker_optimizations`_localized=[]_patched={}trace_ok_t=namedtuple('trace_ok_t',('retval','info','runtime','retstr'))definfo(fmt,context):"""Log 'fmt % context' with severity 'INFO'. 'context' is also passed in extra with key 'data' for custom handlers. 
"""logger.info(fmt,context,extra={'data':context})deftask_has_custom(task,attr):"""Return true if the task overrides ``attr``."""returnmro_lookup(task.__class__,attr,stop={BaseTask,object},monkey_patched=['celery.app.task'])defget_log_policy(task,einfo,exc):ifisinstance(exc,Reject):returnlog_policy_rejectelifisinstance(exc,Ignore):returnlog_policy_ignoreelifeinfo.internal:returnlog_policy_internalelse:iftask.throwsandisinstance(exc,task.throws):returnlog_policy_expectedreturnlog_policy_unexpecteddefget_task_name(request,default):"""Use 'shadow' in request for the task name if applicable."""# request.shadow could be None or an empty string.# If so, we should use default.returngetattr(request,'shadow',None)ordefault
class TraceInfo:
    """Information about task execution."""

    __slots__ = ('state', 'retval')

    def __init__(self, state, retval=None):
        """Record the resulting ``state`` and the associated ``retval``.

        The handlers below read ``self.retval`` as the exception instance
        that was raised.
        """
        self.state = state
        self.retval = retval

    def handle_retry(self, task, req, store_errors=True, **kwargs):
        """Handle retry exception.

        Stores the retry in the backend (when ``store_errors`` is true),
        calls ``task.on_retry``, sends the ``task_retry`` signal and logs
        the retry at INFO level.

        Returns:
            ExceptionInfo: built from the exception currently being handled,
            with ``self.retval`` as its value.
        """
        # The exception raised is the Retry semi-predicate,
        # and its ``exc`` attribute is the original exception raised (if any).
        type_, _, tb = sys.exc_info()
        try:
            reason = self.retval
            einfo = ExceptionInfo((type_, reason, tb))
            if store_errors:
                task.backend.mark_as_retry(
                    req.id, reason.exc, einfo.traceback, request=req,
                )
            task.on_retry(reason.exc, req.id, req.args, req.kwargs, einfo)
            signals.task_retry.send(sender=task, request=req,
                                    reason=reason, einfo=einfo)
            info(LOG_RETRY, {
                'id': req.id,
                'name': get_task_name(req, task.name),
                'exc': str(reason),
            })
            return einfo
        finally:
            # Drop the local traceback reference.
            del tb

    def handle_failure(self, task, req, store_errors=True, call_errbacks=True):
        """Handle exception.

        Marks the task as failed in the backend, calls ``task.on_failure``,
        sends the ``task_failure`` signal and logs the error.

        Returns:
            ExceptionInfo: describing the pickleable form of the exception.
        """
        orig_exc = self.retval

        exc = get_pickleable_exception(orig_exc)
        if exc.__traceback__ is None:
            # `get_pickleable_exception` may have created a new exception
            # without a traceback.
            _, _, exc.__traceback__ = sys.exc_info()

        exc_type = get_pickleable_etype(type(orig_exc))

        # make sure we only send pickleable exceptions back to parent.
        einfo = ExceptionInfo(exc_info=(exc_type, exc, exc.__traceback__))

        task.backend.mark_as_failure(
            req.id, exc, einfo.traceback,
            request=req, store_result=store_errors,
            call_errbacks=call_errbacks,
        )

        task.on_failure(exc, req.id, req.args, req.kwargs, einfo)
        signals.task_failure.send(sender=task, task_id=req.id,
                                  exception=exc, args=req.args,
                                  kwargs=req.kwargs,
                                  traceback=exc.__traceback__,
                                  einfo=einfo)
        # NOTE(review): `_log_error` is not defined in the visible part of
        # this class -- confirm it is provided elsewhere.
        self._log_error(task, req, einfo)
        return einfo
def traceback_clear(exc=None):
    """Clear the frames referenced by a traceback.

    Args:
        exc: Optional exception whose ``__traceback__`` is walked. When it is
            ``None`` or has no ``__traceback__`` attribute, the traceback of
            the exception currently being handled (``sys.exc_info()``) is
            used instead.

    Every frame in the chain is cleared; frames that are still executing
    are left untouched.
    """
    # Cleared Tb, but einfo still has a reference to Traceback.
    # exc cleans up the Traceback at the last moment that can be revealed.
    if exc is not None and hasattr(exc, '__traceback__'):
        tb = exc.__traceback__
    else:
        _, _, tb = sys.exc_info()

    while tb is not None:
        try:
            tb.tb_frame.clear()
            # Deliberate bare attribute access, kept from the original.
            # NOTE(review): presumably this refreshes the frame's locals
            # mapping after the clear -- confirm before removing.
            tb.tb_frame.f_locals
        except RuntimeError:
            # Ignore the exception raised if the frame is still executing.
            pass
        tb = tb.tb_next
def build_tracer(name, task, loader=None, hostname=None, store_errors=True,
                 Info=TraceInfo, eager=False, propagate=False, app=None,
                 monotonic=time.monotonic, trace_ok_t=trace_ok_t,
                 IGNORE_STATES=IGNORE_STATES):
    """Return a function that traces task execution.

    Catches all exceptions and updates result backend with the
    state and result.

    If the call was successful, it saves the result to the task result
    backend, and sets the task status to `"SUCCESS"`.

    If the call raises :exc:`~@Retry`, it extracts
    the original exception, uses that as the result and sets the task state
    to `"RETRY"`.

    If the call results in an exception, it saves the exception as the task
    result, and sets the task state to `"FAILURE"`.

    Return a function that takes the following arguments:

        :param uuid: The id of the task.
        :param args: List of positional args to pass on to the function.
        :param kwargs: Keyword arguments mapping to pass on to the function.
        :keyword request: Request dict.

    The returned function yields a ``trace_ok_t(retval, info, runtime,
    retstr)`` tuple.

    Notes on the builder arguments, as used by this body:

    * ``app`` is dereferenced unconditionally (``app.conf``, ``app.backend``),
      so it must be supplied even though it defaults to ``None``.
    * ``loader`` falls back to ``app.loader``; ``hostname`` falls back to
      ``gethostname()``.
    * ``eager`` disables started-tracking and process cleanup, and makes
      errors raised outside the task body propagate to the caller.
    * ``propagate`` makes the error handler re-raise instead of recording.
    * ``store_errors`` is accepted but not referenced in this body.
    * ``Info``, ``monotonic``, ``trace_ok_t`` and ``IGNORE_STATES`` are
      default-argument bindings of module globals.
    """
    # pylint: disable=too-many-statements

    # If the task doesn't define a custom __call__ method
    # we optimize it away by simply calling the run method directly,
    # saving the extra method call and a line less in the stack trace.
    fun = task if task_has_custom(task, '__call__') else task.run

    loader = loader or app.loader
    ignore_result = task.ignore_result
    track_started = not eager and (task.track_started and not ignore_result)

    # #6476
    if eager and not ignore_result and task.store_eager_result:
        publish_result = True
    else:
        publish_result = not eager and not ignore_result

    deduplicate_successful_tasks = (
        (app.conf.task_acks_late or task.acks_late)
        and app.conf.worker_deduplicate_successful_tasks
        and app.backend.persistent
    )

    hostname = hostname or gethostname()
    inherit_parent_priority = app.conf.task_inherit_parent_priority

    loader_task_init = loader.on_task_init
    loader_cleanup = loader.on_process_cleanup

    # Only keep references to the hooks the task class actually overrides,
    # so the default implementations are never called.
    task_before_start = None
    task_on_success = None
    task_after_return = None
    if task_has_custom(task, 'before_start'):
        task_before_start = task.before_start
    if task_has_custom(task, 'on_success'):
        task_on_success = task.on_success
    if task_has_custom(task, 'after_return'):
        task_after_return = task.after_return

    pid = os.getpid()

    request_stack = task.request_stack
    push_request = request_stack.push
    pop_request = request_stack.pop
    push_task = _task_stack.push
    pop_task = _task_stack.pop
    _does_info = logger.isEnabledFor(logging.INFO)
    resultrepr_maxsize = task.resultrepr_maxsize

    prerun_receivers = signals.task_prerun.receivers
    postrun_receivers = signals.task_postrun.receivers
    success_receivers = signals.task_success.receivers

    from celery import canvas
    signature = canvas.maybe_signature  # maybe_ does not clone if already

    def on_error(request, exc, state=FAILURE, call_errbacks=True):
        # Only ever called from inside an ``except`` block, so the bare
        # ``raise`` re-raises the exception currently being handled.
        if propagate:
            raise
        I = Info(state, exc)
        R = I.handle_error_state(
            task, request, eager=eager, call_errbacks=call_errbacks,
        )
        return I, R, I.state, I.retval

    def trace_task(uuid, args, kwargs, request=None):
        # R      - is the possibly prepared return value.
        # I      - is the Info object.
        # T      - runtime
        # Rstr   - textual representation of return value
        # retval - is the always unmodified return value.
        # state  - is the resulting task state.

        # This function is very long because we've unrolled all the calls
        # for performance reasons, and because the function is so long
        # we want the main variables (I, and R) to stand out visually from the
        # the rest of the variables, so breaking PEP8 is worth it ;)
        R = I = T = Rstr = retval = state = None
        task_request = None
        time_start = monotonic()
        try:
            try:
                kwargs.items
            except AttributeError:
                raise InvalidTaskError(
                    'Task keyword arguments is not a mapping')

            task_request = Context(request or {}, args=args,
                                   called_directly=False, kwargs=kwargs)

            redelivered = (
                task_request.delivery_info
                and task_request.delivery_info.get('redelivered', False)
            )
            if deduplicate_successful_tasks and redelivered:
                if task_request.id in successful_requests:
                    return trace_ok_t(R, I, T, Rstr)
                r = AsyncResult(task_request.id, app=app)

                try:
                    state = r.state
                except BackendGetMetaError:
                    pass
                else:
                    if state == SUCCESS:
                        info(LOG_IGNORED, {
                            'id': task_request.id,
                            'name': get_task_name(task_request, name),
                            'description': 'Task already completed successfully.'
                        })
                        return trace_ok_t(R, I, T, Rstr)

            push_task(task)
            root_id = task_request.root_id or uuid
            # ``delivery_info`` may be falsy (the redelivery check above
            # already allows for that).  Fall back to an empty mapping so a
            # missing value cannot raise here, after ``push_task`` but before
            # the ``try/finally`` below that pops the stacks again.
            if inherit_parent_priority:
                task_priority = (
                    task_request.delivery_info or {}
                ).get('priority')
            else:
                task_priority = None
            push_request(task_request)
            try:
                # -*- PRE -*-
                if prerun_receivers:
                    send_prerun(sender=task, task_id=uuid, task=task,
                                args=args, kwargs=kwargs)
                loader_task_init(uuid, task)
                if track_started:
                    task.backend.store_result(
                        uuid, {'pid': pid, 'hostname': hostname}, STARTED,
                        request=task_request,
                    )

                # -*- TRACE -*-
                try:
                    if task_before_start:
                        task_before_start(uuid, args, kwargs)

                    R = retval = fun(*args, **kwargs)
                    state = SUCCESS
                except Reject as exc:
                    I, R = Info(REJECTED, exc), ExceptionInfo(internal=True)
                    state, retval = I.state, I.retval
                    I.handle_reject(task, task_request)
                    traceback_clear(exc)
                except Ignore as exc:
                    I, R = Info(IGNORED, exc), ExceptionInfo(internal=True)
                    state, retval = I.state, I.retval
                    I.handle_ignore(task, task_request)
                    traceback_clear(exc)
                except Retry as exc:
                    I, R, state, retval = on_error(
                        task_request, exc, RETRY, call_errbacks=False)
                    traceback_clear(exc)
                except Exception as exc:
                    I, R, state, retval = on_error(task_request, exc)
                    traceback_clear(exc)
                except BaseException:
                    raise
                else:
                    try:
                        # callback tasks must be applied before the result is
                        # stored, so that result.children is populated.

                        # groups are called inline and will store trail
                        # separately, so need to call them separately
                        # so that the trail's not added multiple times :(
                        # (Issue #1936)
                        callbacks = task.request.callbacks
                        if callbacks:
                            if len(callbacks) > 1:
                                sigs, groups = [], []
                                for sig in callbacks:
                                    sig = signature(sig, app=app)
                                    if isinstance(sig, group):
                                        groups.append(sig)
                                    else:
                                        sigs.append(sig)
                                for group_ in groups:
                                    group_.apply_async(
                                        (retval,),
                                        parent_id=uuid, root_id=root_id,
                                        priority=task_priority
                                    )
                                if sigs:
                                    group(sigs, app=app).apply_async(
                                        (retval,),
                                        parent_id=uuid, root_id=root_id,
                                        priority=task_priority
                                    )
                            else:
                                signature(callbacks[0], app=app).apply_async(
                                    (retval,), parent_id=uuid, root_id=root_id,
                                    priority=task_priority
                                )

                        # execute first task in chain
                        chain = task_request.chain
                        if chain:
                            # ``pop`` removes the next signature from the
                            # request's chain; the remainder is passed along.
                            _chsig = signature(chain.pop(), app=app)
                            _chsig.apply_async(
                                (retval,), chain=chain,
                                parent_id=uuid, root_id=root_id,
                                priority=task_priority
                            )
                        task.backend.mark_as_done(
                            uuid, retval, task_request, publish_result,
                        )
                    except EncodeError as exc:
                        I, R, state, retval = on_error(task_request, exc)
                    else:
                        Rstr = saferepr(R, resultrepr_maxsize)
                        T = monotonic() - time_start
                        if task_on_success:
                            task_on_success(retval, uuid, args, kwargs)
                        if success_receivers:
                            send_success(sender=task, result=retval)
                        if _does_info:
                            info(LOG_SUCCESS, {
                                'id': uuid,
                                'name': get_task_name(task_request, name),
                                'return_value': Rstr,
                                'runtime': T,
                                'args': task_request.get('argsrepr') or safe_repr(args),
                                'kwargs': task_request.get('kwargsrepr') or safe_repr(kwargs),
                            })

                # -* POST *-
                if state not in IGNORE_STATES:
                    if task_after_return:
                        task_after_return(
                            state, retval, uuid, args, kwargs, None,
                        )
            finally:
                try:
                    if postrun_receivers:
                        send_postrun(sender=task, task_id=uuid, task=task,
                                     args=args, kwargs=kwargs,
                                     retval=retval, state=state)
                finally:
                    pop_task()
                    pop_request()
                    if not eager:
                        try:
                            task.backend.process_cleanup()
                            loader_cleanup()
                        except (KeyboardInterrupt, SystemExit, MemoryError):
                            raise
                        except Exception as exc:
                            logger.error('Process cleanup failed: %r', exc,
                                         exc_info=True)
        except MemoryError:
            raise
        except Exception as exc:
            # Errors raised outside the task body itself.
            _signal_internal_error(task, uuid, args, kwargs, request, exc)
            if eager:
                raise
            R = report_internal_error(task, exc)
            if task_request is not None:
                I, _, _, _ = on_error(task_request, exc)
        return trace_ok_t(R, I, T, Rstr)

    return trace_task
def _signal_internal_error(task, uuid, args, kwargs, request, exc):
    """Send a special `internal_error` signal to the app for outside body errors."""
    # Read the active exception before entering ``try`` so that the
    # ``del tb`` in ``finally`` always has a bound name to delete.
    _, _, tb = sys.exc_info()
    try:
        einfo = ExceptionInfo()
        einfo.exception = get_pickleable_exception(einfo.exception)
        einfo.type = get_pickleable_etype(einfo.type)
        signals.task_internal_error.send(
            sender=task,
            task_id=uuid,
            args=args,
            kwargs=kwargs,
            request=request,
            exception=exc,
            traceback=tb,
            einfo=einfo,
        )
    finally:
        del tb


def trace_task_ret(name, uuid, request, body, content_type,
                   content_encoding, loads=loads_message, app=None,
                   **extra_request):
    """Decode a task message and trace the task registered as ``name``.

    ``body`` is deserialized with ``loads`` when ``content_type`` is set,
    otherwise it is unpacked directly as ``(args, kwargs, embed)``.
    ``request`` is updated in place with the arguments, the hostname and
    the embedded options.

    Returns:
        tuple: ``(1, R, T)`` when the trace produced an info object,
        otherwise ``(0, Rstr, T)``.
    """
    app = app or current_app._get_current_object()
    embed = None
    if content_type:
        accept = prepare_accept_content(app.conf.accept_content)
        args, kwargs, embed = loads(
            body, content_type, content_encoding, accept=accept,
        )
    else:
        args, kwargs, embed = body
    hostname = gethostname()
    request.update({
        'args': args, 'kwargs': kwargs,
        'hostname': hostname, 'is_eager': False,
    }, **(embed or {}))
    # NOTE(review): this calls a module-level ``trace_task(task, uuid, ...)``
    # that is not visible in this part of the file -- confirm it exists.
    R, I, T, Rstr = trace_task(app.tasks[name],
                               uuid, args, kwargs, request, app=app)
    return (1, R, T) if I else (0, Rstr, T)


def fast_trace_task(task, uuid, request, body, content_type,
                    content_encoding, loads=loads_message, _loc=None,
                    hostname=None, **_):
    """Variant of :func:`trace_task_ret` using the cached worker state.

    The task registry, accepted content types and hostname are taken from
    ``_loc`` (defaulting to the module-level ``_localized`` list filled in by
    :func:`setup_worker_optimizations`).  The ``hostname`` argument is
    overwritten by the value unpacked from ``_loc``.

    Returns:
        tuple: ``(1, R, T)`` when the trace produced an info object,
        otherwise ``(0, Rstr, T)``.
    """
    _loc = _localized if not _loc else _loc
    embed = None
    tasks, accept, hostname = _loc
    if content_type:
        args, kwargs, embed = loads(
            body, content_type, content_encoding, accept=accept,
        )
    else:
        args, kwargs, embed = body
    request.update({
        'args': args, 'kwargs': kwargs,
        'hostname': hostname, 'is_eager': False,
    }, **(embed or {}))
    R, I, T, Rstr = tasks[task].__trace__(
        uuid, args, kwargs, request,
    )
    return (1, R, T) if I else (0, Rstr, T)


def report_internal_error(task, exc):
    """Warn about an exception raised outside the task body.

    Returns:
        ExceptionInfo: marked ``internal=True``, whose value is ``exc`` as
        prepared by ``task.backend.prepare_exception(exc, 'pickle')``.
    """
    _type, _value, _tb = sys.exc_info()
    try:
        _value = task.backend.prepare_exception(exc, 'pickle')
        exc_info = ExceptionInfo((_type, _value, _tb), internal=True)
        warn(RuntimeWarning(
            'Exception raised outside body: {!r}:\n{}'.format(
                exc, exc_info.traceback)))
        return exc_info
    finally:
        del _tb
def setup_worker_optimizations(app, hostname=None):
    """Setup worker related optimizations.

    Installs the ``__call__`` stack protection, makes ``app`` the current
    and default app, finalizes it, fills the module-level ``_localized``
    cache and sets ``app.use_fast_trace_task``.

    Args:
        app: The application the worker runs.
        hostname: Node name to cache; defaults to ``gethostname()``.
    """
    hostname = hostname or gethostname()

    # make sure custom Task.__call__ methods that calls super
    # won't mess up the request/task stack.
    _install_stack_protection()

    # all new threads start without a current app, so if an app is not
    # passed on to the thread it will fall back to the "default app",
    # which then could be the wrong app.  So for the worker
    # we set this to always return our app.  This is a hack,
    # and means that only a single app can be used for workers
    # running in the same process.
    app.set_current()
    app.set_default()

    # evaluate all task classes by finalizing the app.
    app.finalize()

    # set fast shortcut to task registry
    # (slice assignment mutates the existing list object in place).
    _localized[:] = [
        app._tasks,
        prepare_accept_content(app.conf.accept_content),
        hostname,
    ]

    app.use_fast_trace_task = True
def _install_stack_protection():
    """Patch ``BaseTask.__call__`` once so custom ``__call__`` + super is safe.

    The original ``__call__`` is saved in ``_patched`` under the key
    ``'BaseTask.__call__'``, and ``BaseTask._stackprotected`` is set so
    that later calls are no-ops.
    """
    # Patches BaseTask.__call__ in the worker to handle the edge case
    # where people override it and also call super.
    #
    # - The worker optimizes away BaseTask.__call__ and instead
    #   calls task.run directly.
    # - so with the addition of current_task and the request stack
    #   BaseTask.__call__ now pushes to those stacks so that
    #   they work when tasks are called directly.
    #
    # The worker only optimizes away __call__ in the case
    # where it hasn't been overridden, so the request/task stack
    # will blow if a custom task class defines __call__ and also
    # calls super().
    if not getattr(BaseTask, '_stackprotected', False):
        _patched['BaseTask.__call__'] = orig = BaseTask.__call__

        def __protected_call__(self, *args, **kwargs):
            stack = self.request_stack
            req = stack.top
            # A single, not-yet-protected request that was not created by a
            # direct call: run the body without going through ``orig``.
            if (req and not req._protected
                    and len(stack) == 1 and not req.called_directly):
                req._protected = 1
                return self.run(*args, **kwargs)
            return orig(self, *args, **kwargs)

        BaseTask.__call__ = __protected_call__
        BaseTask._stackprotected = True