"""s3 result store backend."""fromkombu.utils.encodingimportbytes_to_strfromcelery.exceptionsimportImproperlyConfiguredfrom.baseimportKeyValueStoreBackendtry:importboto3importbotocoreexceptImportError:boto3=Nonebotocore=None__all__=('S3Backend',)
[文档]classS3Backend(KeyValueStoreBackend):"""An S3 task result store. Raises: celery.exceptions.ImproperlyConfigured: if module :pypi:`boto3` is not available, if the :setting:`aws_access_key_id` or setting:`aws_secret_access_key` are not set, or it the :setting:`bucket` is not set. """def__init__(self,**kwargs):super().__init__(**kwargs)ifnotboto3ornotbotocore:raiseImproperlyConfigured('You must install boto3''to use s3 backend')conf=self.app.confself.endpoint_url=conf.get('s3_endpoint_url',None)self.aws_region=conf.get('s3_region',None)self.aws_access_key_id=conf.get('s3_access_key_id',None)self.aws_secret_access_key=conf.get('s3_secret_access_key',None)self.bucket_name=conf.get('s3_bucket',None)ifnotself.bucket_name:raiseImproperlyConfigured('Missing bucket name')self.base_path=conf.get('s3_base_path',None)self._s3_resource=self._connect_to_s3()def_get_s3_object(self,key):key_bucket_path=self.base_path+keyifself.base_pathelsekeyreturnself._s3_resource.Object(self.bucket_name,key_bucket_path)