"""Heartbeat service.This is the internal thread responsible for sending heartbeat eventsat regular intervals (may not be an actual thread)."""fromcelery.signalsimportheartbeat_sentfromcelery.utils.sysinfoimportload_averagefrom.stateimportSOFTWARE_INFO,active_requests,all_total_count__all__=('Heart',)
[文档]classHeart:"""Timer sending heartbeats at regular intervals. Arguments: timer (kombu.asynchronous.timer.Timer): Timer to use. eventer (celery.events.EventDispatcher): Event dispatcher to use. interval (float): Time in seconds between sending heartbeats. Default is 2 seconds. """def__init__(self,timer,eventer,interval=None):self.timer=timerself.eventer=eventerself.interval=float(intervalor2.0)self.tref=None# Make event dispatcher start/stop us when enabled/disabled.self.eventer.on_enabled.add(self.start)self.eventer.on_disabled.add(self.stop)# Only send heartbeat_sent signal if it has receivers.self._send_sent_signal=(heartbeat_sent.sendifheartbeat_sent.receiverselseNone)def_send(self,event,retry=True):ifself._send_sent_signalisnotNone:self._send_sent_signal(sender=self)returnself.eventer.send(event,freq=self.interval,active=len(active_requests),processed=all_total_count[0],loadavg=load_average(),retry=retry,**SOFTWARE_INFO)