"""Program used to start a Celery worker instance."""importosimportsysimportclickfromclickimportParamTypefromclick.typesimportStringParamTypefromceleryimportconcurrencyfromcelery.bin.baseimport(COMMA_SEPARATED_LIST,LOG_LEVEL,CeleryDaemonCommand,CeleryOption,handle_preload_options)fromcelery.concurrency.baseimportBasePoolfromcelery.exceptionsimportSecurityErrorfromcelery.platformsimportEX_FAILURE,EX_OK,detached,maybe_drop_privilegesfromcelery.utils.logimportget_loggerfromcelery.utils.nodenamesimportdefault_nodename,host_format,node_formatlogger=get_logger(__name__)
[文档]defconvert(self,value,param,ctx):ifctx.obj.app.IS_WINDOWSandvalue:self.fail('-B option does not work on Windows. ''Please run celery beat as a separate service.')returnvalue
[文档]classWorkersPool(click.Choice):"""Workers pool option."""name="pool"def__init__(self):"""Initialize the workers pool option with the relevant choices."""super().__init__(concurrency.get_available_pool_names())
[文档]defconvert(self,value,param,ctx):# Pools like eventlet/gevent needs to patch libs as early# as possible.ifisinstance(value,type)andissubclass(value,BasePool):returnvaluevalue=super().convert(value,param,ctx)worker_pool=ctx.obj.app.conf.worker_poolifvalue=='prefork'andworker_pool:# If we got the default pool through the CLI# we need to check if the worker pool was configured.# If the worker pool was configured, we shouldn't use the default.value=concurrency.get_implementation(worker_pool)else:value=concurrency.get_implementation(value)ifnotvalue:value=concurrency.get_implementation(worker_pool)returnvalue
[文档]defconvert(self,value,param,ctx):value=value.split(',')iflen(value)>2:self.fail("Expected two comma separated integers or one integer."f"Got {len(value)} instead.")iflen(value)==1:try:value=(int(value[0]),0)exceptValueError:self.fail(f"Expected an integer. Got {value} instead.")try:returntuple(reversed(sorted(map(int,value))))exceptValueError:self.fail("Expected two comma separated integers."f"Got {value.join(',')} instead.")
[文档]defdetach(path,argv,logfile=None,pidfile=None,uid=None,gid=None,umask=None,workdir=None,fake=False,app=None,executable=None,hostname=None):"""Detach program by argv."""fake=1ifC_FAKEFORKelsefake# `detached()` will attempt to touch the logfile to confirm that error# messages won't be lost after detaching stdout/err, but this means we need# to pre-format it rather than relying on `setup_logging_subsystem()` like# we can elsewhere.logfile=node_format(logfile,hostname)withdetached(logfile,pidfile,uid,gid,umask,workdir,fake,after_forkers=False):try:ifexecutableisnotNone:path=executableos.execv(path,[path]+argv)returnEX_OKexceptException:# pylint: disable=broad-exceptifappisNone:fromceleryimportcurrent_appapp=current_appapp.log.setup_logging_subsystem('ERROR',logfile,hostname=hostname)logger.critical("Can't exec %r",' '.join([path]+argv),exc_info=True)returnEX_FAILURE
@click.command(cls=CeleryDaemonCommand,context_settings={'allow_extra_args':True})@click.option('-n','--hostname',default=host_format(default_nodename(None)),cls=CeleryOption,type=HOSTNAME,help_group="Worker Options",help="Set custom hostname (e.g., 'w1@%%h'). ""Expands: %%h (hostname), %%n (name) and %%d, (domain).")@click.option('-D','--detach',cls=CeleryOption,is_flag=True,default=False,help_group="Worker Options",help="Start worker as a background process.")@click.option('-S','--statedb',cls=CeleryOption,type=click.Path(),callback=lambdactx,_,value:valueorctx.obj.app.conf.worker_state_db,help_group="Worker Options",help="Path to the state database. The extension '.db' may be ""appended to the filename.")@click.option('-l','--loglevel',default='WARNING',cls=CeleryOption,type=LOG_LEVEL,help_group="Worker Options",help="Logging level.")@click.option('-O','--optimization',default='default',cls=CeleryOption,type=click.Choice(('default','fair')),help_group="Worker Options",help="Apply optimization profile.")@click.option('--prefetch-multiplier',type=int,metavar="<prefetch multiplier>",callback=lambdactx,_,value:valueorctx.obj.app.conf.worker_prefetch_multiplier,cls=CeleryOption,help_group="Worker Options",help="Set custom prefetch multiplier value ""for this worker instance.")@click.option('-c','--concurrency',type=int,metavar="<concurrency>",callback=lambdactx,_,value:valueorctx.obj.app.conf.worker_concurrency,cls=CeleryOption,help_group="Pool Options",help="Number of child processes processing the queue. ""The default is the number of CPUs available"" on your system.")@click.option('-P','--pool',default='prefork',type=WORKERS_POOL,cls=CeleryOption,help_group="Pool Options",help="Pool implementation.")@click.option('-E','--task-events','--events',is_flag=True,default=None,cls=CeleryOption,help_group="Pool Options",help="Send task-related events that can be captured by monitors"" like celery events, celerymon, and others.")@click.option('--time-limit',type=float,cls=CeleryOption,help_group="Pool Options",help="Enables a hard time limit ""(in seconds int/float) for tasks.")@click.option('--soft-time-limit',type=float,cls=CeleryOption,help_group="Pool Options",help="Enables a soft time limit ""(in seconds int/float) for tasks.")@click.option('--max-tasks-per-child',type=int,cls=CeleryOption,help_group="Pool Options",help="Maximum number of tasks a pool worker can execute before ""it's terminated and replaced by a new worker.")@click.option('--max-memory-per-child',type=int,cls=CeleryOption,help_group="Pool Options",help="Maximum amount of resident memory, in KiB, that may be ""consumed by a child process before it will be replaced ""by a new one. If a single task causes a child process ""to exceed this limit, the task will be completed and ""the child process will be replaced afterwards.\n""Default: no limit.")@click.option('--purge','--discard',is_flag=True,cls=CeleryOption,help_group="Queue Options")@click.option('--queues','-Q',type=COMMA_SEPARATED_LIST,cls=CeleryOption,help_group="Queue Options")@click.option('--exclude-queues','-X',type=COMMA_SEPARATED_LIST,cls=CeleryOption,help_group="Queue Options")@click.option('--include','-I',type=COMMA_SEPARATED_LIST,cls=CeleryOption,help_group="Queue Options")@click.option('--without-gossip',is_flag=True,cls=CeleryOption,help_group="Features")@click.option('--without-mingle',is_flag=True,cls=CeleryOption,help_group="Features")@click.option('--without-heartbeat',is_flag=True,cls=CeleryOption,help_group="Features",)@click.option('--heartbeat-interval',type=int,cls=CeleryOption,help_group="Features",)@click.option('--autoscale',type=AUTOSCALE,cls=CeleryOption,help_group="Features",)@click.option('-B','--beat',type=CELERY_BEAT,cls=CeleryOption,is_flag=True,help_group="Embedded Beat Options")@click.option('-s','--schedule-filename','--schedule',callback=lambdactx,_,value:valueorctx.obj.app.conf.beat_schedule_filename,cls=CeleryOption,help_group="Embedded Beat Options")@click.option('--scheduler',cls=CeleryOption,help_group="Embedded Beat Options")@click.pass_context@handle_preload_optionsdefworker(ctx,hostname=None,pool_cls=None,app=None,uid=None,gid=None,loglevel=None,logfile=None,pidfile=None,statedb=None,**kwargs):"""Start worker instance. \b Examples -------- \b $ celery --app=proj worker -l INFO $ celery -A proj worker -l INFO -Q hipri,lopri $ celery -A proj worker --concurrency=4 $ celery -A proj worker --concurrency=1000 -P eventlet $ celery worker --autoscale=10,0 """try:app=ctx.obj.appifctx.args:try:app.config_from_cmdline(ctx.args,namespace='worker')except(KeyError,ValueError)ase:# TODO: Improve the error messagesraiseclick.UsageError("Unable to parse extra configuration from command line.\n"f"Reason: {e}",ctx=ctx)ifkwargs.get('detach',False):argv=['-m','celery']+sys.argv[1:]if'--detach'inargv:argv.remove('--detach')if'-D'inargv:argv.remove('-D')if"--uid"inargv:argv.remove('--uid')if"--gid"inargv:argv.remove('--gid')returndetach(sys.executable,argv,logfile=logfile,pidfile=pidfile,uid=uid,gid=gid,umask=kwargs.get('umask',None),workdir=kwargs.get('workdir',None),app=app,executable=kwargs.get('executable',None),hostname=hostname)maybe_drop_privileges(uid=uid,gid=gid)worker=app.Worker(hostname=hostname,pool_cls=pool_cls,loglevel=loglevel,logfile=logfile,# node format handled by celery.app.log.setuppidfile=node_format(pidfile,hostname),statedb=node_format(statedb,hostname),no_color=ctx.obj.no_color,quiet=ctx.obj.quiet,**kwargs)worker.start()ctx.exit(worker.exitcode)exceptSecurityErrorase:ctx.obj.error(e.args[0])ctx.exit(1)