Streams

在 AnyIO 中, “流(stream)” 是一个简单的接口, 用于将信息从一个地方传输到另一个地方。它可以表示进程间通信或通过网络发送数据。AnyIO 将流分为两类:字节流(byte streams)和对象流(object streams)。

字节流(在 Trio 术语中为“Streams”)是接收和/或发送字节块的对象。它们是基于流套接字的限制来建模的, 这意味着边界不会被严格遵守。实际上, 这意味着例如你调用 .send(b'hello ') 然后 .send(b'world'), 另一端接收到的数据将被任意拆分, 如( b'hello'b' world' )、 b'hello world' 或( b'hel'b'lo wo'b'rld')。

另一方面, 对象流(在 Trio 术语中为“Channels”)处理 Python 对象。这些流最常见的实现是内存对象流。对象流的具体语义因实现方式而异。

许多流实现会包装其他流。其中一些可以包装任何字节导向的流, 即 ObjectStream[bytes]ByteStream。这使得许多有趣的用例成为可能。

内存对象流

Memory object streams

内存对象流用于实现带有多个任务的生产者-消费者模式。通过使用 create_memory_object_stream(), 你将获得一对对象流:一个用于发送, 一个用于接收。它们本质上像队列, 但支持关闭和异步迭代。

默认情况下, 内存对象流创建时的缓冲区大小为 0。这意味着 send() 将会阻塞, 直到有另一个任务调用 receive()。你可以在创建流时设置自定义的缓冲区大小。也可以通过传递 math.inf 作为缓冲区大小来创建一个无限缓冲区, 但这并不推荐使用。

内存对象流可以通过调用 clone() 方法进行克隆。每个克隆可以单独关闭, 但只有当所有克隆都关闭时, 流的每一端才会被认为是关闭的。例如, 如果你有两个接收流的克隆, 发送流只有在两个接收流都关闭后才会开始引发 BrokenResourceError

多个任务可以在同一个内存对象流(或其克隆)上进行发送和接收, 但每个发送的项只会交付给一个接收者。

内存对象流的接收端可以使用异步迭代协议进行迭代。当所有发送流的克隆都关闭时, 循环会退出。

示例:

from anyio import create_task_group, create_memory_object_stream, run
from anyio.streams.memory import MemoryObjectReceiveStream


async def process_items(receive_stream: MemoryObjectReceiveStream[str]) -> None:
    async with receive_stream:
        async for item in receive_stream:
            print('received', item)


async def main():
    # [str] 指定了通过内存对象流传递的对象类型。这是一个技巧, 因为 create_memory_object_stream
    # 实际上是一个伪装成函数的类。
    send_stream, receive_stream = create_memory_object_stream[str]()
    async with create_task_group() as tg:
        tg.start_soon(process_items, receive_stream)
        async with send_stream:
            for num in range(10):
                await send_stream.send(f'number {num}')

run(main)

与其他 AnyIO 流不同(但与 Trio 的 Channels 一致), 内存对象流可以同步关闭, 使用 close() 方法或将流作为上下文管理器来关闭:

from anyio.streams.memory import MemoryObjectSendStream


def synchronous_callback(send_stream: MemoryObjectSendStream[str]) -> None:
    with send_stream:
        send_stream.send_nowait('hello')

装订流

Stapled streams

一个拼接流将任何互相兼容的接收流和发送流结合在一起, 形成一个单一的双向流。

它有两种变体:

缓冲字节流

Buffered byte streams

缓冲字节流包装了一个现有的字节导向接收流, 并提供了一些需要缓冲的功能, 例如接收指定数量的字节, 或接收直到找到给定的分隔符为止。

示例:

from anyio import run, create_memory_object_stream
from anyio.streams.buffered import BufferedByteReceiveStream


async def main():
    send, receive = create_memory_object_stream
    buffered = BufferedByteReceiveStream(receive)
    for part in b'hel', b'lo, ', b'wo', b'rld!':
        await send.send(part)

    result = await buffered.receive_exactly(8)
    print(repr(result))

    result = await buffered.receive_until(b'!', 10)
    print(repr(result))

run(main)

上述脚本会输出以下内容:

b'hello, w'
b'orld'

文本流

Text streams

文本流包装了现有的接收/发送流, 并将字符串编码/解码为字节, 反之亦然。

示例:

from anyio import run, create_memory_object_stream
from anyio.streams.text import TextReceiveStream, TextSendStream


async def main():
    bytes_send, bytes_receive = create_memory_object_stream
    text_send = TextSendStream(bytes_send)
    await text_send.send('åäö')
    result = await bytes_receive.receive()
    print(repr(result))

    text_receive = TextReceiveStream(bytes_receive)
    await bytes_send.send(result)
    result = await text_receive.receive()
    print(repr(result))

run(main)

上述脚本会输出以下内容:

b'\xc3\xa5\xc3\xa4\xc3\xb6'
'åäö'

文件流

File streams

文件流用于从文件系统中读取或写入文件。它们对于将文件替代为其他数据源, 或将输出写入文件以用于日志记录或调试目的非常有用。

示例:

from anyio import run
from anyio.streams.file import FileReadStream, FileWriteStream


async def main():
    path = '/tmp/testfile'
    async with await FileWriteStream.from_path(path) as stream:
        await stream.send(b'Hello, World!')

    async with await FileReadStream.from_path(path) as stream:
        async for chunk in stream:
            print(chunk.decode(), end='')

    print()

run(main)

Added in version 3.0.

TLS 流

TLS streams

TLS(传输层安全性), 是SSL(安全套接字层)的继任者, 是在AnyIO中为TCP流提供身份验证和保密性的支持方式。

TLS通常在连接建立后立即进行。握手过程包括以下步骤:

  • 将证书发送给对端(通常仅由服务器进行)

  • 将对端的证书与受信任的CA证书进行验证

  • 检查对端主机名是否与证书匹配

获取服务器证书

Obtaining a server certificate

获取X.509证书用于服务器的主要方式有三种:

  1. 创建一个自签名证书

  2. 使用 certbot 或类似软件从 Let's Encrypt 自动获取证书

  3. 从证书供应商处购买一个证书

第一种选项可能是最简单的, 但这要求任何连接到您的服务器的客户端将自签名证书添加到其受信任证书列表中。显然, 这在本地开发之外是不可行的, 并且在生产环境中强烈不推荐使用。

第二种选项如今是推荐的方法, 只要您有一个可以运行certbot_或类似软件的环境, 并且能够在必要时自动替换证书为更新版本, 同时不需要额外的功能, 比如类2验证。

第三种选项可能是您唯一有效的选择, 特别是在您对证书有特殊要求, 只有证书供应商可以满足这些要求, 或者在您的环境中自动更新证书不可行或不切实际时。

使用自签名证书

Using self signed certificates

要为 localhost 创建一个自签名证书, 可以使用openssl_命令行工具:

openssl req -x509 -newkey rsa:2048 -subj '/CN=localhost' -keyout key.pem -out cert.pem -nodes -days 365

这将创建一个(2048位)私有RSA密钥(key.pem)和一个证书(cert.pem), 其主机名为“localhost”。在这些设置下, 证书的有效期为一年。

要使用这个密钥-证书对设置服务器, 可以参考以下示例:

import ssl

from anyio import create_tcp_listener, run
from anyio.streams.tls import TLSListener


async def handle(client):
    async with client:
        name = await client.receive()
        await client.send(b'Hello, %s\n' % name)


async def main():
    # 创建一个用于客户端身份验证的上下文
    context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)

    # 加载服务器证书和私钥
    context.load_cert_chain(certfile='cert.pem', keyfile='key.pem')

    # 创建监听器并开始提供连接服务
    listener = TLSListener(await create_tcp_listener(local_port=1234), context)
    await listener.serve(handle)

run(main)

然后, 可以使用以下方式连接到此服务器:

import ssl

from anyio import connect_tcp, run


async def main():
    # 如果证书没有被您计算机上安装的CA证书信任, 则需要以下两个步骤;
    # 如果您使用Let's Encrypt或商业证书供应商, 则可以跳过这部分
    context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
    context.load_verify_locations(cafile='cert.pem')

    async with await connect_tcp('localhost', 1234, ssl_context=context) as client:
        await client.send(b'Client\n')
        response = await client.receive()
        print(response)

run(main)

动态创建自签名证书

Creating self-signed certificates on the fly

在测试启用了TLS的服务时, 动态生成证书会更加方便。为此, 您可以使用 trustme 库:

import ssl

import pytest
import trustme


@pytest.fixture(scope='session')
def ca():
    return trustme.CA()


@pytest.fixture(scope='session')
def server_context(ca):
    server_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    ca.issue_cert('localhost').configure_cert(server_context)
    return server_context


@pytest.fixture(scope='session')
def client_context(ca):
    client_context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
    ca.configure_trust(client_context)
    return client_context

然后, 您可以将上述fixture中的服务器和客户端上下文传递给 TLSListenerwrap() 或您在任一端使用的任何方法。

处理不规则的 EOF

Dealing with ragged EOFs

根据 TLS标准 , 加密连接应该以关闭握手结束。此做法可以防止所谓的 截断攻击 。然而, 广泛使用的协议实现(如HTTP)通常忽略这一要求, 因为协议级别的关闭信号会使关闭握手变得多余。

AnyIO 默认遵循此标准(不同于Python标准库的 ssl 模块)。这一实践的实际含义是, 如果您正在实现一个预期跳过TLS关闭握手的协议, 您需要在 wrap()TLSListener 中传递 standard_compatible=False 选项。