"""Couchbase result store backend."""fromkombu.utils.urlimport_parse_urlfromcelery.exceptionsimportImproperlyConfiguredfrom.baseimportKeyValueStoreBackendtry:fromcouchbase.authimportPasswordAuthenticatorfromcouchbase.clusterimportClusterexceptImportError:Cluster=PasswordAuthenticator=Nonetry:fromcouchbase_core._libcouchbaseimportFMT_AUTOexceptImportError:FMT_AUTO=None__all__=('CouchbaseBackend',)
[文档]classCouchbaseBackend(KeyValueStoreBackend):"""Couchbase backend. Raises: celery.exceptions.ImproperlyConfigured: if module :pypi:`couchbase` is not available. """bucket='default'host='localhost'port=8091username=Nonepassword=Nonequiet=Falsesupports_autoexpire=Truetimeout=2.5# Use str as couchbase key not byteskey_t=strdef__init__(self,url=None,*args,**kwargs):kwargs.setdefault('expires_type',int)super().__init__(*args,**kwargs)self.url=urlifClusterisNone:raiseImproperlyConfigured('You need to install the couchbase library to use the ''Couchbase backend.',)uhost=uport=uname=upass=ubucket=Noneifurl:_,uhost,uport,uname,upass,ubucket,_=_parse_url(url)ubucket=ubucket.strip('/')ifubucketelseNoneconfig=self.app.conf.get('couchbase_backend_settings',None)ifconfigisnotNone:ifnotisinstance(config,dict):raiseImproperlyConfigured('Couchbase backend settings should be grouped in a dict',)else:config={}self.host=uhostorconfig.get('host',self.host)self.port=int(uportorconfig.get('port',self.port))self.bucket=ubucketorconfig.get('bucket',self.bucket)self.username=unameorconfig.get('username',self.username)self.password=upassorconfig.get('password',self.password)self._connection=Nonedef_get_connection(self):"""Connect to the Couchbase server."""ifself._connectionisNone:ifself.hostandself.port:uri=f"couchbase://{self.host}:{self.port}"else:uri=f"couchbase://{self.host}"ifself.usernameandself.password:opt=PasswordAuthenticator(self.username,self.password)else:opt=Nonecluster=Cluster(uri,opt)bucket=cluster.bucket(self.bucket)self._connection=bucket.default_collection()returnself._connection@propertydefconnection(self):returnself._get_connection()
[文档]defset(self,key,value):# Since 4.0.0 value is JSONType in couchbase lib, so parameter format isn't neededifFMT_AUTOisnotNone:self.connection.upsert(key,value,ttl=self.expires,format=FMT_AUTO)else:self.connection.upsert(key,value,ttl=self.expires)