Skip to content

WebSocket provider

WebSocket provider.

Source code in ypy_websocket/websocket_provider.py
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
class WebsocketProvider:
    """WebSocket provider.

    Connects a YDoc to a WebSocket: once running, it calls ``sync`` on the
    document and the WebSocket, hands every incoming ``YMessageType.SYNC``
    message to ``process_sync_message``, and sends the document updates read
    from an internal memory object stream through the WebSocket.
    """

    _ydoc: Y.YDoc
    _update_send_stream: MemoryObjectSendStream
    _update_receive_stream: MemoryObjectReceiveStream
    _started: Event | None
    _starting: bool
    _task_group: TaskGroup | None

    def __init__(
        self, ydoc: Y.YDoc, websocket: Websocket, log: Logger | None = None
    ) -> None:
        """Initialize the object.

        The WebsocketProvider instance is best used as an async context manager:

        ```py
        async with websocket_provider:
            ...
        ```

        However, a lower-level API can also be used:

        ```py
        task = asyncio.create_task(websocket_provider.start())
        await websocket_provider.started.wait()
        ...
        websocket_provider.stop()
        ```

        Arguments:
            ydoc: The YDoc to connect through the WebSocket.
            websocket: The WebSocket through which to connect the YDoc.
            log: An optional logger. Defaults to this module's logger.
        """
        self._ydoc = ydoc
        self._websocket = websocket
        self.log = log or getLogger(__name__)
        self._update_send_stream, self._update_receive_stream = (
            create_memory_object_stream(max_buffer_size=65536)
        )
        # The event is created lazily by the `started` property.
        self._started = None
        self._starting = False
        self._task_group = None
        # `put_updates` is registered with the send end of the stream bound as
        # its first argument; `_send` consumes the receive end.
        # NOTE(review): presumably it pushes each transaction's update into
        # the stream - confirm against `put_updates`.
        ydoc.observe_after_transaction(partial(put_updates, self._update_send_stream))

    @property
    def started(self) -> Event:
        """An async event that is set when the WebSocket provider has started.

        The event is created on first access.
        """
        if self._started is None:
            self._started = Event()
        return self._started

    async def __aenter__(self) -> WebsocketProvider:
        """Start the provider in a new task group and return it.

        Raises:
            RuntimeError: If the provider is already running.
        """
        if self._task_group is not None:
            raise RuntimeError("WebsocketProvider already running")

        async with AsyncExitStack() as exit_stack:
            tg = create_task_group()
            self._task_group = await exit_stack.enter_async_context(tg)
            # Transfer the task group's exit callback to a new stack so that
            # leaving this `async with` does not close the task group; it is
            # closed later by `__aexit__`.
            self._exit_stack = exit_stack.pop_all()
            tg.start_soon(self._run)
            self.started.set()

        return self

    async def __aexit__(self, exc_type, exc_value, exc_tb):
        """Cancel the task group and unwind the saved exit stack.

        Raises:
            RuntimeError: If the provider is not running.
        """
        if self._task_group is None:
            raise RuntimeError("WebsocketProvider not running")

        self._task_group.cancel_scope.cancel()
        self._task_group = None
        return await self._exit_stack.__aexit__(exc_type, exc_value, exc_tb)

    async def _run(self):
        """Sync the document, start the sender task, then process messages."""
        await sync(self._ydoc, self._websocket, self.log)
        self._task_group.start_soon(self._send)
        async for message in self._websocket:
            # The first byte is the message type; only SYNC messages are
            # handled here, any other message is ignored.
            if message[0] == YMessageType.SYNC:
                await process_sync_message(
                    message[1:], self._ydoc, self._websocket, self.log
                )

    async def _send(self):
        """Send every update read from the update stream through the WebSocket.

        Sending is best-effort: a failure to send one update is logged and the
        loop carries on with the next update.
        """
        async with self._update_receive_stream:
            async for update in self._update_receive_stream:
                message = create_update_message(update)
                try:
                    await self._websocket.send(message)
                except Exception:
                    # Previously swallowed silently; keep going but leave a
                    # trace so that lost updates can be diagnosed.
                    self.log.debug(
                        "Failed to send update through the WebSocket",
                        exc_info=True,
                    )

    async def start(self, *, task_status: TaskStatus[None] = TASK_STATUS_IGNORED):
        """Start the WebSocket provider.

        Returns immediately if another call is already in the process of
        starting the provider.

        Arguments:
            task_status: The status to set when the task has started.

        Raises:
            RuntimeError: If the provider is already running.
        """
        if self._starting:
            return

        # Check before flagging the start: raising with `_starting` already
        # set would leave it stuck and turn every later call into a no-op.
        if self._task_group is not None:
            raise RuntimeError("WebsocketProvider already running")

        self._starting = True

        async with create_task_group() as self._task_group:
            self._task_group.start_soon(self._run)
            self.started.set()
            self._starting = False
            task_status.started()

    def stop(self):
        """Stop the WebSocket provider.

        Raises:
            RuntimeError: If the provider is not running.
        """
        if self._task_group is None:
            raise RuntimeError("WebsocketProvider not running")

        self._task_group.cancel_scope.cancel()
        self._task_group = None

started: Event property

WebSocket 提供程序启动时设置的异步事件。

__init__(ydoc, websocket, log=None)

初始化对象。

WebsocketProvider 实例最好作为异步上下文管理器使用:

async with websocket_provider:
    ...

不过,也可以使用更底层的 API:

task = asyncio.create_task(websocket_provider.start())
await websocket_provider.started.wait()
...
websocket_provider.stop()

Parameters:

Name Type Description Default
ydoc Y.YDoc

通过 WebSocket 连接的 YDoc。

required
websocket Websocket

通过该 WebSocket 连接 YDoc。

required
log Logger | None

可选的日志记录器。

None
Source code in ypy_websocket/websocket_provider.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
def __init__(
    self, ydoc: Y.YDoc, websocket: Websocket, log: Logger | None = None
) -> None:
    """Initialize the object.

    The WebsocketProvider instance is best used as an async context manager:

    ```py
    async with websocket_provider:
        ...
    ```

    However, a lower-level API can also be used:

    ```py
    task = asyncio.create_task(websocket_provider.start())
    await websocket_provider.started.wait()
    ...
    websocket_provider.stop()
    ```

    Arguments:
        ydoc: The YDoc to connect through the WebSocket.
        websocket: The WebSocket through which to connect the YDoc.
        log: An optional logger. Defaults to this module's logger.
    """
    self._ydoc = ydoc
    self._websocket = websocket
    self.log = log if log else getLogger(__name__)

    # Both ends of the in-memory stream that carries document updates.
    send_stream, receive_stream = create_memory_object_stream(max_buffer_size=65536)
    self._update_send_stream = send_stream
    self._update_receive_stream = receive_stream

    # Lifecycle state: nothing is started or running yet.
    self._started = None
    self._starting = False
    self._task_group = None

    # Register `put_updates`, bound to the send end of the stream, as the
    # document's after-transaction observer.
    on_transaction = partial(put_updates, send_stream)
    ydoc.observe_after_transaction(on_transaction)

start(*, task_status=TASK_STATUS_IGNORED) async

启动 WebSocket 提供程序。

Parameters:

Name Type Description Default
task_status TaskStatus[None]

任务开始时设置的状态。

TASK_STATUS_IGNORED
Source code in ypy_websocket/websocket_provider.py
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
async def start(self, *, task_status: TaskStatus[None] = TASK_STATUS_IGNORED):
    """Start the WebSocket provider.

    Returns immediately if another call is already in the process of
    starting the provider.

    Arguments:
        task_status: The status to set when the task has started.

    Raises:
        RuntimeError: If the provider is already running.
    """
    if self._starting:
        return

    # Check before flagging the start: raising with `_starting` already set
    # would leave it stuck and turn every later call into a no-op.
    if self._task_group is not None:
        raise RuntimeError("WebsocketProvider already running")

    self._starting = True

    async with create_task_group() as self._task_group:
        self._task_group.start_soon(self._run)
        self.started.set()
        self._starting = False
        task_status.started()

stop()

停止 WebSocket 提供程序。

Source code in ypy_websocket/websocket_provider.py
140
141
142
143
144
145
146
def stop(self):
    """Stop the WebSocket provider.

    Cancels the cancel scope of the running task group and forgets the group.

    Raises:
        RuntimeError: If the provider is not running.
    """
    running_group = self._task_group
    if running_group is not None:
        running_group.cancel_scope.cancel()
        self._task_group = None
        return

    raise RuntimeError("WebsocketProvider not running")