nameko.messaging

提供核心消息装饰器和依赖项提供者。

Attributes

Exceptions

QueueConsumerStopped

Common base class for all non-exit exceptions.

Classes

HeaderEncoder

HeaderDecoder

Publisher

请注意,Extension.__init__bind 过程中以及实例化时都会被调用,因此请避免在此方法中产生副作用。请使用 setup

QueueConsumer

请注意,Extension.__init__bind 过程中以及实例化时都会被调用,因此请避免在此方法中产生副作用。请使用 setup

Consumer

入口点封装类

Module Contents

nameko.messaging._log[源代码]
class nameko.messaging.HeaderEncoder[源代码]

Bases: object

header_prefix[源代码]
_get_header_name(key)[源代码]
get_message_headers(worker_ctx)[源代码]
class nameko.messaging.HeaderDecoder[源代码]

Bases: object

header_prefix[源代码]
_strip_header_name(key)[源代码]
unpack_message_headers(message)[源代码]
class nameko.messaging.Publisher(exchange=None, queue=None, declare=None, **options)[源代码]

Bases: nameko.extensions.DependencyProvider, HeaderEncoder

请注意,Extension.__init__bind 过程中以及实例化时都会被调用,因此请避免在此方法中产生副作用。请使用 setup

此外,binditer_extensions 使用反射来查找扩展可能声明的任何子扩展。扩展上的任何描述符应该预计在反射过程中被调用,这发生在 ServiceContainer.__init__ServiceContainer.setup 之间。

Extension.container 属性提供对绑定到该扩展的 nameko.containers.ServiceContainer 实例的访问,否则为 None

提供通过依赖注入的 AMQP 消息发布方法。

在 AMQP 中,消息被发布到 交换机,并路由到绑定的 队列。该依赖接受要发布的 exchange,并确保在发布之前已声明。

可选地,您可以使用 declare 关键字参数传递其他 kombu.Exchangekombu.Queue 对象,以便在发布之前进行声明。

Parameters:
exchangekombu.Exchange

目标交换机

queuekombu.Queue

已弃用: 绑定队列。事件将发布到该队列的交换机。

declarelist

要在发布之前声明的 kombu.Exchangekombu.Queue 对象的列表。

如果未提供 exchange,则消息将发布到默认交换机。

示例:

class Foobar(object):

    publish = Publisher(exchange=...)

    def spam(self, data):
        self.publish('spam:' + data)
publisher_cls[源代码]
exchange[源代码]
options[源代码]
declare[源代码]
compat_attrs = ('retry', 'retry_policy', 'use_confirms')[源代码]
property amqp_uri[源代码]
property serializer[源代码]
默认序列化器,用于发布消息。

必须作为 kombu serializer 注册。

setup()[源代码]

在容器启动之前调用了绑定的扩展。

扩展应在此处进行任何必要的初始化。

get_dependency(worker_ctx)[源代码]

在工作者执行之前调用。依赖提供者应返回一个对象,以便容器将其注入到工作者实例中。

class nameko.messaging.QueueConsumer[源代码]

Bases: nameko.extensions.SharedExtension, nameko.extensions.ProviderCollector, kombu.mixins.ConsumerMixin

请注意,Extension.__init__bind 过程中以及实例化时都会被调用,因此请避免在此方法中产生副作用。请使用 setup

此外,binditer_extensions 使用反射来查找扩展可能声明的任何子扩展。扩展上的任何描述符应该预计在反射过程中被调用,这发生在 ServiceContainer.__init__ServiceContainer.setup 之间。

Extension.container 属性提供对绑定到该扩展的 nameko.containers.ServiceContainer 实例的访问,否则为 None

_consumers[源代码]
_pending_remove_providers[源代码]
_gt = None[源代码]
_starting = False[源代码]
_consumers_ready[源代码]
property amqp_uri[源代码]
property prefetch_count[源代码]
property accept[源代码]
_handle_thread_exited(gt)[源代码]
start()[源代码]

在容器成功启动时调用绑定的扩展。

此方法仅在所有其他扩展成功返回 Extension.setup 后被调用。如果扩展对外部事件做出反应,它现在应该开始对此进行响应。

stop()[源代码]

优雅地停止队列消费者。

等待最后一个提供者注销,并等待 ConsumerMixin 的绿色线程退出(即直到所有待处理消息都已确认或重新排队,所有消费者停止)。

kill()[源代码]

强制终止队列消费者。

stop() 不同,任何未确认的消息或重新排队请求、移除提供者的请求等都会丢失,消费线程会尽快终止。

unregister_provider(provider)[源代码]
ack_message(message)[源代码]
requeue_message(message)[源代码]
_cancel_consumers_if_requested()[源代码]
property connection[源代码]
提供 Kombu ConsumerMixin 所需的连接参数。

Connection 对象是连接参数的声明,采用懒加载方式进行评估。 此时,它并不表示与代理的已建立连接。

handle_message(provider, body, message)[源代码]
get_consumers(consumer_cls, channel)[源代码]

Kombu 回调,用于设置消费者。

在与代理的任何(重新)连接之后调用。

on_iteration()[源代码]

Kombu 回调,在每次 drain_events 循环迭代时调用。

on_connection_error(exc, interval)[源代码]
on_consume_ready(connection, channel, consumers, **kwargs)[源代码]

Kombu 回调,当消费者准备好接受消息时调用。

在与代理的任何(重新)连接之后调用。

class nameko.messaging.Consumer(queue, requeue_on_error=False, **kwargs)[源代码]

Bases: nameko.extensions.Entrypoint, HeaderDecoder

入口点封装类

装饰器将方法标记为消息消费者。

来自队列的消息将根据其内容类型进行反序列化,并传递给被装饰的方法。 当消费者方法正常返回且没有引发任何异常时,消息将自动确认。 如果在消费过程中引发任何异常,并且 requeue_on_errorTrue,消息将被重新入队。

如果 requeue_on_error 为真,当处理事件时发生错误时,处理程序将返回事件到队列。默认值为假。

示例:

@consume(...)
def handle_message(self, body):

    if not self.spam(body):
        raise Exception('消息将被重新入队')

    self.shrub(body)
参数:

queue: 要消费的队列。

queue_consumer[源代码]
queue[源代码]
requeue_on_error[源代码]
setup()[源代码]

在容器启动之前调用了绑定的扩展。

扩展应在此处进行任何必要的初始化。

stop()[源代码]

在服务容器开始关闭时调用。

扩展应在此处执行任何优雅的关闭操作。

handle_message(body, message)[源代码]
handle_result(message, worker_ctx, result=None, exc_info=None)[源代码]
handle_message_processed(message, result=None, exc_info=None)[源代码]
nameko.messaging.consume[源代码]
exception nameko.messaging.QueueConsumerStopped[源代码]

Bases: Exception

Common base class for all non-exit exceptions.

Initialize self. See help(type(self)) for accurate signature.