21. 非阻塞流¶
21. Non-Blocking Streams
asyncio 的一个主要好处是能够使用非阻塞流。
让我们仔细看看。
A major benefit of asyncio is the ability to use non-blocking streams.
Let’s take a closer look.
21.1 Asyncio 的流¶
21.1 Asyncio Streams
Asyncio 提供非阻塞 I/O 套接字编程。
这是通过流(streams)提供的。
流(streams)是高级异步/等待就绪原语,可与网络连接一起使用。 流允许在不使用回调或低级协议和传输的情况下发送和接收数据。
可以打开提供对流写入器和流写入器的访问的套接字。
然后可以使用协程在流中写入和读取数据,并在适当的时候挂起。
完成后,可以关闭套接字。
异步流功能是低级的,这意味着必须手动实现所需的任何协议。
这可能包括常见的网络协议,例如:
- 用于与 Web 服务器交互的 HTTP 或 HTTPS
- 用于与电子邮件服务器交互的 SMTP
- 用于与文件服务器交互的 FTP。
这些流还可用于创建服务器来使用标准协议处理请求,或开发您自己的特定于应用程序的协议。
现在我们知道什么是异步流,让我们看看如何使用它们。
Asyncio provides non-blocking I/O socket programming.
This is provided via streams.
Streams are high-level async/await-ready primitives to work with network connections. Streams allow sending and receiving data without using callbacks or low-level protocols and transports.
— ASYNCIO STREAMS
Sockets can be opened that provide access to a stream writer and a stream writer.
Data can then be written and read from the stream using coroutines, suspending when appropriate.
Once finished, the socket can be closed.
The asyncio streams capability is low-level meaning that any protocols required must be implemented manually.
This might include common web protocols, such as:
- HTTP or HTTPS for interacting with web servers
- SMTP for interacting with email servers
- FTP for interacting with file servers.
The streams can also be used to create a server to handle requests using a standard protocol, or to develop your own application-specific protocol.
Now that we know what asyncio streams are, let’s look at how to use them.
21.2 如何打开一个连接¶
21.2 How to Open a Connection
可以使用 asyncio.open_connection() 函数打开 asyncio TCP 客户端套接字连接。
建立网络连接并返回一对(reader、writer)对象。 返回的读取器和写入器对象是 StreamReader 和 StreamWriter 类的实例。
这是一个必须等待的协程,一旦套接字连接打开就会返回。
该函数返回一个 StreamReader 和 StreamWriter 对象,用于与套接字交互。
例如:
...
# 打开一个连接
reader, writer = await asyncio.open_connection(...)
asyncio.open_connection() 函数需要许多参数来配置套接字连接。
两个必需的参数是主机和端口。
主机是一个字符串,指定要连接的服务器,例如域名或IP地址。
port 是套接字端口号,例如 HTTP 服务器为 80,HTTPS 服务器为 443,SMTP 为 23 等。
例如:
...
# 打开与 http 服务器的连接
reader, writer = await asyncio.open_connection('www.google.com', 80)
SSL 协议支持加密套接字连接。
最常见的例子可能是 HTTPS,它正在取代 HTTP。
这可以通过将“ssl”参数设置为True来实现。
例如:
...
# 打开与 https 服务器的连接
reader, writer = await asyncio.open_connection('www.google.com', 443, ssl=True)
An asyncio TCP client socket connection can be opened using the asyncio.open_connection() function.
Establish a network connection and return a pair of (reader, writer) objects. The returned reader and writer objects are instances of StreamReader and StreamWriter classes.
This is a coroutine that must be awaited and will return once the socket connection is open.
The function returns a StreamReader and StreamWriter object for interacting with the socket.
For example:
...
# open a connection
reader, writer = await asyncio.open_connection(...)
The asyncio.open_connection() function takes many arguments in order to configure the socket connection.
The two required arguments are the host and the port.
The host is a string that specifies the server to connect to, such as a domain name or an IP address.
The port is the socket port number, such as 80 for HTTP servers, 443 for HTTPS servers, 23 for SMTP and so on.
For example:
...
# open a connection to an http server
reader, writer = await asyncio.open_connection('www.google.com', 80)
Encrypted socket connections are supported over the SSL protocol.
The most common example might be HTTPS which is replacing HTTP.
This can be achieved by setting the “ssl” argument to True.
For example:
...
# open a connection to an https server
reader, writer = await asyncio.open_connection('www.google.com', 443, ssl=True)
21.3 如何启动一个侦听服务¶
21.3 How to Start a Server
可以使用 asyncio.start_server() 函数打开 asyncio TCP 服务器套接字。
创建一个 TCP 服务器(套接字类型 SOCK_STREAM),侦听主机地址的端口。
这是一个必须等待的协程。
该函数返回一个代表正在运行的服务器的 asyncio.Server 对象。
例如:
...
# 启动一个tcp服务器
server = await asyncio.start_server(...)
三个必需参数是回调函数、主机和端口。
回调函数是一个由名称指定的自定义函数,每次客户端连接到服务器时都会调用该函数。
每当建立新的客户端连接时,都会调用
client_connected_cb
回调。 它接收一个(读取器(reader),写入器(writer))对作为两个参数,即 StreamReader 和 StreamWriter 类的实例。
主机是客户端指定连接的域名或IP地址。 port 是接收连接的套接字端口号,例如 FTP 为 21
,HTTP 为 80
。
例如:
# 处理连接
async def handler(reader, writer):
# ...
...
# 启动一个服务器来接收http连接
server = await asyncio.start_server(handler, '127.0.0.1', 80)
An asyncio TCP server socket can be opened using the asyncio.start_server() function.
Create a TCP server (socket type SOCK_STREAM) listening on port of the host address.
This is a coroutine that must be awaited.
The function returns an asyncio.Server object that represents the running server.
For example:
...
# start a tcp server
server = await asyncio.start_server(...)
The three required arguments are the callback function, the host, and the port.
The callback function is a custom function specified by name that will be called each time a client connects to the server.
The client_connected_cb callback is called whenever a new client connection is established. It receives a (reader, writer) pair as two arguments, instances of the StreamReader and StreamWriter classes.
— ASYNCIO STREAMS
The host is the domain name or IP address that clients will specify to connect. The port is the socket port number on which to receive connections, such as 21 for FTP or 80 for HTTP.
For example:
# handle connections
async def handler(reader, writer):
# ...
...
# start a server to receive http connections
server = await asyncio.start_server(handler, '127.0.0.1', 80)
21.4 如何使用 StreamWriter 写入数据¶
21.4 How to Write Data with the StreamWriter
我们可以使用 asyncio.StreamWriter 将数据写入套接字。
表示一个 writer 对象,它提供 API 将数据写入 IO 流。
数据以字节形式写入。
可以使用 write() 方法将字节数据写入套接字。
该方法尝试立即将数据写入底层套接字。 如果失败,数据将在内部写入缓冲区中排队,直到可以发送。
例如:
...
# 写入字节数据
writer.write(byte_data)
或者,可以使用 writelines() 写入组织成列表或可迭代的多“行”字节数据。 writelines) 方法。
例如:
...
# 写入字节数据行
writer.writelines(byte_lines)
这两种方法都不会写入数据块或挂起调用协程。
写入字节数据后,最好通过 drain() 方法排空套接字。
等到合适的时候再继续写入流。
这是一个协程,将挂起调用者,直到字节已传输且套接字准备就绪。
例如:
...
# 写入字节数据
writer.write(byte_data)
# 等待数据传输
await writer.drain()
We can write data to the socket using an asyncio.StreamWriter.
Represents a writer object that provides APIs to write data to the IO stream.
Data is written as bytes.
Byte data can be written to the socket using the write() method.
The method attempts to write the data to the underlying socket immediately. If that fails, the data is queued in an internal write buffer until it can be sent.
For example:
...
# write byte data
writer.write(byte_data)
Alternatively, multiple “lines” of byte data organized into a list or iterable can be written using the writelines() method.
For example:
...
# write lines of byte data
writer.writelines(byte_lines)
Neither method for writing data blocks or suspends the calling coroutine.
After writing byte data it is a good idea to drain the socket via the drain() method.
Wait until it is appropriate to resume writing to the stream.
This is a coroutine and will suspend the caller until the bytes have been transmitted and the socket is ready.
For example:
...
# write byte data
writer.write(byte_data)
# wait for data to be transmitted
await writer.drain()
21.5 如何使用 StreamReader 读取数据¶
21.5 How to Read Data with the StreamReader
我们可以使用 asyncio.StreamReader 从套接字读取数据。
表示一个读取器对象,它提供 API 以从 IO 流读取数据。
数据以字节格式读取,因此字符串在使用之前可能需要进行编码。
所有读取方法都是必须等待的协程。
可以通过 read() 方法读取任意数量的字节,该方法将读取到文件末尾 (EOF)。
...
# 读取字节数据
byte_data = await reader.read()
此外,可以通过“n”参数指定要读取的字节数。
最多读取 n 个字节。 如果未提供 n 或设置为 -1,则读取直到 EOF 并返回所有读取的字节。
如果您知道下一个响应的预期字节数,这可能会有所帮助。
例如:
...
# 读取字节数据
byte_data = await reader.read(n=100)
可以使用 readline() 方法读取单行数据。
这将返回字节,直到遇到新行字符“\n”或 EOF。
读取一行,其中“line”是以\n结尾的字节序列。 如果收到 EOF 但未找到 \n,则该方法返回部分读取的数据。 如果收到 EOF 并且内部缓冲区为空,则返回一个空字节对象。
这在阅读使用文本行操作的标准协议时很有帮助。
...
# 读取一行数据
byte_line = await reader.readline()
此外,还有一个 readexactly() 方法用于读取确切的字节数,否则会引发异常,还有一个 readuntil() 方法将读取字节,直到读取字节形式的指定字符。
We can read data from the socket using an asyncio.StreamReader.
Represents a reader object that provides APIs to read data from the IO stream.
Data is read in byte format, therefore strings may need to be encoded before being used.
All read methods are coroutines that must be awaited.
An arbitrary number of bytes can be read via the read() method, which will read until the end of file (EOF).
...
# read byte data
byte_data = await reader.read()
Additionally, the number of bytes to read can be specified via the “n” argument.
Read up to n bytes. If n is not provided, or set to -1, read until EOF and return all read bytes.
This may be helpful if you know the number of bytes expected from the next response.
For example:
...
# read byte data
byte_data = await reader.read(n=100)
A single line of data can be read using the readline() method.
This will return bytes until a new line character ‘\n’ is encountered, or EOF.
Read one line, where “line” is a sequence of bytes ending with \n. If EOF is received and \n was not found, the method returns partially read data. If EOF is received and the internal buffer is empty, return an empty bytes object.
This is helpful when reading standard protocols that operate with lines of text.
...
# read a line data
byte_line = await reader.readline()
Additionally, there is a readexactly() method to read an exact number of bytes otherwise raise an exception, and a readuntil() that will read bytes until a specified character in byte form is read.
21.6 如何关闭连接¶
21.6 How to Close Connection
可以通过 asyncio.StreamWriter 关闭套接字。
可以调用 close() 方法来关闭套接字。
该方法关闭流和底层套接字。
该方法不会阻塞。
例如:
...
# 关闭套接字
writer.close()
虽然 close() 方法不会阻塞,但我们可以等待套接字完全关闭后再继续。
这可以通过 wait_close() 方法来实现。
等待流关闭。 应在 close() 之后调用以等待底层连接关闭。
这是一个可以等待的协程。
例如:
...
# 关闭套接字
writer.close()
# 等待套接字关闭
await writer.wait_closed()
我们可以通过 is_close() 方法检查套接字是否已关闭或正在关闭过程中。
例如:
...
# 检查套接字是否已关闭或正在关闭
if writer.is_closing():
# ...
The socket can be closed via the asyncio.StreamWriter.
The close() method can be called which will close the socket.
The method closes the stream and the underlying socket.
This method does not block.
For example:
...
# close the socket
writer.close()
Although the close() method does not block, we can wait for the socket to close completely before continuing on.
This can be achieved via the wait_closed() method.
Wait until the stream is closed. Should be called after close() to wait until the underlying connection is closed.
This is a coroutine that can be awaited.
For example:
...
# close the socket
writer.close()
# wait for the socket to close
await writer.wait_closed()
We can check if the socket has been closed or is in the process of being closed via the is_closing() method.
For example:
...
# check if the socket is closed or closing
if writer.is_closing():
# ...
创建日期: 2024年9月4日