API概要

nameko.containers

class nameko.containers.ServiceContainer(service_cls: Type, config: dict)[源代码]

服务容器

property interface[源代码]

一个供扩展使用的此容器的接口。

kill(exc_info=None)[源代码]

以半优雅的方式终止容器。

首先终止入口点,然后是任何活跃的工作线程。接下来,终止依赖项。最后,终止任何剩余的托管线程。

如果提供了 exc_info,异常将由 wait() 引发。

spawn_managed_thread(fn: Callable, identifier: str | None = None)[源代码]

生成一个托管线程以代表扩展来运行 fn。 传入的 identifier 将包含在与该线程相关的日志中,默认情况下如果已设置则为 fn.__name__

fn 内部引发的任何未捕获错误将导致容器被终止。

终止生成的线程的责任在于调用者。 如果在 ServiceContainer.stop() 期间所有扩展停止后它们仍在运行,线程将自动被终止。

扩展应该将所有线程生成委托给容器。

spawn_worker(entrypoint: Entrypoint, args: Iterable, kwargs: dict, context_data: Any | None = None, handle_result: Callable[[WorkerContext, Any, OptExcInfo | None]] | None = None)[源代码]

为运行由 entrypoint 装饰的服务方法生成一个工作线程。

argskwargs 用作服务方法的参数。

context_data 用于初始化 WorkerContext

handle_result 是一个可选函数,可能由入口点传入。 它在服务方法返回的结果或引发的错误时被调用。 如果提供,则必须返回一个值用于 resultexc_info,以便传播到依赖项; 这些值可能与服务方法返回的值不同。

start()[源代码]

通过启动该容器的所有扩展来启动容器。

stop()[源代码]

优雅地停止容器。

首先,所有入口点都会被要求执行 stop()。这确保不会启动新的工作线程。

当对扩展调用 stop() 时,扩展有责任优雅地关闭,并且只有在它们停止后才返回。

在所有入口点停止后,容器会等待所有活跃的工作线程完成。

在所有活跃的工作线程停止后,容器会停止所有依赖提供者。

此时,应该不再有托管线程。如果仍然有托管线程,它们将被容器终止。

wait()[源代码]

阻塞直到容器已停止。

如果容器因异常而停止,wait() 将引发该异常。

在托管线程或工作生命周期(例如在 DependencyProvider.worker_setup() 内部) 中引发的任何未处理异常将导致容器被 kill(),并且在 wait() 中引发该异常。

class nameko.containers.WorkerContext(container: ServiceContainer, service: Type, entrypoint: Entrypoint, args=None, kwargs=None, data=None)[源代码]

工作者上下文

property call_id[源代码]

调用ID

nameko.containers.get_container_cls(config: dict) Type[ServiceContainer] | Any[源代码]

获取容器类

nameko.containers.get_service_name(service_cls: Type)[源代码]

获取微服务名称

nameko.extensions

class nameko.extensions.DependencyProvider(*args, **kwargs)[源代码]
bind(container, attr_name: str)[源代码]

获取一个依赖项的实例,以便与 containerattr_name 绑定。

get_dependency(worker_ctx)[源代码]

在工作者执行之前调用。依赖提供者应返回一个对象,以便容器将其注入到工作者实例中。

worker_result(worker_ctx, result=None, exc_info=None)[源代码]

在服务工作者执行结果时调用。

需要处理结果的依赖项应在此处进行处理。此方法在任何工作者完成时会被调用所有 Dependency 实例。

示例:数据库会话依赖项可能会刷新事务。

Parameters:
worker_ctxWorkerContext

spawn_worker()

worker_setup(worker_ctx)[源代码]

在服务工作者执行任务之前调用。

依赖项应在此处进行任何预处理,如果失败则引发异常。

Example: ...

Parameters:
worker_ctxWorkerContext

spawn_worker()

worker_teardown(worker_ctx)[源代码]

在服务工作者执行完任务后调用。

依赖项应在此处进行任何后处理,如果失败则引发异常。

示例:数据库会话依赖项可能会提交会话。

Parameters:
worker_ctxWorkerContext

spawn_worker()

class nameko.extensions.Entrypoint(*args, **kwargs)[源代码]

入口点封装类

bind(container, method_name)[源代码]

获取此入口点的实例,以便与 method_name 绑定到 container

method_name = None[源代码]

记录了RPC调用的方法名称

class nameko.extensions.Extension(*args, **kwargs)[源代码]

请注意,Extension.__init__bind 过程中以及实例化时都会被调用,因此请避免在此方法中产生副作用。请使用 setup

此外,binditer_extensions 使用反射来查找扩展可能声明的任何子扩展。扩展上的任何描述符应该预计在反射过程中被调用,这发生在 ServiceContainer.__init__ServiceContainer.setup 之间。

Extension.container 属性提供对绑定到该扩展的 nameko.containers.ServiceContainer 实例的访问,否则为 None

bind(container)[源代码]

获取当前扩展的实例绑定到 container.

kill()[源代码]

在没有优雅关闭的情况下调用以停止此扩展。

扩展应在此处紧急关闭。这意味着尽快停止,省略清理操作。对于某些依赖项,这可能与 stop() 不同。

例如,messaging.QueueConsumer 类跟踪正在处理的消息和待处理的消息确认。它的 kill 实现会尽快丢弃这些消息并与 Rabbit 断开连接。

在执行 kill 时,扩展不应引发异常,因为容器已经在关闭。相反,它们应该记录适当的信息,并捕获异常,以允许容器继续关闭。

setup()[源代码]

在容器启动之前调用了绑定的扩展。

扩展应在此处进行任何必要的初始化。

start()[源代码]

在容器成功启动时调用绑定的扩展。

此方法仅在所有其他扩展成功返回 Extension.setup 后被调用。如果扩展对外部事件做出反应,它现在应该开始对此进行响应。

stop()[源代码]

在服务容器开始关闭时调用。

扩展应在此处执行任何优雅的关闭操作。

class nameko.extensions.SharedExtension(*args, **kwargs)[源代码]
bind(container)[源代码]

支持共享的绑定实现。

nameko.extensions.iter_extensions(extension)[源代码]

extension 的子扩展进行深度优先迭代器。

nameko.events

提供了对核心消息模块的高级接口。

事件是特殊的消息,可以由一个服务发出,并由其他监听服务处理。

事件由一个标识符和一些数据组成,并使用从 EventDispatcher 实例获得的注入进行调度。

事件是异步调度的。仅保证事件已被调度,并不保证它被监听器接收或处理。

要监听事件,服务必须使用 handle_event() 入口点声明一个处理程序,提供目标服务和事件类型过滤器。

示例:

# 服务 A
def edit_foo(self, id):
    # ...
    self.dispatch('foo_updated', {'id': id})

# 服务 B

@handle_event('service_a', 'foo_updated')
def bar(event_data):
    pass
class nameko.events.EventDispatcher(*args, **kwargs)[源代码]

通过依赖注入提供事件调度方法。

发出的事件将通过服务的事件交换进行调度, 该交换会自动声明为主题交换。 交换的名称将是 {service-name}.events

通过调度器发出的事件将被序列化并发布到事件交换。 事件的类型属性用作路由键,可用于在监听器端进行过滤。

调度器将在事件消息发布后立即返回。 不保证任何服务会接收事件,仅保证事件已成功调度。

示例:

class Spammer(object):
    dispatch_spam = EventDispatcher()

    def emit_spam(self):
        evt_data = '火腿和鸡蛋'
        self.dispatch_spam('spam.ham', evt_data)
get_dependency(worker_ctx)[源代码]

在服务实例上注入一个调度方法

setup()[源代码]

在容器启动之前调用了绑定的扩展。

扩展应在此处进行任何必要的初始化。

class nameko.events.EventHandler(*args, **kwargs)[源代码]
property broadcast_identifier[源代码]

用于 BROADCAST 类型处理程序的唯一字符串,以识别服务实例。

当使用 BROADCAST 处理程序类型时,broadcast_identifier 将附加到队列名称。 它必须唯一地识别接收广播的服务实例。

默认的 broadcast_identifier 是在服务启动时设置的 uuid。 当服务重新启动时,它会改变,这意味着任何未消费的消息 将不会被发送到 '旧' 服务实例的 '新' 实例接收。

@property
def broadcast_identifier(self):
    # 使用 uuid 作为标识符。
    # 当服务重新启动时,标识符将会改变,任何未消费的消息将会丢失
    return uuid.uuid4().hex

因此,默认行为与可靠交付不兼容。

一个能够在服务重启时存活的替代 broadcast_identifier

@property
def broadcast_identifier(self):
    # 使用机器的主机名作为标识符。
    # 这假设在任何给定机器上仅运行一个服务实例
    return socket.gethostname()

如果这两种方法都不合适,可以从配置文件中读取该值

@property
def broadcast_identifier(self):
    return self.config['SERVICE_IDENTIFIER']  # 或类似的

广播队列是独占的,以确保 broadcast_identifier 值是唯一的。

由于此方法是描述符,它将在容器创建期间被调用, 与配置的 handler_type 无关。 有关更多详细信息,请参见 nameko.extensions.Extension

setup()[源代码]

在容器启动之前调用了绑定的扩展。

扩展应在此处进行任何必要的初始化。

exception nameko.events.EventHandlerConfigurationError[源代码]

当事件处理程序配置错误时引发的异常。

nameko.standalone

nameko.standalone.rpc

class nameko.standalone.rpc.ClusterProxy(worker_ctx, reply_listener)[源代码]

一个单线程的 RPC 代理,用于服务集群。可以通过属性访问各个服务,这些属性返回服务代理。代理上的方法调用会转换为对服务的 RPC 调用,并直接返回响应。

允许未托管在 Nameko 中的服务向 Nameko 集群发出 RPC 请求。通常用作上下文管理器,但也可以手动启动和停止。

这类似于服务代理,但可以为所有服务的调用使用一个单独的回复队列,而一组服务代理则会为每个代理拥有一个回复队列。

用法

作为上下文管理器使用:

with ClusterRpcProxy(config) as proxy:
    proxy.service.method()
    proxy.other_service.method()

等效的调用,手动启动和停止:

proxy = ClusterRpcProxy(config)
proxy = proxy.start()
proxy.targetservice.method()
proxy.other_service.method()
proxy.stop()

如果调用了 start() ,则必须最终调用 stop() 以关闭与代理的连接。

您还可以提供 context_data ,这是一个数据字典,将被序列化到 AMQP 消息头中,并指定自定义的工作上下文类以序列化它们。

当服务名称在 Python 中不合法时,您也可以使用类似字典的语法:

with ClusterRpcProxy(config) as proxy:
    proxy['service-name'].method()
    proxy['other-service'].method()
class nameko.standalone.rpc.ConsumeEvent(queue_consumer: PollingQueueConsumer, correlation_id: str)[源代码]

具有与 eventlet.Event 相同接口的 RPC 消费者的事件。

wait()[源代码]

对其 queue_consumer 进行阻塞调用,直到处理完具有给定 correlation_id 的消息。

在阻塞调用退出时, self.send() 将被调用,并传入接收到的消息的主体(参见 handle_message() )。

异常将被直接引发。

class nameko.standalone.rpc.PollingQueueConsumer(timeout=None)[源代码]

实现了 QueueConsumer 的最小接口。 它不是在单独的线程中处理消息,而是提供了一种轮询方法,以阻塞直到到达具有相同关联 ID 的 RPC 代理调用的消息。

class nameko.standalone.rpc.ServiceRpcProxy(service_name, *args, **kwargs)[源代码]

一个单线程的 RPC 代理,用于命名服务。代理上的方法调用会转换为对服务的 RPC 调用,并直接返回响应。

允许未托管在 Nameko 中的服务向 Nameko 集群发出 RPC 请求。通常用作上下文管理器,但也可以手动启动和停止。

用法

作为上下文管理器使用:

with ServiceRpcProxy('targetservice', config) as proxy:
    proxy.method()

等效的调用,手动启动和停止:

targetservice_proxy = ServiceRpcProxy('targetservice', config)
proxy = targetservice_proxy.start()
proxy.method()
targetservice_proxy.stop()

如果调用了 start() ,则必须最终调用 stop() 以关闭与代理的连接。

您还可以提供 context_data ,这是一个数据字典,将被序列化到 AMQP 消息头中,并指定自定义的工作上下文类以序列化它们。

class nameko.standalone.rpc.SingleThreadedReplyListener(*args, **kwargs)[源代码]

一个使用自定义队列消费者和 ConsumeEventReplyListener

nameko.standalone.events

nameko.standalone.events.event_dispatcher(nameko_config, **kwargs)[源代码]

返回一个用于分发 Nameko 事件的函数。

nameko.standalone.events.get_event_exchange(service_name, config)[源代码]

获取 service_name 事件的交换机。

nameko.rpc

class nameko.rpc.MethodProxy(worker_ctx, service_name, method_name, reply_listener: ReplyListener, **options)[源代码]

RPC 方法代理

publisher_cls[源代码]

Publisher 的别名

property serializer[源代码]

用于发布消息有效负载时的默认序列化器。

必须作为一个 kombu 序列化器 注册。

class nameko.rpc.ReplyListener(*args, **kwargs)[源代码]
setup()[源代码]

在容器启动之前调用了绑定的扩展。

扩展应在此处进行任何必要的初始化。

stop()[源代码]

在服务容器开始关闭时调用。

扩展应在此处执行任何优雅的关闭操作。

class nameko.rpc.Rpc(*args, **kwargs)[源代码]
setup()[源代码]

在容器启动之前调用了绑定的扩展。

扩展应在此处进行任何必要的初始化。

stop()[源代码]

在服务容器开始关闭时调用。

扩展应在此处执行任何优雅的关闭操作。

class nameko.rpc.RpcConsumer(*args, **kwargs)[源代码]
setup()[源代码]

在容器启动之前调用了绑定的扩展。

扩展应在此处进行任何必要的初始化。

stop()[源代码]

停止 RpcConsumer。

当最后一个 Rpc 子类从队列消费者中注销时,RpcConsumer 将正常注销。如果没有注册任何提供者,则我们应该在被请求停止时立即从队列消费者注销。

unregister_provider(provider)[源代码]

注销提供者。

阻塞直到此 RpcConsumer 从其队列消费者中注销,这只有在所有提供者都请求注销时才会发生。

class nameko.rpc.RpcProxy(*args, **kwargs)[源代码]
get_dependency(worker_ctx)[源代码]

在工作者执行之前调用。依赖提供者应返回一个对象,以便容器将其注入到工作者实例中。

class nameko.rpc.RpcReply(reply_event: Event)[源代码]

解析RPC响应内容

nameko.runners

class nameko.runners.ServiceRunner(config: dict)[源代码]

允许用户并发提供多个服务。 调用者可以为多个服务类注册名称,然后使用 start 方法来提供它们,并使用 stop 和 kill 方法 来停止它们。wait 方法将阻塞,直到所有服务停止。

示例:

runner = ServiceRunner(config)
runner.add_service(Foobar)
runner.add_service(Spam)

add_sig_term_handler(runner.kill)

runner.start()

runner.wait()
add_service(cls: Type)[源代码]

将服务类添加到运行器中。 对于给定的服务名称,最多只能有一个服务类。 服务类必须在调用 start() 之前注册。

kill()[源代码]

并发杀死所有正在运行的容器。 该方法将在所有容器停止之前将阻塞。

start()[源代码]

启动所有注册的服务。

每个服务都会使用 __init__ 方法中提供的容器类创建一个新容器。

所有容器将并发启动,该方法将在所有容器完成启动例程之前阻塞。

stop()[源代码]

并发停止所有正在运行的容器。 该方法在所有容器停止之前将阻塞。

wait()[源代码]

等待所有正在运行的容器停止。

nameko.runners.run_services(config: dict, *services: Type, **kwargs)[源代码]

为上下文块提供多个服务。 调用者可以指定多个服务类,然后在退出上下文块时 停止(默认)或杀死它们。

示例:

with run_services(config, Foobar, Spam) as runner:
    # 与服务交互并在退出块时停止它们

# 服务已停止

可以通过关键字参数指定额外的配置,以供 :class:ServiceRunner 实例使用:

with run_services(config, Foobar, Spam, kill_on_exit=True):
    # 与服务交互

# 服务已被杀死
Parameters:
configdict

用于实例化服务容器的配置

services服务定义

在上下文块中提供的服务

kill_on_exitbool (default=False)

如果为 True,在退出上下文块时对服务容器调用 kill()。 否则在退出块时将调用 stop()

Returns:

配置好的 ServiceRunner 实例