nameko.testing.services

Utilities for testing nameko services.

Attributes

Exceptions

EntrypointWaiterTimeout

Common base class for all non-exit exceptions.

Classes

MockDependencyProvider

请注意,Extension.__init__bind 过程中以及实例化时都会被调用,因此请避免在此方法中产生副作用。请使用 setup

Once

Entrypoint that spawns a worker exactly once, as soon as

Functions

entrypoint_hook(container, method_name[, ...])

Yield a function providing an entrypoint into a hosted service.

entrypoint_waiter(container, method_name[, timeout, ...])

Context manager that waits until an entrypoint has fired, and

worker_factory(service_cls, **dependencies)

Return an instance of service_cls with its injected dependencies

_replace_dependencies(container, **dependency_map)

replace_dependencies(container, *dependencies, ...)

MockDependencyProvider 的实例替换 container 上的依赖提供者。

restrict_entrypoints(container, *entrypoints)

限制 container 上的入口点,仅限于 entrypoints 中指定的名称。

Module Contents

nameko.testing.services.entrypoint_hook(container, method_name, context_data=None, timeout=30)[源代码]

Yield a function providing an entrypoint into a hosted service.

The yielded function may be called as if it were the bare method defined in the service class. Intended to be used as an integration testing utility.

Parameters:
containerServiceContainer

The container hosting the service owning the entrypoint

method_namestr

The name of the entrypoint decorated method on the service class

context_datadict

Context data to provide for the call, e.g. a language, auth token or session.

timeoutint

Maximum seconds to wait

Usage

To verify that ServiceX and ServiceY are compatible, make an integration test that checks their interaction:

nameko.testing.services.entrypoint_waiter(container, method_name, timeout=30, callback=None)[源代码]

Context manager that waits until an entrypoint has fired, and the generated worker has exited and been torn down.

It yields a nameko.testing.waiting.WaitResult object that can be used to get the result returned (exception raised) by the entrypoint after the waiter has exited.

Parameters:
containerServiceContainer

The container hosting the service owning the entrypoint

method_namestr

The name of the entrypoint decorated method on the service class

timeoutint

Maximum seconds to wait

callbackcallable

Function to conditionally control whether the entrypoint_waiter should exit for a particular invocation

The timeout argument specifies the maximum number of seconds the entrypoint_waiter should wait before exiting. It can be disabled by passing None. The default is 30 seconds.

Optionally allows a callback to be provided which is invoked whenever the entrypoint fires. If provided, the callback must return True for the entrypoint_waiter to exit. The signature for the callback function is:

def callback(worker_ctx, result, exc_info):
    pass

Where there parameters are as follows:

worker_ctx (WorkerContext): WorkerContext of the entrypoint call.

result (object): The return value of the entrypoint.

exc_info (tuple): Tuple as returned by sys.exc_info if the

entrypoint raised an exception, otherwise None.

Usage

::
class Service(object):

name = "service"

@event_handler('srcservice', 'eventtype') def handle_event(self, msg):

return msg

container = ServiceContainer(Service, config) container.start()

# basic with entrypoint_waiter(container, 'handle_event'):

... # action that dispatches event

# giving access to the result with entrypoint_waiter(container, 'handle_event') as result:

... # action that dispatches event

res = result.get()

# with custom timeout with entrypoint_waiter(container, 'handle_event', timeout=5):

... # action that dispatches event

# with callback that waits until entrypoint stops raising def callback(worker_ctx, result, exc_info):

if exc_info is None:

return True

with entrypoint_waiter(container, 'handle_event', callback=callback):

... # action that dispatches event

exception nameko.testing.services.EntrypointWaiterTimeout[源代码]

Bases: Exception

Common base class for all non-exit exceptions.

Initialize self. See help(type(self)) for accurate signature.

nameko.testing.services.worker_factory(service_cls, **dependencies)[源代码]

Return an instance of service_cls with its injected dependencies replaced with MagicMock objects, or as given in dependencies.

Usage

The following example service proxies calls to a "maths" service via an RpcProxy dependency:

from nameko.rpc import RpcProxy, rpc

class ConversionService(object):
    name = "conversions"

    maths_rpc = RpcProxy("maths")

    @rpc
    def inches_to_cm(self, inches):
        return self.maths_rpc.multiply(inches, 2.54)

    @rpc
    def cm_to_inches(self, cms):
        return self.maths_rpc.divide(cms, 2.54)

Use the worker_factory to create an instance of ConversionService with its dependencies replaced by MagicMock objects:

service = worker_factory(ConversionService)

Nameko's entrypoints do not modify the service methods, so instance methods can be called directly with the same signature. The replaced dependencies can be used as any other MagicMock object, so a complete unit test for the conversion service may look like this:

# create worker instance
service = worker_factory(ConversionService)

# replace "maths" service
service.maths_rpc.multiply.side_effect = lambda x, y: x * y
service.maths_rpc.divide.side_effect = lambda x, y: x / y

# test inches_to_cm business logic
assert service.inches_to_cm(300) == 762
service.maths_rpc.multiply.assert_called_once_with(300, 2.54)

# test cms_to_inches business logic
assert service.cms_to_inches(762) == 300
service.maths_rpc.divide.assert_called_once_with(762, 2.54)

Providing Dependencies

The **dependencies kwargs to worker_factory can be used to provide a replacement dependency instead of a mock. For example, to unit test a service against a real database:

If a named dependency provider does not exist on service_cls, a ExtensionNotFound exception is raised.

class nameko.testing.services.MockDependencyProvider(attr_name, dependency=None)[源代码]

Bases: nameko.extensions.DependencyProvider

请注意,Extension.__init__bind 过程中以及实例化时都会被调用,因此请避免在此方法中产生副作用。请使用 setup

此外,binditer_extensions 使用反射来查找扩展可能声明的任何子扩展。扩展上的任何描述符应该预计在反射过程中被调用,这发生在 ServiceContainer.__init__ServiceContainer.setup 之间。

Extension.container 属性提供对绑定到该扩展的 nameko.containers.ServiceContainer 实例的访问,否则为 None

attr_name[源代码]
dependency[源代码]
get_dependency(worker_ctx)[源代码]

在工作者执行之前调用。依赖提供者应返回一个对象,以便容器将其注入到工作者实例中。

nameko.testing.services._replace_dependencies(container, **dependency_map)[源代码]
nameko.testing.services.replace_dependencies(container, *dependencies, **dependency_map)[源代码]

MockDependencyProvider 的实例替换 container 上的依赖提供者。

在 *dependencies 中命名的依赖项将被替换为 MockDependencyProvider ,该提供者注入一个 MagicMock 作为依赖项。

另外,您可以使用关键字参数来命名依赖项并提供 MockDependencyProvider 应该注入的替代值。

返回在 (*dependencies) 参数中指定的每个依赖项的 MockDependencyProvider.dependency ,以便可以检查对被替代依赖项的调用。如果只替换了一个依赖项,则返回单个对象;否则返回一个生成器,按与 dependencies 相同的顺序生成替代项。请注意,通过关键字参数 **dependency_map 指定的任何被替代依赖项将不会被返回。

替代项在容器实例上进行替换,对服务类没有影响。因此,新容器实例不会受到先前实例上的替代项的影响。

用法

from nameko.rpc import RpcProxy, rpc
from nameko.standalone.rpc import ServiceRpcProxy

class ConversionService(object):
    name = "conversions"

    maths_rpc = RpcProxy("maths")

    @rpc
    def inches_to_cm(self, inches):
        return self.maths_rpc.multiply(inches, 2.54)

    @rpc
    def cm_to_inches(self, cms):
        return self.maths_rpc.divide(cms, 2.54)

container = ServiceContainer(ConversionService, config)
mock_maths_rpc = replace_dependencies(container, "maths_rpc")
mock_maths_rpc.divide.return_value = 39.37

container.start()

with ServiceRpcProxy('conversions', config) as proxy:
    proxy.cm_to_inches(100)

# assert that the dependency was called as expected
mock_maths_rpc.divide.assert_called_once_with(100, 2.54)

通过关键字提供特定的替代项:

class StubMaths(object):

    def divide(self, val1, val2):
        return val1 / val2

replace_dependencies(container, maths_rpc=StubMaths())

container.start()

with ServiceRpcProxy('conversions', config) as proxy:
    assert proxy.cm_to_inches(127) == 50.0
nameko.testing.services.restrict_entrypoints(container, *entrypoints)[源代码]

限制 container 上的入口点,仅限于 entrypoints 中指定的名称。

此方法必须在容器启动之前调用。

用法

以下服务定义有两个入口点:

class Service(object):
    name = "service"

    @timer(interval=1)
    def foo(self, arg):
        pass

    @rpc
    def bar(self, arg)
        pass

    @rpc
    def baz(self, arg):
        pass

container = ServiceContainer(Service, config)

要禁用 foo 上的计时器入口点,仅保留 RPC 入口点:

restrict_entrypoints(container, "bar", "baz")

请注意,无法单独识别同一方法上的多个入口点。

class nameko.testing.services.Once(*args, **kwargs)[源代码]

Bases: nameko.extensions.Entrypoint

Entrypoint that spawns a worker exactly once, as soon as the service container started.

Parameters:
expected_exceptions异常类或异常类元组

指定可能由调用者引起的异常(例如,通过提供错误的参数)。 保存在入口点实例中作为 entrypoint.expected_exceptions,供其他扩展(例如监控系统)后续检查。

sensitive_arguments字符串或字符串元组

将参数或参数的一部分标记为敏感。保存在入口点实例中作为 entrypoint.sensitive_arguments, 供其他扩展(例如日志系统)后续检查。

seealso:

nameko.utils.get_redacted_args()

expected_exceptions[源代码]
sensitive_arguments[源代码]
sensitive_variables[源代码]
args[源代码]
kwargs[源代码]
start()[源代码]

在容器成功启动时调用绑定的扩展。

此方法仅在所有其他扩展成功返回 Extension.setup 后被调用。如果扩展对外部事件做出反应,它现在应该开始对此进行响应。

nameko.testing.services.once[源代码]
nameko.testing.services.dummy[源代码]