from __future__ import absolute_import, unicode_literals, annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from _typeshed import OptExcInfo
import inspect
import sys
import uuid
from collections import deque
from logging import getLogger
from typing import (
Type,
Union,
Any,
Dict,
List,
Tuple,
Optional,
AbstractSet,
Callable,
Iterable,
Mapping,
)
import eventlet
import six
from eventlet.event import Event
from eventlet.greenpool import GreenPool
from greenlet import GreenletExit # pylint: disable=E0611
from nameko import serialization
from nameko.constants import (
CALL_ID_STACK_CONTEXT_KEY,
DEFAULT_MAX_WORKERS,
DEFAULT_PARENT_CALLS_TRACKED,
MAX_WORKERS_CONFIG_KEY,
PARENT_CALLS_CONFIG_KEY,
)
from nameko.exceptions import ConfigurationError, ContainerBeingKilled
from nameko.extensions import (
ENTRYPOINT_EXTENSIONS_ATTR,
is_dependency,
iter_extensions,
DependencyProvider,
Extension,
Entrypoint,
)
from nameko.log_helpers import make_timing_logger
from nameko.utils import import_from_path
from nameko.utils.concurrency import SpawningSet, SpawningProxy
_log = getLogger(__name__)

_log_time = make_timing_logger(_log)


if six.PY2:  # pragma: no cover
    is_method = inspect.ismethod
else:  # pragma: no cover
    is_method = inspect.isfunction
def get_service_name(service_cls: Type):
"""获取微服务名称"""
service_name = getattr(service_cls, "name", None)
if service_name is None:
raise ConfigurationError(
"Service class 必须定义 `name` 属性 ({}.{})".format(
service_cls.__module__, service_cls.__name__
)
)
if not isinstance(service_name, six.string_types):
raise ConfigurationError(
"Service name 属性必须是string类型 ({}.{}.name)".format(
service_cls.__module__, service_cls.__name__
)
)
return service_name
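# Minimal illustration (not part of the module): ``get_service_name`` reads the
# mandatory ``name`` attribute from a service class.
#
#     class ExampleService(object):       # hypothetical service class
#         name = "example_service"
#
#     get_service_name(ExampleService)    # -> "example_service"
#     get_service_name(object)            # -> raises ConfigurationError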
def get_container_cls(config: dict) -> Union[Type[ServiceContainer], Any]:
    """Return the container class, honouring the ``SERVICE_CONTAINER_CLS`` config key."""
class_path = config.get("SERVICE_CONTAINER_CLS")
return import_from_path(class_path) or ServiceContainer
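# Sketch of the lookup above (the dotted path is a hypothetical example):
#
#     config = {"SERVICE_CONTAINER_CLS": "my_package.containers.CustomContainer"}
#     get_container_cls(config)    # imports and returns CustomContainer
#     get_container_cls({})        # falls back to ServiceContainer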
def new_call_id():
return str(uuid.uuid4())
class WorkerContext(object):
"""工作者上下文"""
[文档]
_call_id_stack = None
# _parent_call_id_stack = None
[文档]
_parent_call_id_stack = []
    def __init__(
        self,
        container: ServiceContainer,
        service: Any,  # an instance of the service class
        entrypoint: Entrypoint,
        args=None,
        kwargs=None,
        data=None,
    ):
        self.container = container
        self.config: dict = self.container.config
        self.service = service
        self.entrypoint = entrypoint
        self.service_name = self.container.service_name
        self.args: Tuple = args if args is not None else ()
        self.kwargs: Mapping = kwargs if kwargs is not None else {}
        self.data: dict = data if data is not None else {}
        self._parent_call_id_stack = self.data.pop(CALL_ID_STACK_CONTEXT_KEY, [])
@property
def call_id_stack(self):
if self._call_id_stack is None:
parent_calls_tracked = self.container.config.get(
PARENT_CALLS_CONFIG_KEY, DEFAULT_PARENT_CALLS_TRACKED
)
stack_length = parent_calls_tracked + 1
self._call_id_stack = deque(maxlen=stack_length)
self._call_id_stack.extend(self._parent_call_id_stack)
self._call_id_stack.append(self.call_id)
return list(self._call_id_stack)
@property
    def call_id(self):
        """Unique id of this worker's call."""
if self._call_id is None:
self._call_id = "{}.{}.{}".format(
self.service_name, self.entrypoint.method_name, new_call_id()
)
return self._call_id
@property
def context_data(self):
data = self.data.copy()
data[CALL_ID_STACK_CONTEXT_KEY] = self.call_id_stack
return data
@property
def origin_call_id(self):
if self._parent_call_id_stack:
return self._parent_call_id_stack[0]
@property
def immediate_parent_call_id(self):
if self._parent_call_id_stack:
return self._parent_call_id_stack[-1]
def __repr__(self):
cls_name = type(self).__name__
service_name = self.service_name
method_name = self.entrypoint.method_name
return "<{} [{}.{}] at 0x{:x}>".format(
cls_name, service_name, method_name, id(self)
)
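# Illustrative sketch (``container``, ``service`` and ``entrypoint`` are assumed
# to exist): the call id stack carried in ``data`` grows by one entry per hop,
# and ``context_data`` passes the extended stack on to downstream calls.
#
#     ctx = WorkerContext(
#         container, service, entrypoint,
#         data={CALL_ID_STACK_CONTEXT_KEY: ["caller.method.1"]},
#     )
#     ctx.call_id           # "<service_name>.<method_name>.<uuid4>"
#     ctx.call_id_stack     # ["caller.method.1", ctx.call_id]
#     ctx.context_data      # same stack under CALL_ID_STACK_CONTEXT_KEY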
class ServiceContainer(object):
    """Service container: hosts a service class and manages its extensions and workers."""
    def __init__(self, service_cls: Type, config: dict):
        self.service_cls: Type[Any] = service_cls
        self.config: Dict[str, Any] = config
        self.service_name: str = get_service_name(service_cls)
        self.shared_extensions: dict = {}

        self.max_workers: int = (
            config.get(MAX_WORKERS_CONFIG_KEY) or DEFAULT_MAX_WORKERS
        )

        # register the serialization scheme (serializer and accepted
        # content types) with kombu
        self.serializer, self.accept = serialization.setup(self.config)

        self.entrypoints = SpawningSet()
        self.dependencies = SpawningSet()
        self.subextensions = SpawningSet()

        # collect the attributes of the service class that are
        # DependencyProvider declarations
        dependency_attrs: List[Tuple[str, DependencyProvider]] = inspect.getmembers(
            service_cls, is_dependency
        )
        for attr_name, dependency in dependency_attrs:
            bound = dependency.bind(self.interface, attr_name)
            self.dependencies.add(bound)
            self.subextensions.update(iter_extensions(bound))

        # collect the methods of the service class that are decorated
        # as entrypoints
        entrypoint_methods: List[Tuple[str, Callable]] = inspect.getmembers(
            service_cls, is_method
        )
        for method_name, method in entrypoint_methods:
            entrypoints: List[Entrypoint] = getattr(
                method, ENTRYPOINT_EXTENSIONS_ATTR, []
            )
            for entrypoint in entrypoints:
                bound = entrypoint.bind(self.interface, method_name)
                self.entrypoints.add(bound)
                self.subextensions.update(iter_extensions(bound))

        self.started = False
        self._worker_pool = GreenPool(size=self.max_workers)

        self._worker_threads: Dict[WorkerContext, eventlet.greenthread.GreenThread] = {}
        self._managed_threads: Dict[
            eventlet.greenthread.GreenThread, Optional[str]
        ] = {}
        self._being_killed: bool = False
        self._died = Event()
@property
def extensions(self):
return SpawningSet(self.entrypoints | self.dependencies | self.subextensions)
@property
    def interface(self):
        """An interface to this container for use by extensions."""
return self
    def start(self):
        """Start the container by starting all of its extensions."""
        _log.debug("starting %s", self)
        self.started = True
        with _log_time("started %s", self):
            self.extensions.all.setup()  # calls setup() on every bound Extension subclass
            self.extensions.all.start()  # calls start() on every bound Extension subclass
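    # The two calls above fan out to every bound extension. A minimal sketch of
    # the hooks involved (hypothetical extension; hook names from nameko.extensions):
    #
    #     class Heartbeat(DependencyProvider):
    #         def setup(self):
    #             pass    # e.g. declare/bind external resources
    #         def start(self):
    #             pass    # e.g. begin consuming or scheduling work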
    def stop(self):
        """Stop the container gracefully.

        First, all entrypoints are asked to ``stop()``. This ensures that no
        new workers will be started. It is the responsibility of extensions
        to gracefully shut down when ``stop()`` is called on them, and only
        return when they have stopped.

        After all entrypoints have stopped, the container waits for any
        active workers to complete.

        After all active workers have stopped, the container stops all
        dependency providers.

        At this point there should be no more managed threads. In case there
        are any remaining, they are killed by the container.
        """
        if self._died.ready():
            _log.debug("already stopped %s", self)
            return

        if self._being_killed:
            # this race condition can happen when a container is hosted by a
            # runner and yields during its kill method; if it's unlucky in
            # scheduling, the runner will try to stop() it before self._died
            # has a result
            _log.debug("already being killed %s", self)
            try:
                self._died.wait()
            except Exception:
                pass  # don't re-raise if we died with an exception
            return

        _log.debug("stopping %s", self)

        with _log_time("stopped %s", self):
            # entrypoints must be stopped before dependencies to ensure
            # that running workers can successfully complete
            self.entrypoints.all.stop()

            # there may still be some running workers, which we have to
            # wait for to complete before we can stop dependencies
            self._worker_pool.waitall()

            # it should be safe now to stop any dependency as there is no
            # active worker which could be using it
            self.dependencies.all.stop()

            # finally, stop the remaining extensions
            self.subextensions.all.stop()

            # and any managed threads they spawned
            self._kill_managed_threads()

            self.started = False

            # if `kill` is called after `stop`, they race to send this
            if not self._died.ready():
                self._died.send(None)
    def kill(self, exc_info=None):
        """Kill the container in a semi-graceful way.

        Entrypoints are killed, followed by any active worker threads. Next,
        dependencies are killed. Finally, any remaining managed threads are
        killed.

        If ``exc_info`` is provided, the exception will be raised by
        :meth:`~wait`.
        """
        if self._being_killed:
            # this happens if a managed thread exits with an exception while
            # the container is being killed, or if multiple errors happen
            # simultaneously
            _log.debug("already killing %s ... waiting for death", self)
            try:
                self._died.wait()
            except Exception:
                pass  # don't re-raise if we died with an exception
            return

        self._being_killed = True

        if self._died.ready():
            _log.debug("already stopped %s", self)
            return

        if exc_info is not None:
            _log.info("killing %s due to %s", self, exc_info[1])
        else:
            _log.info("killing %s", self)

        # protect against extensions that throw during kill; the container
        # is already dying with an exception, so ignore anything else
        def safely_kill_extensions(ext_set):
            try:
                ext_set.kill()
            except Exception as exc:
                _log.warning("Extension raised `%s` during kill", exc)

        safely_kill_extensions(self.entrypoints.all)
        self._kill_worker_threads()
        safely_kill_extensions(self.extensions.all)
        self._kill_managed_threads()

        self.started = False

        # if `kill` is called after `stop`, they race to send this
        if not self._died.ready():
            self._died.send(None, exc_info)
    def wait(self):
        """Block until the container has been stopped.

        If the container was stopped due to an exception, ``wait()`` will
        raise it.

        Any unhandled exception raised in a managed thread or in the worker
        lifecycle (e.g. inside :meth:`DependencyProvider.worker_setup`) will
        result in the container being ``kill()``ed, and the exception raised
        from ``wait()``.
        """
        return self._died.wait()
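    # Relationship between ``stop()``, ``kill()`` and ``wait()`` (illustrative):
    #
    #     container.stop()    # graceful: entrypoints stop, active workers finish
    #     container.wait()    # returns None
    #
    #     container.kill(exc_info)    # semi-graceful, e.g. exc_info from sys.exc_info()
    #     container.wait()            # re-raises the recorded exception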
    def spawn_worker(
        self,
        entrypoint: Entrypoint,
        args: Iterable,
        kwargs: dict,
        context_data: Optional[Any] = None,
        handle_result: Optional[
            Callable[[WorkerContext, Any, Optional[OptExcInfo]], Tuple[Any, Any]]
        ] = None,
    ):
        """Spawn a worker thread for running the service method decorated by
        ``entrypoint``.

        ``args`` and ``kwargs`` are used as parameters for the service method.

        ``context_data`` is used to initialize a ``WorkerContext``.

        ``handle_result`` is an optional function which may be passed in by
        the entrypoint. It is called with the result returned or the error
        raised by the service method. If provided, it must return values for
        ``result`` and ``exc_info`` to propagate to dependencies; these may
        be different from those returned by the service method.
        """
        if self._being_killed:
            _log.info("Worker spawn prevented due to being killed")
            raise ContainerBeingKilled()

        service = self.service_cls()
        worker_ctx = WorkerContext(
            self, service, entrypoint, args, kwargs, data=context_data
        )

        _log.debug("spawning %s", worker_ctx)
        gt = self._worker_pool.spawn(self._run_worker, worker_ctx, handle_result)
        gt.link(self._handle_worker_thread_exited, worker_ctx)

        self._worker_threads[worker_ctx] = gt
        return worker_ctx
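    # Sketch of how an entrypoint typically uses ``spawn_worker`` (hypothetical
    # entrypoint; ``handle_message``, ``acknowledge`` and ``requeue`` are assumptions):
    #
    #     class QueueListener(Entrypoint):
    #         def handle_message(self, body, message):
    #             def handle_result(worker_ctx, result, exc_info):
    #                 self.acknowledge(message)
    #                 return result, exc_info    # propagated to dependencies
    #             try:
    #                 self.container.spawn_worker(
    #                     self, args=(body,), kwargs={},
    #                     context_data={}, handle_result=handle_result,
    #                 )
    #             except ContainerBeingKilled:
    #                 self.requeue(message)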
    def spawn_managed_thread(self, fn: Callable, identifier: Optional[str] = None):
        """Spawn a managed thread to run ``fn`` on behalf of an extension.

        The passed ``identifier`` will be included in logs related to this
        thread, and defaults to ``fn.__name__`` if it is set.

        Any uncaught errors inside ``fn`` will result in the container being
        killed.

        It is the caller's responsibility to terminate their spawned threads.
        Threads are killed automatically if they are still running after all
        extensions are stopped during :meth:`ServiceContainer.stop`.

        Extensions should delegate all thread spawning to the container.
        """
        if identifier is None:
            identifier = getattr(fn, "__name__", "<unknown>")

        gt = eventlet.spawn(fn)
        self._managed_threads[gt] = identifier
        gt.link(self._handle_managed_thread_exited, identifier)
        return gt
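    # Sketch of an extension delegating background work to the container
    # (hypothetical extension; the polling loop is an assumption):
    #
    #     class Poller(Extension):
    #         def start(self):
    #             self.container.spawn_managed_thread(self._poll, identifier="poller")
    #         def _poll(self):
    #             while True:
    #                 eventlet.sleep(1)    # an uncaught error here kills the container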
    def _run_worker(
        self,
        worker_ctx: WorkerContext,
        handle_result: Optional[
            Callable[[WorkerContext, Any, Optional[OptExcInfo]], Tuple[Any, Any]]
        ],
    ):
        _log.debug("setting up %s", worker_ctx)
        _log.debug(
            "call stack for %s: %s", worker_ctx, "->".join(worker_ctx.call_id_stack)
        )

        with _log_time("ran worker %s", worker_ctx):
            self._inject_dependencies(worker_ctx)
            self._worker_setup(worker_ctx)

            result = exc_info = None
            method_name = worker_ctx.entrypoint.method_name
            method: Callable = getattr(worker_ctx.service, method_name)  # type: ignore
            try:
                _log.debug("calling handler for %s", worker_ctx)
                with _log_time("ran handler for %s", worker_ctx):
                    # dynamically invoke the decorated service method
                    result = method(*worker_ctx.args, **worker_ctx.kwargs)
            except Exception as exc:
                if isinstance(exc, worker_ctx.entrypoint.expected_exceptions):
                    _log.warning(
                        "(expected) error handling worker %s: %s",
                        worker_ctx,
                        exc,
                        exc_info=True,
                    )
                else:
                    _log.exception("error handling worker %s: %s", worker_ctx, exc)
                exc_info = sys.exc_info()

            if handle_result is not None:
                _log.debug("handling result for %s", worker_ctx)
                with _log_time("handled result for %s", worker_ctx):
                    result, exc_info = handle_result(worker_ctx, result, exc_info)

            with _log_time("tore down worker %s", worker_ctx):
                self._worker_result(worker_ctx, result, exc_info)

                # we don't need this any more, and breaking the cycle means
                # this can be reclaimed immediately, rather than waiting for
                # a gc sweep
                del exc_info

                self._worker_teardown(worker_ctx)
    def _inject_dependencies(self, worker_ctx: WorkerContext):
        """Inject each DependencyProvider's dependency into the service instance."""
        for provider in self.dependencies:
            dependency = provider.get_dependency(worker_ctx)  # delegated to the provider
            setattr(worker_ctx.service, provider.attr_name, dependency)
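    # What injection looks like in practice (hypothetical provider): each
    # declared DependencyProvider contributes one attribute on the service
    # instance before the worker runs.
    #
    #     class Config(DependencyProvider):
    #         def get_dependency(self, worker_ctx):
    #             return worker_ctx.container.config
    #
    #     class ExampleService(object):
    #         name = "example_service"
    #         config = Config()    # the worker sees a plain dict at run time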
    def _worker_setup(self, worker_ctx):
        """Call each provider's ``worker_setup`` hook."""
        for provider in self.dependencies:
            provider.worker_setup(worker_ctx)
    def _worker_result(self, worker_ctx: WorkerContext, result, exc_info):
        """Call each provider's ``worker_result`` hook."""
        _log.debug("signalling result for %s", worker_ctx)
        for provider in self.dependencies:
            provider.worker_result(worker_ctx, result, exc_info)
    def _worker_teardown(self, worker_ctx: WorkerContext):
        """Call each provider's ``worker_teardown`` hook."""
        for provider in self.dependencies:
            provider.worker_teardown(worker_ctx)
    def _kill_worker_threads(self):
        """Kill any currently executing worker threads.

        See :meth:`ServiceContainer.spawn_worker`
        """
        num_workers = len(self._worker_threads)
        if num_workers:
            _log.warning("killing %s active worker(s)", num_workers)
            for worker_ctx, gt in list(self._worker_threads.items()):
                _log.warning("killing active worker for %s", worker_ctx)
                gt.kill()
    def _kill_managed_threads(self):
        """Kill any currently executing managed threads.

        See :meth:`ServiceContainer.spawn_managed_thread`
        """
        num_threads = len(self._managed_threads)
        if num_threads:
            _log.warning("killing %s managed thread(s)", num_threads)
            for gt, identifier in list(self._managed_threads.items()):
                _log.warning("killing managed thread `%s`", identifier)
                gt.kill()
def _handle_worker_thread_exited(
self, gt: eventlet.greenthread.GreenThread, worker_ctx: WorkerContext
):
self._worker_threads.pop(worker_ctx, None)
self._handle_thread_exited(gt)
    def _handle_managed_thread_exited(
        self, gt: eventlet.greenthread.GreenThread, identifier: Optional[str]
    ):
        self._managed_threads.pop(gt, None)
        self._handle_thread_exited(gt)
    def _handle_thread_exited(self, gt: eventlet.greenthread.GreenThread):
        try:
            gt.wait()
        except GreenletExit:
            # we don't care much about threads killed by the container;
            # this can happen in stop() and kill() if extensions don't
            # properly take care of their threads
            _log.debug("%s thread killed by container", self)
        except Exception:
            _log.critical("%s thread exited with error", self, exc_info=True)
            # any uncaught error in a thread is unexpected behaviour
            # and probably a bug in the extension or container.
            # to be safe we call self.kill() to kill our dependencies and
            # provide the exception info to be raised in self.wait().
            self.kill(sys.exc_info())
def __repr__(self):
service_name = self.service_name
return "<ServiceContainer [{}] at 0x{:x}>".format(service_name, id(self))