"""Worker <-> Worker Sync at startup (Bootstep)."""fromceleryimportbootstepsfromcelery.utils.logimportget_loggerfrom.eventsimportEvents__all__=('Mingle',)logger=get_logger(__name__)debug,info,exception=logger.debug,logger.info,logger.exception
[文档]classMingle(bootsteps.StartStopStep):"""Bootstep syncing state with neighbor workers. At startup, or upon consumer restart, this will: - Sync logical clocks. - Sync revoked tasks. """label='Mingle'requires=(Events,)compatible_transports={'amqp','redis','gcpubsub'}def__init__(self,c,without_mingle=False,**kwargs):self.enabled=notwithout_mingleandself.compatible_transport(c.app)super().__init__(c,without_mingle=without_mingle,**kwargs)
[文档]defsync(self,c):info('mingle: searching for neighbors')replies=self.send_hello(c)ifreplies:info('mingle: sync with %s nodes',len([replyforreply,valueinreplies.items()ifvalue]))[self.on_node_reply(c,nodename,reply)fornodename,replyinreplies.items()ifreply]info('mingle: sync complete')else:info('mingle: all alone')
[文档]defsend_hello(self,c):inspect=c.app.control.inspect(timeout=1.0,connection=c.connection)our_revoked=c.controller.state.revokedreplies=inspect.hello(c.hostname,our_revoked._data)or{}replies.pop(c.hostname,None)# delete my own responsereturnreplies
[文档]defon_node_reply(self,c,nodename,reply):debug('mingle: processing reply from %s',nodename)try:self.sync_with_node(c,**reply)exceptMemoryError:raiseexceptExceptionasexc:# pylint: disable=broad-exceptexception('mingle: sync with %s failed: %r',nodename,exc)