"""Utilities for safely pickling exceptions."""importdatetimeimportnumbersimportsysfrombase64importb64decodeasbase64decodefrombase64importb64encodeasbase64encodefromfunctoolsimportpartialfrominspectimportgetmrofromitertoolsimporttakewhilefromkombu.utils.encodingimportbytes_to_str,safe_repr,str_to_bytestry:importcPickleaspickleexceptImportError:importpickle__all__=('UnpickleableExceptionWrapper','subclass_exception','find_pickleable_exception','create_exception_cls','get_pickleable_exception','get_pickleable_etype','get_pickled_exception','strtobool',)#: List of base classes we probably don't want to reduce to.unwanted_base_classes=(Exception,BaseException,object)STRTOBOOL_DEFAULT_TABLE={'false':False,'no':False,'0':False,'true':True,'yes':True,'1':True,'on':True,'off':False}
[文档]defsubclass_exception(name,parent,module):"""Create new exception class."""returntype(name,(parent,),{'__module__':module})
[文档]deffind_pickleable_exception(exc,loads=pickle.loads,dumps=pickle.dumps):"""Find first pickleable exception base class. With an exception instance, iterate over its super classes (by MRO) and find the first super exception that's pickleable. It does not go below :exc:`Exception` (i.e., it skips :exc:`Exception`, :class:`BaseException` and :class:`object`). If that happens you should use :exc:`UnpickleableException` instead. Arguments: exc (BaseException): An exception instance. loads: decoder to use. dumps: encoder to use Returns: Exception: Nearest pickleable parent exception class (except :exc:`Exception` and parents), or if the exception is pickleable it will return :const:`None`. """exc_args=getattr(exc,'args',[])forsuperclsinitermro(exc.__class__,unwanted_base_classes):try:superexc=supercls(*exc_args)loads(dumps(superexc))exceptException:# pylint: disable=broad-exceptpasselse:returnsuperexc
[文档]defcreate_exception_cls(name,module,parent=None):"""Dynamically create an exception class."""ifnotparent:parent=Exceptionreturnsubclass_exception(name,parent,module)
defensure_serializable(items,encoder):"""Ensure items will serialize. For a given list of arbitrary objects, return the object or a string representation, safe for serialization. Arguments: items (Iterable[Any]): Objects to serialize. encoder (Callable): Callable function to serialize with. """safe_exc_args=[]forarginitems:try:encoder(arg)safe_exc_args.append(arg)exceptException:# pylint: disable=broad-exceptsafe_exc_args.append(safe_repr(arg))returntuple(safe_exc_args)
[文档]classUnpickleableExceptionWrapper(Exception):"""Wraps unpickleable exceptions. Arguments: exc_module (str): See :attr:`exc_module`. exc_cls_name (str): See :attr:`exc_cls_name`. exc_args (Tuple[Any, ...]): See :attr:`exc_args`. Example: >>> def pickle_it(raising_function): ... try: ... raising_function() ... except Exception as e: ... exc = UnpickleableExceptionWrapper( ... e.__class__.__module__, ... e.__class__.__name__, ... e.args, ... ) ... pickle.dumps(exc) # Works fine. """#: The module of the original exception.exc_module=None#: The name of the original exception class.exc_cls_name=None#: The arguments for the original exception.exc_args=Nonedef__init__(self,exc_module,exc_cls_name,exc_args,text=None):safe_exc_args=ensure_serializable(exc_args,lambdav:pickle.loads(pickle.dumps(v)))self.exc_module=exc_moduleself.exc_cls_name=exc_cls_nameself.exc_args=safe_exc_argsself.text=textsuper().__init__(exc_module,exc_cls_name,safe_exc_args,text)
[文档]defget_pickled_exception(exc):"""Reverse of :meth:`get_pickleable_exception`."""ifisinstance(exc,UnpickleableExceptionWrapper):returnexc.restore()returnexc
[文档]defstrtobool(term,table=None):"""Convert common terms for true/false to bool. Examples (true/false/yes/no/on/off/1/0). """iftableisNone:table=STRTOBOOL_DEFAULT_TABLEifisinstance(term,str):try:returntable[term.lower()]exceptKeyError:raiseTypeError(f'Cannot coerce {term!r} to type bool')returnterm
def_datetime_to_json(dt):# See "Date Time String Format" in the ECMA-262 specification.ifisinstance(dt,datetime.datetime):r=dt.isoformat()ifdt.microsecond:r=r[:23]+r[26:]ifr.endswith('+00:00'):r=r[:-6]+'Z'returnrelifisinstance(dt,datetime.time):r=dt.isoformat()ifdt.microsecond:r=r[:12]returnrelse:returndt.isoformat()defjsonify(obj,builtin_types=(numbers.Real,str),key=None,keyfilter=None,unknown_type_filter=None):"""Transform object making it suitable for json serialization."""fromkombu.abstractimportObjectasKombuDictType_jsonify=partial(jsonify,builtin_types=builtin_types,key=key,keyfilter=keyfilter,unknown_type_filter=unknown_type_filter)ifisinstance(obj,KombuDictType):obj=obj.as_dict(recurse=True)ifobjisNoneorisinstance(obj,builtin_types):returnobjelifisinstance(obj,(tuple,list)):return[_jsonify(v)forvinobj]elifisinstance(obj,dict):return{k:_jsonify(v,key=k)fork,vinobj.items()if(keyfilter(k)ifkeyfilterelse1)}elifisinstance(obj,(datetime.date,datetime.time)):return_datetime_to_json(obj)elifisinstance(obj,datetime.timedelta):returnstr(obj)else:ifunknown_type_filterisNone:raiseValueError(f'Unsupported type: {type(obj)!r}{obj!r} (parent: {key})')returnunknown_type_filter(obj)defraise_with_context(exc):exc_info=sys.exc_info()ifnotexc_info:raiseexcelifexc_info[1]isexc:raiseraiseexcfromexc_info[1]