异步示例(Asyncio Examples)¶
所有命令都是协程函数。
连接和断开连接(Connecting and Disconnecting)¶
使用 asyncio Redis 需要显式地断开连接,因为没有 asyncio 的析构魔法方法。默认情况下,在 redis.Redis()
中创建一个内部连接池并附加到 Redis
实例。当调用 Redis.aclose
时,这个内部连接池会自动关闭,从而断开所有连接。
[1]:
import redis.asyncio as redis
client = redis.Redis()
print(f"Ping successful: {await client.ping()}")
await client.aclose()
Ping successful: True
如果您创建一个自定义的 ConnectionPool
以供单个 Redis
实例使用,请使用 Redis.from_pool
类方法。Redis 客户端将拥有这个连接池。这将导致连接池与 Redis 实例一起断开连接。断开连接池只是简单地断开池中所有托管连接。
[ ]:
import redis.asyncio as redis
pool = redis.ConnectionPool.from_url("redis://localhost")
client = redis.Redis.from_pool(pool)
await client.aclose()
但是,如果 ConnectionPool
要被多个 Redis
实例共享,您应该使用 connection_pool
参数,并且可能需要显式地断开连接池。
[2]:
import redis.asyncio as redis
pool = redis.ConnectionPool.from_url("redis://localhost")
client1 = redis.Redis(connection_pool=pool)
client2 = redis.Redis(connection_pool=pool)
await client1.aclose()
await client2.aclose()
await pool.aclose()
默认情况下,该库使用 RESP 协议的版本 2。要启用 RESP 版本 3,您需要将 protocol
设置为 3。
[ ]:
import redis.asyncio as redis
client = redis.Redis(protocol=3)
await client.aclose()
await client.ping()
事务 (Multi/Exec)¶
aioredis.Redis.pipeline 将返回一个 aioredis.Pipeline 对象,该对象会在内存中缓冲所有命令,并使用 Redis Bulk String 协议将其编译成批次。此外,每个命令将返回 Pipeline 实例,允许您链式调用命令,例如 p.set('foo', 1).set('bar', 2).mget('foo', 'bar')
。
在调用并等待 execute()
之前,这些命令不会反映在 Redis 中。
通常,在执行批量操作时,利用“事务”(例如 Multi/Exec)是值得的,因为它还将为您的批量操作增加一层原子性。
[3]:
import redis.asyncio as redis
r = await redis.from_url("redis://localhost")
async with r.pipeline(transaction=True) as pipe:
ok1, ok2 = await (pipe.set("key1", "value1").set("key2", "value2").execute())
assert ok1
assert ok2
发布/订阅模式(Pub/Sub Mode)¶
订阅指定频道:
[4]:
import asyncio
import redis.asyncio as redis
STOPWORD = "STOP"
async def reader(channel: redis.client.PubSub):
while True:
message = await channel.get_message(ignore_subscribe_messages=True, timeout=None)
if message is not None:
print(f"(Reader) Message Received: {message}")
if message["data"].decode() == STOPWORD:
print("(Reader) STOP")
break
r = redis.from_url("redis://localhost")
async with r.pubsub() as pubsub:
await pubsub.subscribe("channel:1", "channel:2")
future = asyncio.create_task(reader(pubsub))
await r.publish("channel:1", "Hello")
await r.publish("channel:2", "World")
await r.publish("channel:1", STOPWORD)
await future
(Reader) Message Received: {'type': 'message', 'pattern': None, 'channel': b'channel:1', 'data': b'Hello'}
(Reader) Message Received: {'type': 'message', 'pattern': None, 'channel': b'channel:2', 'data': b'World'}
(Reader) Message Received: {'type': 'message', 'pattern': None, 'channel': b'channel:1', 'data': b'STOP'}
(Reader) STOP
订阅与通配符样式模式匹配的频道:
[5]:
import asyncio
import redis.asyncio as redis
STOPWORD = "STOP"
async def reader(channel: redis.client.PubSub):
while True:
message = await channel.get_message(ignore_subscribe_messages=True, timeout=None)
if message is not None:
print(f"(Reader) Message Received: {message}")
if message["data"].decode() == STOPWORD:
print("(Reader) STOP")
break
r = await redis.from_url("redis://localhost")
async with r.pubsub() as pubsub:
await pubsub.psubscribe("channel:*")
future = asyncio.create_task(reader(pubsub))
await r.publish("channel:1", "Hello")
await r.publish("channel:2", "World")
await r.publish("channel:1", STOPWORD)
await future
(Reader) Message Received: {'type': 'pmessage', 'pattern': b'channel:*', 'channel': b'channel:1', 'data': b'Hello'}
(Reader) Message Received: {'type': 'pmessage', 'pattern': b'channel:*', 'channel': b'channel:2', 'data': b'World'}
(Reader) Message Received: {'type': 'pmessage', 'pattern': b'channel:*', 'channel': b'channel:1', 'data': b'STOP'}
(Reader) STOP
哨兵客户端(Sentinel)¶
哨兵客户端需要一组 Redis Sentinel 地址以进行连接并开始发现服务。
调用 aioredis.sentinel.Sentinel.master_for
或 aioredis.sentinel.Sentinel.slave_for
方法将返回连接到 Sentinel 监控的指定服务的 Redis 客户端。
Sentinel 客户端将自动检测故障转移并重新连接 Redis 客户端。
[ ]:
import asyncio
from redis.asyncio.sentinel import Sentinel
sentinel = Sentinel([("localhost", 26379), ("sentinel2", 26379)])
r = sentinel.master_for("mymaster")
ok = await r.set("key", "value")
assert ok
val = await r.get("key")
assert val == b"value"
通过指定 URL 方案连接到 Redis 实例¶
参数作为参数传递给以下方案。
支持三种 URL 方案:
redis://
创建一个 TCP 套接字连接。 https://www.iana.org/assignments/uri-schemes/prov/redisrediss://
创建一个 SSL 包装的 TCP 套接字连接。 https://www.iana.org/assignments/uri-schemes/prov/redissunix://
创建一个 Unix 域套接字连接。
[ ]:
import redis.asyncio as redis
url_connection = redis.from_url("redis://localhost:6379?decode_responses=True")
url_connection.ping()
True
要启用 RESP 3 协议,请在 URL 后附加 protocol=3
。
[ ]:
import redis.asyncio as redis
url_connection = redis.from_url("redis://localhost:6379?decode_responses=True&protocol=3")
url_connection.ping()