"""ArangoDb result store backend."""# pylint: disable=W1202,W0703fromdatetimeimporttimedeltafromkombu.utils.objectsimportcached_propertyfromkombu.utils.urlimport_parse_urlfromcelery.exceptionsimportImproperlyConfiguredfrom.baseimportKeyValueStoreBackendtry:frompyArangoimportconnectionaspy_arango_connectionfrompyArango.theExceptionsimportAQLQueryErrorexceptImportError:py_arango_connection=AQLQueryError=None__all__=('ArangoDbBackend',)
[文档]classArangoDbBackend(KeyValueStoreBackend):"""ArangoDb backend. Sample url "arangodb://username:password@host:port/database/collection" *arangodb_backend_settings* is where the settings are present (in the app.conf) Settings should contain the host, port, username, password, database name, collection name else the default will be chosen. Default database name and collection name is celery. Raises ------ celery.exceptions.ImproperlyConfigured: if module :pypi:`pyArango` is not available. """host='127.0.0.1'port='8529'database='celery'collection='celery'username=Nonepassword=None# protocol is not supported in backend url (http is taken as default)http_protocol='http'verify=False# Use str as arangodb key not byteskey_t=strdef__init__(self,url=None,*args,**kwargs):"""Parse the url or load the settings from settings object."""super().__init__(*args,**kwargs)ifpy_arango_connectionisNone:raiseImproperlyConfigured('You need to install the pyArango library to use the ''ArangoDb backend.',)self.url=urlifurlisNone:host=port=database=collection=username=password=Noneelse:(_schema,host,port,username,password,database_collection,_query)=_parse_url(url)ifdatabase_collectionisNone:database=collection=Noneelse:database,collection=database_collection.split('/')config=self.app.conf.get('arangodb_backend_settings',None)ifconfigisnotNone:ifnotisinstance(config,dict):raiseImproperlyConfigured('ArangoDb backend settings should be grouped in a dict',)else:config={}self.host=hostorconfig.get('host',self.host)self.port=int(portorconfig.get('port',self.port))self.http_protocol=config.get('http_protocol',self.http_protocol)self.verify=config.get('verify',self.verify)self.database=databaseorconfig.get('database',self.database)self.collection= \
collectionorconfig.get('collection',self.collection)self.username=usernameorconfig.get('username',self.username)self.password=passwordorconfig.get('password',self.password)self.arangodb_url="{http_protocol}://{host}:{port}".format(http_protocol=self.http_protocol,host=self.host,port=self.port)self._connection=None@propertydefconnection(self):"""Connect to the arangodb server."""ifself._connectionisNone:self._connection=py_arango_connection.Connection(arangoURL=self.arangodb_url,username=self.username,password=self.password,verify=self.verify)returnself._connection@propertydefdb(self):"""Database Object to the given database."""returnself.connection[self.database]@cached_propertydefexpires_delta(self):returntimedelta(seconds=0ifself.expiresisNoneelseself.expires)
[文档]defmget(self,keys):ifkeysisNone:returnquery=self.db.AQLQuery("FOR k IN @keys RETURN DOCUMENT(@@collection, k).task",rawResults=True,bindVars={"@collection":self.collection,"keys":keysifisinstance(keys,list)elselist(keys),},)whileTrue:yield fromquerytry:query.nextBatch()exceptStopIteration:break
[文档]defdelete(self,key):ifkeyisNone:returnself.db.AQLQuery("REMOVE {_key: @key} IN @@collection",bindVars={"@collection":self.collection,"key":key,},)
[文档]defcleanup(self):ifnotself.expires:returncheckpoint=(self.app.now()-self.expires_delta).isoformat()self.db.AQLQuery(""" FOR record IN @@collection FILTER record.task.date_done < @checkpoint REMOVE record IN @@collection """,bindVars={"@collection":self.collection,"checkpoint":checkpoint,},)