nameko.testing.utils

Common testing utilities.

Attributes

Classes

AnyInstanceOf

DummyProvider

入口点封装类

ResourcePipeline

Creates and destroys resources in background threads.

Functions

get_extension(container, extension_cls, **match_attrs)

Inspect container.extensions and return the first item that is

get_container(runner, service_cls)

Inspect runner.containers and return the first item that is

wait_for_call(timeout, mock_method)

Return a context manager that waits timeout seconds for

wait_for_worker_idle(container[, timeout])

Blocks until container has no running workers.

assert_stops_raising(fn[, exception_type, timeout, ...])

Assert that fn returns successfully within timeout

unpack_mock_call(call)

get_rabbit_connections(vhost, rabbit_manager)

reset_rabbit_connections(vhost, rabbit_manager)

find_free_port([host])

Module Contents

nameko.testing.utils.get_extension(container, extension_cls, **match_attrs)[源代码]

Inspect container.extensions and return the first item that is an instance of extension_cls.

Optionally also require that the instance has an attribute with a particular value as given in the match_attrs kwargs.

nameko.testing.utils.get_container(runner, service_cls)[源代码]

Inspect runner.containers and return the first item that is hosting an instance of service_cls.

nameko.testing.utils.wait_for_call(timeout, mock_method)[源代码]

Return a context manager that waits timeout seconds for mock_method to be called, yielding the mock if so.

Raises an eventlet.Timeout if the method was not called within timeout seconds.

nameko.testing.utils.wait_for_worker_idle(container, timeout=10)[源代码]

Blocks until container has no running workers.

Raises an eventlet.Timeout if the method was not called within timeout seconds.

nameko.testing.utils.assert_stops_raising(fn, exception_type=Exception, timeout=10, interval=0.1)[源代码]

Assert that fn returns successfully within timeout seconds, trying every interval seconds.

If exception_type is provided, fail unless the exception thrown is an instance of exception_type. If not specified, any :class:`Exception instance is allowed.

nameko.testing.utils.MockCallArgs[源代码]
nameko.testing.utils.unpack_mock_call(call)[源代码]
class nameko.testing.utils.AnyInstanceOf(cls)[源代码]

Bases: object

cls[源代码]
__eq__(other)[源代码]
__ne__(other)[源代码]
__repr__()[源代码]
nameko.testing.utils.ANY_PARTIAL[源代码]
class nameko.testing.utils.DummyProvider(method_name=None)[源代码]

Bases: nameko.extensions.Entrypoint

入口点封装类

Parameters:
expected_exceptions异常类或异常类元组

指定可能由调用者引起的异常(例如,通过提供错误的参数)。 保存在入口点实例中作为 entrypoint.expected_exceptions,供其他扩展(例如监控系统)后续检查。

sensitive_arguments字符串或字符串元组

将参数或参数的一部分标记为敏感。保存在入口点实例中作为 entrypoint.sensitive_arguments, 供其他扩展(例如日志系统)后续检查。

seealso:

nameko.utils.get_redacted_args()

method_name[源代码]

记录了RPC调用的方法名称

nameko.testing.utils.get_rabbit_connections(vhost, rabbit_manager)[源代码]
nameko.testing.utils.reset_rabbit_connections(vhost, rabbit_manager)[源代码]
nameko.testing.utils.find_free_port(host='127.0.0.1')[源代码]
class nameko.testing.utils.ResourcePipeline(create, destroy, size=3)[源代码]

Bases: object

Creates and destroys resources in background threads.

Creates up to size resources ahead of time so the caller avoids waiting for lazy creation.

STOP[源代码]
ready[源代码]
trash[源代码]
size[源代码]
create[源代码]
destroy[源代码]
_start()[源代码]
_shutdown()[源代码]
_create()[源代码]
_destroy()[源代码]
get()[源代码]
run()[源代码]