"""Task execution strategy (optimization)."""
import logging

from kombu.asynchronous.timer import to_timestamp

from celery import signals
from celery.app import trace as _app_trace
from celery.exceptions import InvalidTaskError
from celery.utils.imports import symbol_by_name
from celery.utils.log import get_logger
from celery.utils.saferepr import saferepr
from celery.utils.time import timezone

from .request import create_request_cls
from .state import task_reserved

__all__ = ('default',)

logger = get_logger(__name__)

# pylint: disable=redefined-outer-name
# We cache globals and attribute lookups, so disable this warning.


def _unpack_args_kwargs(body):
    """Return ``(args, kwargs)`` read from *body*, validating ``kwargs``.

    ``args`` defaults to ``()`` and ``kwargs`` to ``{}`` when the key is
    absent from *body*.

    Raises:
        InvalidTaskError: if the lookup raises :exc:`KeyError`, or if an
            :exc:`AttributeError` is raised (for example because
            ``kwargs`` has no ``items`` attribute).
    """
    try:
        args, kwargs = body.get('args', ()), body.get('kwargs', {})
        # Attribute probe only: anything without ``.items`` is rejected.
        kwargs.items  # pylint: disable=pointless-statement
    except KeyError as exc:
        raise InvalidTaskError(
            'Message does not have args/kwargs') from exc
    except AttributeError as exc:
        raise InvalidTaskError(
            'Task keyword arguments must be a mapping',
        ) from exc
    return args, kwargs


def _embed_from_body(body):
    """Build the ``embed`` dict from *body*; ``chain`` is always ``None``."""
    return {
        'callbacks': body.get('callbacks'),
        'errbacks': body.get('errbacks'),
        'chord': body.get('chord'),
        'chain': None,
    }


def hybrid_to_proto2(message, body):
    """Create a fresh protocol 2 message from a hybrid protocol 1/2 message.

    The headers are seeded from the fields found in *body* and then
    overridden by whatever is present in ``message.headers``.

    Returns:
        Tuple: of ``(body, headers, already_decoded_status, utc)`` where
        ``body`` is the ``(args, kwargs, embed)`` triple.

    Raises:
        InvalidTaskError: if the args/kwargs in *body* are not usable.
    """
    args, kwargs = _unpack_args_kwargs(body)

    headers = {
        'lang': body.get('lang'),
        'task': body.get('task'),
        'id': body.get('id'),
        'root_id': body.get('root_id'),
        'parent_id': body.get('parent_id'),
        'group': body.get('group'),
        'meth': body.get('meth'),
        'shadow': body.get('shadow'),
        'eta': body.get('eta'),
        'expires': body.get('expires'),
        'retries': body.get('retries', 0),
        'timelimit': body.get('timelimit', (None, None)),
        'argsrepr': body.get('argsrepr'),
        'kwargsrepr': body.get('kwargsrepr'),
        'origin': body.get('origin'),
    }
    # Values carried in the real message headers win over the body's.
    headers.update(message.headers or {})

    embed = _embed_from_body(body)
    return (args, kwargs, embed), headers, True, body.get('utc', True)


def proto1_to_proto2(message, body):
    """Convert Task message protocol 1 arguments to protocol 2.

    Note:
        *body* is updated in place (``argsrepr``, ``kwargsrepr``,
        ``headers`` and, when ``taskset`` is present, ``group``) and is
        itself returned as the headers element of the result.

    Returns:
        Tuple: of ``(body, headers, already_decoded_status, utc)``

    Raises:
        InvalidTaskError: if the args/kwargs in *body* are not usable.
    """
    args, kwargs = _unpack_args_kwargs(body)

    body.update(
        argsrepr=saferepr(args),
        kwargsrepr=saferepr(kwargs),
        headers=message.headers,
    )
    # Mirror the ``taskset`` key, when present, under ``group``.
    try:
        body['group'] = body['taskset']
    except KeyError:
        pass

    embed = _embed_from_body(body)
    return (args, kwargs, embed), body, True, body.get('utc', True)
def default(task, app, consumer,
            info=logger.info, error=logger.error, task_reserved=task_reserved,
            to_system_tz=timezone.to_system, bytes=bytes,
            proto1_to_proto2=proto1_to_proto2):
    """Default task execution strategy.

    Note:
        Strategies are here as an optimization, so sadly
        it's not very easy to override.

    Arguments:
        task: The task this strategy is built for; its ``Request``,
            ``name`` and ``send_events`` attributes are read.
        app: The app instance, passed on to every request and used for
            timezone handling.
        consumer: The consumer supplying the hostname, event dispatcher,
            timer, pool, rate-limit buckets and QoS used by the handler.
        info: Callable used to log the "received" message.
        error: Callable used to log ETA conversion failures.
        task_reserved: Callable invoked with a request just before it is
            handed to ``consumer.on_task_request``.
        to_system_tz: Callable applied to the ETA of UTC requests before
            it is converted to a timestamp.
        bytes: Not used by this function's body; kept in the signature.
        proto1_to_proto2: Converter applied to messages that carry a
            decoded body but no ``args`` key in their payload.

    Returns:
        Callable: ``task_message_handler(message, body, ack, reject,
        callbacks)``, which processes one incoming task message.
    """
    # Everything below is looked up once and captured by the closure so
    # the per-message handler avoids repeated attribute access.
    hostname = consumer.hostname
    connection_errors = consumer.connection_errors
    _does_info = logger.isEnabledFor(logging.INFO)

    # task event related
    # (optimized to avoid calling request.send_event)
    eventer = consumer.event_dispatcher
    events = eventer and eventer.enabled
    send_event = eventer and eventer.send
    task_sends_events = events and task.send_events

    call_at = consumer.timer.call_at
    apply_eta_task = consumer.apply_eta_task
    rate_limits_enabled = not consumer.disable_rate_limits
    get_bucket = consumer.task_buckets.__getitem__
    handle = consumer.on_task_request
    limit_task = consumer._limit_task
    limit_post_eta = consumer._limit_post_eta
    Request = symbol_by_name(task.Request)
    Req = create_request_cls(
        Request, task, consumer.pool, hostname, eventer, app=app,
    )

    revoked_tasks = consumer.controller.state.revoked

    def task_message_handler(message, body, ack, reject, callbacks,
                             to_timestamp=to_timestamp):
        """Build a request from *message* and schedule or execute it."""
        if body is None and 'args' not in message.payload:
            # Nothing decoded yet: hand the raw body and headers to the
            # request and mark it as not decoded.
            body, headers, decoded, utc = (
                message.body, message.headers, False, app.uses_utc_timezone(),
            )
        elif 'args' in message.payload:
            body, headers, decoded, utc = hybrid_to_proto2(
                message, message.payload,
            )
        else:
            body, headers, decoded, utc = proto1_to_proto2(message, body)

        req = Req(
            message,
            on_ack=ack, on_reject=reject, app=app, hostname=hostname,
            eventer=eventer, task=task, connection_errors=connection_errors,
            body=body, headers=headers, decoded=decoded, utc=utc,
        )
        if _does_info:
            # Similar to `app.trace.info()`, we pass the formatting args as the
            # `extra` kwarg for custom log handlers
            context = {
                'id': req.id,
                'name': req.name,
                'args': req.argsrepr,
                'kwargs': req.kwargsrepr,
                'eta': req.eta,
            }
            info(_app_trace.LOG_RECEIVED, context, extra={'data': context})
        # ``req.revoked()`` is only consulted when the request has an
        # expiry or its id is in the revoked set.
        if (req.expires or req.id in revoked_tasks) and req.revoked():
            return

        signals.task_received.send(sender=consumer, request=req)

        if task_sends_events:
            send_event(
                'task-received',
                uuid=req.id, name=req.name,
                args=req.argsrepr, kwargs=req.kwargsrepr,
                root_id=req.root_id, parent_id=req.parent_id,
                retries=req.request_dict.get('retries', 0),
                eta=req.eta and req.eta.isoformat(),
                expires=req.expires and req.expires.isoformat(),
            )

        bucket = None
        eta = None
        if req.eta:
            try:
                if req.utc:
                    eta = to_timestamp(to_system_tz(req.eta))
                else:
                    eta = to_timestamp(req.eta, app.timezone)
            except (OverflowError, ValueError) as exc:
                error("Couldn't convert ETA %r to timestamp: %r. Task: %r",
                      req.eta, exc, req.info(safe=True), exc_info=True)
                req.reject(requeue=False)
                # The request has just been rejected: stop here instead of
                # falling through and rate-limiting/executing it anyway.
                return
        if rate_limits_enabled:
            bucket = get_bucket(task.name)

        if eta and bucket:
            consumer.qos.increment_eventually()
            return call_at(eta, limit_post_eta, (req, bucket, 1),
                           priority=6)
        if eta:
            consumer.qos.increment_eventually()
            call_at(eta, apply_eta_task, (req,), priority=6)
            # Return value kept as-is for backward compatibility.
            return task_message_handler
        if bucket:
            return limit_task(req, bucket, 1)

        task_reserved(req)
        if callbacks:
            for callback in callbacks:
                callback(req)
        handle(req)
    return task_message_handler