nameko.runners 源代码

from __future__ import absolute_import

from contextlib import contextmanager
from logging import getLogger

from typing import Type, Dict, Any, List

from nameko.containers import get_container_cls, get_service_name, ServiceContainer
from nameko.utils.concurrency import SpawningProxy


[文档] _log = getLogger(__name__)
[文档] class ServiceRunner(object): """允许用户并发提供多个服务。 调用者可以为多个服务类注册名称,然后使用 start 方法来提供它们,并使用 stop 和 kill 方法 来停止它们。wait 方法将阻塞,直到所有服务停止。 示例:: runner = ServiceRunner(config) runner.add_service(Foobar) runner.add_service(Spam) add_sig_term_handler(runner.kill) runner.start() runner.wait() """ def __init__(self, config: dict): """初始化一个服务运行者 :param config: 配置字典,从yaml配置文件中解析出来的, 默认为只有1个 AMQP 地址 :type config: dict """
[文档] self.service_map: Dict[str, ServiceContainer] = {}
[文档] self.config: Dict[str, Any] = config
[文档] self.container_cls: Type[ServiceContainer] = get_container_cls(config)
@property
[文档] def service_names(self): return self.service_map.keys()
@property
[文档] def containers(self): return self.service_map.values()
[文档] def add_service(self, cls: Type): """将服务类添加到运行器中。 对于给定的服务名称,最多只能有一个服务类。 服务类必须在调用 start() 之前注册。 """ service_name = get_service_name(cls) container = self.container_cls(cls, self.config) # 用容器类封装一层service类 self.service_map[service_name] = container
[文档] def start(self): """启动所有注册的服务。 每个服务都会使用 __init__ 方法中提供的容器类创建一个新容器。 所有容器将并发启动,该方法将在所有容器完成启动例程之前阻塞。 """ service_names = ", ".join(self.service_names) _log.info("启动服务: %s", service_names) # 批量调用容器类的 start 方法. SpawningProxy(self.containers).start() _log.debug("服务已启动: %s", service_names)
[文档] def stop(self): """并发停止所有正在运行的容器。 该方法在所有容器停止之前将阻塞。 """ service_names = ", ".join(self.service_names) _log.info("停止服务: %s", service_names) # 批量调用容器类的 stop 方法. SpawningProxy(self.containers).stop() _log.debug("服务已停止: %s", service_names)
[文档] def kill(self): """并发杀死所有正在运行的容器。 该方法将在所有容器停止之前将阻塞。 """ service_names = ", ".join(self.service_names) _log.info("杀死服务: %s", service_names) # 批量调用容器类的 kill 方法. SpawningProxy(self.containers).kill() _log.debug("服务已被杀死: %s ", service_names)
[文档] def wait(self): """等待所有正在运行的容器停止。""" try: SpawningProxy(self.containers, abort_on_error=True).wait() except Exception: # 如果一个容器失败,停止它的同伴并重新引发异常 self.stop() raise
@contextmanager
[文档] def run_services(config: dict, *services: Type, **kwargs): """为上下文块提供多个服务。 调用者可以指定多个服务类,然后在退出上下文块时 停止(默认)或杀死它们。 示例:: with run_services(config, Foobar, Spam) as runner: # 与服务交互并在退出块时停止它们 # 服务已停止 可以通过关键字参数指定额外的配置,以供 :class:``ServiceRunner`` 实例使用:: with run_services(config, Foobar, Spam, kill_on_exit=True): # 与服务交互 # 服务已被杀死 :Parameters: config : dict 用于实例化服务容器的配置 services : 服务定义 在上下文块中提供的服务 kill_on_exit : bool (default=False) 如果为 ``True``,在退出上下文块时对服务容器调用 ``kill()``。 否则在退出块时将调用 ``stop()``。 :Returns: 配置好的 :class:`ServiceRunner` 实例 """ kill_on_exit = kwargs.pop("kill_on_exit", False) runner = ServiceRunner(config) for service in services: runner.add_service(service) runner.start() yield runner if kill_on_exit: runner.kill() else: runner.stop()