"""Embedded workers for integration tests."""importloggingimportosimportthreadingfromcontextlibimportcontextmanagerfromtypingimportAny,Iterable,Optional,Unionimportcelery.worker.consumer# noqafromceleryimportCelery,workerfromcelery.resultimport_set_task_join_will_block,allow_join_resultfromcelery.utils.dispatchimportSignalfromcelery.utils.nodenamesimportanon_nodenameWORKER_LOGLEVEL=os.environ.get('WORKER_LOGLEVEL','error')test_worker_starting=Signal(name='test_worker_starting',providing_args={},)test_worker_started=Signal(name='test_worker_started',providing_args={'worker','consumer'},)test_worker_stopped=Signal(name='test_worker_stopped',providing_args={'worker'},)


class TestWorkController(worker.WorkController):
    """Worker that can synchronize on being fully started."""

    # When this class is imported in pytest files, prevent pytest from
    # thinking this is a test class.
    __test__ = False

    logger_queue = None

    def __init__(self, *args, **kwargs):
        # type: (*Any, **Any) -> None
        self._on_started = threading.Event()

        super().__init__(*args, **kwargs)

        if self.pool_cls.__module__.split('.')[-1] == 'prefork':
            from billiard import Queue
            self.logger_queue = Queue()
            self.pid = os.getpid()

            try:
                from tblib import pickling_support
                pickling_support.install()
            except ImportError:
                pass

            # Collect logs from the forked worker processes.
            # XXX: those logs will appear twice in the live log.
            self.queue_listener = logging.handlers.QueueListener(
                self.logger_queue, logging.getLogger())
            self.queue_listener.start()

    def on_consumer_ready(self, consumer):
        # type: (celery.worker.consumer.Consumer) -> None
        """Callback called when the Consumer blueprint is fully started."""
        self._on_started.set()
        test_worker_started.send(
            sender=self.app, worker=self, consumer=consumer)

    def ensure_started(self):
        # type: () -> None
        """Wait for the worker to be fully up and running.

        Warning:
            The worker must be started within a thread for this to work,
            or it will block forever.
        """
        self._on_started.wait()
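

# Illustrative sketch (not part of the original module): ``ensure_started``
# only returns once ``on_consumer_ready`` has fired, so the controller has to
# run in its own thread. ``my_app`` stands in for a Celery application that
# has already been prepared (see ``setup_app_for_worker`` below).
def _example_wait_for_embedded_worker(my_app):
    controller = TestWorkController(app=my_app, concurrency=1, pool='solo')
    thread = threading.Thread(target=controller.start, daemon=True)
    thread.start()
    controller.ensure_started()  # blocks until the consumer blueprint is up
    return controller, thread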


@contextmanager
def _start_worker_thread(app: Celery,
                         concurrency: int = 1,
                         pool: str = 'solo',
                         loglevel: Union[str, int] = WORKER_LOGLEVEL,
                         logfile: Optional[str] = None,
                         WorkController: Any = TestWorkController,
                         perform_ping_check: bool = True,
                         shutdown_timeout: float = 10.0,
                         **kwargs) -> Iterable[worker.WorkController]:
    """Start Celery worker in a thread.

    Yields:
        celery.worker.Worker: worker instance.
    """
    setup_app_for_worker(app, loglevel, logfile)
    if perform_ping_check:
        assert 'celery.ping' in app.tasks

    # Make sure we can connect to the broker; accessing ``default_channel``
    # forces the connection to be established.
    with app.connection(hostname=os.environ.get('TEST_BROKER')) as conn:
        conn.default_channel.queue_declare

    worker = WorkController(
        app=app,
        concurrency=concurrency,
        hostname=kwargs.pop("hostname", anon_nodename()),
        pool=pool,
        loglevel=loglevel,
        logfile=logfile,
        # not allowed to override TestWorkController.on_consumer_ready
        ready_callback=None,
        without_heartbeat=kwargs.pop("without_heartbeat", True),
        without_mingle=True,
        without_gossip=True,
        **kwargs)

    t = threading.Thread(target=worker.start, daemon=True)
    t.start()
    worker.ensure_started()
    _set_task_join_will_block(False)

    try:
        yield worker
    finally:
        from celery.worker import state
        state.should_terminate = 0
        t.join(shutdown_timeout)
        if t.is_alive():
            raise RuntimeError(
                "Worker thread failed to exit within the allocated timeout. "
                "Consider raising `shutdown_timeout` if your tasks take longer "
                "to execute."
            )
        state.should_terminate = None


@contextmanager
def _start_worker_process(app,
                          concurrency=1,
                          pool='solo',
                          loglevel=WORKER_LOGLEVEL,
                          logfile=None,
                          **kwargs):
    # type: (Celery, int, str, Union[int, str], str, **Any) -> Iterable
    """Start worker in a separate process.

    Yields:
        celery.app.worker.Worker: worker instance.
    """
    from celery.apps.multi import Cluster, Node

    app.set_current()
    cluster = Cluster([Node('testworker1@%h')])
    cluster.start()
    try:
        yield
    finally:
        cluster.stopwait()
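

# Illustrative sketch (not part of the original module): driving the
# thread-based context manager from a test. ``app`` and ``add_task`` are
# hypothetical and assumed to come from the test suite; the ping check is
# skipped because ``celery.ping`` is only registered when the pytest plugin's
# control tasks are loaded.
def _example_run_in_thread(app, add_task):
    with _start_worker_thread(app, pool='solo', perform_ping_check=False):
        # The worker is fully started here, so results can be waited on.
        assert add_task.delay(2, 2).get(timeout=10) == 4
    # Leaving the block shuts the worker thread down, raising RuntimeError if
    # it does not exit within ``shutdown_timeout`` seconds.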


def setup_app_for_worker(app: Celery,
                         loglevel: Union[str, int],
                         logfile: Optional[str]) -> None:
    """Set up the app to be used for starting an embedded worker."""
    app.finalize()
    app.set_current()
    app.set_default()
    type(app.log)._setup = False
    app.log.setup(loglevel=loglevel, logfile=logfile)
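

# Illustrative sketch (not part of the original module): preparing an app by
# hand before constructing a controller, mirroring what
# ``_start_worker_thread`` does internally. ``my_app`` is a placeholder for a
# Celery application defined by the test suite.
def _example_manual_setup(my_app):
    setup_app_for_worker(my_app, loglevel='info', logfile=None)
    return TestWorkController(app=my_app, concurrency=1, pool='solo')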