nameko.events

提供了对核心消息模块的高级接口。

事件是特殊的消息,可以由一个服务发出,并由其他监听服务处理。

事件由一个标识符和一些数据组成,并使用从 EventDispatcher 实例获得的注入进行调度。

事件是异步调度的。仅保证事件已被调度,并不保证它被监听器接收或处理。

要监听事件,服务必须使用 handle_event() 入口点声明一个处理程序,提供目标服务和事件类型过滤器。

示例:

# 服务 A
def edit_foo(self, id):
    # ...
    self.dispatch('foo_updated', {'id': id})

# 服务 B

@handle_event('service_a', 'foo_updated')
def bar(event_data):
    pass

Attributes

Exceptions

EventHandlerConfigurationError

当事件处理程序配置错误时引发的异常。

Classes

EventDispatcher

通过依赖注入提供事件调度方法。

EventHandler

入口点封装类

Module Contents

nameko.events.SERVICE_POOL = 'service_pool'[源代码]
nameko.events.SINGLETON = 'singleton'[源代码]
nameko.events.BROADCAST = 'broadcast'[源代码]
nameko.events._log[源代码]
exception nameko.events.EventHandlerConfigurationError[源代码]

Bases: Exception

当事件处理程序配置错误时引发的异常。

Initialize self. See help(type(self)) for accurate signature.

class nameko.events.EventDispatcher(exchange=None, queue=None, declare=None, **options)[源代码]

Bases: nameko.messaging.Publisher

通过依赖注入提供事件调度方法。

发出的事件将通过服务的事件交换进行调度, 该交换会自动声明为主题交换。 交换的名称将是 {service-name}.events

通过调度器发出的事件将被序列化并发布到事件交换。 事件的类型属性用作路由键,可用于在监听器端进行过滤。

调度器将在事件消息发布后立即返回。 不保证任何服务会接收事件,仅保证事件已成功调度。

示例:

class Spammer(object):
    dispatch_spam = EventDispatcher()

    def emit_spam(self):
        evt_data = '火腿和鸡蛋'
        self.dispatch_spam('spam.ham', evt_data)

提供通过依赖注入的 AMQP 消息发布方法。

在 AMQP 中,消息被发布到 交换机,并路由到绑定的 队列。该依赖接受要发布的 exchange,并确保在发布之前已声明。

可选地,您可以使用 declare 关键字参数传递其他 kombu.Exchangekombu.Queue 对象,以便在发布之前进行声明。

Parameters:
exchangekombu.Exchange

目标交换机

queuekombu.Queue

已弃用: 绑定队列。事件将发布到该队列的交换机。

declarelist

要在发布之前声明的 kombu.Exchangekombu.Queue 对象的列表。

如果未提供 exchange,则消息将发布到默认交换机。

示例:

class Foobar(object):

    publish = Publisher(exchange=...)

    def spam(self, data):
        self.publish('spam:' + data)
setup()[源代码]

在容器启动之前调用了绑定的扩展。

扩展应在此处进行任何必要的初始化。

get_dependency(worker_ctx)[源代码]

在服务实例上注入一个调度方法

class nameko.events.EventHandler(source_service, event_type, handler_type=SERVICE_POOL, reliable_delivery=True, requeue_on_error=False, **kwargs)[源代码]

Bases: nameko.messaging.Consumer

入口点封装类

将方法装饰为处理来自名为 source_service 的服务的 event_type 事件的处理程序。

Parameters:
source_servicestr

发出事件的服务名称

event_typestr

要处理的事件类型

handler_typestr

决定处理程序在集群中的行为:

  • events.SERVICE_POOL:

    事件处理程序按服务类型和方法进行池化, 每个池中的一个服务实例接收事件。

               .-[队列]- (服务 X 处理方法-1)
              /
    交换 o --[队列]- (服务 X 处理方法-2)
              \
               \          (服务 Y(实例 1)处理方法)
                \       /
                 [队列]
                        \
                          (服务 Y(实例 2)处理方法)
    
  • events.SINGLETON:

    事件仅由一个注册的处理程序接收, 不管服务类型如何。如果在错误时重新排队,它们可能 会被不同的服务实例处理。

                           (服务 X 处理方法)
                         /
    交换 o -- [队列]
                         \
                           (服务 Y 处理方法)
    
  • events.BROADCAST:

    事件将被每个处理程序接收。事件广播到每个服务实例,而不仅仅是每个服务 类型。实例通过 EventHandler.broadcast_identifier 进行区分。

                [队列]- (服务 X(实例 1)处理方法)
              /
    交换 o - [队列]- (服务 X(实例 2)处理方法)
              \
                [队列]- (服务 Y 处理方法)
    
requeue_on_errorbool # TODO: 由 Consumer 定义..

如果为真,则如果处理事件时发生错误,处理程序将事件返回到队列。 默认值为 False。

reliable_deliverybool

如果为真,事件将在队列中保持,直到有处理程序来消费它们。默认值为 True。

source_service[源代码]
event_type[源代码]
handler_type[源代码]
reliable_delivery[源代码]
property broadcast_identifier[源代码]
用于 `BROADCAST` 类型处理程序的唯一字符串,以识别服务实例。

当使用 BROADCAST 处理程序类型时,broadcast_identifier 将附加到队列名称。 它必须唯一地识别接收广播的服务实例。

默认的 broadcast_identifier 是在服务启动时设置的 uuid。 当服务重新启动时,它会改变,这意味着任何未消费的消息 将不会被发送到 '旧' 服务实例的 '新' 实例接收。

@property
def broadcast_identifier(self):
    # 使用 uuid 作为标识符。
    # 当服务重新启动时,标识符将会改变,任何未消费的消息将会丢失
    return uuid.uuid4().hex

因此,默认行为与可靠交付不兼容。

一个能够在服务重启时存活的替代 broadcast_identifier

@property
def broadcast_identifier(self):
    # 使用机器的主机名作为标识符。
    # 这假设在任何给定机器上仅运行一个服务实例
    return socket.gethostname()

如果这两种方法都不合适,可以从配置文件中读取该值

@property
def broadcast_identifier(self):
    return self.config['SERVICE_IDENTIFIER']  # 或类似的

广播队列是独占的,以确保 broadcast_identifier 值是唯一的。

由于此方法是描述符,它将在容器创建期间被调用, 与配置的 handler_type 无关。 有关更多详细信息,请参见 nameko.extensions.Extension

setup()[源代码]

在容器启动之前调用了绑定的扩展。

扩展应在此处进行任何必要的初始化。

nameko.events.event_handler[源代码]