"""Implementation of the Observer pattern."""
import sys
import threading
import warnings
import weakref
from weakref import WeakMethod

from kombu.utils.functional import retry_over_time

from celery.exceptions import CDeprecationWarning
from celery.local import PromiseProxy, Proxy
from celery.utils.functional import fun_accepts_kwargs
from celery.utils.log import get_logger
from celery.utils.time import humanize_seconds

__all__ = ('Signal',)

logger = get_logger(__name__)


def _make_id(target):  # pragma: no cover
    """Return the identifier used to key `target` in lookup tuples.

    A :class:`~celery.local.Proxy` is first replaced by the object it
    currently points to.  Byte and text strings are returned unchanged;
    objects exposing ``__func__`` (e.g. bound methods) are identified by
    ``id()`` of that underlying function; anything else by its own ``id()``.
    """
    if isinstance(target, Proxy):
        target = target._get_current_object()
    if isinstance(target, (bytes, str)):
        # see Issue #2475
        return target
    if hasattr(target, '__func__'):
        return id(target.__func__)
    return id(target)


def _boundmethod_safe_weakref(obj):
    """Get weakref constructor appropriate for `obj`.

    `obj` may be a bound method.

    Bound method objects must be special-cased because they're usually garbage
    collected immediately, even if the instance they're bound to persists.

    Returns:
        a (weakref constructor, main object) tuple. `weakref constructor` is
        either :class:`weakref.ref` or :class:`weakref.WeakMethod`.
        `main object` is the instance that `obj` is bound to if it is a bound
        method; otherwise `main object` is simply `obj`.
    """
    try:
        # Attribute access is the probe: both must exist for a bound method.
        obj.__func__
        obj.__self__
        # Bound method
        return WeakMethod, obj.__self__
    except AttributeError:
        # Not a bound method
        return weakref.ref, obj


def _make_lookup_key(receiver, sender, dispatch_uid):
    """Build the ``(receiver_key, sender_key)`` tuple for a connection.

    The receiver part is, in order of precedence: the explicit
    `dispatch_uid` (when truthy), the receiver's ``_dispatch_uid`` attribute
    (when present), or ``_make_id(receiver)``.  The sender part is always
    ``_make_id(sender)``.
    """
    if dispatch_uid:
        return (dispatch_uid, _make_id(sender))
    # Issue #9119 - retry-wrapped functions use the underlying function for
    # dispatch_uid
    elif hasattr(receiver, '_dispatch_uid'):
        return (receiver._dispatch_uid, _make_id(sender))
    else:
        return (_make_id(receiver), _make_id(sender))


#: Sender key of connections made with ``sender=None``.
NONE_ID = _make_id(None)

#: Sentinel object compared by identity (``is NO_RECEIVERS``).
NO_RECEIVERS = object()

RECEIVER_RETRY_ERROR = """\
Could not process signal receiver %(receiver)s. Retrying %(when)s...\
"""
class Signal:  # pragma: no cover
    """Create new signal.

    Keyword Arguments:
        providing_args (List): A list of the arguments this signal can pass
            along in a :meth:`send` call.
        use_caching (bool): Enable receiver cache.
        name (str): Name of signal, used for debugging purposes.
    """

    #: Holds a list of
    #: ``((receiverkey, senderkey), receiver-or-weakref(receiver))`` tuples.
    receivers = None

    def __init__(self, providing_args=None, use_caching=False, name=None):
        self.receivers = []
        self.providing_args = set(
            providing_args if providing_args is not None else [])
        self.lock = threading.Lock()
        self.use_caching = use_caching
        self.name = name
        # For convenience we create empty caches even if they are not used.
        # A note about caching: if use_caching is defined, then for each
        # distinct sender we cache the receivers that sender has in
        # 'sender_receivers_cache'.  The cache is cleaned when .connect() or
        # .disconnect() is called and populated on .send().
        self.sender_receivers_cache = (
            weakref.WeakKeyDictionary() if use_caching else {}
        )
        # Set by _remove_receiver(); consumed by _clear_dead_receivers().
        self._dead_receivers = False

    def _connect_proxy(self, fun, sender, weak, dispatch_uid):
        """Connect `fun` to the object `sender` (a proxy) currently points to."""
        return self.connect(
            fun, sender=sender._get_current_object(), weak=weak,
            dispatch_uid=dispatch_uid,
        )

    def connect(self, *args, **kwargs):
        """Connect receiver to sender for signal.

        May be called directly with the receiver as first positional
        argument, or with options only, in which case a decorator is
        returned.

        Arguments:
            receiver (Callable): A function or an instance method which is to
                receive signals.  Receivers must be hashable objects.

                if weak is :const:`True`, then receiver must be
                weak-referenceable.

                Receivers must be able to accept keyword arguments.

                If a `dispatch_uid` is given, the receiver will not be added
                if another receiver is already connected with that
                `dispatch_uid` for the same sender.

            sender (Any): The sender to which the receiver should respond.
                Must either be a Python object, or :const:`None` to
                receive events from any sender.

            weak (bool): Whether to use weak references to the receiver.
                By default, the module will attempt to use weak references to
                the receiver objects.  If this parameter is false, then strong
                references will be used.

            dispatch_uid (Hashable): An identifier used to uniquely identify a
                particular instance of a receiver.  This will usually be a
                string, though it may be anything hashable.

            retry (bool): If the signal receiver raises an exception
                (e.g. ConnectionError), the receiver will be retried until it
                runs successfully.  A strong ref to the receiver will be stored
                and the `weak` option will be ignored.

        Returns:
            The connected receiver (the retry wrapper when ``retry`` is
            true) if a callable was passed as first positional argument;
            otherwise a decorator taking the receiver.
        """
        def _handle_options(sender=None, weak=True, dispatch_uid=None,
                            retry=False):

            def _connect_signal(fun):
                options = {'dispatch_uid': dispatch_uid, 'weak': weak}

                def _retry_receiver(retry_fun):

                    def _try_receiver_over_time(*args, **kwargs):
                        def on_error(exc, intervals, retries):
                            interval = next(intervals)
                            err_msg = RECEIVER_RETRY_ERROR % \
                                {'receiver': retry_fun,
                                 'when': humanize_seconds(interval, 'in', ' ')}
                            logger.error(err_msg)
                            return interval

                        return retry_over_time(retry_fun, Exception, args,
                                               kwargs, on_error)

                    return _try_receiver_over_time

                if retry:
                    # The wrapper is only referenced by the signal, so a weak
                    # reference would die immediately.
                    options['weak'] = False
                    if not dispatch_uid:
                        # if there's no dispatch_uid then we need to set the
                        # dispatch uid to the original func id so we can look
                        # it up later with the original func id
                        options['dispatch_uid'] = _make_id(fun)
                    fun = _retry_receiver(fun)
                    fun._dispatch_uid = options['dispatch_uid']

                self._connect_signal(fun, sender, options['weak'],
                                     options['dispatch_uid'])
                return fun

            return _connect_signal

        if args and callable(args[0]):
            # Direct call: connect(receiver, ...).
            return _handle_options(*args[1:], **kwargs)(args[0])
        # Decorator call: @connect(...) / connect(sender=...)(receiver).
        return _handle_options(*args, **kwargs)

    def _connect_signal(self, receiver, sender, weak, dispatch_uid):
        """Register `receiver` for `sender` unless its lookup key exists.

        Raises:
            ValueError: if `receiver` does not accept keyword arguments.
        """
        assert callable(receiver), 'Signal receivers must be callable'
        if not fun_accepts_kwargs(receiver):
            raise ValueError(
                'Signal receiver must accept keyword arguments.')

        if isinstance(sender, PromiseProxy):
            # Defer the connection through the proxy's __then__ hook.
            sender.__then__(
                self._connect_proxy, receiver, sender, weak, dispatch_uid,
            )
            return receiver

        lookup_key = _make_lookup_key(receiver, sender, dispatch_uid)

        if weak:
            ref, receiver_object = _boundmethod_safe_weakref(receiver)
            receiver = ref(receiver)
            # Flag dead receivers once the referent is collected.
            weakref.finalize(receiver_object, self._remove_receiver)

        with self.lock:
            self._clear_dead_receivers()
            already_connected = any(
                r_key == lookup_key for r_key, _ in self.receivers
            )
            if not already_connected:
                self.receivers.append((lookup_key, receiver))
            self.sender_receivers_cache.clear()

        return receiver

    def disconnect(self, receiver=None, sender=None, weak=None,
                   dispatch_uid=None):
        """Disconnect receiver from sender for signal.

        If weak references are used, disconnect needn't be called.
        The receiver will be removed from dispatch automatically.

        Arguments:
            receiver (Callable): The registered receiver to disconnect.
                May be none if `dispatch_uid` is specified.

            sender (Any): The registered sender to disconnect.

            weak (bool): The weakref state to disconnect.

            dispatch_uid (Hashable): The unique identifier of the receiver
                to disconnect.

        Returns:
            bool: :const:`True` if a matching receiver was removed.
        """
        if weak is not None:
            warnings.warn(
                'Passing `weak` to disconnect has no effect.',
                CDeprecationWarning, stacklevel=2)

        lookup_key = _make_lookup_key(receiver, sender, dispatch_uid)

        disconnected = False
        with self.lock:
            self._clear_dead_receivers()
            for index, (r_key, _) in enumerate(self.receivers):
                if r_key == lookup_key:
                    disconnected = True
                    # Safe: we stop iterating right after the deletion.
                    del self.receivers[index]
                    break
            self.sender_receivers_cache.clear()
        return disconnected

    def send(self, sender, **named):
        """Send signal from sender to all connected receivers.

        If any receiver raises an error, the exception is returned as the
        corresponding response.  (This is different from the "send" in
        Django signals.  In Celery "send" and "send_robust" do the same thing.)

        Arguments:
            sender (Any): The sender of the signal.
                Either a specific object or :const:`None`.
            **named (Any): Named arguments which will be passed to receivers.

        Returns:
            List: of tuple pairs: `[(receiver, response), … ]`.
        """
        responses = []
        if not self.receivers or \
                self.sender_receivers_cache.get(sender) is NO_RECEIVERS:
            return responses

        for receiver in self._live_receivers(sender):
            try:
                response = receiver(signal=self, sender=sender, **named)
            except Exception as exc:  # pylint: disable=broad-except
                # Python 3 exceptions always carry ``__traceback__``.
                logger.exception(
                    'Signal handler %r raised: %r', receiver, exc)
                responses.append((receiver, exc))
            else:
                responses.append((receiver, response))
        return responses

    send_robust = send  # Compat with Django interface.

    def _clear_dead_receivers(self):
        """Drop entries whose weak reference no longer resolves."""
        # Warning: caller is assumed to hold self.lock
        if self._dead_receivers:
            self._dead_receivers = False
            self.receivers = [
                entry for entry in self.receivers
                if not (isinstance(entry[1], weakref.ReferenceType)
                        and entry[1]() is None)
            ]

    def _live_receivers(self, sender):
        """Filter sequence of receivers to get resolved, live receivers.

        This checks for weak references and resolves them, then returning only
        live receivers.
        """
        receivers = None
        if self.use_caching and not self._dead_receivers:
            receivers = self.sender_receivers_cache.get(sender)
            # We could end up here with NO_RECEIVERS even if we do check this
            # case in .send() prior to calling _live_receivers() due to
            # concurrent .send() call.
            if receivers is NO_RECEIVERS:
                return []
        if receivers is None:
            with self.lock:
                self._clear_dead_receivers()
                senderkey = _make_id(sender)
                # Receivers connected for any sender (NONE_ID) or for this
                # specific sender.
                receivers = [
                    receiver
                    for (_, r_senderkey), receiver in self.receivers
                    if r_senderkey == NONE_ID or r_senderkey == senderkey
                ]
                if self.use_caching:
                    if not receivers:
                        self.sender_receivers_cache[sender] = NO_RECEIVERS
                    else:
                        # Note: we must cache the weakref versions.
                        self.sender_receivers_cache[sender] = receivers
        non_weak_receivers = []
        for receiver in receivers:
            if isinstance(receiver, weakref.ReferenceType):
                # Dereference the weak reference.
                receiver = receiver()
                if receiver is not None:
                    non_weak_receivers.append(receiver)
            else:
                non_weak_receivers.append(receiver)
        return non_weak_receivers

    def _remove_receiver(self, receiver=None):
        """Remove dead receivers from connections."""
        # Mark that self.receivers now holds dead weakrefs.  If so, we will
        # clean those up in connect, disconnect and _live_receivers while
        # holding self.lock.  Note that doing the cleanup here isn't a good
        # idea: _remove_receiver() will be called as a side effect of garbage
        # collection, and so the call can happen while we are already
        # holding self.lock.
        self._dead_receivers = True

    def __repr__(self):
        """``repr(signal)``."""
        return f'<{type(self).__name__}: {self.name} providing_args={self.providing_args!r}>'

    def __str__(self):
        """``str(signal)``."""
        return repr(self)