Testing Services

Philosophy

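Nameko's conventions are designed to make testing as easy as possible. Services are usually small and single-purpose, and dependency injection makes it simple to replace and isolate pieces of functionality.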

The examples below use pytest, which is what Nameko's own test suite uses, but the helpers are test framework agnostic.

Unit Testing

Unit testing in Nameko usually means testing a single service in isolation, that is, without any or most of its dependencies.

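The worker_factory() utility creates a worker from a given service class, with its dependencies replaced by mock.MagicMock objects. Dependency functionality can then be imitated by setting side_effect and return_value on the mocks: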

""" Service unit testing best practice.
"""

from nameko.rpc import RpcProxy, rpc
from nameko.testing.services import worker_factory


class ConversionService(object):
    """ Service under test
    """
    name = "conversions"

    maths_rpc = RpcProxy("maths")

    @rpc
    def inches_to_cm(self, inches):
        return self.maths_rpc.multiply(inches, 2.54)

    @rpc
    def cms_to_inches(self, cms):
        return self.maths_rpc.divide(cms, 2.54)


def test_conversion_service():
    # create worker with mock dependencies
    service = worker_factory(ConversionService)

    # add side effects to the mock proxy to the "maths" service
    service.maths_rpc.multiply.side_effect = lambda x, y: x * y
    service.maths_rpc.divide.side_effect = lambda x, y: x / y

    # test inches_to_cm business logic
    assert service.inches_to_cm(300) == 762
    service.maths_rpc.multiply.assert_called_once_with(300, 2.54)

    # test cms_to_inches business logic
    assert service.cms_to_inches(762) == 300
    service.maths_rpc.divide.assert_called_once_with(762, 2.54)
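
The difference between return_value and side_effect can be seen without Nameko at all. The following is a minimal sketch using only the standard library's unittest.mock; the maths_rpc name here is a stand-alone MagicMock chosen for illustration, not a real injected proxy, and the sqrt method is hypothetical.

```python
from unittest.mock import MagicMock

# Stand-in for an injected dependency such as ``maths_rpc`` (hypothetical here).
maths_rpc = MagicMock()

# return_value: every call yields the same canned result.
maths_rpc.divide.return_value = 39.37
fixed = maths_rpc.divide(100, 2.54)

# side_effect with a callable: the result is computed from the arguments.
maths_rpc.multiply.side_effect = lambda x, y: x * y
computed = maths_rpc.multiply(3, 4)

# side_effect with an exception: simulates the dependency failing.
maths_rpc.sqrt.side_effect = ValueError("negative input")
try:
    maths_rpc.sqrt(-1)
    raised = False
except ValueError:
    raised = True
```

A canned return_value is usually enough when the test only checks how the dependency was called; a side_effect is the better fit when the service's result depends on what the dependency returns, or when an error path needs exercising.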

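In some circumstances it is helpful to provide an alternative dependency rather than use a mock. This may be a fully functioning replacement (e.g. a test database session) or a lightweight shim that provides partial functionality.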

""" Service unit testing best practice, with an alternative dependency.
"""

import pytest
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

from nameko.rpc import rpc
from nameko.testing.services import worker_factory

# using community extension from http://pypi.python.org/pypi/nameko-sqlalchemy
from nameko_sqlalchemy import Session


Base = declarative_base()


class Result(Base):
    __tablename__ = 'model'
    id = Column(Integer, primary_key=True)
    value = Column(String(64))


class Service:
    """ Service under test
    """
    name = "service"

    db = Session(Base)

    @rpc
    def save(self, value):
        result = Result(value=value)
        self.db.add(result)
        self.db.commit()


@pytest.fixture
def session():
    """ Create a test database and session
    """
    engine = create_engine('sqlite:///:memory:')
    Base.metadata.create_all(engine)
    session_cls = sessionmaker(bind=engine)
    return session_cls()


def test_service(session):

    # create instance, providing the test database session
    service = worker_factory(Service, db=session)

    # verify ``save`` logic by querying the test database
    service.save("helloworld")
    assert session.query(Result.value).all() == [("helloworld",)]
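
A lightweight shim can be as small as a class that implements only the methods the service actually calls. The sketch below is an assumption-laden illustration rather than Nameko code: FakeSession and Recorder are hypothetical names, and Recorder is a plain class standing in for a service so that the example runs without Nameko or a database.

```python
class FakeSession:
    """Hypothetical shim exposing only ``add`` and ``commit``."""

    def __init__(self):
        self.pending = []
        self.committed = []

    def add(self, obj):
        # Stage the object, as a real session would before commit.
        self.pending.append(obj)

    def commit(self):
        # Move everything staged so far into the "stored" list.
        self.committed.extend(self.pending)
        self.pending.clear()


class Recorder:
    """Plain class standing in for a service; ``db`` is injected by hand."""

    def __init__(self, db):
        self.db = db

    def save(self, value):
        self.db.add(value)
        self.db.commit()


session = FakeSession()
Recorder(session).save("helloworld")
```

With a real service, the same shim would presumably be supplied through the keyword form shown earlier, e.g. worker_factory(Service, db=FakeSession()). The trade-off is that a shim verifies less than a real test database but needs no engine or schema setup.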

Integration Testing

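Integration testing in Nameko means testing the interface between a number of services. The recommended way is to run all the services being tested in the normal manner, and trigger behaviour by "firing" an entrypoint using a helper: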

""" Service integration testing best practice.
"""

from nameko.rpc import rpc, RpcProxy
from nameko.testing.utils import get_container
from nameko.testing.services import entrypoint_hook


class ServiceX:
    """ Service under test
    """
    name = "service_x"

    y = RpcProxy("service_y")

    @rpc
    def remote_method(self, value):
        res = "{}-x".format(value)
        return self.y.append_identifier(res)


class ServiceY:
    """ Service under test
    """
    name = "service_y"

    @rpc
    def append_identifier(self, value):
        return "{}-y".format(value)


def test_service_x_y_integration(runner_factory, rabbit_config):

    # run services in the normal manner
    runner = runner_factory(rabbit_config, ServiceX, ServiceY)
    runner.start()

    # artificially fire the "remote_method" entrypoint on ServiceX
    # and verify response
    container = get_container(runner, ServiceX)
    with entrypoint_hook(container, "remote_method") as entrypoint:
        assert entrypoint("value") == "value-x-y"

Note that the interface between ServiceX and ServiceY here is just as it would be under normal operation.

Interfaces that are out of scope for a particular test can be deactivated with one of the following test helpers:

Restrict Entrypoints

nameko.testing.services.restrict_entrypoints(container, *entrypoints)

Restrict the entrypoints on container to those named in entrypoints.

This method must be called before the container is started.

Usage

The following service definition has three entrypoints (one timer and two RPC methods):

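from nameko.containers import ServiceContainer
from nameko.rpc import rpc
from nameko.timer import timer


class Service(object):
    name = "service"

    # timer entrypoints are fired without arguments
    @timer(interval=1)
    def foo(self):
        pass

    @rpc
    def bar(self, arg):
        pass

    @rpc
    def baz(self, arg):
        pass

# ``config`` is assumed to be a Nameko config dict defined elsewhere
container = ServiceContainer(Service, config)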

To disable the timer entrypoint on foo, leaving just the RPC entrypoints:

restrict_entrypoints(container, "bar", "baz")

Note that it is not possible to identify multiple entrypoints on the same method individually.

Replace Dependencies

nameko.testing.services.replace_dependencies(container, *dependencies, **dependency_map)

Replace the dependency providers on container with instances of MockDependencyProvider.

Dependencies named in *dependencies will be replaced with a MockDependencyProvider, which injects a MagicMock instead of the dependency.

Alternatively, you may use keyword arguments to name a dependency and provide the replacement value that the MockDependencyProvider should inject.

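Returns the MockDependencyProvider.dependency for every dependency specified in the *dependencies arguments, so that calls to the replaced dependencies can be inspected. If only one dependency was replaced, a single object is returned; otherwise a generator is returned that yields the replacements in the same order as dependencies. Note that any replaced dependencies specified via the keyword arguments **dependency_map will not be returned.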

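Replacements are made on the container instance and have no effect on the service class. New container instances are therefore unaffected by replacements on previous instances.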

Usage

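from nameko.containers import ServiceContainer
from nameko.rpc import RpcProxy, rpc
from nameko.standalone.rpc import ServiceRpcProxy
from nameko.testing.services import replace_dependencies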

class ConversionService(object):
    name = "conversions"

    maths_rpc = RpcProxy("maths")

    @rpc
    def inches_to_cm(self, inches):
        return self.maths_rpc.multiply(inches, 2.54)

    @rpc
    def cm_to_inches(self, cms):
        return self.maths_rpc.divide(cms, 2.54)

container = ServiceContainer(ConversionService, config)
mock_maths_rpc = replace_dependencies(container, "maths_rpc")
mock_maths_rpc.divide.return_value = 39.37

container.start()

with ServiceRpcProxy('conversions', config) as proxy:
    proxy.cm_to_inches(100)

# assert that the dependency was called as expected
mock_maths_rpc.divide.assert_called_once_with(100, 2.54)

Providing a specific replacement by keyword:

class StubMaths(object):

    def divide(self, val1, val2):
        return val1 / val2

replace_dependencies(container, maths_rpc=StubMaths())

container.start()

with ServiceRpcProxy('conversions', config) as proxy:
    assert proxy.cm_to_inches(127) == 50.0

Complete Example

The following integration testing example makes use of both scope-restricting helpers:

"""
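This file defines several toy services that interact to form a shop of the
famous ACME Corporation. The `AcmeShopService` relies on the `StockService`,
`InvoiceService` and `PaymentService` to fulfil its orders.
They are not best practice examples! They are minimal services provided for
the test at the bottom of the file.

``test_shop_checkout_integration`` is a full integration test of the ACME
shop "checkout flow". It demonstrates how to test the multiple ACME services
in combination with each other, including limiting service interactions by
replacing certain entrypoints and dependencies.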
"""

from collections import defaultdict

import pytest

from nameko.extensions import DependencyProvider
from nameko.events import EventDispatcher, event_handler
from nameko.exceptions import RemoteError
from nameko.rpc import rpc, RpcProxy
from nameko.standalone.rpc import ServiceRpcProxy
from nameko.testing.services import replace_dependencies, restrict_entrypoints
from nameko.testing.utils import get_container
from nameko.timer import timer


class NotLoggedInError(Exception):
    pass


class ItemOutOfStockError(Exception):
    pass


class ItemDoesNotExistError(Exception):
    pass


class ShoppingBasket(DependencyProvider):
    """A shopping basket tied to the current ``user_id``."""

    def __init__(self):
        self.baskets = defaultdict(list)

    def get_dependency(self, worker_ctx):
        class Basket(object):
            def __init__(self, basket):
                self._basket = basket
                self.worker_ctx = worker_ctx

            def add(self, item):
                self._basket.append(item)

            def __iter__(self):
                for item in self._basket:
                    yield item

        try:
            user_id = worker_ctx.data["user_id"]
        except KeyError:
            raise NotLoggedInError()
        return Basket(self.baskets[user_id])


class AcmeShopService:
    name = "acmeshopservice"

    user_basket = ShoppingBasket()
    stock_rpc = RpcProxy("stockservice")
    invoice_rpc = RpcProxy("invoiceservice")
    payment_rpc = RpcProxy("paymentservice")

    fire_event = EventDispatcher()

    @rpc
    def add_to_basket(self, item_code):
        """Add item identified by ``item_code`` to the shopping basket.

        This is a toy example! Ignore the obvious race condition.
        """
        stock_level = self.stock_rpc.check_stock(item_code)
        if stock_level > 0:
            self.user_basket.add(item_code)
            self.fire_event("item_added_to_basket", item_code)
            return item_code

        raise ItemOutOfStockError(item_code)

    @rpc
    def checkout(self):
        """Take payment for all items in the shopping basket."""
        total_price = sum(self.stock_rpc.check_price(item) for item in self.user_basket)

        # prepare invoice
        invoice = self.invoice_rpc.prepare_invoice(total_price)

        # take payment
        self.payment_rpc.take_payment(invoice)

        # fire checkout event if prepare_invoice and take_payment succeeded
        checkout_event_data = {"invoice": invoice, "items": list(self.user_basket)}
        self.fire_event("checkout_complete", checkout_event_data)
        return total_price


class Warehouse(DependencyProvider):
    """A database of items in the warehouse.

    This is a toy example! A dictionary is not a database.
    """

    def __init__(self):
        self.database = {
            "anvil": {"price": 100, "stock": 3},
            "dehydrated_boulders": {"price": 999, "stock": 12},
            "invisible_paint": {"price": 10, "stock": 30},
            "toothpicks": {"price": 1, "stock": 0},
        }

    def get_dependency(self, worker_ctx):
        return self.database


class StockService:
    name = "stockservice"

    warehouse = Warehouse()

    @rpc
    def check_price(self, item_code):
        """Check the price of an item."""
        try:
            return self.warehouse[item_code]["price"]
        except KeyError:
            raise ItemDoesNotExistError(item_code)

    @rpc
    def check_stock(self, item_code):
        """Check the stock level of an item."""
        try:
            return self.warehouse[item_code]["stock"]
        except KeyError:
            raise ItemDoesNotExistError(item_code)

    @rpc
    @timer(100)
    def monitor_stock(self):
        """Periodic stock monitoring method. Can also be triggered manually
        over RPC.

        This is an expensive process that we don't want to exercise during
        integration testing...
        """
        raise NotImplementedError()

    @event_handler("acmeshopservice", "checkout_complete")
    def dispatch_items(self, event_data):
        """Dispatch items from stock on successful checkouts.

        This is an expensive process that we don't want to exercise during
        integration testing...
        """
        raise NotImplementedError()


class AddressBook(DependencyProvider):
    """A database of user details, keyed on user_id."""

    def __init__(self):
        self.address_book = {
            "wile_e_coyote": {
                "username": "wile_e_coyote",
                "fullname": "Wile E Coyote",
                "address": "12 Long Road, High Cliffs, Utah",
            },
        }

    def get_dependency(self, worker_ctx):
        def get_user_details():
            try:
                user_id = worker_ctx.data["user_id"]
            except KeyError:
                raise NotLoggedInError()
            return self.address_book.get(user_id)

        return get_user_details


class InvoiceService:
    name = "invoiceservice"

    get_user_details = AddressBook()

    @rpc
    def prepare_invoice(self, amount):
        """Prepare an invoice for ``amount`` for the current user."""
        address = self.get_user_details().get("address")
        fullname = self.get_user_details().get("fullname")
        username = self.get_user_details().get("username")

        msg = "Dear {}. Please pay ${} to ACME Corp.".format(fullname, amount)
        invoice = {
            "message": msg,
            "amount": amount,
            "customer": username,
            "address": address,
        }
        return invoice


class PaymentService:
    name = "paymentservice"

    @rpc
    def take_payment(self, invoice):
        """Take payment from a customer according to ``invoice``.

        This is an expensive process that we don't want to exercise during
        integration testing...
        """
        raise NotImplementedError()


# =============================================================================
# Begin test
# =============================================================================


@pytest.fixture
def rpc_proxy_factory(rabbit_config):
    """Factory fixture for standalone RPC proxies.

    Proxies are started automatically so they can be used without a ``with``
    statement. All created proxies are stopped at the end of the test, when
    this fixture closes.
    """
    all_proxies = []

    def make_proxy(service_name, **kwargs):
        proxy = ServiceRpcProxy(service_name, rabbit_config, **kwargs)
        all_proxies.append(proxy)
        return proxy.start()

    yield make_proxy

    for proxy in all_proxies:
        proxy.stop()


def test_shop_checkout_integration(rabbit_config, runner_factory, rpc_proxy_factory):
    """Simulate a checkout flow as an integration test.

    Requires instances of AcmeShopService, StockService and InvoiceService
    to be running. Explicitly replaces the rpc proxy to PaymentService so
    that service doesn't need to be hosted.

    Also replaces the event dispatcher dependency on AcmeShopService and
    disables the timer entrypoint on StockService. Limiting the interactions
    of services in this way reduces the scope of the integration test and
    eliminates undesirable side-effects (e.g. processing events unnecessarily).
    """
    context_data = {"user_id": "wile_e_coyote"}
    shop = rpc_proxy_factory("acmeshopservice", context_data=context_data)

    runner = runner_factory(
        rabbit_config, AcmeShopService, StockService, InvoiceService
    )

    # replace the ``fire_event`` and ``payment_rpc`` dependencies on
    # AcmeShopService with ``MockDependencyProvider`` instances
    shop_container = get_container(runner, AcmeShopService)
    fire_event, payment_rpc = replace_dependencies(
        shop_container, "fire_event", "payment_rpc"
    )

    # restrict entrypoints on StockService
    stock_container = get_container(runner, StockService)
    restrict_entrypoints(stock_container, "check_price", "check_stock")

    runner.start()

    # add some items to the basket
    assert shop.add_to_basket("anvil") == "anvil"
    assert shop.add_to_basket("invisible_paint") == "invisible_paint"

    # try to buy something that's out of stock
    with pytest.raises(RemoteError) as exc_info:
        shop.add_to_basket("toothpicks")
    assert exc_info.value.exc_type == "ItemOutOfStockError"

    # provide a mock response from the payment service
    payment_rpc.take_payment.return_value = "Payment complete."

    # checkout
    res = shop.checkout()

    total_amount = 100 + 10
    assert res == total_amount

    # verify integration with mocked out payment service
    payment_rpc.take_payment.assert_called_once_with(
        {
            "customer": "wile_e_coyote",
            "address": "12 Long Road, High Cliffs, Utah",
            "amount": total_amount,
            "message": "Dear Wile E Coyote. Please pay $110 to ACME Corp.",
        }
    )

    # verify events fired as expected
    assert fire_event.call_count == 3


if __name__ == "__main__":
    import sys

    pytest.main(sys.argv)

Other Helpers

Entrypoint Hook

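The entrypoint hook allows a service entrypoint to be called manually. This is useful during integration testing when it is difficult or expensive to fake the external event that would cause an entrypoint to fire.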

You can provide context_data for the call to mimic a specific call context, for example a language, user agent or auth token.

import pytest

from nameko.contextdata import Language
from nameko.rpc import rpc
from nameko.testing.services import entrypoint_hook


class HelloService:
    """ Service under test
    """
    name = "hello_service"

    language = Language()

    @rpc
    def hello(self, name):
        greeting = "Hello"
        if self.language == "fr":
            greeting = "Bonjour"
        elif self.language == "de":
            greeting = "Gutentag"

        return "{}, {}!".format(greeting, name)


@pytest.mark.parametrize("language, greeting", [
    ("en", "Hello"),
    ("fr", "Bonjour"),
    ("de", "Gutentag"),
])
def test_hello_languages(language, greeting, container_factory, rabbit_config):

    container = container_factory(HelloService, rabbit_config)
    container.start()

    context_data = {'language': language}
    with entrypoint_hook(container, 'hello', context_data) as hook:
        assert hook("Matt") == "{}, Matt!".format(greeting)

Entrypoint Waiter

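The entrypoint waiter is a context manager that does not exit until a named entrypoint has fired and completed. This is useful when testing integration points between services that are asynchronous, for example receiving an event: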

from nameko.events import event_handler
from nameko.standalone.events import event_dispatcher
from nameko.testing.services import entrypoint_waiter


class ServiceB:
    """ Event listening service.
    """
    name = "service_b"

    @event_handler("service_a", "event_type")
    def handle_event(self, payload):
        print("service b received", payload)


def test_event_interface(container_factory, rabbit_config):

    container = container_factory(ServiceB, rabbit_config)
    container.start()

    dispatch = event_dispatcher(rabbit_config)

    # prints "service b received payload" before "exited"
    with entrypoint_waiter(container, 'handle_event'):
        dispatch("service_a", "event_type", "payload")
    print("exited")

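Note that the context manager waits not only for the entrypoint method to complete, but also for any dependency teardown. Dependency-based loggers (TODO: link to bundled logger), for example, would also have completed.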
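
The "block on exit until the handler has run" behaviour can be illustrated with a small standard-library sketch. This is a conceptual analogy only, not how Nameko implements entrypoint_waiter: wait_for, handler and the threading-based dispatch are all hypothetical names and choices made for the illustration.

```python
import threading
from contextlib import contextmanager


@contextmanager
def wait_for(event, timeout=5.0):
    """Hypothetical waiter: leaving the ``with`` block blocks until ``event`` is set."""
    yield
    if not event.wait(timeout):
        raise TimeoutError("handler did not complete in time")


log = []
handled = threading.Event()


def handler(payload):
    # Record the work first, then signal completion.
    log.append("handled " + payload)
    handled.set()


with wait_for(handled):
    # Dispatch asynchronously, roughly as an event bus would.
    threading.Thread(target=handler, args=("payload",)).start()
log.append("exited")
```

Because the signal is only set after the handler has recorded its work, "exited" is always appended last, which mirrors the ordering guarantee described for the waiter.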

Using pytest

Nameko's test suite uses pytest, and makes some useful configuration and fixtures available for your own tests if you choose to use pytest.

They are contained in nameko.testing.pytest. This module is registered as a pytest plugin via setuptools. Pytest will automatically pick it up and use it.