欢迎来到 aio-pika 的文档¶
aio-pika 是一个基于 aiormq 和 asyncio 的更人性化的包.
功能¶
完全异步的 API。
面向对象的 API。
使用 connect_robust 实现透明的自动重连,完全状态恢复(例如,声明的队列或交换机、消费状态和绑定)。
兼容 Python 3.6 及以上版本。
对于 Python 3.5 用户,可以使用 aio-pika<7。
透明的 publisher confirms (发布确认)支持。
支持 Transactions (事物)。
完全的类型提示覆盖。
AMQP URL 参数¶
URL 是配置连接的支持方式。为了自定义连接行为,您可以像传递查询字符串那样传递参数。
本文描述了这些参数的说明。
aiormq
特定参数¶
name
(str
URL 编码) - 一个字符串,在 RabbitMQ 管理控制台和服务器日志中可见,便于诊断。cafile
(str
) - 证书授权文件的路径。capath
(str
) - 证书授权目录的路径。cadata
(str
URL 编码) - URL 编码的 CA 证书内容。keyfile
(str
) - 客户端 SSL 私钥文件的路径。certfile
(str
) - 客户端 SSL 证书文件的路径。no_verify_ssl
- 不验证服务器 SSL 证书。默认值为0
,表示False
,其他值表示True
。heartbeat
(int
类似) - AMQP 心跳包之间的间隔(以秒为单位)。0
表示禁用此功能。
aio_pika.connect
函数和 aio_pika.Connection
类特定参数¶
interleave
(int
类似) - 控制当主机名解析为多个 IP 地址时的地址重排序。如果为 0 或未指定,则不进行重排序,地址按getaddrinfo()
返回的顺序尝试。如果指定了正整数,则按地址族交错这些地址,该整数被解释为 RFC 8305 中定义的“首地址族计数”。如果未指定happy_eyeballs_delay
,默认值为0
;如果指定,则为1
。备注
对于使用一个 DNS 名称的 RabbitMQ 集群,具有多个
A
/AAAA
记录,这个选项非常有用。警告
此选项由
asyncio.DefaultEventLoopPolicy
支持,并在 Python 3.8 及以后版本可用。happy_eyeballs_delay
(float
类似) - 如果给定,则为此连接启用 Happy Eyeballs。它应为一个浮点数,表示在开始下一个并行连接尝试之前,等待当前连接尝试完成的时间(以秒为单位)。这被称为 RFC 8305 中定义的“连接尝试延迟”。RFC 推荐的合理默认值是0.25
(250 毫秒)。备注
对于使用一个 DNS 名称的 RabbitMQ 集群,具有多个
A
/AAAA
记录,这个选项非常有用。警告
此选项由
asyncio.DefaultEventLoopPolicy
支持,并在 Python 3.8 及以后版本可用。
aio_pika.connect_robust
函数和 aio_pika.RobustConnection
类特定参数¶
对于 aio_pika.RobustConnection
类,适用所有与 aio_pika.Connection
相关的参数,如 name
/interleave
/happy_eyeballs_delay
,以及一些特定参数:
reconnect_interval
(float
类似) - 重新建立连接的尝试间隔(以秒为单位),表示不超过此时间间隔进行重连尝试。fail_fast
(true
/yes
/y
/enable
/on
/enabled
/1
表示True
,否则为False
) - 在启动连接尝试时的特殊行为,如果尝试失败,则所有其他尝试将停止,并在连接阶段抛出异常。默认启用,如果你确定需要禁用此功能,请确保传递的 URL 实际可用。否则,程序将进入无休止的重连尝试,无法成功。
URL 示例¶
amqp://username:password@hostname/vhost?name=connection%20name&heartbeat=60&happy_eyeballs_delay=0.25
amqps://username:password@hostname/vhost?reconnect_interval=5&fail_fast=1
amqps://username:password@hostname/vhost?cafile=/path/to/ca.pem
amqps://username:password@hostname/vhost?cafile=/path/to/ca.pem&keyfile=/path/to/key.pem&certfile=/path/to/sert.pem
安装¶
使用pip:
pip install aio-pika
使用git:
# via pip
pip install https://github.com/mosquito/aio-pika/archive/master.zip
# manually
git clone https://github.com/mosquito/aio-pika.git
cd aio-pika
python setup.py install
开发¶
克隆项目:
git clone https://github.com/mosquito/aio-pika.git
cd aio-pika
创建一个属于 aio-pika 的虚拟环境:
virtualenv -p python3.5 env
安装 aio-pika 的所有依赖:
env/bin/pip install -e '.[develop]'
目录¶
感谢以下人员的贡献¶
@mosquito (author)
@decaz (steel persuasiveness while code review)
@heckad (bug fixes)
@smagafurov (bug fixes)
@hellysmile (bug fixes and ideas)
@altvod (bug fixes)
@alternativehood (bugfixes)
@cprieto (bug fixes)
@akhoronko (bug fixes)
@iselind (bug fixes)
@DXist (bug fixes)
@blazewicz (bug fixes)
@chibby0ne (bug fixes)
@jmccarrell (bug fixes)
@taybin (bug fixes)
@ollamh (bug fixes)
@DriverX (bug fixes)
@brianmedigate (bug fixes)
@dan-stone (bug fixes)
@Kludex (bug fixes)
@bmario (bug fixes)
@tzoiker (bug fixes)
@Pehat (bug fixes)
@WindowGenerator (bug fixes)
@dhontecillas (bug fixes)
@tilsche (bug fixes)
@leenr (bug fixes)
@la0rg (bug fixes)
@SolovyovAlexander (bug fixes)
@kremius (bug fixes)
@zyp (bug fixes)
@kajetanj (bug fixes)
@Alviner (moral support, debug sessions and good mood)
@Pavkazzz (composure, and patience while debug sessions)
@bbrodriges (supplying grammar while writing documentation)
@dizballanze (review, grammar)
同样参考¶
aiormq¶
aiormq 是一个纯 Python 的 AMQP 客户端库。它在 aio-pika 的底层实现中,可以在你需要与协议进行底层交互时使用。以下示例演示了用户 API 的用法。
Simple 消费者:
import asyncio
import aiormq
async def on_message(message):
"""
on_message 并不一定要定义为异步函数。这里展示的是它可以是异步的可能性。
"""
print(f" [x] Received message {message!r}")
print(f"Message body is: {message.body!r}")
print("Before sleep!")
await asyncio.sleep(5) # Represents async I/O operations
print("After sleep!")
async def main():
# 创建连接
connection = await aiormq.connect("amqp://guest:guest@localhost/")
# 创建1个通道
channel = await connection.channel()
# 声明队列
declare_ok = await channel.queue_declare('helo')
consume_ok = await channel.basic_consume(
declare_ok.queue, on_message, no_ack=True
)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.run_forever()
Simple 发布者:
import asyncio
from typing import Optional
import aiormq
from aiormq.abc import DeliveredMessage
MESSAGE: Optional[DeliveredMessage] = None
async def main():
global MESSAGE
body = b'Hello World!'
# 创建连接
connection = await aiormq.connect("amqp://guest:guest@localhost//")
# 创建1个通道
channel = await connection.channel()
declare_ok = await channel.queue_declare("hello", auto_delete=True)
# 发送消息
await channel.basic_publish(body, routing_key='hello')
print(f" [x] Sent {body}")
MESSAGE = await channel.basic_get(declare_ok.queue)
print(f" [x] Received message from {declare_ok.queue!r}")
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
assert MESSAGE is not None
assert MESSAGE.routing_key == "hello"
assert MESSAGE.body == b'Hello World!'
patio 和 patio-rabbitmq¶
PATIO 是 “Python Asynchronous Tasks for AsyncIO(基于异步IO的python异步任务)” 的缩写——一个易于扩展的库,用于分布式任务执行,类似于 Celery,但主要设计为支持 asyncio。
patio-rabbitmq 让你能够使用 基于 RabbitMQ 的 RPC 服务,且实现非常简单:
from patio import Registry, ThreadPoolExecutor
from patio_rabbitmq import RabbitMQBroker
rpc = Registry(project="patio-rabbitmq", auto_naming=False)
@rpc("sum")
def sum(*args):
return sum(args)
async def main():
async with ThreadPoolExecutor(rpc, max_workers=16) as executor:
async with RabbitMQBroker(
executor, amqp_url="amqp://guest:guest@localhost/",
) as broker:
await broker.join()
调用方可以像这样编写:
import asyncio
from patio import NullExecutor, Registry
from patio_rabbitmq import RabbitMQBroker
async def main():
async with NullExecutor(Registry(project="patio-rabbitmq")) as executor:
async with RabbitMQBroker(
executor, amqp_url="amqp://guest:guest@localhost/",
) as broker:
print(await asyncio.gather(
*[
broker.call("mul", i, i, timeout=1) for i in range(10)
]
))
FastStream¶
FastStream 是一个强大且易于使用的 Python 库,用于构建与事件流交互的异步服务。
如果你不需要深入了解 RabbitMQ 的细节,可以使用更高层次的 FastStream 接口:
from faststream import FastStream
from faststream.rabbit import RabbitBroker
broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastStream(broker)
@broker.subscriber("user")
async def user_created(user_id: int):
assert isinstance(user_id, int)
return f"user-{user_id}: created"
@app.after_startup
async def pub_smth():
assert (
await broker.publish(1, "user", rpc=True)
) == "user-1: created"
此外,FastStream 通过 pydantic 验证消息,生成你的项目 AsyncAPI 规范,支持内存测试、RPC 调用等功能。
实际上,它是 aio-pika 之上的高层包装器,因此你可以同时利用这两个库的优势。
python-socketio¶
Socket.IO 是一种传输协议,能够实现客户端(通常是网页浏览器,但不局限于此)与服务器之间的实时双向事件驱动通信。此包提供了两种 Python 实现,分别为标准和 asyncio 版本。
此外,此包还适合通过 aio-pika 适配器构建基于 RabbitMQ 的消息服务:
import socketio
from aiohttp import web
sio = socketio.AsyncServer(client_manager=socketio.AsyncAioPikaManager())
app = web.Application()
sio.attach(app)
@sio.event
async def chat_message(sid, data):
print("message ", data)
if __name__ == '__main__':
web.run_app(app)
客户端可以通过以下方式调用 chat_message:
import asyncio
import socketio
sio = socketio.AsyncClient()
async def main():
await sio.connect('http://localhost:8080')
await sio.emit('chat_message', {'response': 'my response'})
if __name__ == '__main__':
asyncio.run(main())
taskiq 和 taskiq-aio-pika¶
Taskiq 是一个用于 Python 的异步分布式任务队列。该项目受到大型项目如 Celery 和 Dramatiq 的启发,但 Taskiq 可以发送和运行同步与异步函数。
该库还为运行任务提供了 aio-pika 代理。
from taskiq_aio_pika import AioPikaBroker
broker = AioPikaBroker()
@broker.task
async def test() -> None:
print("nothing")
async def main():
await broker.startup()
await test.kiq()
Rasa¶
拥有超过 2500 万次下载,Rasa Open Source 是构建聊天和语音 AI 助手的最流行的开源框架。
使用 Rasa,你可以在以下平台上构建上下文助手:
Facebook Messenger
Slack
Google Hangouts
Webex Teams
Microsoft Bot Framework
Rocket.Chat
Mattermost
Telegram
Twilio
你还可以创建自定义的对话渠道或语音助手,如:
Alexa Skills
Google Home Actions
Rasa 帮助你构建能够进行多层次对话的上下文助手,实现丰富的互动。为了让人类与上下文助手进行有意义的交流,助手需要能够利用上下文,基于之前讨论的内容进行扩展——Rasa 使你能够以可扩展的方式构建能够实现这一目标的助手。
它还使用 aio-pika 来与 RabbitMQ 深度交互!
版本控制¶
本软件遵循 语义化版本控制