"""Gevent execution pool."""importfunctoolsimporttypesfromtimeimportmonotonicfromkombu.asynchronousimporttimeras_timerfrom.importbasetry:fromgeventimportTimeoutexceptImportError:Timeout=None__all__=('TaskPool',)# pylint: disable=redefined-outer-name# We cache globals and attribute lookups, so disable this warning.defapply_target(target,args=(),kwargs=None,callback=None,accept_callback=None,getpid=None,**_):kwargs={}ifnotkwargselsekwargsreturnbase.apply_target(target,args,kwargs,callback,accept_callback,pid=getpid(),**_)defapply_timeout(target,args=(),kwargs=None,callback=None,accept_callback=None,getpid=None,timeout=None,timeout_callback=None,Timeout=Timeout,apply_target=base.apply_target,**rest):kwargs={}ifnotkwargselsekwargstry:withTimeout(timeout):returnapply_target(target,args,kwargs,callback,accept_callback,getpid(),propagate=(Timeout,),**rest)exceptTimeout:returntimeout_callback(False,timeout)classTimer(_timer.Timer):def__init__(self,*args,**kwargs):fromgeventimportGreenlet,GreenletExitclass_Greenlet(Greenlet):cancel=Greenlet.killself._Greenlet=_Greenletself._GreenletExit=GreenletExitsuper().__init__(*args,**kwargs)self._queue=set()def_enter(self,eta,priority,entry,**kwargs):secs=max(eta-monotonic(),0)g=self._Greenlet.spawn_later(secs,entry)self._queue.add(g)g.link(self._entry_exit)g.entry=entryg.eta=etag.priority=priorityg.canceled=Falsereturngdef_entry_exit(self,g):try:g.kill()finally:self._queue.discard(g)defclear(self):queue=self._queuewhilequeue:try:queue.pop().kill()exceptKeyError:pass@propertydefqueue(self):returnself._queue