"""Worker remote control command implementations."""importioimporttempfilefromcollectionsimportUserDict,defaultdict,namedtuplefrombilliard.commonimportTERM_SIGNAMEfromkombu.utils.encodingimportsafe_reprfromcelery.exceptionsimportWorkerShutdownfromcelery.platformsimportsignalsas_signalsfromcelery.utils.functionalimportmaybe_listfromcelery.utils.logimportget_loggerfromcelery.utils.serializationimportjsonify,strtoboolfromcelery.utils.timeimportratefrom.importstateasworker_statefrom.requestimportRequest__all__=('Panel',)DEFAULT_TASK_INFO_ITEMS=('exchange','routing_key','rate_limit')logger=get_logger(__name__)controller_info_t=namedtuple('controller_info_t',['alias','type','visible','default_timeout','help','signature','args','variadic',])defok(value):return{'ok':value}defnok(value):return{'error':value}


class Panel(UserDict):
    """Global registry of remote control commands."""

    data = {}      # global dict.
    meta = {}      # -"-
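
# Registration sketch (illustrative, not part of the upstream module): the
# ``control_command``/``inspect_command`` helpers below are assumed to populate
# ``Panel.data`` with the handler keyed by command name, and ``Panel.meta``
# with a ``controller_info_t`` record (alias, type, visibility, default
# timeout, help text, signature, args, variadic), so a dispatcher can look up
# and call, e.g., ``Panel.data['ping'](state)``.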


def control_command(**kwargs):
    return Panel.register(type='control', **kwargs)


def inspect_command(**kwargs):
    return Panel.register(type='inspect', **kwargs)


# -- App

@inspect_command()
def report(state):
    """Information about Celery installation for bug reports."""
    return ok(state.app.bugreport())


@inspect_command(
    alias='dump_conf',  # XXX < backwards compatible
    signature='[include_defaults=False]',
    args=[('with_defaults', strtobool)],
)
def conf(state, with_defaults=False, **kwargs):
    """List configuration."""
    return jsonify(state.app.conf.table(with_defaults=with_defaults),
                   keyfilter=_wanted_config_key,
                   unknown_type_filter=safe_repr)


def _wanted_config_key(key):
    return isinstance(key, str) and not key.startswith('__')


# -- Task

@inspect_command(
    variadic='ids',
    signature='[id1 [id2 [... [idN]]]]',
)
def query_task(state, ids, **kwargs):
    """Query for task information by id."""
    return {
        req.id: (_state_of_task(req), req.info())
        for req in _find_requests_by_id(maybe_list(ids))
    }


def _find_requests_by_id(ids,
                         get_request=worker_state.requests.__getitem__):
    for task_id in ids:
        try:
            yield get_request(task_id)
        except KeyError:
            pass


def _state_of_task(request,
                   is_active=worker_state.active_requests.__contains__,
                   is_reserved=worker_state.reserved_requests.__contains__):
    if is_active(request):
        return 'active'
    elif is_reserved(request):
        return 'reserved'
    return 'ready'


@control_command(
    variadic='task_id',
    signature='[id1 [id2 [... [idN]]]]',
)
def revoke(state, task_id, terminate=False, signal=None, **kwargs):
    """Revoke task by task id (or list of ids).

    Keyword Arguments:
        terminate (bool): Also terminate the process if the task is active.
        signal (str): Name of signal to use for terminate (e.g., ``KILL``).
    """
    # pylint: disable=redefined-outer-name
    # XXX Note that this redefines `terminate`:
    #     Outside of this scope that is a function.
    # supports list argument since 3.1
    task_ids, task_id = set(maybe_list(task_id) or []), None
    task_ids = _revoke(state, task_ids, terminate, signal, **kwargs)
    if isinstance(task_ids, dict) and 'ok' in task_ids:
        return task_ids
    return ok(f'tasks {task_ids} flagged as revoked')
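
# Usage sketch (illustrative, assuming a configured ``app`` on the client
# side): this handler is normally reached through the control API, e.g.
#
#     app.control.revoke('d9078da5-9915-40a0-bfa1-392c7bde42ed',
#                        terminate=True, signal='SIGKILL')
#
# which broadcasts a ``revoke`` message that ends up calling the function
# above on each worker.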


@control_command(
    variadic='headers',
    signature='[key1=value1 [key2=value2 [... [keyN=valueN]]]]',
)
def revoke_by_stamped_headers(state, headers, terminate=False, signal=None, **kwargs):
    """Revoke task by header (or list of headers).

    Keyword Arguments:
        headers (dictionary): Dictionary that contains stamping scheme name
            as keys and stamps as values. If headers is a list, it will be
            converted to a dictionary.
        terminate (bool): Also terminate the process if the task is active.
        signal (str): Name of signal to use for terminate (e.g., ``KILL``).

    Sample headers input:
        {'mtask_id': [id1, id2, id3]}
    """
    # pylint: disable=redefined-outer-name
    # XXX Note that this redefines `terminate`:
    #     Outside of this scope that is a function.
    # supports list argument since 3.1
    signum = _signals.signum(signal or TERM_SIGNAME)

    if isinstance(headers, list):
        headers = {h.split('=')[0]: h.split('=')[1] for h in headers}

    for header, stamps in headers.items():
        updated_stamps = maybe_list(
            worker_state.revoked_stamps.get(header) or []
        ) + list(maybe_list(stamps))
        worker_state.revoked_stamps[header] = updated_stamps

    if not terminate:
        return ok(f'headers {headers} flagged as revoked, but not terminated')

    active_requests = list(worker_state.active_requests)

    terminated_scheme_to_stamps_mapping = defaultdict(set)

    # Terminate all running tasks of matching headers
    # Go through all active requests, and check if one of the
    # requests has a stamped header that matches the given headers to revoke
    for req in active_requests:
        # Check stamps exist
        if hasattr(req, "stamps") and req.stamps:
            # if so, check if any stamps match a revoked stamp
            for expected_header_key, expected_header_value in headers.items():
                if expected_header_key in req.stamps:
                    expected_header_value = maybe_list(expected_header_value)
                    actual_header = maybe_list(req.stamps[expected_header_key])
                    matching_stamps_for_request = set(actual_header) & set(expected_header_value)

                    # Check any possible match regardless if the stamps are a sequence or not
                    if matching_stamps_for_request:
                        terminated_scheme_to_stamps_mapping[expected_header_key].update(
                            matching_stamps_for_request)
                        req.terminate(state.consumer.pool, signal=signum)

    if not terminated_scheme_to_stamps_mapping:
        return ok(f'headers {headers} were not terminated')
    return ok(f'headers {terminated_scheme_to_stamps_mapping} revoked')


def _revoke(state, task_ids, terminate=False, signal=None, **kwargs):
    size = len(task_ids)
    terminated = set()

    worker_state.revoked.update(task_ids)
    if terminate:
        signum = _signals.signum(signal or TERM_SIGNAME)
        for request in _find_requests_by_id(task_ids):
            if request.id not in terminated:
                terminated.add(request.id)
                logger.info('Terminating %s (%s)', request.id, signum)
                request.terminate(state.consumer.pool, signal=signum)
                if len(terminated) >= size:
                    break

        if not terminated:
            return ok('terminate: tasks unknown')
        return ok('terminate: {}'.format(', '.join(terminated)))

    idstr = ', '.join(task_ids)
    logger.info('Tasks flagged as revoked: %s', idstr)
    return task_ids


@control_command(
    variadic='task_id',
    args=[('signal', str)],
    signature='<signal> [id1 [id2 [... [idN]]]]',
)
def terminate(state, signal, task_id, **kwargs):
    """Terminate task by task id (or list of ids)."""
    return revoke(state, task_id, terminate=True, signal=signal)
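
# Usage sketch (illustrative, assuming a configured ``app``): terminating by
# id or by stamped header from the client side might look like
#
#     app.control.terminate('d9078da5-9915-40a0-bfa1-392c7bde42ed',
#                           signal='SIGKILL')
#     # revoke_by_stamped_headers is available in recent Celery releases:
#     app.control.revoke_by_stamped_headers({'mtask_id': ['id1', 'id2']},
#                                           terminate=True)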
"""# pylint: disable=redefined-outer-name# XXX Note that this redefines `terminate`:# Outside of this scope that is a function.try:rate(rate_limit)exceptValueErrorasexc:returnnok(f'Invalid rate limit string: {exc!r}')try:state.app.tasks[task_name].rate_limit=rate_limitexceptKeyError:logger.error('Rate limit attempt for unknown task %s',task_name,exc_info=True)returnnok('unknown task')state.consumer.reset_rate_limits()ifnotrate_limit:logger.info('Rate limits disabled for tasks of type %s',task_name)returnok('rate limit disabled successfully')logger.info('New rate limit for tasks of type %s: %s.',task_name,rate_limit)returnok('new rate limit set successfully')@control_command(args=[('task_name',str),('soft',float),('hard',float)],signature='<task_name> <soft_secs> [hard_secs]',)deftime_limit(state,task_name=None,hard=None,soft=None,**kwargs):"""Tell worker(s) to modify the time limit for task by type. Arguments: task_name (str): Name of task to change. hard (float): Hard time limit. soft (float): Soft time limit. """try:task=state.app.tasks[task_name]exceptKeyError:logger.error('Change time limit attempt for unknown task %s',task_name,exc_info=True)returnnok('unknown task')task.soft_time_limit=softtask.time_limit=hardlogger.info('New time limits for tasks of type %s: soft=%s hard=%s',task_name,soft,hard)returnok('time limits set successfully')# -- Events@inspect_command()defclock(state,**kwargs):"""Get current logical clock value."""return{'clock':state.app.clock.value}@control_command()defelection(state,id,topic,action=None,**kwargs):"""Hold election. Arguments: id (str): Unique election id. topic (str): Election topic. action (str): Action to take for elected actor. """ifstate.consumer.gossip:state.consumer.gossip.election(id,topic,action)@control_command()defenable_events(state):"""Tell worker(s) to send task-related events."""dispatcher=state.consumer.event_dispatcherifdispatcher.groupsand'task'notindispatcher.groups:dispatcher.groups.add('task')logger.info('Events of group {task} enabled by remote.')returnok('task events enabled')returnok('task events already enabled')@control_command()defdisable_events(state):"""Tell worker(s) to stop sending task-related events."""dispatcher=state.consumer.event_dispatcherif'task'indispatcher.groups:dispatcher.groups.discard('task')logger.info('Events of group {task} disabled by remote.')returnok('task events disabled')returnok('task events already disabled')@control_command()defheartbeat(state):"""Tell worker(s) to send event heartbeat immediately."""logger.debug('Heartbeat requested by remote.')dispatcher=state.consumer.event_dispatcherdispatcher.send('worker-heartbeat',freq=5,**worker_state.SOFTWARE_INFO)# -- Worker@inspect_command(visible=False)defhello(state,from_node,revoked=None,**kwargs):"""Request mingle sync-data."""# pylint: disable=redefined-outer-name# XXX Note that this redefines `revoked`:# Outside of this scope that is a function.iffrom_node!=state.hostname:logger.info('sync with %s',from_node)ifrevoked:worker_state.revoked.update(revoked)# Do not send expired items to the other worker.worker_state.revoked.purge()return{'revoked':worker_state.revoked._data,'clock':state.app.clock.forward(),}@inspect_command(default_timeout=0.2)defping(state,**kwargs):"""Ping worker(s)."""returnok('pong')@inspect_command()defstats(state,**kwargs):"""Request worker statistics/information."""returnstate.consumer.controller.stats()@inspect_command(alias='dump_schedule')defscheduled(state,**kwargs):"""List of currently scheduled ETA/countdown 
tasks."""returnlist(_iter_schedule_requests(state.consumer.timer))def_iter_schedule_requests(timer):forwaitingintimer.schedule.queue:try:arg0=waiting.entry.args[0]except(IndexError,TypeError):continueelse:ifisinstance(arg0,Request):yield{'eta':arg0.eta.isoformat()ifarg0.etaelseNone,'priority':waiting.priority,'request':arg0.info(),}@inspect_command(alias='dump_reserved')defreserved(state,**kwargs):"""List of currently reserved tasks, not including scheduled/active."""reserved_tasks=(state.tset(worker_state.reserved_requests)-state.tset(worker_state.active_requests))ifnotreserved_tasks:return[]return[request.info()forrequestinreserved_tasks]@inspect_command(alias='dump_active')defactive(state,safe=False,**kwargs):"""List of tasks currently being executed."""return[request.info(safe=safe)forrequestinstate.tset(worker_state.active_requests)]@inspect_command(alias='dump_revoked')defrevoked(state,**kwargs):"""List of revoked task-ids."""returnlist(worker_state.revoked)@inspect_command(alias='dump_tasks',variadic='taskinfoitems',signature='[attr1 [attr2 [... [attrN]]]]',)defregistered(state,taskinfoitems=None,builtins=False,**kwargs):"""List of registered tasks. Arguments: taskinfoitems (Sequence[str]): List of task attributes to include. Defaults to ``exchange,routing_key,rate_limit``. builtins (bool): Also include built-in tasks. """reg=state.app.taskstaskinfoitems=taskinfoitemsorDEFAULT_TASK_INFO_ITEMStasks=regifbuiltinselse(taskfortaskinregifnottask.startswith('celery.'))def_extract_info(task):fields={field:str(getattr(task,field,None))forfieldintaskinfoitemsifgetattr(task,field,None)isnotNone}iffields:info=['='.join(f)forfinfields.items()]return'{} [{}]'.format(task.name,' '.join(info))returntask.namereturn[_extract_info(reg[task])fortaskinsorted(tasks)]# -- Debugging@inspect_command(default_timeout=60.0,args=[('type',str),('num',int),('max_depth',int)],signature='[object_type=Request] [num=200 [max_depth=10]]',)defobjgraph(state,num=200,max_depth=10,type='Request'):# pragma: no cover"""Create graph of uncollected objects (memory-leak debugging). Arguments: num (int): Max number of objects to graph. max_depth (int): Traverse at most n levels deep. type (str): Name of object to graph. Default is ``"Request"``. """try:importobjgraphas_objgraphexceptImportError:raiseImportError('Requires the objgraph library')logger.info('Dumping graph for type %r',type)withtempfile.NamedTemporaryFile(prefix='cobjg',suffix='.png',delete=False)asfh:objects=_objgraph.by_type(type)[:num]_objgraph.show_backrefs(objects,max_depth=max_depth,highlight=lambdav:vinobjects,filename=fh.name,)return{'filename':fh.name}@inspect_command()defmemsample(state,**kwargs):"""Sample current RSS memory usage."""fromcelery.utils.debugimportsample_memreturnsample_mem()@inspect_command(args=[('samples',int)],signature='[n_samples=10]',)defmemdump(state,samples=10,**kwargs):# pragma: no cover"""Dump statistics of previous memsample requests."""fromcelery.utilsimportdebugout=io.StringIO()debug.memdump(file=out)returnout.getvalue()# -- Pool@control_command(args=[('n',int)],signature='[N=1]',)defpool_grow(state,n=1,**kwargs):"""Grow pool by n processes/threads."""ifstate.consumer.controller.autoscaler:returnnok("pool_grow is not supported with autoscale. 
Adjust autoscale range instead.")else:state.consumer.pool.grow(n)state.consumer._update_prefetch_count(n)returnok('pool will grow')@control_command(args=[('n',int)],signature='[N=1]',)defpool_shrink(state,n=1,**kwargs):"""Shrink pool by n processes/threads."""ifstate.consumer.controller.autoscaler:returnnok("pool_shrink is not supported with autoscale. Adjust autoscale range instead.")else:state.consumer.pool.shrink(n)state.consumer._update_prefetch_count(-n)returnok('pool will shrink')@control_command()defpool_restart(state,modules=None,reload=False,reloader=None,**kwargs):"""Restart execution pool."""ifstate.app.conf.worker_pool_restarts:state.consumer.controller.reload(modules,reload,reloader=reloader)returnok('reload started')else:raiseValueError('Pool restarts not enabled')@control_command(args=[('max',int),('min',int)],signature='[max [min]]',)defautoscale(state,max=None,min=None):"""Modify autoscale settings."""autoscaler=state.consumer.controller.autoscalerifautoscaler:max_,min_=autoscaler.update(max,min)returnok(f'autoscale now max={max_} min={min_}')raiseValueError('Autoscale not enabled')@control_command()defshutdown(state,msg='Got shutdown from remote',**kwargs):"""Shutdown worker(s)."""logger.warning(msg)raiseWorkerShutdown(0)# -- Queues@control_command(args=[('queue',str),('exchange',str),('exchange_type',str),('routing_key',str),],signature='<queue> [exchange [type [routing_key]]]',)defadd_consumer(state,queue,exchange=None,exchange_type=None,routing_key=None,**options):"""Tell worker(s) to consume from task queue by name."""state.consumer.call_soon(state.consumer.add_task_queue,queue,exchange,exchange_typeor'direct',routing_key,**options)returnok(f'add consumer {queue}')@control_command(args=[('queue',str)],signature='<queue>',)defcancel_consumer(state,queue,**_):"""Tell worker(s) to stop consuming from task queue by name."""state.consumer.call_soon(state.consumer.cancel_task_queue,queue,)returnok(f'no longer consuming from {queue}')@inspect_command()defactive_queues(state):"""List the task queues a worker is currently consuming from."""ifstate.consumer.task_consumer:return[dict(queue.as_dict(recurse=True))forqueueinstate.consumer.task_consumer.queues]return[]