"""Task Routing.

Contains utilities for working with task routers, (:setting:`task_routes`).
"""
import fnmatch
import re
from collections import OrderedDict
from collections.abc import Mapping

from kombu import Queue

from celery.exceptions import QueueNotFound
from celery.utils.collections import lpmerge
from celery.utils.functional import maybe_evaluate, mlazy
from celery.utils.imports import symbol_by_name

# ``Pattern`` is the type of a compiled regular expression, taken from
# whichever attribute name the running interpreter's ``re`` module provides.
if hasattr(re, '_pattern_type'):
    Pattern = re._pattern_type
else:  # pragma: no cover
    # Python 3.7 support: the type is exposed as ``re.Pattern`` there.
    Pattern = re.Pattern

__all__ = ('MapRoute', 'Router', 'expand_router_string', 'prepare')
class MapRoute:
    """Creates a router out of a :class:`dict`.

    The source may be a mapping or an iterable of ``(key, route)`` pairs.
    Keys are sorted into two tables:

    * ``self.patterns`` -- keys that are already compiled regular
      expressions, plus string keys containing ``'*'`` (translated with
      :func:`fnmatch.translate` and compiled).  Insertion order is kept.
    * ``self.map`` -- every other key, used for exact-name lookup.
    """

    def __init__(self, map):
        pairs = map.items() if isinstance(map, Mapping) else map
        exact = self.map = {}
        globbed = self.patterns = OrderedDict()
        for key, destination in pairs:
            if isinstance(key, Pattern):
                globbed[key] = destination
            elif '*' in key:
                globbed[re.compile(fnmatch.translate(key))] = destination
            else:
                exact[key] = destination

    @staticmethod
    def _as_options(destination):
        """Return ``destination`` copied into a new dict.

        When :class:`dict` rejects the value with :exc:`ValueError` (as it
        does for a plain queue-name string) the value is wrapped as
        ``{'queue': destination}`` instead.
        """
        try:
            return dict(destination)
        except ValueError:
            return {'queue': destination}

    def __call__(self, name, *args, **kwargs):
        """Return the route options for task ``name``.

        An exact-name entry wins; otherwise the first pattern whose
        ``match(name)`` succeeds is used.  Returns ``None`` when nothing
        matches.  Extra positional/keyword arguments are accepted and
        ignored.
        """
        try:
            return self._as_options(self.map[name])
        except KeyError:
            # No exact entry for this name: fall back to the patterns.
            pass
        for regex in self.patterns:
            if regex.match(name):
                return self._as_options(self.patterns[regex])
        return None
class Router:
    """Route tasks based on the :setting:`task_routes` setting."""

    def __init__(self, routes=None, queues=None,
                 create_missing=False, app=None):
        """Store the routing configuration.

        Arguments:
            routes: Sequence of router objects consulted in order
                (defaults to an empty list).
            queues: Mapping of queue name to queue object used to resolve
                queue names (defaults to an empty dict).
            create_missing: Stored on the instance; not read by any method
                of this class.
            app: Application object; ``route`` reads
                ``app.conf.task_default_queue`` from it.
        """
        self.app = app
        self.queues = {} if queues is None else queues
        self.routes = [] if routes is None else routes
        self.create_missing = create_missing

    def route(self, options, name, args=(), kwargs=None, task_type=None):
        """Return the final routing options for the task called ``name``.

        ``options`` is expanded first (see :meth:`expand_destination`).
        If any router yields a truthy route it is expanded and combined
        with ``options`` through ``lpmerge``.  Otherwise, when ``options``
        names no queue, the app's ``task_default_queue`` is expanded and
        combined with ``options`` the same way.
        """
        kwargs = {} if not kwargs else kwargs
        options = self.expand_destination(options)  # expands 'queue'
        if self.routes:
            route = self.lookup_route(name, args, kwargs, options, task_type)
            if route:  # expands 'queue' in route.
                return lpmerge(self.expand_destination(route), options)
        if 'queue' not in options:
            options = lpmerge(
                self.expand_destination(self.app.conf.task_default_queue),
                options,
            )
        return options

    def expand_destination(self, route):
        """Replace a queue name in ``route`` with its queue object.

        ``route`` is either a queue name (:class:`str`) or a dict of
        options.  A dict is modified in place: its ``'queue'`` entry is
        popped and, when truthy, stored back either unchanged (already a
        ``Queue``) or as ``self.queues[name]``.

        Raises:
            QueueNotFound: if the queue name is not in ``self.queues``.
        """
        # Route can be a queue name: convenient for direct exchanges.
        if isinstance(route, str):
            queue, route = route, {}
        else:
            # can use defaults from configured queue, but override specific
            # things (like the routing_key): great for topic exchanges.
            queue = route.pop('queue', None)

        if queue:
            if isinstance(queue, Queue):
                route['queue'] = queue
            else:
                try:
                    route['queue'] = self.queues[queue]
                except KeyError as exc:
                    raise QueueNotFound(
                        f'Queue {queue!r} missing from task_queues') from exc
        return route

    def lookup_route(self, name,
                     args=None, kwargs=None, options=None, task_type=None):
        """Return the first non-``None`` answer from ``self.routes``.

        Each router is asked in order through :meth:`query_router`.
        Returns ``None`` when no router supplies a route.
        """
        # NOTE(review): ``route`` calls this method but it was absent from
        # the class; confirm this matches the intended upstream behavior.
        query = self.query_router
        for router in self.routes:
            route = query(router, name, args, kwargs, options, task_type)
            if route is not None:
                return route
        return None

    def query_router(self, router, task, args, kwargs, options, task_type):
        """Ask a single ``router`` for the route of ``task``.

        The router is first passed through ``maybe_evaluate``
        (presumably resolving lazily-loaded routers -- confirm against
        ``celery.utils.functional``).
        """
        router = maybe_evaluate(router)
        if hasattr(router, 'route_for_task'):
            # pre 4.0 router class
            return router.route_for_task(task, args, kwargs)
        return router(task, args, kwargs, options, task=task_type)
def expand_router_string(router):
    """Resolve ``router`` with ``symbol_by_name`` and return the result.

    If the resolved object has a ``route_for_task`` attribute it is
    treated as a pre 4.0 router class and an instance of it is returned;
    anything else is returned as resolved.
    """
    resolved = symbol_by_name(router)
    # need to instantiate pre 4.0 router classes
    return resolved() if hasattr(resolved, 'route_for_task') else resolved
def prepare(routes):
    """Expand the :setting:`task_routes` setting.

    Returns an empty tuple when ``routes`` is ``None``.  Otherwise a
    single value is treated as a one-element sequence and a list is
    returned in which each entry has been converted:

    * mapping, list or tuple -> :class:`MapRoute`
    * string -> ``mlazy(expand_router_string, entry)``
    * anything else -> kept unchanged
    """
    if routes is None:
        return ()

    def _expand(entry):
        if isinstance(entry, (Mapping, list, tuple)):
            return MapRoute(entry)
        if isinstance(entry, str):
            return mlazy(expand_router_string, entry)
        return entry

    # A lone router (not wrapped in a list/tuple) is handled as one entry.
    entries = routes if isinstance(routes, (list, tuple)) else (routes,)
    expanded = []
    for entry in entries:
        expanded.append(_expand(entry))
    return expanded