celery.utils.collections

Custom maps, sets, sequences, and other data structures.

class celery.utils.collections.AttributeDict

Dict subclass with attribute access.

class celery.utils.collections.AttributeDictMixin

Mixin for Mapping interface that adds attribute access.

I.e., d.key -> d[key].
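
The d.key -> d[key] forwarding can be illustrated with a small standard-library sketch. `AttrAccessMixin` and `AttrDict` are hypothetical names written for this example; they are not Celery's classes, and the choice to store attribute assignments as items is an assumption of the sketch.

```python
class AttrAccessMixin:
    """Hypothetical mixin: forwards attribute access to item access."""

    def __getattr__(self, name):
        # Only called when normal attribute lookup fails.
        try:
            return self[name]
        except KeyError:
            raise AttributeError(
                f"{type(self).__name__!r} object has no attribute {name!r}"
            ) from None

    def __setattr__(self, name, value):
        # Assumption for this sketch: attribute assignment stores an item.
        self[name] = value


class AttrDict(AttrAccessMixin, dict):
    """Hypothetical dict subclass with attribute access."""


d = AttrDict(broker="redis", retries=3)
d.timeout = 30                 # same as d["timeout"] = 30
print(d.broker, d["timeout"])
```

Because `__getattr__` is only consulted after normal lookup fails, regular dict methods such as `d.keys` keep working.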

class celery.utils.collections.BufferMap(maxsize: int, iterable: Iterable = None, bufmaxsize: int = 1000)

Map of buffers.

Buffer

alias of Messagebuffer

exception Empty

Exception raised by Queue.get(block=0)/get_nowait().

bufmaxsize = None
extend(key: Any, it: Iterable) -> None
maxsize = None
put(key: Any, item: Any) -> None
take(key: Any, *default: Any) -> Any
total = 0
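
As a rough illustration of a "map of buffers", the sketch below keeps one bounded FIFO buffer per key. `MiniBufferMap` is a hypothetical stand-in built on the standard library, not Celery's implementation. It assumes that a full buffer drops its oldest item, and it leaves out the global maxsize limit.

```python
from collections import deque
from queue import Empty


class MiniBufferMap:
    """Hypothetical stand-in: one bounded FIFO buffer per key."""

    def __init__(self, bufmaxsize=1000):
        self.bufmaxsize = bufmaxsize
        self.buffers = {}   # key -> deque of pending items
        self.total = 0      # number of items across all buffers

    def put(self, key, item):
        buf = self.buffers.setdefault(key, deque())
        buf.append(item)
        self.total += 1
        if len(buf) > self.bufmaxsize:
            buf.popleft()   # assumption: drop the oldest item when full
            self.total -= 1

    def take(self, key, *default):
        buf = self.buffers.get(key)
        if buf:
            self.total -= 1
            return buf.popleft()
        if default:
            return default[0]
        raise Empty()


bm = MiniBufferMap(bufmaxsize=2)
for n in (1, 2, 3):
    bm.put("worker-a", n)               # 1 is dropped: the buffer holds 2 items
first_item = bm.take("worker-a")        # oldest remaining item
fallback = bm.take("worker-b", None)    # unknown key, so the default is returned
```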
class celery.utils.collections.ChainMap(*maps: Mapping, **kwargs: Any)

Key lookup on a sequence of maps.

add_defaults(d: Mapping) -> None
bind_to(callback)
changes = None
clear() -> None.  Remove all items from D.
copy() -> ChainMap
defaults = None
classmethod fromkeys(iterable: type, *args: Iterable) -> ChainMap

Create a ChainMap with a single dict created from the iterable.

get(k[, d]) -> D[k] if k in D, else d.  d defaults to None.
items() -> a set-like object providing a view on D's items
iteritems() -> Iterable
iterkeys() -> Iterable
itervalues() -> Iterable
key_t = None
keys() -> a set-like object providing a view on D's keys
maps = None
pop(k[, d]) -> v, remove specified key and return the corresponding value.

If key is not found, d is returned if given, otherwise KeyError is raised.

setdefault(k[, d]) -> D.get(k,d), also set D[k]=d if k not in D
update([E, ]**F) -> None.  Update D from mapping/iterable E and F.

If E present and has a .keys() method, does: for k in E.keys(): D[k] = E[k]
If E present and lacks .keys() method, does: for (k, v) in E: D[k] = v
In either case, this is followed by: for k, v in F.items(): D[k] = v

values() -> an object providing a view on D's values
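
Key lookup on a sequence of maps can be illustrated with the standard library's `collections.ChainMap`, which implements the same idea. The example does not import Celery's class; the setting names and values are made up for illustration.

```python
from collections import ChainMap  # standard library, not Celery's class

changes = {"timezone": "UTC"}
defaults = {"timezone": "Europe/Oslo", "result_expires": 3600}

config = ChainMap(changes, defaults)

tz = config["timezone"]              # found in the first map
expires = config["result_expires"]   # falls through to the second map

config["result_expires"] = 60        # writes go to the first map only
```

After the assignment, `defaults` is unchanged and the new value shadows it through `changes`.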
class celery.utils.collections.ConfigurationView(changes: Mapping, defaults: Mapping = None, keys: List[str] = None, prefix: str = None)

A view over an application's configuration dictionaries.

Custom (but older) version of collections.ChainMap.

If the key does not exist in changes, the defaults dictionaries are consulted.

Parameters:
  • changes (Mapping) -- Map of configuration changes.

  • defaults (List[Mapping]) -- List of dictionaries containing the default configuration.

clear() -> None

Remove all changes, but keep defaults.

first(*keys: str) -> Any
get(k[, d]) -> D[k] if k in D, else d.  d defaults to None.
swap_with(other: ConfigurationView) -> None
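
The changes-over-defaults behaviour described above can be sketched as follows. `MiniConfigView` is a hypothetical stand-in, not Celery's implementation. It assumes that `first()` returns the first value that is not None, and it ignores the `keys` and `prefix` arguments entirely.

```python
class MiniConfigView:
    """Hypothetical stand-in for a changes-over-defaults view."""

    def __init__(self, changes, defaults=None):
        self.changes = changes
        self.defaults = list(defaults or [])

    def __getitem__(self, key):
        # Consult changes first, then each defaults dictionary in order.
        for mapping in [self.changes, *self.defaults]:
            if key in mapping:
                return mapping[key]
        raise KeyError(key)

    def get(self, key, default=None):
        try:
            return self[key]
        except KeyError:
            return default

    def first(self, *keys):
        # Assumption: return the first value that is not None.
        for key in keys:
            value = self.get(key)
            if value is not None:
                return value
        return None

    def clear(self):
        self.changes.clear()  # defaults are left untouched


view = MiniConfigView(
    {"task_serializer": "pickle"},
    [{"task_serializer": "json", "timezone": "UTC"}],
)
before = view["task_serializer"]           # taken from changes
view.clear()
after = view["task_serializer"]            # falls back to the defaults
fallback = view.first("missing", "timezone")
```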
class celery.utils.collections.DictAttribute(obj: Any)

Dict interface to attributes.

obj[k] -> obj.k
obj[k] = val -> obj.k = val

get(key: Any, default: Any = None) -> Any
items() -> Iterable
iteritems() -> Iterable
iterkeys() -> Iterable
itervalues() -> Iterable
keys() -> Iterable
obj = None
setdefault(key: Any, default: Any = None) -> None
values() -> Iterable
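
The reverse direction, a dict interface over an object's attributes, can be sketched with `getattr` and `setattr`. `MiniDictAttribute` is a hypothetical stand-in that covers only a few methods; raising KeyError for a missing attribute is an assumption of the sketch.

```python
from types import SimpleNamespace


class MiniDictAttribute:
    """Hypothetical stand-in: item access forwarded to attributes."""

    def __init__(self, obj):
        self.obj = obj

    def __getitem__(self, key):
        try:
            return getattr(self.obj, key)
        except AttributeError:
            raise KeyError(key) from None

    def __setitem__(self, key, value):
        setattr(self.obj, key, value)

    def get(self, key, default=None):
        return getattr(self.obj, key, default)

    def keys(self):
        return list(vars(self.obj))


settings = SimpleNamespace(timezone="UTC")
view = MiniDictAttribute(settings)
view["enable_utc"] = True      # same as settings.enable_utc = True
```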
class celery.utils.collections.Evictable

Mixin for classes supporting the evict method.

exception Empty

Exception raised by Queue.get(block=0)/get_nowait().

evict() -> None

Force evict until maxsize is enforced.

class celery.utils.collections.LimitedSet(maxlen: int = 0, expires: float = 0, data: Mapping = None, minlen: int = 0)

Kind-of Set (or priority queue) with limitations.

Good for when you need to test for membership (a in set), but the set should not grow unbounded.

maxlen is enforced at all times, so if the limit is reached we'll also remove non-expired items.

You can also configure minlen: this is the minimal residual size of the set.

All arguments are optional, and no limits are enabled by default.

Parameters:
  • maxlen (int) -- Optional max number of items. Adding more items than maxlen will result in immediate removal of items sorted by oldest insertion time.

  • expires (float) -- TTL for all items. Expired items are purged as keys are inserted.

  • minlen (int) --

    Minimal residual size of this set. New in version 4.0.

    Value must be less than maxlen if both are configured.

    Older expired items are deleted only after the set exceeds minlen items.

  • data (Sequence) -- Initial data to initialize set with. Can be an iterable of (key, value) pairs, a dict ({key: insertion_time}), or another instance of LimitedSet.

Example

>>> import time
>>> from celery.utils.collections import LimitedSet
>>> s = LimitedSet(maxlen=50000, expires=3600, minlen=4000)
>>> for i in range(60000):
...     s.add(i)
...     s.add(str(i))
...
>>> 57000 in s  # last 50k inserted values are kept
True
>>> '10' in s  # '10' was among the oldest items and was removed to honor maxlen.
False
>>> len(s)  # maxlen is reached
50000
>>> s.purge(now=time.monotonic() + 7200)  # clock + 2 hours
>>> len(s)  # now only minlen items are cached
4000
>>> 57000 in s  # even this item is gone now
False
add(item: Any, now: float = None) -> None

Add a new item, or reset the expiry time of an existing item.

as_dict() -> Dict

Whole set as serializable dictionary.

Example

>>> from celery.utils.collections import LimitedSet
>>> s = LimitedSet(maxlen=200)
>>> r = LimitedSet(maxlen=200)
>>> for i in range(500):
...     s.add(i)
...
>>> r.update(s.as_dict())
>>> r == s
True
clear() -> None

Clear all data, start from scratch again.

discard(item: Any) -> None
max_heap_percent_overload = 15
pop(default: Any = None) -> Any

Remove and return the oldest item, or default (None unless given) when the set is empty.

pop_value(item: Any) -> None
purge(now: float = None) -> None

Check oldest items and remove them if needed.

Parameters:

now (float) -- Time of purging; by default, right now. This can be useful for unit testing.

update(other: Iterable) -> None

Update this set from another LimitedSet, a dict, or an iterable.
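
To make the interplay of maxlen, expires, and minlen more concrete, here is a simplified standard-library sketch. `MiniLimitedSet` is a hypothetical stand-in, not Celery's implementation. It pairs a dict of insertion times with a heap ordered by insertion time, and it assumes that stale heap entries left behind by re-added items can simply be skipped when they surface.

```python
import heapq
import time


class MiniLimitedSet:
    """Hypothetical, simplified stand-in; not Celery's implementation."""

    def __init__(self, maxlen=0, expires=0.0, minlen=0):
        self.maxlen, self.expires, self.minlen = maxlen, expires, minlen
        self._data = {}   # item -> insertion time
        self._heap = []   # (insertion time, item), oldest entry first

    def __contains__(self, item):
        return item in self._data

    def __len__(self):
        return len(self._data)

    def add(self, item, now=None):
        now = time.monotonic() if now is None else now
        self._data[item] = now                   # re-adding resets the time
        heapq.heappush(self._heap, (now, item))  # older entry becomes stale
        self.purge(now)

    def _pop_oldest(self):
        while self._heap:
            inserted, item = heapq.heappop(self._heap)
            if self._data.get(item) == inserted:  # skip stale heap entries
                del self._data[item]
                return item
        return None

    def purge(self, now=None):
        now = time.monotonic() if now is None else now
        # Size limit: applies even to items that have not expired.
        while self.maxlen and len(self._data) > self.maxlen:
            self._pop_oldest()
        # Time limit: never shrink below minlen.
        while self.expires and len(self._data) > self.minlen:
            inserted, item = self._heap[0]
            if self._data.get(item) != inserted:
                heapq.heappop(self._heap)         # drop stale entry
            elif inserted + self.expires > now:
                break                             # oldest item still fresh
            else:
                self._pop_oldest()


s = MiniLimitedSet(maxlen=3, expires=10, minlen=1)
for i in range(5):
    s.add(i, now=float(i))         # explicit timestamps keep the run deterministic
kept_by_maxlen = sorted(s._data)   # the three newest items
s.purge(now=100.0)                 # everything has expired by now
kept_by_minlen = sorted(s._data)   # minlen keeps the newest one
```

Passing `now` explicitly, as the purge() documentation suggests for unit testing, avoids any dependence on the real clock.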

class celery.utils.collections.Messagebuffer(maxsize: int, iterable: Iterable = None, deque: Any = <class 'collections.deque'>)

A buffer of pending messages.

exception Empty

Exception raised by Queue.get(block=0)/get_nowait().

extend(it: Iterable) -> None
put(item: Any) -> None
take(*default: Any) -> Any
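
A bounded buffer of pending messages can be sketched on top of `collections.deque`. `MiniMessagebuffer` is a hypothetical stand-in, not Celery's implementation. It assumes that the oldest message is evicted once maxsize is exceeded, and that `take()` raises `queue.Empty` only when no default is supplied.

```python
from collections import deque
from queue import Empty


class MiniMessagebuffer:
    """Hypothetical stand-in: FIFO buffer that evicts its oldest items."""

    def __init__(self, maxsize, iterable=None):
        self.maxsize = maxsize
        self.data = deque(iterable or [])
        self._evict()

    def _evict(self):
        # Assumption: the oldest messages are dropped first.
        while self.maxsize and len(self.data) > self.maxsize:
            self.data.popleft()

    def put(self, item):
        self.data.append(item)
        self._evict()

    def extend(self, it):
        self.data.extend(it)
        self._evict()

    def take(self, *default):
        try:
            return self.data.popleft()
        except IndexError:
            if default:
                return default[0]
            raise Empty() from None


buf = MiniMessagebuffer(maxsize=3)
buf.extend(["m1", "m2", "m3", "m4"])   # "m1" is evicted
oldest = buf.take()                    # oldest remaining message
rest = [buf.take(), buf.take()]
empty_default = buf.take(None)         # buffer is empty, default returned
```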
class celery.utils.collections.OrderedDict

Dict where insertion order matters.

celery.utils.collections.force_mapping(m: Any) -> Mapping

Wrap object into supporting the mapping interface if necessary.

celery.utils.collections.lpmerge(L: Mapping, R: Mapping) -> Mapping

In-place, left-precedent dictionary merge.

Keeps the value from L if the corresponding value in R is None.
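
The merge rule can be sketched in a few lines. `lpmerge_sketch` is a hypothetical re-creation based on the description above, not a copy of Celery's function. It assumes that values in R that are not None overwrite those in L, and that L itself is modified and returned.

```python
def lpmerge_sketch(L, R):
    """Hypothetical sketch: merge R into L in place, skipping None values."""
    for key, value in R.items():
        if value is not None:
            L[key] = value
    return L


left = {"a": 1, "b": 2}
right = {"a": None, "b": 3, "c": 4}
merged = lpmerge_sketch(left, right)
```

Here `merged` is the same object as `left`; "a" keeps its original value because the value in `right` is None.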