"""Worker-level Bootsteps."""importatexitimportwarningsfromkombu.asynchronousimportHubas_Hubfromkombu.asynchronousimportget_event_loop,set_event_loopfromkombu.asynchronous.semaphoreimportDummyLock,LaxBoundedSemaphorefromkombu.asynchronous.timerimportTimeras_Timerfromceleryimportbootstepsfromcelery._stateimport_set_task_join_will_blockfromcelery.exceptionsimportImproperlyConfiguredfromcelery.platformsimportIS_WINDOWSfromcelery.utils.logimportworker_loggeraslogger__all__=('Timer','Hub','Pool','Beat','StateDB','Consumer')GREEN_POOLS={'eventlet','gevent'}ERR_B_GREEN="""\-B option doesn't work with eventlet/gevent pools: \use standalone beat instead.\"""W_POOL_SETTING="""The worker_pool setting shouldn't be used to select the eventlet/geventpools, instead you *must use the -P* argument so that patches are appliedas early as possible."""
[文档]defcreate(self,w):ifw.use_eventloop:# does not use dedicated timer thread.w.timer=_Timer(max_interval=10.0)else:ifnotw.timer_cls:# Default Timer is set by the pool, as for example, the# eventlet pool needs a custom timer implementation.w.timer_cls=w.pool_cls.Timerw.timer=self.instantiate(w.timer_cls,max_interval=w.timer_precision,on_error=self.on_timer_error,on_tick=self.on_timer_tick)
[文档]defon_timer_tick(self,delay):logger.debug('Timer wake-up! Next ETA %s secs.',delay)
[文档]classHub(bootsteps.StartStopStep):"""Worker starts the event loop."""requires=(Timer,)def__init__(self,w,**kwargs):w.hub=Nonesuper().__init__(w,**kwargs)
def_patch_thread_primitives(self,w):# make clock use dummy lockw.app.clock.mutex=DummyLock()# multiprocessing's ApplyResult uses this lock.try:frombilliardimportpoolexceptImportError:passelse:pool.Lock=DummyLock
[文档]classPool(bootsteps.StartStopStep):"""Bootstep managing the worker pool. Describes how to initialize the worker pool, and starts and stops the pool during worker start-up/shutdown. Adds attributes: * autoscale * pool * max_concurrency * min_concurrency """requires=(Hub,)def__init__(self,w,autoscale=None,**kwargs):w.pool=Nonew.max_concurrency=Nonew.min_concurrency=w.concurrencyself.optimization=w.optimizationifisinstance(autoscale,str):max_c,_,min_c=autoscale.partition(',')autoscale=[int(max_c),min_candint(min_c)or0]w.autoscale=autoscaleifw.autoscale:w.max_concurrency,w.min_concurrency=w.autoscalesuper().__init__(w,**kwargs)
[文档]defcreate(self,w):semaphore=Nonemax_restarts=Noneifw.app.conf.worker_poolinGREEN_POOLS:# pragma: no coverwarnings.warn(UserWarning(W_POOL_SETTING))threaded=notw.use_eventlooporIS_WINDOWSprocs=w.min_concurrencyw.process_task=w._process_taskifnotthreaded:semaphore=w.semaphore=LaxBoundedSemaphore(procs)w._quick_acquire=w.semaphore.acquirew._quick_release=w.semaphore.releasemax_restarts=100ifw.pool_putlocksandw.pool_cls.uses_semaphore:w.process_task=w._process_task_semallow_restart=w.pool_restartspool=w.pool=self.instantiate(w.pool_cls,w.min_concurrency,initargs=(w.app,w.hostname),maxtasksperchild=w.max_tasks_per_child,max_memory_per_child=w.max_memory_per_child,timeout=w.time_limit,soft_timeout=w.soft_time_limit,putlocks=w.pool_putlocksandthreaded,lost_worker_timeout=w.worker_lost_wait,threads=threaded,max_restarts=max_restarts,allow_restart=allow_restart,forking_enable=True,semaphore=semaphore,sched_strategy=self.optimization,app=w.app,)_set_task_join_will_block(pool.task_join_will_block)returnpool
[文档]classBeat(bootsteps.StartStopStep):"""Step used to embed a beat process. Enabled when the ``beat`` argument is set. """label='Beat'conditional=Truedef__init__(self,w,beat=False,**kwargs):self.enabled=w.beat=beatw.beat=Nonesuper().__init__(w,beat=beat,**kwargs)
[文档]classStateDB(bootsteps.Step):"""Bootstep that sets up between-restart state database file."""def__init__(self,w,**kwargs):self.enabled=w.statedbw._persistence=Nonesuper().__init__(w,**kwargs)