"""Click customizations for Celery."""importjsonimportnumbersfromcollectionsimportOrderedDictfromfunctoolsimportupdate_wrapperfrompprintimportpformatfromtypingimportAnyimportclickfromclickimportContext,ParamTypefromkombu.utils.objectsimportcached_propertyfromcelery._stateimportget_current_appfromcelery.signalsimportuser_preload_optionsfromcelery.utilsimporttextfromcelery.utils.logimportmlevelfromcelery.utils.timeimportmaybe_iso8601try:frompygmentsimporthighlightfrompygments.formattersimportTerminal256Formatterfrompygments.lexersimportPythonLexerexceptImportError:defhighlight(s,*args,**kwargs):"""Place holder function in case pygments is missing."""returnsLEXER=NoneFORMATTER=Noneelse:LEXER=PythonLexer()FORMATTER=Terminal256Formatter()
[文档]classCLIContext:"""Context Object for the CLI."""def__init__(self,app,no_color,workdir,quiet=False):"""Initialize the CLI context."""self.app=apporget_current_app()self.no_color=no_colorself.quiet=quietself.workdir=workdir@cached_propertydefOK(self):returnself.style("OK",fg="green",bold=True)@cached_propertydefERROR(self):returnself.style("ERROR",fg="red",bold=True)
[文档]defhandle_preload_options(f):"""Extract preload options and return a wrapped callable."""defcaller(ctx,*args,**kwargs):app=ctx.obj.apppreload_options=[o.nameforoinapp.user_options.get('preload',[])]ifpreload_options:user_options={preload_option:kwargs[preload_option]forpreload_optioninpreload_options}user_preload_options.send(sender=f,app=app,options=user_options)returnf(ctx,*args,**kwargs)returnupdate_wrapper(caller,f)
[文档]classCeleryOption(click.Option):"""Customized option for Celery."""
def__init__(self,*args,**kwargs):"""Initialize a Celery option."""self.help_group=kwargs.pop('help_group',None)self.default_value_from_context=kwargs.pop('default_value_from_context',None)super().__init__(*args,**kwargs)
[文档]classCeleryCommand(click.Command):"""Customized command for Celery."""
[文档]defformat_options(self,ctx,formatter):"""Write all the options into the formatter if they exist."""opts=OrderedDict()forparaminself.get_params(ctx):rv=param.get_help_record(ctx)ifrvisnotNone:ifhasattr(param,'help_group')andparam.help_group:opts.setdefault(str(param.help_group),[]).append(rv)else:opts.setdefault('Options',[]).append(rv)forname,opts_groupinopts.items():withformatter.section(name):formatter.write_dl(opts_group)
[文档]defdaemon_setting(self,ctx:Context,opt:CeleryOption,value:Any)->Any:""" Try to fetch daemonization option from applications settings. Use the daemon command name as prefix (eg. `worker` -> `worker_pidfile`) """returnvalueorgetattr(ctx.obj.app.conf,f"{ctx.command.name}_{self.name}",None)
[文档]classCeleryDaemonCommand(CeleryCommand):"""Daemon commands."""def__init__(self,*args,**kwargs):"""Initialize a Celery command with common daemon options."""super().__init__(*args,**kwargs)self.params.extend((DaemonOption("--logfile","-f",help="Log destination; defaults to stderr"),DaemonOption("--pidfile",help="PID file path; defaults to no PID file"),DaemonOption("--uid",help="Drops privileges to this user ID"),DaemonOption("--gid",help="Drops privileges to this group ID"),DaemonOption("--umask",help="Create files and directories with this umask"),DaemonOption("--executable",help="Override path to the Python executable"),))
[文档]classCommaSeparatedList(ParamType):"""Comma separated list argument."""name="comma separated list"
[文档]defconvert(self,value,param,ctx):ifisinstance(value,list):returnvaluetry:v=json.loads(value)exceptValueErrorase:self.fail(str(e))ifnotisinstance(v,list):self.fail(f"{value} was not an array")returnv
[文档]defconvert(self,value,param,ctx):ifisinstance(value,dict):returnvaluetry:v=json.loads(value)exceptValueErrorase:self.fail(str(e))ifnotisinstance(v,dict):self.fail(f"{value} was not an object")returnv
[文档]classISO8601DateTime(ParamType):"""ISO 8601 Date Time argument."""name="iso-86091"
[文档]classLogLevel(click.Choice):"""Log level option."""def__init__(self):"""Initialize the log level option with the relevant choices."""super().__init__(('DEBUG','INFO','WARNING','ERROR','CRITICAL','FATAL'))