"""Loader base class."""importimportlibimportosimportreimportsysfromdatetimeimportdatetime,timezonefromkombu.utilsimportjsonfromkombu.utils.objectsimportcached_propertyfromceleryimportsignalsfromcelery.exceptionsimportreraisefromcelery.utils.collectionsimportDictAttribute,force_mappingfromcelery.utils.functionalimportmaybe_listfromcelery.utils.importsimportNotAPackage,find_module,import_from_cwd,symbol_by_name__all__=('BaseLoader',)_RACE_PROTECTION=FalseCONFIG_INVALID_NAME="""\Error: Module '{module}' doesn't exist, or it's not a valid \Python module name."""CONFIG_WITH_SUFFIX=CONFIG_INVALID_NAME+"""\Did you mean '{suggest}'?"""unconfigured=object()
[文档]classBaseLoader:"""Base class for loaders. Loaders handles, * Reading celery client/worker configurations. * What happens when a task starts? See :meth:`on_task_init`. * What happens when the worker starts? See :meth:`on_worker_init`. * What happens when the worker shuts down? See :meth:`on_worker_shutdown`. * What modules are imported to find tasks? """builtin_modules=frozenset()configured=Falseoverride_backends={}worker_initialized=False_conf=unconfigureddef__init__(self,app,**kwargs):self.app=appself.task_modules=set()
[文档]defimport_default_modules(self):responses=signals.import_modules.send(sender=self.app)# Prior to this point loggers are not yet set up properly, need to# check responses manually and reraised exceptions if any, otherwise# they'll be silenced, making it incredibly difficult to debug.for_,responseinresponses:ifisinstance(response,Exception):raiseresponsereturn[self.import_task_module(m)forminself.default_modules]
def_smart_import(self,path,imp=None):imp=self.import_moduleifimpisNoneelseimpif':'inpath:# Path includes attribute so can just jump# here (e.g., ``os.path:abspath``).returnsymbol_by_name(path,imp=imp)# Not sure if path is just a module name or if it includes an# attribute name (e.g., ``os.path``, vs, ``os.path.abspath``).try:returnimp(path)exceptImportError:# Not a module name, so try module + attribute.returnsymbol_by_name(path,imp=imp)def_import_config_module(self,name):try:self.find_module(name)exceptNotAPackageasexc:ifname.endswith('.py'):reraise(NotAPackage,NotAPackage(CONFIG_WITH_SUFFIX.format(module=name,suggest=name[:-3])),sys.exc_info()[2])raiseNotAPackage(CONFIG_INVALID_NAME.format(module=name))fromexcelse:returnself.import_from_cwd(name)
[文档]defcmdline_config_parser(self,args,namespace='celery',re_type=re.compile(r'\((\w+)\)'),extra_types=None,override_types=None):extra_types=extra_typesifextra_typeselse{'json':json.loads}override_types=override_typesifoverride_typeselse{'tuple':'json','list':'json','dict':'json'}fromcelery.app.defaultsimportNAMESPACES,Optionnamespace=namespaceandnamespace.lower()typemap=dict(Option.typemap,**extra_types)defgetarg(arg):"""Parse single configuration from command-line."""# ## find key/value# ns.key=value|ns_key=value (case insensitive)key,value=arg.split('=',1)key=key.lower().replace('.','_')# ## find name-space.# .key=value|_key=value expands to default name-space.ifkey[0]=='_':ns,key=namespace,key[1:]else:# find name-space part of keyns,key=key.split('_',1)ns_key=(nsandns+'_'or'')+key# (type)value makes cast to custom type.cast=re_type.match(value)ifcast:type_=cast.groups()[0]type_=override_types.get(type_,type_)value=value[len(cast.group()):]value=typemap[type_](value)else:try:value=NAMESPACES[ns.lower()][key].to_python(value)exceptValueErrorasexc:# display key name in error message.raiseValueError(f'{ns_key!r}: {exc}')returnns_key,valuereturndict(getarg(arg)forarginargs)
defautodiscover_tasks(packages,related_name='tasks'):global_RACE_PROTECTIONif_RACE_PROTECTION:return()_RACE_PROTECTION=Truetry:return[find_related_module(pkg,related_name)forpkginpackages]finally:_RACE_PROTECTION=Falsedeffind_related_module(package,related_name):"""Find module in package."""# Django 1.7 allows for specifying a class name in INSTALLED_APPS.# (Issue #2248).try:# Return package itself when no related_name.module=importlib.import_module(package)ifnotrelated_nameandmodule:returnmoduleexceptModuleNotFoundError:# On import error, try to walk package up one level.package,_,_=package.rpartition('.')ifnotpackage:raisemodule_name=f'{package}.{related_name}'try:# Try to find related_name under package.returnimportlib.import_module(module_name)exceptModuleNotFoundErrorase:import_exc_name=getattr(e,'name',None)# If candidate does not exist, then return None.ifimport_exc_nameandmodule_name==import_exc_name:return# Otherwise, raise because error probably originated from a nested import.raisee