"""Scheduler for Python functions.

.. note::
    This is used for the thread-based worker only,
    not for amqp/redis/sqs/qpid where :mod:`kombu.asynchronous.timer` is used.
"""

import os
import sys
import threading
from itertools import count
from threading import TIMEOUT_MAX as THREAD_TIMEOUT_MAX
from time import sleep
from typing import Any, Callable, Iterator, Optional, Tuple

from kombu.asynchronous.timer import Entry
from kombu.asynchronous.timer import Timer as Schedule
from kombu.asynchronous.timer import logger, to_timestamp

# Value of the ``TIMER_DEBUG`` environment variable (``None`` when unset).
# When truthy, ``Timer`` defines a ``start`` override that prints the
# caller's stack before starting the thread.
TIMER_DEBUG = os.environ.get('TIMER_DEBUG')

__all__ = ('Entry', 'Schedule', 'Timer', 'to_timestamp')
class Timer(threading.Thread):
    """Timer thread.

    Runs the entries of a schedule from a dedicated daemon thread.

    Note:
        This is only used for transports not supporting AsyncIO.
    """

    # Class-level aliases of the imported entry/schedule types.
    # ``__init__`` builds its default schedule through ``self.Schedule``.
    Entry = Entry
    Schedule = Schedule

    # Flipped to ``True`` at the top of :meth:`run`.
    running: bool = False
    # Class-level default tick callback; ``__init__`` falls back to this
    # when no ``on_tick`` argument is given.
    on_tick: Optional[Callable[[float], None]] = None

    # Counter shared by all instances, used to build the ``Timer-N`` name.
    _timer_count: count = count(1)

    if TIMER_DEBUG:  # pragma: no cover
        def start(self, *args: Any, **kwargs: Any) -> None:
            """Start the thread after printing the caller's stack trace."""
            import traceback
            print('- Timer starting')
            traceback.print_stack()
            super().start(*args, **kwargs)

    def __init__(self, schedule: Optional[Schedule] = None,
                 on_error: Optional[Callable[[Exception], None]] = None,
                 on_tick: Optional[Callable[[float], None]] = None,
                 on_start: Optional[Callable[['Timer'], None]] = None,
                 max_interval: Optional[float] = None, **kwargs: Any) -> None:
        """Create the timer thread (it is not started here).

        Arguments:
            schedule: Schedule to iterate.  When falsy, a new
                ``self.Schedule`` is created from ``on_error`` and
                ``max_interval``.
            on_error: Forwarded to the default schedule only; unused when
                ``schedule`` is supplied.
            on_tick: Callback that :meth:`run` calls with the delay before
                each sleep.  Falls back to the class-level ``on_tick``.
            on_start: Stored as ``self.on_start``.
                NOTE(review): not invoked by any method in this part of
                the class -- confirm where it is called.
            max_interval: Forwarded to the default schedule only; unused
                when ``schedule`` is supplied.
            **kwargs: Accepted and ignored.
        """
        self.schedule = schedule or self.Schedule(on_error=on_error,
                                                  max_interval=max_interval)
        self.on_start = on_start
        self.on_tick = on_tick or self.on_tick
        super().__init__()
        # `_is_stopped` is likely to be an attribute on `Thread` objects so we
        # double underscore these names to avoid shadowing anything and
        # potentially getting confused by the superclass turning these into
        # something other than an `Event` instance (e.g. a `bool`)
        self.__is_shutdown = threading.Event()
        self.__is_stopped = threading.Event()
        self.mutex = threading.Lock()
        # The condition shares ``self.mutex``, so ``with self.not_empty``
        # acquires that same lock.
        self.not_empty = threading.Condition(self.mutex)
        self.daemon = True
        self.name = f'Timer-{next(self._timer_count)}'

    def _next_entry(self) -> Optional[float]:
        """Advance the schedule iterator by one step.

        Requires ``self.scheduler``, which :meth:`run` creates.

        Returns:
            The delay yielded by the schedule when it yields no entry
            (after waiting up to one second on ``not_empty`` if the delay
            is also ``None``); otherwise whatever
            ``self.schedule.apply_entry(entry)`` returns.
        """
        with self.not_empty:
            delay: Optional[float]
            entry: Optional[Entry]
            delay, entry = next(self.scheduler)
            if entry is None:
                if delay is None:
                    # Nothing yielded at all: block on the condition for at
                    # most one second instead of spinning.
                    self.not_empty.wait(1.0)
                return delay
        # Applied only after the ``with`` block has released ``self.mutex``
        # (a non-reentrant ``threading.Lock``), so the entry does not run
        # while the lock is held.
        return self.schedule.apply_entry(entry)
    __next__ = next = _next_entry  # for 2to3

    def run(self) -> None:
        """Thread body: step through the schedule until shutdown is set.

        For every truthy delay returned by :meth:`_next_entry`, calls
        ``on_tick(delay)`` (when set) and then sleeps for ``delay``.
        Once the loop ends, the private "stopped" event is set.

        Any :class:`Exception` escaping the loop is logged, ``sys.stderr``
        is flushed and the whole process is terminated with
        ``os._exit(1)``.
        """
        try:
            self.running = True
            self.scheduler: Iterator[
                Tuple[Optional[float], Optional[Entry]]
            ] = iter(self.schedule)

            while not self.__is_shutdown.is_set():
                delay = self._next_entry()
                if delay:
                    if self.on_tick:
                        self.on_tick(delay)
                    # NOTE(review): presumably guards against the ``sleep``
                    # global being cleared during interpreter shutdown
                    # (compare the ``TypeError`` handling below) -- confirm.
                    if sleep is None:  # pragma: no cover
                        break
                    sleep(delay)
            try:
                self.__is_stopped.set()
            except TypeError:  # pragma: no cover
                # we lost the race at interpreter shutdown,
                # so gc collected built-in modules.
                pass
        except Exception as exc:
            logger.error('Thread Timer crashed: %r', exc, exc_info=True)
            sys.stderr.flush()
            # Hard exit: ends the process immediately rather than only
            # this thread.
            os._exit(1)