路由

警告

这是从 官方教程 移植的测试版本。如果你发现了错误,请为我创建 issuepull request

备注

使用 aio-pika 异步 Python 客户端。

备注

前提条件

本教程假设 RabbitMQ 已安装 并在本地以标准端口(5672)运行。 如果你使用的是不同的主机、端口或凭据,则需要调整连接设置。

寻求帮助的途径

如果在完成本教程时遇到困难,可以通过邮件列表 联系我们

上一教程 中,我们构建了一个简单的日志系统,能够将日志消息广播给多个接收者。

在本教程中,我们将为其添加一个功能——我们将使订阅只接收部分消息成为可能。例如,我们可以将只有关键错误消息定向到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。

绑定

在之前的示例中,我们已经创建了绑定。你可能还记得类似的代码:

async def main():
    ...

    # 将队列绑定到交换机
    await queue.bind(logs_exchange)

...

绑定是交换机和队列之间的关系。可以简单理解为:队列对来自该交换机的消息感兴趣。

绑定可以接受一个额外的 routing_key 参数。为了避免与 basic_publish 参数混淆,我们将其称为 绑定键。以下是如何使用键创建绑定的示例:

async def main():
    ...

    # 将队列绑定到交换机
    await queue.bind(logs_exchange,
                     routing_key="black")

...

绑定键 的含义取决于交换机的类型。我们之前使用的 扇出型 交换机会简单地忽略这个值。

直连交换机

我们在前一教程中的日志系统将所有消息广播给所有消费者。现在我们想扩展功能,允许根据消息的严重性进行过滤。例如,我们可能希望将日志消息写入磁盘的脚本只接收关键错误,而不浪费磁盘空间存储警告或信息日志消息。

我们之前使用的是扇出型交换机,它的灵活性有限——只能进行无脑的广播。

我们将改用直连交换机。直连交换机背后的路由算法很简单——消息会被路由到绑定键与消息的路由键完全匹配的队列。

举个例子,看看以下设置:

../_images/direct-exchange.svg

在这个设置中,我们看到直连交换机 X 与两个队列绑定。第一个队列的绑定键是 orange,第二个队列有两个绑定,一个绑定键为 black,另一个为 green

在这样的设置中,带有路由键 orange 的消息将被路由到队列 Q1。带有路由键 blackgreen 的消息将被路由到 Q2。所有其他消息将被丢弃。

多重绑定

../_images/direct-exchange-multiple.svg

将多个队列与相同的绑定键绑定是完全合法的。在我们的示例中,我们可以添加一个绑定,将交换机 X 与队列 Q1 通过绑定键 black 绑定。在这种情况下,直连交换机将像扇出交换机一样工作,并将消息广播到所有匹配的队列。带有路由键 black 的消息将会同时被发送到 Q1Q2

发送日志

我们将使用这种模型来构建日志系统。与使用 fanout 不同,我们会将消息发送到 direct 交换机。我们将日志的严重性作为 routing key 提供,这样接收脚本就可以选择它想要接收的严重性。首先让我们专注于发送日志。

像往常一样,我们首先需要创建一个交换机:

from aio_pika import ExchangeType

async def main():
    ...

    direct_logs_exchange = await channel.declare_exchange(
        'logs', ExchangeType.DIRECT
    )

现在我们可以发送消息了:

async def main():
    ...

    await direct_logs_exchange.publish(
        Message(message_body),
        routing_key=severity,
    )

为简化起见,我们假设 'severity' 可以是 'info''warning''error' 之一。

订阅

接收消息的方式与之前的教程相同,唯一的例外是——我们将为每个感兴趣的严重性创建一个新的绑定。

async def main():
    ...

    # 声明队列
    queue = await channel.declare_queue(exclusive=True)

    # 将队列绑定到交换机
    await queue.bind(direct_logs_exchange,
                     routing_key=severity)

...

综合起来

../_images/python-four.svg

简化后的代码 receive_logs_direct_simple.py:

import asyncio
import sys

from aio_pika import ExchangeType, connect
from aio_pika.abc import AbstractIncomingMessage


async def main() -> None:
    # Perform connection
    connection = await connect("amqp://guest:guest@localhost/")

    async with connection:
        # Creating a channel
        channel = await connection.channel()
        await channel.set_qos(prefetch_count=1)

        severities = sys.argv[1:]

        if not severities:
            sys.stderr.write(
                f"Usage: {sys.argv[0]} [info] [warning] [error]\n",
            )
            sys.exit(1)

        # Declare an exchange
        direct_logs_exchange = await channel.declare_exchange(
            "logs", ExchangeType.DIRECT,
        )

        # Declaring random queue
        queue = await channel.declare_queue(durable=True)

        for severity in severities:
            await queue.bind(direct_logs_exchange, routing_key=severity)

        async with queue.iterator() as iterator:
            message: AbstractIncomingMessage
            async for message in iterator:
                async with message.process():
                    print(f" [x] {message.routing_key!r}:{message.body!r}")

        print(" [*] Waiting for messages. To exit press CTRL+C")
        await asyncio.Future()


if __name__ == "__main__":
    asyncio.run(main())

代码 emit_log_direct.py:

import asyncio
import sys

from aio_pika import DeliveryMode, ExchangeType, Message, connect


async def main() -> None:
    # Perform connection
    connection = await connect("amqp://guest:guest@localhost/")

    async with connection:
        # Creating a channel
        channel = await connection.channel()

        logs_exchange = await channel.declare_exchange(
            "logs", ExchangeType.DIRECT,
        )

        message_body = b" ".join(
            arg.encode() for arg in sys.argv[2:]
        ) or b"Hello World!"

        message = Message(
            message_body,
            delivery_mode=DeliveryMode.PERSISTENT,
        )

        # Sending the message
        routing_key = sys.argv[1] if len(sys.argv) > 2 else "info"
        await logs_exchange.publish(message, routing_key=routing_key)

        print(f" [x] Sent {message.body!r}")


if __name__ == "__main__":
    asyncio.run(main())

备注

基于回调的代码 receive_logs_direct.py:

import asyncio
import sys

from aio_pika import ExchangeType, connect
from aio_pika.abc import AbstractIncomingMessage


async def on_message(message: AbstractIncomingMessage) -> None:
    async with message.process():
        print(" [x] %r:%r" % (message.routing_key, message.body))


async def main() -> None:
    # Perform connection
    connection = await connect("amqp://guest:guest@localhost/")

    async with connection:
        # Creating a channel
        channel = await connection.channel()
        await channel.set_qos(prefetch_count=1)

        severities = sys.argv[1:]

        if not severities:
            sys.stderr.write(
                "Usage: %s [info] [warning] [error]\n" % sys.argv[0],
            )
            sys.exit(1)

        # Declare an exchange
        direct_logs_exchange = await channel.declare_exchange(
            "logs", ExchangeType.DIRECT,
        )

        # Declaring random queue
        queue = await channel.declare_queue(durable=True)

        for severity in severities:
            await queue.bind(direct_logs_exchange, routing_key=severity)

        # Start listening the random queue
        await queue.consume(on_message)

        print(" [*] Waiting for messages. To exit press CTRL+C")
        await asyncio.Future()


if __name__ == "__main__":
    asyncio.run(main())

如果您只想将 'warning''error'*(而不是 *'info')日志消息保存到文件中,只需打开控制台并输入:

$ python receive_logs_direct_simple.py warning error > logs_from_rabbit.log

如果您希望在屏幕上查看所有日志消息,请打开一个新终端并执行:

$ python receive_logs_direct.py info warning error
 [*] Waiting for logs. To exit press CTRL+C

例如,要发送错误日志消息,只需输入:

$ python emit_log_direct.py error "Run. Run. Or it will explode."
[x] Sent 'error':'Run. Run. Or it will explode.'

继续阅读 教程 5,了解如何根据模式监听消息。

备注

该材料改编自 rabbitmq.org 上的 官方教程