"""Celery Command Line Interface."""importosimportpathlibimportsysimporttracebacktry:fromimportlib.metadataimportentry_pointsexceptImportError:fromimportlib_metadataimportentry_pointsimportclickimportclick.exceptionsfromclick_didyoumeanimportDYMGroupfromclick_pluginsimportwith_pluginsfromceleryimportVERSION_BANNERfromcelery.app.utilsimportfind_appfromcelery.bin.amqpimportamqpfromcelery.bin.baseimportCeleryCommand,CeleryOption,CLIContextfromcelery.bin.beatimportbeatfromcelery.bin.callimportcallfromcelery.bin.controlimportcontrol,inspect,statusfromcelery.bin.eventsimporteventsfromcelery.bin.graphimportgraphfromcelery.bin.listimportlist_fromcelery.bin.logtoolimportlogtoolfromcelery.bin.migrateimportmigratefromcelery.bin.multiimportmultifromcelery.bin.purgeimportpurgefromcelery.bin.resultimportresultfromcelery.bin.shellimportshellfromcelery.bin.upgradeimportupgradefromcelery.bin.workerimportworkerUNABLE_TO_LOAD_APP_MODULE_NOT_FOUND=click.style("""Unable to load celery application.The module {0} was not found.""",fg='red')UNABLE_TO_LOAD_APP_ERROR_OCCURRED=click.style("""Unable to load celery application.While trying to load the module {0} the following error occurred:{1}""",fg='red')UNABLE_TO_LOAD_APP_APP_MISSING=click.style("""Unable to load celery application.{0}""")ifsys.version_info>=(3,10):_PLUGINS=entry_points(group='celery.commands')else:try:_PLUGINS=entry_points().get('celery.commands',[])exceptAttributeError:_PLUGINS=entry_points().select(group='celery.commands')@with_plugins(_PLUGINS)@click.group(cls=DYMGroup,invoke_without_command=True)@click.option('-A','--app',envvar='APP',cls=CeleryOption,# May take either: a str when invoked from command line (Click),# or a Celery object when invoked from inside Celery; hence the# need to prevent Click from "processing" the Celery object and# converting it into its str representation.type=click.UNPROCESSED,help_group="Global Options")@click.option('-b','--broker',envvar='BROKER_URL',cls=CeleryOption,help_group="Global Options")@click.option('--result-backend',envvar='RESULT_BACKEND',cls=CeleryOption,help_group="Global Options")@click.option('--loader',envvar='LOADER',cls=CeleryOption,help_group="Global Options")@click.option('--config',envvar='CONFIG_MODULE',cls=CeleryOption,help_group="Global Options")@click.option('--workdir',cls=CeleryOption,type=pathlib.Path,callback=lambda_,__,wd:os.chdir(wd)ifwdelseNone,is_eager=True,help_group="Global Options")@click.option('-C','--no-color',envvar='NO_COLOR',is_flag=True,cls=CeleryOption,help_group="Global Options")@click.option('-q','--quiet',is_flag=True,cls=CeleryOption,help_group="Global Options")@click.option('--version',cls=CeleryOption,is_flag=True,help_group="Global Options")@click.option('--skip-checks',envvar='SKIP_CHECKS',cls=CeleryOption,is_flag=True,help_group="Global Options",help="Skip Django core checks on startup. Setting the SKIP_CHECKS environment ""variable to any non-empty string will have the same effect.")@click.pass_contextdefcelery(ctx,app,broker,result_backend,loader,config,workdir,no_color,quiet,version,skip_checks):"""Celery command entrypoint."""ifversion:click.echo(VERSION_BANNER)ctx.exit()elifctx.invoked_subcommandisNone:click.echo(ctx.get_help())ctx.exit()ifloader:# Default app takes loader from this env (Issue #1066).os.environ['CELERY_LOADER']=loaderifbroker:os.environ['CELERY_BROKER_URL']=brokerifresult_backend:os.environ['CELERY_RESULT_BACKEND']=result_backendifconfig:os.environ['CELERY_CONFIG_MODULE']=configifskip_checks:os.environ['CELERY_SKIP_CHECKS']='true'ifisinstance(app,str):try:app=find_app(app)exceptModuleNotFoundErrorase:ife.name!=app:exc=traceback.format_exc()ctx.fail(UNABLE_TO_LOAD_APP_ERROR_OCCURRED.format(app,exc))ctx.fail(UNABLE_TO_LOAD_APP_MODULE_NOT_FOUND.format(e.name))exceptAttributeErrorase:attribute_name=e.args[0].capitalize()ctx.fail(UNABLE_TO_LOAD_APP_APP_MISSING.format(attribute_name))exceptException:exc=traceback.format_exc()ctx.fail(UNABLE_TO_LOAD_APP_ERROR_OCCURRED.format(app,exc))ctx.obj=CLIContext(app=app,no_color=no_color,workdir=workdir,quiet=quiet)# User optionsworker.params.extend(ctx.obj.app.user_options.get('worker',[]))beat.params.extend(ctx.obj.app.user_options.get('beat',[]))events.params.extend(ctx.obj.app.user_options.get('events',[]))forcommandincelery.commands.values():command.params.extend(ctx.obj.app.user_options.get('preload',[]))@celery.command(cls=CeleryCommand)@click.pass_contextdefreport(ctx,**kwargs):"""Shows information useful to include in bug-reports."""app=ctx.obj.appapp.loader.import_default_modules()ctx.obj.echo(app.bugreport())celery.add_command(purge)celery.add_command(call)celery.add_command(beat)celery.add_command(list_)celery.add_command(result)celery.add_command(migrate)celery.add_command(status)celery.add_command(worker)celery.add_command(events)celery.add_command(inspect)celery.add_command(control)celery.add_command(graph)celery.add_command(upgrade)celery.add_command(logtool)celery.add_command(amqp)celery.add_command(shell)celery.add_command(multi)# Monkey-patch click to display a custom error# when -A or --app are used as sub-command options instead of as options# of the global command.previous_show_implementation=click.exceptions.NoSuchOption.showWRONG_APP_OPTION_USAGE_MESSAGE="""You are using `{option_name}` as an option of the {info_name} sub-command:celery {info_name}{option_name} celeryapp <...>The support for this usage was removed in Celery 5.0. Instead you should use `{option_name}` as a global option:celery {option_name} celeryapp {info_name} <...>"""def_show(self,file=None):ifself.option_namein('-A','--app'):self.ctx.obj.error(WRONG_APP_OPTION_USAGE_MESSAGE.format(option_name=self.option_name,info_name=self.ctx.info_name),fg='red')previous_show_implementation(self,file=file)click.exceptions.NoSuchOption.show=_show
[文档]defmain()->int:"""Start celery umbrella command. This function is the main entrypoint for the CLI. :return: The exit code of the CLI. """returncelery(auto_envvar_prefix="CELERY")