"""Consul result store backend.- :class:`ConsulBackend` implements KeyValueStoreBackend to store results in the key-value store of Consul."""fromkombu.utils.encodingimportbytes_to_strfromkombu.utils.urlimportparse_urlfromcelery.backends.baseimportKeyValueStoreBackendfromcelery.exceptionsimportImproperlyConfiguredfromcelery.utils.logimportget_loggertry:importconsulexceptImportError:consul=Nonelogger=get_logger(__name__)__all__=('ConsulBackend',)CONSUL_MISSING="""\You need to install the python-consul library in order to use \the Consul result store backend."""
[文档]classConsulBackend(KeyValueStoreBackend):"""Consul.io K/V store backend for Celery."""consul=consulsupports_autoexpire=Trueconsistency='consistent'path=Nonedef__init__(self,*args,**kwargs):super().__init__(*args,**kwargs)ifself.consulisNone:raiseImproperlyConfigured(CONSUL_MISSING)## By default, for correctness, we use a client connection per# operation. If set, self.one_client will be used for all operations.# This provides for the original behaviour to be selected, and is# also convenient for mocking in the unit tests.#self.one_client=Noneself._init_from_params(**parse_url(self.url))def_init_from_params(self,hostname,port,virtual_host,**params):logger.debug('Setting on Consul client to connect to %s:%d',hostname,port)self.path=virtual_hostself.hostname=hostnameself.port=port## Optionally, allow a single client connection to be used to reduce# the connection load on Consul by adding a "one_client=1" parameter# to the URL.#ifparams.get('one_client',None):self.one_client=self.client()
[文档]defget(self,key):key=self._key_to_consul_key(key)logger.debug('Trying to fetch key %s from Consul',key)try:_,data=self.client().kv.get(key)returndata['Value']exceptTypeError:pass
[文档]defset(self,key,value):"""Set a key in Consul. Before creating the key it will create a session inside Consul where it creates a session with a TTL The key created afterwards will reference to the session's ID. If the session expires it will remove the key so that results can auto expire from the K/V store """session_name=bytes_to_str(key)key=self._key_to_consul_key(key)logger.debug('Trying to create Consul session %s with TTL %d',session_name,self.expires)client=self.client()session_id=client.session.create(name=session_name,behavior='delete',ttl=self.expires)logger.debug('Created Consul session %s',session_id)logger.debug('Writing key %s to Consul',key)returnclient.kv.put(key=key,value=value,acquire=session_id)
[文档]defdelete(self,key):key=self._key_to_consul_key(key)logger.debug('Removing key %s from Consul',key)returnself.client().kv.delete(key)