nameko.events¶
提供了对核心消息模块的高级接口。
事件是特殊的消息,可以由一个服务发出,并由其他监听服务处理。
事件由一个标识符和一些数据组成,并使用从 EventDispatcher
实例获得的注入进行调度。
事件是异步调度的。仅保证事件已被调度,并不保证它被监听器接收或处理。
要监听事件,服务必须使用 handle_event()
入口点声明一个处理程序,提供目标服务和事件类型过滤器。
示例:
# 服务 A
def edit_foo(self, id):
# ...
self.dispatch('foo_updated', {'id': id})
# 服务 B
@handle_event('service_a', 'foo_updated')
def bar(event_data):
pass
Attributes¶
Exceptions¶
当事件处理程序配置错误时引发的异常。 |
Classes¶
通过依赖注入提供事件调度方法。 |
|
入口点封装类 |
Module Contents¶
- exception nameko.events.EventHandlerConfigurationError[源代码]¶
Bases:
Exception
当事件处理程序配置错误时引发的异常。
Initialize self. See help(type(self)) for accurate signature.
- class nameko.events.EventDispatcher(exchange=None, queue=None, declare=None, **options)[源代码]¶
Bases:
nameko.messaging.Publisher
通过依赖注入提供事件调度方法。
发出的事件将通过服务的事件交换进行调度, 该交换会自动声明为主题交换。 交换的名称将是 {service-name}.events。
通过调度器发出的事件将被序列化并发布到事件交换。 事件的类型属性用作路由键,可用于在监听器端进行过滤。
调度器将在事件消息发布后立即返回。 不保证任何服务会接收事件,仅保证事件已成功调度。
示例:
class Spammer(object): dispatch_spam = EventDispatcher() def emit_spam(self): evt_data = '火腿和鸡蛋' self.dispatch_spam('spam.ham', evt_data)
提供通过依赖注入的 AMQP 消息发布方法。
在 AMQP 中,消息被发布到 交换机,并路由到绑定的 队列。该依赖接受要发布的 exchange,并确保在发布之前已声明。
可选地,您可以使用 declare 关键字参数传递其他
kombu.Exchange
或kombu.Queue
对象,以便在发布之前进行声明。- Parameters:
- exchange
kombu.Exchange
目标交换机
- queue
kombu.Queue
已弃用: 绑定队列。事件将发布到该队列的交换机。
- declarelist
要在发布之前声明的
kombu.Exchange
或kombu.Queue
对象的列表。
- exchange
如果未提供 exchange,则消息将发布到默认交换机。
示例:
class Foobar(object): publish = Publisher(exchange=...) def spam(self, data): self.publish('spam:' + data)
- class nameko.events.EventHandler(source_service, event_type, handler_type=SERVICE_POOL, reliable_delivery=True, requeue_on_error=False, **kwargs)[源代码]¶
Bases:
nameko.messaging.Consumer
入口点封装类
将方法装饰为处理来自名为
source_service
的服务的event_type
事件的处理程序。- Parameters:
- source_servicestr
发出事件的服务名称
- event_typestr
要处理的事件类型
- handler_typestr
决定处理程序在集群中的行为:
events.SERVICE_POOL
:事件处理程序按服务类型和方法进行池化, 每个池中的一个服务实例接收事件。
.-[队列]- (服务 X 处理方法-1) / 交换 o --[队列]- (服务 X 处理方法-2) \ \ (服务 Y(实例 1)处理方法) \ / [队列] \ (服务 Y(实例 2)处理方法)
events.SINGLETON
:事件仅由一个注册的处理程序接收, 不管服务类型如何。如果在错误时重新排队,它们可能 会被不同的服务实例处理。
(服务 X 处理方法) / 交换 o -- [队列] \ (服务 Y 处理方法)
events.BROADCAST
:事件将被每个处理程序接收。事件广播到每个服务实例,而不仅仅是每个服务 类型。实例通过
EventHandler.broadcast_identifier
进行区分。[队列]- (服务 X(实例 1)处理方法) / 交换 o - [队列]- (服务 X(实例 2)处理方法) \ [队列]- (服务 Y 处理方法)
- requeue_on_errorbool # TODO: 由 Consumer 定义..
如果为真,则如果处理事件时发生错误,处理程序将事件返回到队列。 默认值为 False。
- reliable_deliverybool
如果为真,事件将在队列中保持,直到有处理程序来消费它们。默认值为 True。
- property broadcast_identifier[源代码]¶
- 用于 `BROADCAST` 类型处理程序的唯一字符串,以识别服务实例。
当使用 BROADCAST 处理程序类型时,broadcast_identifier 将附加到队列名称。 它必须唯一地识别接收广播的服务实例。
默认的 broadcast_identifier 是在服务启动时设置的 uuid。 当服务重新启动时,它会改变,这意味着任何未消费的消息 将不会被发送到 '旧' 服务实例的 '新' 实例接收。
@property def broadcast_identifier(self): # 使用 uuid 作为标识符。 # 当服务重新启动时,标识符将会改变,任何未消费的消息将会丢失 return uuid.uuid4().hex
因此,默认行为与可靠交付不兼容。
一个能够在服务重启时存活的替代 broadcast_identifier 是
@property def broadcast_identifier(self): # 使用机器的主机名作为标识符。 # 这假设在任何给定机器上仅运行一个服务实例 return socket.gethostname()
如果这两种方法都不合适,可以从配置文件中读取该值
@property def broadcast_identifier(self): return self.config['SERVICE_IDENTIFIER'] # 或类似的
广播队列是独占的,以确保 broadcast_identifier 值是唯一的。
由于此方法是描述符,它将在容器创建期间被调用, 与配置的 handler_type 无关。 有关更多详细信息,请参见
nameko.extensions.Extension
。