"""Pool Autoscaling.

This module implements the internal thread responsible
for growing and shrinking the pool according to the
current autoscale settings.

The autoscale thread is only enabled if
the :option:`celery worker --autoscale` option is used.
"""
import os
import threading
from time import monotonic, sleep

from kombu.asynchronous.semaphore import DummyLock

from celery import bootsteps
from celery.utils.log import get_logger
from celery.utils.threads import bgThread

from . import state
from .components import Pool

__all__ = ('Autoscaler', 'WorkerComponent')

# Module-level shortcuts to the logger's bound methods.
logger = get_logger(__name__)
debug = logger.debug
info = logger.info
error = logger.error

# Default keepalive value; can be overridden through the
# ``AUTOSCALE_KEEPALIVE`` environment variable (parsed as a float).
AUTOSCALE_KEEPALIVE = float(os.environ.get('AUTOSCALE_KEEPALIVE', 30))
class WorkerComponent(bootsteps.StartStopStep):
    """Bootstep that starts the autoscaler thread/timer in the worker."""

    label = 'Autoscaler'
    conditional = True
    requires = (Pool,)

    def __init__(self, w, **kwargs):
        """Enable the step from the worker's settings.

        Arguments:
            w: The worker instance.  Its ``autoscale`` attribute is copied
                to ``self.enabled`` and its ``autoscaler`` attribute is
                reset to ``None``.
            **kwargs: Accepted but not used here.
        """
        # The step is enabled only when the worker carries a truthy
        # autoscale setting (``conditional`` is set on the class).
        self.enabled = w.autoscale
        # No autoscaler exists yet at construction time.
        w.autoscaler = None
class Autoscaler(bgThread):
    """Background thread to autoscale pool workers.

    Arguments:
        pool: Pool object; must provide ``grow(n)`` and ``shrink(n)``.
        max_concurrency: Stored as the upper concurrency setting.
        min_concurrency: Stored as the lower concurrency setting
            (defaults to 0).
        worker: Optional worker whose ``consumer`` is notified when the
            maximum concurrency changes.
        keepalive: Must be truthy; defaults to ``AUTOSCALE_KEEPALIVE``.
            NOTE(review): presumably the minimum delay before scaling
            down -- confirm against the code that consumes it.
        mutex: Lock object to use; a new :class:`threading.Lock` is
            created when the argument is omitted or falsy.

    Raises:
        AssertionError: If ``keepalive`` is falsy.
    """

    def __init__(self, pool, max_concurrency,
                 min_concurrency=0, worker=None,
                 keepalive=AUTOSCALE_KEEPALIVE, mutex=None):
        super().__init__()
        self.pool = pool
        self.mutex = mutex or threading.Lock()
        self.max_concurrency = max_concurrency
        self.min_concurrency = min_concurrency
        self.keepalive = keepalive
        self._last_scale_up = None
        self.worker = worker

        # Raised explicitly rather than with ``assert`` so the validation
        # is not stripped when Python runs with ``-O``.  The exception
        # type and message are the same as before.
        if not self.keepalive:
            raise AssertionError('cannot scale down too fast.')

    def _grow(self, n):
        """Log the scale-up and ask the pool to grow by ``n``."""
        info('Scaling up %s processes.', n)
        self.pool.grow(n)

    def _shrink(self, n):
        """Log the scale-down and ask the pool to shrink by ``n``.

        Best effort: a :exc:`ValueError` from the pool is logged at debug
        level, any other exception is logged as an error with traceback.
        Neither is propagated to the caller.
        """
        info('Scaling down %s processes.', n)
        try:
            self.pool.shrink(n)
        except ValueError:
            debug("Autoscaler won't scale down: all processes busy.")
        except Exception as exc:
            error('Autoscaler: scale_down: %r', exc, exc_info=True)

    def _update_consumer_prefetch_count(self, new_max):
        """Forward a change of the maximum concurrency to the consumer.

        Arguments:
            new_max: The new maximum; compared against the current
                ``self.max_concurrency`` (which is not modified here).
        """
        diff = new_max - self.max_concurrency
        # ``worker`` defaults to None in the constructor; without a worker
        # there is no consumer to update, so skip instead of failing with
        # an AttributeError on None.
        if diff and self.worker is not None:
            self.worker.consumer._update_prefetch_count(diff)