nameko.standalone.events 源代码
from kombu import Exchange
from nameko import serialization
from nameko.amqp.publish import Publisher
from nameko.constants import (
AMQP_SSL_CONFIG_KEY, AMQP_URI_CONFIG_KEY, LOGIN_METHOD_CONFIG_KEY,
PERSISTENT
)
[文档]
def get_event_exchange(service_name, config):
""" 获取 ``service_name`` 事件的交换机。
"""
auto_delete = config.get("AUTO_DELETE_EVENT_EXCHANGES")
disable_exchange_declaration = config.get("DECLARE_EVENT_EXCHANGES") is False
exchange_name = "{}.events".format(service_name)
exchange = Exchange(
exchange_name,
type='topic',
durable=True,
delivery_mode=PERSISTENT,
auto_delete=auto_delete,
no_declare=disable_exchange_declaration,
)
return exchange
[文档]
def event_dispatcher(nameko_config, **kwargs):
""" 返回一个用于分发 Nameko 事件的函数。
"""
amqp_uri = nameko_config[AMQP_URI_CONFIG_KEY]
serializer, _ = serialization.setup(nameko_config)
serializer = kwargs.pop('serializer', serializer)
ssl = nameko_config.get(AMQP_SSL_CONFIG_KEY)
login_method = nameko_config.get(LOGIN_METHOD_CONFIG_KEY)
# TODO: standalone event dispatcher should accept context event_data
# and insert a call id
publisher = Publisher(
amqp_uri, serializer=serializer, ssl=ssl, login_method=login_method, **kwargs
)
def dispatch(service_name, event_type, event_data):
""" 分发一个声称来自 `service_name` 的事件,带有给定的 `event_type` 和 `event_data`。
"""
exchange = get_event_exchange(service_name, nameko_config)
publisher.publish(
event_data,
exchange=exchange,
routing_key=event_type
)
return dispatch