欢迎来到 aio-pika 的文档

Coveralls Github Actions Latest Version https://img.shields.io/pypi/wheel/aio-pika.svg https://img.shields.io/pypi/pyversions/aio-pika.svg https://img.shields.io/pypi/l/aio-pika.svg

aio-pika 是一个基于 aiormqasyncio 的更人性化的包.

功能

  • 完全异步的 API。

  • 面向对象的 API。

  • 使用 connect_robust 实现透明的自动重连,完全状态恢复(例如,声明的队列或交换机、消费状态和绑定)。

  • 兼容 Python 3.6 及以上版本。

  • 对于 Python 3.5 用户,可以使用 aio-pika<7

  • 透明的 publisher confirms (发布确认)支持。

  • 支持 Transactions (事物)。

  • 完全的类型提示覆盖。

AMQP URL 参数

URL 是配置连接的支持方式。为了自定义连接行为,您可以像传递查询字符串那样传递参数。

本文描述了这些参数的说明。

aiormq 特定参数

  • name (str URL 编码) - 一个字符串,在 RabbitMQ 管理控制台和服务器日志中可见,便于诊断。

  • cafile (str) - 证书授权文件的路径。

  • capath (str) - 证书授权目录的路径。

  • cadata (str URL 编码) - URL 编码的 CA 证书内容。

  • keyfile (str) - 客户端 SSL 私钥文件的路径。

  • certfile (str) - 客户端 SSL 证书文件的路径。

  • no_verify_ssl - 不验证服务器 SSL 证书。默认值为 0,表示 False,其他值表示 True

  • heartbeat (int 类似) - AMQP 心跳包之间的间隔(以秒为单位)。0 表示禁用此功能。

aio_pika.connect 函数和 aio_pika.Connection 类特定参数

  • interleave (int 类似) - 控制当主机名解析为多个 IP 地址时的地址重排序。如果为 0 或未指定,则不进行重排序,地址按 getaddrinfo() 返回的顺序尝试。如果指定了正整数,则按地址族交错这些地址,该整数被解释为 RFC 8305 中定义的“首地址族计数”。如果未指定 happy_eyeballs_delay,默认值为 0;如果指定,则为 1

    备注

    对于使用一个 DNS 名称的 RabbitMQ 集群,具有多个 A/AAAA 记录,这个选项非常有用。

    警告

    此选项由 asyncio.DefaultEventLoopPolicy 支持,并在 Python 3.8 及以后版本可用。

  • happy_eyeballs_delay (float 类似) - 如果给定,则为此连接启用 Happy Eyeballs。它应为一个浮点数,表示在开始下一个并行连接尝试之前,等待当前连接尝试完成的时间(以秒为单位)。这被称为 RFC 8305 中定义的“连接尝试延迟”。RFC 推荐的合理默认值是 0.25 (250 毫秒)。

    备注

    对于使用一个 DNS 名称的 RabbitMQ 集群,具有多个 A/AAAA 记录,这个选项非常有用。

    警告

    此选项由 asyncio.DefaultEventLoopPolicy 支持,并在 Python 3.8 及以后版本可用。

aio_pika.connect_robust 函数和 aio_pika.RobustConnection 类特定参数

对于 aio_pika.RobustConnection 类,适用所有与 aio_pika.Connection 相关的参数,如 name/interleave/happy_eyeballs_delay,以及一些特定参数:

  • reconnect_interval (float 类似) - 重新建立连接的尝试间隔(以秒为单位),表示不超过此时间间隔进行重连尝试。

  • fail_fast (true/yes/y/enable/on/enabled/1 表示 True,否则为 False) - 在启动连接尝试时的特殊行为,如果尝试失败,则所有其他尝试将停止,并在连接阶段抛出异常。默认启用,如果你确定需要禁用此功能,请确保传递的 URL 实际可用。否则,程序将进入无休止的重连尝试,无法成功。

URL 示例

  • amqp://username:password@hostname/vhost?name=connection%20name&heartbeat=60&happy_eyeballs_delay=0.25

  • amqps://username:password@hostname/vhost?reconnect_interval=5&fail_fast=1

  • amqps://username:password@hostname/vhost?cafile=/path/to/ca.pem

  • amqps://username:password@hostname/vhost?cafile=/path/to/ca.pem&keyfile=/path/to/key.pem&certfile=/path/to/sert.pem

安装

使用pip:

pip install aio-pika

使用git:

# via pip
pip install https://github.com/mosquito/aio-pika/archive/master.zip

# manually
git clone https://github.com/mosquito/aio-pika.git
cd aio-pika
python setup.py install

开发

克隆项目:

git clone https://github.com/mosquito/aio-pika.git
cd aio-pika

创建一个属于 aio-pika 的虚拟环境:

virtualenv -p python3.5 env

安装 aio-pika 的所有依赖:

env/bin/pip install -e '.[develop]'

目录

感谢以下人员的贡献

同样参考

aiormq

aiormq 是一个纯 Python 的 AMQP 客户端库。它在 aio-pika 的底层实现中,可以在你需要与协议进行底层交互时使用。以下示例演示了用户 API 的用法。

Simple 消费者:

import asyncio
import aiormq

async def on_message(message):
    """
    on_message 并不一定要定义为异步函数。这里展示的是它可以是异步的可能性。
    """
    print(f" [x] Received message {message!r}")
    print(f"Message body is: {message.body!r}")
    print("Before sleep!")
    await asyncio.sleep(5)   # Represents async I/O operations
    print("After sleep!")

async def main():
    # 创建连接
    connection = await aiormq.connect("amqp://guest:guest@localhost/")

    # 创建1个通道
    channel = await connection.channel()

    # 声明队列
    declare_ok = await channel.queue_declare('helo')
    consume_ok = await channel.basic_consume(
        declare_ok.queue, on_message, no_ack=True
    )

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.run_forever()

Simple 发布者:

import asyncio
from typing import Optional

import aiormq
from aiormq.abc import DeliveredMessage

MESSAGE: Optional[DeliveredMessage] = None

async def main():
    global MESSAGE
    body = b'Hello World!'

    # 创建连接
    connection = await aiormq.connect("amqp://guest:guest@localhost//")

    # 创建1个通道
    channel = await connection.channel()
    declare_ok = await channel.queue_declare("hello", auto_delete=True)

    # 发送消息
    await channel.basic_publish(body, routing_key='hello')
    print(f" [x] Sent {body}")

    MESSAGE = await channel.basic_get(declare_ok.queue)
    print(f" [x] Received message from {declare_ok.queue!r}")

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

assert MESSAGE is not None
assert MESSAGE.routing_key == "hello"
assert MESSAGE.body == b'Hello World!'

patiopatio-rabbitmq

PATIO 是 “Python Asynchronous Tasks for AsyncIO(基于异步IO的python异步任务)” 的缩写——一个易于扩展的库,用于分布式任务执行,类似于 Celery,但主要设计为支持 asyncio。

patio-rabbitmq 让你能够使用 基于 RabbitMQ 的 RPC 服务,且实现非常简单:

from patio import Registry, ThreadPoolExecutor
from patio_rabbitmq import RabbitMQBroker

rpc = Registry(project="patio-rabbitmq", auto_naming=False)

@rpc("sum")
def sum(*args):
    return sum(args)

async def main():
    async with ThreadPoolExecutor(rpc, max_workers=16) as executor:
        async with RabbitMQBroker(
            executor, amqp_url="amqp://guest:guest@localhost/",
        ) as broker:
            await broker.join()

调用方可以像这样编写:

import asyncio
from patio import NullExecutor, Registry
from patio_rabbitmq import RabbitMQBroker

async def main():
    async with NullExecutor(Registry(project="patio-rabbitmq")) as executor:
        async with RabbitMQBroker(
            executor, amqp_url="amqp://guest:guest@localhost/",
        ) as broker:
            print(await asyncio.gather(
                *[
                    broker.call("mul", i, i, timeout=1) for i in range(10)
                 ]
            ))

FastStream

FastStream 是一个强大且易于使用的 Python 库,用于构建与事件流交互的异步服务。

如果你不需要深入了解 RabbitMQ 的细节,可以使用更高层次的 FastStream 接口:

from faststream import FastStream
from faststream.rabbit import RabbitBroker

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastStream(broker)

@broker.subscriber("user")
async def user_created(user_id: int):
    assert isinstance(user_id, int)
    return f"user-{user_id}: created"

@app.after_startup
async def pub_smth():
    assert (
        await broker.publish(1, "user", rpc=True)
    ) ==  "user-1: created"

此外,FastStream 通过 pydantic 验证消息,生成你的项目 AsyncAPI 规范,支持内存测试、RPC 调用等功能。

实际上,它是 aio-pika 之上的高层包装器,因此你可以同时利用这两个库的优势。

python-socketio

Socket.IO 是一种传输协议,能够实现客户端(通常是网页浏览器,但不局限于此)与服务器之间的实时双向事件驱动通信。此包提供了两种 Python 实现,分别为标准和 asyncio 版本。

此外,此包还适合通过 aio-pika 适配器构建基于 RabbitMQ 的消息服务:

import socketio
from aiohttp import web

sio = socketio.AsyncServer(client_manager=socketio.AsyncAioPikaManager())
app = web.Application()
sio.attach(app)

@sio.event
async def chat_message(sid, data):
    print("message ", data)

if __name__ == '__main__':
    web.run_app(app)

客户端可以通过以下方式调用 chat_message:

import asyncio
import socketio

sio = socketio.AsyncClient()

async def main():
    await sio.connect('http://localhost:8080')
    await sio.emit('chat_message', {'response': 'my response'})

if __name__ == '__main__':
    asyncio.run(main())

taskiqtaskiq-aio-pika

Taskiq 是一个用于 Python 的异步分布式任务队列。该项目受到大型项目如 Celery 和 Dramatiq 的启发,但 Taskiq 可以发送和运行同步与异步函数。

该库还为运行任务提供了 aio-pika 代理。

from taskiq_aio_pika import AioPikaBroker

broker = AioPikaBroker()

@broker.task
async def test() -> None:
    print("nothing")

async def main():
    await broker.startup()
    await test.kiq()

Rasa

拥有超过 2500 万次下载,Rasa Open Source 是构建聊天和语音 AI 助手的最流行的开源框架。

使用 Rasa,你可以在以下平台上构建上下文助手:

  • Facebook Messenger

  • Slack

  • Google Hangouts

  • Webex Teams

  • Microsoft Bot Framework

  • Rocket.Chat

  • Mattermost

  • Telegram

  • Twilio

你还可以创建自定义的对话渠道或语音助手,如:

  • Alexa Skills

  • Google Home Actions

Rasa 帮助你构建能够进行多层次对话的上下文助手,实现丰富的互动。为了让人类与上下文助手进行有意义的交流,助手需要能够利用上下文,基于之前讨论的内容进行扩展——Rasa 使你能够以可扩展的方式构建能够实现这一目标的助手。

它还使用 aio-pika 来与 RabbitMQ 深度交互!

版本控制

本软件遵循 语义化版本控制