nameko.messaging ================ .. py:module:: nameko.messaging .. autoapi-nested-parse:: 提供核心消息装饰器和依赖项提供者。 Attributes ---------- .. autoapisummary:: nameko.messaging._log nameko.messaging.consume Exceptions ---------- .. autoapisummary:: nameko.messaging.QueueConsumerStopped Classes ------- .. autoapisummary:: nameko.messaging.HeaderEncoder nameko.messaging.HeaderDecoder nameko.messaging.Publisher nameko.messaging.QueueConsumer nameko.messaging.Consumer Module Contents --------------- .. py:data:: _log .. py:class:: HeaderEncoder Bases: :py:obj:`object` .. py:attribute:: header_prefix .. py:method:: _get_header_name(key) .. py:method:: get_message_headers(worker_ctx) .. py:class:: HeaderDecoder Bases: :py:obj:`object` .. py:attribute:: header_prefix .. py:method:: _strip_header_name(key) .. py:method:: unpack_message_headers(message) .. py:class:: Publisher(exchange=None, queue=None, declare=None, **options) Bases: :py:obj:`nameko.extensions.DependencyProvider`, :py:obj:`HeaderEncoder` 请注意,`Extension.__init__` 在 `bind` 过程中以及实例化时都会被调用,因此请避免在此方法中产生副作用。请使用 `setup`。 此外,`bind` 和 `iter_extensions` 使用反射来查找扩展可能声明的任何子扩展。扩展上的任何描述符应该预计在反射过程中被调用,这发生在 `ServiceContainer.__init__` 和 `ServiceContainer.setup` 之间。 `Extension.container` 属性提供对绑定到该扩展的 `nameko.containers.ServiceContainer` 实例的访问,否则为 `None`。 提供通过依赖注入的 AMQP 消息发布方法。 在 AMQP 中,消息被发布到 *交换机*,并路由到绑定的 *队列*。该依赖接受要发布的 `exchange`,并确保在发布之前已声明。 可选地,您可以使用 `declare` 关键字参数传递其他 :class:`kombu.Exchange` 或 :class:`kombu.Queue` 对象,以便在发布之前进行声明。 :Parameters: exchange : :class:`kombu.Exchange` 目标交换机 queue : :class:`kombu.Queue` **已弃用**: 绑定队列。事件将发布到该队列的交换机。 declare : list 要在发布之前声明的 :class:`kombu.Exchange` 或 :class:`kombu.Queue` 对象的列表。 如果未提供 `exchange`,则消息将发布到默认交换机。 示例:: class Foobar(object): publish = Publisher(exchange=...) def spam(self, data): self.publish('spam:' + data) .. py:attribute:: publisher_cls .. py:attribute:: exchange .. py:attribute:: options .. py:attribute:: declare .. py:attribute:: compat_attrs :value: ('retry', 'retry_policy', 'use_confirms') .. py:property:: amqp_uri .. py:property:: serializer 默认序列化器,用于发布消息。 必须作为 `kombu serializer `_ 注册。 .. py:method:: setup() 在容器启动之前调用了绑定的扩展。 扩展应在此处进行任何必要的初始化。 .. py:method:: get_dependency(worker_ctx) 在工作者执行之前调用。依赖提供者应返回一个对象,以便容器将其注入到工作者实例中。 .. py:class:: QueueConsumer Bases: :py:obj:`nameko.extensions.SharedExtension`, :py:obj:`nameko.extensions.ProviderCollector`, :py:obj:`kombu.mixins.ConsumerMixin` 请注意,`Extension.__init__` 在 `bind` 过程中以及实例化时都会被调用,因此请避免在此方法中产生副作用。请使用 `setup`。 此外,`bind` 和 `iter_extensions` 使用反射来查找扩展可能声明的任何子扩展。扩展上的任何描述符应该预计在反射过程中被调用,这发生在 `ServiceContainer.__init__` 和 `ServiceContainer.setup` 之间。 `Extension.container` 属性提供对绑定到该扩展的 `nameko.containers.ServiceContainer` 实例的访问,否则为 `None`。 .. py:attribute:: _consumers .. py:attribute:: _pending_remove_providers .. py:attribute:: _gt :value: None .. py:attribute:: _starting :value: False .. py:attribute:: _consumers_ready .. py:property:: amqp_uri .. py:property:: prefetch_count .. py:property:: accept .. py:method:: _handle_thread_exited(gt) .. py:method:: start() 在容器成功启动时调用绑定的扩展。 此方法仅在所有其他扩展成功返回 `Extension.setup` 后被调用。如果扩展对外部事件做出反应,它现在应该开始对此进行响应。 .. py:method:: stop() 优雅地停止队列消费者。 等待最后一个提供者注销,并等待 ConsumerMixin 的绿色线程退出(即直到所有待处理消息都已确认或重新排队,所有消费者停止)。 .. py:method:: kill() 强制终止队列消费者。 与 `stop()` 不同,任何未确认的消息或重新排队请求、移除提供者的请求等都会丢失,消费线程会尽快终止。 .. py:method:: unregister_provider(provider) .. py:method:: ack_message(message) .. py:method:: requeue_message(message) .. py:method:: _cancel_consumers_if_requested() .. py:property:: connection 提供 Kombu 的 ConsumerMixin 所需的连接参数。 `Connection` 对象是连接参数的声明,采用懒加载方式进行评估。 此时,它并不表示与代理的已建立连接。 .. py:method:: handle_message(provider, body, message) .. py:method:: get_consumers(consumer_cls, channel) Kombu 回调,用于设置消费者。 在与代理的任何(重新)连接之后调用。 .. py:method:: on_iteration() Kombu 回调,在每次 `drain_events` 循环迭代时调用。 .. py:method:: on_connection_error(exc, interval) .. py:method:: on_consume_ready(connection, channel, consumers, **kwargs) Kombu 回调,当消费者准备好接受消息时调用。 在与代理的任何(重新)连接之后调用。 .. py:class:: Consumer(queue, requeue_on_error=False, **kwargs) Bases: :py:obj:`nameko.extensions.Entrypoint`, :py:obj:`HeaderDecoder` 入口点封装类 装饰器将方法标记为消息消费者。 来自队列的消息将根据其内容类型进行反序列化,并传递给被装饰的方法。 当消费者方法正常返回且没有引发任何异常时,消息将自动确认。 如果在消费过程中引发任何异常,并且 `requeue_on_error` 为 `True`,消息将被重新入队。 如果 `requeue_on_error` 为真,当处理事件时发生错误时,处理程序将返回事件到队列。默认值为假。 示例:: @consume(...) def handle_message(self, body): if not self.spam(body): raise Exception('消息将被重新入队') self.shrub(body) 参数: queue: 要消费的队列。 .. py:attribute:: queue_consumer .. py:attribute:: queue .. py:attribute:: requeue_on_error .. py:method:: setup() 在容器启动之前调用了绑定的扩展。 扩展应在此处进行任何必要的初始化。 .. py:method:: stop() 在服务容器开始关闭时调用。 扩展应在此处执行任何优雅的关闭操作。 .. py:method:: handle_message(body, message) .. py:method:: handle_result(message, worker_ctx, result=None, exc_info=None) .. py:method:: handle_message_processed(message, result=None, exc_info=None) .. py:data:: consume .. py:exception:: QueueConsumerStopped Bases: :py:obj:`Exception` Common base class for all non-exit exceptions. Initialize self. See help(type(self)) for accurate signature.