"""Async I/O backend support utilities."""importsocketimportthreadingimporttimefromcollectionsimportdequefromqueueimportEmptyfromtimeimportsleepfromweakrefimportWeakKeyDictionaryfromkombu.utils.compatimportdetect_environmentfromceleryimportstatesfromcelery.exceptionsimportTimeoutErrorfromcelery.utils.threadsimportTHREAD_TIMEOUT_MAX__all__=('AsyncBackendMixin','BaseResultConsumer','Drainer','register_drainer',)drainers={}
def register_drainer(name):
    """Decorator used to register a new result drainer type.

    Arguments:
        name (str): Key under which the decorated class is stored in
            the module-level ``drainers`` registry.

    Returns:
        Callable: class decorator that registers and returns the class
            unchanged.
    """
    def _inner(cls):
        drainers[name] = cls
        return cls
    return _inner
def drain_events_until(self, p, timeout=None, interval=1, on_interval=None, wait=None):
    """Generator draining result events until promise *p* is fulfilled.

    Arguments:
        p: promise-like object; its ``ready`` attribute ends the loop.
        timeout (float): overall time budget; ``socket.timeout`` is
            raised once exceeded.
        interval (float): timeout passed to each ``wait_for`` call.
        on_interval (Callable): optional callback invoked after every
            iteration.
        wait (Callable): drain function; defaults to the result
            consumer's ``drain_events``.

    Raises:
        socket.timeout: if the total *timeout* is exceeded.
    """
    wait = wait or self.result_consumer.drain_events
    started = time.monotonic()

    while True:
        # Total time spent may exceed a single call to wait().
        if timeout and time.monotonic() - started >= timeout:
            raise socket.timeout()
        try:
            yield self.wait_for(p, wait, timeout=interval)
        except socket.timeout:
            pass
        if on_interval:
            on_interval()
        if p.ready:  # got event on the wanted channel.
            break
class greenletDrainer(Drainer):
    """Drainer that runs the drain loop inside a green thread.

    Subclasses supply :meth:`spawn` plus the framework-specific event
    primitives used to wake up :meth:`wait_for` after each iteration.
    """

    #: factory that spawns the drain loop; set by subclasses.
    spawn = None
    #: handle of the spawned greenlet, once started.
    _g = None
    #: event, sent (and recreated) after every drain_events iteration.
    _drain_complete_event = None

    def _create_drain_complete_event(self):
        """Create a new ``self._drain_complete_event`` object."""
        pass

    def _send_drain_complete_event(self):
        """Fire ``self._drain_complete_event`` to wake up ``wait_for``."""
        pass

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._started = threading.Event()
        self._stopped = threading.Event()
        self._shutdown = threading.Event()
        self._create_drain_complete_event()

    def run(self):
        """Drain events until stopped, waking waiters each iteration."""
        self._started.set()
        while not self._stopped.is_set():
            try:
                self.result_consumer.drain_events(timeout=1)
                # Release current waiters, then arm a fresh event for
                # the next iteration.
                self._send_drain_complete_event()
                self._create_drain_complete_event()
            except socket.timeout:
                pass
        self._shutdown.set()

    def start(self):
        """Spawn the drain loop unless it is already running."""
        if not self._started.is_set():
            self._g = self.spawn(self.run)
            self._started.wait()

    def stop(self):
        """Request shutdown and wait for the drain loop to exit."""
        self._stopped.set()
        self._send_drain_complete_event()
        self._shutdown.wait(THREAD_TIMEOUT_MAX)

    def wait_for(self, p, wait, timeout=None):
        """Block until *p* is ready or one drain iteration completes."""
        self.start()
        if not p.ready:
            self._drain_complete_event.wait(timeout=timeout)


@register_drainer('eventlet')
class eventletDrainer(greenletDrainer):
    """greenletDrainer implementation backed by eventlet."""

    def spawn(self, func):
        from eventlet import sleep, spawn
        g = spawn(func)
        sleep(0)  # yield control so the new greenlet can start.
        return g

    def _create_drain_complete_event(self):
        from eventlet.event import Event
        self._drain_complete_event = Event()

    def _send_drain_complete_event(self):
        self._drain_complete_event.send()


@register_drainer('gevent')
class geventDrainer(greenletDrainer):
    """greenletDrainer implementation backed by gevent."""

    def spawn(self, func):
        import gevent
        g = gevent.spawn(func)
        gevent.sleep(0)  # yield control so the new greenlet can start.
        return g

    def _create_drain_complete_event(self):
        from gevent.event import Event
        self._drain_complete_event = Event()

    def _send_drain_complete_event(self):
        # Set the event, then immediately replace it so later waiters
        # see a fresh, unset event.
        self._drain_complete_event.set()
        self._create_drain_complete_event()
class AsyncBackendMixin:
    """Mixin for backends that enables the async API."""

    def _collect_into(self, result, bucket):
        # Register ``bucket`` with the result consumer so that the
        # fulfilled ``result`` is appended to it on state change.
        self.result_consumer.buckets[result] = bucket

    def iter_native(self, result, no_ack=True, **kwargs):
        """Iterate over ``(node_id, meta)`` pairs for *result*'s members.

        Arguments:
            result: result set whose ``results`` members are awaited.
            no_ack (bool): forwarded to ``_wait_for_pending``.
            **kwargs: extra options forwarded to ``_wait_for_pending``.

        Yields:
            Tuple: ``(node.id, node._cache)`` for cached nodes, or
                ``(node.id, node.children)`` for nodes without a
                ``_cache`` attribute.
        """
        self._ensure_not_eager()

        results = result.results
        if not results:
            # PEP 479: ``raise StopIteration()`` inside a generator is
            # converted to RuntimeError on Python 3.7+; a plain return
            # ends the generator cleanly.
            return

        # we tell the result consumer to put consumed results
        # into these buckets.
        bucket = deque()
        for node in results:
            if not hasattr(node, '_cache'):
                bucket.append(node)
            elif node._cache:
                # Already cached locally; ready to yield.
                bucket.append(node)
            else:
                # Pending: the consumer appends it once fulfilled.
                self._collect_into(node, bucket)

        for _ in self._wait_for_pending(result, no_ack=no_ack, **kwargs):
            while bucket:
                node = bucket.popleft()
                if not hasattr(node, '_cache'):
                    yield node.id, node.children
                else:
                    yield node.id, node._cache
        # Drain anything still buffered after waiting completes.
        while bucket:
            node = bucket.popleft()
            yield node.id, node._cache
class BaseResultConsumer:
    """Manager responsible for consuming result messages."""

    def __init__(self, backend, app, accept, pending_results, pending_messages):
        self.backend = backend
        self.app = app
        self.accept = accept
        self._pending_results = pending_results
        self._pending_messages = pending_messages
        # Optional user callback invoked with every state-change meta.
        self.on_message = None
        # Weakly maps results to the buckets awaiting them; entries go
        # away automatically when a result is garbage collected.
        self.buckets = WeakKeyDictionary()
        # Drainer implementation selected via detect_environment().
        self.drainer = drainers[detect_environment()](self)

    def on_state_change(self, meta, message):
        """Handle a task state-change message.

        For ready states, resolves the matching pending result and
        hands it to any waiter via its registered bucket.
        """
        if self.on_message:
            self.on_message(meta)
        if meta['status'] in states.READY_STATES:
            task_id = meta['task_id']
            try:
                result = self._get_pending_result(task_id)
            except KeyError:
                # send to buffer in case we received this result
                # before it was added to _pending_results.
                self._pending_messages.put(task_id, meta)
            else:
                result._maybe_set_cache(meta)
                buckets = self.buckets
                try:
                    # remove bucket for this result, since it's fulfilled
                    bucket = buckets.pop(result)
                except KeyError:
                    pass
                else:
                    # send to waiter via bucket
                    bucket.append(result)
        sleep(0)