"""In-memory representation of cluster state.This module implements a data-structure used to keeptrack of the state of a cluster of workers and the tasksit is working on (by consuming events).For every event consumed the state is updated,so the state represents the state of the clusterat the time of the last event.Snapshots (:mod:`celery.events.snapshot`) can be used totake "pictures" of this state at regular intervalsto for example, store that in a database."""importbisectimportsysimportthreadingfromcollectionsimportdefaultdictfromcollections.abcimportCallablefromdatetimeimportdatetimefromdecimalimportDecimalfromitertoolsimportislicefromoperatorimportitemgetterfromtimeimporttimefromtypingimportMapping,Optional# noqafromweakrefimportWeakSet,reffromkombu.clocksimporttimetuplefromkombu.utils.objectsimportcached_propertyfromceleryimportstatesfromcelery.utils.functionalimportLRUCache,memoize,pass1fromcelery.utils.logimportget_logger__all__=('Worker','Task','State','heartbeat_expires')# pylint: disable=redefined-outer-name# We cache globals and attribute lookups, so disable this warning.# pylint: disable=too-many-function-args# For some reason pylint thinks ._event is a method, when it's a property.#: Set if running PyPyPYPY=hasattr(sys,'pypy_version_info')#: The window (in percentage) is added to the workers heartbeat#: frequency. If the time between updates exceeds this window,#: then the worker is considered to be offline.HEARTBEAT_EXPIRE_WINDOW=200#: Max drift between event timestamp and time of event received#: before we alert that clocks may be unsynchronized.HEARTBEAT_DRIFT_MAX=16DRIFT_WARNING=("Substantial drift from %s may mean clocks are out of sync. Current drift is ""%s seconds. [orig: %s recv: %s]")logger=get_logger(__name__)warn=logger.warningR_STATE='<State: events={0.event_count} tasks={0.task_count}>'R_WORKER='<Worker: {0.hostname} ({0.status_string} clock:{0.clock})'R_TASK='<Task: {0.name}({0.uuid}) {0.state} clock:{0.clock}>'#: Mapping of task event names to task state.TASK_EVENT_TO_STATE={'sent':states.PENDING,'received':states.RECEIVED,'started':states.STARTED,'failed':states.FAILURE,'retried':states.RETRY,'succeeded':states.SUCCESS,'revoked':states.REVOKED,'rejected':states.REJECTED,}classCallableDefaultdict(defaultdict):""":class:`~collections.defaultdict` with configurable __call__. We use this for backwards compatibility in State.tasks_by_type etc, which used to be a method but is now an index instead. So you can do:: >>> add_tasks = state.tasks_by_type['proj.tasks.add'] while still supporting the method call:: >>> add_tasks = list(state.tasks_by_type( ... 'proj.tasks.add', reverse=True)) """def__init__(self,fun,*args,**kwargs):self.fun=funsuper().__init__(*args,**kwargs)def__call__(self,*args,**kwargs):returnself.fun(*args,**kwargs)Callable.register(CallableDefaultdict)@memoize(maxsize=1000,keyfun=lambdaa,_:a[0])def_warn_drift(hostname,drift,local_received,timestamp):# we use memoize here so the warning is only logged once per hostnamewarn(DRIFT_WARNING,hostname,drift,datetime.fromtimestamp(local_received),datetime.fromtimestamp(timestamp))
def heartbeat_expires(timestamp, freq=60,
                      expire_window=HEARTBEAT_EXPIRE_WINDOW,
                      Decimal=Decimal, float=float, isinstance=isinstance):
    """Return time when heartbeat expires."""
    # some json implementations return decimal.Decimal objects,
    # which aren't compatible with float.
    freq = float(freq) if isinstance(freq, Decimal) else freq
    if isinstance(timestamp, Decimal):
        timestamp = float(timestamp)
    return timestamp + (freq * (expire_window / 1e2))
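
# Worked example: with the default expire_window of 200 (percent), a worker
# whose last heartbeat arrived at timestamp 100.0 with freq=60 expires at
# 100.0 + 60 * (200 / 100) == 220.0, i.e. after two missed heartbeats.


# The ``with_unique_field`` decorator applied to Worker and Task below is
# not part of this listing.  This is a minimal sketch reconstructed from
# its usage: equality, inequality and hashing are delegated to the single
# named attribute ('hostname' for workers, 'uuid' for tasks).
def with_unique_field(attr):

    def _decorate_cls(cls):

        def __eq__(this, other):
            if isinstance(other, this.__class__):
                return getattr(this, attr) == getattr(other, attr)
            return NotImplemented
        cls.__eq__ = __eq__

        def __ne__(this, other):
            res = this.__eq__(other)
            return True if res is NotImplemented else not res
        cls.__ne__ = __ne__

        def __hash__(this):
            return hash(getattr(this, attr))
        cls.__hash__ = __hash__

        return cls
    return _decorate_cls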
@with_unique_field('hostname')
class Worker:
    """Worker State."""

    heartbeat_max = 4
    expire_window = HEARTBEAT_EXPIRE_WINDOW

    _fields = ('hostname', 'pid', 'freq', 'heartbeats', 'clock',
               'active', 'processed', 'loadavg', 'sw_ident',
               'sw_ver', 'sw_sys')
    if not PYPY:  # pragma: no cover
        __slots__ = _fields + ('event', '__dict__', '__weakref__')

    def __init__(self, hostname=None, pid=None, freq=60,
                 heartbeats=None, clock=0, active=None, processed=None,
                 loadavg=None, sw_ident=None, sw_ver=None, sw_sys=None):
        self.hostname = hostname
        self.pid = pid
        self.freq = freq
        self.heartbeats = [] if heartbeats is None else heartbeats
        self.clock = clock or 0
        self.active = active
        self.processed = processed
        self.loadavg = loadavg
        self.sw_ident = sw_ident
        self.sw_ver = sw_ver
        self.sw_sys = sw_sys
        self.event = self._create_event_handler()

    def __reduce__(self):
        return self.__class__, (self.hostname, self.pid, self.freq,
                                self.heartbeats, self.clock, self.active,
                                self.processed, self.loadavg, self.sw_ident,
                                self.sw_ver, self.sw_sys)

    def _create_event_handler(self):
        _set = object.__setattr__
        hbmax = self.heartbeat_max
        heartbeats = self.heartbeats
        hb_pop = self.heartbeats.pop
        hb_append = self.heartbeats.append

        def event(type_, timestamp=None,
                  local_received=None, fields=None,
                  max_drift=HEARTBEAT_DRIFT_MAX,
                  abs=abs, int=int, insort=bisect.insort,
                  len=len):
            fields = fields or {}
            for k, v in fields.items():
                _set(self, k, v)

            if type_ == 'offline':
                heartbeats[:] = []
            else:
                if not local_received or not timestamp:
                    return
                drift = abs(int(local_received) - int(timestamp))
                if drift > max_drift:
                    _warn_drift(self.hostname, drift,
                                local_received, timestamp)
                if local_received:  # pragma: no cover
                    hearts = len(heartbeats)
                    if hearts > hbmax - 1:
                        hb_pop(0)
                    if hearts and local_received > heartbeats[-1]:
                        hb_append(local_received)
                    else:
                        insort(heartbeats, local_received)
        return event
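
    # The derived Worker attributes used elsewhere in this module
    # (``status_string`` in R_WORKER, ``worker.id`` in the dispatcher,
    # ``w.alive`` in State.alive_workers, and ``worker.update()`` in
    # State.get_or_create_worker) fall outside this listing.  The
    # definitions below are a reconstruction sketched from those call
    # sites, not necessarily the verbatim upstream bodies.
    def update(self, f, **kw):
        # merge a mapping (and optional extra keywords) into this worker.
        d = dict(f, **kw) if kw else f
        for k, v in d.items():
            setattr(self, k, v)

    def __repr__(self):
        return R_WORKER.format(self)

    @property
    def status_string(self):
        return 'ONLINE' if self.alive else 'OFFLINE'

    @property
    def heartbeat_expires(self):
        # expiry is computed from the most recent heartbeat.
        return heartbeat_expires(self.heartbeats[-1],
                                 self.freq, self.expire_window)

    @property
    def alive(self):
        return bool(self.heartbeats and time() < self.heartbeat_expires)

    @property
    def id(self):
        return '{0.hostname}.{0.pid}'.format(self)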
@with_unique_field('uuid')
class Task:
    """Task State."""

    name = received = sent = started = succeeded = failed = retried = \
        revoked = rejected = args = kwargs = eta = expires = retries = \
        worker = result = exception = timestamp = runtime = traceback = \
        exchange = routing_key = root_id = parent_id = client = None
    state = states.PENDING
    clock = 0

    _fields = (
        'uuid', 'name', 'state', 'received', 'sent', 'started', 'rejected',
        'succeeded', 'failed', 'retried', 'revoked', 'args', 'kwargs',
        'eta', 'expires', 'retries', 'worker', 'result', 'exception',
        'timestamp', 'runtime', 'traceback', 'exchange', 'routing_key',
        'clock', 'client', 'root', 'root_id', 'parent', 'parent_id',
        'children',
    )
    if not PYPY:  # pragma: no cover
        __slots__ = ('__dict__', '__weakref__')

    #: How to merge out of order events.
    #: Disorder is detected by logical ordering (e.g., :event:`task-received`
    #: must've happened before a :event:`task-failed` event).
    #:
    #: A merge rule consists of a state and a list of fields to keep from
    #: that state.  ``(RECEIVED, ('name', 'args'))`` means the name and args
    #: fields are always taken from the RECEIVED state, and any values for
    #: these fields received before or after are simply ignored.
    merge_rules = {
        states.RECEIVED: (
            'name', 'args', 'kwargs', 'parent_id',
            'root_id', 'retries', 'eta', 'expires',
        ),
    }

    #: :meth:`info` displays these fields by default.
    _info_fields = (
        'args', 'kwargs', 'retries', 'result', 'eta', 'runtime',
        'expires', 'exception', 'exchange', 'routing_key',
        'root_id', 'parent_id',
    )

    def __init__(self, uuid=None, cluster_state=None, children=None,
                 **kwargs):
        self.uuid = uuid
        self.cluster_state = cluster_state
        if self.cluster_state is not None:
            self.children = WeakSet(
                self.cluster_state.tasks.get(task_id)
                for task_id in children or ()
                if task_id in self.cluster_state.tasks
            )
        else:
            self.children = WeakSet()
        self._serializer_handlers = {
            'children': self._serializable_children,
            'root': self._serializable_root,
            'parent': self._serializable_parent,
        }
        if kwargs:
            self.__dict__.update(kwargs)
    def event(self, type_, timestamp=None, local_received=None, fields=None,
              precedence=states.precedence, setattr=setattr,
              task_event_to_state=TASK_EVENT_TO_STATE.get,
              RETRY=states.RETRY):
        fields = fields or {}

        # using .get is faster than catching KeyError in this case.
        state = task_event_to_state(type_)
        if state is not None:
            # sets, for example, self.succeeded to the timestamp.
            setattr(self, type_, timestamp)
        else:
            state = type_.upper()  # custom state

        # note that precedence here is reversed
        # see implementation in celery.states.state.__lt__
        if state != RETRY and self.state != RETRY and \
                precedence(state) > precedence(self.state):
            # this state logically happens-before the current state, so merge.
            keep = self.merge_rules.get(state)
            if keep is not None:
                fields = {
                    k: v for k, v in fields.items() if k in keep
                }
        else:
            fields.update(state=state, timestamp=timestamp)

        # update current state with info from this event.
        self.__dict__.update(fields)
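
    # Worked example of the merge above (a sketch): if a task-failed event
    # is consumed first and a task-received event arrives late, then
    # precedence(RECEIVED) > precedence(FAILURE) flags the received event
    # as logically earlier, so the task stays FAILURE and only the
    # merge_rules[RECEIVED] fields (name, args, kwargs, ...) are copied
    # in from the late event.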
    def info(self, fields=None, extra=None):
        """Information about this task suitable for on-screen display."""
        extra = [] if not extra else extra
        fields = self._info_fields if fields is None else fields

        def _keys():
            for key in list(fields) + list(extra):
                value = getattr(self, key, None)
                if value is not None:
                    yield key, value

        return dict(_keys())
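
    # Task.__init__ above registers these serializer handlers, but their
    # definitions fall outside this listing.  The bodies below are a
    # minimal sketch that reduces the weakref-based fields to plain ids
    # when the task is serialized.
    def _serializable_children(self, value):
        return [task.uuid for task in self.children]

    def _serializable_root(self, value):
        return self.root_id

    def _serializable_parent(self, value):
        return self.parent_id


class State:
    """Records clusters state."""

    Worker = Worker
    Task = Task
    event_count = 0
    task_count = 0
    heap_multiplier = 4

    # The full constructor falls outside this listing.  This is a minimal
    # sketch reconstructed from the attributes the methods below expect
    # (LRU-bounded worker/task maps, the task heap, the handler registry,
    # and the callable tasks_by_type/tasks_by_worker indexes); the
    # argument defaults are assumptions, not necessarily the upstream
    # signature.
    def __init__(self, callback=None, workers=None, tasks=None,
                 taskheap=None, max_workers_in_memory=5000,
                 max_tasks_in_memory=10000, on_node_join=None,
                 on_node_leave=None):
        self.event_callback = callback
        self.workers = (LRUCache(max_workers_in_memory)
                        if workers is None else workers)
        self.tasks = (LRUCache(max_tasks_in_memory)
                      if tasks is None else tasks)
        self._taskheap = [] if taskheap is None else taskheap
        self.max_workers_in_memory = max_workers_in_memory
        self.max_tasks_in_memory = max_tasks_in_memory
        self.on_node_join = on_node_join
        self.on_node_leave = on_node_leave
        self._mutex = threading.Lock()
        self.handlers = {}
        self._seen_types = set()
        self._tasks_to_resolve = {}
        self.tasks_by_type = CallableDefaultdict(
            self._tasks_by_type, WeakSet)
        self.tasks_by_worker = CallableDefaultdict(
            self._tasks_by_worker, WeakSet)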
    def get_or_create_worker(self, hostname, **kwargs):
        """Get or create worker by hostname.

        Returns:
            Tuple: of ``(worker, was_created)``.
        """
        try:
            worker = self.workers[hostname]
            if kwargs:
                worker.update(kwargs)
            return worker, False
        except KeyError:
            worker = self.workers[hostname] = self.Worker(
                hostname, **kwargs)
            return worker, True
    def get_or_create_task(self, uuid):
        """Get or create task by uuid."""
        try:
            return self.tasks[uuid], False
        except KeyError:
            task = self.tasks[uuid] = self.Task(uuid, cluster_state=self)
            return task, True
    def task_event(self, type_, fields):
        """Deprecated, use :meth:`event`."""
        return self._event(dict(fields, type='-'.join(['task', type_])))[0]
    def worker_event(self, type_, fields):
        """Deprecated, use :meth:`event`."""
        return self._event(dict(fields, type='-'.join(['worker', type_])))[0]
    def _create_dispatcher(self):
        # pylint: disable=too-many-statements
        # This code is highly optimized, but not for reusability.
        get_handler = self.handlers.__getitem__
        event_callback = self.event_callback
        wfields = itemgetter('hostname', 'timestamp', 'local_received')
        tfields = itemgetter('uuid', 'hostname', 'timestamp',
                             'local_received', 'clock')
        taskheap = self._taskheap
        th_append = taskheap.append
        th_pop = taskheap.pop
        # Removing events from task heap is an O(n) operation,
        # so easier to just account for the common number of events
        # for each task (PENDING->RECEIVED->STARTED->final)
        max_events_in_heap = self.max_tasks_in_memory * self.heap_multiplier
        add_type = self._seen_types.add
        on_node_join, on_node_leave = self.on_node_join, self.on_node_leave
        tasks, Task = self.tasks, self.Task
        workers, Worker = self.workers, self.Worker
        # avoid updating LRU entry at getitem
        get_worker, get_task = workers.data.__getitem__, tasks.data.__getitem__

        get_task_by_type_set = self.tasks_by_type.__getitem__
        get_task_by_worker_set = self.tasks_by_worker.__getitem__

        def _event(event,
                   timetuple=timetuple, KeyError=KeyError,
                   insort=bisect.insort, created=True):
            self.event_count += 1
            if event_callback:
                event_callback(self, event)
            group, _, subject = event['type'].partition('-')
            try:
                handler = get_handler(group)
            except KeyError:
                pass
            else:
                return handler(subject, event), subject

            if group == 'worker':
                try:
                    hostname, timestamp, local_received = wfields(event)
                except KeyError:
                    pass
                else:
                    is_offline = subject == 'offline'
                    try:
                        worker, created = get_worker(hostname), False
                    except KeyError:
                        if is_offline:
                            worker, created = Worker(hostname), False
                        else:
                            worker = workers[hostname] = Worker(hostname)
                    worker.event(subject, timestamp, local_received, event)
                    if on_node_join and (created or subject == 'online'):
                        on_node_join(worker)
                    if on_node_leave and is_offline:
                        on_node_leave(worker)
                        workers.pop(hostname, None)
                    return (worker, created), subject
            elif group == 'task':
                (uuid, hostname, timestamp,
                 local_received, clock) = tfields(event)
                # task-sent event is sent by client, not worker
                is_client_event = subject == 'sent'
                try:
                    task, task_created = get_task(uuid), False
                except KeyError:
                    task = tasks[uuid] = Task(uuid, cluster_state=self)
                    task_created = True
                if is_client_event:
                    task.client = hostname
                else:
                    try:
                        worker = get_worker(hostname)
                    except KeyError:
                        worker = workers[hostname] = Worker(hostname)
                    task.worker = worker
                    if worker is not None and local_received:
                        worker.event(None, local_received, timestamp)

                origin = hostname if is_client_event else worker.id

                # remove oldest event if exceeding the limit.
                heaps = len(taskheap)
                if heaps + 1 > max_events_in_heap:
                    th_pop(0)

                # most events will be dated later than the previous.
                timetup = timetuple(clock, timestamp, origin, ref(task))
                if heaps and timetup > taskheap[-1]:
                    th_append(timetup)
                else:
                    insort(taskheap, timetup)

                if subject == 'received':
                    self.task_count += 1
                task.event(subject, timestamp, local_received, event)
                task_name = task.name
                if task_name is not None:
                    add_type(task_name)
                    if task_created:  # add to tasks_by_type index
                        get_task_by_type_set(task_name).add(task)
                        get_task_by_worker_set(hostname).add(task)
                if task.parent_id:
                    try:
                        parent_task = self.tasks[task.parent_id]
                    except KeyError:
                        self._add_pending_task_child(task)
                    else:
                        parent_task.children.add(task)
                try:
                    _children = self._tasks_to_resolve.pop(uuid)
                except KeyError:
                    pass
                else:
                    task.children.update(_children)

                return (task, task_created), subject
        return _event

    def _add_pending_task_child(self, task):
        try:
            ch = self._tasks_to_resolve[task.parent_id]
        except KeyError:
            ch = self._tasks_to_resolve[task.parent_id] = WeakSet()
        ch.add(task)
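
    # ``task_event`` and ``worker_event`` above dispatch through
    # ``self._event``, which falls outside this listing.  The module-level
    # pylint note ("pylint thinks ._event is a method, when it's a
    # property") implies a property; a plausible minimal wiring is to
    # cache the optimized dispatcher built by ``_create_dispatcher`` and
    # serialize access through the state mutex (a sketch, assuming the
    # attributes set up in ``__init__`` above):
    @cached_property
    def _event(self):
        return self._create_dispatcher()

    def event(self, event):
        with self._mutex:
            return self._event(event)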
    def tasks_by_time(self, limit=None, reverse: bool = True):
        """Generator yielding tasks ordered by time.

        Yields:
            Tuples of ``(uuid, Task)``.
        """
        _heap = self._taskheap
        if reverse:
            _heap = reversed(_heap)

        seen = set()
        for evtup in islice(_heap, 0, limit):
            task = evtup[3]()
            if task is not None:
                uuid = task.uuid
                if uuid not in seen:
                    yield uuid, task
                    seen.add(uuid)
    tasks_by_timestamp = tasks_by_time

    def _tasks_by_type(self, name, limit=None, reverse=True):
        """Get all tasks by type.

        This is slower than accessing :attr:`tasks_by_type`,
        but will be ordered by time.

        Returns:
            Generator: giving ``(uuid, Task)`` pairs.
        """
        return islice(
            ((uuid, task)
             for uuid, task in self.tasks_by_time(reverse=reverse)
             if task.name == name),
            0, limit,
        )

    def _tasks_by_worker(self, hostname, limit=None, reverse=True):
        """Get all tasks by worker.

        Slower than accessing :attr:`tasks_by_worker`, but ordered by time.
        """
        return islice(
            ((uuid, task)
             for uuid, task in self.tasks_by_time(reverse=reverse)
             if task.worker.hostname == hostname),
            0, limit,
        )
    def task_types(self):
        """Return a list of all seen task types."""
        return sorted(self._seen_types)
    def alive_workers(self):
        """Return a generator over (seemingly) alive workers."""
        return (w for w in self.workers.values() if w.alive)
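
# Example (a sketch, not part of the original module): feeding events into
# a ``State`` instance by hand, the way an event receiver would.  The field
# names follow the worker/task event protocol consumed by the dispatcher
# above; 'uuid-1' is a placeholder task id.
#
#     >>> state = State()
#     >>> now = time()
#     >>> state.event({'type': 'worker-online', 'hostname': 'celery@w1',
#     ...              'timestamp': now, 'local_received': now, 'clock': 1})
#     >>> state.event({'type': 'task-received', 'uuid': 'uuid-1',
#     ...              'name': 'proj.tasks.add', 'hostname': 'celery@w1',
#     ...              'timestamp': now, 'local_received': now, 'clock': 2})
#     >>> state.task_count
#     1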