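Testing Services
Philosophy
Nameko's conventions are designed to make testing as easy as possible. Services are likely to be small and single-purpose, and dependency injection makes it simple to replace and isolate pieces of functionality.
The examples below use pytest, which is what Nameko's own test suite uses, but the helpers are test framework agnostic.
Unit Testing
Unit testing in Nameko usually means testing a single service in isolation, i.e. without any or most of its dependencies.
The worker_factory() utility creates a worker from a given service class, with its dependencies replaced by mock.MagicMock objects. Dependency functionality can then be imitated by setting side_effect and return_value on those mocks: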
""" Service unit testing best practice.
"""

from nameko.rpc import RpcProxy, rpc
from nameko.testing.services import worker_factory


class ConversionService(object):
    """ Service under test
    """
    name = "conversions"

    maths_rpc = RpcProxy("maths")

    @rpc
    def inches_to_cm(self, inches):
        return self.maths_rpc.multiply(inches, 2.54)

    @rpc
    def cms_to_inches(self, cms):
        return self.maths_rpc.divide(cms, 2.54)


def test_conversion_service():
    # create worker with mock dependencies
    service = worker_factory(ConversionService)

    # add side effects to the mock proxy to the "maths" service
    service.maths_rpc.multiply.side_effect = lambda x, y: x * y
    service.maths_rpc.divide.side_effect = lambda x, y: x / y

    # test inches_to_cm business logic
    assert service.inches_to_cm(300) == 762
    service.maths_rpc.multiply.assert_called_once_with(300, 2.54)

    # test cms_to_inches business logic
    assert service.cms_to_inches(762) == 300
    service.maths_rpc.divide.assert_called_once_with(762, 2.54)
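The test above only sets side_effect. As a minimal sketch of how side_effect and return_value differ, the following uses nothing but the standard library's unittest.mock; the `maths_rpc` mock stands in for what worker_factory() would inject, and the `sqrt` method is a hypothetical name used only for illustration.

```python
from unittest.mock import MagicMock

# Stand-in for the mock that worker_factory() would inject as ``maths_rpc``.
maths_rpc = MagicMock()

# return_value: every call returns the same canned result,
# regardless of the arguments.
maths_rpc.multiply.return_value = 762
fixed = maths_rpc.multiply(300, 2.54)

# side_effect as a callable: the result is computed from the arguments.
maths_rpc.divide.side_effect = lambda x, y: x / y
computed = maths_rpc.divide(10, 4)

# side_effect as an exception instance: the call raises instead of returning.
# ``sqrt`` is a hypothetical method, not part of the service above.
maths_rpc.sqrt.side_effect = ValueError("negative input")
try:
    maths_rpc.sqrt(-1)
    raised = False
except ValueError:
    raised = True

# Calls are recorded either way, so they can be asserted on afterwards.
maths_rpc.multiply.assert_called_once_with(300, 2.54)
```

A canned return_value is usually enough when the test only checks how the dependency was called; a callable side_effect is the better fit when the assertion depends on the value flowing back into the service logic.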
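In some circumstances it is helpful to provide an alternative dependency rather than use a mock. This may be a fully functioning replacement (e.g. a test database session) or a lightweight shim that provides partial functionality.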
""" Service unit testing best practice, with an alternative dependency.
"""

import pytest
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

from nameko.rpc import rpc
from nameko.testing.services import worker_factory

# using community extension from http://pypi.python.org/pypi/nameko-sqlalchemy
from nameko_sqlalchemy import Session


Base = declarative_base()


class Result(Base):
    __tablename__ = 'model'
    id = Column(Integer, primary_key=True)
    value = Column(String(64))


class Service:
    """ Service under test
    """
    name = "service"

    db = Session(Base)

    @rpc
    def save(self, value):
        result = Result(value=value)
        self.db.add(result)
        self.db.commit()


@pytest.fixture
def session():
    """ Create a test database and session
    """
    engine = create_engine('sqlite:///:memory:')
    Base.metadata.create_all(engine)
    session_cls = sessionmaker(bind=engine)
    return session_cls()


def test_service(session):
    # create instance, providing the test database session
    service = worker_factory(Service, db=session)

    # verify ``save`` logic by querying the test database
    service.save("helloworld")
    assert session.query(Result.value).all() == [("helloworld",)]
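The database session above is a fully functioning replacement. For the other case, a lightweight shim, a rough sketch might look like the following. It is plain Python with no nameko imports so that it runs anywhere: `StubMaths` and `ConversionLogic` are hypothetical names, and `ConversionLogic` simply mirrors the business logic of the earlier ConversionService. With nameko installed, the same shim would be passed as a keyword argument, in the same way `db=session` is passed above, e.g. `worker_factory(ConversionService, maths_rpc=StubMaths())`.

```python
class StubMaths:
    """Hypothetical shim for the "maths" RPC proxy.

    It implements only the two methods the service under test calls.
    """

    def multiply(self, x, y):
        return x * y

    def divide(self, x, y):
        return x / y


class ConversionLogic:
    """Plain-Python mirror of ConversionService's methods (an assumption
    made for this sketch so that it does not depend on nameko)."""

    def __init__(self, maths_rpc):
        self.maths_rpc = maths_rpc

    def inches_to_cm(self, inches):
        return self.maths_rpc.multiply(inches, 2.54)

    def cms_to_inches(self, cms):
        return self.maths_rpc.divide(cms, 2.54)


# Inject the shim in place of the real dependency and exercise the logic.
service = ConversionLogic(maths_rpc=StubMaths())
centimetres = service.inches_to_cm(10)
inches = service.cms_to_inches(centimetres)
```

Unlike a MagicMock, a shim like this fails loudly with an AttributeError if the service starts calling a method the shim does not provide, which can be a useful signal in a unit test.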
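Integration Testing
Integration testing in Nameko means testing the interface between a number of services. The recommended way is to run all the services being tested in the normal manner, and trigger behaviour by "firing" an entrypoint using a helper: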
""" Service integration testing best practice.
"""

from nameko.rpc import rpc, RpcProxy
from nameko.testing.utils import get_container
from nameko.testing.services import entrypoint_hook


class ServiceX:
    """ Service under test
    """
    name = "service_x"

    y = RpcProxy("service_y")

    @rpc
    def remote_method(self, value):
        res = "{}-x".format(value)
        return self.y.append_identifier(res)


class ServiceY:
    """ Service under test
    """
    name = "service_y"

    @rpc
    def append_identifier(self, value):
        return "{}-y".format(value)


def test_service_x_y_integration(runner_factory, rabbit_config):
    # run services in the normal manner
    runner = runner_factory(rabbit_config, ServiceX, ServiceY)
    runner.start()

    # artificially fire the "remote_method" entrypoint on ServiceX
    # and verify response
    container = get_container(runner, ServiceX)
    with entrypoint_hook(container, "remote_method") as entrypoint:
        assert entrypoint("value") == "value-x-y"
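Note that the interface between ServiceX and ServiceY here is exactly as it would be under normal operation.
Interfaces that are out of scope for a particular test can be disabled with one of the following test helpers: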
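Restricting Entrypoints
- nameko.testing.services.restrict_entrypoints(container, *entrypoints)
Restrict the entrypoints on container to those named in entrypoints. This method must be called before the container is started.
Usage
The following service definition has three entrypoints:
class Service(object):
    name = "service"

    @timer(interval=1)
    def foo(self, arg):
        pass

    @rpc
    def bar(self, arg):
        pass

    @rpc
    def baz(self, arg):
        pass

container = ServiceContainer(Service, config)
To disable the timer entrypoint on foo, leaving just the RPC entrypoints:
restrict_entrypoints(container, "bar", "baz")
Note that it is not possible to identify multiple entrypoints on the same method individually.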
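Replacing Dependencies
- nameko.testing.services.replace_dependencies(container, *dependencies, **dependency_map)
Replace the dependency providers on container with instances of MockDependencyProvider. Dependencies named in *dependencies are replaced with a MockDependencyProvider, which injects a MagicMock instead of the dependency.
Alternatively, you may use keyword arguments to name a dependency and provide the replacement value that the MockDependencyProvider should inject.
Returns the MockDependencyProvider.dependency of every dependency named in the (*dependencies) arguments, so that calls to the replaced dependencies can be inspected. If only one dependency was replaced, a single object is returned; otherwise a generator is returned that yields the replacements in the same order as dependencies. Note that replacements specified via the keyword arguments **dependency_map are not returned.
Replacements are made on the container instance and have no effect on the service class. New container instances are therefore unaffected by replacements made on previous instances.
Usage
from nameko.containers import ServiceContainer
from nameko.rpc import RpcProxy, rpc
from nameko.standalone.rpc import ServiceRpcProxy
from nameko.testing.services import replace_dependencies

class ConversionService(object):
    name = "conversions"

    maths_rpc = RpcProxy("maths")

    @rpc
    def inches_to_cm(self, inches):
        return self.maths_rpc.multiply(inches, 2.54)

    @rpc
    def cm_to_inches(self, cms):
        return self.maths_rpc.divide(cms, 2.54)

# ``config`` is assumed to be an existing nameko configuration
container = ServiceContainer(ConversionService, config)
mock_maths_rpc = replace_dependencies(container, "maths_rpc")
mock_maths_rpc.divide.return_value = 39.37

container.start()

with ServiceRpcProxy('conversions', config) as proxy:
    proxy.cm_to_inches(100)

# assert that the dependency was called as expected
mock_maths_rpc.divide.assert_called_once_with(100, 2.54)
Providing a specific replacement by keyword:
class StubMaths(object):

    def divide(self, val1, val2):
        return val1 / val2

replace_dependencies(container, maths_rpc=StubMaths())

container.start()

with ServiceRpcProxy('conversions', config) as proxy:
    assert proxy.cm_to_inches(127) == 50.0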
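The return-value rule described for replace_dependencies (a single object for one name, a generator for several, nothing for keyword replacements) is easy to trip over when unpacking. The following is a toy model of that rule only, written with the standard library; `replace_dependencies_model` is a hypothetical function that is not nameko's implementation and does not touch any container.

```python
from unittest.mock import MagicMock


def replace_dependencies_model(*dependencies, **dependency_map):
    """Toy model of the documented return-value rule (hypothetical).

    Positional names get a fresh MagicMock each. Keyword replacements in
    ``dependency_map`` would be injected as given, but are never returned.
    """
    mocks = {name: MagicMock() for name in dependencies}
    replacements = (mocks[name] for name in dependencies)
    if len(dependencies) == 1:
        # exactly one positional name: return the object itself
        return next(replacements)
    # zero or several positional names: return a generator, in call order
    return replacements


# One name: the mock comes back directly.
single = replace_dependencies_model("maths_rpc")

# Several names: unpack the generator in the same order as the arguments.
fire_event, payment_rpc = replace_dependencies_model("fire_event", "payment_rpc")

# Keyword-only replacement: nothing is yielded back to the caller.
keyword_only = list(replace_dependencies_model(maths_rpc=object()))
```

A practical consequence is that a test which needs to inspect a keyword-supplied replacement should keep its own reference to the object it passed in.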
Complete Example
The following integration testing example makes use of both scope-restricting helpers:
"""
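This file defines several toy services that interact to form a shop of the
famous ACME Corporation. The `AcmeShopService` relies on the `StockService`,
`InvoiceService` and `PaymentService` to fulfil its orders. They are not best
practice examples! They are minimal services provided for the test at the
bottom of the file.

``test_shop_checkout_integration`` is a full integration test of the ACME
shop "checkout flow". It demonstrates how to test the multiple ACME services
in combination with each other, including limiting service interactions by
replacing certain entrypoints and dependencies.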
"""

from collections import defaultdict

import pytest

from nameko.extensions import DependencyProvider
from nameko.events import EventDispatcher, event_handler
from nameko.exceptions import RemoteError
from nameko.rpc import rpc, RpcProxy
from nameko.standalone.rpc import ServiceRpcProxy
from nameko.testing.services import replace_dependencies, restrict_entrypoints
from nameko.testing.utils import get_container
from nameko.timer import timer


class NotLoggedInError(Exception):
    pass


class ItemOutOfStockError(Exception):
    pass


class ItemDoesNotExistError(Exception):
    pass


class ShoppingBasket(DependencyProvider):
    """A shopping basket tied to the current ``user_id``."""

    def __init__(self):
        self.baskets = defaultdict(list)

    def get_dependency(self, worker_ctx):
        class Basket(object):
            def __init__(self, basket):
                self._basket = basket
                self.worker_ctx = worker_ctx

            def add(self, item):
                self._basket.append(item)

            def __iter__(self):
                for item in self._basket:
                    yield item

        try:
            user_id = worker_ctx.data["user_id"]
        except KeyError:
            raise NotLoggedInError()
        return Basket(self.baskets[user_id])


class AcmeShopService:
    name = "acmeshopservice"

    user_basket = ShoppingBasket()
    stock_rpc = RpcProxy("stockservice")
    invoice_rpc = RpcProxy("invoiceservice")
    payment_rpc = RpcProxy("paymentservice")

    fire_event = EventDispatcher()

    @rpc
    def add_to_basket(self, item_code):
        """Add item identified by ``item_code`` to the shopping basket.

        This is a toy example! Ignore the obvious race condition.
        """
        stock_level = self.stock_rpc.check_stock(item_code)
        if stock_level > 0:
            self.user_basket.add(item_code)
            self.fire_event("item_added_to_basket", item_code)
            return item_code

        raise ItemOutOfStockError(item_code)

    @rpc
    def checkout(self):
        """Take payment for all items in the shopping basket."""
        total_price = sum(self.stock_rpc.check_price(item) for item in self.user_basket)

        # prepare invoice
        invoice = self.invoice_rpc.prepare_invoice(total_price)

        # take payment
        self.payment_rpc.take_payment(invoice)

        # fire checkout event if prepare_invoice and take_payment succeeded
        checkout_event_data = {"invoice": invoice, "items": list(self.user_basket)}
        self.fire_event("checkout_complete", checkout_event_data)
        return total_price


class Warehouse(DependencyProvider):
    """A database of items in the warehouse.

    This is a toy example! A dictionary is not a database.
    """

    def __init__(self):
        self.database = {
            "anvil": {"price": 100, "stock": 3},
            "dehydrated_boulders": {"price": 999, "stock": 12},
            "invisible_paint": {"price": 10, "stock": 30},
            "toothpicks": {"price": 1, "stock": 0},
        }

    def get_dependency(self, worker_ctx):
        return self.database


class StockService:
    name = "stockservice"

    warehouse = Warehouse()

    @rpc
    def check_price(self, item_code):
        """Check the price of an item."""
        try:
            return self.warehouse[item_code]["price"]
        except KeyError:
            raise ItemDoesNotExistError(item_code)

    @rpc
    def check_stock(self, item_code):
        """Check the stock level of an item."""
        try:
            return self.warehouse[item_code]["stock"]
        except KeyError:
            raise ItemDoesNotExistError(item_code)

    @rpc
    @timer(100)
    def monitor_stock(self):
        """Periodic stock monitoring method. Can also be triggered manually
        over RPC.

        This is an expensive process that we don't want to exercise during
        integration testing...
        """
        raise NotImplementedError()

    @event_handler("acmeshopservice", "checkout_complete")
    def dispatch_items(self, event_data):
        """Dispatch items from stock on successful checkouts.

        This is an expensive process that we don't want to exercise during
        integration testing...
        """
        raise NotImplementedError()


class AddressBook(DependencyProvider):
    """A database of user details, keyed on user_id."""

    def __init__(self):
        self.address_book = {
            "wile_e_coyote": {
                "username": "wile_e_coyote",
                "fullname": "Wile E Coyote",
                "address": "12 Long Road, High Cliffs, Utah",
            },
        }

    def get_dependency(self, worker_ctx):
        def get_user_details():
            try:
                user_id = worker_ctx.data["user_id"]
            except KeyError:
                raise NotLoggedInError()
            return self.address_book.get(user_id)

        return get_user_details


class InvoiceService:
    name = "invoiceservice"

    get_user_details = AddressBook()

    @rpc
    def prepare_invoice(self, amount):
        """Prepare an invoice for ``amount`` for the current user."""
        address = self.get_user_details().get("address")
        fullname = self.get_user_details().get("fullname")
        username = self.get_user_details().get("username")

        msg = "Dear {}. Please pay ${} to ACME Corp.".format(fullname, amount)
        invoice = {
            "message": msg,
            "amount": amount,
            "customer": username,
            "address": address,
        }
        return invoice


class PaymentService:
    name = "paymentservice"

    @rpc
    def take_payment(self, invoice):
        """Take payment from a customer according to ``invoice``.

        This is an expensive process that we don't want to exercise during
        integration testing...
        """
        raise NotImplementedError()


# =============================================================================
# Begin test
# =============================================================================


@pytest.fixture
def rpc_proxy_factory(rabbit_config):
    """Factory fixture for standalone RPC proxies.

    Proxies are started automatically so they can be used without a ``with``
    statement. All created proxies are stopped at the end of the test, when
    this fixture closes.
    """
    all_proxies = []

    def make_proxy(service_name, **kwargs):
        proxy = ServiceRpcProxy(service_name, rabbit_config, **kwargs)
        all_proxies.append(proxy)
        return proxy.start()

    yield make_proxy

    for proxy in all_proxies:
        proxy.stop()


def test_shop_checkout_integration(rabbit_config, runner_factory, rpc_proxy_factory):
    """Simulate a checkout flow as an integration test.

    Requires instances of AcmeShopService, StockService and InvoiceService
    to be running. Explicitly replaces the rpc proxy to PaymentService so
    that service doesn't need to be hosted.

    Also replaces the event dispatcher dependency on AcmeShopService and
    disables the timer entrypoint on StockService. Limiting the interactions
    of services in this way reduces the scope of the integration test and
    eliminates undesirable side-effects (e.g. processing events unnecessarily).
    """
    context_data = {"user_id": "wile_e_coyote"}
    shop = rpc_proxy_factory("acmeshopservice", context_data=context_data)

    runner = runner_factory(
        rabbit_config, AcmeShopService, StockService, InvoiceService
    )

    # replace the ``fire_event`` (event dispatcher) and ``payment_rpc``
    # dependencies on AcmeShopService with ``MockDependencyProvider``\s
    shop_container = get_container(runner, AcmeShopService)
    fire_event, payment_rpc = replace_dependencies(
        shop_container, "fire_event", "payment_rpc"
    )

    # restrict entrypoints on StockService
    stock_container = get_container(runner, StockService)
    restrict_entrypoints(stock_container, "check_price", "check_stock")

    runner.start()

    # add some items to the basket
    assert shop.add_to_basket("anvil") == "anvil"
    assert shop.add_to_basket("invisible_paint") == "invisible_paint"

    # try to buy something that's out of stock
    with pytest.raises(RemoteError) as exc_info:
        shop.add_to_basket("toothpicks")
    assert exc_info.value.exc_type == "ItemOutOfStockError"

    # provide a mock response from the payment service
    payment_rpc.take_payment.return_value = "Payment complete."

    # checkout
    res = shop.checkout()

    total_amount = 100 + 10
    assert res == total_amount

    # verify integration with mocked out payment service
    payment_rpc.take_payment.assert_called_once_with(
        {
            "customer": "wile_e_coyote",
            "address": "12 Long Road, High Cliffs, Utah",
            "amount": total_amount,
            "message": "Dear Wile E Coyote. Please pay $110 to ACME Corp.",
        }
    )

    # verify events fired as expected: two "item_added_to_basket" events
    # plus one "checkout_complete" event
    assert fire_event.call_count == 3


if __name__ == "__main__":
    import sys

    pytest.main(sys.argv)
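Other Helpers
Entrypoint Hook
The entrypoint hook allows a service entrypoint to be called manually. This is useful during integration testing when it is difficult or expensive to fake the external event that would cause an entrypoint to fire.
You can provide context_data for the call to mimic specific call context, for example a language, user agent or auth token.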
import pytest

from nameko.contextdata import Language
from nameko.rpc import rpc
from nameko.testing.services import entrypoint_hook


class HelloService:
    """ Service under test
    """
    name = "hello_service"

    language = Language()

    @rpc
    def hello(self, name):
        greeting = "Hello"
        if self.language == "fr":
            greeting = "Bonjour"
        elif self.language == "de":
            greeting = "Gutentag"

        return "{}, {}!".format(greeting, name)


@pytest.mark.parametrize("language, greeting", [
    ("en", "Hello"),
    ("fr", "Bonjour"),
    ("de", "Gutentag"),
])
def test_hello_languages(language, greeting, container_factory, rabbit_config):
    container = container_factory(HelloService, rabbit_config)
    container.start()

    context_data = {'language': language}
    with entrypoint_hook(container, 'hello', context_data) as hook:
        assert hook("Matt") == "{}, Matt!".format(greeting)
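Entrypoint Waiter
The entrypoint waiter is a context manager that does not exit until a named entrypoint has fired and completed. This is useful when testing integration points between services that are asynchronous, for example receiving an event: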
from nameko.events import event_handler
from nameko.standalone.events import event_dispatcher
from nameko.testing.services import entrypoint_waiter


class ServiceB:
    """ Event listening service.
    """
    name = "service_b"

    @event_handler("service_a", "event_type")
    def handle_event(self, payload):
        print("service b received", payload)


def test_event_interface(container_factory, rabbit_config):
    container = container_factory(ServiceB, rabbit_config)
    container.start()

    dispatch = event_dispatcher(rabbit_config)

    # prints "service b received payload" before "exited"
    with entrypoint_waiter(container, 'handle_event'):
        dispatch("service_a", "event_type", "payload")
    print("exited")
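Note that the context manager waits not only for the entrypoint method to complete, but also for any dependency teardown. Dependency-based loggers (TODO: link to bundled logger), for example, would also have completed.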
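The "block on exit until the handler has finished" behaviour can be pictured with a small standard-library analogy. This is only a sketch of the idea: it uses threads and a threading.Event, whereas nameko's own waiter works against a running service container, and `wait_until_handled` and `handle_event` are hypothetical names.

```python
import threading
from contextlib import contextmanager


@contextmanager
def wait_until_handled(done, timeout=5):
    """Hypothetical waiter: leaving the ``with`` block blocks until
    ``done`` is set, or raises if the timeout expires first."""
    yield
    if not done.wait(timeout):
        raise TimeoutError("handler did not finish in time")


log = []
done = threading.Event()


def handle_event(payload):
    # stands in for an asynchronous entrypoint such as an event handler
    log.append("handled " + payload)
    done.set()


with wait_until_handled(done):
    # "dispatch" the event; the handler runs concurrently
    threading.Thread(target=handle_event, args=("payload",)).start()

# only reached once the handler has completed
log.append("exited")
```

Because the wait happens on exit rather than on entry, the triggering action goes inside the `with` block, just as `dispatch(...)` does in the nameko test above.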
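Using pytest
Nameko's test suite uses pytest, and makes some useful configuration and fixtures available if you choose to use pytest.
They are contained in nameko.testing.pytest. This module is registered as a pytest plugin via setuptools. Pytest will automatically pick it up and use it.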