"""Worker <-> Worker communication Bootstep."""fromcollectionsimportdefaultdictfromfunctoolsimportpartialfromheapqimportheappushfromoperatorimportitemgetterfromkombuimportConsumerfromkombu.asynchronous.semaphoreimportDummyLockfromkombu.exceptionsimportContentDisallowed,DecodeErrorfromceleryimportbootstepsfromcelery.utils.logimportget_loggerfromcelery.utils.objectsimportBunchfrom.mingleimportMingle__all__=('Gossip',)logger=get_logger(__name__)debug,info=logger.debug,logger.info
[文档]classGossip(bootsteps.ConsumerStep):"""Bootstep consuming events from other workers. This keeps the logical clock value up to date. """label='Gossip'requires=(Mingle,)_cons_stamp_fields=itemgetter('id','clock','hostname','pid','topic','action','cver',)compatible_transports={'amqp','redis'}def__init__(self,c,without_gossip=False,interval=5.0,heartbeat_interval=2.0,**kwargs):self.enabled=notwithout_gossipandself.compatible_transport(c.app)self.app=c.appc.gossip=selfself.Receiver=c.app.events.Receiverself.hostname=c.hostnameself.full_hostname='.'.join([self.hostname,str(c.pid)])self.on=Bunch(node_join=set(),node_leave=set(),node_lost=set(),)self.timer=c.timerifself.enabled:self.state=c.app.events.State(on_node_join=self.on_node_join,on_node_leave=self.on_node_leave,max_tasks_in_memory=1,)ifc.hub:c._mutex=DummyLock()self.update_state=self.state.eventself.interval=intervalself.heartbeat_interval=heartbeat_intervalself._tref=Noneself.consensus_requests=defaultdict(list)self.consensus_replies={}self.event_handlers={'worker.elect':self.on_elect,'worker.elect.ack':self.on_elect_ack,}self.clock=c.app.clockself.election_handlers={'task':self.call_task}super().__init__(c,**kwargs)
[文档]defcall_task(self,task):try:self.app.signature(task).apply_async()exceptExceptionasexc:# pylint: disable=broad-exceptlogger.exception('Could not call task: %r',exc)
[文档]defon_elect(self,event):try:(id_,clock,hostname,pid,topic,action,_)=self._cons_stamp_fields(event)exceptKeyErrorasexc:returnlogger.exception('election request missing field %s',exc)heappush(self.consensus_requests[id_],(clock,f'{hostname}.{pid}',topic,action),)self.dispatcher.send('worker-elect-ack',id=id_)
[文档]defon_elect_ack(self,event):id=event['id']try:replies=self.consensus_replies[id]exceptKeyError:return# not for usalive_workers=set(self.state.alive_workers())replies.append(event['hostname'])iflen(replies)>=len(alive_workers):_,leader,topic,action=self.clock.sort_heap(self.consensus_requests[id],)ifleader==self.full_hostname:info('I won the election %r',id)try:handler=self.election_handlers[topic]exceptKeyError:logger.exception('Unknown election topic %r',topic)else:handler(action)else:info('node %s elected for %r',leader,id)self.consensus_requests.pop(id,None)self.consensus_replies.pop(id,None)
[文档]defon_node_join(self,worker):debug('%s joined the party',worker.hostname)self._call_handlers(self.on.node_join,worker)
[文档]defon_message(self,prepare,message):_type=message.delivery_info['routing_key']# For redis when `fanout_patterns=False` (See Issue #1882)if_type.split('.',1)[0]=='task':returntry:handler=self.event_handlers[_type]exceptKeyError:passelse:returnhandler(message.payload)# proto2: hostname in header; proto1: in bodyhostname=(message.headers.get('hostname')ormessage.payload['hostname'])ifhostname!=self.hostname:try:_,event=prepare(message.payload)self.update_state(event)except(DecodeError,ContentDisallowed,TypeError)asexc:logger.error(exc)else:self.clock.forward()