Redis 流示例(Stream Examples)

基本配置(basic config)

# Host name handed to redis.Redis() when the client is created.
redis_host = "redis"
# Keys of the two streams the examples write to and read from.
stream_key = "skey"
stream2_key = "s2key"
# Names of the two consumer groups used in the stream-group examples.
group1 = "grp1"
group2 = "grp2"


import redis
from time import time
from redis.exceptions import ConnectionError, DataError, NoScriptError, RedisError, ResponseError

# Single client shared by every example; the host is passed by keyword.
r = redis.Redis(host=redis_host)

xadd 和 xread


# Append ten entries, each carrying the current timestamp and a counter,
# then report how many entries the stream holds.
for n in range(10):
    r.xadd(stream_key, {'ts': time(), 'v': n})
print(f"stream length: {r.xlen(stream_key)}")
stream length: 10


## read 2 entries from stream_key, starting at ID 0 (the start of the stream)
l = r.xread( count=2, streams={stream_key:0} )
# show the raw reply; an assignment alone displays nothing
print( l )
[[b'skey', [(b'1710790167982-0', {b'ts': b'1710790167.9824948', b'v': b'0'}), (b'1710790167983-0', {b'ts': b'1710790167.9830241', b'v': b'1'})]]]

从返回结构提取数据(extract data from the returned structure)

# Each item of the reply is a [stream name, entries] pair; take the first stream.
first_stream = l[0]
print( f"got data from stream: {first_stream[0]}")
fs_data = first_stream[1]
# Every entry is an (entry id, fields) tuple; the field keys are bytes.
# The loop variable is not called `id`, to avoid shadowing the builtin.
for entry_id, value in fs_data:
    print( f"id: {entry_id} value: {value[b'v']}")
got data from stream: b'skey'
id: b'1710790167982-0' value: b'0'
id: b'1710790167983-0' value: b'1'


如果我们使用相同的参数调用 xread,将会获取相同的数据。

# Same arguments as before, so the same two entries come back.
l = r.xread( count=2, streams={stream_key:0} )
# `entry_id` rather than `id`, to avoid shadowing the builtin.
for entry_id, value in l[0][1]:
    print( f"id: {entry_id} value: {value[b'v']}")
id: b'1710790167982-0' value: b'0'
id: b'1710790167983-0' value: b'1'


# Use the id of the last entry of the previous reply as the cursor,
# so the next read returns the entries that follow it.
last_id_returned = l[0][1][-1][0]
l = r.xread( count=2, streams={stream_key: last_id_returned} )
# `entry_id` rather than `id`, to avoid shadowing the builtin.
for entry_id, value in l[0][1]:
    print( f"id: {entry_id} value: {value[b'v']}")
id: b'1710790167983-1' value: b'2'
id: b'1710790167983-2' value: b'3'
# Advance the cursor once more and read the following two entries.
last_id_returned = l[0][1][-1][0]
l = r.xread( count=2, streams={stream_key: last_id_returned} )
# `entry_id` rather than `id`, to avoid shadowing the builtin.
for entry_id, value in l[0][1]:
    print( f"id: {entry_id} value: {value[b'v']}")
id: b'1710790167983-3' value: b'4'
id: b'1710790167983-4' value: b'5'


print( f"stream length: {r.xlen( stream_key )}")
# wait for 5s for new messages (block=5000);
# the special id '$' asks only for entries added after this call, so with
# no writer the call times out and returns an empty list
l = r.xread( count=1, block=5000, streams={stream_key: '$'} )
print( f"after 5s block, got an empty list {l}, no *new* messages on the stream")
print( f"stream length: {r.xlen( stream_key )}")
stream length: 10
after 5s block, got an empty list [], no *new* messages on the stream
stream length: 10


# read the last available message: the special id '+' returns the newest
# entry already in the stream
# NOTE(review): '+' in XREAD needs a recent Redis server (7.4+) - confirm
l = r.xread( count=1, streams={stream_key: '+'} )
# show the reply; without this nothing displays the entry that was read
print( l )
print( f"stream length: {r.xlen( stream_key )}")
[[b'skey', [(b'1710790167984-0', {b'ts': b'1710790167.9839962', b'v': b'9'})]]]
stream length: 10

第二个流(2nd stream)


# Fill the second stream with ten entries whose values are 1000..1009.
for n in range(1000, 1010):
    r.xadd(stream2_key, {'v': n})
print(f"stream length: {r.xlen(stream2_key)}")
stream length: 10


# One call can read several streams; count applies to each stream separately.
l = r.xread(count=1, streams={stream_key: 0, stream2_key: 0})
for name, entries in l:
    print(f"got from {name} the entry {entries}")
got from b'skey' the entry [(b'1710790167982-0', {b'ts': b'1710790167.9824948', b'v': b'0'})]
got from b's2key' the entry [(b'1710790173142-0', {b'v': b'1000'})]

流组(Stream groups)

使用流组可以跟踪多个消费者,并在 Redis 端查看哪些消息已经被消费。

向流中添加数据(add some data to streams)

向两个流各追加 10 条消息(两个流此前各已有 10 条,因此追加后长度均为 20)。

def add_some_data_to_stream( sname, key_range ):
    """Append one entry per value of key_range to the stream sname.

    Each entry stores the current timestamp under 'ts' and the value under
    'v'. Afterwards the length of that same stream is printed.

    Args:
        sname: key of the stream to append to.
        key_range: iterable of values to store, one entry each.
    """
    for i in key_range:
        r.xadd( sname, { 'ts': time(), 'v': i } )
    # Report the length of the stream just written, not of the global stream_key.
    print( f"stream '{sname}' length: {r.xlen( sname )}")

# Append ten more entries to each stream (values 0..9 and 1000..1009).
add_some_data_to_stream( stream_key, range(0,10) )
add_some_data_to_stream( stream2_key, range(1000,1010) )
stream 'skey' length: 20
stream 's2key' length: 20


  • 创建一个组 grp1,与流 skey 关联;

  • 创建一个组 grp2,与流 skey 和 s2key 关联。

使用 xinfo_groups 来验证组创建的结果。

## create the group
def create_group( skey, gname ):
    """Create consumer group gname on stream skey, starting at id 0.

    A ResponseError from the server is printed instead of propagated, so the
    cell can be run again without failing.
    NOTE(review): presumably the expected error is "group already exists" -
    confirm.

    Args:
        skey: key of the stream the group is attached to.
        gname: name of the consumer group to create.
    """
    # The original was missing this `try:`, which left a dangling `except`.
    try:
        r.xgroup_create( name=skey, groupname=gname, id=0 )
    except ResponseError as e:
        print(f"raised: {e}")

# group1 reads only the stream 'skey'
create_group( stream_key, group1 )
# group2 reads both streams, 'skey' and 's2key', so it is created on each
create_group( stream_key, group2 )
create_group( stream2_key, group2 )

def group_info(skey):
    """Print one summary line per consumer group attached to stream skey.

    Each line shows the group name, its number of consumers and its
    last-delivered id, as reported by xinfo_groups.
    """
    for grp in r.xinfo_groups(name=skey):
        print(
            f"{skey} -> group name: {grp['name']} with {grp['consumers']} consumers"
            f" and {grp['last-delivered-id']} as last read id"
        )

# List the groups now attached to each of the two streams.
group_info( stream_key )
group_info( stream2_key )
skey -> group name: b'grp1' with 0 consumers and b'0-0' as last read id
skey -> group name: b'grp2' with 0 consumers and b'0-0' as last read id
s2key -> group name: b'grp2' with 0 consumers and b'0-0' as last read id


xreadgroup 方法允许从流组中读取数据。

def print_xreadgroup_reply(reply, group=None, run=None):
    """Print every entry id of an xreadgroup reply, optionally running a callback.

    Args:
        reply: iterable of items whose index 0 is the stream name and whose
            index 1 is an iterable of entries; index 0 of an entry is its id.
        group: value passed through unchanged as the callback's second argument.
        run: optional callable invoked as run(stream_name, group, entry_id)
            after each entry is printed; skipped when None.
    """
    for stream_reply in reply:
        stream_name = stream_reply[0]
        for entry in stream_reply[1]:
            entry_id = entry[0]
            # The message has no space before "from"; kept exactly as emitted.
            print(f"got element {entry_id}from stream {stream_name}")
            if run is None:
                continue
            run(stream_name, group, entry_id)
# read some messages on group1 with consumer 'c'
# the special id '>' asks for entries not yet delivered to any consumer of the group
d = r.xreadgroup( groupname=group1, consumername='c', block=10,
                  count=2, streams={stream_key:'>'})
print_xreadgroup_reply( d )
got element b'1710790167982-0'from stream b'skey'
got element b'1710790167983-0'from stream b'skey'

对于同一流组的 第二个消费者,将不会收到已经发送的消息。

# read some messages on group1 with a second consumer, 'c2';
# it gets the next undelivered entries, not the ones already sent to 'c'
d = r.xreadgroup( groupname=group1, consumername='c2', block=10,
                  count=2, streams={stream_key:'>'})
print_xreadgroup_reply( d )
got element b'1710790167983-1'from stream b'skey'
got element b'1710790167983-2'from stream b'skey'

但是 第二个流组 可以再次读取已经发送的消息。


# group2 tracks delivery independently of group1, so it starts from the
# first entry of each of its two streams
d2 = r.xreadgroup( groupname=group2, consumername='c', block=10,
                   count=2, streams={stream_key:'>',stream2_key:'>'})
print_xreadgroup_reply( d2 )
got element b'1710790167982-0'from stream b'skey'
got element b'1710790167983-0'from stream b'skey'
got element b'1710790173142-0'from stream b's2key'
got element b'1710790173143-0'from stream b's2key'

要检查待处理的消息(已发送但未确认的消息),我们可以使用 xpending。

# check pending status (read messages without a ack)
def print_pending_info(key_group):
    """Print the pending-entry count for each (stream, group) pair.

    Args:
        key_group: iterable of (stream key, group name) pairs.
    """
    for stream_name, group_name in key_group:
        summary = r.xpending(name=stream_name, groupname=group_name)
        pending_count = summary.get('pending')
        print(f"{pending_count} pending messages on '{stream_name}' for group '{group_name}'")

# Show the pending counts for every (stream, group) combination created above.
print_pending_info( ((stream_key,group1),(stream_key,group2),(stream2_key,group2)) )
4 pending messages on 'skey' for group 'grp1'
2 pending messages on 'skey' for group 'grp2'
2 pending messages on 's2key' for group 'grp2'


使用 xack 确认一些消息。

# do acknowledges for group1
def toack( k, g, e ):
    """Acknowledge entry e of stream k on behalf of group g; return xack's result."""
    return r.xack( k, g, e )

# `d` still holds the last reply read above, so only those entries are acknowledged
print_xreadgroup_reply( d, group=group1, run=toack )
got element b'1710790167983-1'from stream b'skey'
got element b'1710790167983-2'from stream b'skey'
# check pending again: only the count of group1 on 'skey' should have dropped
print_pending_info( ((stream_key,group1),(stream_key,group2),(stream2_key,group2)) )
2 pending messages on 'skey' for group 'grp1'
2 pending messages on 'skey' for group 'grp2'
2 pending messages on 's2key' for group 'grp2'

读取并确认 group1 中其余尚未投递的所有消息(此前已投递但尚未确认的 2 条消息仍处于待处理状态)。

# Read up to 100 not-yet-delivered entries for group1 and acknowledge each one.
# Entries delivered earlier but never acknowledged are not returned by '>',
# so they stay pending.
d = r.xreadgroup( groupname=group1, consumername='c', block=10,
                      count=100, streams={stream_key:'>'})
print_xreadgroup_reply( d, group=group1, run=toack)
print_pending_info( ((stream_key,group1),) )
got element b'1710790167983-3'from stream b'skey'
got element b'1710790167983-4'from stream b'skey'
got element b'1710790167983-5'from stream b'skey'
got element b'1710790167983-6'from stream b'skey'
got element b'1710790167983-7'from stream b'skey'
got element b'1710790167984-0'from stream b'skey'
got element b'1710790173157-0'from stream b'skey'
got element b'1710790173158-0'from stream b'skey'
got element b'1710790173158-1'from stream b'skey'
got element b'1710790173158-2'from stream b'skey'
got element b'1710790173158-3'from stream b'skey'
got element b'1710790173158-4'from stream b'skey'
got element b'1710790173158-5'from stream b'skey'
got element b'1710790173159-0'from stream b'skey'
got element b'1710790173159-1'from stream b'skey'
got element b'1710790173159-2'from stream b'skey'
2 pending messages on 'skey' for group 'grp1'

但是在对 group1 上的所有消息执行 xack 后,流的长度将保持不变。



要删除消息,需要使用 xdel 显式删除它们。

# Read every entry of the stream from the beginning, then delete each by id.
s1 = r.xread( streams={stream_key:0} )
for stream_name, messages in s1:
    # A plain loop: the deletions are side effects, so no result list is built.
    for entry_id, _fields in messages:
        r.xdel( stream_name, entry_id )



但使用 xdel 后,第二个组将无法再从 skey 中读取任何尚未处理的消息(下面的输出只包含来自 s2key 的消息)。

# Read again as group2 from both streams, now that the entries of 'skey'
# have been deleted.
d2 = r.xreadgroup( groupname=group2, consumername='c', block=10,
                   count=2, streams={stream_key:'>',stream2_key:'>'})
print_xreadgroup_reply( d2 )
got element b'1657571042113-1'from stream b's2key'
got element b'1657571042114-0'from stream b's2key'
[ ]: