nameko.testing.pytest 源代码

from __future__ import absolute_import

import pytest


# all imports are inline to make sure they happen after eventlet.monkey_patch
# which is called in pytest_load_initial_conftests
# (calling monkey_patch at import time breaks the pytest capturemanager - see
#  https://github.com/eventlet/eventlet/pull/239)


def parse_config_option(text):
    """ Parse a ``KEY=VALUE`` command line option into a ``(key, value)`` pair.

    Surrounding whitespace is ignored for both bare keys and ``KEY=VALUE``
    pairs, and the key itself is stripped so ``"key = value"`` yields the
    key ``"key"``.

    :param text: raw option text, either ``KEY=VALUE`` or a bare ``KEY``.
    :returns: ``(key, value)`` where ``value`` is the right-hand side parsed
        with ``yaml.safe_load``, or ``True`` when no ``=`` is present.
    """
    text = text.strip()
    key, separator, value = text.partition('=')
    if not separator:
        # bare flag: there is nothing to parse, so yaml is not needed
        return text, True

    # imported inline, like every other import in this module
    import yaml

    return key.strip(), yaml.safe_load(value)
def pytest_addoption(parser):
    """ Register the command line options used by this plugin.

    Options added (with their ``dest`` names):

    * ``--blocking-detection`` (``blocking_detection``)
    * ``--amqp-uri`` / ``--rabbit-amqp-uri`` (``RABBIT_AMQP_URI``)
    * ``--rabbit-api-uri`` / ``--rabbit-ctl-uri`` (``RABBIT_API_URI``)
    * ``--amqp-ssl-port`` (``AMQP_SSL_PORT``)
    * ``--amqp-ssl-option`` (``AMQP_SSL_OPTIONS``), repeatable ``KEY=VALUE``
    * ``--suppress-nameko-eventlet-notification``
      (``suppress_eventlet_notification``)

    :param parser: the pytest option parser passed to the hook.
    """
    import ssl

    parser.addoption(
        '--blocking-detection',
        action='store_true',
        dest='blocking_detection',
        default=False,
        help='turn on eventlet hub blocking detection')

    parser.addoption(
        "--amqp-uri", "--rabbit-amqp-uri",
        action="store",
        dest='RABBIT_AMQP_URI',
        default='pyamqp://guest:guest@localhost:5672/',
        help=(
            "URI for the RabbitMQ broker. Any specified virtual host will be "
            "ignored because tests run in their own isolated vhost."
        ))

    parser.addoption(
        "--rabbit-api-uri", "--rabbit-ctl-uri",
        action="store",
        dest='RABBIT_API_URI',
        default='http://guest:guest@localhost:15672',
        help=("URI for RabbitMQ management interface.")
    )

    parser.addoption(
        '--amqp-ssl-port',
        action='store',
        dest='AMQP_SSL_PORT',
        default=5671,
        help='Port number for SSL connection')

    # NOTE(review): the example in the help text below uses a
    # `!!python/name:` tag, but `parse_config_option` parses values with
    # `yaml.safe_load`, which does not construct python-specific tags.
    # Confirm whether the example or the loader reflects the intent.
    parser.addoption(
        '--amqp-ssl-option',
        type=parse_config_option,
        action='append',
        dest='AMQP_SSL_OPTIONS',
        metavar='KEY=VALUE',
        default=[
            ('ca_certs', 'certs/cacert.pem'),
            ('certfile', 'certs/clientcert.pem'),
            ('keyfile', 'certs/clientkey.pem'),
            ('cert_reqs', ssl.CERT_REQUIRED)
        ],
        help=(
            # trailing space added: the adjacent literals used to run
            # together as "ssl.wrap_socket.Multiple options ..."
            'SSL connection options for passing to ssl.wrap_socket. '
            'Multiple options may be given. Values are parsed as YAML, '
            'hence the following example is valid: \n'
            '--amqp-ssl-option certfile=clientcert.pem '
            '--amqp-ssl-option ssl_version=!!python/name:ssl.PROTOCOL_TLSv1_2'
        )
    )

    parser.addoption(
        "--suppress-nameko-eventlet-notification",
        action="store_true",
        dest="suppress_eventlet_notification",
        default=False,
        help="suppress nameko warning about eventlet monkey-patch",
    )
def pytest_configure(config):
    """ Turn on eventlet hub blocking detection if it was requested.

    :param config: pytest config object; only
        ``config.option.blocking_detection`` is read.
    """
    detect_blocking = config.option.blocking_detection
    if detect_blocking:  # pragma: no cover
        from eventlet import debug as eventlet_debug
        eventlet_debug.hub_blocking_detection(True)
def pytest_sessionstart(session):
    """ Warn at session start when eventlet's monkeypatch is not applied.

    Nothing happens when ``suppress_eventlet_notification`` is set on the
    session's config options. Otherwise a warning is issued unless at least
    one of the ``os``, ``socket`` or ``thread`` modules is reported as
    monkey-patched by eventlet.

    :param session: the pytest session object.
    """
    if session.config.option.suppress_eventlet_notification:
        return

    from eventlet import patcher

    checked_modules = ("os", "socket", "thread")
    is_patched = any(
        patcher.is_monkey_patched(module) for module in checked_modules
    )
    if not is_patched:  # pragma: no cover -- covered in subprocess pytest
        import warnings

        message = (
            "For versions after 2.13.0, it is recommended to use the `nameko test` "
            "cli command. Nameko's pytest plugin no longer applies the eventlet "
            "monkeypatch as a pytest hook. This was removed to prevent polluting "
            "the environment in case a monkeypatch was not desired. "
            "\n"
            "If you need to invoke pytest directly instead of using `nameko test`, "
            "you can install the pytest-eventlet plugin, which just performs the "
            "automatic monkeypatching that was removed from Nameko. Alternatively "
            "you can apply the monkeypatch manually in your conftest.py file."
            "\n"
            "This warning can be suppressed with the "
            "--suppress-nameko-eventlet-notification pytest option."
        )
        warnings.warn(message)
@pytest.fixture
def empty_config():
    """ Return a new, empty dict to be used as a config. """
    return dict()
@pytest.fixture
def mock_container(request, empty_config):
    """ Return an autospec'd ``ServiceContainer`` with serializer defaults.

    The ``empty_config`` dict is used as the container's config and has the
    default serializer stored under ``SERIALIZER_CONFIG_KEY``; the same
    serializer is set as ``container.serializer`` and as the only entry of
    ``container.accept``.
    """
    from mock import create_autospec
    from nameko.constants import DEFAULT_SERIALIZER, SERIALIZER_CONFIG_KEY
    from nameko.containers import ServiceContainer

    config = empty_config
    config[SERIALIZER_CONFIG_KEY] = DEFAULT_SERIALIZER

    mocked = create_autospec(ServiceContainer)
    mocked.config = config
    mocked.serializer = mocked.config[SERIALIZER_CONFIG_KEY]
    mocked.accept = [DEFAULT_SERIALIZER]
    return mocked
@pytest.fixture(scope='session')
def rabbit_manager(request):
    """ Return a ``nameko.testing.rabbit.Client`` for the configured API URI.

    The URI is read from the ``RABBIT_API_URI`` pytest option.
    """
    from nameko.testing.rabbit import Client

    api_uri = request.config.getoption('RABBIT_API_URI')
    return Client(api_uri)
@pytest.fixture(scope='session')
def vhost_pipeline(request, rabbit_manager):
    """ Yield a running ``ResourcePipeline`` of randomly named test vhosts.

    Each vhost is created through ``rabbit_manager`` and the user from the
    ``RABBIT_AMQP_URI`` option is granted ``.*`` permissions on it. Destroying
    a vhost deletes it (retried on ``HTTPError`` / ``socket.timeout``) and
    force-closes the kombu connection pools whose key refers to that vhost.
    """
    try:
        from collections.abc import Iterable
    except ImportError:  # pragma: no cover
        # py2 compatibility
        from collections import Iterable  # pylint: disable=E0611
    import random
    import socket
    import string

    from kombu.pools import connections
    from requests.exceptions import HTTPError
    from six.moves.urllib.parse import urlparse  # pylint: disable=E0401

    from nameko.testing.utils import ResourcePipeline
    from nameko.utils.retry import retry

    username = urlparse(request.config.getoption('RABBIT_AMQP_URI')).username

    def create():
        suffix = "".join(
            random.choice(string.ascii_lowercase) for _ in range(10)
        )
        name = "nameko_test_{}".format(suffix)
        rabbit_manager.create_vhost(name)
        rabbit_manager.set_vhost_permissions(name, username, '.*', '.*', '.*')
        return name

    @retry(for_exceptions=(HTTPError, socket.timeout), delay=1, max_attempts=9)
    def destroy(vhost):
        rabbit_manager.delete_vhost(vhost)

        # also tear down any pooled connections that belong to this vhost;
        # the matching pools are collected first and only then closed
        stale_pools = []
        for pool_key, pool in list(connections.items()):
            if isinstance(pool_key, Iterable) and pool_key[4] == vhost:
                stale_pools.append(pool)
        for pool in stale_pools:
            pool.force_close_all()

    with ResourcePipeline(create, destroy).run() as vhosts:
        yield vhosts
@pytest.fixture()
def rabbit_config(request, vhost_pipeline, rabbit_manager):
    """ Yield AMQP connection settings that point at a vhost from the pipeline.

    The yielded dict has the keys ``AMQP_URI``, ``username`` and ``vhost``.
    The URI reuses the scheme and netloc of the ``RABBIT_AMQP_URI`` option
    with the acquired vhost as its path.
    """
    from six.moves.urllib.parse import urlparse  # pylint: disable=E0401

    parsed = urlparse(request.config.getoption('RABBIT_AMQP_URI'))

    with vhost_pipeline.get() as vhost:
        yield {
            'AMQP_URI': "{uri.scheme}://{uri.netloc}/{vhost}".format(
                uri=parsed, vhost=vhost
            ),
            'username': parsed.username,
            'vhost': vhost
        }
@pytest.fixture()
def rabbit_ssl_config(request, rabbit_config):
    """ Return a copy of ``rabbit_config`` that targets the broker's SSL port.

    The returned dict has the keys ``AMQP_URI``, ``username``, ``vhost`` and
    ``AMQP_SSL``. ``AMQP_SSL`` is a dict built from the ``AMQP_SSL_OPTIONS``
    pytest option, or ``True`` if that option yields no entries. The URI is
    the one from ``rabbit_config`` with its port set to the ``AMQP_SSL_PORT``
    pytest option.
    """
    from six.moves.urllib.parse import urlparse  # pylint: disable=E0401

    ssl_options = request.config.getoption('AMQP_SSL_OPTIONS')
    ssl_options = {key: value for key, value in ssl_options} or True
    amqp_ssl_port = request.config.getoption('AMQP_SSL_PORT')

    uri_parts = urlparse(rabbit_config['AMQP_URI'])

    # Rebuild the netloc from its parts instead of doing a textual
    # replace of the port number: a plain `str.replace` leaves the URI
    # untouched when it has no explicit port, and rewrites credentials or
    # host names that happen to contain the same digits.
    userinfo, at_sign, hostport = uri_parts.netloc.rpartition('@')
    if uri_parts.port is not None or hostport.endswith(':'):
        # drop the existing ":port" (or a dangling ":") from the host part
        hostport = hostport.rpartition(':')[0]
    ssl_netloc = "{}{}{}:{}".format(userinfo, at_sign, hostport, amqp_ssl_port)
    amqp_ssl_uri = uri_parts._replace(netloc=ssl_netloc).geturl()

    conf = {
        'AMQP_URI': amqp_ssl_uri,
        'username': rabbit_config['username'],
        'vhost': rabbit_config['vhost'],
        'AMQP_SSL': ssl_options,
    }
    return conf
@pytest.fixture
def amqp_uri(rabbit_config):
    """ Return the value stored under ``AMQP_URI_CONFIG_KEY`` in ``rabbit_config``. """
    from nameko import constants

    return rabbit_config[constants.AMQP_URI_CONFIG_KEY]
@pytest.fixture(autouse=True)
def fast_teardown(request):
    """
    This fixture fixes the order of the `container_factory`, `runner_factory`
    and `rabbit_config` fixtures to get the fastest possible teardown of tests
    that use them.

    Without this fixture, the teardown order depends on the fixture resolution
    defined by the test, for example::

        def test_foo(container_factory, rabbit_config):
            pass  # rabbit_config tears down first

        def test_bar(rabbit_config, container_factory):
            pass  # container_factory tears down first

    This fixture ensures the teardown order is:

        1. `fast_teardown`  (this fixture)
        2. `rabbit_config`
        3. `container_factory` / `runner_factory`

    That is, `rabbit_config` teardown, which removes the vhost created for
    the test, happens *before* the consumers are stopped.

    Deleting the vhost causes the broker to send a "basic-cancel" message
    to any connected consumers, which will include the consumers in all
    containers created by the `container_factory` and `runner_factory`
    fixtures.

    This speeds up test teardown because the "basic-cancel" breaks
    the consumers' `drain_events` loop (http://bit.do/kombu-drain-events)
    which would otherwise wait for up to a second for the socket read to time
    out before gracefully shutting down.

    For even faster teardown, we monkeypatch the consumers to ensure they
    don't try to reconnect between the "basic-cancel" and being explicitly
    stopped when their container is killed.

    In older versions of RabbitMQ, the monkeypatch also protects against a
    race-condition that can lead to hanging tests.

    Modern RabbitMQ raises a `NotAllowed` exception if you try to connect to
    a vhost that doesn't exist, but older versions (including 3.4.3, used by
    Travis) just raise a `socket.error`. This is classed as a recoverable
    error, and consumers attempt to reconnect. Kombu's reconnection code blocks
    until a connection is established, so consumers that attempt to reconnect
    before being killed get stuck there.
    """
    from kombu.mixins import ConsumerMixin

    # resolve the other fixtures first (when the test uses them) so that
    # this fixture is set up after them and therefore torn down before them
    reorder_fixtures = ('container_factory', 'runner_factory', 'rabbit_config')
    for fixture in reorder_fixtures:
        if fixture in request.fixturenames:
            request.getfixturevalue(fixture)  # pragma: no cover

    consumers = []

    # monkeypatch the ConsumerMixin constructor to stash a reference to
    # each instance
    orig_init = ConsumerMixin.__init__

    def __init__(self, *args, **kwargs):
        orig_init(self, *args, **kwargs)
        consumers.append(self)

    ConsumerMixin.__init__ = __init__

    yield

    # teardown starts here: put the original constructor back
    ConsumerMixin.__init__ = orig_init

    # set the `should_stop` attribute on all consumers *before* the rabbit
    # vhost is killed, so that they don't try to reconnect before they're
    # explicitly killed when their container stops.
    for consumer in consumers:
        consumer.should_stop = True
@pytest.fixture
def get_message_from_queue(amqp_uri):
    """ Return a helper that fetches a single message from a named queue.

    The helper has the signature
    ``get(queue_name, ack=True, block=True, timeout=1, accept=None)`` and
    returns the fetched message. The message is acknowledged when ``ack`` is
    true and requeued otherwise. ``block`` and ``timeout`` are passed through
    to the queue's ``get``; whatever that call raises propagates to the
    caller.
    """
    from nameko.amqp import get_connection

    def get(queue_name, ack=True, block=True, timeout=1, accept=None):
        with get_connection(amqp_uri) as conn:
            queue = conn.SimpleQueue(queue_name)
            try:
                # queue doesn't allow passing of `accept` to its consumer,
                # so we patch it on after instantiation
                if accept is not None:
                    queue.consumer.accept = accept

                message = queue.get(block=block, timeout=timeout)
                if ack:
                    message.ack()
                else:
                    message.requeue()
            finally:
                # close the queue even when `get`, `ack` or `requeue` raises,
                # so it is not left open on a failed fetch
                queue.close()
        return message

    return get
@pytest.fixture
def container_factory():
    """ Yield a factory for service containers that are killed on teardown.

    The factory has the signature ``make_container(service_cls, config)``.
    It instantiates the container class chosen by ``get_container_cls(config)``
    and remembers the instance. Every remembered container is killed after
    the test, on a best-effort basis.
    """
    from nameko.containers import get_container_cls

    all_containers = []

    def make_container(service_cls, config):
        container_cls = get_container_cls(config)
        container = container_cls(service_cls, config)
        all_containers.append(container)
        return container

    yield make_container

    for c in all_containers:
        try:
            c.kill()
        # best-effort cleanup: ignore ordinary errors, but let
        # KeyboardInterrupt / SystemExit through (a bare `except:` hid them)
        except Exception:  # pragma: no cover
            pass
@pytest.fixture
def runner_factory():
    """ Yield a factory for ``ServiceRunner`` objects killed on teardown.

    The factory has the signature ``make_runner(config, *service_classes)``.
    It creates a ``ServiceRunner`` for ``config``, adds each given service
    class to it and remembers the runner. Every remembered runner is killed
    after the test, on a best-effort basis.
    """
    from nameko.runners import ServiceRunner

    all_runners = []

    def make_runner(config, *service_classes):
        runner = ServiceRunner(config)
        for service_cls in service_classes:
            runner.add_service(service_cls)
        all_runners.append(runner)
        return runner

    yield make_runner

    for r in all_runners:
        try:
            r.kill()
        # best-effort cleanup: ignore ordinary errors, but let
        # KeyboardInterrupt / SystemExit through (a bare `except:` hid them)
        except Exception:  # pragma: no cover
            pass
@pytest.fixture
def predictable_call_ids(request):
    """ Patch ``nameko.containers.new_call_id`` to return "0", "1", "2", ...

    Yields the patched mock; the patch is active until the fixture is torn
    down.
    """
    import itertools

    from mock import patch

    def sequential_ids():
        # endless stream of stringified integers, starting from zero
        for number in itertools.count():
            yield str(number)

    with patch('nameko.containers.new_call_id', autospec=True) as get_id:
        get_id.side_effect = sequential_ids()
        yield get_id
@pytest.fixture()
def web_config(empty_config):
    """ Return ``empty_config`` with a web server address on a free port.

    The address ``127.0.0.1:<port>`` is stored under ``WEB_SERVER_CONFIG_KEY``;
    the port comes from ``find_free_port``. The ``empty_config`` dict itself
    is updated and returned.
    """
    from nameko.constants import WEB_SERVER_CONFIG_KEY
    from nameko.testing.utils import find_free_port

    address = "127.0.0.1:{}".format(find_free_port())
    empty_config[WEB_SERVER_CONFIG_KEY] = address
    return empty_config
@pytest.fixture()
def web_config_port(web_config):
    """ Return the port of the web server address configured in ``web_config``. """
    from nameko.constants import WEB_SERVER_CONFIG_KEY
    from nameko.web.server import parse_address

    address = parse_address(web_config[WEB_SERVER_CONFIG_KEY])
    return address.port
@pytest.fixture()
def web_session(web_config_port):
    """ Yield a ``requests`` session whose URLs resolve against the test server.

    Every request URL is joined onto ``http://127.0.0.1:<web_config_port>/``,
    so tests can pass paths such as ``/foo``. The session is used as a
    context manager around the ``yield``.
    """
    from requests import Session
    # `werkzeug.urls.url_join` was deprecated in Werkzeug 2.3 and removed in
    # 3.0 in favour of the standard library's `urljoin`; six keeps the import
    # working on py2 as well.
    from six.moves.urllib.parse import urljoin  # pylint: disable=E0401

    base_url = 'http://127.0.0.1:%d/' % web_config_port

    class WebSession(Session):
        def request(self, method, url, *args, **kwargs):
            url = urljoin(base_url, url)
            return Session.request(self, method, url, *args, **kwargs)

    sess = WebSession()
    with sess:
        yield sess
@pytest.fixture()
def websocket(web_config_port):
    """ Yield a factory that opens virtual websockets to the test web server.

    Each call to the factory builds a virtual socket for ``127.0.0.1`` and
    ``web_config_port``, runs its app in a spawned greenthread and returns the
    socket, with the app attached as ``socket.app``. On teardown every app is
    closed (errors ignored) and its greenthread killed.
    """
    import eventlet

    from nameko.testing.websocket import make_virtual_socket

    spawned = []

    def socket_creator():
        app, wait_for_socket = make_virtual_socket(
            '127.0.0.1', web_config_port
        )
        runner = eventlet.spawn(app.run_forever)
        # record the pair before waiting, so teardown cleans up regardless
        spawned.append((runner, app))
        sock = wait_for_socket()
        sock.app = app
        return sock

    try:
        yield socket_creator
    finally:
        for runner, app in spawned:
            try:
                app.close()
            except Exception:  # pragma: no cover
                pass
            runner.kill()