"""Secure serializer."""
from kombu.serialization import dumps, loads, registry
from kombu.utils.encoding import bytes_to_str, ensure_bytes, str_to_bytes

from celery.app.defaults import DEFAULT_SECURITY_DIGEST
from celery.utils.serialization import b64decode, b64encode

from .certificate import Certificate, FSCertStore
from .key import PrivateKey
from .utils import get_digest_algorithm, reraise_errors

__all__ = ('SecureSerializer', 'register_auth')

# Note: we guarantee that this value won't appear in the serialized data,
# so we can use it as a separator.
# If you change this value, make sure it's not present in the serialized data.
DEFAULT_SEPARATOR = str_to_bytes("\x00\x01")
def serialize(self, data):
    """Serialize data structure into string.

    The data is encoded with the configured serializer, the encoded
    bytes are signed with the private key, and the result is bundled
    together with the signature and the signer id by ``self._pack``.

    Arguments:
        data: The structure to serialize; passed to ``dumps`` as-is.

    Returns:
        Whatever ``self._pack`` returns for the body, content type,
        content encoding, signature and signer id.

    Raises:
        AssertionError: If no private key or no certificate is set.
        Exception: Any failure inside the ``with`` block is routed
            through ``reraise_errors`` with the message
            ``'Unable to serialize: {0!r}'`` (the exact exception type
            it raises is defined by that helper, not visible here).
    """
    assert self._key is not None
    assert self._cert is not None
    with reraise_errors('Unable to serialize: {0!r}', (Exception,)):
        content_type, content_encoding, body = dumps(
            data, serializer=self._serializer)

        # We sign the serialized bytes rather than the original data
        # structure.  This way the receiver doesn't have to decode the
        # contents to verify the signature (and thus avoids potential
        # flaws in the decoding step).
        body = ensure_bytes(body)
        return self._pack(body, content_type, content_encoding,
                          signature=self._key.sign(body, self._digest),
                          signer=self._cert.get_id())
def deserialize(self, data):
    """Deserialize data structure from string.

    The payload is unpacked, the signature is checked against the
    signer's certificate from the certificate store, and only then is
    the body decoded.

    Arguments:
        data: The packed message; passed to ``self._unpack`` as-is.

    Returns:
        The result of ``loads`` for the verified body, using the
        content type and content encoding carried in the payload.

    Raises:
        AssertionError: If no certificate store is set.
        Exception: Any failure while unpacking or verifying is routed
            through ``reraise_errors`` with the message
            ``'Unable to deserialize: {0!r}'`` (the exact exception type
            it raises is defined by that helper, not visible here).
    """
    assert self._cert_store is not None
    with reraise_errors('Unable to deserialize: {0!r}', (Exception,)):
        payload = self._unpack(data)
        signature, signer, body = (payload['signature'],
                                   payload['signer'],
                                   payload['body'])
        # Verification runs on the still-encoded body, before any
        # decoding.  The return value is ignored, so ``verify`` is
        # presumably expected to raise on a bad signature -- confirm.
        self._cert_store[signer].verify(body, signature, self._digest)
    # NOTE(review): the flattened source does not show whether this
    # return sat inside the ``with`` block.  It is placed after it here,
    # so errors from ``loads`` are not routed through ``reraise_errors``;
    # confirm against the upstream file.
    return loads(body, payload['content_type'],
                 payload['content_encoding'], force=True)