nameko.standalone.rpc

Attributes

Classes

ConsumeEvent

具有与 eventlet.Event 相同接口的 RPC 消费者的事件。

PollingQueueConsumer

实现了 QueueConsumer 的最小接口。

SingleThreadedReplyListener

一个使用自定义队列消费者和 ConsumeEventReplyListener

StandaloneProxyBase

ServiceRpcProxy

一个单线程的 RPC 代理,用于命名服务。代理上的方法调用会转换为对服务的 RPC 调用,并直接返回响应。

ClusterProxy

一个单线程的 RPC 代理,用于服务集群。可以通过属性访问各个服务,这些属性返回服务代理。代理上的方法调用会转换为对服务的 RPC 调用,并直接返回响应。

ClusterRpcProxy

Module Contents

nameko.standalone.rpc._logger[源代码]
class nameko.standalone.rpc.ConsumeEvent(queue_consumer: PollingQueueConsumer, correlation_id: str)[源代码]

Bases: object

具有与 eventlet.Event 相同接口的 RPC 消费者的事件。

exception = None[源代码]
correlation_id[源代码]
queue_consumer[源代码]
send(body)[源代码]
send_exception(exc)[源代码]
wait()[源代码]

对其 queue_consumer 进行阻塞调用,直到处理完具有给定 correlation_id 的消息。

在阻塞调用退出时, self.send() 将被调用,并传入接收到的消息的主体(参见 handle_message() )。

异常将被直接引发。

class nameko.standalone.rpc.PollingQueueConsumer(timeout=None)[源代码]

Bases: object

实现了 QueueConsumer 的最小接口。 它不是在单独的线程中处理消息,而是提供了一种轮询方法,以阻塞直到到达具有相同关联 ID 的 RPC 代理调用的消息。

consumer = None[源代码]
stopped = True[源代码]
timeout[源代码]
replies[源代码]
_setup_consumer()[源代码]
register_provider(provider)[源代码]
unregister_provider(provider)[源代码]
ack_message(msg)[源代码]
on_message(body, message)[源代码]
get_message(correlation_id)[源代码]
class nameko.standalone.rpc.SingleThreadedReplyListener(timeout=None)[源代码]

Bases: nameko.rpc.ReplyListener

一个使用自定义队列消费者和 ConsumeEventReplyListener

queue_consumer = None[源代码]
_reply_events: Dict[str, ConsumeEvent][源代码]
get_reply_event(correlation_id: str)[源代码]
class nameko.standalone.rpc.StandaloneProxyBase(config: dict, context_data=None, timeout=None, reply_listener_cls=SingleThreadedReplyListener)[源代码]

Bases: object

class ServiceContainer(config)[源代码]

Bases: object

实现了 ServiceContainer 的最小接口,以供该模块中的子类和 RPC 导入使用。

service_name = 'standalone_rpc_proxy'[源代码]
config[源代码]
shared_extensions[源代码]
class Dummy(expected_exceptions=(), sensitive_arguments=(), **kwargs)[源代码]

Bases: nameko.extensions.Entrypoint

入口点封装类

Parameters:
expected_exceptions异常类或异常类元组

指定可能由调用者引起的异常(例如,通过提供错误的参数)。 保存在入口点实例中作为 entrypoint.expected_exceptions,供其他扩展(例如监控系统)后续检查。

sensitive_arguments字符串或字符串元组

将参数或参数的一部分标记为敏感。保存在入口点实例中作为 entrypoint.sensitive_arguments, 供其他扩展(例如日志系统)后续检查。

seealso:

nameko.utils.get_redacted_args()

method_name = 'call'[源代码]

记录了RPC调用的方法名称

_proxy = None[源代码]
container[源代码]
_worker_ctx[源代码]
_reply_listener[源代码]
__enter__()[源代码]
__exit__(tpe, value, traceback)[源代码]
start()[源代码]
stop()[源代码]
class nameko.standalone.rpc.ServiceRpcProxy(service_name, *args, **kwargs)[源代码]

Bases: StandaloneProxyBase

一个单线程的 RPC 代理,用于命名服务。代理上的方法调用会转换为对服务的 RPC 调用,并直接返回响应。

允许未托管在 Nameko 中的服务向 Nameko 集群发出 RPC 请求。通常用作上下文管理器,但也可以手动启动和停止。

用法

作为上下文管理器使用:

with ServiceRpcProxy('targetservice', config) as proxy:
    proxy.method()

等效的调用,手动启动和停止:

targetservice_proxy = ServiceRpcProxy('targetservice', config)
proxy = targetservice_proxy.start()
proxy.method()
targetservice_proxy.stop()

如果调用了 start() ,则必须最终调用 stop() 以关闭与代理的连接。

您还可以提供 context_data ,这是一个数据字典,将被序列化到 AMQP 消息头中,并指定自定义的工作上下文类以序列化它们。

_proxy[源代码]
class nameko.standalone.rpc.ClusterProxy(worker_ctx, reply_listener)[源代码]

Bases: object

一个单线程的 RPC 代理,用于服务集群。可以通过属性访问各个服务,这些属性返回服务代理。代理上的方法调用会转换为对服务的 RPC 调用,并直接返回响应。

允许未托管在 Nameko 中的服务向 Nameko 集群发出 RPC 请求。通常用作上下文管理器,但也可以手动启动和停止。

这类似于服务代理,但可以为所有服务的调用使用一个单独的回复队列,而一组服务代理则会为每个代理拥有一个回复队列。

用法

作为上下文管理器使用:

with ClusterRpcProxy(config) as proxy:
    proxy.service.method()
    proxy.other_service.method()

等效的调用,手动启动和停止:

proxy = ClusterRpcProxy(config)
proxy = proxy.start()
proxy.targetservice.method()
proxy.other_service.method()
proxy.stop()

如果调用了 start() ,则必须最终调用 stop() 以关闭与代理的连接。

您还可以提供 context_data ,这是一个数据字典,将被序列化到 AMQP 消息头中,并指定自定义的工作上下文类以序列化它们。

当服务名称在 Python 中不合法时,您也可以使用类似字典的语法:

with ClusterRpcProxy(config) as proxy:
    proxy['service-name'].method()
    proxy['other-service'].method()
_worker_ctx[源代码]
_reply_listener[源代码]
_proxies[源代码]
__getattr__(name)[源代码]
__getitem__(name)[源代码]

Enable dict-like access on the proxy.

class nameko.standalone.rpc.ClusterRpcProxy(*args, **kwargs)[源代码]

Bases: StandaloneProxyBase

_proxy[源代码]