Skip to content

ASGI server

ASGI server.

Source code in ypy_websocket/asgi_server.py
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
class ASGIServer:
    """ASGI application that hands WebSocket connections to a websocket server."""

    def __init__(
        self,
        websocket_server: WebsocketServer,
        on_connect: Callable[[dict[str, Any], dict[str, Any]], Awaitable[bool] | bool]
        | None = None,
        on_disconnect: Callable[[dict[str, Any]], Awaitable[None] | None] | None = None,
    ):
        """Initialize the object.

        Arguments:
            websocket_server: An instance of WebsocketServer.
            on_connect: An optional callback called when a WebSocket connects. It is
                called with the received connect message and the ASGI scope, and may be
                sync or async. If it returns True, the WebSocket is not accepted.
            on_disconnect: An optional callback called when the WebSocket disconnects.
                It is forwarded unchanged to the ASGIWebsocket wrapper.
        """
        self._websocket_server = websocket_server
        self._on_connect = on_connect
        self._on_disconnect = on_disconnect

    async def __call__(
        self,
        scope: dict[str, Any],
        receive: Callable[[], Awaitable[dict[str, Any]]],
        send: Callable[[dict[str, Any]], Awaitable[None]],
    ):
        """Handle one ASGI connection.

        Only a first message of type "websocket.connect" is acted upon; any other
        first message makes the call return without sending anything.

        Arguments:
            scope: The ASGI connection scope; its "path" entry is read once the
                connection is accepted.
            receive: The ASGI receive callable.
            send: The ASGI send callable.
        """
        msg = await receive()
        if msg["type"] != "websocket.connect":
            return

        if self._on_connect is not None:
            close = self._on_connect(msg, scope)
            # The callback may be a plain function or a coroutine function.
            if isawaitable(close):
                close = await close
            if close:
                # The callback has no access to `send`, so reject the handshake
                # here instead of returning with the handshake left unanswered.
                await send({"type": "websocket.close"})
                return

        await send({"type": "websocket.accept"})
        websocket = ASGIWebsocket(receive, send, scope["path"], self._on_disconnect)
        await self._websocket_server.serve(websocket)

__init__(websocket_server, on_connect=None, on_disconnect=None)

初始化对象.

Parameters:

Name Type Description Default
websocket_server WebsocketServer

WebsocketServer 的一个实例。

required
on_connect Callable[[dict[str, Any], dict[str, Any]], Awaitable[bool] | bool] | None

可选的回调函数，当连接 WebSocket 时调用。如果回调返回 True，则不接受该 WebSocket。

None
on_disconnect Callable[[dict[str, Any]], Awaitable[None] | None] | None

可选的回调函数，在断开 WebSocket 连接时调用。

None
Source code in ypy_websocket/asgi_server.py
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
def __init__(
    self,
    websocket_server: WebsocketServer,
    on_connect: Callable[[dict[str, Any], dict[str, Any]], Awaitable[bool] | bool]
    | None = None,
    on_disconnect: Callable[[dict[str, Any]], Awaitable[None] | None] | None = None,
):
    """Initialize the object.

    Arguments:
        websocket_server: An instance of WebsocketServer.
        on_connect: An optional callback called when a WebSocket connects. If the
            callback returns True, the WebSocket is not accepted.
        on_disconnect: An optional callback called when the WebSocket disconnects.
    """
    # Both hooks are optional and are stored as given (None when not supplied).
    self._on_connect, self._on_disconnect = on_connect, on_disconnect
    self._websocket_server = websocket_server