nameko.events ============= .. py:module:: nameko.events .. autoapi-nested-parse:: 提供了对核心消息模块的高级接口。 事件是特殊的消息,可以由一个服务发出,并由其他监听服务处理。 事件由一个标识符和一些数据组成,并使用从 :class:`EventDispatcher` 实例获得的注入进行调度。 事件是异步调度的。仅保证事件已被调度,并不保证它被监听器接收或处理。 要监听事件,服务必须使用 :func:`handle_event` 入口点声明一个处理程序,提供目标服务和事件类型过滤器。 示例:: # 服务 A def edit_foo(self, id): # ... self.dispatch('foo_updated', {'id': id}) # 服务 B @handle_event('service_a', 'foo_updated') def bar(event_data): pass Attributes ---------- .. autoapisummary:: nameko.events.SERVICE_POOL nameko.events.SINGLETON nameko.events.BROADCAST nameko.events._log nameko.events.event_handler Exceptions ---------- .. autoapisummary:: nameko.events.EventHandlerConfigurationError Classes ------- .. autoapisummary:: nameko.events.EventDispatcher nameko.events.EventHandler Module Contents --------------- .. py:data:: SERVICE_POOL :value: 'service_pool' .. py:data:: SINGLETON :value: 'singleton' .. py:data:: BROADCAST :value: 'broadcast' .. py:data:: _log .. py:exception:: EventHandlerConfigurationError Bases: :py:obj:`Exception` 当事件处理程序配置错误时引发的异常。 Initialize self. See help(type(self)) for accurate signature. .. py:class:: EventDispatcher(exchange=None, queue=None, declare=None, **options) Bases: :py:obj:`nameko.messaging.Publisher` 通过依赖注入提供事件调度方法。 发出的事件将通过服务的事件交换进行调度, 该交换会自动声明为主题交换。 交换的名称将是 `{service-name}.events`。 通过调度器发出的事件将被序列化并发布到事件交换。 事件的类型属性用作路由键,可用于在监听器端进行过滤。 调度器将在事件消息发布后立即返回。 不保证任何服务会接收事件,仅保证事件已成功调度。 示例:: class Spammer(object): dispatch_spam = EventDispatcher() def emit_spam(self): evt_data = '火腿和鸡蛋' self.dispatch_spam('spam.ham', evt_data) 提供通过依赖注入的 AMQP 消息发布方法。 在 AMQP 中,消息被发布到 *交换机*,并路由到绑定的 *队列*。该依赖接受要发布的 `exchange`,并确保在发布之前已声明。 可选地,您可以使用 `declare` 关键字参数传递其他 :class:`kombu.Exchange` 或 :class:`kombu.Queue` 对象,以便在发布之前进行声明。 :Parameters: exchange : :class:`kombu.Exchange` 目标交换机 queue : :class:`kombu.Queue` **已弃用**: 绑定队列。事件将发布到该队列的交换机。 declare : list 要在发布之前声明的 :class:`kombu.Exchange` 或 :class:`kombu.Queue` 对象的列表。 如果未提供 `exchange`,则消息将发布到默认交换机。 示例:: class Foobar(object): publish = Publisher(exchange=...) def spam(self, data): self.publish('spam:' + data) .. py:method:: setup() 在容器启动之前调用了绑定的扩展。 扩展应在此处进行任何必要的初始化。 .. py:method:: get_dependency(worker_ctx) 在服务实例上注入一个调度方法 .. py:class:: EventHandler(source_service, event_type, handler_type=SERVICE_POOL, reliable_delivery=True, requeue_on_error=False, **kwargs) Bases: :py:obj:`nameko.messaging.Consumer` 入口点封装类 将方法装饰为处理来自名为 ``source_service`` 的服务的 ``event_type`` 事件的处理程序。 :Parameters: source_service : str 发出事件的服务名称 event_type : str 要处理的事件类型 handler_type : str 决定处理程序在集群中的行为: - ``events.SERVICE_POOL``: 事件处理程序按服务类型和方法进行池化, 每个池中的一个服务实例接收事件。 :: .-[队列]- (服务 X 处理方法-1) / 交换 o --[队列]- (服务 X 处理方法-2) \ \ (服务 Y(实例 1)处理方法) \ / [队列] \ (服务 Y(实例 2)处理方法) - ``events.SINGLETON``: 事件仅由一个注册的处理程序接收, 不管服务类型如何。如果在错误时重新排队,它们可能 会被不同的服务实例处理。 :: (服务 X 处理方法) / 交换 o -- [队列] \ (服务 Y 处理方法) - ``events.BROADCAST``: 事件将被每个处理程序接收。事件广播到每个服务实例,而不仅仅是每个服务 类型。实例通过 :attr:`EventHandler.broadcast_identifier` 进行区分。 :: [队列]- (服务 X(实例 1)处理方法) / 交换 o - [队列]- (服务 X(实例 2)处理方法) \ [队列]- (服务 Y 处理方法) requeue_on_error : bool # TODO: 由 Consumer 定义.. 如果为真,则如果处理事件时发生错误,处理程序将事件返回到队列。 默认值为 False。 reliable_delivery : bool 如果为真,事件将在队列中保持,直到有处理程序来消费它们。默认值为 True。 .. py:attribute:: source_service .. py:attribute:: event_type .. py:attribute:: handler_type .. py:attribute:: reliable_delivery .. py:property:: broadcast_identifier 用于 `BROADCAST` 类型处理程序的唯一字符串,以识别服务实例。 当使用 `BROADCAST` 处理程序类型时,`broadcast_identifier` 将附加到队列名称。 它必须唯一地识别接收广播的服务实例。 默认的 `broadcast_identifier` 是在服务启动时设置的 uuid。 当服务重新启动时,它会改变,这意味着任何未消费的消息 将不会被发送到 '旧' 服务实例的 '新' 实例接收。 :: @property def broadcast_identifier(self): # 使用 uuid 作为标识符。 # 当服务重新启动时,标识符将会改变,任何未消费的消息将会丢失 return uuid.uuid4().hex 因此,默认行为与可靠交付不兼容。 一个能够在服务重启时存活的替代 `broadcast_identifier` 是 :: @property def broadcast_identifier(self): # 使用机器的主机名作为标识符。 # 这假设在任何给定机器上仅运行一个服务实例 return socket.gethostname() 如果这两种方法都不合适,可以从配置文件中读取该值 :: @property def broadcast_identifier(self): return self.config['SERVICE_IDENTIFIER'] # 或类似的 广播队列是独占的,以确保 `broadcast_identifier` 值是唯一的。 由于此方法是描述符,它将在容器创建期间被调用, 与配置的 `handler_type` 无关。 有关更多详细信息,请参见 :class:`nameko.extensions.Extension`。 .. py:method:: setup() 在容器启动之前调用了绑定的扩展。 扩展应在此处进行任何必要的初始化。 .. py:data:: event_handler