"""Custom maps, sets, sequences, and other data structures."""importtimefromcollectionsimportOrderedDictas_OrderedDictfromcollectionsimportdequefromcollections.abcimportCallable,Mapping,MutableMapping,MutableSet,Sequencefromheapqimportheapify,heappop,heappushfromitertoolsimportchain,countfromqueueimportEmptyfromtypingimportAny,Dict,Iterable,List# noqafrom.functionalimportfirst,uniqfrom.textimportmatch_casetry:# pypy: dicts are ordered in recent versionsfrom__pypy__importreversed_dictas_dict_is_orderedexceptImportError:_dict_is_ordered=Nonetry:fromdjango.utils.functionalimportLazyObject,LazySettingsexceptImportError:classLazyObject:passLazySettings=LazyObject__all__=('AttributeDictMixin','AttributeDict','BufferMap','ChainMap','ConfigurationView','DictAttribute','Evictable','LimitedSet','Messagebuffer','OrderedDict','force_mapping','lpmerge',)REPR_LIMITED_SET="""\<{name}({size}): maxlen={0.maxlen}, expires={0.expires}, minlen={0.minlen}>\"""
def force_mapping(m):
    # type: (Any) -> Mapping
    """Wrap object into supporting the mapping interface if necessary."""
    if isinstance(m, (LazyObject, LazySettings)):
        m = m._wrapped
    return DictAttribute(m) if not isinstance(m, Mapping) else m

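# Usage sketch: objects that are not mappings are wrapped in ``DictAttribute``
# so they can be read through the mapping interface, while real mappings pass
# through unchanged.  ``Conf`` below is a hypothetical settings object used
# only for illustration:
#
#   >>> class Conf:
#   ...     debug = True
#   >>> force_mapping(Conf())['debug']
#   True
#   >>> force_mapping({'debug': True})['debug']
#   True
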
def lpmerge(L, R):
    # type: (Mapping, Mapping) -> Mapping
    """In-place left-precedent dictionary merge.

    Keeps values from `L` if the value in `R` is :const:`None`.
    """
    for k, v in R.items():
        if v is not None:
            L[k] = v
    return L

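# Worked example: non-``None`` values from ``R`` overwrite ``L`` in place,
# while ``None`` values are ignored so the existing value survives:
#
#   >>> lpmerge({'a': 1, 'b': 2}, {'a': 3, 'b': None, 'c': 4})
#   {'a': 3, 'b': 2, 'c': 4}
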
class OrderedDict(_OrderedDict):
    """Dict where insertion order matters."""

    def _LRUkey(self):
        # type: () -> Any
        # return value of od.keys does not support __next__,
        # but this version will also not create a copy of the list.
        return next(iter(self.keys()))

    if not hasattr(_OrderedDict, 'move_to_end'):
        if _dict_is_ordered:  # pragma: no cover

            def move_to_end(self, key, last=True):
                # type: (Any, bool) -> None
                if not last:
                    # we don't use this argument, and the only way to
                    # implement this on PyPy seems to be O(n): creating a
                    # copy with the order changed, so we just raise.
                    raise NotImplementedError('no last=False on PyPy')
                self[key] = self.pop(key)

        else:

            def move_to_end(self, key, last=True):
                # type: (Any, bool) -> None
                link = self._OrderedDict__map[key]
                link_prev = link[0]
                link_next = link[1]
                link_prev[1] = link_next
                link_next[0] = link_prev
                root = self._OrderedDict__root
                if last:
                    last = root[0]
                    link[0] = last
                    link[1] = root
                    last[1] = root[0] = link
                else:
                    first_node = root[1]
                    link[0] = root
                    link[1] = first_node
                    root[1] = first_node[0] = link

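# Usage sketch: ``_LRUkey`` peeks at the first (least recently used) key
# without materializing the full key list:
#
#   >>> od = OrderedDict([('a', 1), ('b', 2)])
#   >>> od._LRUkey()
#   'a'
#   >>> od.move_to_end('a')
#   >>> od._LRUkey()
#   'b'
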
class AttributeDictMixin:
    """Mixin for Mapping interface that adds attribute access.

    I.e., `d.key -> d[key]`.
    """

    def __getattr__(self, k):
        # type: (str) -> Any
        """`d.key -> d[key]`."""
        try:
            return self[k]
        except KeyError:
            raise AttributeError(
                f'{type(self).__name__!r} object has no attribute {k!r}')

    def __setattr__(self, key: str, value) -> None:
        """`d.key = value -> d[key] = value`."""
        self[key] = value

class AttributeDict(dict, AttributeDictMixin):
    """Dict subclass with attribute access."""

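# Usage sketch: attribute access and item access are interchangeable:
#
#   >>> d = AttributeDict(setting='on')
#   >>> d.setting
#   'on'
#   >>> d.timeout = 30
#   >>> d['timeout']
#   30
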
class ChainMap(MutableMapping):
    """Key lookup on a sequence of maps."""

    key_t = None
    changes = None
    defaults = None
    maps = None
    _observers = ()

    def __init__(self, *maps, **kwargs):
        # type: (*Mapping, **Any) -> None
        maps = list(maps or [{}])
        self.__dict__.update(
            key_t=kwargs.get('key_t'),
            maps=maps,
            changes=maps[0],
            defaults=maps[1:],
            _observers=[],
        )
    def pop(self, key, *default):
        # type: (Any, *Any) -> Any
        try:
            return self.maps[0].pop(key, *default)
        except KeyError:
            raise KeyError(f'Key not found in the first mapping: {key!r}')
    def __missing__(self, key):
        # type: (Any) -> Any
        raise KeyError(key)

    def _key(self, key):
        # type: (Any) -> Any
        return self.key_t(key) if self.key_t is not None else key

    def __getitem__(self, key):
        # type: (Any) -> Any
        _key = self._key(key)
        for mapping in self.maps:
            try:
                return mapping[_key]
            except KeyError:
                pass
        return self.__missing__(key)

    def __setitem__(self, key, value):
        # type: (Any, Any) -> None
        self.changes[self._key(key)] = value

    def __delitem__(self, key):
        # type: (Any) -> None
        try:
            del self.changes[self._key(key)]
        except KeyError:
            raise KeyError(f'Key not found in first mapping: {key!r}')
    @classmethod
    def fromkeys(cls, iterable, *args):
        # type: (type, Iterable, *Any) -> 'ChainMap'
        """Create a ChainMap with a single dict created from the iterable."""
        return cls(dict.fromkeys(iterable, *args))
    def copy(self):
        # type: () -> 'ChainMap'
        # shallow copy: duplicate the writable first map, share the defaults.
        return self.__class__(self.maps[0].copy(), *self.maps[1:])
    __copy__ = copy  # Py2

    def _iter(self, op):
        # type: (Callable) -> Iterable
        # defaults must be first in the stream, so values in
        # changes take precedence.
        # pylint: disable=bad-reversed-sequence
        #   Someone should teach pylint about properties.
        return chain(*(op(d) for d in reversed(self.maps)))

    def _iterate_keys(self):
        # type: () -> Iterable
        return uniq(self._iter(lambda d: d.keys()))
    iterkeys = _iterate_keys

    def _iterate_items(self):
        # type: () -> Iterable
        return ((key, self[key]) for key in self)
    iteritems = _iterate_items

    def _iterate_values(self):
        # type: () -> Iterable
        return (self[key] for key in self)
    itervalues = _iterate_values

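# Usage sketch (illustrative): lookups fall through the maps in order, while
# writes always land in the first map (``changes``):
#
#   >>> defaults = {'timeout': 10, 'retries': 3}
#   >>> overrides = {'timeout': 30}
#   >>> conf = ChainMap(overrides, defaults)
#   >>> conf['timeout'], conf['retries']
#   (30, 3)
#   >>> conf['retries'] = 5      # stored in ``overrides`` only
#   >>> defaults['retries']
#   3
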
class ConfigurationView(ChainMap, AttributeDictMixin):
    """A view over an application's configuration dictionaries.

    Custom (but older) version of :class:`collections.ChainMap`.

    If the key does not exist in ``changes``, the ``defaults``
    dictionaries are consulted.

    Arguments:
        changes (Mapping): Map of configuration changes.
        defaults (List[Mapping]): List of dictionaries containing
            the default configuration.
    """

    def __init__(self, changes, defaults=None, keys=None, prefix=None):
        # type: (Mapping, Mapping, List[str], str) -> None
        defaults = [] if defaults is None else defaults
        super().__init__(changes, *defaults)
        self.__dict__.update(
            prefix=prefix.rstrip('_') + '_' if prefix else prefix,
            _keys=keys,
        )

    def _to_keys(self, key):
        # type: (str) -> Sequence[str]
        prefix = self.prefix
        if prefix:
            pkey = prefix + key if not key.startswith(prefix) else key
            return match_case(pkey, prefix), key
        return key,

    def __getitem__(self, key):
        # type: (str) -> Any
        keys = self._to_keys(key)
        getitem = super().__getitem__
        for k in keys + (
                tuple(f(key) for f in self._keys) if self._keys else ()):
            try:
                return getitem(k)
            except KeyError:
                pass
        try:
            # support subclasses implementing __missing__
            return self.__missing__(key)
        except KeyError:
            if len(keys) > 1:
                raise KeyError(
                    'Key not found: {1!r} (with prefix: {0!r})'.format(*keys))
            raise

    def __setitem__(self, key, value):
        # type: (str, Any) -> None
        self.changes[self._key(key)] = value

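# Usage sketch (illustrative): ``changes`` shadows the ``defaults`` maps, and
# the ``AttributeDictMixin`` base allows attribute-style reads:
#
#   >>> view = ConfigurationView({'debug': True},
#   ...                          defaults=[{'debug': False, 'timeout': 10}])
#   >>> view['debug'], view.timeout
#   (True, 10)
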
class LimitedSet:
    """Kind-of Set (or priority queue) with limitations.

    Good for when you need to test for membership (`a in set`),
    but the set should not grow unbounded.

    ``maxlen`` is enforced at all times, so if the limit is reached
    we'll also remove non-expired items.

    You can also configure ``minlen``: this is the minimal residual size
    of the set.

    All arguments are optional, and no limits are enabled by default.

    Arguments:
        maxlen (int): Optional max number of items.
            Adding more items than ``maxlen`` will result in immediate
            removal of items sorted by oldest insertion time.

        expires (float): TTL for all items.
            Expired items are purged as keys are inserted.

        minlen (int): Minimal residual size of this set.
            .. versionadded:: 4.0

            Value must be less than or equal to ``maxlen`` if both are
            configured.

            Older expired items will be deleted, only after the set
            exceeds ``minlen`` number of items.

        data (Sequence): Initial data to initialize set with.
            Can be an iterable of ``(key, value)`` pairs,
            a dict (``{key: insertion_time}``), or another instance
            of :class:`LimitedSet`.

    Example:
        >>> s = LimitedSet(maxlen=50000, expires=3600, minlen=4000)
        >>> for i in range(60000):
        ...     s.add(i)
        ...     s.add(str(i))
        ...
        >>> 57000 in s  # last 50k inserted values are kept
        True
        >>> '10' in s  # '10' did expire and was purged from set.
        False
        >>> len(s)  # maxlen is reached
        50000
        >>> s.purge(now=time.monotonic() + 7200)  # clock + 2 hours
        >>> len(s)  # now only minlen items are cached
        4000
        >>> 57000 in s  # even this item is gone now
        False
    """

    max_heap_percent_overload = 15

    def __init__(self, maxlen=0, expires=0, data=None, minlen=0):
        # type: (int, float, Mapping, int) -> None
        self.maxlen = 0 if maxlen is None else maxlen
        self.minlen = 0 if minlen is None else minlen
        self.expires = 0 if expires is None else expires
        self._data = {}
        self._heap = []

        if data:
            # import items from data
            self.update(data)

        if not self.maxlen >= self.minlen >= 0:
            raise ValueError(
                'minlen must be non-negative and less than or equal '
                'to maxlen.')
        if self.expires < 0:
            raise ValueError('expires cannot be negative!')

    def _refresh_heap(self):
        # type: () -> None
        """Time consuming recreating of heap.  Don't run this too often."""
        self._heap[:] = [entry for entry in self._data.values()]
        heapify(self._heap)

    def _maybe_refresh_heap(self):
        # type: () -> None
        if self._heap_overload >= self.max_heap_percent_overload:
            self._refresh_heap()
    def clear(self):
        # type: () -> None
        """Clear all data, start from scratch again."""
        self._data.clear()
        self._heap[:] = []
    def add(self, item, now=None):
        # type: (Any, float) -> None
        """Add a new item, or reset the expiry time of an existing item."""
        now = now or time.monotonic()
        if item in self._data:
            self.discard(item)
        entry = (now, item)
        self._data[item] = entry
        heappush(self._heap, entry)
        if self.maxlen and len(self._data) >= self.maxlen:
            self.purge()
    def update(self, other):
        # type: (Iterable) -> None
        """Update this set from another LimitedSet, dict, or iterable."""
        if not other:
            return
        if isinstance(other, LimitedSet):
            self._data.update(other._data)
            self._refresh_heap()
            self.purge()
        elif isinstance(other, dict):
            # revokes are sent as a dict
            for key, inserted in other.items():
                if isinstance(inserted, (tuple, list)):
                    # in case someone uses ._data directly for sending update
                    inserted = inserted[0]
                if not isinstance(inserted, float):
                    raise ValueError(
                        'Expecting float timestamp, got type '
                        f'{type(inserted)!r} with value: {inserted}')
                self.add(key, inserted)
        else:
            # XXX AVOID THIS, it could keep old data if more parties
            # exchange them all over and over again
            for obj in other:
                self.add(obj)
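    # Usage sketch: updating from a ``{key: timestamp}`` mapping (the dict
    # format mentioned above for exchanging revokes):
    #
    #   >>> s = LimitedSet(maxlen=10)
    #   >>> s.update({'task-id': time.monotonic()})
    #   >>> 'task-id' in s
    #   True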
    def discard(self, item):
        # type: (Any) -> None
        # mark an existing item as removed; if the item is not found, pass.
        self._data.pop(item, None)
        self._maybe_refresh_heap()
    pop_value = discard
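    # Note on heap bookkeeping: ``discard`` only removes the entry from
    # ``_data``, leaving a stale tuple in ``_heap``.  ``_heap_overload``
    # measures that waste; for example, 120 heap entries backing 100 live
    # items gives 120 * 100 / 100 - 100 == 20 (percent), which exceeds
    # ``max_heap_percent_overload`` (15) and triggers ``_refresh_heap()``.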
    def purge(self, now=None):
        # type: (float) -> None
        """Check oldest items and remove them if needed.

        Arguments:
            now (float): Time of purging -- by default right now.
                This can be useful for unit testing.
        """
        now = now or time.monotonic()
        now = now() if isinstance(now, Callable) else now
        if self.maxlen:
            while len(self._data) > self.maxlen:
                self.pop()
        # time based expiring:
        if self.expires:
            while len(self._data) > self.minlen >= 0:
                inserted_time, _ = self._heap[0]
                if inserted_time + self.expires > now:
                    break  # oldest item hasn't expired yet
                self.pop()
    def pop(self, default: Any = None) -> Any:
        """Remove and return the oldest item, or :const:`None` when empty."""
        while self._heap:
            _, item = heappop(self._heap)
            try:
                self._data.pop(item)
            except KeyError:
                pass
            else:
                return item
        return default
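    # Usage sketch: ``pop`` always yields the oldest surviving item:
    #
    #   >>> s = LimitedSet()
    #   >>> s.add('first')
    #   >>> s.add('second')
    #   >>> s.pop()
    #   'first'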
    def as_dict(self):
        # type: () -> Dict
        """Whole set as serializable dictionary.

        Example:
            >>> s = LimitedSet(maxlen=200)
            >>> r = LimitedSet(maxlen=200)
            >>> for i in range(500):
            ...     s.add(i)
            ...
            >>> r.update(s.as_dict())
            >>> r == s
            True
        """
        return {key: inserted for inserted, key in self._data.values()}
    def __eq__(self, other):
        # type: (Any) -> bool
        return self._data == other._data

    def __repr__(self):
        # type: () -> str
        return REPR_LIMITED_SET.format(
            self, name=type(self).__name__, size=len(self),
        )

    def __iter__(self):
        # type: () -> Iterable
        return (i for _, i in sorted(self._data.values()))

    def __len__(self):
        # type: () -> int
        return len(self._data)

    def __contains__(self, key):
        # type: (Any) -> bool
        return key in self._data

    def __reduce__(self):
        # type: () -> Any
        return self.__class__, (
            self.maxlen, self.expires, self.as_dict(), self.minlen)

    def __bool__(self):
        # type: () -> bool
        return bool(self._data)
    __nonzero__ = __bool__  # Py2

    @property
    def _heap_overload(self):
        # type: () -> float
        """Compute how much bigger the heap is than the data, in percent."""
        return len(self._heap) * 100 / max(len(self._data), 1) - 100

MutableSet.register(LimitedSet)
class Evictable:
    """Mixin for classes supporting the ``evict`` method."""

    Empty = Empty
    def evict(self) -> None:
        """Force evict until maxsize is enforced."""
        self._evict(range=count)

    def take(self, key, *default):
        # type: (Any, *Any) -> Any
        item, throw = None, False
        try:
            buf = self[key]
        except KeyError:
            throw = True
        else:
            try:
                item = buf.take()
                self.total -= 1
            except self.Empty:
                throw = True
            else:
                self.move_to_end(key)   # move to most-recently-used position
        if throw:
            if default:
                return default[0]
            raise self.Empty()
        return item
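    # Usage sketch (``bufmap`` is a hypothetical instance of the surrounding
    # buffer-map container): ``take`` returns the optional default instead of
    # raising ``Empty`` when the key is unknown or its buffer is drained:
    #
    #   >>> bufmap.take('missing-key', None) is None
    #   True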
    def _get_or_create_buffer(self, key):
        # type: (Any) -> Messagebuffer
        try:
            return self[key]
        except KeyError:
            buf = self[key] = self._new_buffer()
            return buf

    def _new_buffer(self):
        # type: () -> Messagebuffer
        return self.Buffer(maxsize=self.bufmaxsize)

    def _LRUpop(self, *default):
        # type: (*Any) -> Any
        return self[self._LRUkey()].take(*default)

    def _pop_to_evict(self):
        # type: () -> None
        for _ in range(100):
            key = self._LRUkey()
            buf = self[key]
            try:
                buf.take()
            except (IndexError, self.Empty):
                # buffer empty, remove it from mapping.
                self.pop(key)
            else:
                # we removed one item
                self.total -= 1
                # if the buffer is empty now, remove it from the mapping.
                if not len(buf):
                    self.pop(key)
                else:
                    # mark as most recently used.
                    self.move_to_end(key)
                break

    def __repr__(self):
        # type: () -> str
        return f'<{type(self).__name__}: {self.total}/{self.maxsize}>'

    @property
    def _evictcount(self):
        # type: () -> int
        return self.total