"""Google Cloud Storage result store backend for Celery."""fromconcurrent.futuresimportThreadPoolExecutorfromdatetimeimportdatetime,timedeltafromosimportgetpidfromthreadingimportRLockfromkombu.utils.encodingimportbytes_to_strfromkombu.utils.functionalimportdictfilterfromkombu.utils.urlimporturl_to_partsfromcelery.canvasimportmaybe_signaturefromcelery.exceptionsimportChordError,ImproperlyConfiguredfromcelery.resultimportGroupResult,allow_join_resultfromcelery.utils.logimportget_loggerfrom.baseimportKeyValueStoreBackendtry:importrequestsfromgoogle.api_coreimportretryfromgoogle.api_core.exceptionsimportConflictfromgoogle.api_core.retryimportif_exception_typefromgoogle.cloudimportstoragefromgoogle.cloud.storageimportClientfromgoogle.cloud.storage.retryimportDEFAULT_RETRYexceptImportError:storage=Nonetry:fromgoogle.cloudimportfirestore,firestore_admin_v1exceptImportError:firestore=Nonefirestore_admin_v1=None__all__=('GCSBackend',)logger=get_logger(__name__)classGCSBackendBase(KeyValueStoreBackend):"""Google Cloud Storage task result backend."""def__init__(self,**kwargs):ifnotstorage:raiseImproperlyConfigured('You must install google-cloud-storage to use gcs backend')super().__init__(**kwargs)self._client_lock=RLock()self._pid=getpid()self._retry_policy=DEFAULT_RETRYself._client=Noneconf=self.app.confifself.url:url_params=self._params_from_url()conf.update(**dictfilter(url_params))self.bucket_name=conf.get('gcs_bucket')ifnotself.bucket_name:raiseImproperlyConfigured('Missing bucket name: specify gcs_bucket to use gcs backend')self.project=conf.get('gcs_project')ifnotself.project:raiseImproperlyConfigured('Missing project:specify gcs_project to use gcs backend')self.base_path=conf.get('gcs_base_path','').strip('/')self._threadpool_maxsize=int(conf.get('gcs_threadpool_maxsize',10))self.ttl=float(conf.get('gcs_ttl')or0)ifself.ttl<0:raiseImproperlyConfigured(f'Invalid ttl: {self.ttl} must be greater than or equal to 
0')elifself.ttl:ifnotself._is_bucket_lifecycle_rule_exists():raiseImproperlyConfigured(f'Missing lifecycle rule to use gcs backend with ttl on 'f'bucket: {self.bucket_name}')defget(self,key):key=bytes_to_str(key)blob=self._get_blob(key)try:returnblob.download_as_bytes(retry=self._retry_policy)exceptstorage.blob.NotFound:returnNonedefset(self,key,value):key=bytes_to_str(key)blob=self._get_blob(key)ifself.ttl:blob.custom_time=datetime.utcnow()+timedelta(seconds=self.ttl)blob.upload_from_string(value,retry=self._retry_policy)defdelete(self,key):key=bytes_to_str(key)blob=self._get_blob(key)ifblob.exists():blob.delete(retry=self._retry_policy)defmget(self,keys):withThreadPoolExecutor()aspool:returnlist(pool.map(self.get,keys))@propertydefclient(self):"""Returns a storage client."""# make sure it's thread-safe, as creating a new client is expensivewithself._client_lock:ifself._clientandself._pid==getpid():returnself._client# make sure each process gets its own connection after a forkself._client=Client(project=self.project)self._pid=getpid()# config the number of connections to the serveradapter=requests.adapters.HTTPAdapter(pool_connections=self._threadpool_maxsize,pool_maxsize=self._threadpool_maxsize,max_retries=3,)client_http=self._client._httpclient_http.mount("https://",adapter)client_http._auth_request.session.mount("https://",adapter)returnself._client@propertydefbucket(self):returnself.client.bucket(self.bucket_name)def_get_blob(self,key):key_bucket_path=f'{self.base_path}/{key}'ifself.base_pathelsekeyreturnself.bucket.blob(key_bucket_path)def_is_bucket_lifecycle_rule_exists(self):bucket=self.bucketbucket.reload()forruleinbucket.lifecycle_rules:ifrule['action']['type']=='Delete':returnTruereturnFalsedef_params_from_url(self):url_parts=url_to_parts(self.url)return{'gcs_bucket':url_parts.hostname,'gcs_base_path':url_parts.path,**url_parts.query,}
class GCSBackend(GCSBackendBase):
    """Google Cloud Storage task result backend.

    Uses Firestore for chord ref count.
    """

    implements_incr = True
    supports_native_join = True

    # Firestore parameters
    _collection_name = 'celery'
    _field_count = 'chord_count'
    _field_expires = 'expires_at'

    def __init__(self, **kwargs):
        """Set up the Firestore side of the backend and validate it.

        ``firestore_project`` defaults to the GCS project when not configured.

        Raises:
            ImproperlyConfigured: if google-cloud-firestore is not installed
                or the TTL policy on the expiry field is not enabled.
        """
        if not (firestore and firestore_admin_v1):
            raise ImproperlyConfigured(
                'You must install google-cloud-firestore to use gcs backend'
            )
        super().__init__(**kwargs)

        self._firestore_lock = RLock()
        self._firestore_client = None
        # Tracked separately from the storage client's ``_pid``: sharing one
        # pid marker lets whichever client is touched first after a fork
        # hide the fact that the other one is still the pre-fork instance.
        self._firestore_pid = None

        self.firestore_project = self.app.conf.get(
            'firestore_project', self.project
        )
        if not self._is_firestore_ttl_policy_enabled():
            raise ImproperlyConfigured(
                f'Missing TTL policy to use gcs backend with ttl on '
                f'Firestore collection: {self._collection_name} '
                f'project: {self.firestore_project}'
            )

    @property
    def firestore_client(self):
        """Returns a firestore client."""
        # make sure it's thread-safe, as creating a new client is expensive
        with self._firestore_lock:
            pid = getpid()
            if self._firestore_client and self._firestore_pid == pid:
                return self._firestore_client
            # make sure each process gets its own connection after a fork
            self._firestore_client = firestore.Client(
                project=self.firestore_project
            )
            self._firestore_pid = pid
            return self._firestore_client

    def _is_firestore_ttl_policy_enabled(self):
        """Return a truthy value if the expiry field's TTL policy is on.

        The policy counts as enabled when its state is ACTIVE or CREATING.
        """
        client = firestore_admin_v1.FirestoreAdminClient()

        name = (
            f"projects/{self.firestore_project}"
            f"/databases/(default)/collectionGroups/{self._collection_name}"
            f"/fields/{self._field_expires}"
        )
        request = firestore_admin_v1.GetFieldRequest(name=name)
        field = client.get_field(request=request)

        ttl_config = field.ttl_config
        return ttl_config and ttl_config.state in {
            firestore_admin_v1.Field.TtlConfig.State.ACTIVE,
            firestore_admin_v1.Field.TtlConfig.State.CREATING,
        }

    def _apply_chord_incr(self, header_result_args, body, **kwargs):
        """Stamp the chord counter document with an expiry, then delegate."""
        key = self.get_key_for_chord(header_result_args[0]).decode()
        # One day, in seconds.
        self._expire_chord_key(key, 86400)
        return super()._apply_chord_incr(header_result_args, body, **kwargs)

    def incr(self, key):
        """Atomically increment the chord counter for ``key``.

        Returns the counter value after the increment.
        ``on_chord_part_return`` compares this value with the chord size.
        """
        doc = self._firestore_document(key)
        resp = doc.set(
            {self._field_count: firestore.Increment(1)},
            merge=True,
            # Retry on write conflicts with exponential backoff.
            retry=retry.Retry(
                predicate=if_exception_type(Conflict),
                initial=1.0,
                maximum=180.0,
                multiplier=2.0,
                timeout=180.0,
            ),
        )
        return resp.transform_results[0].integer_value

    def on_chord_part_return(self, request, state, result, **kwargs):
        """Chord part return callback.

        Called for each task in the chord.
        Increments the counter stored in Firestore.
        If the counter reaches the number of tasks in the chord, the callback
        is called.
        If the callback raises an exception, the chord is marked as errored.
        If the callback returns a value, the chord is marked as successful.
        """
        app = self.app
        gid = request.group
        if not gid:
            return
        key = self.get_key_for_chord(gid)
        val = self.incr(key)

        deps = None
        size = request.chord.get("chord_size")
        if size is None:
            # No size on the request: derive it from the stored group.
            deps = self._restore_deps(gid, request)
            if deps is None:
                return
            size = len(deps)

        if val > size:  # pragma: no cover
            logger.warning(
                'Chord counter incremented too many times for %r', gid
            )
        elif val == size:
            # Read the deps once, to reduce the number of reads from GCS ($$)
            if deps is None:
                deps = self._restore_deps(gid, request)
                if deps is None:
                    return
            callback = maybe_signature(request.chord, app=app)
            j = deps.join_native
            try:
                with allow_join_result():
                    ret = j(
                        timeout=app.conf.result_chord_join_timeout,
                        propagate=True,
                    )
            except Exception as exc:  # pylint: disable=broad-except
                try:
                    culprit = next(deps._failed_join_report())
                    reason = f'Dependency {culprit.id} raised {exc!r}'
                except StopIteration:
                    reason = repr(exc)

                logger.exception('Chord %r raised: %r', gid, reason)
                self.chord_error_from_stack(callback, ChordError(reason))
            else:
                try:
                    callback.delay(ret)
                except Exception as exc:  # pylint: disable=broad-except
                    logger.exception('Chord %r raised: %r', gid, exc)
                    self.chord_error_from_stack(
                        callback,
                        ChordError(f'Callback error: {exc!r}'),
                    )
            finally:
                deps.delete()
                # Firestore doesn't have an exact ttl policy, so delete the key.
                self._delete_chord_key(key)

    def _restore_deps(self, gid, request):
        """Restore the chord's GroupResult, or report a chord error.

        Returns the restored group, or None after the chord has been marked
        as errored (restore raised, or the group no longer exists).
        """
        app = self.app
        try:
            deps = GroupResult.restore(gid, backend=self)
        except Exception as exc:  # pylint: disable=broad-except
            callback = maybe_signature(request.chord, app=app)
            logger.exception('Chord %r raised: %r', gid, exc)
            self.chord_error_from_stack(
                callback,
                ChordError(f'Cannot restore group: {exc!r}'),
            )
            return
        if deps is None:
            # Raise and catch so logger.exception has an active exception
            # to attach to the log record.
            try:
                raise ValueError(gid)
            except ValueError as exc:
                callback = maybe_signature(request.chord, app=app)
                logger.exception('Chord callback %r raised: %r', gid, exc)
                self.chord_error_from_stack(
                    callback,
                    ChordError(f'GroupResult {gid} no longer exists'),
                )
        return deps

    def _delete_chord_key(self, key):
        """Delete the Firestore document that holds the chord counter."""
        doc = self._firestore_document(key)
        doc.delete()

    def _expire_chord_key(self, key, expires):
        """Set TTL policy for a Firestore document.

        Firestore ttl data is typically deleted within 24 hours after its
        expiration date.
        """
        val_expires = datetime.utcnow() + timedelta(seconds=expires)
        doc = self._firestore_document(key)
        doc.set({self._field_expires: val_expires}, merge=True)

    def _firestore_document(self, key):
        """Return the document reference for ``key`` in the chord collection."""
        return self.firestore_client.collection(
            self._collection_name
        ).document(bytes_to_str(key))