Skip to content

WebSocket server

WebSocket server.

Source code in ypy_websocket/websocket_server.py
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
class WebsocketServer:
    """WebSocket server."""

    auto_clean_rooms: bool
    rooms: dict[str, YRoom]
    _started: Event | None
    _starting: bool
    _task_group: TaskGroup | None

    def __init__(
        self,
        rooms_ready: bool = True,
        auto_clean_rooms: bool = True,
        log: Logger | None = None,
    ) -> None:
        """初始化对象。

        WebsocketServer 实例最好作为异步上下文管理器使用:

        ```py
        async with websocket_server:
            ...
        ```

        不过,也可以使用更低级的 API:

        ```py
        task = asyncio.create_task(websocket_server.start())
        await websocket_server.started.wait()
        ...
        websocket_server.stop()
        ```

        Arguments:
            rooms_ready: 打开时房间是否准备好进行同步。
            auto_clean_rooms: 当没有客户端时,房间是否应该被删除。
            log: 可选的日志记录器。
        """
        self.rooms_ready = rooms_ready
        self.auto_clean_rooms = auto_clean_rooms
        self.log = log or getLogger(__name__)
        self.rooms = {}
        self._started = None
        self._starting = False
        self._task_group = None

    @property
    def started(self) -> Event:
        """WebSocket 服务器启动时设置的异步事件。"""
        if self._started is None:
            self._started = Event()
        return self._started

    async def get_room(self, name: str) -> YRoom:
        """获取或创建一个具有给定名称的房间,并启动它。

        Arguments:
            name: 房间名称

        Returns:
            具有给定名称的房间,如果未找到具有该名称的房间,则为新房间。
        """
        if name not in self.rooms.keys():
            self.rooms[name] = YRoom(ready=self.rooms_ready, log=self.log)
        room = self.rooms[name]
        await self.start_room(room)
        return room

    async def start_room(self, room: YRoom) -> None:
        """如果尚未启动,启动一个房间。

        Arguments:
            room: 要启动的房间
        """
        if self._task_group is None:
            raise RuntimeError(
                "The WebsocketServer is not running: use `async with websocket_server:` or `await websocket_server.start()`"
            )

        if not room.started.is_set():
            await self._task_group.start(room.start)

    def get_room_name(self, room: YRoom) -> str:
        """获取房间的名称。

        Arguments:
            room: 获取名字的房间。

        Returns:
            房间名称
        """
        return list(self.rooms.keys())[list(self.rooms.values()).index(room)]

    def rename_room(
        self,
        to_name: str,
        *,
        from_name: str | None = None,
        from_room: YRoom | None = None,
    ) -> None:
        """重命名房间

        Arguments:
            to_name: 房间的新名称
            from_name: 房间的上一个名称 (如果 `from_room` 没有传入).
            from_room: 要重命名的房间 (如果 `from_name` 没有传入).
        """
        if from_name is not None and from_room is not None:
            raise RuntimeError("Cannot pass from_name and from_room")
        if from_name is None:
            assert from_room is not None
            from_name = self.get_room_name(from_room)
        self.rooms[to_name] = self.rooms.pop(from_name)

    def delete_room(
        self, *, name: str | None = None, room: YRoom | None = None
    ) -> None:
        """删除一个房间

        Arguments:
            name: 要删除房间的名称 (如果 `room` 没有传入).
            room: 要删除的房间 ( 如果 `name` 没有传入).
        """
        if name is not None and room is not None:
            raise RuntimeError("Cannot pass name and room")
        if name is None:
            assert room is not None
            name = self.get_room_name(room)
        room = self.rooms.pop(name)
        room.stop()

    async def serve(self, websocket: Websocket) -> None:
        """通过 WebSocket 为客户端提供服务。

        Arguments:
            websocket: 用于为客户端提供服务的 WebSocket。
        """
        if self._task_group is None:
            raise RuntimeError(
                "The WebsocketServer is not running: use `async with websocket_server:` or `await websocket_server.start()`"
            )

        async with create_task_group() as tg:
            tg.start_soon(self._serve, websocket, tg)

    async def _serve(self, websocket: Websocket, tg: TaskGroup):
        room = await self.get_room(websocket.path)
        await self.start_room(room)
        await room.serve(websocket)

        if self.auto_clean_rooms and not room.clients:
            self.delete_room(room=room)
        tg.cancel_scope.cancel()

    async def __aenter__(self) -> WebsocketServer:
        if self._task_group is not None:
            raise RuntimeError("WebsocketServer already running")

        async with AsyncExitStack() as exit_stack:
            tg = create_task_group()
            self._task_group = await exit_stack.enter_async_context(tg)
            self._exit_stack = exit_stack.pop_all()
            self.started.set()

        return self

    async def __aexit__(self, exc_type, exc_value, exc_tb):
        if self._task_group is None:
            raise RuntimeError("WebsocketServer not running")

        self._task_group.cancel_scope.cancel()
        self._task_group = None
        return await self._exit_stack.__aexit__(exc_type, exc_value, exc_tb)

    async def start(self, *, task_status: TaskStatus[None] = TASK_STATUS_IGNORED):
        """启动 WebSocket 服务。

        Arguments:
            task_status: 任务开始时设置的状态。
        """
        if self._starting:
            return
        else:
            self._starting = True

        if self._task_group is not None:
            raise RuntimeError("WebsocketServer already running")

        # 创建任务组并等待
        async with create_task_group() as self._task_group:
            self._task_group.start_soon(Event().wait)
            self.started.set()
            self._starting = False
            task_status.started()

    def stop(self) -> None:
        """停止 WebSocket 服务."""
        if self._task_group is None:
            raise RuntimeError("WebsocketServer not running")

        self._task_group.cancel_scope.cancel()
        self._task_group = None

started: Event property

WebSocket 服务器启动时设置的异步事件。

__init__(rooms_ready=True, auto_clean_rooms=True, log=None)

初始化对象。

WebsocketServer 实例最好作为异步上下文管理器使用:

async with websocket_server:
    ...

不过,也可以使用更低级的 API:

task = asyncio.create_task(websocket_server.start())
await websocket_server.started.wait()
...
websocket_server.stop()

Parameters:

Name Type Description Default
rooms_ready bool

打开时房间是否准备好进行同步。

True
auto_clean_rooms bool

当没有客户端时,房间是否应该被删除。

True
log Logger | None

可选的日志记录器。

None
Source code in ypy_websocket/websocket_server.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
def __init__(
    self,
    rooms_ready: bool = True,
    auto_clean_rooms: bool = True,
    log: Logger | None = None,
) -> None:
    """初始化对象。

    WebsocketServer 实例最好作为异步上下文管理器使用:

    ```py
    async with websocket_server:
        ...
    ```

    不过,也可以使用更低级的 API:

    ```py
    task = asyncio.create_task(websocket_server.start())
    await websocket_server.started.wait()
    ...
    websocket_server.stop()
    ```

    Arguments:
        rooms_ready: 打开时房间是否准备好进行同步。
        auto_clean_rooms: 当没有客户端时,房间是否应该被删除。
        log: 可选的日志记录器。
    """
    self.rooms_ready = rooms_ready
    self.auto_clean_rooms = auto_clean_rooms
    self.log = log or getLogger(__name__)
    self.rooms = {}
    self._started = None
    self._starting = False
    self._task_group = None

delete_room(*, name=None, room=None)

删除一个房间

Parameters:

Name Type Description Default
name str | None

要删除房间的名称 (如果 room 没有传入).

None
room YRoom | None

要删除的房间 ( 如果 name 没有传入).

None
Source code in ypy_websocket/websocket_server.py
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
def delete_room(
    self, *, name: str | None = None, room: YRoom | None = None
) -> None:
    """删除一个房间

    Arguments:
        name: 要删除房间的名称 (如果 `room` 没有传入).
        room: 要删除的房间 ( 如果 `name` 没有传入).
    """
    if name is not None and room is not None:
        raise RuntimeError("Cannot pass name and room")
    if name is None:
        assert room is not None
        name = self.get_room_name(room)
    room = self.rooms.pop(name)
    room.stop()

get_room(name) async

获取或创建一个具有给定名称的房间,并启动它。

Parameters:

Name Type Description Default
name str

房间名称

required

Returns:

Type Description
YRoom

具有给定名称的房间,如果未找到具有该名称的房间,则为新房间。

Source code in ypy_websocket/websocket_server.py
66
67
68
69
70
71
72
73
74
75
76
77
78
79
async def get_room(self, name: str) -> YRoom:
    """获取或创建一个具有给定名称的房间,并启动它。

    Arguments:
        name: 房间名称

    Returns:
        具有给定名称的房间,如果未找到具有该名称的房间,则为新房间。
    """
    if name not in self.rooms.keys():
        self.rooms[name] = YRoom(ready=self.rooms_ready, log=self.log)
    room = self.rooms[name]
    await self.start_room(room)
    return room

get_room_name(room)

获取房间的名称。

Parameters:

Name Type Description Default
room YRoom

获取名字的房间。

required

Returns:

Type Description
str

房间名称

Source code in ypy_websocket/websocket_server.py
 95
 96
 97
 98
 99
100
101
102
103
104
def get_room_name(self, room: YRoom) -> str:
    """获取房间的名称。

    Arguments:
        room: 获取名字的房间。

    Returns:
        房间名称
    """
    return list(self.rooms.keys())[list(self.rooms.values()).index(room)]

rename_room(to_name, *, from_name=None, from_room=None)

重命名房间

Parameters:

Name Type Description Default
to_name str

房间的新名称

required
from_name str | None

房间的上一个名称 (如果 from_room 没有传入).

None
from_room YRoom | None

要重命名的房间 (如果 from_name 没有传入).

None
Source code in ypy_websocket/websocket_server.py
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
def rename_room(
    self,
    to_name: str,
    *,
    from_name: str | None = None,
    from_room: YRoom | None = None,
) -> None:
    """重命名房间

    Arguments:
        to_name: 房间的新名称
        from_name: 房间的上一个名称 (如果 `from_room` 没有传入).
        from_room: 要重命名的房间 (如果 `from_name` 没有传入).
    """
    if from_name is not None and from_room is not None:
        raise RuntimeError("Cannot pass from_name and from_room")
    if from_name is None:
        assert from_room is not None
        from_name = self.get_room_name(from_room)
    self.rooms[to_name] = self.rooms.pop(from_name)

serve(websocket) async

通过 WebSocket 为客户端提供服务。

Parameters:

Name Type Description Default
websocket Websocket

用于为客户端提供服务的 WebSocket。

required
Source code in ypy_websocket/websocket_server.py
144
145
146
147
148
149
150
151
152
153
154
155
156
async def serve(self, websocket: Websocket) -> None:
    """通过 WebSocket 为客户端提供服务。

    Arguments:
        websocket: 用于为客户端提供服务的 WebSocket。
    """
    if self._task_group is None:
        raise RuntimeError(
            "The WebsocketServer is not running: use `async with websocket_server:` or `await websocket_server.start()`"
        )

    async with create_task_group() as tg:
        tg.start_soon(self._serve, websocket, tg)

start(*, task_status=TASK_STATUS_IGNORED) async

启动 WebSocket 服务。

Parameters:

Name Type Description Default
task_status TaskStatus[None]

任务开始时设置的状态。

TASK_STATUS_IGNORED
Source code in ypy_websocket/websocket_server.py
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
async def start(self, *, task_status: TaskStatus[None] = TASK_STATUS_IGNORED):
    """启动 WebSocket 服务。

    Arguments:
        task_status: 任务开始时设置的状态。
    """
    if self._starting:
        return
    else:
        self._starting = True

    if self._task_group is not None:
        raise RuntimeError("WebsocketServer already running")

    # 创建任务组并等待
    async with create_task_group() as self._task_group:
        self._task_group.start_soon(Event().wait)
        self.started.set()
        self._starting = False
        task_status.started()

start_room(room) async

如果尚未启动,启动一个房间。

Parameters:

Name Type Description Default
room YRoom

要启动的房间

required
Source code in ypy_websocket/websocket_server.py
81
82
83
84
85
86
87
88
89
90
91
92
93
async def start_room(self, room: YRoom) -> None:
    """如果尚未启动,启动一个房间。

    Arguments:
        room: 要启动的房间
    """
    if self._task_group is None:
        raise RuntimeError(
            "The WebsocketServer is not running: use `async with websocket_server:` or `await websocket_server.start()`"
        )

    if not room.started.is_set():
        await self._task_group.start(room.start)

stop()

停止 WebSocket 服务.

Source code in ypy_websocket/websocket_server.py
208
209
210
211
212
213
214
def stop(self) -> None:
    """停止 WebSocket 服务."""
    if self._task_group is None:
        raise RuntimeError("WebsocketServer not running")

    self._task_group.cancel_scope.cancel()
    self._task_group = None