def to_timestamp(d, default_timezone=ZoneInfo("UTC"), time=monotonic):
    """Convert datetime to timestamp.

    If ``d`` is already a timestamp (anything that is not a
    :class:`~datetime.datetime`), it is returned unchanged.

    Arguments:
    ---------
        d (datetime.datetime | Any): Value to convert.
        default_timezone (datetime.tzinfo): Timezone attached to ``d``
            when it is naive.
        time (Callable): Clock the result is expressed on.

    Returns
    -------
        The seconds between ``d`` and ``EPOCH``, shifted by the current
        difference between the wall clock and ``time()``, and never
        below zero.
    """
    if not isinstance(d, datetime):
        return d

    # Naive datetimes are interpreted in ``default_timezone``.
    aware = d if d.tzinfo is not None else d.replace(tzinfo=default_timezone)

    # Offset between the wall clock and the target clock right now.
    clock_offset = _time() - time()

    # NOTE(review): EPOCH is a module-level constant, presumably the
    # timezone-aware Unix epoch -- confirm against its definition.
    since_epoch = (aware - EPOCH).total_seconds()
    return max(since_epoch - clock_offset, 0)
@total_ordering
class Entry:
    """Schedule Entry.

    Bundles a callable with the arguments it is to be called with and a
    cancellation flag.

    Arguments:
    ---------
        fun (Callable): Function invoked when the entry is called.
        args (Sequence): Positional arguments; a falsy value becomes ``[]``.
        kwargs (Mapping): Keyword arguments; a falsy value becomes ``{}``.
    """

    if not IS_PYPY:  # pragma: no cover
        # ``__weakref__`` must be listed explicitly, otherwise instances of
        # a slotted class cannot be weakly referenced (see ``tref`` below).
        __slots__ = (
            'fun', 'args', 'kwargs', 'tref', 'canceled',
            '_last_run', '__weakref__',
        )

    def __init__(self, fun, args=None, kwargs=None):
        self.fun = fun
        self.args = args or []
        self.kwargs = kwargs or {}
        # Weak proxy back to this entry.
        self.tref = weakrefproxy(self)
        self._last_run = None
        self.canceled = False

    def __call__(self):
        """Invoke ``fun`` with the stored arguments and return its result."""
        return self.fun(*self.args, **self.kwargs)

    def cancel(self):
        """Flag the entry as canceled."""
        try:
            self.tref.canceled = True
        except ReferenceError:  # pragma: no cover
            # The proxied entry is already gone; nothing left to cancel.
            pass

    def __repr__(self):
        fun = self.fun
        # Not every callable has ``__name__`` (functools.partial objects and
        # callable instances do not); fall back to the callable's own repr
        # instead of raising AttributeError from inside __repr__.
        name = getattr(fun, '__name__', None) or repr(fun)
        # NOTE(review): the format string has no closing '>'; left untouched
        # in case anything matches on the current text.
        return '<TimerEntry: {}(*{!r}, **{!r})'.format(
            name, self.args, self.kwargs)

    # must not use hash() to order entries
    def __lt__(self, other):
        return id(self) < id(other)

    @property
    def cancelled(self):
        """Alternative spelling of :attr:`canceled`."""
        return self.canceled

    @cancelled.setter
    def cancelled(self, value):
        self.canceled = value
class Timer:
    """Async timer implementation.

    Keeps a heap of ``scheduled(eta, priority, entry)`` items and hands out
    the entries that are due when the timer is iterated over.

    A timer is always truthy, even when its queue is empty.

    Arguments:
    ---------
        max_interval (float): Upper bound for the wait time yielded while
            iterating.  A falsy value selects ``DEFAULT_MAX_INTERVAL``.
        on_error (Callable): Called with the exception when an entry or an
            eta conversion fails.  Falls back to the class attribute.
        **kwargs: Accepted and ignored.
    """

    Entry = Entry

    on_error = None

    def __init__(self, max_interval=None, on_error=None, **kwargs):
        self.max_interval = float(max_interval or DEFAULT_MAX_INTERVAL)
        self.on_error = on_error or self.on_error
        self._queue = []

    def __enter__(self):
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None
    ) -> None:
        self.stop()

    def call_at(self, eta, fun, args=(), kwargs=None, priority=0):
        """Schedule ``fun(*args, **kwargs)`` at ``eta``; return the entry."""
        kwargs = kwargs or {}
        return self.enter_at(self.Entry(fun, args, kwargs), eta, priority)

    def call_after(self, secs, fun, args=(), kwargs=None, priority=0):
        """Schedule ``fun(*args, **kwargs)`` in ``secs`` seconds."""
        kwargs = kwargs or {}
        return self.enter_after(secs, self.Entry(fun, args, kwargs), priority)

    def call_repeatedly(self, secs, fun, args=(), kwargs=None, priority=0):
        """Schedule ``fun(*args, **kwargs)`` to run every ``secs`` seconds.

        The returned entry re-enters itself after every run until it is
        canceled.
        """
        kwargs = kwargs or {}
        tref = self.Entry(fun, args, kwargs)

        @wraps(fun)
        def _reschedules(*args, **kwargs):
            last, now = tref._last_run, monotonic()
            # Time since the previous run; the first run counts as due.
            lsince = (now - last) if last else secs
            try:
                if lsince and lsince >= secs:
                    tref._last_run = now
                    return fun(*args, **kwargs)
            finally:
                # Re-enter even when ``fun`` raised, unless canceled.
                if not tref.canceled:
                    last = tref._last_run
                    delay = secs - (now - last) if last else secs
                    self.enter_after(delay, tref, priority)

        tref.fun = _reschedules
        tref._last_run = None
        return self.enter_after(secs, tref, priority)

    def enter_at(self, entry, eta=None, priority=0, time=monotonic):
        """Enter function into the scheduler.

        Arguments:
        ---------
            entry (~kombu.asynchronous.timer.Entry): Item to enter.
            eta (datetime.datetime | float): Scheduled time.  ``None``
                means ``time()``; a datetime is converted with
                :func:`to_timestamp`.
            priority (int): Not interpreted here; stored in the queued
                item right after ``eta``.
            time (Callable): Clock used when ``eta`` is ``None``.

        Returns
        -------
            The entry, or ``None`` when the eta conversion failed and the
            error was taken care of by :meth:`handle_error`.
        """
        if eta is None:
            eta = time()
        if isinstance(eta, datetime):
            try:
                eta = to_timestamp(eta)
            except Exception as exc:
                if not self.handle_error(exc):
                    raise
                return
        return self._enter(eta, priority, entry)

    def enter_after(self, secs, entry, priority=0, time=monotonic):
        """Enter ``entry`` to become due ``secs`` seconds from ``time()``."""
        return self.enter_at(entry, time() + float(secs), priority)

    def _enter(self, eta, priority, entry, push=heapq.heappush):
        push(self._queue, scheduled(eta, priority, entry))
        return entry

    def apply_entry(self, entry):
        """Call ``entry``; log the exception unless ``on_error`` took it."""
        try:
            entry()
        except Exception as exc:
            if not self.handle_error(exc):
                logger.error('Error in timer: %r', exc, exc_info=True)

    def handle_error(self, exc_info):
        """Pass ``exc_info`` to ``on_error`` if one is set.

        Returns ``True`` when ``on_error`` was called, ``None`` otherwise.
        """
        if self.on_error:
            self.on_error(exc_info)
            return True

    def stop(self):
        pass

    def __iter__(self, min=min, nowfun=monotonic,
                 pop=heapq.heappop, push=heapq.heappush):
        """Iterate over schedule.

        This iterator yields a tuple of ``(wait_seconds, entry)``,
        where if entry is :const:`None` the caller should wait
        for ``wait_seconds`` until it polls the schedule again.
        """
        max_interval = self.max_interval
        queue = self._queue

        while True:
            if not queue:
                yield None, None
                continue

            head = queue[0]
            now, eta = nowfun(), head[0]

            if now < eta:
                # Nothing due yet: report how long to wait, capped.
                yield min(eta - now, max_interval), None
                continue

            popped = pop(queue)
            if popped is not head:
                # The head changed between the peek and the pop
                # (presumably a concurrent insert); put the item back.
                push(queue, popped)
                continue

            entry = head[2]
            if not entry.canceled:
                yield None, entry

    def clear(self):
        self._queue[:] = []  # atomic, without creating a new list.

    def cancel(self, tref):
        tref.cancel()

    def __len__(self):
        return len(self._queue)

    def __nonzero__(self):
        return True

    # ``__nonzero__`` is the Python 2 name of the truth hook.  Python 3 only
    # looks for ``__bool__`` and otherwise falls back to ``__len__``, which
    # would make an empty timer falsy.
    __bool__ = __nonzero__

    @property
    def queue(self, _pop=heapq.heappop):
        """Snapshot of underlying datastructure."""
        events = list(self._queue)
        # Pop the copy empty so the snapshot comes out in heap order while
        # the live queue stays untouched.
        return [_pop(events) for _ in range(len(events))]

    @property
    def schedule(self):
        return self