Skip to content

Room

Source code in ypy_websocket/yroom.py
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
class YRoom:
    clients: list
    ydoc: Y.YDoc
    ystore: BaseYStore | None
    _on_message: Callable[[bytes], Awaitable[bool] | bool] | None
    _update_send_stream: MemoryObjectSendStream
    _update_receive_stream: MemoryObjectReceiveStream
    _ready: bool
    _task_group: TaskGroup | None
    _started: Event | None
    _starting: bool

    def __init__(
        self,
        ready: bool = True,
        ystore: BaseYStore | None = None,
        log: Logger | None = None,
    ):
        """初始化对象。

        YRoom 实例应优先作为异步上下文管理器使用:

        ```py
        async with room:
            ...
        ```

        但是,也可以使用更低级的 API:

        ```py
        task = asyncio.create_task(room.start())
        await room.started.wait()
        ...
        room.stop()
        ```

        Arguments:
            ready: 内部 YDoc 是否准备好立即进行同步。
            ystore: 可选的存储,用于持久化文档更新。
            log: 可选的日志记录器。
        """
        self.ydoc = Y.YDoc()
        self.awareness = Awareness(self.ydoc)
        self._update_send_stream, self._update_receive_stream = (
            create_memory_object_stream(max_buffer_size=65536)
        )
        self._ready = False
        self.ready = ready
        self.ystore = ystore
        self.log = log or getLogger(__name__)
        self.clients = []
        self._on_message = None
        self._started = None
        self._starting = False
        self._task_group = None

    @property
    def started(self):
        """YRoom 提供程序启动时设置的异步事件。"""
        if self._started is None:
            self._started = Event()
        return self._started

    @property
    def ready(self) -> bool:
        """
        Returns:
            True 表示内部 YDoc 已准备好同步。
        """
        return self._ready

    @ready.setter
    def ready(self, value: bool) -> None:
        """
        Arguments:
            value: 如果内部 YDoc 已准备好同步,则为 True,否则为 False。"""
        self._ready = value
        if value:
            self.ydoc.observe_after_transaction(
                partial(put_updates, self._update_send_stream)
            )

    @property
    def on_message(self) -> Callable[[bytes], Awaitable[bool] | bool] | None:
        """
        Returns:
            收到消息时调用的可选回调。
        """
        return self._on_message

    @on_message.setter
    def on_message(self, value: Callable[[bytes], Awaitable[bool] | bool] | None):
        """
        Arguments:
            value: 收到消息时调用的可选回调。如果回调返回 True,则跳过该消息。
        """
        self._on_message = value

    async def _broadcast_updates(self):
        if self.ystore is not None and not self.ystore.started.is_set():
            self._task_group.start_soon(self.ystore.start)

        async with self._update_receive_stream:
            async for update in self._update_receive_stream:
                if self._task_group.cancel_scope.cancel_called:
                    return
                # 将内部 ydoc 的更新广播到所有客户端,包括来自客户端的更改和来自后端的更改(带外更改)
                for client in self.clients:
                    self.log.debug(
                        "Sending Y update to client with endpoint: %s", client.path
                    )
                    message = create_update_message(update)
                    self._task_group.start_soon(client.send, message)
                if self.ystore:
                    self.log.debug("Writing Y update to YStore")
                    self._task_group.start_soon(self.ystore.write, update)

    async def __aenter__(self) -> YRoom:
        if self._task_group is not None:
            raise RuntimeError("YRoom already running")

        async with AsyncExitStack() as exit_stack:
            tg = create_task_group()
            self._task_group = await exit_stack.enter_async_context(tg)
            self._exit_stack = exit_stack.pop_all()
            tg.start_soon(self._broadcast_updates)
            self.started.set()

        return self

    async def __aexit__(self, exc_type, exc_value, exc_tb):
        if self._task_group is None:
            raise RuntimeError("YRoom not running")

        self._task_group.cancel_scope.cancel()
        self._task_group = None
        return await self._exit_stack.__aexit__(exc_type, exc_value, exc_tb)

    async def start(self, *, task_status: TaskStatus[None] = TASK_STATUS_IGNORED):
        """启动room

        Arguments:
            task_status: 任务开始时设置的状态。
        """
        if self._starting:
            return
        else:
            self._starting = True

        if self._task_group is not None:
            raise RuntimeError("YRoom already running")

        async with create_task_group() as self._task_group:
            self._task_group.start_soon(self._broadcast_updates)
            self.started.set()
            self._starting = False
            task_status.started()

    def stop(self):
        """停止room."""
        if self._task_group is None:
            raise RuntimeError("YRoom not running")

        self._task_group.cancel_scope.cancel()
        self._task_group = None

    async def serve(self, websocket: Websocket):
        """服务一个客户端

        Arguments:
            websocket: 用于为客户端提供服务的 WebSocket。
        """
        async with create_task_group() as tg:
            self.clients.append(websocket)
            await sync(self.ydoc, websocket, self.log)
            try:
                async for message in websocket:
                    # 过滤消息 (e.g. awareness)
                    skip = False
                    if self.on_message:
                        _skip = self.on_message(message)
                        skip = await _skip if isawaitable(_skip) else _skip
                    if skip:
                        continue
                    message_type = message[0]
                    if message_type == YMessageType.SYNC:
                        # 在后台更新我们的内部状态,对内部状态的更改随后将转发给所有客户端并存储在 YStore 中(如果有)
                        tg.start_soon(
                            process_sync_message,
                            message[1:],
                            self.ydoc,
                            websocket,
                            self.log,
                        )
                    elif message_type == YMessageType.AWARENESS:
                        # 将感知消息从此客户端转发给所有客户端,包括它自己,因为它用于保持连接处于活动状态
                        self.log.debug(
                            "Received %s message from endpoint: %s",
                            YMessageType.AWARENESS.name,
                            websocket.path,
                        )
                        for client in self.clients:
                            self.log.debug(
                                "Sending Y awareness from client with endpoint %s to client with endpoint: %s",
                                websocket.path,
                                client.path,
                            )
                            tg.start_soon(client.send, message)
            except Exception as e:
                self.log.debug("Error serving endpoint: %s", websocket.path, exc_info=e)

            # remove this client
            # 移除这个客户端
            self.clients = [c for c in self.clients if c != websocket]

on_message: Callable[[bytes], Awaitable[bool] | bool] | None property writable

Returns:

Type Description
Callable[[bytes], Awaitable[bool] | bool] | None

收到消息时调用的可选回调。

ready: bool property writable

Returns:

Type Description
bool

True 表示内部 YDoc 已准备好同步。

started property

YRoom 提供程序启动时设置的异步事件。

__init__(ready=True, ystore=None, log=None)

初始化对象。

YRoom 实例应优先作为异步上下文管理器使用:

async with room:
    ...

但是,也可以使用更低级的 API:

task = asyncio.create_task(room.start())
await room.started.wait()
...
room.stop()

Parameters:

Name Type Description Default
ready bool

内部 YDoc 是否准备好立即进行同步。

True
ystore BaseYStore | None

可选的存储,用于持久化文档更新。

None
log Logger | None

可选的日志记录器。

None
Source code in ypy_websocket/yroom.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
def __init__(
    self,
    ready: bool = True,
    ystore: BaseYStore | None = None,
    log: Logger | None = None,
):
    """初始化对象。

    YRoom 实例应优先作为异步上下文管理器使用:

    ```py
    async with room:
        ...
    ```

    但是,也可以使用更低级的 API:

    ```py
    task = asyncio.create_task(room.start())
    await room.started.wait()
    ...
    room.stop()
    ```

    Arguments:
        ready: 内部 YDoc 是否准备好立即进行同步。
        ystore: 可选的存储,用于持久化文档更新。
        log: 可选的日志记录器。
    """
    self.ydoc = Y.YDoc()
    self.awareness = Awareness(self.ydoc)
    self._update_send_stream, self._update_receive_stream = (
        create_memory_object_stream(max_buffer_size=65536)
    )
    self._ready = False
    self.ready = ready
    self.ystore = ystore
    self.log = log or getLogger(__name__)
    self.clients = []
    self._on_message = None
    self._started = None
    self._starting = False
    self._task_group = None

serve(websocket) async

服务一个客户端

Parameters:

Name Type Description Default
websocket Websocket

用于为客户端提供服务的 WebSocket。

required
Source code in ypy_websocket/yroom.py
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
async def serve(self, websocket: Websocket):
    """服务一个客户端

    Arguments:
        websocket: 用于为客户端提供服务的 WebSocket。
    """
    async with create_task_group() as tg:
        self.clients.append(websocket)
        await sync(self.ydoc, websocket, self.log)
        try:
            async for message in websocket:
                # 过滤消息 (e.g. awareness)
                skip = False
                if self.on_message:
                    _skip = self.on_message(message)
                    skip = await _skip if isawaitable(_skip) else _skip
                if skip:
                    continue
                message_type = message[0]
                if message_type == YMessageType.SYNC:
                    # 在后台更新我们的内部状态,对内部状态的更改随后将转发给所有客户端并存储在 YStore 中(如果有)
                    tg.start_soon(
                        process_sync_message,
                        message[1:],
                        self.ydoc,
                        websocket,
                        self.log,
                    )
                elif message_type == YMessageType.AWARENESS:
                    # 将感知消息从此客户端转发给所有客户端,包括它自己,因为它用于保持连接处于活动状态
                    self.log.debug(
                        "Received %s message from endpoint: %s",
                        YMessageType.AWARENESS.name,
                        websocket.path,
                    )
                    for client in self.clients:
                        self.log.debug(
                            "Sending Y awareness from client with endpoint %s to client with endpoint: %s",
                            websocket.path,
                            client.path,
                        )
                        tg.start_soon(client.send, message)
        except Exception as e:
            self.log.debug("Error serving endpoint: %s", websocket.path, exc_info=e)

        # remove this client
        # 移除这个客户端
        self.clients = [c for c in self.clients if c != websocket]

start(*, task_status=TASK_STATUS_IGNORED) async

启动room

Parameters:

Name Type Description Default
task_status TaskStatus[None]

任务开始时设置的状态。

TASK_STATUS_IGNORED
Source code in ypy_websocket/yroom.py
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
async def start(self, *, task_status: TaskStatus[None] = TASK_STATUS_IGNORED):
    """启动room

    Arguments:
        task_status: 任务开始时设置的状态。
    """
    if self._starting:
        return
    else:
        self._starting = True

    if self._task_group is not None:
        raise RuntimeError("YRoom already running")

    async with create_task_group() as self._task_group:
        self._task_group.start_soon(self._broadcast_updates)
        self.started.set()
        self._starting = False
        task_status.started()

stop()

停止room.

Source code in ypy_websocket/yroom.py
189
190
191
192
193
194
195
def stop(self):
    """停止room."""
    if self._task_group is None:
        raise RuntimeError("YRoom not running")

    self._task_group.cancel_scope.cancel()
    self._task_group = None