"""WorkController can be used to instantiate in-process workers.The command-line interface for the worker is in :mod:`celery.bin.worker`,while the worker program is in :mod:`celery.apps.worker`.The worker program is responsible for adding signal handlers,setting up logging, etc. This is a bare-bones worker withoutglobal side-effects (i.e., except for the global state stored in:mod:`celery.worker.state`).The worker consists of several components, all managed by bootsteps(mod:`celery.bootsteps`)."""importosimportsysfromdatetimeimportdatetime,timezonefromtimeimportsleepfrombilliardimportcpu_countfromkombu.utils.compatimportdetect_environmentfromceleryimportbootstepsfromceleryimportconcurrencyas_concurrencyfromceleryimportsignalsfromcelery.bootstepsimportRUN,TERMINATEfromcelery.exceptionsimportImproperlyConfigured,TaskRevokedError,WorkerTerminatefromcelery.platformsimportEX_FAILURE,create_pidlockfromcelery.utils.importsimportreload_from_cwdfromcelery.utils.logimportmlevelfromcelery.utils.logimportworker_loggerasloggerfromcelery.utils.nodenamesimportdefault_nodename,worker_directfromcelery.utils.textimportstr_to_listfromcelery.utils.threadsimportdefault_socket_timeoutfrom.importstatetry:importresourceexceptImportError:resource=None__all__=('WorkController',)#: Default socket timeout at shutdown.SHUTDOWN_SOCKET_TIMEOUT=5.0SELECT_UNKNOWN_QUEUE="""Trying to select queue subset of {0!r}, but queue {1} isn'tdefined in the `task_queues` setting.If you want to automatically declare unknown queues you canenable the `task_create_missing_queues` setting."""DESELECT_UNKNOWN_QUEUE="""Trying to deselect queue subset of {0!r}, but queue {1} isn'tdefined in the `task_queues` setting."""


class WorkController:
    """Unmanaged worker instance."""

    app = None

    pidlock = None
    blueprint = None
    pool = None
    semaphore = None

    #: contains the exit code if a :exc:`SystemExit` event is handled.
    exitcode = None

    def setup_instance(self, queues=None, ready_callback=None, pidfile=None,
                       include=None, use_eventloop=None, exclude_queues=None,
                       **kwargs):
        self.pidfile = pidfile
        self.setup_queues(queues, exclude_queues)
        self.setup_includes(str_to_list(include))

        # Set default concurrency
        if not self.concurrency:
            try:
                self.concurrency = cpu_count()
            except NotImplementedError:
                self.concurrency = 2

        # Options
        self.loglevel = mlevel(self.loglevel)
        self.ready_callback = ready_callback or self.on_consumer_ready

        # this connection won't establish, only used for params
        self._conninfo = self.app.connection_for_read()
        self.use_eventloop = (
            self.should_use_eventloop() if use_eventloop is None
            else use_eventloop
        )
        self.options = kwargs

        signals.worker_init.send(sender=self)

        # Initialize bootsteps
        self.pool_cls = _concurrency.get_implementation(self.pool_cls)
        self.steps = []
        self.on_init_blueprint()
        self.blueprint = self.Blueprint(
            steps=self.app.steps['worker'],
            on_start=self.on_start,
            on_close=self.on_close,
            on_stopped=self.on_stopped,
        )
        self.blueprint.apply(self, **kwargs)
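
    # The blueprint above is assembled from ``self.app.steps['worker']``, so an
    # application can attach custom bootsteps before the worker starts.  A
    # hedged sketch (``MyStep`` is hypothetical, ``app`` is the Celery app):
    #
    #     from celery import bootsteps
    #
    #     class MyStep(bootsteps.StartStopStep):
    #         requires = {'celery.worker.components:Pool'}
    #
    #         def start(self, worker):
    #             print('worker pool is up')
    #
    #     app.steps['worker'].add(MyStep)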

    def setup_includes(self, includes):
        # Update celery_include to have all known task modules, so that we
        # ensure all task modules are imported in case an execv happens.
        prev = tuple(self.app.conf.include)
        if includes:
            prev += tuple(includes)
            for module in includes:
                self.app.loader.import_task_module(module)
        self.include = includes
        task_modules = {task.__class__.__module__
                        for task in self.app.tasks.values()}
        self.app.conf.include = tuple(set(prev) | task_modules)

    def _process_task_sem(self, req):
        return self._quick_acquire(self._process_task, req)

    def _process_task(self, req):
        """Process task by sending it to the pool of workers."""
        try:
            req.execute_using_pool(self.pool)
        except TaskRevokedError:
            try:
                self._quick_release()   # Issue 877
            except AttributeError:
                pass

    def stop(self, in_sighandler=False, exitcode=None):
        """Graceful shutdown of the worker server (Warm shutdown)."""
        if exitcode is not None:
            self.exitcode = exitcode
        if self.blueprint.state == RUN:
            self.signal_consumer_close()
            if not in_sighandler or self.pool.signal_safe:
                self._shutdown(warm=True)
        self._send_worker_shutdown()

    def terminate(self, in_sighandler=False):
        """Not so graceful shutdown of the worker server (Cold shutdown)."""
        if self.blueprint.state != TERMINATE:
            self.signal_consumer_close()
            if not in_sighandler or self.pool.signal_safe:
                self._shutdown(warm=False)
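
    # Shutdown overview (a summary, not new behaviour): ``stop()`` performs a
    # warm shutdown, letting currently executing tasks finish, while
    # ``terminate()`` performs a cold shutdown that exits as soon as possible.
    # The signal handlers that normally trigger these live in
    # :mod:`celery.apps.worker`; an embedded worker can invoke them directly,
    # for example:
    #
    #     worker.stop(exitcode=0)   # warm shutdown of an embedded worker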

    def _shutdown(self, warm=True):
        # if blueprint does not exist it means that we had an
        # error before the bootsteps could be initialized.
        if self.blueprint is not None:
            with default_socket_timeout(SHUTDOWN_SOCKET_TIMEOUT):  # Issue 975
                self.blueprint.stop(self, terminate=not warm)
                self.blueprint.join()

    def rusage(self):
        if resource is None:
            raise NotImplementedError('rusage not supported by this platform')
        s = resource.getrusage(resource.RUSAGE_SELF)
        return {
            'utime': s.ru_utime,
            'stime': s.ru_stime,
            'maxrss': s.ru_maxrss,
            'ixrss': s.ru_ixrss,
            'idrss': s.ru_idrss,
            'isrss': s.ru_isrss,
            'minflt': s.ru_minflt,
            'majflt': s.ru_majflt,
            'nswap': s.ru_nswap,
            'inblock': s.ru_inblock,
            'oublock': s.ru_oublock,
            'msgsnd': s.ru_msgsnd,
            'msgrcv': s.ru_msgrcv,
            'nsignals': s.ru_nsignals,
            'nvcsw': s.ru_nvcsw,
            'nivcsw': s.ru_nivcsw,
        }

    def setup_defaults(self, concurrency=None, loglevel='WARN', logfile=None,
                       task_events=None, pool=None, consumer_cls=None,
                       timer_cls=None, timer_precision=None,
                       autoscaler_cls=None,
                       pool_putlocks=None, pool_restarts=None,
                       optimization=None, O=None,  # O maps to -O=fair
                       statedb=None,
                       time_limit=None, soft_time_limit=None,
                       scheduler=None,
                       pool_cls=None,              # XXX use pool
                       state_db=None,              # XXX use statedb
                       task_time_limit=None,       # XXX use time_limit
                       task_soft_time_limit=None,  # XXX use soft_time_limit
                       scheduler_cls=None,         # XXX use scheduler
                       schedule_filename=None,
                       max_tasks_per_child=None,
                       prefetch_multiplier=None, disable_rate_limits=None,
                       worker_lost_wait=None,
                       max_memory_per_child=None, **_kw):
        either = self.app.either
        self.loglevel = loglevel
        self.logfile = logfile

        self.concurrency = either('worker_concurrency', concurrency)
        self.task_events = either('worker_send_task_events', task_events)
        self.pool_cls = either('worker_pool', pool, pool_cls)
        self.consumer_cls = either('worker_consumer', consumer_cls)
        self.timer_cls = either('worker_timer', timer_cls)
        self.timer_precision = either(
            'worker_timer_precision', timer_precision,
        )
        self.optimization = optimization or O
        self.autoscaler_cls = either('worker_autoscaler', autoscaler_cls)
        self.pool_putlocks = either('worker_pool_putlocks', pool_putlocks)
        self.pool_restarts = either('worker_pool_restarts', pool_restarts)
        self.statedb = either('worker_state_db', statedb, state_db)
        self.schedule_filename = either(
            'beat_schedule_filename', schedule_filename,
        )
        self.scheduler = either('beat_scheduler', scheduler, scheduler_cls)
        self.time_limit = either('task_time_limit', time_limit, task_time_limit)
        self.soft_time_limit = either(
            'task_soft_time_limit', soft_time_limit, task_soft_time_limit,
        )
        self.max_tasks_per_child = either(
            'worker_max_tasks_per_child', max_tasks_per_child,
        )
        self.max_memory_per_child = either(
            'worker_max_memory_per_child', max_memory_per_child,
        )
        self.prefetch_multiplier = int(either(
            'worker_prefetch_multiplier', prefetch_multiplier,
        ))
        self.disable_rate_limits = either(
            'worker_disable_rate_limits', disable_rate_limits,
        )
        self.worker_lost_wait = either('worker_lost_wait', worker_lost_wait)
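
    # Option resolution (behaviour summary; the values below are illustrative):
    # ``app.either(key, *values)`` picks the first of ``values`` that is not
    # None and otherwise falls back to the configuration key, so explicit
    # worker options take precedence over settings, e.g.:
    #
    #     app.conf.worker_prefetch_multiplier = 8
    #     worker.setup_defaults(prefetch_multiplier=2)
    #     assert worker.prefetch_multiplier == 2   # explicit option wins
    #     worker.setup_defaults()
    #     assert worker.prefetch_multiplier == 8   # falls back to the setting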

    def wait_for_soft_shutdown(self):
        """Wait :setting:`worker_soft_shutdown_timeout` if soft shutdown is enabled.

        To enable soft shutdown, set the :setting:`worker_soft_shutdown_timeout`
        in the configuration.  Soft shutdown can be used to allow the worker to
        finish processing a few more tasks before initiating a cold shutdown.
        This mechanism allows the worker to finish short tasks that are already
        in progress and requeue long-running tasks to be picked up by another
        worker.

        .. warning::

            If there are no tasks in the worker, the worker will not wait for
            the soft shutdown timeout even if it is set, as it makes no sense
            to wait for the timeout when there are no tasks to process.
        """
        app = self.app
        requests = tuple(state.active_requests)
        if app.conf.worker_enable_soft_shutdown_on_idle:
            requests = True
        if app.conf.worker_soft_shutdown_timeout > 0 and requests:
            log = f"Initiating Soft Shutdown, terminating in {app.conf.worker_soft_shutdown_timeout} seconds"
            logger.warning(log)
            sleep(app.conf.worker_soft_shutdown_timeout)
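
    # Example configuration for the soft shutdown described above (a sketch;
    # the timeout value is illustrative, both settings are read from app.conf):
    #
    #     app.conf.worker_soft_shutdown_timeout = 10.0          # seconds to wait
    #     app.conf.worker_enable_soft_shutdown_on_idle = True   # wait even when idle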