"""App utilities: Compat settings, bug-report tool, pickling apps."""importosimportplatformas_platformimportrefromcollectionsimportnamedtuplefromcollections.abcimportMappingfromcopyimportdeepcopyfromtypesimportModuleTypefromkombu.utils.urlimportmaybe_sanitize_urlfromcelery.exceptionsimportImproperlyConfiguredfromcelery.platformsimportpyimplementationfromcelery.utils.collectionsimportConfigurationViewfromcelery.utils.importsimportimport_from_cwd,qualname,symbol_by_namefromcelery.utils.textimportprettyfrom.defaultsimport_OLD_DEFAULTS,_OLD_SETTING_KEYS,_TO_NEW_KEY,_TO_OLD_KEY,DEFAULTS,SETTING_KEYS,find__all__=('Settings','appstr','bugreport','filter_hidden_settings','find_app',)#: Format used to generate bug-report information.BUGREPORT_INFO="""software -> celery:{celery_v} kombu:{kombu_v} py:{py_v} billiard:{billiard_v}{driver_v}platform -> system:{system} arch:{arch} kernel version:{kernel_version} imp:{py_i}loader -> {loader}settings -> transport:{transport} results:{results}{human_settings}"""HIDDEN_SETTINGS=re.compile('API|TOKEN|KEY|SECRET|PASS|PROFANITIES_LIST|SIGNATURE|DATABASE',re.IGNORECASE,)E_MIX_OLD_INTO_NEW="""Cannot mix new and old setting keys, please rename thefollowing settings to the new format:{renames}"""E_MIX_NEW_INTO_OLD="""Cannot mix new setting names with old setting names, pleaserename the following settings to use the old format:{renames}Or change all of the settings to use the new format :)"""FMT_REPLACE_SETTING='{replace:<36} -> {with_}'
def appstr(app):
    """Return the short label that identifies an app instance.

    The label is ``"<main> at <id in hex>"``; ``__main__`` stands in for the
    name whenever ``app.main`` is falsy.
    """
    label = app.main or "__main__"
    return f'{label} at {id(app):#x}'
class Settings(ConfigurationView):
    """Celery settings object.

    .. seealso::

        :ref:`configuration` for a full list of configuration keys.

    """

    def __init__(self, *args, deprecated_settings=None, **kwargs):
        super().__init__(*args, **kwargs)
        # Setting names that ``maybe_warn_deprecated_settings`` warns about.
        self.deprecated_settings = deprecated_settings

    @property
    def broker_read_url(self):
        """URL from ``CELERY_BROKER_READ_URL`` in the environment, else the
        ``broker_read_url`` setting, else :attr:`broker_url`."""
        return (
            os.environ.get('CELERY_BROKER_READ_URL') or
            self.get('broker_read_url') or
            self.broker_url
        )

    @property
    def broker_write_url(self):
        """URL from ``CELERY_BROKER_WRITE_URL`` in the environment, else the
        ``broker_write_url`` setting, else :attr:`broker_url`."""
        return (
            os.environ.get('CELERY_BROKER_WRITE_URL') or
            self.get('broker_write_url') or
            self.broker_url
        )

    @property
    def broker_url(self):
        """URL from ``CELERY_BROKER_URL`` in the environment, else looked up
        through ``self.first('broker_url', 'broker_host')``."""
        return (
            os.environ.get('CELERY_BROKER_URL') or
            self.first('broker_url', 'broker_host')
        )

    @property
    def result_backend(self):
        """Value of ``CELERY_RESULT_BACKEND`` in the environment, else looked
        up through ``self.first('result_backend', 'CELERY_RESULT_BACKEND')``.
        """
        return (
            os.environ.get('CELERY_RESULT_BACKEND') or
            self.first('result_backend', 'CELERY_RESULT_BACKEND')
        )

    @property
    def task_default_exchange(self):
        """Lookup of ``task_default_exchange`` with ``task_default_queue`` as
        the second candidate key."""
        return self.first(
            'task_default_exchange',
            'task_default_queue',
        )

    @property
    def task_default_routing_key(self):
        """Lookup of ``task_default_routing_key`` with ``task_default_queue``
        as the second candidate key."""
        return self.first(
            'task_default_routing_key',
            'task_default_queue',
        )

    @property
    def timezone(self):
        """Lookup of ``timezone`` with ``TIME_ZONE`` as the second key."""
        # this way we also support django's time zone.
        return self.first('timezone', 'TIME_ZONE')

    def without_defaults(self):
        """Return the current configuration, but without defaults."""
        # the last stash is the default settings, so just skip that
        return Settings({}, self.maps[:-1])

    def find_option(self, name, namespace=''):
        """Search for option by name.

        Example:
            >>> from proj.celery import app
            >>> app.conf.find_option('disable_rate_limits')
            ('worker', 'disable_rate_limits',
             <Option: type->bool default->False>)

        Arguments:
            name (str): Name of option, cannot be partial.
            namespace (str): Preferred name-space (empty string by default).

        Returns:
            Tuple: of ``(namespace, key, type)``.
        """
        return find(name, namespace)

    def find_value_for_key(self, name, namespace='celery'):
        """Shortcut to ``get_by_parts(*find_option(name)[:-1])``."""
        return self.get_by_parts(*self.find_option(name, namespace)[:-1])

    def get_by_parts(self, *parts):
        """Return the current value for setting specified as a path.

        Falsy parts are skipped; the remaining parts are joined with ``_``
        to form the key that is looked up.

        Example:
            >>> from proj.celery import app
            >>> app.conf.get_by_parts('worker', 'disable_rate_limits')
            False
        """
        return self['_'.join(part for part in parts if part)]

    def finalize(self):
        """Touch the configuration once and return ``self``.

        The lookup of a key that is expected to be missing is done only for
        its side effect; the resulting ``KeyError`` is ignored.
        """
        # See PendingConfiguration in celery/app/base.py
        # first access will read actual configuration.
        try:
            self['__bogus__']
        except KeyError:
            pass
        return self

    def humanize(self, with_defaults=False, censored=True):
        """Return a human readable text showing configuration changes.

        One ``key: value`` line is produced per item of
        ``self.table(with_defaults, censored)``.
        """
        return '\n'.join(
            f'{key}: {pretty(value, width=50)}'
            for key, value in self.table(with_defaults, censored).items())

    def maybe_warn_deprecated_settings(self):
        """Emit a deprecation warning for every deprecated setting.

        Returns:
            bool: :const:`True` when ``self.deprecated_settings`` is
            non-empty (warnings were issued), :const:`False` otherwise.
        """
        # TODO: Remove this method in Celery 6.0
        if self.deprecated_settings:
            from celery.app.defaults import _TO_NEW_KEY
            from celery.utils import deprecated
            for setting in self.deprecated_settings:
                deprecated.warn(description=f'The {setting!r} setting',
                                removal='6.0.0',
                                alternative=f'Use the {_TO_NEW_KEY[setting]} instead')

            return True

        return False
def _new_key_to_old(key, convert=_TO_OLD_KEY.get):
    """Map a new-style key to its old name; unknown keys pass through."""
    return convert(key, key)


def _old_key_to_new(key, convert=_TO_NEW_KEY.get):
    """Map an old-style key to its new name; unknown keys pass through."""
    return convert(key, key)


# Bundle describing one naming scheme: its defaults, the mapping that
# converts foreign keys into it, the matching key function, and the error
# text to use when the other scheme is mixed in.
_settings_info_t = namedtuple('settings_info_t', (
    'defaults', 'convert', 'key_t', 'mix_error',
))

_settings_info = _settings_info_t(
    defaults=DEFAULTS,
    convert=_TO_NEW_KEY,
    key_t=_old_key_to_new,
    mix_error=E_MIX_OLD_INTO_NEW,
)

_old_settings_info = _settings_info_t(
    defaults=_OLD_DEFAULTS,
    convert=_TO_OLD_KEY,
    key_t=_new_key_to_old,
    mix_error=E_MIX_NEW_INTO_OLD,
)


def detect_settings(conf, preconf=None, ignore_keys=None, prefix=None,
                    all_keys=None, old_keys=None):
    """Work out which naming scheme a configuration uses and wrap it.

    Arguments:
        conf: Mapping of user settings, or ``None``.  When ``None`` the
            keys of ``preconf`` are inspected instead and an empty dict is
            used as the configuration.
        preconf: Mapping of values set beforehand.  Its keys are converted
            to the detected scheme and layered over a deep copy of that
            scheme's defaults.
        ignore_keys: Set of keys excluded from the detection.
        prefix: When truthy the new scheme is always used and no mixing
            error is raised.
        all_keys: New-style key collection (``SETTING_KEYS`` if falsy).
        old_keys: Old-style key collection (``_OLD_SETTING_KEYS`` if falsy).

    Returns:
        Settings: built from the converted ``preconf``, ``[conf, defaults]``
        and the two key-translation functions; the old-style keys that were
        found are passed as ``deprecated_settings``.

    Raises:
        ImproperlyConfigured: If keys of the minority scheme are present
            without their counterpart in the chosen scheme.
    """
    preconf = preconf or {}
    ignore_keys = ignore_keys or set()
    all_keys = all_keys or SETTING_KEYS
    old_keys = old_keys or _OLD_SETTING_KEYS

    if conf is None:
        source, conf = preconf, {}
    else:
        source = conf

    have = set(source.keys()) - ignore_keys
    is_in_new = have.intersection(all_keys)
    is_in_old = have.intersection(old_keys)

    if prefix:
        # always use new format if prefix is used.
        info, left = _settings_info, set()
    elif is_in_old and (not is_in_new or len(is_in_old) > len(is_in_new)):
        # only old names, or a strict majority of the names are old.
        info, left = _old_settings_info, is_in_new
    else:
        # no settings at all, only new names, a majority of new names,
        # or a tie: use the new format.
        info, left = _settings_info, is_in_old

    # only raise error for keys that the user didn't provide two keys
    # for (e.g., both ``result_expires`` and ``CELERY_TASK_RESULT_EXPIRES``).
    really_left = {key for key in left if info.convert[key] not in have}
    if really_left:
        # user is mixing old/new, or new/old settings, give renaming
        # suggestions.
        suggestions = [
            FMT_REPLACE_SETTING.format(replace=key, with_=info.convert[key])
            for key in sorted(really_left)
        ]
        raise ImproperlyConfigured(
            info.mix_error.format(renames='\n'.join(suggestions)))

    preconf = {info.convert.get(k, k): v for k, v in preconf.items()}
    defaults = dict(deepcopy(info.defaults), **preconf)
    return Settings(
        preconf, [conf, defaults],
        (_old_key_to_new, _new_key_to_old),
        deprecated_settings=is_in_old,
        prefix=prefix,
    )


class AppPickler:
    """Old application pickler/unpickler (< 3.1)."""

    def __call__(self, cls, *args):
        """Rebuild an app: build kwargs, construct ``cls``, apply changes."""
        options = self.build_kwargs(*args)
        instance = self.construct(cls, **options)
        self.prepare(instance, **options)
        return instance

    def prepare(self, app, **kwargs):
        """Apply the pickled ``changes`` to the app's configuration."""
        app.conf.update(kwargs['changes'])

    def build_kwargs(self, *args):
        """Turn the positional pickle arguments into constructor kwargs."""
        return self.build_standard_kwargs(*args)

    def build_standard_kwargs(self, main, changes, loader, backend, amqp,
                              events, log, control, accept_magic_kwargs,
                              config_source=None):
        """Return the keyword arguments used to construct the app.

        ``accept_magic_kwargs`` is accepted but not included in the result;
        ``set_as_current`` is always ``False``.
        """
        return dict(
            main=main,
            loader=loader,
            backend=backend,
            amqp=amqp,
            changes=changes,
            events=events,
            log=log,
            control=control,
            set_as_current=False,
            config_source=config_source,
        )

    def construct(self, cls, **kwargs):
        """Instantiate ``cls`` with the given keyword arguments."""
        return cls(**kwargs)


def _unpickle_app(cls, pickler, *args):
    """Rebuild app for versions 2.5+."""
    rebuild = pickler()
    return rebuild(cls, *args)


def _unpickle_app_v2(cls, kwargs):
    """Rebuild app for versions 3.1+.

    Note that ``kwargs`` is modified in place: ``set_as_current`` is forced
    to ``False`` before ``cls`` is called.
    """
    kwargs['set_as_current'] = False
    return cls(**kwargs)
def bugreport(app):
    """Return a string containing information useful in bug-reports.

    The text is ``BUGREPORT_INFO`` filled in with library versions, platform
    details, the loader class, the result backend and the humanized
    settings of ``app``.  If anything goes wrong while inspecting the
    broker connection, both the transport and the driver version are
    reported as empty strings.
    """
    import billiard
    import kombu

    import celery

    try:
        conn = app.connection()
        driver_v = (
            f'{conn.transport.driver_name}:{conn.transport.driver_version()}'
        )
        transport = conn.transport_cls
    except Exception:  # pylint: disable=broad-except
        transport = driver_v = ''

    fields = {
        'system': _platform.system(),
        'arch': ', '.join(filter(None, _platform.architecture())),
        'kernel_version': _platform.release(),
        'py_i': pyimplementation(),
        'celery_v': celery.VERSION_BANNER,
        'kombu_v': kombu.__version__,
        'billiard_v': billiard.__version__,
        'py_v': _platform.python_version(),
        'driver_v': driver_v,
        'transport': transport,
        'results': maybe_sanitize_url(app.conf.result_backend or 'disabled'),
        'human_settings': app.conf.humanize(),
        'loader': qualname(app.loader.__class__),
    }
    return BUGREPORT_INFO.format(**fields)
def find_app(app, symbol_by_name=symbol_by_name, imp=import_from_cwd):
    """Find app by name.

    Arguments:
        app (str): Name to resolve.  A name containing ``:`` is returned
            exactly as resolved, even when it resolves to a module.
        symbol_by_name (Callable): Resolver called as
            ``symbol_by_name(app, imp=imp)``.
        imp (Callable): Importer, also used directly when the resolver
            raises :exc:`AttributeError`.

    Returns:
        The resolved object.  When the name resolves to a module (and has
        no ``:``), the result is instead, in order of preference: the
        module's ``app`` attribute, its ``celery`` attribute, the result of
        resolving ``"<app>.celery"`` (only if the module has a non-empty
        ``__path__``), or the first ``Celery`` instance found among the
        module's attributes.

    Raises:
        AttributeError: If the module offers none of the above.
    """
    from .base import Celery

    try:
        sym = symbol_by_name(app, imp=imp)
    except AttributeError:
        # last part was not an attribute, but a module
        sym = imp(app)
    if isinstance(sym, ModuleType) and ':' not in app:
        try:
            found = sym.app
            # An ``app`` attribute that is itself a module is treated the
            # same as a missing attribute.
            if isinstance(found, ModuleType):
                raise AttributeError()
        except AttributeError:
            try:
                found = sym.celery
                if isinstance(found, ModuleType):
                    raise AttributeError(
                        "attribute 'celery' is the celery module not the instance of celery")
            except AttributeError:
                # Modules with a non-empty ``__path__`` (i.e. packages) get
                # one more attempt through their ``celery`` submodule.
                if getattr(sym, '__path__', None):
                    try:
                        return find_app(
                            f'{app}.celery',
                            symbol_by_name=symbol_by_name, imp=imp,
                        )
                    except ImportError:
                        pass
                # Last resort: any Celery instance defined in the module.
                for suspect in vars(sym).values():
                    if isinstance(suspect, Celery):
                        return suspect
                # Nothing usable: re-raise the AttributeError that is
                # currently being handled (the one about ``celery``).
                raise
            else:
                return found
        else:
            return found
    return sym