"""Beat command-line program.

This module is the 'program-version' of :mod:`celery.beat`.

It does everything necessary to run that module
as an actual application, like installing signal handlers
and so on.
"""
from __future__ import annotations

import numbers
import socket
import sys
from datetime import datetime
from signal import Signals
from types import FrameType
from typing import Any

from celery import VERSION_BANNER, Celery, beat, platforms
from celery.utils.imports import qualname
from celery.utils.log import LOG_LEVELS, get_logger
from celery.utils.time import humanize_seconds

__all__ = ('Beat',)

# Template for the startup information text.  The surrounding blank lines are
# removed by ``.strip()``.  The named ``{...}`` fields are presumably filled
# in with ``str.format`` by code that is not visible in this part of the file.
# NOTE(review): the line breaks and four-space indentation were reconstructed
# from a whitespace-collapsed copy -- confirm against the upstream file.
STARTUP_INFO_FMT = """
LocalTime -> {timestamp}
Configuration ->
    . broker -> {conninfo}
    . loader -> {loader}
    . scheduler -> {scheduler}
{scheduler_info}
    . logfile -> {logfile}@%{loglevel}
    . maxinterval -> {hmax_interval} ({max_interval}s)
""".strip()

logger = get_logger('celery.beat')
class Beat:
    """Beat as a service."""

    # Service class exposed as a class attribute.
    Service = beat.Service
    # Class-level application; ``__init__`` falls back to it when no ``app``
    # argument is given.
    app: Celery = None

    def __init__(
        self,
        max_interval: int | None = None,
        app: Celery | None = None,
        socket_timeout: int = 30,
        pidfile: str | None = None,
        no_color: bool | None = None,
        loglevel: str = 'WARN',
        logfile: str | None = None,
        schedule: str | None = None,
        scheduler: str | None = None,
        scheduler_cls: str | None = None,  # XXX use scheduler
        redirect_stdouts: bool | None = None,
        redirect_stdouts_level: str | None = None,
        quiet: bool = False,
        **kwargs: Any,
    ) -> None:
        """Store the beat program options on the instance.

        Arguments:
            max_interval: Stored unchanged as ``self.max_interval``.
            app: Application to use; when falsy the class attribute
                ``app`` is used instead.
            socket_timeout: Stored unchanged as ``self.socket_timeout``.
            pidfile: Stored unchanged as ``self.pidfile``.
            no_color: Stored as ``self.no_color``.  When not ``None`` its
                negation is passed as ``enabled`` to ``app.log.colored``;
                when ``None``, ``None`` is passed through.
            loglevel: Either an integer level or a level name.  Names are
                upper-cased and looked up in ``LOG_LEVELS``.
            logfile: Stored as ``self.logfile`` and passed to
                ``app.log.colored``.
            schedule: Resolved through ``app.either`` with the
                ``beat_schedule_filename`` setting.
            scheduler: Resolved through ``app.either`` with the
                ``beat_scheduler`` setting.
            scheduler_cls: Additional candidate passed after ``scheduler``
                to the same ``beat_scheduler`` lookup.
            redirect_stdouts: Resolved through ``app.either`` with the
                ``worker_redirect_stdouts`` setting.
            redirect_stdouts_level: Resolved through ``app.either`` with
                the ``worker_redirect_stdouts_level`` setting.
            quiet: Stored as ``self.quiet``.
            **kwargs: Accepted but not used by this method.

        NOTE(review): ``app.either`` presumably returns the first explicit
        value that is set, falling back to the named configuration key --
        confirm against ``Celery.either``.
        """
        self.app = app = app or self.app
        either = self.app.either
        self.loglevel = loglevel
        self.logfile = logfile
        self.schedule = either('beat_schedule_filename', schedule)
        self.scheduler_cls = either(
            'beat_scheduler', scheduler, scheduler_cls)
        self.redirect_stdouts = either(
            'worker_redirect_stdouts', redirect_stdouts)
        self.redirect_stdouts_level = either(
            'worker_redirect_stdouts_level', redirect_stdouts_level)
        self.quiet = quiet

        self.max_interval = max_interval
        self.socket_timeout = socket_timeout
        self.no_color = no_color
        self.colored = app.log.colored(
            self.logfile,
            # ``None`` is forwarded untouched so ``colored`` can tell
            # "not specified" apart from an explicit on/off choice.
            enabled=not no_color if no_color is not None else no_color,
        )
        self.pidfile = pidfile
        # Level names are converted to their ``LOG_LEVELS`` entry; integral
        # values are kept as given.
        if not isinstance(self.loglevel, numbers.Integral):
            self.loglevel = LOG_LEVELS[self.loglevel.upper()]
[文档]defrun(self)->None:ifnotself.quiet:print(str(self.colored.cyan(f'celery beat v{VERSION_BANNER} is starting.')))self.init_loader()self.set_process_title()self.start_scheduler()
[文档]definit_loader(self)->None:# Run the worker init handler.# (Usually imports task modules and such.)self.app.loader.init_worker()self.app.finalize()
[文档]definstall_sync_handler(self,service:beat.Service)->None:"""Install a `SIGTERM` + `SIGINT` handler saving the schedule."""def_sync(signum:Signals,frame:FrameType)->None:service.sync()raiseSystemExit()platforms.signals.update(SIGTERM=_sync,SIGINT=_sync)