"""Worker name utilities."""from__future__importannotationsimportosimportsocketfromfunctoolsimportpartialfromkombu.entityimportExchange,Queuefrom.functionalimportmemoizefrom.textimportsimple_format#: Exchange for worker direct queues.WORKER_DIRECT_EXCHANGE=Exchange('C.dq2')#: Format for worker direct queue names.WORKER_DIRECT_QUEUE_FORMAT='{hostname}.dq2'#: Separator for worker node name and hostname.NODENAME_SEP='@'NODENAME_DEFAULT='celery'gethostname=memoize(1,Cache=dict)(socket.gethostname)__all__=('worker_direct','gethostname','nodename','anon_nodename','nodesplit','default_nodename','node_format','host_format',)
def worker_direct(hostname: str | Queue) -> Queue:
    """Return the :class:`kombu.Queue` being a direct route to a worker.

    Arguments:
        hostname (str, ~kombu.Queue): The fully qualified node name of
            a worker (e.g., ``w1@example.com``).  A :class:`kombu.Queue`
            instance is handed back untouched.

    Returns:
        kombu.Queue: Either the queue that was passed in, or a new queue
        named after :data:`WORKER_DIRECT_QUEUE_FORMAT`, bound to
        :data:`WORKER_DIRECT_EXCHANGE`, with the node name given as the
        third positional argument.
    """
    if not isinstance(hostname, Queue):
        queue_name = WORKER_DIRECT_QUEUE_FORMAT.format(hostname=hostname)
        return Queue(queue_name, WORKER_DIRECT_EXCHANGE, hostname)
    # Already a queue: nothing to build.
    return hostname


def nodename(name: str, hostname: str) -> str:
    """Build a node name by joining *name* and *hostname*.

    The two parts are joined with :data:`NODENAME_SEP`.
    """
    pieces = (name, hostname)
    return NODENAME_SEP.join(pieces)


def anon_nodename(hostname: str | None = None, prefix: str = 'gen') -> str:
    """Return the nodename for this process (not a worker).

    This is used for e.g. the origin task message field.

    Arguments:
        hostname (str): Host part of the node name.  When falsy, the
            value of :func:`gethostname` is used instead.
        prefix (str): Text placed before the current process id to form
            the name part.
    """
    local_part = ''.join((prefix, str(os.getpid())))
    host_part = hostname or gethostname()
    return nodename(local_part, host_part)


def nodesplit(name: str) -> tuple[None, str] | list[str]:
    """Split a node name into its name and hostname parts.

    The split happens at the first :data:`NODENAME_SEP` only.

    Returns:
        ``(None, name)`` as a tuple when *name* contains no separator,
        otherwise the two-item list ``[name_part, host_part]``.
    """
    head, sep, tail = name.partition(NODENAME_SEP)
    if not sep:
        # No separator present: the whole string is the hostname.
        return None, head
    return [head, tail]


def default_nodename(hostname: str) -> str:
    """Return the default nodename for this process.

    *hostname* (a falsy value is treated as ``''``) is split with
    :func:`nodesplit`.  A missing name part falls back to
    :data:`NODENAME_DEFAULT` and a missing host part falls back to
    :func:`gethostname`.
    """
    short, host = nodesplit(hostname or '')
    if not short:
        short = NODENAME_DEFAULT
    if not host:
        host = gethostname()
    return nodename(short, host)


def node_format(s: str, name: str, **extra: dict) -> str:
    """Format worker node name (name@host.com).

    *name* is split with :func:`nodesplit`; an empty or missing name part
    is replaced by :data:`NODENAME_DEFAULT`.  The pieces are then handed
    to ``host_format`` together with the full node name as keyword ``p``
    and any *extra* keyword arguments.
    """
    # NOTE(review): ``host_format`` is defined outside this view; the
    # positional order (s, host, short name) is kept exactly as it was.
    local_part, host = nodesplit(name)
    if not local_part:
        local_part = NODENAME_DEFAULT
    return host_format(s, host, local_part, p=name, **extra)