API概要¶
nameko.containers¶
- class nameko.containers.ServiceContainer(service_cls: Type, config: dict)[源代码]
服务容器
- property interface[源代码]
一个供扩展使用的此容器的接口。
- kill(exc_info=None)[源代码]
以半优雅的方式终止容器。
首先终止入口点,然后是任何活跃的工作线程。接下来,终止依赖项。最后,终止任何剩余的托管线程。
如果提供了
exc_info
,异常将由wait()
引发。
- spawn_managed_thread(fn: Callable, identifier: str | None = None)[源代码]
生成一个托管线程以代表扩展来运行
fn
。 传入的 identifier 将包含在与该线程相关的日志中,默认情况下如果已设置则为 fn.__name__。在
fn
内部引发的任何未捕获错误将导致容器被终止。终止生成的线程的责任在于调用者。 如果在
ServiceContainer.stop()
期间所有扩展停止后它们仍在运行,线程将自动被终止。扩展应该将所有线程生成委托给容器。
- spawn_worker(entrypoint: Entrypoint, args: Iterable, kwargs: dict, context_data: Any | None = None, handle_result: Callable[[WorkerContext, Any, OptExcInfo | None]] | None = None)[源代码]
为运行由 entrypoint 装饰的服务方法生成一个工作线程。
args
和kwargs
用作服务方法的参数。context_data
用于初始化WorkerContext
。handle_result
是一个可选函数,可能由入口点传入。 它在服务方法返回的结果或引发的错误时被调用。 如果提供,则必须返回一个值用于result
和exc_info
,以便传播到依赖项; 这些值可能与服务方法返回的值不同。
- start()[源代码]
通过启动该容器的所有扩展来启动容器。
- stop()[源代码]
优雅地停止容器。
首先,所有入口点都会被要求执行 stop()。这确保不会启动新的工作线程。
当对扩展调用 stop() 时,扩展有责任优雅地关闭,并且只有在它们停止后才返回。
在所有入口点停止后,容器会等待所有活跃的工作线程完成。
在所有活跃的工作线程停止后,容器会停止所有依赖提供者。
此时,应该不再有托管线程。如果仍然有托管线程,它们将被容器终止。
- wait()[源代码]
阻塞直到容器已停止。
如果容器因异常而停止,
wait()
将引发该异常。在托管线程或工作生命周期(例如在
DependencyProvider.worker_setup()
内部) 中引发的任何未处理异常将导致容器被kill()
,并且在wait()
中引发该异常。
- class nameko.containers.WorkerContext(container: ServiceContainer, service: Type, entrypoint: Entrypoint, args=None, kwargs=None, data=None)[源代码]
工作者上下文
- property call_id[源代码]
调用ID
- nameko.containers.get_container_cls(config: dict) Type[ServiceContainer] | Any [源代码]
获取容器类
- nameko.containers.get_service_name(service_cls: Type)[源代码]
获取微服务名称
nameko.extensions¶
- class nameko.extensions.DependencyProvider(*args, **kwargs)[源代码]
- bind(container, attr_name: str)[源代码]
获取一个依赖项的实例,以便与 container 和 attr_name 绑定。
- get_dependency(worker_ctx)[源代码]
在工作者执行之前调用。依赖提供者应返回一个对象,以便容器将其注入到工作者实例中。
- worker_result(worker_ctx, result=None, exc_info=None)[源代码]
在服务工作者执行结果时调用。
需要处理结果的依赖项应在此处进行处理。此方法在任何工作者完成时会被调用所有 Dependency 实例。
示例:数据库会话依赖项可能会刷新事务。
- Parameters:
- worker_ctx
WorkerContext
- worker_ctx
- worker_setup(worker_ctx)[源代码]
在服务工作者执行任务之前调用。
依赖项应在此处进行任何预处理,如果失败则引发异常。
Example: ...
- Parameters:
- worker_ctx
WorkerContext
- worker_ctx
- worker_teardown(worker_ctx)[源代码]
在服务工作者执行完任务后调用。
依赖项应在此处进行任何后处理,如果失败则引发异常。
示例:数据库会话依赖项可能会提交会话。
- Parameters:
- worker_ctx
WorkerContext
- worker_ctx
- class nameko.extensions.Entrypoint(*args, **kwargs)[源代码]
入口点封装类
- bind(container, method_name)[源代码]
获取此入口点的实例,以便与 method_name 绑定到 container。
- method_name = None[源代码]
记录了RPC调用的方法名称
- class nameko.extensions.Extension(*args, **kwargs)[源代码]
请注意,Extension.__init__ 在 bind 过程中以及实例化时都会被调用,因此请避免在此方法中产生副作用。请使用 setup。
此外,bind 和 iter_extensions 使用反射来查找扩展可能声明的任何子扩展。扩展上的任何描述符应该预计在反射过程中被调用,这发生在 ServiceContainer.__init__ 和 ServiceContainer.setup 之间。
Extension.container 属性提供对绑定到该扩展的 nameko.containers.ServiceContainer 实例的访问,否则为 None。
- bind(container)[源代码]
获取当前扩展的实例绑定到 container.
- kill()[源代码]
在没有优雅关闭的情况下调用以停止此扩展。
扩展应在此处紧急关闭。这意味着尽快停止,省略清理操作。对于某些依赖项,这可能与 stop() 不同。
例如,messaging.QueueConsumer 类跟踪正在处理的消息和待处理的消息确认。它的 kill 实现会尽快丢弃这些消息并与 Rabbit 断开连接。
在执行 kill 时,扩展不应引发异常,因为容器已经在关闭。相反,它们应该记录适当的信息,并捕获异常,以允许容器继续关闭。
- setup()[源代码]
在容器启动之前调用了绑定的扩展。
扩展应在此处进行任何必要的初始化。
- start()[源代码]
在容器成功启动时调用绑定的扩展。
此方法仅在所有其他扩展成功返回 Extension.setup 后被调用。如果扩展对外部事件做出反应,它现在应该开始对此进行响应。
- stop()[源代码]
在服务容器开始关闭时调用。
扩展应在此处执行任何优雅的关闭操作。
- nameko.extensions.iter_extensions(extension)[源代码]
对 extension 的子扩展进行深度优先迭代器。
nameko.events¶
提供了对核心消息模块的高级接口。
事件是特殊的消息,可以由一个服务发出,并由其他监听服务处理。
事件由一个标识符和一些数据组成,并使用从 EventDispatcher
实例获得的注入进行调度。
事件是异步调度的。仅保证事件已被调度,并不保证它被监听器接收或处理。
要监听事件,服务必须使用 handle_event()
入口点声明一个处理程序,提供目标服务和事件类型过滤器。
示例:
# 服务 A
def edit_foo(self, id):
# ...
self.dispatch('foo_updated', {'id': id})
# 服务 B
@handle_event('service_a', 'foo_updated')
def bar(event_data):
pass
- class nameko.events.EventDispatcher(*args, **kwargs)[源代码]
通过依赖注入提供事件调度方法。
发出的事件将通过服务的事件交换进行调度, 该交换会自动声明为主题交换。 交换的名称将是 {service-name}.events。
通过调度器发出的事件将被序列化并发布到事件交换。 事件的类型属性用作路由键,可用于在监听器端进行过滤。
调度器将在事件消息发布后立即返回。 不保证任何服务会接收事件,仅保证事件已成功调度。
示例:
class Spammer(object): dispatch_spam = EventDispatcher() def emit_spam(self): evt_data = '火腿和鸡蛋' self.dispatch_spam('spam.ham', evt_data)
- get_dependency(worker_ctx)[源代码]
在服务实例上注入一个调度方法
- setup()[源代码]
在容器启动之前调用了绑定的扩展。
扩展应在此处进行任何必要的初始化。
- class nameko.events.EventHandler(*args, **kwargs)[源代码]
- property broadcast_identifier[源代码]
用于 BROADCAST 类型处理程序的唯一字符串,以识别服务实例。
当使用 BROADCAST 处理程序类型时,broadcast_identifier 将附加到队列名称。 它必须唯一地识别接收广播的服务实例。
默认的 broadcast_identifier 是在服务启动时设置的 uuid。 当服务重新启动时,它会改变,这意味着任何未消费的消息 将不会被发送到 '旧' 服务实例的 '新' 实例接收。
@property def broadcast_identifier(self): # 使用 uuid 作为标识符。 # 当服务重新启动时,标识符将会改变,任何未消费的消息将会丢失 return uuid.uuid4().hex
因此,默认行为与可靠交付不兼容。
一个能够在服务重启时存活的替代 broadcast_identifier 是
@property def broadcast_identifier(self): # 使用机器的主机名作为标识符。 # 这假设在任何给定机器上仅运行一个服务实例 return socket.gethostname()
如果这两种方法都不合适,可以从配置文件中读取该值
@property def broadcast_identifier(self): return self.config['SERVICE_IDENTIFIER'] # 或类似的
广播队列是独占的,以确保 broadcast_identifier 值是唯一的。
由于此方法是描述符,它将在容器创建期间被调用, 与配置的 handler_type 无关。 有关更多详细信息,请参见
nameko.extensions.Extension
。
- setup()[源代码]
在容器启动之前调用了绑定的扩展。
扩展应在此处进行任何必要的初始化。
- exception nameko.events.EventHandlerConfigurationError[源代码]
当事件处理程序配置错误时引发的异常。
nameko.standalone¶
nameko.standalone.rpc¶
- class nameko.standalone.rpc.ClusterProxy(worker_ctx, reply_listener)[源代码]
一个单线程的 RPC 代理,用于服务集群。可以通过属性访问各个服务,这些属性返回服务代理。代理上的方法调用会转换为对服务的 RPC 调用,并直接返回响应。
允许未托管在 Nameko 中的服务向 Nameko 集群发出 RPC 请求。通常用作上下文管理器,但也可以手动启动和停止。
这类似于服务代理,但可以为所有服务的调用使用一个单独的回复队列,而一组服务代理则会为每个代理拥有一个回复队列。
用法
作为上下文管理器使用:
with ClusterRpcProxy(config) as proxy: proxy.service.method() proxy.other_service.method()
等效的调用,手动启动和停止:
proxy = ClusterRpcProxy(config) proxy = proxy.start() proxy.targetservice.method() proxy.other_service.method() proxy.stop()
如果调用了
start()
,则必须最终调用stop()
以关闭与代理的连接。您还可以提供
context_data
,这是一个数据字典,将被序列化到 AMQP 消息头中,并指定自定义的工作上下文类以序列化它们。当服务名称在 Python 中不合法时,您也可以使用类似字典的语法:
with ClusterRpcProxy(config) as proxy: proxy['service-name'].method() proxy['other-service'].method()
- class nameko.standalone.rpc.ConsumeEvent(queue_consumer: PollingQueueConsumer, correlation_id: str)[源代码]
具有与 eventlet.Event 相同接口的 RPC 消费者的事件。
- wait()[源代码]
对其 queue_consumer 进行阻塞调用,直到处理完具有给定 correlation_id 的消息。
在阻塞调用退出时, self.send() 将被调用,并传入接收到的消息的主体(参见
handle_message()
)。异常将被直接引发。
- class nameko.standalone.rpc.PollingQueueConsumer(timeout=None)[源代码]
实现了
QueueConsumer
的最小接口。 它不是在单独的线程中处理消息,而是提供了一种轮询方法,以阻塞直到到达具有相同关联 ID 的 RPC 代理调用的消息。
- class nameko.standalone.rpc.ServiceRpcProxy(service_name, *args, **kwargs)[源代码]
一个单线程的 RPC 代理,用于命名服务。代理上的方法调用会转换为对服务的 RPC 调用,并直接返回响应。
允许未托管在 Nameko 中的服务向 Nameko 集群发出 RPC 请求。通常用作上下文管理器,但也可以手动启动和停止。
用法
作为上下文管理器使用:
with ServiceRpcProxy('targetservice', config) as proxy: proxy.method()
等效的调用,手动启动和停止:
targetservice_proxy = ServiceRpcProxy('targetservice', config) proxy = targetservice_proxy.start() proxy.method() targetservice_proxy.stop()
如果调用了
start()
,则必须最终调用stop()
以关闭与代理的连接。您还可以提供
context_data
,这是一个数据字典,将被序列化到 AMQP 消息头中,并指定自定义的工作上下文类以序列化它们。
- class nameko.standalone.rpc.SingleThreadedReplyListener(*args, **kwargs)[源代码]
一个使用自定义队列消费者和 ConsumeEvent 的 ReplyListener 。
nameko.standalone.events¶
- nameko.standalone.events.event_dispatcher(nameko_config, **kwargs)[源代码]
返回一个用于分发 Nameko 事件的函数。
- nameko.standalone.events.get_event_exchange(service_name, config)[源代码]
获取
service_name
事件的交换机。
nameko.rpc¶
- class nameko.rpc.MethodProxy(worker_ctx, service_name, method_name, reply_listener: ReplyListener, **options)[源代码]
RPC 方法代理
- property serializer[源代码]
用于发布消息有效负载时的默认序列化器。
必须作为一个 kombu 序列化器 注册。
- class nameko.rpc.ReplyListener(*args, **kwargs)[源代码]
- setup()[源代码]
在容器启动之前调用了绑定的扩展。
扩展应在此处进行任何必要的初始化。
- stop()[源代码]
在服务容器开始关闭时调用。
扩展应在此处执行任何优雅的关闭操作。
- class nameko.rpc.Rpc(*args, **kwargs)[源代码]
- setup()[源代码]
在容器启动之前调用了绑定的扩展。
扩展应在此处进行任何必要的初始化。
- stop()[源代码]
在服务容器开始关闭时调用。
扩展应在此处执行任何优雅的关闭操作。
- class nameko.rpc.RpcConsumer(*args, **kwargs)[源代码]
- setup()[源代码]
在容器启动之前调用了绑定的扩展。
扩展应在此处进行任何必要的初始化。
- stop()[源代码]
停止 RpcConsumer。
当最后一个 Rpc 子类从队列消费者中注销时,RpcConsumer 将正常注销。如果没有注册任何提供者,则我们应该在被请求停止时立即从队列消费者注销。
- unregister_provider(provider)[源代码]
注销提供者。
阻塞直到此 RpcConsumer 从其队列消费者中注销,这只有在所有提供者都请求注销时才会发生。
- class nameko.rpc.RpcProxy(*args, **kwargs)[源代码]
- get_dependency(worker_ctx)[源代码]
在工作者执行之前调用。依赖提供者应返回一个对象,以便容器将其注入到工作者实例中。
- class nameko.rpc.RpcReply(reply_event: Event)[源代码]
解析RPC响应内容
nameko.runners¶
- class nameko.runners.ServiceRunner(config: dict)[源代码]
允许用户并发提供多个服务。 调用者可以为多个服务类注册名称,然后使用 start 方法来提供它们,并使用 stop 和 kill 方法 来停止它们。wait 方法将阻塞,直到所有服务停止。
示例:
runner = ServiceRunner(config) runner.add_service(Foobar) runner.add_service(Spam) add_sig_term_handler(runner.kill) runner.start() runner.wait()
- add_service(cls: Type)[源代码]
将服务类添加到运行器中。 对于给定的服务名称,最多只能有一个服务类。 服务类必须在调用 start() 之前注册。
- kill()[源代码]
并发杀死所有正在运行的容器。 该方法将在所有容器停止之前将阻塞。
- start()[源代码]
启动所有注册的服务。
每个服务都会使用 __init__ 方法中提供的容器类创建一个新容器。
所有容器将并发启动,该方法将在所有容器完成启动例程之前阻塞。
- stop()[源代码]
并发停止所有正在运行的容器。 该方法在所有容器停止之前将阻塞。
- wait()[源代码]
等待所有正在运行的容器停止。
- nameko.runners.run_services(config: dict, *services: Type, **kwargs)[源代码]
为上下文块提供多个服务。 调用者可以指定多个服务类,然后在退出上下文块时 停止(默认)或杀死它们。
示例:
with run_services(config, Foobar, Spam) as runner: # 与服务交互并在退出块时停止它们 # 服务已停止
可以通过关键字参数指定额外的配置,以供 :class:
ServiceRunner
实例使用:with run_services(config, Foobar, Spam, kill_on_exit=True): # 与服务交互 # 服务已被杀死
- Parameters:
- configdict
用于实例化服务容器的配置
- services服务定义
在上下文块中提供的服务
- kill_on_exitbool (default=False)
如果为
True
,在退出上下文块时对服务容器调用kill()
。 否则在退出块时将调用stop()
。
- Returns:
配置好的
ServiceRunner
实例