"""The CosmosDB/SQL backend for Celery (experimental)."""fromkombu.utilsimportcached_propertyfromkombu.utils.encodingimportbytes_to_strfromkombu.utils.urlimport_parse_urlfromcelery.exceptionsimportImproperlyConfiguredfromcelery.utils.logimportget_loggerfrom.baseimportKeyValueStoreBackendtry:importpydocumentdbfrompydocumentdb.document_clientimportDocumentClientfrompydocumentdb.documentsimportConnectionPolicy,ConsistencyLevel,PartitionKindfrompydocumentdb.errorsimportHTTPFailurefrompydocumentdb.retry_optionsimportRetryOptionsexceptImportError:pydocumentdb=DocumentClient=ConsistencyLevel=PartitionKind= \
HTTPFailure=ConnectionPolicy=RetryOptions=None__all__=("CosmosDBSQLBackend",)ERROR_NOT_FOUND=404ERROR_EXISTS=409LOGGER=get_logger(__name__)
[文档]classCosmosDBSQLBackend(KeyValueStoreBackend):"""CosmosDB/SQL backend for Celery."""def__init__(self,url=None,database_name=None,collection_name=None,consistency_level=None,max_retry_attempts=None,max_retry_wait_time=None,*args,**kwargs):super().__init__(*args,**kwargs)ifpydocumentdbisNone:raiseImproperlyConfigured("You need to install the pydocumentdb library to use the ""CosmosDB backend.")conf=self.app.confself._endpoint,self._key=self._parse_url(url)self._database_name=(database_nameorconf["cosmosdbsql_database_name"])self._collection_name=(collection_nameorconf["cosmosdbsql_collection_name"])try:self._consistency_level=getattr(ConsistencyLevel,consistency_levelorconf["cosmosdbsql_consistency_level"])exceptAttributeError:raiseImproperlyConfigured("Unknown CosmosDB consistency level")self._max_retry_attempts=(max_retry_attemptsorconf["cosmosdbsql_max_retry_attempts"])self._max_retry_wait_time=(max_retry_wait_timeorconf["cosmosdbsql_max_retry_wait_time"])@classmethoddef_parse_url(cls,url):_,host,port,_,password,_,_=_parse_url(url)ifnothostornotpassword:raiseImproperlyConfigured("Invalid URL")ifnotport:port=443scheme="https"ifport==443else"http"endpoint=f"{scheme}://{host}:{port}"returnendpoint,password@cached_propertydef_client(self):"""Return the CosmosDB/SQL client. If this is the first call to the property, the client is created and the database and collection are initialized if they don't yet exist. """connection_policy=ConnectionPolicy()connection_policy.RetryOptions=RetryOptions(max_retry_attempt_count=self._max_retry_attempts,max_wait_time_in_seconds=self._max_retry_wait_time)client=DocumentClient(self._endpoint,{"masterKey":self._key},connection_policy=connection_policy,consistency_level=self._consistency_level)self._create_database_if_not_exists(client)self._create_collection_if_not_exists(client)returnclientdef_create_database_if_not_exists(self,client):try:client.CreateDatabase({"id":self._database_name})exceptHTTPFailureasex:ifex.status_code!=ERROR_EXISTS:raiseelse:LOGGER.info("Created CosmosDB database %s",self._database_name)def_create_collection_if_not_exists(self,client):try:client.CreateCollection(self._database_link,{"id":self._collection_name,"partitionKey":{"paths":["/id"],"kind":PartitionKind.Hash}})exceptHTTPFailureasex:ifex.status_code!=ERROR_EXISTS:raiseelse:LOGGER.info("Created CosmosDB collection %s/%s",self._database_name,self._collection_name)@cached_propertydef_database_link(self):return"dbs/"+self._database_name@cached_propertydef_collection_link(self):returnself._database_link+"/colls/"+self._collection_namedef_get_document_link(self,key):returnself._collection_link+"/docs/"+key@classmethoddef_get_partition_key(cls,key):ifnotkeyorkey.isspace():raiseValueError("Key cannot be none, empty or whitespace.")return{"partitionKey":key}
[文档]defget(self,key):"""Read the value stored at the given key. Args: key: The key for which to read the value. """key=bytes_to_str(key)LOGGER.debug("Getting CosmosDB document %s/%s/%s",self._database_name,self._collection_name,key)try:document=self._client.ReadDocument(self._get_document_link(key),self._get_partition_key(key))exceptHTTPFailureasex:ifex.status_code!=ERROR_NOT_FOUND:raisereturnNoneelse:returndocument.get("value")
[文档]defset(self,key,value):"""Store a value for a given key. Args: key: The key at which to store the value. value: The value to store. """key=bytes_to_str(key)LOGGER.debug("Creating CosmosDB document %s/%s/%s",self._database_name,self._collection_name,key)self._client.CreateDocument(self._collection_link,{"id":key,"value":value},self._get_partition_key(key))
[文档]defmget(self,keys):"""Read all the values for the provided keys. Args: keys: The list of keys to read. """return[self.get(key)forkeyinkeys]
[文档]defdelete(self,key):"""Delete the value at a given key. Args: key: The key of the value to delete. """key=bytes_to_str(key)LOGGER.debug("Deleting CosmosDB document %s/%s/%s",self._database_name,self._collection_name,key)self._client.DeleteDocument(self._get_document_link(key),self._get_partition_key(key))