异步示例(Asyncio Examples)

所有命令都是协程函数。

连接和断开连接(Connecting and Disconnecting)

使用 asyncio Redis 需要显式地断开连接,因为没有 asyncio 的析构魔法方法。默认情况下,在 redis.Redis() 中创建一个内部连接池并附加到 Redis 实例。当调用 Redis.aclose 时,这个内部连接池会自动关闭,从而断开所有连接。

[1]:
import redis.asyncio as redis

client = redis.Redis()
print(f"Ping successful: {await client.ping()}")
await client.aclose()
Ping successful: True

如果您创建一个自定义的 ConnectionPool 以供单个 Redis 实例使用,请使用 Redis.from_pool 类方法。Redis 客户端将拥有这个连接池。这将导致连接池与 Redis 实例一起断开连接。断开连接池只是简单地断开池中所有托管连接。

[ ]:
import redis.asyncio as redis

pool = redis.ConnectionPool.from_url("redis://localhost")
client = redis.Redis.from_pool(pool)
await client.aclose()

但是,如果 ConnectionPool 要被多个 Redis 实例共享,您应该使用 connection_pool 参数,并且可能需要显式地断开连接池。

[2]:
import redis.asyncio as redis

pool = redis.ConnectionPool.from_url("redis://localhost")
client1 = redis.Redis(connection_pool=pool)
client2 = redis.Redis(connection_pool=pool)
await client1.aclose()
await client2.aclose()
await pool.aclose()

默认情况下,该库使用 RESP 协议的版本 2。要启用 RESP 版本 3,您需要将 protocol 设置为 3。

[ ]:
import redis.asyncio as redis

client = redis.Redis(protocol=3)
await client.aclose()
await client.ping()

事务 (Multi/Exec)

aioredis.Redis.pipeline 将返回一个 aioredis.Pipeline 对象,该对象会在内存中缓冲所有命令,并使用 Redis Bulk String 协议将其编译成批次。此外,每个命令将返回 Pipeline 实例,允许您链式调用命令,例如 p.set('foo', 1).set('bar', 2).mget('foo', 'bar')

在调用并等待 execute() 之前,这些命令不会反映在 Redis 中。

通常,在执行批量操作时,利用“事务”(例如 Multi/Exec)是值得的,因为它还将为您的批量操作增加一层原子性。

[3]:
import redis.asyncio as redis

r = await redis.from_url("redis://localhost")
async with r.pipeline(transaction=True) as pipe:
    ok1, ok2 = await (pipe.set("key1", "value1").set("key2", "value2").execute())
assert ok1
assert ok2

发布/订阅模式(Pub/Sub Mode)

订阅指定频道:

[4]:
import asyncio

import redis.asyncio as redis

STOPWORD = "STOP"


async def reader(channel: redis.client.PubSub):
    while True:
        message = await channel.get_message(ignore_subscribe_messages=True, timeout=None)
        if message is not None:
            print(f"(Reader) Message Received: {message}")
            if message["data"].decode() == STOPWORD:
                print("(Reader) STOP")
                break

r = redis.from_url("redis://localhost")
async with r.pubsub() as pubsub:
    await pubsub.subscribe("channel:1", "channel:2")

    future = asyncio.create_task(reader(pubsub))

    await r.publish("channel:1", "Hello")
    await r.publish("channel:2", "World")
    await r.publish("channel:1", STOPWORD)

    await future
(Reader) Message Received: {'type': 'message', 'pattern': None, 'channel': b'channel:1', 'data': b'Hello'}
(Reader) Message Received: {'type': 'message', 'pattern': None, 'channel': b'channel:2', 'data': b'World'}
(Reader) Message Received: {'type': 'message', 'pattern': None, 'channel': b'channel:1', 'data': b'STOP'}
(Reader) STOP

订阅与通配符样式模式匹配的频道:

[5]:
import asyncio

import redis.asyncio as redis

STOPWORD = "STOP"


async def reader(channel: redis.client.PubSub):
    while True:
        message = await channel.get_message(ignore_subscribe_messages=True, timeout=None)
        if message is not None:
            print(f"(Reader) Message Received: {message}")
            if message["data"].decode() == STOPWORD:
                print("(Reader) STOP")
                break


r = await redis.from_url("redis://localhost")
async with r.pubsub() as pubsub:
    await pubsub.psubscribe("channel:*")

    future = asyncio.create_task(reader(pubsub))

    await r.publish("channel:1", "Hello")
    await r.publish("channel:2", "World")
    await r.publish("channel:1", STOPWORD)

    await future
(Reader) Message Received: {'type': 'pmessage', 'pattern': b'channel:*', 'channel': b'channel:1', 'data': b'Hello'}
(Reader) Message Received: {'type': 'pmessage', 'pattern': b'channel:*', 'channel': b'channel:2', 'data': b'World'}
(Reader) Message Received: {'type': 'pmessage', 'pattern': b'channel:*', 'channel': b'channel:1', 'data': b'STOP'}
(Reader) STOP

哨兵客户端(Sentinel)

哨兵客户端需要一组 Redis Sentinel 地址以进行连接并开始发现服务。

调用 aioredis.sentinel.Sentinel.master_foraioredis.sentinel.Sentinel.slave_for 方法将返回连接到 Sentinel 监控的指定服务的 Redis 客户端。

Sentinel 客户端将自动检测故障转移并重新连接 Redis 客户端。

[ ]:
import asyncio

from redis.asyncio.sentinel import Sentinel


sentinel = Sentinel([("localhost", 26379), ("sentinel2", 26379)])
r = sentinel.master_for("mymaster")

ok = await r.set("key", "value")
assert ok
val = await r.get("key")
assert val == b"value"

通过指定 URL 方案连接到 Redis 实例

参数作为参数传递给以下方案。

支持三种 URL 方案:

[ ]:
import redis.asyncio as redis
url_connection = redis.from_url("redis://localhost:6379?decode_responses=True")
url_connection.ping()
True

要启用 RESP 3 协议,请在 URL 后附加 protocol=3

[ ]:
import redis.asyncio as redis

url_connection = redis.from_url("redis://localhost:6379?decode_responses=True&protocol=3")
url_connection.ping()