nameko.events 源代码
"""
提供了对核心消息模块的高级接口。
事件是特殊的消息,可以由一个服务发出,并由其他监听服务处理。
事件由一个标识符和一些数据组成,并使用从 :class:`EventDispatcher` 实例获得的注入进行调度。
事件是异步调度的。仅保证事件已被调度,并不保证它被监听器接收或处理。
要监听事件,服务必须使用 :func:`handle_event` 入口点声明一个处理程序,提供目标服务和事件类型过滤器。
示例::
# 服务 A
def edit_foo(self, id):
# ...
self.dispatch('foo_updated', {'id': id})
# 服务 B
@handle_event('service_a', 'foo_updated')
def bar(event_data):
pass
"""
from __future__ import absolute_import
import uuid
from logging import getLogger
from kombu import Queue
from nameko.messaging import Consumer, Publisher
from nameko.standalone.events import get_event_exchange
[文档]
SERVICE_POOL = "service_pool"
[文档]
SINGLETON = "singleton"
[文档]
BROADCAST = "broadcast"
[文档]
_log = getLogger(__name__)
[文档]
class EventHandlerConfigurationError(Exception):
"""当事件处理程序配置错误时引发的异常。"""
[文档]
class EventDispatcher(Publisher):
"""通过依赖注入提供事件调度方法。
发出的事件将通过服务的事件交换进行调度,
该交换会自动声明为主题交换。
交换的名称将是 `{service-name}.events`。
通过调度器发出的事件将被序列化并发布到事件交换。
事件的类型属性用作路由键,可用于在监听器端进行过滤。
调度器将在事件消息发布后立即返回。
不保证任何服务会接收事件,仅保证事件已成功调度。
示例::
class Spammer(object):
dispatch_spam = EventDispatcher()
def emit_spam(self):
evt_data = '火腿和鸡蛋'
self.dispatch_spam('spam.ham', evt_data)
"""
[文档]
def setup(self):
self.exchange = get_event_exchange(
self.container.service_name, self.container.config
)
self.declare.append(self.exchange)
super(EventDispatcher, self).setup()
[文档]
def get_dependency(self, worker_ctx):
"""在服务实例上注入一个调度方法"""
extra_headers = self.get_message_headers(worker_ctx)
def dispatch(event_type, event_data):
self.publisher.publish(
event_data,
exchange=self.exchange,
routing_key=event_type,
extra_headers=extra_headers,
)
return dispatch
[文档]
class EventHandler(Consumer):
def __init__(
self,
source_service,
event_type,
handler_type=SERVICE_POOL,
reliable_delivery=True,
requeue_on_error=False,
**kwargs,
):
"""
将方法装饰为处理来自名为 ``source_service`` 的服务的 ``event_type`` 事件的处理程序。
:Parameters:
source_service : str
发出事件的服务名称
event_type : str
要处理的事件类型
handler_type : str
决定处理程序在集群中的行为:
- ``events.SERVICE_POOL``:
事件处理程序按服务类型和方法进行池化,
每个池中的一个服务实例接收事件。 ::
.-[队列]- (服务 X 处理方法-1)
/
交换 o --[队列]- (服务 X 处理方法-2)
\\
\\ (服务 Y(实例 1)处理方法)
\\ /
[队列]
\\
(服务 Y(实例 2)处理方法)
- ``events.SINGLETON``:
事件仅由一个注册的处理程序接收,
不管服务类型如何。如果在错误时重新排队,它们可能
会被不同的服务实例处理。 ::
(服务 X 处理方法)
/
交换 o -- [队列]
\\
(服务 Y 处理方法)
- ``events.BROADCAST``:
事件将被每个处理程序接收。事件广播到每个服务实例,而不仅仅是每个服务
类型。实例通过 :attr:`EventHandler.broadcast_identifier` 进行区分。 ::
[队列]- (服务 X(实例 1)处理方法)
/
交换 o - [队列]- (服务 X(实例 2)处理方法)
\\
[队列]- (服务 Y 处理方法)
requeue_on_error : bool # TODO: 由 Consumer 定义..
如果为真,则如果处理事件时发生错误,处理程序将事件返回到队列。
默认值为 False。
reliable_delivery : bool
如果为真,事件将在队列中保持,直到有处理程序来消费它们。默认值为 True。
"""
[文档]
self.source_service = source_service
[文档]
self.event_type = event_type
[文档]
self.handler_type = handler_type
[文档]
self.reliable_delivery = reliable_delivery
super(EventHandler, self).__init__(
queue=None, requeue_on_error=requeue_on_error, **kwargs
)
@property
[文档]
def broadcast_identifier(self):
"""
用于 `BROADCAST` 类型处理程序的唯一字符串,以识别服务实例。
当使用 `BROADCAST` 处理程序类型时,`broadcast_identifier` 将附加到队列名称。
它必须唯一地识别接收广播的服务实例。
默认的 `broadcast_identifier` 是在服务启动时设置的 uuid。
当服务重新启动时,它会改变,这意味着任何未消费的消息
将不会被发送到 '旧' 服务实例的 '新' 实例接收。 ::
@property
def broadcast_identifier(self):
# 使用 uuid 作为标识符。
# 当服务重新启动时,标识符将会改变,任何未消费的消息将会丢失
return uuid.uuid4().hex
因此,默认行为与可靠交付不兼容。
一个能够在服务重启时存活的替代 `broadcast_identifier` 是 ::
@property
def broadcast_identifier(self):
# 使用机器的主机名作为标识符。
# 这假设在任何给定机器上仅运行一个服务实例
return socket.gethostname()
如果这两种方法都不合适,可以从配置文件中读取该值 ::
@property
def broadcast_identifier(self):
return self.config['SERVICE_IDENTIFIER'] # 或类似的
广播队列是独占的,以确保 `broadcast_identifier` 值是唯一的。
由于此方法是描述符,它将在容器创建期间被调用,
与配置的 `handler_type` 无关。
有关更多详细信息,请参见 :class:`nameko.extensions.Extension`。
"""
if self.handler_type is not BROADCAST:
return None
if self.reliable_delivery:
raise EventHandlerConfigurationError(
"您正在使用默认的广播标识符,"
"这与可靠交付不兼容。请参阅 "
":meth:`nameko.events.EventHandler.broadcast_identifier` "
"以获取详细信息。"
)
return uuid.uuid4().hex
[文档]
def setup(self):
_log.debug("启动 %s", self)
# handler_type 决定队列名称和独占标志
exclusive = False
service_name = self.container.service_name
if self.handler_type is SERVICE_POOL:
queue_name = "evt-{}-{}--{}.{}".format(
self.source_service, self.event_type, service_name, self.method_name
)
elif self.handler_type is SINGLETON:
queue_name = "evt-{}-{}".format(self.source_service, self.event_type)
elif self.handler_type is BROADCAST:
broadcast_identifier = self.broadcast_identifier
queue_name = "evt-{}-{}--{}.{}-{}".format(
self.source_service,
self.event_type,
service_name,
self.method_name,
broadcast_identifier,
)
exchange = get_event_exchange(self.source_service, self.container.config)
# 对于没有可靠交付的处理程序,队列应标记为自动删除,以便在消费者断开连接时被移除
auto_delete = self.reliable_delivery is False
# 对于广播处理程序,队列是独占的(这意味着只有一个消费者可以连接)
# 除非启用了可靠交付,因为独占队列在消费者断开连接时
# 始终被移除,而不管 auto_delete 的值如何
exclusive = self.handler_type is BROADCAST
if self.reliable_delivery:
exclusive = False
self.queue = Queue(
queue_name,
exchange=exchange,
routing_key=self.event_type,
durable=True,
auto_delete=auto_delete,
exclusive=exclusive,
)
super(EventHandler, self).setup()
[文档]
event_handler = EventHandler.decorator