"""CouchDB result store backend."""

from kombu.utils.encoding import bytes_to_str
from kombu.utils.url import _parse_url

from celery.exceptions import ImproperlyConfigured

from .base import KeyValueStoreBackend

try:
    import pycouchdb
except ImportError:
    # pycouchdb is optional at import time; CouchBackend.__init__ checks for
    # this None sentinel and raises ImproperlyConfigured if it is missing.
    pycouchdb = None

__all__ = ('CouchBackend',)

# Message raised (via ImproperlyConfigured) when pycouchdb is not installed.
# The backslashes are line continuations, so the resulting string contains
# no leading or trailing newline.
ERR_LIB_MISSING = """\
You need to install the pycouchdb library to use the CouchDB result backend\
"""
class CouchBackend(KeyValueStoreBackend):
    """CouchDB backend.

    Connection settings start from the class-level defaults below and are
    overridden by whichever components are present in the ``url`` passed to
    the constructor.

    Raises:
        celery.exceptions.ImproperlyConfigured:
            if module :pypi:`pycouchdb` is not available.
    """

    # Defaults used when the URL does not supply the matching component.
    container = 'default'
    scheme = 'http'
    host = 'localhost'
    port = 5984
    username = None
    password = None

    def __init__(self, url=None, *args, **kwargs):
        """Configure the backend from an optional connection URL.

        Arguments:
            url (str): Optional connection URL.  Its host, port, username,
                password and path override the class defaults; the path,
                with surrounding slashes stripped, is used as the database
                (container) name.
            *args: Forwarded unchanged to the parent initializer.
            **kwargs: Forwarded unchanged to the parent initializer.

        Raises:
            celery.exceptions.ImproperlyConfigured:
                if :pypi:`pycouchdb` could not be imported.
        """
        super().__init__(*args, **kwargs)
        self.url = url

        if pycouchdb is None:
            raise ImproperlyConfigured(ERR_LIB_MISSING)

        uhost = uport = uname = upass = ucontainer = None
        if url:
            # The first (scheme) and last fields returned by _parse_url are
            # deliberately discarded, so ``self.scheme`` always comes from
            # the class default rather than from the URL.
            _, uhost, uport, uname, upass, ucontainer, _ = _parse_url(url)
            ucontainer = ucontainer.strip('/') if ucontainer else None

        self.host = uhost or self.host
        self.port = int(uport or self.port)
        self.container = ucontainer or self.container
        self.username = uname or self.username
        self.password = upass or self.password

        # Created lazily on first access of the ``connection`` property.
        self._connection = None

    def _get_connection(self):
        """Connect to the CouchDB server.

        Returns:
            The database object for ``self.container``.  If the server
            reports the database as missing (``NotFound``), it is created
            and the new database is returned instead.
        """
        # Function-scope import keeps this block self-contained.
        from urllib.parse import quote

        if self.username and self.password:
            # Percent-encode the credentials: a raw '@', ':' or '/' inside
            # the username or password would otherwise corrupt the userinfo
            # part of the URL.
            # NOTE(review): relies on pycouchdb percent-decoding credentials
            # it extracts from the URL -- confirm for the installed version.
            user = quote(self.username, safe='')
            secret = quote(self.password, safe='')
            conn_string = (
                f'{self.scheme}://{user}:{secret}@{self.host}:{self.port}'
            )
            server = pycouchdb.Server(conn_string, authmethod='basic')
        else:
            conn_string = f'{self.scheme}://{self.host}:{self.port}'
            server = pycouchdb.Server(conn_string)

        try:
            return server.database(self.container)
        except pycouchdb.exceptions.NotFound:
            return server.create(self.container)

    @property
    def connection(self):
        """Return the database connection, opening it on first access."""
        if self._connection is None:
            self._connection = self._get_connection()
        return self._connection