"""File-system result store backend."""importlocaleimportosfromdatetimeimportdatetimefromkombu.utils.encodingimportensure_bytesfromceleryimportuuidfromcelery.backends.baseimportKeyValueStoreBackendfromcelery.exceptionsimportImproperlyConfigureddefault_encoding=locale.getpreferredencoding(False)E_NO_PATH_SET='You need to configure a path for the file-system backend'E_PATH_NON_CONFORMING_SCHEME=('A path for the file-system backend should conform to the file URI scheme')E_PATH_INVALID="""\The configured path for the file-system backend does notwork correctly, please make sure that it exists and hasthe correct permissions.\"""
class FilesystemBackend(KeyValueStoreBackend):
    """File-system result backend.

    Arguments:
        url (str): URL to the directory we should use
        open (Callable): open function to use when opening files
        unlink (Callable): unlink function to use when deleting files
        sep (str): directory separator (to join the directory with the key)
        encoding (str): encoding used on the file-system
    """

    def __init__(self, url=None, open=open, unlink=os.unlink, sep=os.sep,
                 encoding=default_encoding, *args, **kwargs):
        """Resolve the storage directory from *url* and verify it is usable.

        Raises:
            ImproperlyConfigured: if *url* is missing, is not a ``file://``
                URL, or the directory fails the write/read/delete probe.
        """
        super().__init__(*args, **kwargs)
        self.url = url
        path = self._find_path(url)

        # Remove the leading "/" on Windows, e.g. "/C:/dir" -> "C:/dir".
        if os.name == "nt" and path.startswith("/"):
            path = path[1:]

        # We need the path and separator as bytes objects
        self.path = path.encode(encoding)
        self.sep = sep.encode(encoding)

        self.open = open
        self.unlink = unlink

        # Let's verify that we've everything setup right
        self._do_directory_test(b'.fs-backend-' + uuid().encode(encoding))

    def __reduce__(self, args=(), kwargs=None):
        """Delegate to the parent ``__reduce__``, adding ``url`` to kwargs."""
        kwargs = {} if not kwargs else kwargs
        return super().__reduce__(args, {**kwargs, 'url': self.url})

    def _find_path(self, url):
        """Return the path part of a ``file://`` URL.

        For ``file://localhost/...`` URLs the slash after the host name is
        kept as the first character of the returned path.

        Raises:
            ImproperlyConfigured: if *url* is empty or does not start with
                ``file://``.
        """
        if not url:
            raise ImproperlyConfigured(E_NO_PATH_SET)
        if url.startswith('file://localhost/'):
            # Strip scheme and host only; the path keeps its leading "/".
            return url[len('file://localhost'):]
        if url.startswith('file://'):
            return url[len('file://'):]
        raise ImproperlyConfigured(E_PATH_NON_CONFORMING_SCHEME)

    def _do_directory_test(self, key):
        """Check the directory by writing, reading back and deleting *key*.

        The read-back comparison is an explicit check rather than an
        ``assert`` so that it still runs when Python is started with ``-O``.

        Raises:
            ImproperlyConfigured: if any step raises ``OSError`` or the value
                read back differs from the value written.
        """
        try:
            self.set(key, b'test value')
            round_trip_ok = self.get(key) == b'test value'
            # Remove the probe entry before reporting a mismatch.
            self.delete(key)
        except OSError as exc:
            raise ImproperlyConfigured(E_PATH_INVALID) from exc
        if not round_trip_ok:
            raise ImproperlyConfigured(E_PATH_INVALID)

    def _filename(self, key):
        """Return the bytes path for *key*: ``self.path`` + ``self.sep`` + key."""
        return self.sep.join((self.path, key))