"""Internal worker state (global).This includes the currently active and reserved tasks,statistics, and revoked tasks."""importosimportplatformimportshelveimportsysimportweakrefimportzlibfromcollectionsimportCounterfromkombu.serializationimportpickle,pickle_protocolfromkombu.utils.objectsimportcached_propertyfromceleryimport__version__fromcelery.exceptionsimportWorkerShutdown,WorkerTerminatefromcelery.utils.collectionsimportLimitedSet__all__=('SOFTWARE_INFO','reserved_requests','active_requests','total_count','revoked','task_reserved','maybe_shutdown','task_accepted','task_ready','Persistent',)#: Worker software/platform information.SOFTWARE_INFO={'sw_ident':'py-celery','sw_ver':__version__,'sw_sys':platform.system(),}#: maximum number of revokes to keep in memory.REVOKES_MAX=int(os.environ.get('CELERY_WORKER_REVOKES_MAX',50000))#: maximum number of successful tasks to keep in memory.SUCCESSFUL_MAX=int(os.environ.get('CELERY_WORKER_SUCCESSFUL_MAX',1000))#: how many seconds a revoke will be active before#: being expired when the max limit has been exceeded.REVOKE_EXPIRES=float(os.environ.get('CELERY_WORKER_REVOKE_EXPIRES',10800))#: how many seconds a successful task will be cached in memory#: before being expired when the max limit has been exceeded.SUCCESSFUL_EXPIRES=float(os.environ.get('CELERY_WORKER_SUCCESSFUL_EXPIRES',10800))#: Mapping of reserved task_id->Request.requests={}#: set of all reserved :class:`~celery.worker.request.Request`'s.reserved_requests=weakref.WeakSet()#: set of currently active :class:`~celery.worker.request.Request`'s.active_requests=weakref.WeakSet()#: A limited set of successful :class:`~celery.worker.request.Request`'s.successful_requests=LimitedSet(maxlen=SUCCESSFUL_MAX,expires=SUCCESSFUL_EXPIRES)#: count of tasks accepted by the worker, sorted by type.total_count=Counter()#: count of all tasks accepted by the workerall_total_count=[0]#: the list of currently revoked tasks. 
#: Persistent if ``statedb`` set.
revoked = LimitedSet(maxlen=REVOKES_MAX, expires=REVOKE_EXPIRES)

#: Mapping of stamped headers flagged for revoking.
revoked_stamps = {}

# Shutdown flags read by maybe_shutdown(); ``None`` means "not requested".
should_stop = None
should_terminate = None


def reset_state():
    """Empty every module-level state container in place.

    The containers are cleared rather than rebound, so references to them
    held elsewhere keep pointing at the same (now empty) objects.
    """
    for container in (
        requests,
        reserved_requests,
        active_requests,
        successful_requests,
        total_count,
    ):
        container.clear()
    # Slice assignment keeps the same list object while resetting the count.
    all_total_count[:] = [0]
    revoked.clear()
    revoked_stamps.clear()
def maybe_shutdown():
    """Raise the pending shutdown exception if a shutdown flag is set.

    A flag counts as set when it is neither ``None`` nor ``False``; the
    checks use identity so that a value such as ``0`` still counts as set.
    ``should_terminate`` is checked before ``should_stop``.

    Raises:
        WorkerTerminate: When ``should_terminate`` is set.
        WorkerShutdown: When ``should_stop`` is set.
    """
    terminate_unset = should_terminate is None or should_terminate is False
    if not terminate_unset:
        raise WorkerTerminate(should_terminate)

    stop_unset = should_stop is None or should_stop is False
    if not stop_unset:
        raise WorkerShutdown(should_stop)
def task_reserved(
    request,
    add_request=requests.__setitem__,
    add_reserved_request=reserved_requests.add,
):
    """Record a newly reserved task in the global state.

    Arguments:
        request: The reserved request; its ``id`` is used as the key.
        add_request: Callable storing ``(task_id, request)``; defaults to
            the bound ``requests.__setitem__``.
        add_reserved_request: Callable adding the request to the reserved
            set; defaults to the bound ``reserved_requests.add``.
    """
    task_id = request.id
    add_request(task_id, request)
    add_reserved_request(request)
def task_accepted(request,
                  _all_total_count=None,
                  add_request=requests.__setitem__,
                  add_active_request=active_requests.add,
                  add_to_total_count=total_count.update):
    """Update global state when a task has been accepted.

    Arguments:
        request: The accepted request; its ``id`` and ``name`` are read.
        _all_total_count: Optional one-element list used as the overall
            accepted-task counter.  When omitted (or falsy) the
            module-level ``all_total_count`` is used.
        add_request: Callable storing ``(task_id, request)``.
        add_active_request: Callable adding the request to the active set.
        add_to_total_count: Callable receiving a ``{task_name: 1}``
            increment for the per-name counter.
    """
    if not _all_total_count:
        _all_total_count = all_total_count
    add_request(request.id, request)
    add_active_request(request)
    add_to_total_count({request.name: 1})
    # Increment the counter resolved above.  Previously the global was
    # always incremented, which silently ignored a caller-supplied counter.
    _all_total_count[0] += 1
def task_ready(request,
               successful=False,
               remove_request=requests.pop,
               discard_active_request=active_requests.discard,
               discard_reserved_request=reserved_requests.discard):
    """Update global state when a task is ready.

    Arguments:
        request: The finished request; its ``id`` is read.
        successful: When true, the request id is also added to
            ``successful_requests``.
        remove_request: Callable removing the id from the request mapping.
        discard_active_request: Callable removing the request from the
            active set.
        discard_reserved_request: Callable removing the request from the
            reserved set.
    """
    if successful:
        successful_requests.add(request.id)

    remove_request(request.id, None)
    discard_active_request(request)
    discard_reserved_request(request)


# Benchmark instrumentation, enabled by setting C_BENCH / CELERY_BENCH.
C_BENCH = os.environ.get('C_BENCH') or os.environ.get('CELERY_BENCH')
C_BENCH_EVERY = int(os.environ.get('C_BENCH_EVERY') or
                    os.environ.get('CELERY_BENCH_EVERY') or 1000)
if C_BENCH:  # pragma: no cover
    import atexit
    from time import monotonic

    from billiard.process import current_process

    from celery.utils.debug import memdump, sample_mem

    all_count = 0
    bench_first = None
    bench_start = None
    bench_last = None
    bench_every = C_BENCH_EVERY
    bench_sample = []
    # Keep the uninstrumented implementations; the wrappers defined below
    # rebind the public names and delegate to these.
    __reserved = task_reserved
    __ready = task_ready

    if current_process()._name == 'MainProcess':
        @atexit.register
        def on_shutdown():
            """Print the benchmark summary when the interpreter exits."""
            if bench_first is not None and bench_last is not None:
                print('- Time spent in benchmark: {!r}'.format(
                    bench_last - bench_first))
                print('- Avg: {}'.format(
                    sum(bench_sample) / len(bench_sample)))
                memdump()

    def task_reserved(request):
        """Called when a task is reserved by the worker.

        Records the start timestamps on the first call, then delegates to
        the uninstrumented ``task_reserved``.
        """
        global bench_start
        global bench_first
        now = None
        if bench_start is None:
            bench_start = now = monotonic()
        if bench_first is None:
            bench_first = now

        return __reserved(request)

    def task_ready(request, successful=False):
        """Called when a task is completed.

        Every ``bench_every`` calls, prints the time elapsed since the
        previous sample and takes a memory sample, then delegates to the
        uninstrumented ``task_ready``.

        Arguments:
            request: The finished request.
            successful: Forwarded unchanged, so the wrapper accepts the
                same ``successful`` flag as the function it replaces.
        """
        global all_count
        global bench_start
        global bench_last
        all_count += 1
        if not all_count % bench_every:
            now = monotonic()
            diff = now - bench_start
            print('- Time spent processing {} tasks (since first '
                  'task received): ~{:.4f}s\n'.format(bench_every, diff))
            sys.stdout.flush()
            bench_start = bench_last = now
            bench_sample.append(diff)
            sample_mem()
        return __ready(request, successful)
class Persistent:
    """Stores worker state between restarts.

    This is the persistent data stored by the worker when
    :option:`celery worker --statedb` is enabled.

    The stored mapping holds the revoked tasks (pickled and compressed
    under ``'zrevoked'``) and a ``'clock'`` value.
    """

    storage = shelve
    protocol = pickle_protocol
    compress = zlib.compress
    decompress = zlib.decompress
    _is_open = False

    def __init__(self, state, filename, clock=None):
        self.state = state
        self.filename = filename
        self.clock = clock
        # NOTE(review): merge() is not defined in this class body --
        # confirm where it is provided.
        self.merge()

    def _merge_with(self, d):
        """Merge the stored mapping *d* into the live state; return *d*."""
        self._merge_revoked(d)
        self._merge_clock(d)
        return d

    def _sync_with(self, d):
        """Write the current revoked set and clock into *d*; return *d*."""
        revoked_tasks = self._revoked_tasks
        # Drop expired entries before serializing.
        revoked_tasks.purge()
        packed_revoked = self.compress(self._dumps(revoked_tasks))
        clock_value = self.clock.forward() if self.clock else 0
        d.update({
            '__proto__': 3,
            'zrevoked': packed_revoked,
            'clock': clock_value,
        })
        return d

    def _merge_clock(self, d):
        """Adjust the clock by the stored value and store the result."""
        if not self.clock:
            return
        stored = d.get('clock') or 0
        d['clock'] = self.clock.adjust(stored)

    def _merge_revoked(self, d):
        """Load revoked tasks from *d*, trying the newest format first."""
        try:
            self._merge_revoked_v3(d['zrevoked'])
        except KeyError:
            # No compressed entry: fall back to the older 'revoked' key,
            # which is removed from the mapping as it is read.
            try:
                self._merge_revoked_v2(d.pop('revoked'))
            except KeyError:
                pass
        # purge expired items at boot
        self._revoked_tasks.purge()

    def _merge_revoked_v3(self, zrevoked):
        """Merge a compressed, pickled revoked set (skipped when empty)."""
        if not zrevoked:
            return
        # Unpickles bytes read from the state db; only appropriate when
        # that file is trusted.
        unpacked = pickle.loads(self.decompress(zrevoked))
        self._revoked_tasks.update(unpacked)

    def _merge_revoked_v2(self, saved):
        """Merge a revoked collection stored without compression."""
        if isinstance(saved, LimitedSet):
            self._revoked_tasks.update(saved)
            return None
        # (pre 3.0.18) used to be stored as a dict
        return self._merge_revoked_v1(saved)

    def _merge_revoked_v1(self, saved):
        """Add every item yielded by iterating *saved* to the revoked set."""
        revoked_tasks = self._revoked_tasks
        for item in saved:
            revoked_tasks.add(item)

    def _dumps(self, obj):
        """Pickle *obj* using this class's ``protocol``."""
        return pickle.dumps(obj, protocol=self.protocol)

    @property
    def _revoked_tasks(self):
        """The revoked set of the wrapped state object."""
        return self.state.revoked

    @cached_property
    def db(self):
        """The opened storage, created on first access."""
        self._is_open = True
        # NOTE(review): open() is not defined in this class body --
        # confirm where it is provided.
        return self.open()