"""Eventlet execution pool."""importsysfromtimeimportmonotonicfromgreenletimportGreenletExitfromkombu.asynchronousimporttimeras_timerfromceleryimportsignalsfrom.importbase__all__=('TaskPool',)W_RACE="""\Celery module with %s imported before eventlet patched\"""RACE_MODS=('billiard.','celery.','kombu.')#: Warn if we couldn't patch early enough,#: and thread/socket depending celery modules have already been loaded.formodin(modformodinsys.modulesifmod.startswith(RACE_MODS)):forsidein('thread','threading','socket'):# pragma: no coverifgetattr(mod,side,None):importwarningswarnings.warn(RuntimeWarning(W_RACE%side))defapply_target(target,args=(),kwargs=None,callback=None,accept_callback=None,getpid=None):kwargs={}ifnotkwargselsekwargsreturnbase.apply_target(target,args,kwargs,callback,accept_callback,pid=getpid())classTimer(_timer.Timer):"""Eventlet Timer."""def__init__(self,*args,**kwargs):fromeventlet.greenthreadimportspawn_afterfromgreenletimportGreenletExitsuper().__init__(*args,**kwargs)self.GreenletExit=GreenletExitself._spawn_after=spawn_afterself._queue=set()def_enter(self,eta,priority,entry,**kwargs):secs=max(eta-monotonic(),0)g=self._spawn_after(secs,entry)self._queue.add(g)g.link(self._entry_exit,entry)g.entry=entryg.eta=etag.priority=priorityg.canceled=Falsereturngdef_entry_exit(self,g,entry):try:try:g.wait()exceptself.GreenletExit:entry.cancel()g.canceled=Truefinally:self._queue.discard(g)defclear(self):queue=self._queuewhilequeue:try:queue.pop().cancel()except(KeyError,self.GreenletExit):passdefcancel(self,tref):try:tref.cancel()exceptself.GreenletExit:pass@propertydefqueue(self):returnself._queue