Skip to content

Store

BaseYStore

Bases: ABC

Source code in ypy_websocket/ystore.py
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
class BaseYStore(ABC):
    """Abstract base class for stores that persist Y updates.

    Subclasses must implement ``__init__``, ``write`` and ``read``. This base
    class supplies the start/stop lifecycle, metadata retrieval, and helpers
    that move a YDoc's state into and out of the store.
    """

    # Optional callable returning metadata bytes, or an awaitable of them.
    metadata_callback: Callable[[], Awaitable[bytes] | bytes] | None = None
    # Storage format version number.
    version = 2
    # Lazily created by the ``started`` property.
    _started: Event | None = None
    # True only while ``start()`` is in progress.
    _starting: bool = False
    # Set while the store is running as an async context manager.
    _task_group: TaskGroup | None = None

    @abstractmethod
    def __init__(
        self,
        path: str,
        metadata_callback: Callable[[], Awaitable[bytes] | bytes] | None = None,
    ): ...

    @abstractmethod
    async def write(self, data: bytes) -> None: ...

    @abstractmethod
    async def read(self) -> AsyncIterator[tuple[bytes, bytes]]: ...

    @property
    def started(self) -> Event:
        """Event that is set once ``start()`` has run; created on first access."""
        if self._started is None:
            self._started = Event()
        return self._started

    async def __aenter__(self) -> BaseYStore:
        """Enter the store's context: open a task group and schedule ``start``.

        Raises:
            RuntimeError: If a task group is already set on this store.
        """
        if self._task_group is not None:
            raise RuntimeError("YStore already running")

        async with AsyncExitStack() as exit_stack:
            tg = create_task_group()
            self._task_group = await exit_stack.enter_async_context(tg)
            # pop_all() hands ownership of the task group to self._exit_stack,
            # so leaving this ``async with`` does not close it.
            self._exit_stack = exit_stack.pop_all()
            # NOTE(review): start() raises RuntimeError when _task_group is
            # already set, which is the case at this point - confirm this path
            # behaves as intended.
            tg.start_soon(self.start)

        return self

    async def __aexit__(self, exc_type, exc_value, exc_tb):
        """Cancel the task group and unwind the saved exit stack.

        Raises:
            RuntimeError: If no task group is set on this store.
        """
        if self._task_group is None:
            raise RuntimeError("YStore not running")

        self._task_group.cancel_scope.cancel()
        self._task_group = None
        return await self._exit_stack.__aexit__(exc_type, exc_value, exc_tb)

    async def start(self, *, task_status: TaskStatus[None] = TASK_STATUS_IGNORED):
        """Start the store.

        Arguments:
            task_status: The status to set when the task has started.

        Raises:
            RuntimeError: If a task group is already set on this store.
        """
        if self._starting:
            return

        # Validate before flagging the start as in progress: raising with
        # ``_starting`` already True would leave it stuck and turn every later
        # call into a silent no-op.
        if self._task_group is not None:
            raise RuntimeError("YStore already running")

        self._starting = True
        self.started.set()
        self._starting = False
        task_status.started()

    def stop(self) -> None:
        """Stop the store.

        Raises:
            RuntimeError: If no task group is set on this store.
        """
        if self._task_group is None:
            raise RuntimeError("YStore not running")

        self._task_group.cancel_scope.cancel()
        self._task_group = None

    async def get_metadata(self) -> bytes:
        """Return the metadata produced by ``metadata_callback``.

        Returns:
            The metadata, or ``b""`` when no callback is configured.
        """
        if self.metadata_callback is None:
            return b""

        metadata = self.metadata_callback()
        # The callback may be synchronous or asynchronous.
        if isawaitable(metadata):
            metadata = await metadata
        metadata = cast(bytes, metadata)
        return metadata

    async def encode_state_as_update(self, ydoc: Y.YDoc) -> None:
        """Store a YDoc state.

        Arguments:
            ydoc: The YDoc from which to store the state.
        """
        update = Y.encode_state_as_update(ydoc)  # type: ignore
        await self.write(update)

    async def apply_updates(self, ydoc: Y.YDoc) -> None:
        """Apply all stored updates to the YDoc.

        Arguments:
            ydoc: The YDoc on which to apply the updates.
        """
        # Only the first item of each record (the update) is used here.
        async for update, *rest in self.read():  # type: ignore
            Y.apply_update(ydoc, update)  # type: ignore

apply_updates(ydoc) async

将所有存储的更新应用到 YDoc。

Parameters:

Name Type Description Default
ydoc Y.YDoc

要应用更新的 YDoc。

required
Source code in ypy_websocket/ystore.py
121
122
123
124
125
126
127
128
async def apply_updates(self, ydoc: Y.YDoc) -> None:
    """Replay every update held in the store onto a YDoc.

    Arguments:
        ydoc: The YDoc that receives the stored updates.
    """
    async for record in self.read():  # type: ignore
        # Each record starts with the update; the remaining items are ignored.
        stored_update, *_ = record
        Y.apply_update(ydoc, stored_update)  # type: ignore

encode_state_as_update(ydoc) async

存储1个 YDoc 状态.

Parameters:

Name Type Description Default
ydoc Y.YDoc

用于存储状态的 YDoc。

required
Source code in ypy_websocket/ystore.py
112
113
114
115
116
117
118
119
async def encode_state_as_update(self, ydoc: Y.YDoc) -> None:
    """Encode a YDoc's state as a single update and write it to the store.

    Arguments:
        ydoc: The YDoc whose state is stored.
    """
    await self.write(Y.encode_state_as_update(ydoc))  # type: ignore

get_metadata() async

Returns:

Type Description
bytes

元数据(metadata).

Source code in ypy_websocket/ystore.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
async def get_metadata(self) -> bytes:
    """Fetch metadata from the configured callback.

    Returns:
        The metadata, or ``b""`` when no callback is configured.
    """
    if self.metadata_callback is None:
        return b""

    result = self.metadata_callback()
    # The callback may hand back the bytes directly or an awaitable of them.
    if isawaitable(result):
        result = await result
    return cast(bytes, result)

start(*, task_status=TASK_STATUS_IGNORED) async

启动 store.

Parameters:

Name Type Description Default
task_status TaskStatus[None]

任务开始时设置的状态。

TASK_STATUS_IGNORED
Source code in ypy_websocket/ystore.py
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
async def start(self, *, task_status: TaskStatus[None] = TASK_STATUS_IGNORED):
    """Start the store.

    Arguments:
        task_status: The status to set when the task has started.

    Raises:
        RuntimeError: If a task group is already set on this store.
    """
    if self._starting:
        return

    # Validate before flagging the start as in progress: raising with
    # ``_starting`` already True would leave it stuck and turn every later
    # call into a silent no-op.
    if self._task_group is not None:
        raise RuntimeError("YStore already running")

    self._starting = True
    self.started.set()
    self._starting = False
    task_status.started()

stop()

停止 store.

Source code in ypy_websocket/ystore.py
90
91
92
93
94
95
96
def stop(self) -> None:
    """Stop the store by cancelling its task group.

    Raises:
        RuntimeError: If no task group is set on this store.
    """
    task_group = self._task_group
    if task_group is None:
        raise RuntimeError("YStore not running")

    task_group.cancel_scope.cancel()
    self._task_group = None

FileYStore

Bases: BaseYStore

每个文档使用一个文件的 YStore。

Source code in ypy_websocket/ystore.py
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
class FileYStore(BaseYStore):
    """A YStore which uses one file per document."""

    path: str
    metadata_callback: Callable[[], Awaitable[bytes] | bytes] | None
    lock: Lock

    def __init__(
        self,
        path: str,
        metadata_callback: Callable[[], Awaitable[bytes] | bytes] | None = None,
        log: Logger | None = None,
    ) -> None:
        """Initialize the object.

        Arguments:
            path: The file path used to store the updates.
            metadata_callback: An optional callback to call to get the metadata.
            log: An optional logger.
        """
        self.path = path
        self.metadata_callback = metadata_callback
        self.log = log or getLogger(__name__)
        self.lock = Lock()

    async def check_version(self) -> int:
        """Check the version of the store format.

        A missing file is created with a fresh ``VERSION:<n>`` header. A file
        whose header is absent, unparsable or carries a different version is
        first renamed to the path given by ``get_new_path`` and then recreated.

        Returns:
            The offset where the data is located in the file.
        """
        if not await anyio.Path(self.path).exists():
            version_mismatch = True
        else:
            version_mismatch = False
            move_file = False
            async with await anyio.open_file(self.path, "rb") as f:
                header = await f.read(8)
                if header == b"VERSION:":
                    try:
                        version = int(await f.readline())
                    except ValueError:
                        # A corrupt or truncated version number is handled
                        # like any other mismatch instead of crashing.
                        version_mismatch = True
                    else:
                        if version == self.version:
                            offset = await f.tell()
                        else:
                            version_mismatch = True
                else:
                    version_mismatch = True
                if version_mismatch:
                    move_file = True
            # Rename only after the file has been closed.
            if move_file:
                new_path = await get_new_path(self.path)
                self.log.warning(
                    f"YStore version mismatch, moving {self.path} to {new_path}"
                )
                await anyio.Path(self.path).rename(new_path)
        if version_mismatch:
            async with await anyio.open_file(self.path, "wb") as f:
                version_bytes = f"VERSION:{self.version}\n".encode()
                await f.write(version_bytes)
                offset = len(version_bytes)
        return offset

    async def read(self) -> AsyncIterator[tuple[bytes, bytes, float]]:  # type: ignore
        """Async iterator for reading the store content.

        Returns:
            A tuple of (update, metadata, timestamp) for each update.

        Raises:
            YDocNotFound: If the file does not exist or holds no data after
                the version header.
        """
        # The whole payload is read under the lock; decoding and yielding
        # happen after the lock has been released.
        async with self.lock:
            if not await anyio.Path(self.path).exists():
                raise YDocNotFound
            offset = await self.check_version()
            async with await anyio.open_file(self.path, "rb") as f:
                await f.seek(offset)
                data = await f.read()
                if not data:
                    raise YDocNotFound
        # Messages come in groups of three: update, metadata, timestamp.
        i = 0
        for d in Decoder(data).read_messages():
            if i == 0:
                update = d
            elif i == 1:
                metadata = d
            else:
                timestamp = struct.unpack("<d", d)[0]
                yield update, metadata, timestamp
            i = (i + 1) % 3

    async def write(self, data: bytes) -> None:
        """Store an update.

        Arguments:
            data: The update to store.
        """
        parent = Path(self.path).parent
        async with self.lock:
            await anyio.Path(parent).mkdir(parents=True, exist_ok=True)
            await self.check_version()
            # Build the complete record before touching the file, so a failing
            # metadata callback cannot leave a partial record behind (which
            # would misalign the groups of three that ``read`` expects).
            metadata = await self.get_metadata()
            timestamp = struct.pack("<d", time.time())
            record = b"".join(
                (
                    write_var_uint(len(data)),
                    data,
                    write_var_uint(len(metadata)),
                    metadata,
                    write_var_uint(len(timestamp)),
                    timestamp,
                )
            )
            async with await anyio.open_file(self.path, "ab") as f:
                await f.write(record)

__init__(path, metadata_callback=None, log=None)

初始化对象

Parameters:

Name Type Description Default
path str

用于存储更新的文件路径。

required
metadata_callback Callable[[], Awaitable[bytes] | bytes] | None

调用以获取元数据的可选回调。

None
log Logger | None

可选记录器(logger).

None
Source code in ypy_websocket/ystore.py
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
def __init__(
    self,
    path: str,
    metadata_callback: Callable[[], Awaitable[bytes] | bytes] | None = None,
    log: Logger | None = None,
) -> None:
    """Set up a file-backed store.

    Arguments:
        path: The file path used to store the updates.
        metadata_callback: An optional callback to call to get the metadata.
        log: An optional logger; a module-level logger is used when omitted.
    """
    if not log:
        log = getLogger(__name__)
    self.path = path
    self.metadata_callback = metadata_callback
    self.log = log
    self.lock = Lock()

check_version() async

检查存储(store)格式的版本。

Returns:

Type Description
int

数据在文件中的偏移量。

Source code in ypy_websocket/ystore.py
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
async def check_version(self) -> int:
    """Check the version of the store format.

    A missing file is created with a fresh ``VERSION:<n>`` header. A file
    whose header is absent, unparsable or carries a different version is
    first renamed to the path given by ``get_new_path`` and then recreated.

    Returns:
        The offset where the data is located in the file.
    """
    if not await anyio.Path(self.path).exists():
        version_mismatch = True
    else:
        version_mismatch = False
        move_file = False
        async with await anyio.open_file(self.path, "rb") as f:
            header = await f.read(8)
            if header == b"VERSION:":
                try:
                    version = int(await f.readline())
                except ValueError:
                    # A corrupt or truncated version number is handled like
                    # any other mismatch instead of crashing.
                    version_mismatch = True
                else:
                    if version == self.version:
                        offset = await f.tell()
                    else:
                        version_mismatch = True
            else:
                version_mismatch = True
            if version_mismatch:
                move_file = True
        # Rename only after the file has been closed.
        if move_file:
            new_path = await get_new_path(self.path)
            self.log.warning(
                f"YStore version mismatch, moving {self.path} to {new_path}"
            )
            await anyio.Path(self.path).rename(new_path)
    if version_mismatch:
        async with await anyio.open_file(self.path, "wb") as f:
            version_bytes = f"VERSION:{self.version}\n".encode()
            await f.write(version_bytes)
            offset = len(version_bytes)
    return offset

read() async

用于读取存储内容的异步迭代器。

Returns:

Type Description
AsyncIterator[tuple[bytes, bytes, float]]

每个更新的一个元组, 结构是: (update, metadata, timestamp)

Source code in ypy_websocket/ystore.py
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
async def read(self) -> AsyncIterator[tuple[bytes, bytes, float]]:  # type: ignore
    """Iterate asynchronously over the records held in the file.

    Returns:
        A tuple of (update, metadata, timestamp) for each update.

    Raises:
        YDocNotFound: If the file does not exist or holds no data after the
            version header.
    """
    store_file = anyio.Path(self.path)
    # The payload is read in full under the lock; decoding happens afterwards.
    async with self.lock:
        if not await store_file.exists():
            raise YDocNotFound
        offset = await self.check_version()
        async with await anyio.open_file(self.path, "rb") as f:
            await f.seek(offset)
            data = await f.read()
            if not data:
                raise YDocNotFound

    # Messages arrive in groups of three: update, metadata, packed timestamp.
    # A trailing incomplete group is never yielded.
    pending = []
    for message in Decoder(data).read_messages():
        pending.append(message)
        if len(pending) < 3:
            continue
        update, metadata, packed_timestamp = pending
        pending = []
        timestamp = struct.unpack("<d", packed_timestamp)[0]
        yield update, metadata, timestamp

write(data) async

存储1个更新

Parameters:

Name Type Description Default
data bytes

要存储的更新。

required
Source code in ypy_websocket/ystore.py
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
async def write(self, data: bytes) -> None:
    """Store an update.

    Each record is appended as three length-prefixed fields: the update, the
    metadata and a little-endian double timestamp.

    Arguments:
        data: The update to store.
    """
    parent = Path(self.path).parent
    async with self.lock:
        await anyio.Path(parent).mkdir(parents=True, exist_ok=True)
        await self.check_version()
        # Build the complete record before touching the file, so a failing
        # metadata callback cannot leave a partial record behind (which would
        # misalign the groups of three fields expected when reading back).
        metadata = await self.get_metadata()
        timestamp = struct.pack("<d", time.time())
        record = b"".join(
            (
                write_var_uint(len(data)),
                data,
                write_var_uint(len(metadata)),
                metadata,
                write_var_uint(len(timestamp)),
                timestamp,
            )
        )
        async with await anyio.open_file(self.path, "ab") as f:
            await f.write(record)

TempFileYStore

Bases: FileYStore

使用系统临时目录的 YStore。文件写入公共目录下。要为目录名称添加前缀(例如 /tmp/my_prefix_b4whmm7y/):

class PrefixTempFileYStore(TempFileYStore):
    prefix_dir = "my_prefix_"
Source code in ypy_websocket/ystore.py
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
class TempFileYStore(FileYStore):
    """A YStore which uses the system's temporary directory.

    Files are written under a common directory. To prefix the directory name
    (e.g. /tmp/my_prefix_b4whmm7y/):

    ```py
    class PrefixTempFileYStore(TempFileYStore):
        prefix_dir = "my_prefix_"
    ```
    """

    # Optional prefix handed to tempfile.mkdtemp().
    prefix_dir: str | None = None
    # Common directory; assigned on the class by make_directory().
    base_dir: str | None = None

    def __init__(
        self,
        path: str,
        metadata_callback: Callable[[], Awaitable[bytes] | bytes] | None = None,
        log: Logger | None = None,
    ):
        """Initialize the object.

        Arguments:
            path: The file path used to store the updates, relative to the
                base directory.
            metadata_callback: An optional callback to call to get the metadata.
            log: An optional logger.
        """
        base = Path(self.get_base_dir())
        super().__init__(
            str(base / path), metadata_callback=metadata_callback, log=log
        )

    def get_base_dir(self) -> str:
        """Get the base directory where the update file is written.

        The directory is created on first use.

        Returns:
            The base directory path.
        """
        if self.base_dir is None:
            self.make_directory()
        base = self.base_dir
        assert base is not None
        return base

    def make_directory(self):
        """Create the base directory where the update file is written."""
        # Stored on the class rather than on the instance.
        cls = type(self)
        cls.base_dir = tempfile.mkdtemp(prefix=self.prefix_dir)

__init__(path, metadata_callback=None, log=None)

初始化对象

Parameters:

Name Type Description Default
path str

用于存储更新的文件路径。

required
metadata_callback Callable[[], Awaitable[bytes] | bytes] | None

调用以获取元数据的可选回调。

None
log Logger | None

可选的记录器(logger).

None
Source code in ypy_websocket/ystore.py
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
def __init__(
    self,
    path: str,
    metadata_callback: Callable[[], Awaitable[bytes] | bytes] | None = None,
    log: Logger | None = None,
):
    """Set up a store whose file lives under the temporary base directory.

    Arguments:
        path: The file path used to store the updates, relative to the base
            directory.
        metadata_callback: An optional callback to call to get the metadata.
        log: An optional logger.
    """
    base = Path(self.get_base_dir())
    super().__init__(str(base / path), metadata_callback=metadata_callback, log=log)

get_base_dir()

获取写入更新文件的基本目录。

Returns:

Type Description
str

基目录路径。

Source code in ypy_websocket/ystore.py
267
268
269
270
271
272
273
274
275
276
def get_base_dir(self) -> str:
    """Return the base directory where the update file is written.

    The directory is created on first use.

    Returns:
        The base directory path.
    """
    if self.base_dir is None:
        self.make_directory()
    base = self.base_dir
    assert base is not None
    return base

make_directory()

创建写入更新文件的基本目录.

Source code in ypy_websocket/ystore.py
278
279
280
def make_directory(self):
    """Create the base directory where the update file is written."""
    # Stored on the class rather than on the instance.
    cls = type(self)
    cls.base_dir = tempfile.mkdtemp(prefix=self.prefix_dir)

SQLiteYStore

Bases: BaseYStore

使用 SQLite 数据库的 YStore。 与基于文件的 YStore 不同,所有文档的 Y 更新都存储在同一个数据库中。

子类指向您的数据库文件:

class MySQLiteYStore(SQLiteYStore):
    db_path = "path/to/my_ystore.db"
Source code in ypy_websocket/ystore.py
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
class SQLiteYStore(BaseYStore):
    """A YStore which uses an SQLite database.

    Unlike file-based YStores, the Y updates of all documents are stored in
    the same database.

    Subclass to point to your database file:

    ```py
    class MySQLiteYStore(SQLiteYStore):
        db_path = "path/to/my_ystore.db"
    ```
    """

    db_path: str = "ystore.db"
    # Determines the "time to live" for all documents, i.e. how recent the
    # latest update of a document must be before purging document history.
    # Compared against a time.time() difference. Defaults to never purging
    # document history (None).
    document_ttl: int | None = None
    path: str
    lock: Lock
    db_initialized: Event

    def __init__(
        self,
        path: str,
        metadata_callback: Callable[[], Awaitable[bytes] | bytes] | None = None,
        log: Logger | None = None,
    ) -> None:
        """Initialize the object.

        Arguments:
            path: The file path used to store the updates.
            metadata_callback: An optional callback to call to get the metadata.
            log: An optional logger.
        """
        self.path = path
        self.metadata_callback = metadata_callback
        self.log = log or getLogger(__name__)
        self.lock = Lock()
        self.db_initialized = Event()

    async def start(self, *, task_status: TaskStatus[None] = TASK_STATUS_IGNORED):
        """Start the SQLiteYStore.

        Arguments:
            task_status: The status to set when the task has started.

        Raises:
            RuntimeError: If a task group is already set on this store.
        """
        if self._starting:
            return

        # Validate before flagging the start as in progress: raising with
        # ``_starting`` already True would leave it stuck and turn every later
        # call into a silent no-op.
        if self._task_group is not None:
            raise RuntimeError("YStore already running")

        self._starting = True

        async with create_task_group() as self._task_group:
            self._task_group.start_soon(self._init_db)
            self.started.set()
            self._starting = False
            task_status.started()

    async def _init_db(self):
        """Create the ``yupdates`` table if needed, then set ``db_initialized``.

        An existing database whose ``user_version`` differs from
        ``self.version`` is renamed to the path given by ``get_new_path`` and a
        new database is created in its place.
        """
        create_db = False
        move_db = False
        if not await anyio.Path(self.db_path).exists():
            create_db = True
        else:
            async with self.lock:
                async with aiosqlite.connect(self.db_path) as db:
                    cursor = await db.execute(
                        "SELECT count(name) FROM sqlite_master WHERE type='table' and name='yupdates'"
                    )
                    table_exists = (await cursor.fetchone())[0]
                    if table_exists:
                        cursor = await db.execute("pragma user_version")
                        version = (await cursor.fetchone())[0]
                        if version != self.version:
                            move_db = True
                            create_db = True
                    else:
                        create_db = True
        if move_db:
            new_path = await get_new_path(self.db_path)
            self.log.warning(
                f"YStore version mismatch, moving {self.db_path} to {new_path}"
            )
            await anyio.Path(self.db_path).rename(new_path)
        if create_db:
            async with self.lock:
                async with aiosqlite.connect(self.db_path) as db:
                    await db.execute(
                        "CREATE TABLE yupdates (path TEXT NOT NULL, yupdate BLOB, metadata BLOB, timestamp REAL NOT NULL)"
                    )
                    await db.execute(
                        "CREATE INDEX idx_yupdates_path_timestamp ON yupdates (path, timestamp)"
                    )
                    await db.execute(f"PRAGMA user_version = {self.version}")
                    await db.commit()
        self.db_initialized.set()

    async def read(self) -> AsyncIterator[tuple[bytes, bytes, float]]:  # type: ignore
        """Async iterator for reading the store content.

        Returns:
            A tuple of (update, metadata, timestamp) for each update.

        Raises:
            YDocNotFound: If no row exists for this path, or if any other
                exception is raised while reading.
        """
        await self.db_initialized.wait()
        try:
            # The lock and the connection stay open while rows are yielded.
            async with self.lock:
                async with aiosqlite.connect(self.db_path) as db:
                    async with db.execute(
                        "SELECT yupdate, metadata, timestamp FROM yupdates WHERE path = ?",
                        (self.path,),
                    ) as cursor:
                        found = False
                        async for update, metadata, timestamp in cursor:
                            found = True
                            yield update, metadata, timestamp
                        if not found:
                            raise YDocNotFound
        except Exception:
            # Every failure is reported to the caller as YDocNotFound.
            raise YDocNotFound

    async def write(self, data: bytes) -> None:
        """Store an update.

        When ``document_ttl`` is set and the latest stored update is older
        than it, the document's history is first squashed into one update.

        Arguments:
            data: The update to store.
        """
        await self.db_initialized.wait()
        async with self.lock:
            async with aiosqlite.connect(self.db_path) as db:
                # first, determine time elapsed since last update
                cursor = await db.execute(
                    "SELECT timestamp FROM yupdates WHERE path = ? ORDER BY timestamp DESC LIMIT 1",
                    (self.path,),
                )
                row = await cursor.fetchone()
                diff = (time.time() - row[0]) if row else 0

                if self.document_ttl is not None and diff > self.document_ttl:
                    # squash updates
                    ydoc = Y.YDoc()
                    async with db.execute(
                        "SELECT yupdate FROM yupdates WHERE path = ?", (self.path,)
                    ) as cursor:
                        async for (update,) in cursor:
                            Y.apply_update(ydoc, update)
                    # delete history
                    await db.execute(
                        "DELETE FROM yupdates WHERE path = ?", (self.path,)
                    )
                    # insert squashed updates
                    squashed_update = Y.encode_state_as_update(ydoc)
                    metadata = await self.get_metadata()
                    await db.execute(
                        "INSERT INTO yupdates VALUES (?, ?, ?, ?)",
                        (self.path, squashed_update, metadata, time.time()),
                    )

                # finally, write this update to the DB
                metadata = await self.get_metadata()
                await db.execute(
                    "INSERT INTO yupdates VALUES (?, ?, ?, ?)",
                    (self.path, data, metadata, time.time()),
                )
                await db.commit()

__init__(path, metadata_callback=None, log=None)

初始化对象.

Parameters:

Name Type Description Default
path str

用于存储更新的文件路径。

required
metadata_callback Callable[[], Awaitable[bytes] | bytes] | None

调用以获取元数据的可选回调。

None
log Logger | None

可选的记录器(logger).

None
Source code in ypy_websocket/ystore.py
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
def __init__(
    self,
    path: str,
    metadata_callback: Callable[[], Awaitable[bytes] | bytes] | None = None,
    log: Logger | None = None,
) -> None:
    """Set up an SQLite-backed store.

    Arguments:
        path: The file path used to store the updates.
        metadata_callback: An optional callback to call to get the metadata.
        log: An optional logger; a module-level logger is used when omitted.
    """
    if not log:
        log = getLogger(__name__)
    self.path = path
    self.metadata_callback = metadata_callback
    self.log = log
    self.lock = Lock()
    self.db_initialized = Event()

read() async

用于读取存储内容的异步迭代器。

Returns:

Type Description
AsyncIterator[tuple[bytes, bytes, float]]

每个更新的一个元组, 结构是: (update, metadata, timestamp)

Source code in ypy_websocket/ystore.py
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
async def read(self) -> AsyncIterator[tuple[bytes, bytes, float]]:  # type: ignore
    """Iterate asynchronously over the rows stored for this path.

    Returns:
        A tuple of (update, metadata, timestamp) for each update.

    Raises:
        YDocNotFound: If no row exists for this path, or if any other
            exception is raised while reading.
    """
    await self.db_initialized.wait()
    try:
        # The lock and the connection stay open while rows are yielded.
        async with self.lock, aiosqlite.connect(self.db_path) as db:
            async with db.execute(
                "SELECT yupdate, metadata, timestamp FROM yupdates WHERE path = ?",
                (self.path,),
            ) as rows:
                row_count = 0
                async for update, metadata, timestamp in rows:
                    row_count += 1
                    yield update, metadata, timestamp
                if row_count == 0:
                    raise YDocNotFound
    except Exception:
        # Every failure is reported to the caller as YDocNotFound.
        raise YDocNotFound

start(*, task_status=TASK_STATUS_IGNORED) async

启动 SQLiteYStore.

Parameters:

Name Type Description Default
task_status TaskStatus[None]

任务开始时设置的状态。

TASK_STATUS_IGNORED
Source code in ypy_websocket/ystore.py
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
async def start(self, *, task_status: TaskStatus[None] = TASK_STATUS_IGNORED):
    """Start the SQLiteYStore.

    Opens a task group, schedules the database initialisation in it and
    signals that the store has started.

    Arguments:
        task_status: The status to set when the task has started.

    Raises:
        RuntimeError: If a task group is already set on this store.
    """
    if self._starting:
        return

    # Validate before flagging the start as in progress: raising with
    # ``_starting`` already True would leave it stuck and turn every later
    # call into a silent no-op.
    if self._task_group is not None:
        raise RuntimeError("YStore already running")

    self._starting = True

    async with create_task_group() as self._task_group:
        self._task_group.start_soon(self._init_db)
        self.started.set()
        self._starting = False
        task_status.started()

write(data) async

保存更新。

Parameters:

Name Type Description Default
data bytes

要存储的更新。

required
Source code in ypy_websocket/ystore.py
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
async def write(self, data: bytes) -> None:
    """Persist one update for this path.

    When ``document_ttl`` is set and the latest stored update is older than
    it, the document's history is first squashed into a single update.

    Arguments:
        data: The update to store.
    """
    await self.db_initialized.wait()
    async with self.lock, aiosqlite.connect(self.db_path) as db:
        # Work out how long ago the most recent update was stored.
        latest_cursor = await db.execute(
            "SELECT timestamp FROM yupdates WHERE path = ? ORDER BY timestamp DESC LIMIT 1",
            (self.path,),
        )
        latest = await latest_cursor.fetchone()
        elapsed = (time.time() - latest[0]) if latest else 0

        if self.document_ttl is not None and elapsed > self.document_ttl:
            # Rebuild the document from its full history...
            ydoc = Y.YDoc()
            async with db.execute(
                "SELECT yupdate FROM yupdates WHERE path = ?", (self.path,)
            ) as history:
                async for (stored_update,) in history:
                    Y.apply_update(ydoc, stored_update)
            # ...drop that history...
            await db.execute("DELETE FROM yupdates WHERE path = ?", (self.path,))
            # ...and keep a single squashed update in its place.
            squashed_update = Y.encode_state_as_update(ydoc)
            squashed_metadata = await self.get_metadata()
            await db.execute(
                "INSERT INTO yupdates VALUES (?, ?, ?, ?)",
                (self.path, squashed_update, squashed_metadata, time.time()),
            )

        # Append the new update and commit everything together.
        metadata = await self.get_metadata()
        await db.execute(
            "INSERT INTO yupdates VALUES (?, ?, ?, ?)",
            (self.path, data, metadata, time.time()),
        )
        await db.commit()
            await db.commit()