Topics¶
警告
这是从 官方教程 移植的测试版本。如果你发现了错误,请为我创建 issue 或 pull request。
备注
使用 aio-pika 异步 Python 客户端。
备注
前提条件
本教程假设 RabbitMQ 已安装 并在本地以标准端口(5672)运行。 如果你使用的是不同的主机、端口或凭据,则需要调整连接设置。
寻求帮助的途径
如果在完成本教程时遇到困难,可以通过邮件列表 联系我们 。
在 上一教程 中,我们改进了我们的日志系统。我们不再使用仅能进行简单广播的 fanout 交换,而是使用了 direct 交换,从而获得了选择性接收日志的能力。
尽管使用 direct 交换改善了我们的系统,但它仍然存在一些限制——它无法基于多个标准进行路由。
在我们的日志系统中,我们可能希望不仅根据严重性订阅日志,还根据发出日志的来源进行订阅。您可能从 unix 工具 syslog 中了解到这个概念,它根据严重性(info / warn / crit...)和设施(auth / cron / kern...)路由日志。
这将为我们提供很大的灵活性——我们可能只想监听来自 'cron' 的关键错误,同时也希望接收来自 'kern' 的所有日志。
要在我们的日志系统中实现这一点,我们需要学习更复杂的主题交换(topic exchange)。
主题交换(Topic Exchange)¶
发送到主题交换的消息不能使用任意的 routing_key —— 它必须是由点分隔的单词列表。这些单词可以是任何内容,但通常会指定与消息相关的一些特征。以下是一些有效的 routing key 示例: "stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit" 。routing key 中的单词可以有任意数量,最多限制为 255 字节。
绑定键(binding key)也必须采用相同的格式。主题交换背后的逻辑类似于直接交换——使用特定 routing key 发送的消息将被发送到所有与匹配绑定键相绑定的队列。然而,绑定键有两个重要的特殊情况:
* (星号)可以替代一个单词。
# (井号)可以替代零个或多个单词。
用一个示例来解释这一点是最简单的:
在这个例子中,我们将发送描述动物的消息。这些消息将使用由三个单词(两个点)组成的 routing key 发送。routing key 中的第一个单词将描述速度,第二个单词描述颜色,第三个单词描述物种: "<celerity>.<colour>.<species>" 。
我们创建了三个绑定: Q1 使用绑定键 "*.orange.*" 绑定, Q2 使用绑定键 "*.*.rabbit" 和 "lazy.#" 绑定。
这些绑定可以总结如下:
Q1 对所有橙色动物感兴趣。
Q2 想了解有关兔子的所有信息,以及关于懒惰动物的所有信息。
routing key 设置为 "quick.orange.rabbit" 的消息将被发送到两个队列。 消息 "lazy.orange.elephant" 也会发送到两个队列。另一方面,"quick.orange.fox" 仅会发送到第一个队列,而 "lazy.brown.fox" 仅会发送到第二个队列。"lazy.pink.rabbit" 将仅发送到第二个队列一次,即使它匹配了两个绑定。"quick.brown.fox" 不匹配任何绑定,因此会被丢弃。
如果我们违反合同,发送一个包含一个或四个单词的消息,例如 "orange" 或 "quick.orange.male.rabbit",会发生什么?这些消息将不匹配任何绑定,因此会丢失。
另一方面, "lazy.orange.male.rabbit" 尽管有四个单词,但会匹配最后一个绑定,并将被发送到第二个队列。
备注
主题交换
主题交换非常强大,可以像其他交换一样运行。
当一个队列使用 "#" (井号)绑定键绑定时——它将接收所有消息,而不管 routing key 是什么——就像在 fanout 交换中一样。
当在绑定中不使用特殊字符 "*" (星号)和 "#" (井号)时,主题交换将表现得就像直接交换一样。
综合起来¶
我们将在日志系统中使用主题交换。我们将以日志的 routing keys 有两个单词: "<facility>.<severity>" 为工作假设开始。
代码与 前一个教程 中的几乎相同。
用于 emit_log_topic.py
的代码:
import asyncio
import sys
from aio_pika import DeliveryMode, ExchangeType, Message, connect
async def main() -> None:
# Perform connection
connection = await connect(
"amqp://guest:guest@localhost/",
)
async with connection:
# Creating a channel
channel = await connection.channel()
topic_logs_exchange = await channel.declare_exchange(
"topic_logs", ExchangeType.TOPIC,
)
routing_key = sys.argv[1] if len(sys.argv) > 2 else "anonymous.info"
message_body = b" ".join(
arg.encode() for arg in sys.argv[2:]
) or b"Hello World!"
message = Message(
message_body,
delivery_mode=DeliveryMode.PERSISTENT,
)
# Sending the message
await topic_logs_exchange.publish(message, routing_key=routing_key)
print(f" [x] Sent {message!r}")
if __name__ == "__main__":
asyncio.run(main())
用于 receive_logs_topic.py
的代码:
import asyncio
import sys
from aio_pika import ExchangeType, connect
from aio_pika.abc import AbstractIncomingMessage
async def main() -> None:
# Perform connection
connection = await connect("amqp://guest:guest@localhost/")
# Creating a channel
channel = await connection.channel()
await channel.set_qos(prefetch_count=1)
# Declare an exchange
topic_logs_exchange = await channel.declare_exchange(
"topic_logs", ExchangeType.TOPIC,
)
# Declaring queue
queue = await channel.declare_queue(
"task_queue", durable=True,
)
binding_keys = sys.argv[1:]
if not binding_keys:
sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
sys.exit(1)
for binding_key in binding_keys:
await queue.bind(topic_logs_exchange, routing_key=binding_key)
print(" [*] Waiting for messages. To exit press CTRL+C")
# Start listening the queue with name 'task_queue'
async with queue.iterator() as iterator:
message: AbstractIncomingMessage
async for message in iterator:
async with message.process():
print(f" [x] {message.routing_key!r}:{message.body!r}")
if __name__ == "__main__":
asyncio.run(main())
要接收所有日志,请运行:
python receive_logs_topic.py "#"
要接收来自设施 "kern" 的所有日志,请运行:
python receive_logs_topic.py "kern.*"
或者如果您只想了解 "critical" 日志,请运行:
python receive_logs_topic.py "*.critical"
您可以创建多个绑定:
python receive_logs_topic.py "kern.*" "*.critical"
要发出 routing key 为 "kern.critical" 的日志,请输入:
python emit_log_topic.py "kern.critical" "A critical kernel error"
玩得开心!请注意,代码对 routing 或 binding keys 并没有任何假设,您可能想尝试更多的 routing key 参数。
接下来请参阅 教程 6 来了解 RPC。
备注
本材料摘自 rabbitmq.org 上的 官方教程 。