"""Prefork execution pool.Pool implementation using :mod:`multiprocessing`."""importosfrombilliardimportforking_enablefrombilliard.commonimportREMAP_SIGTERM,TERM_SIGNAMEfrombilliard.poolimportCLOSE,RUNfrombilliard.poolimportPoolasBlockingPoolfromceleryimportplatforms,signalsfromcelery._stateimport_set_task_join_will_block,set_default_appfromcelery.appimporttracefromcelery.concurrency.baseimportBasePoolfromcelery.utils.functionalimportnoopfromcelery.utils.logimportget_loggerfrom.asynpoolimportAsynPool__all__=('TaskPool','process_initializer','process_destructor')#: List of signals to reset when a child process starts.WORKER_SIGRESET={'SIGTERM','SIGHUP','SIGTTIN','SIGTTOU','SIGUSR1',}#: List of signals to ignore when a child process starts.ifREMAP_SIGTERM:WORKER_SIGIGNORE={'SIGINT',TERM_SIGNAME}else:WORKER_SIGIGNORE={'SIGINT'}logger=get_logger(__name__)warning,debug=logger.warning,logger.debug
def process_initializer(app, hostname):
    """Pool child process initializer.

    Initialize the child pool process to ensure the correct
    app instance is used and things like logging work.
    """
    # Each running worker gets SIGKILL by OS when main process exits.
    platforms.set_pdeathsig('SIGKILL')
    _set_task_join_will_block(True)
    platforms.signals.reset(*WORKER_SIGRESET)
    platforms.signals.ignore(*WORKER_SIGIGNORE)
    platforms.set_mp_process_title('celeryd', hostname=hostname)
    # This is for Windows and other platforms not supporting
    # fork().  Note that init_worker makes sure it's only
    # run once per process.
    app.loader.init_worker()
    app.loader.init_worker_process()
    logfile = os.environ.get('CELERY_LOG_FILE') or None
    if logfile and '%i' in logfile.lower():
        # logfile path will differ so need to set up logging again.
        app.log.already_setup = False
    app.log.setup(int(os.environ.get('CELERY_LOG_LEVEL', 0) or 0),
                  logfile,
                  bool(os.environ.get('CELERY_LOG_REDIRECT', False)),
                  str(os.environ.get('CELERY_LOG_REDIRECT_LEVEL')),
                  hostname=hostname)
    if os.environ.get('FORKED_BY_MULTIPROCESSING'):
        # pool did execv after fork
        trace.setup_worker_optimizations(app, hostname)
    else:
        app.set_current()
        set_default_app(app)
        app.finalize()
        trace._tasks = app._tasks  # enables fast_trace_task optimization.
    # rebuild execution handler for all tasks.
    from celery.app.trace import build_tracer
    for name, task in app.tasks.items():
        task.__trace__ = build_tracer(name, task, app.loader, hostname,
                                      app=app)
    from celery.worker import state as worker_state
    worker_state.reset_state()
    signals.worker_process_init.send(sender=None)


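# Illustrative sketch (not part of the upstream module): application code can
# hook into the child-process bootstrap performed above by connecting to the
# ``worker_process_init`` signal, which ``process_initializer`` dispatches as
# its final step.  The handler name below is hypothetical.
@signals.worker_process_init.connect
def _example_on_child_start(**kwargs):
    # Runs once in every newly started pool child process; a typical use is
    # opening per-process resources such as database connections.
    debug('pool child process initialized')

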
def process_destructor(pid, exitcode):
    """Pool child process destructor.

    Dispatch the :signal:`worker_process_shutdown` signal.
    """
    signals.worker_process_shutdown.send(
        sender=None, pid=pid, exitcode=exitcode,
    )


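# Illustrative sketch (not part of the upstream module): ``process_destructor``
# forwards the child's pid and exit code to the ``worker_process_shutdown``
# signal, so handlers can observe them just before the child exits.  The
# handler name below is hypothetical.
@signals.worker_process_shutdown.connect
def _example_on_child_exit(pid=None, exitcode=None, **kwargs):
    # Runs in the pool child process shortly before it exits.
    debug('pool child %r exited with exitcode %r', pid, exitcode)

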
class TaskPool(BasePool):
    """Multiprocessing Pool implementation."""

    Pool = AsynPool
    BlockingPool = BlockingPool
    uses_semaphore = True
    write_stats = None

    def on_start(self):
        forking_enable(self.forking_enable)
        Pool = (self.BlockingPool if self.options.get('threads', True)
                else self.Pool)
        proc_alive_timeout = (
            self.app.conf.worker_proc_alive_timeout if self.app
            else None
        )
        P = self._pool = Pool(processes=self.limit,
                              initializer=process_initializer,
                              on_process_exit=process_destructor,
                              enable_timeouts=True,
                              synack=False,
                              proc_alive_timeout=proc_alive_timeout,
                              **self.options)

        # Create proxy methods
        self.on_apply = P.apply_async
        self.maintain_pool = P.maintain_pool
        self.terminate_job = P.terminate_job
        self.grow = P.grow
        self.shrink = P.shrink
        self.flush = getattr(P, 'flush', None)  # FIXME add to billiard

    def on_stop(self):
        """Gracefully stop the pool."""
        if self._pool is not None and self._pool._state in (RUN, CLOSE):
            self._pool.close()
            self._pool.join()
            self._pool = None

    def on_terminate(self):
        """Force terminate the pool."""
        if self._pool is not None:
            self._pool.terminate()
            self._pool = None
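

# Illustrative sketch (not part of the upstream module): how an application
# ends up on this pool.  The worker maps ``--pool=prefork`` / the
# ``worker_pool`` setting to :class:`TaskPool` above; the helper name and the
# project name ``'proj'`` below are hypothetical.
def _example_configure_prefork_pool():
    from celery import Celery

    app = Celery('proj')
    app.conf.worker_pool = 'prefork'          # same as ``celery worker --pool=prefork``
    app.conf.worker_proc_alive_timeout = 4.0  # seconds ``on_start`` passes to the pool
    return app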