class mlazy(lazy):
    """Memoized lazy evaluation.

    The function is only evaluated once, every subsequent access
    will return the same value.
    """

    # NOTE(review): nothing in the visible class body updates these two
    # attributes; the memoizing override is presumably defined outside this
    # excerpt (or on the base class) -- confirm.

    #: Set to :const:`True` after the object has been evaluated.
    evaluated = False
    # Class-level default; presumably holds the cached result once the
    # object has been evaluated -- TODO confirm.
    _value = None
def noop(*args, **kwargs):
    """No operation.

    Takes any arguments/keyword arguments and does nothing.

    Returns:
        None: always.
    """
def pass1(arg, *args, **kwargs):
    """Return the first positional argument.

    Any further positional or keyword arguments are accepted and ignored.
    """
    return arg


def evaluate_promises(it):
    """Yield the items of ``it``, calling any item that is a ``promise``.

    Items that are instances of ``promise`` are replaced by the result of
    calling them with no arguments; every other item is yielded unchanged.
    """
    for value in it:
        if isinstance(value, promise):
            value = value()
        yield value
def first(predicate, it):
    """Return the first element in ``it`` that ``predicate`` accepts.

    If ``predicate`` is None it will return the first item that's not
    :const:`None`.

    Items are passed through ``evaluate_promises`` before being tested.

    Returns:
        The first matching element, or :const:`None` if nothing matches.
    """
    return next(
        (v for v in evaluate_promises(it) if (
            predicate(v) if predicate is not None else v is not None)),
        None,
    )
def firstmethod(method, on_call=None):
    """Multiple dispatch.

    Return a function that with a list of instances,
    finds the first instance that gives a value for the given method.

    The list can also contain lazy instances
    (:class:`~kombu.utils.functional.lazy`.)

    Arguments:
        method (str): Name of the method to look up on each instance.
        on_call (Callable): Optional hook invoked as
            ``on_call(meth, *args, **kwargs)`` instead of calling the
            method directly.

    Returns:
        Callable: ``_matcher(it, *args, **kwargs)`` which returns the first
        reply that is not :const:`None`, or :const:`None` when no instance
        produces one.
    """

    def _matcher(it, *args, **kwargs):
        for obj in it:
            try:
                meth = getattr(maybe_evaluate(obj), method)
                reply = (on_call(meth, *args, **kwargs) if on_call
                         else meth(*args, **kwargs))
            except AttributeError:
                # Covers both a missing method and an AttributeError raised
                # while the method (or ``on_call``) runs: the instance is
                # skipped in either case.
                pass
            else:
                if reply is not None:
                    return reply

    return _matcher
def chunks(it, n):
    """Split an iterable into chunks with `n` elements each.

    ``it`` may be an iterator or a concrete sequence: it is converted to a
    single iterator up front, so the outer loop and the ``islice`` call
    always advance the same cursor (a bare sequence would otherwise be
    restarted for every chunk and produce repeated elements).

    Example:
        # n == 2
        >>> x = chunks(iter([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]), 2)
        >>> list(x)
        [[0, 1], [2, 3], [4, 5], [6, 7], [8, 9], [10]]

        # n == 3
        >>> x = chunks(iter([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]), 3)
        >>> list(x)
        [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9, 10]]
    """
    # iter() on an iterator returns the iterator itself, so callers that
    # already pass an iterator see no change.
    it = iter(it)
    for item in it:
        yield [item] + list(islice(it, n - 1))
def padlist(container, size, default=None):
    """Pad list with default elements.

    The result is a new list holding at most ``size`` items of
    ``container``, followed by as many copies of ``default`` as needed to
    reach ``size`` elements.  ``container`` may be any iterable, including
    one without a length (e.g. a generator).

    Example:
        >>> first, last, city = padlist(['George', 'Costanza', 'NYC'], 3)
        >>> (first, last, city)
        ('George', 'Costanza', 'NYC')
        >>> first, last, city = padlist(['George', 'Costanza'], 3)
        >>> (first, last, city)
        ('George', 'Costanza', None)
        >>> first, last, city, planet = padlist(
        ...     ['George', 'Costanza', 'NYC'], 4, default='Earth',
        ... )
        >>> (first, last, city, planet)
        ('George', 'Costanza', 'NYC', 'Earth')
    """
    # Materialize once so the padding is computed from the items actually
    # kept, instead of calling len() on the (possibly unsized) input.
    items = list(container)[:size]
    return items + [default] * (size - len(items))
def mattrgetter(*attrs):
    """Get attributes, ignoring attribute errors.

    Like :func:`operator.attrgetter` but return :const:`None` on missing
    attributes instead of raising :exc:`AttributeError`.

    Returns:
        Callable: a function taking one object and returning a dict that
        maps each name in ``attrs`` to the object's value for it
        (:const:`None` when the attribute is missing).
    """
    return lambda obj: {attr: getattr(obj, attr, None) for attr in attrs}
def uniq(it):
    """Return all unique elements in ``it``, preserving order.

    The result is a lazy generator; elements must be hashable because
    already-seen values are tracked in a set.
    """
    seen = set()
    # ``set.add`` returns None, so ``seen.add(obj) or obj`` records the
    # element and then evaluates to the element itself.
    return (seen.add(obj) or obj for obj in it if obj not in seen)
def lookahead(it):
    """Yield pairs of (current, next) items in `it`.

    `next` is None if `current` is the last item.

    Example:
        >>> list(lookahead(x for x in range(6)))
        [(0, 1), (1, 2), (2, 3), (3, 4), (4, 5), (5, None)]
    """
    a, b = tee(it)
    # Advance the second copy by one so it runs one item ahead of the first.
    next(b, None)
    return zip_longest(a, b)
def regen(it):
    """Convert iterator to an object that can be consumed multiple times.

    ``Regen`` takes any iterable, and if the object is an
    generator it will cache the evaluated list on first access,
    so that the generator can be "consumed" multiple times.

    Lists and tuples are returned unchanged; anything else is wrapped
    in ``_regen``.
    """
    if isinstance(it, (list, tuple)):
        return it
    return _regen(it)
class _regen(UserList, list):
    """List-like wrapper that caches the items of an iterable as they are read.

    Items pulled from the wrapped iterable are stored in an internal list so
    the object can be iterated and indexed repeatedly.
    """
    # must be subclass of list so that json can encode.

    def __init__(self, it):
        # pylint: disable=super-init-not-called
        # UserList creates a new list and sets .data, so we don't
        # want to call init here.
        self.__it = it
        self.__consumed = []
        self.__done = False

    def __reduce__(self):
        # Pickle as a plain list of the fully evaluated items.
        return list, (self.data,)

    def map(self, func):
        """Apply ``func`` to the cached items now and to pending items lazily."""
        self.__consumed = [func(el) for el in self.__consumed]
        self.__it = map(func, self.__it)

    def __length_hint__(self):
        return self.__it.__length_hint__()

    def __lookahead_consume(self, limit=None):
        """Pull items from the source into the cache, yielding each one.

        At most ``limit`` items are yielded when ``limit`` is given.
        """
        if not self.__done and (limit is None or limit > 0):
            # Pin the source to one iterator: calling iter() afresh on a
            # re-iterable source (range, set, dict view, ...) would restart
            # it and append already-cached items a second time.
            it = self.__it = iter(self.__it)
            try:
                now = next(it)
            except StopIteration:
                # Nothing left to read: record exhaustion so later calls
                # (and ``__repr__``) do not treat the source as pending.
                self.__done = True
                return
            self.__consumed.append(now)
            # Maintain a single look-ahead to ensure we set `__done` when the
            # underlying iterator gets exhausted
            while not self.__done:
                try:
                    next_ = next(it)
                    self.__consumed.append(next_)
                except StopIteration:
                    self.__done = True
                    break
                finally:
                    yield now
                now = next_
                # We can break out when `limit` is exhausted
                if limit is not None:
                    limit -= 1
                    if limit <= 0:
                        break

    def __iter__(self):
        yield from self.__consumed
        yield from self.__lookahead_consume()

    def __getitem__(self, index):
        if index < 0:
            # Negative indexes need the full list, so evaluate everything.
            return self.data[index]
        # Consume elements up to the desired index prior to attempting to
        # access it from within `__consumed`
        consume_count = index - len(self.__consumed) + 1
        for _ in self.__lookahead_consume(limit=consume_count):
            pass
        return self.__consumed[index]

    def __bool__(self):
        if len(self.__consumed):
            return True
        try:
            next(iter(self))
        except StopIteration:
            return False
        else:
            return True

    @property
    def data(self):
        """The fully evaluated list of items (drains the source if needed)."""
        if not self.__done:
            self.__consumed.extend(self.__it)
            self.__done = True
        return self.__consumed

    def __repr__(self):
        return "<{}: [{}{}]>".format(
            self.__class__.__name__,
            ", ".join(repr(e) for e in self.__consumed),
            "..." if not self.__done else "",
        )


def _argsfromspec(spec, replace_defaults=True):
    """Render an argument-list string from a full argspec.

    Arguments:
        spec: Object with ``args``, ``defaults``, ``varargs``, ``varkw``,
            ``kwonlyargs`` and ``kwonlydefaults`` attributes (the fields
            of :func:`inspect.getfullargspec`'s result).
        replace_defaults (bool): When true, default values are replaced
            by their positional index instead of the real default.

    Returns:
        str: Comma-separated parameter list.
    """
    if spec.defaults:
        split = len(spec.defaults)
        defaults = (list(range(len(spec.defaults))) if replace_defaults
                    else spec.defaults)
        positional = spec.args[:-split]
        optional = list(zip(spec.args[-split:], defaults))
    else:
        positional, optional = spec.args, []

    varargs = spec.varargs
    varkw = spec.varkw
    if spec.kwonlydefaults:
        # Keep declaration order: a set difference here would make the
        # rendered order of required keyword-only names non-deterministic.
        kwonlyargs = [
            kw for kw in spec.kwonlyargs if kw not in spec.kwonlydefaults
        ]
        if replace_defaults:
            kwonlyargs_optional = [
                (kw, i) for i, kw in enumerate(spec.kwonlydefaults.keys())
            ]
        else:
            kwonlyargs_optional = list(spec.kwonlydefaults.items())
    else:
        kwonlyargs, kwonlyargs_optional = spec.kwonlyargs, []

    return ', '.join(filter(None, [
        ', '.join(positional),
        ', '.join(f'{k}={v}' for k, v in optional),
        f'*{varargs}' if varargs else None,
        # A bare ``*`` is only needed when there is no ``*varargs`` to
        # introduce the keyword-only section.
        '*' if (kwonlyargs or kwonlyargs_optional) and not varargs else None,
        ', '.join(kwonlyargs) if kwonlyargs else None,
        ', '.join(f'{k}="{v}"' for k, v in kwonlyargs_optional),
        f'**{varkw}' if varkw else None,
    ]))
def head_from_fun(fun: Callable[..., Any],
                  bound: bool = False) -> Callable[..., Any]:
    """Generate signature function from actual function.

    Arguments:
        fun: Function, method or other callable whose signature is copied.
            For callables that are neither plain functions, methods nor
            Cython functions, the signature of ``fun.__call__`` is used and
            the generated function is named after the object's class.
        bound: When true, the result is wrapped in a ``partial`` with a
            placeholder ``object()`` as first argument.

    Returns:
        The object that executing the formatted ``FUNHEAD_TEMPLATE`` binds
        under the function's name (or a ``partial`` of it when ``bound``).
        The generated source is stored on it as ``_source``.
    """
    # we could use inspect.Signature here, but that implementation
    # is very slow since it implements the argument checking
    # in pure-Python.  Instead we use exec to create a new function
    # with an empty body, meaning it has the same performance
    # as just calling a function.
    is_function = inspect.isfunction(fun)
    is_callable = callable(fun)
    is_cython = fun.__class__.__name__ == 'cython_function_or_method'
    is_method = inspect.ismethod(fun)

    if not is_function and is_callable and not is_method and not is_cython:
        name, fun = fun.__class__.__name__, fun.__call__
    else:
        name = fun.__name__
    definition = FUNHEAD_TEMPLATE.format(
        fun_name=name,
        fun_args=_argsfromspec(inspect.getfullargspec(fun)),
        fun_value=1,
    )
    logger.debug(definition)
    namespace = {'__name__': fun.__module__}
    # pylint: disable=exec-used
    # Tasks are rarely, if ever, created at runtime - exec here is fine.
    exec(definition, namespace)
    result = namespace[name]
    result._source = definition
    if bound:
        return partial(result, object())
    return result
def fun_accepts_kwargs(fun):
    """Return true if function accepts arbitrary keyword arguments.

    That is, whether the signature of ``fun`` contains a ``**kwargs``
    style (``VAR_KEYWORD``) parameter.
    """
    return any(
        p.kind == p.VAR_KEYWORD
        for p in inspect.signature(fun).parameters.values()
    )
def maybe(typ, val):
    """Call typ on value if val is defined.

    Returns ``typ(val)`` unless ``val`` is :const:`None`, in which case
    :const:`None` is returned unchanged.
    """
    return typ(val) if val is not None else val
def seq_concat_item(seq, item):
    """Return copy of sequence seq with item added.

    Returns:
        Sequence: if seq is a tuple, the result will be a tuple,
            otherwise it depends on the implementation of ``__add__``.
    """
    return seq + (item,) if isinstance(seq, tuple) else seq + [item]


def seq_concat_seq(a, b):
    """Concatenate two sequences: ``a + b``.

    Returns:
        Sequence: The return value will depend on the largest sequence
            - if b is larger and is a tuple, the return value will be a tuple.
            - if a is larger and is a list, the return value will be a list,
    """
    # find the type of the largest sequence
    # (``max`` returns the first of equal-length inputs, so ``a`` wins ties)
    prefer = type(max([a, b], key=len))
    # convert the smallest list to the type of the largest sequence.
    if not isinstance(a, prefer):
        a = prefer(a)
    if not isinstance(b, prefer):
        b = prefer(b)
    return a + b


def is_numeric_value(value):
    """Return true if ``value`` is an int or float, but not a bool."""
    # bool is a subclass of int, so it has to be excluded explicitly.
    return isinstance(value, (int, float)) and not isinstance(value, bool)