模式和辅助工具¶
备注
自 aio-pika>=1.7.0 起可用
aio-pika 包含一些用于创建分布式系统的有用模式(patterns)。
Master/Worker¶
实现 Master/Worker 模式的辅助工具。这适用于在多个工作者之间平衡任务。
Master 节点创建任务:
import asyncio
from aio_pika import connect_robust
from aio_pika.patterns import Master
async def main() -> None:
connection = await connect_robust(
"amqp://guest:guest@127.0.0.1/?name=aio-pika%20master",
)
async with connection:
# Creating channel
channel = await connection.channel()
master = Master(channel)
# Creates tasks by proxy object
for task_id in range(1000):
await master.proxy.my_task_name(task_id=task_id)
# Or using create_task method
for task_id in range(1000):
await master.create_task(
"my_task_name", kwargs=dict(task_id=task_id),
)
if __name__ == "__main__":
asyncio.run(main())
Worker 代码:
import asyncio
from aio_pika import connect_robust
from aio_pika.patterns import Master, NackMessage, RejectMessage
async def worker(*, task_id: int) -> None:
# If you want to reject message or send
# nack you might raise special exception
if task_id % 2 == 0:
raise RejectMessage(requeue=False)
if task_id % 2 == 1:
raise NackMessage(requeue=False)
print(task_id)
async def main() -> None:
connection = await connect_robust(
"amqp://guest:guest@127.0.0.1/?name=aio-pika%20worker",
)
# Creating channel
channel = await connection.channel()
# Initializing Master with channel
master = Master(channel)
await master.create_worker("my_task_name", worker, auto_delete=True)
try:
await asyncio.Future()
finally:
await connection.close()
if __name__ == "__main__":
asyncio.run(main())
一个或多个工作者执行任务。
RPC¶
实现远程过程调用(Remote Procedure Call - RPC)模式的辅助工具。这适用于在多个工作者之间平衡任务。
调用者创建任务并等待结果:
import asyncio
from aio_pika import connect_robust
from aio_pika.patterns import RPC
async def main() -> None:
connection = await connect_robust(
"amqp://guest:guest@127.0.0.1/",
client_properties={"connection_name": "caller"},
)
async with connection:
# Creating channel
channel = await connection.channel()
rpc = await RPC.create(channel)
# Creates tasks by proxy object
for i in range(1000):
print(await rpc.proxy.multiply(x=100, y=i))
# Or using create_task method
for i in range(1000):
print(await rpc.call("multiply", kwargs=dict(x=100, y=i)))
if __name__ == "__main__":
asyncio.run(main())
一个或多个被调用者执行任务:
import asyncio
from aio_pika import connect_robust
from aio_pika.patterns import RPC
async def multiply(*, x: int, y: int) -> int:
return x * y
async def main() -> None:
connection = await connect_robust(
"amqp://guest:guest@127.0.0.1/",
client_properties={"connection_name": "callee"},
)
# Creating channel
channel = await connection.channel()
rpc = await RPC.create(channel)
await rpc.register("multiply", multiply, auto_delete=True)
try:
await asyncio.Future()
finally:
await connection.close()
if __name__ == "__main__":
asyncio.run(main())
扩展¶
这两种模式的序列化行为可以通过继承和重新定义方法 aio_pika.patterns.base.serialize()
和 aio_pika.patterns.base.deserialize()
来改变。
下面的示例演示了这一点:
from typing import Any
import msgpack # type: ignore
from aio_pika.patterns import RPC, Master
class MsgpackRPC(RPC):
CONTENT_TYPE = "application/msgpack"
def serialize(self, data: Any) -> bytes:
return msgpack.dumps(data)
def deserialize(self, data: bytes) -> bytes:
return msgpack.loads(data)
class MsgpackMaster(Master):
CONTENT_TYPE = "application/msgpack"
def serialize(self, data: Any) -> bytes:
return msgpack.dumps(data)
def deserialize(self, data: bytes) -> bytes:
return msgpack.loads(data)