Skip to content

WebSocket

Bases: Protocol

WebSocket.

Websocket 实例可以使用异步迭代器接收消息,直到连接关闭:

async for message in websocket:
    ...

或者通过直接调用 recv()

message = await websocket.recv()

发送消息使用 send()

await websocket.send(message)
Source code in ypy_websocket/websocket.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
class Websocket(Protocol):
    """WebSocket.

    Websocket 实例可以使用异步迭代器接收消息,直到连接关闭:

    ```py
    async for message in websocket:
        ...
    ```

    或者通过直接调用 `recv()`:

    ```py
    message = await websocket.recv()
    ```

    发送消息使用 `send()`:

    ```py
    await websocket.send(message)
    ```
    """

    @property
    def path(self) -> str:
        """WebSocket 路径"""
        ...

    def __aiter__(self):
        return self

    async def __anext__(self) -> bytes:
        try:
            message = await self.recv()
        except Exception:
            raise StopAsyncIteration()

        return message

    async def send(self, message: bytes) -> None:
        """发送消息。

        Arguments:
            message: 要发送的消息。
        """
        ...

    async def recv(self) -> bytes:
        """收到一条消息。

        Returns:
            收到的消息。
        """
        ...

path: str property

WebSocket 路径

recv() async

收到一条消息。

Returns:

Type Description
bytes

收到的消息。

Source code in ypy_websocket/websocket.py
56
57
58
59
60
61
62
async def recv(self) -> bytes:
    """收到一条消息。

    Returns:
        收到的消息。
    """
    ...

send(message) async

发送消息。

Parameters:

Name Type Description Default
message bytes

要发送的消息。

required
Source code in ypy_websocket/websocket.py
48
49
50
51
52
53
54
async def send(self, message: bytes) -> None:
    """发送消息。

    Arguments:
        message: 要发送的消息。
    """
    ...