nameko.testing.waiting 源代码

import sys
from contextlib import contextmanager
from threading import Semaphore

import six
from mock import patch


[文档] class WaitResult(object):
[文档] sentinel = object()
[文档] res = sentinel
[文档] exc_info = None
[文档] class NotReady(Exception): pass
@property
[文档] def has_result(self): return ( self.res is not self.sentinel or self.exc_info is not None )
[文档] def send(self, res, exc_info): if not self.has_result: self.res = res self.exc_info = exc_info
[文档] def get(self): if not self.has_result: raise WaitResult.NotReady() if self.exc_info is not None: six.reraise(*self.exc_info) return self.res
@contextmanager
[文档] def wait_for_call(obj, target, callback=None): sem = Semaphore(0) result = WaitResult() unpatched = getattr(obj, target) def maybe_release(args, kwargs, res, exc_info): should_release = True if callable(callback): should_release = callback(args, kwargs, res, exc_info) if should_release: result.send(res, exc_info) sem.release() def wraps(*args, **kwargs): res = None exc_info = None try: res = unpatched(*args, **kwargs) except Exception: exc_info = sys.exc_info() maybe_release(args, kwargs, res, exc_info) if exc_info is not None: six.reraise(*exc_info) return res with patch.object(obj, target, new=wraps): yield result sem.acquire()