class LRUCache(UserDict):
    """LRU Cache implementation using an ordered dict to track access.

    Keys are kept in ``self.data`` (an :class:`~collections.OrderedDict`)
    ordered from least recently used (first) to most recently used (last).
    Reading a key through ``cache[key]`` or writing it through
    ``cache[key] = value`` moves it to the most-recently-used end.
    All mutating operations are guarded by a re-entrant lock.

    Arguments:
    ---------
        limit (int): The maximum number of keys to keep in the cache.
            When a new key is inserted and the limit has been exceeded,
            the *Least Recently Used* key will be discarded from the
            cache.  A falsy limit (``None`` or ``0``) disables eviction.
    """

    def __init__(self, limit=None):
        self.limit = limit
        self.mutex = threading.RLock()
        self.data = OrderedDict()

    def __getitem__(self, key):
        """Return the value for ``key`` and mark it as most recently used.

        Raises:
            KeyError: if ``key`` is not in the cache.
        """
        with self.mutex:
            # Pop and re-insert so the key ends up last in the ordering.
            value = self[key] = self.data.pop(key)
            return value

    def update(self, *args, **kwargs):
        """Bulk-update the cache, then trim it back down to ``limit``."""
        with self.mutex:
            data, limit = self.data, self.limit
            data.update(*args, **kwargs)
            if limit and len(data) > limit:
                # pop additional items in case limit exceeded
                for _ in range(len(data) - limit):
                    data.popitem(last=False)

    def __setitem__(self, key, value):
        """Store ``value`` under ``key``, evicting the LRU key if needed."""
        with self.mutex:
            data = self.data
            if key in data:
                # Overwriting an existing key does not grow the cache, so
                # nothing may be evicted; the write only refreshes recency.
                data.move_to_end(key)
            elif self.limit and len(data) >= self.limit:
                # remove least recently used key.
                data.popitem(last=False)
            data[key] = value

    def __iter__(self):
        return iter(self.data)

    def _iterate_items(self):
        """Yield ``(key, value)`` pairs without touching the LRU order."""
        with self.mutex:
            for k in self:
                try:
                    yield (k, self.data[k])
                except KeyError:  # pragma: no cover
                    pass
    iteritems = _iterate_items

    def _iterate_values(self):
        """Yield values without touching the LRU order."""
        with self.mutex:
            for k in self:
                try:
                    yield self.data[k]
                except KeyError:  # pragma: no cover
                    pass
    itervalues = _iterate_values

    def _iterate_keys(self):
        # userdict.keys in py3k calls __getitem__
        with self.mutex:
            return self.data.keys()
    iterkeys = _iterate_keys

    def incr(self, key, delta=1):
        """Add ``delta`` to the integer stored under ``key``.

        The value is stored back as a string and the new integer is
        returned.  The key becomes the most recently used one.

        Raises:
            KeyError: if ``key`` is not in the cache.
            ValueError: if the stored value cannot be converted to ``int``;
                the existing entry is left untouched in that case.
        """
        with self.mutex:
            # this acts as memcached does- store as a string, but return a
            # integer as long as it exists and we can cast it
            # Read first and write only after the cast succeeded, so a
            # non-numeric value is not dropped from the cache.
            newval = int(self.data[key]) + delta
            self[key] = str(newval)
            return newval
def memoize(maxsize=None, keyfun=None, Cache=LRUCache):
    """Decorator to cache function return value.

    Arguments:
    ---------
        maxsize (int): Passed to ``Cache`` as its ``limit``.
        keyfun (Callable): Optional ``keyfun(args, kwargs)`` returning the
            cache key.  Without it the key is the positional arguments,
            a marker, and the keyword items sorted by name.
        Cache (type): Cache class, instantiated once per decorated function.

    The decorated function gains ``hits`` and ``misses`` counters, a
    ``clear()`` function and ``original_func`` (the undecorated callable).
    """

    def _memoize(fun):
        lock = threading.Lock()
        store = Cache(limit=maxsize)

        @wraps(fun)
        def _M(*args, **kwargs):
            key = (
                keyfun(args, kwargs) if keyfun
                else (*args, KEYWORD_MARK, *sorted(kwargs.items()))
            )
            try:
                with lock:
                    result = store[key]
            except KeyError:
                # Miss: compute outside the lock, then publish the result.
                result = fun(*args, **kwargs)
                _M.misses += 1
                with lock:
                    store[key] = result
            else:
                _M.hits += 1
            return result

        def clear():
            """Clear the cache and reset cache statistics."""
            store.clear()
            _M.hits = _M.misses = 0

        _M.original_func = fun
        _M.clear = clear
        _M.hits = 0
        _M.misses = 0
        return _M

    return _memoize
class lazy:
    """Holds lazy evaluation.

    Evaluated when called or if the :meth:`evaluate` method is called.
    The function is re-evaluated on every call; the result is not cached.

    NOTE(review): the previous docstring also listed :meth:`__str__`,
    :meth:`__repr__` and :meth:`__cmp__` as operations that evaluate the
    promise.  Those overloads are not defined in this class as seen
    here -- confirm whether they live elsewhere before relying on them.
    """

    def __init__(self, fun, *args, **kwargs):
        self._fun = fun
        self._args = args
        self._kwargs = kwargs

    def __call__(self):
        return self.evaluate()

    def evaluate(self):
        """Call the stored function with the stored arguments.

        Returns:
            Whatever ``fun(*args, **kwargs)`` returns for this call.
        """
        return self._fun(*self._args, **self._kwargs)
def maybe_evaluate(value):
    """Return ``value.evaluate()`` for :class:`lazy` instances.

    Any other value is returned unchanged.
    """
    return value.evaluate() if isinstance(value, lazy) else value
def is_list(obj, scalars=(Mapping, str), iters=(Iterable,)):
    """Return true if the object is iterable.

    Note:
    ----
        Returns false if object is a mapping or string (more precisely,
        an instance of any type in ``scalars``).  Passing a falsy
        ``scalars`` disables that exclusion.
    """
    if not isinstance(obj, iters):
        return False
    excluded = scalars or ()
    return not isinstance(obj, excluded)
def maybe_list(obj, scalars=(Mapping, str)):
    """Return list of one element if ``obj`` is a scalar.

    ``None`` and objects that :func:`is_list` accepts are returned as-is.
    """
    if obj is None or is_list(obj, scalars):
        return obj
    return [obj]
def dictfilter(d=None, **kw):
    """Return a new dict without the keys whose value is :const:`None`.

    The input is ``d`` overlaid with the keyword arguments; when ``d`` is
    :const:`None` only the keyword arguments are used.  ``d`` itself is
    not modified.
    """
    if d is None:
        source = kw
    elif kw:
        source = dict(d, **kw)
    else:
        source = d
    return {key: val for key, val in source.items() if val is not None}
def shufflecycle(it):
    """Yield, forever, the first element of ``it`` after reshuffling it."""
    pool = list(it)  # don't modify callers list
    shuffle = random.shuffle
    while True:
        shuffle(pool)
        yield pool[0]


def fxrange(start=1.0, stop=None, step=1.0, repeatlast=False):
    """Yield floats from ``start`` in increments of ``step``.

    With a truthy ``stop`` the sequence ends once the value exceeds it;
    if ``repeatlast`` is set the last in-range value is then repeated
    forever instead.  A falsy ``stop`` means the sequence never ends.
    """
    value = start * 1.0
    while True:
        in_range = not stop or value <= stop
        if in_range:
            yield value
            value += step
            continue
        if not repeatlast:
            return
        yield value - step


def fxrangemax(start=1.0, stop=None, step=1.0, max=100.0):
    """Yield stepped floats until their running total reaches ``max``.

    Values advance by ``step`` and are capped at ``stop`` when it is
    truthy.  The running total accumulates each *next* value after the
    current one has been yielded.
    """
    total = 0
    value = start * 1.0
    while not total >= max:
        yield value
        value = min(value + step, stop) if stop else value + step
        total += value


def retry_over_time(fun, catch, args=None, kwargs=None, errback=None,
                    max_retries=None, interval_start=2, interval_step=2,
                    interval_max=30, callback=None, timeout=None):
    """Retry the function over and over until max retries is exceeded.

    For each retry we sleep for a while before we try again, this interval
    is increased for every retry until the max seconds is reached.

    Arguments:
    ---------
        fun (Callable): The function to try
        catch (Tuple[BaseException]): Exceptions to catch, can be either
            tuple or a single exception class.

    Keyword Arguments:
    -----------------
        args (Tuple): Positional arguments passed on to the function.
        kwargs (Dict): Keyword arguments passed on to the function.
        errback (Callable): Callback for when an exception in ``catch``
            is raised.  The callback must take three arguments:
            ``exc``, ``interval_range`` and ``retries``, where ``exc``
            is the exception instance, ``interval_range`` is an iterator
            which return the time in seconds to sleep next, and ``retries``
            is the number of previous retries.  Its return value is used
            as the number of seconds to sleep.
        max_retries (int): Maximum number of retries before we give up.
            If neither of this and timeout is set, we will retry forever.
            If one of this and timeout is reached, stop.
        interval_start (float): How long (in seconds) we start sleeping
            between retries.
        interval_step (float): By how much the interval is increased for
            each retry.
        interval_max (float): Maximum number of seconds to sleep
            between retries.
        callback (Callable): Called without arguments once after each
            caught failure and again before every one-second sleep slice.
        timeout (int): Maximum seconds waiting before we give up.  The
            deadline is only checked right after a failed attempt.

    Returns:
        The return value of ``fun`` from the first attempt that succeeds.

    Raises:
        The caught exception is re-raised once ``max_retries`` is reached
        or the timeout deadline has passed.
    """
    kwargs = kwargs or {}
    args = args or []
    interval_range = fxrange(
        interval_start,
        interval_max + interval_start,
        interval_step,
        repeatlast=True,
    )
    if timeout:
        end = time() + timeout
    else:
        end = None

    for retries in count():
        try:
            return fun(*args, **kwargs)
        except catch as exc:
            out_of_retries = max_retries is not None and retries >= max_retries
            if out_of_retries or (end and time() > end):
                raise
            if callback:
                callback()
            if errback:
                delay = float(errback(exc, interval_range, retries))
            else:
                delay = float(next(interval_range))
            if delay:
                whole_seconds = int(delay)
                # Sleep in one-second slices so callback keeps being called.
                for _ in range(whole_seconds):
                    if callback:
                        callback()
                    sleep(1.0)
                # sleep remainder after int truncation above.
                sleep(abs(whole_seconds - delay))


def reprkwargs(kwargs, sep=', ', fmt='{0}={1}'):
    """Render ``kwargs`` as ``key=repr(value)`` pairs joined by ``sep``."""
    rendered = [
        fmt.format(name, _safe_repr(val)) for name, val in kwargs.items()
    ]
    return sep.join(rendered)


def reprcall(name, args=(), kwargs=None, sep=', '):
    """Render a call as ``name(arg, ..., key=value, ...)``."""
    kwargs = kwargs or {}
    positional = sep.join(map(_safe_repr, args or ()))
    # A separator is only needed when both argument groups are present.
    joiner = sep if (args and kwargs) else ''
    return '{}({}{}{})'.format(
        name, positional, joiner, reprkwargs(kwargs, sep),
    )


def accepts_argument(func, argument_name):
    """Return true if ``func`` names ``argument_name`` as a parameter.

    Only positional-or-keyword and keyword-only parameter names are
    checked.
    """
    spec = inspect.getfullargspec(func)
    return any(
        argument_name in names for names in (spec.args, spec.kwonlyargs)
    )


# Compat names (before kombu 3.0)
promise = lazy
maybe_promise = maybe_evaluate