Core and ORM Examples

The SQLAlchemy distribution includes a variety of code examples illustrating a select set of patterns, some typical and some not so typical. All are runnable and can be found in the /examples directory of the distribution. Descriptions and source code for all can be found here.

Additional SQLAlchemy examples, some user contributed, are available on the wiki at https://www.sqlalchemy.org/trac/wiki/UsageRecipes.

Mapping Recipes

Adjacency List

An example of a dictionary-of-dictionaries structure mapped using an adjacency list model.

E.g.:

node = TreeNode("rootnode")
node.append("node1")
node.append("node3")
session.add(node)
session.commit()

dump_tree(node)
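
The TreeNode mapping itself is not shown in the fragment above. A minimal sketch of a self-referential adjacency list mapping with children keyed by name, assuming SQLAlchemy 1.4+ (column names and the dictionary collection are inferred from the usage, not copied from the example source):

from sqlalchemy import Column, ForeignKey, Integer, String
from sqlalchemy.orm import backref, declarative_base, relationship
from sqlalchemy.orm.collections import attribute_mapped_collection

Base = declarative_base()


class TreeNode(Base):
    __tablename__ = "tree"

    id = Column(Integer, primary_key=True)
    parent_id = Column(Integer, ForeignKey("tree.id"))
    name = Column(String(50), nullable=False)

    # the self-referential foreign key above forms the adjacency list;
    # children collect into a dict keyed on each child's "name"
    children = relationship(
        "TreeNode",
        cascade="all, delete-orphan",
        backref=backref("parent", remote_side="TreeNode.id"),
        collection_class=attribute_mapped_collection("name"),
    )

    def __init__(self, name, parent=None):
        self.name = name
        self.parent = parent

    def append(self, nodename):
        self.children[nodename] = TreeNode(nodename, parent=self)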

Listing of files:

Associations

Examples illustrating the usage of the “association object” pattern, where an intermediary class mediates the relationship between two classes that are associated in a many-to-many pattern.

Listing of files:

  • proxied_association.py - Same example as basic_association, adding in usage of sqlalchemy.ext.associationproxy to make explicit references to OrderItem optional.

  • basic_association.py - Illustrate a many-to-many relationship between an “Order” and a collection of “Item” objects, associating a purchase price with each via an association object called “OrderItem”

  • dict_of_sets_with_default.py - An advanced association proxy example which illustrates nesting of association proxies to produce multi-level Python collections, in this case a dictionary with string keys and sets of integers as values, which conceal the underlying mapped classes.
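
A rough sketch of the pattern described by basic_association.py, where the association object carries the purchase price (class and column names follow the descriptions above; the example file itself may differ in detail):

from sqlalchemy import Column, ForeignKey, Integer, Numeric, String
from sqlalchemy.orm import declarative_base, relationship

Base = declarative_base()


class Order(Base):
    __tablename__ = "order"

    id = Column(Integer, primary_key=True)
    customer_name = Column(String(30), nullable=False)

    # the association object mediates the many-to-many link to Item
    order_items = relationship("OrderItem", cascade="all, delete-orphan")


class Item(Base):
    __tablename__ = "item"

    id = Column(Integer, primary_key=True)
    description = Column(String(30), nullable=False)


class OrderItem(Base):
    # one row per (order, item) pair, carrying the price paid
    __tablename__ = "orderitem"

    order_id = Column(Integer, ForeignKey("order.id"), primary_key=True)
    item_id = Column(Integer, ForeignKey("item.id"), primary_key=True)
    price = Column(Numeric(8, 2), nullable=False)

    item = relationship("Item")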

Asyncio Integration

Examples illustrating the asyncio engine feature of SQLAlchemy.

Listing of files:

  • async_orm_writeonly.py - Illustrates using write only relationships for simpler handling of ORM collections under asyncio.

  • greenlet_orm.py - Illustrates use of the sqlalchemy.ext.asyncio.AsyncSession object for asynchronous ORM use, including the optional run_sync() method.

  • gather_orm_statements.py - Illustrates how to run many statements concurrently using asyncio.gather() along many asyncio database connections, merging ORM results into a single AsyncSession.

  • basic.py - Illustrates the asyncio engine / connection interface.

  • async_orm.py - Illustrates use of the sqlalchemy.ext.asyncio.AsyncSession object for asynchronous ORM use.
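
The core of what basic.py demonstrates can be sketched as follows; the aiosqlite URL is an assumption for illustration, and any asyncio-capable dialect works the same way:

import asyncio

from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine


async def async_main():
    # an AsyncEngine; connect / execute / dispose are awaited, not blocking
    engine = create_async_engine("sqlite+aiosqlite://")

    async with engine.connect() as conn:
        result = await conn.execute(text("select 'hello world'"))
        print(result.all())

    # release pooled connections
    await engine.dispose()


asyncio.run(async_main())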

Directed Graphs

An example of persistence for a directed graph structure. The graph is stored as a collection of edges, each referencing both a “lower” and an “upper” node in a table of nodes. Basic persistence and querying for lower- and upper- neighbors are illustrated:

n2 = Node(2)
n5 = Node(5)
n2.add_neighbor(n5)
print(n2.higher_neighbors())
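
The underlying mapping can be sketched roughly as follows, with each Edge row holding two foreign keys into the node table (a simplified rendition, not the example's exact source):

from sqlalchemy import Column, ForeignKey, Integer
from sqlalchemy.orm import declarative_base, relationship

Base = declarative_base()


class Node(Base):
    __tablename__ = "node"

    node_id = Column(Integer, primary_key=True)

    def higher_neighbors(self):
        # edges on which this node is the "lower" end point upward
        return [edge.higher_node for edge in self.lower_edges]


class Edge(Base):
    __tablename__ = "edge"

    # each edge references both a "lower" and a "higher" node
    lower_id = Column(Integer, ForeignKey("node.node_id"), primary_key=True)
    higher_id = Column(Integer, ForeignKey("node.node_id"), primary_key=True)

    lower_node = relationship(
        "Node",
        primaryjoin="Edge.lower_id == Node.node_id",
        backref="lower_edges",
    )
    higher_node = relationship(
        "Node",
        primaryjoin="Edge.higher_id == Node.node_id",
        backref="higher_edges",
    )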

Listing of files:

Dynamic Relations as Dictionaries

Illustrates how to place a dictionary-like facade on top of a “dynamic” relation, so that dictionary operations (assuming simple string keys) can operate upon a large collection without loading the full collection at once.
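
The essence of the trick can be sketched as a small wrapper class along these lines (a simplification of the example's proxy; the names here are illustrative):

class ProxyDict:
    """Dict-like facade over a relationship configured with lazy="dynamic"."""

    def __init__(self, parent, collection_name, childclass, keyname):
        self.parent = parent
        self.collection_name = collection_name
        self.childclass = childclass
        self.keyname = keyname

    @property
    def collection(self):
        # an AppenderQuery; lookups below run as SQL against it,
        # so the full collection is never loaded into memory
        return getattr(self.parent, self.collection_name)

    def __getitem__(self, key):
        item = self.collection.filter_by(**{self.keyname: key}).first()
        if item is None:
            raise KeyError(key)
        return item

    def __setitem__(self, key, value):
        try:
            existing = self[key]
            self.collection.remove(existing)
        except KeyError:
            pass
        self.collection.append(value)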

Listing of files:

Generic Associations

Illustrates various methods of associating multiple types of parents with a particular child object.

The examples all use the declarative extension along with declarative mixins. Each one presents the identical use case at the end - two classes, Customer and Supplier, both subclassing the HasAddresses mixin, which ensures that the parent class is provided with an addresses collection which contains Address objects.

The discriminator_on_association.py and generic_fk.py scripts are modernized versions of recipes presented in the 2007 blog post Polymorphic Associations with SQLAlchemy.
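
As a taste of the approach, the table_per_related.py variant can be sketched roughly like this, where the mixin generates a distinct Address subclass and table for each parent class (simplified; consult the example files for the complete versions):

from sqlalchemy import Column, ForeignKey, Integer, String
from sqlalchemy.orm import declarative_base, declared_attr, relationship

Base = declarative_base()


class Address:
    """Plain mixin; these columns are copied into each generated table."""

    id = Column(Integer, primary_key=True)
    street = Column(String(50))
    city = Column(String(50))


class HasAddresses:
    """Adds an 'addresses' collection backed by a per-parent table."""

    @declared_attr
    def addresses(cls):
        cls.Address = type(
            "%sAddress" % cls.__name__,
            (Address, Base),
            dict(
                __tablename__="%s_address" % cls.__tablename__,
                parent_id=Column(
                    Integer, ForeignKey("%s.id" % cls.__tablename__)
                ),
            ),
        )
        return relationship(cls.Address)


class Customer(HasAddresses, Base):
    __tablename__ = "customer"

    id = Column(Integer, primary_key=True)
    name = Column(String(50))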

Listing of files:

  • table_per_association.py - Illustrates a mixin which provides a generic association via individually generated association tables for each parent class. The associated objects themselves are persisted in a single table shared among all parents.

  • discriminator_on_association.py - Illustrates a mixin which provides a generic association using a single target table and a single association table, referred to by all parent tables. The association table contains a “discriminator” column which determines what type of parent object associates to each particular row in the association table.

  • generic_fk.py - Illustrates a so-called “generic foreign key”, in a similar fashion to that of popular frameworks such as Django, ROR, etc. This approach bypasses standard referential integrity practices, in that the “foreign key” column is not actually constrained to refer to any particular table; instead, in-application logic is used to determine which table is referenced.

  • table_per_related.py - Illustrates a generic association which persists association objects within individual tables, each one generated to persist those objects on behalf of a particular parent class.

Materialized Paths

Illustrates the “materialized paths” pattern for hierarchical data using the SQLAlchemy ORM.
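
The gist of the pattern: each node stores its full ancestor path in a string column, so subtree queries become simple prefix matches. A minimal sketch (the actual example layers descendant/ancestor relationships on top of this):

from sqlalchemy import Column, Integer, String, select
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class Node(Base):
    __tablename__ = "node"

    id = Column(Integer, primary_key=True)
    # materialized path, e.g. "1.3.7" for a node under 3, under 1
    path = Column(String(500), nullable=False, index=True)


# all descendants of the node whose path is "1.3": a LIKE prefix match
stmt = select(Node).where(Node.path.like("1.3.%"))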

Listing of files:

Nested Sets

Illustrates a rudimentary way to implement the “nested sets” pattern for hierarchical data using the SQLAlchemy ORM.
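
In broad strokes, the pattern stores "left" and "right" traversal counters on each row; a node's descendants are exactly the rows whose counters fall between its own. A hedged sketch (column and table names follow the classic formulation, not necessarily the example file):

from sqlalchemy import Column, Integer, String, select
from sqlalchemy.orm import aliased, declarative_base

Base = declarative_base()


class Employee(Base):
    __tablename__ = "personnel"

    emp = Column(String(30), primary_key=True)
    left = Column("lft", Integer, nullable=False)
    right = Column("rgt", Integer, nullable=False)


# descendants of a given employee: rows nested inside its lft/rgt range
parent = aliased(Employee)
stmt = (
    select(Employee)
    .where(parent.emp == "chief")  # "chief" is an illustrative key
    .where(Employee.left.between(parent.left, parent.right))
    .where(Employee.emp != parent.emp)
)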

Listing of files:

Performance

A performance profiling suite for a variety of SQLAlchemy use cases.

Each suite focuses on a specific use case with a particular performance profile and associated implications:

  • bulk inserts

  • individual inserts, with or without transactions

  • fetching large numbers of rows

  • running lots of short queries

All suites include a variety of use patterns illustrating both Core and ORM use, and are generally sorted in order of performance from worst to best, inversely ordered by the amount of functionality provided by SQLAlchemy, from most to least (these two things generally correspond perfectly).

A command line tool is presented at the package level which allows individual suites to be run:

$ python -m examples.performance --help
usage: python -m examples.performance [-h] [--test TEST] [--dburl DBURL]
                                      [--num NUM] [--profile] [--dump]
                                      [--echo]

                                      {bulk_inserts,large_resultsets,single_inserts}

positional arguments:
  {bulk_inserts,large_resultsets,single_inserts}
                        suite to run

optional arguments:
  -h, --help            show this help message and exit
  --test TEST           run specific test name
  --dburl DBURL         database URL, default sqlite:///profile.db
  --num NUM             Number of iterations/items/etc for tests;
                        default is module-specific
  --profile             run profiling and dump call counts
  --dump                dump full call profile (implies --profile)
  --echo                Echo SQL output

An example run looks like:

$ python -m examples.performance bulk_inserts

Or with options:

$ python -m examples.performance bulk_inserts \
    --dburl mysql+mysqldb://scott:tiger@localhost/test \
    --profile --num 1000

File Listing

Listing of files:

  • short_selects.py - This series of tests illustrates different ways to SELECT a single record by primary key

  • __main__.py - Allows the examples/performance package to be run as a script.

  • single_inserts.py - In this series of tests, we’re looking at a method that inserts a row within a distinct transaction, and afterwards returns to essentially a “closed” state. This would be analogous to an API call that starts up a database connection, inserts the row, commits and closes.

  • bulk_updates.py - This series of tests will illustrate different ways to UPDATE a large number of rows in bulk (under construction! there’s just one test at the moment)

  • large_resultsets.py - In this series of tests, we are looking at time to load a large number of very small and simple rows.

  • bulk_inserts.py - This series of tests illustrates different ways to INSERT a large number of rows in bulk.

Running all tests with time

This is the default form of run:

$ python -m examples.performance single_inserts
Tests to run: test_orm_commit, test_bulk_save,
              test_bulk_insert_dictionaries, test_core,
              test_core_query_caching, test_dbapi_raw_w_connect,
              test_dbapi_raw_w_pool

test_orm_commit : Individual INSERT/COMMIT pairs via the
    ORM (10000 iterations); total time 13.690218 sec
test_bulk_save : Individual INSERT/COMMIT pairs using
    the "bulk" API  (10000 iterations); total time 11.290371 sec
test_bulk_insert_dictionaries : Individual INSERT/COMMIT pairs using
    the "bulk" API with dictionaries (10000 iterations);
    total time 10.814626 sec
test_core : Individual INSERT/COMMIT pairs using Core.
    (10000 iterations); total time 9.665620 sec
test_core_query_caching : Individual INSERT/COMMIT pairs using Core
    with query caching (10000 iterations); total time 9.209010 sec
test_dbapi_raw_w_connect : Individual INSERT/COMMIT pairs w/ DBAPI +
    connection each time (10000 iterations); total time 9.551103 sec
test_dbapi_raw_w_pool : Individual INSERT/COMMIT pairs w/ DBAPI +
    connection pool (10000 iterations); total time 8.001813 sec

Dumping Profiles for Individual Tests

A Python profile output can be dumped for all tests, or more commonly individual tests:

$ python -m examples.performance single_inserts --test test_core --num 1000 --dump
Tests to run: test_core
test_core : Individual INSERT/COMMIT pairs using Core. (1000 iterations); total fn calls 186109
         186109 function calls (186102 primitive calls) in 1.089 seconds

   Ordered by: internal time, call count

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
     1000    0.634    0.001    0.634    0.001 {method 'commit' of 'sqlite3.Connection' objects}
     1000    0.154    0.000    0.154    0.000 {method 'execute' of 'sqlite3.Cursor' objects}
     1000    0.021    0.000    0.074    0.000 /Users/classic/dev/sqlalchemy/lib/sqlalchemy/sql/compiler.py:1950(_get_colparams)
     1000    0.015    0.000    0.034    0.000 /Users/classic/dev/sqlalchemy/lib/sqlalchemy/engine/default.py:503(_init_compiled)
        1    0.012    0.012    1.091    1.091 examples/performance/single_inserts.py:79(test_core)

    ...

Writing your Own Suites

The profiler suite system is extensible, and can be applied to your own set of tests. This is a valuable technique to use in deciding upon the proper approach for some performance-critical set of routines. For example, if we wanted to profile the difference between several kinds of loading, we can create a file test_loads.py, with the following content:

from examples.performance import Profiler
from sqlalchemy import Integer, Column, create_engine, ForeignKey
from sqlalchemy.orm import relationship, joinedload, subqueryload, Session
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()
engine = None
session = None


class Parent(Base):
    __tablename__ = "parent"
    id = Column(Integer, primary_key=True)
    children = relationship("Child")


class Child(Base):
    __tablename__ = "child"
    id = Column(Integer, primary_key=True)
    parent_id = Column(Integer, ForeignKey("parent.id"))


# Init with name of file, default number of items
Profiler.init("test_loads", 1000)


@Profiler.setup_once
def setup_once(dburl, echo, num):
    "setup once.  create an engine, insert fixture data"
    global engine
    engine = create_engine(dburl, echo=echo)
    Base.metadata.drop_all(engine)
    Base.metadata.create_all(engine)
    sess = Session(engine)
    sess.add_all(
        [
            Parent(children=[Child() for j in range(100)])
            for i in range(num)
        ]
    )
    sess.commit()


@Profiler.setup
def setup(dburl, echo, num):
    "setup per test.  create a new Session."
    global session
    session = Session(engine)
    # pre-connect so this part isn't profiled (if we choose)
    session.connection()


@Profiler.profile
def test_lazyload(n):
    "load everything, no eager loading."

    for parent in session.query(Parent):
        parent.children


@Profiler.profile
def test_joinedload(n):
    "load everything, joined eager loading."

    for parent in session.query(Parent).options(joinedload("children")):
        parent.children


@Profiler.profile
def test_subqueryload(n):
    "load everything, subquery eager loading."

    for parent in session.query(Parent).options(subqueryload("children")):
        parent.children


if __name__ == "__main__":
    Profiler.main()

We can run our new script directly:

$ python test_loads.py  --dburl postgresql+psycopg2://scott:tiger@localhost/test
Running setup once...
Tests to run: test_lazyload, test_joinedload, test_subqueryload
test_lazyload : load everything, no eager loading. (1000 iterations); total time 11.971159 sec
test_joinedload : load everything, joined eager loading. (1000 iterations); total time 2.754592 sec
test_subqueryload : load everything, subquery eager loading. (1000 iterations); total time 2.977696 sec

Space Invaders

A Space Invaders game using SQLite as the state machine.

Originally developed in 2012. Adapted to work in Python 3.

Runs in a textual console using ASCII art.

../_images/space_invaders.jpg

To run:

$ python -m examples.space_invaders.space_invaders

While it runs, watch the SQL output in the log:

$ tail -f space_invaders.log

enjoy!

Listing of files:

Versioning Objects

Versioning with a History Table

Illustrates an extension which creates version tables for entities and stores records for each change. The given extensions generate an anonymous “history” class which represents historical versions of the target object.

Compare to the Versioning using Temporal Rows examples, which write updates as new rows in the same table, without using a separate history table.

Usage is illustrated via a unit test module test_versioning.py, which is run using SQLAlchemy’s internal pytest plugin:

$ pytest test/base/test_examples.py

A fragment of example usage, using declarative:

from history_meta import Versioned, versioned_session


class Base(DeclarativeBase):
    pass


class SomeClass(Versioned, Base):
    __tablename__ = "sometable"

    id = Column(Integer, primary_key=True)
    name = Column(String(50))

    def __eq__(self, other):
        assert type(other) is SomeClass and other.id == self.id


Session = sessionmaker(bind=engine)
versioned_session(Session)

sess = Session()
sc = SomeClass(name="sc1")
sess.add(sc)
sess.commit()

sc.name = "sc1modified"
sess.commit()

assert sc.version == 2

SomeClassHistory = SomeClass.__history_mapper__.class_

assert sess.query(SomeClassHistory).filter(
    SomeClassHistory.version == 1
).all() == [SomeClassHistory(version=1, name="sc1")]

The Versioned mixin is designed to work with declarative. To use the extension with classical mappers, the _history_mapper function can be applied:

from history_meta import _history_mapper

m = mapper(SomeClass, sometable)
_history_mapper(m)

SomeHistoryClass = SomeClass.__history_mapper__.class_

The versioning example also integrates with the ORM optimistic concurrency feature documented at Configuring a Version Counter. To enable this feature, set the flag Versioned.use_mapper_versioning to True:

class SomeClass(Versioned, Base):
    __tablename__ = "sometable"

    use_mapper_versioning = True

    id = Column(Integer, primary_key=True)
    name = Column(String(50))

    def __eq__(self, other):
        assert type(other) is SomeClass and other.id == self.id

Above, if two instances of SomeClass with the same version identifier are updated and sent to the database for UPDATE concurrently, and the database isolation level allows both UPDATE statements to proceed, one of them will fail, because it is no longer working against the last known version identifier.

Listing of files:

Versioning using Temporal Rows

Several examples that illustrate the technique of intercepting changes that would be first interpreted as an UPDATE on a row, and instead turning it into an INSERT of a new row, leaving the previous row intact as a historical version.

Compare to the Versioning with a History Table example, which writes a history row to a separate history table.

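The crux of the technique, as a hedged sketch rather than the examples' exact code: a SessionEvents.before_flush() hook detaches each dirty versioned object from its identity, so the flush emits an INSERT of a new row instead of an UPDATE (Versioned is an assumed marker mixin, and an integer surrogate key named "id" is assumed):

from sqlalchemy import event
from sqlalchemy.orm import Session, attributes, make_transient


class Versioned:
    """Assumed marker mixin for classes participating in row versioning."""


@event.listens_for(Session, "before_flush")
def before_flush(session, flush_context, instances):
    for instance in session.dirty:
        if not isinstance(instance, Versioned):
            continue
        if not session.is_modified(instance):
            continue
        if not attributes.instance_state(instance).has_identity:
            continue

        # erase the object's identity and primary key; once re-added,
        # the pending changes INSERT a brand new row, leaving the
        # previous row intact as the historical version
        make_transient(instance)
        instance.id = None
        session.add(instance)
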
Listing of files:

  • versioned_rows.py - Illustrates a method to intercept changes on objects, turning an UPDATE statement on a single row into an INSERT statement, so that a new row is inserted with the new data, keeping the old row intact.

  • versioned_map.py - A variant of the versioned_rows example built around the concept of a “vertical table” structure, like those illustrated in the Vertical Attribute Mapping examples.

  • versioned_update_old_row.py - Illustrates the same UPDATE into INSERT technique of versioned_rows.py, but also emits an UPDATE on the old row to affect a change in timestamp. Also includes a SessionEvents.do_orm_execute() hook to limit queries to only the most recent version.

  • versioned_rows_w_versionid.py - Illustrates a method to intercept changes on objects, turning an UPDATE statement on a single row into an INSERT statement, so that a new row is inserted with the new data, keeping the old row intact.

Vertical Attribute Mapping

Illustrates “vertical table” mappings.

A “vertical table” refers to a technique where individual attributes of an object are stored as distinct rows in a table. The “vertical table” technique is used to persist objects which can have a varied set of attributes, at the expense of simple query control and brevity. It is commonly found in content/document management systems in order to represent user-created structures flexibly.

Two variants on the approach are given. In the second, each row references a “datatype” which contains information about the type of information stored in the attribute, such as integer, string, or date.

Example:

shrew = Animal("shrew")
shrew["cuteness"] = 5
shrew["weasel-like"] = False
shrew["poisonous"] = True

session.add(shrew)
session.flush()

q = session.query(Animal).filter(
    Animal.facts.any(
        and_(AnimalFact.key == "weasel-like", AnimalFact.value == True)
    )
)
print("weasel-like animals", q.all())

Listing of files:

Inheritance Mapping Recipes

Basic Inheritance Mappings

Working examples of single-table, joined-table, and concrete-table inheritance as described in Mapping Class Inheritance Hierarchies.

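For orientation, the joined-table variant looks roughly like this (a minimal sketch; column names are illustrative, see joined.py for the full example):

from sqlalchemy import Column, ForeignKey, Integer, String
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class Person(Base):
    __tablename__ = "person"

    id = Column(Integer, primary_key=True)
    name = Column(String(50))
    type = Column(String(20))

    __mapper_args__ = {
        "polymorphic_identity": "person",
        "polymorphic_on": type,
    }


class Engineer(Person):
    # joined-table: subclass columns live in their own table,
    # joined to "person" on the shared primary key
    __tablename__ = "engineer"

    id = Column(Integer, ForeignKey("person.id"), primary_key=True)
    engineer_info = Column(String(50))

    __mapper_args__ = {"polymorphic_identity": "engineer"}
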
Listing of files:

  • joined.py - Joined-table (table-per-subclass) inheritance example.

  • single.py - Single-table (table-per-hierarchy) inheritance example.

  • concrete.py - Concrete-table (table-per-class) inheritance example.

Special APIs

Attribute Instrumentation

Examples illustrating modifications to SQLAlchemy’s attribute management system.

Listing of files:

Horizontal Sharding

A basic example of using the SQLAlchemy Sharding API. Sharding refers to horizontally scaling data across multiple databases.

The basic components of a “sharded” mapping are:

  • multiple Engine instances, each assigned a “shard id”. These Engine instances may refer to different databases, or different schemas / accounts within the same database, or they can even be differentiated only by options that will cause them to access different schemas or tables when used.

  • a function which can return a single shard id, given an instance to be saved; this is called “shard_chooser”

  • a function which can return a list of shard ids which apply to a particular instance identifier; this is called “id_chooser”. If it returns all shard ids, all shards will be searched.

  • a function which can return a list of shard ids to try, given a particular Query (“query_chooser”). If it returns all shard ids, all shards will be queried and the results joined together.

In these examples, different kinds of shards are used against the same basic example which accommodates weather data on a per-continent basis. We provide example shard_chooser, id_chooser and query_chooser functions. The query_chooser illustrates inspection of the SQL expression element in order to attempt to determine a single shard being requested.

The construction of generic sharding routines is an ambitious approach to the issue of organizing instances among multiple databases. For a more plain-spoken alternative, the “distinct entity” approach is a simple method of assigning objects to different tables (and potentially database nodes) in an explicit way - described on the wiki at EntityName.
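
Putting the pieces together, a configuration might be sketched as follows, using the legacy argument names that match the terminology above (newer SQLAlchemy versions rename id_chooser / query_chooser to identity_chooser / execute_chooser; the URLs and routing rules are placeholders):

from sqlalchemy import create_engine
from sqlalchemy.ext.horizontal_shard import ShardedSession
from sqlalchemy.orm import sessionmaker

# one Engine per shard id
shards = {
    "north_america": create_engine("sqlite:///shard_na.db"),
    "asia": create_engine("sqlite:///shard_asia.db"),
}


def shard_chooser(mapper, instance, clause=None):
    # route a newly persisted instance to a shard; the rule is illustrative
    if getattr(instance, "continent", None) == "asia":
        return "asia"
    return "north_america"


def id_chooser(query, ident):
    # the identifier alone doesn't encode a shard, so search all of them
    return list(shards)


def query_chooser(query):
    # a real implementation inspects the query criteria to narrow this down
    return list(shards)


Session = sessionmaker(
    class_=ShardedSession,
    shards=shards,
    shard_chooser=shard_chooser,
    id_chooser=id_chooser,
    query_chooser=query_chooser,
)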

Listing of files:

  • asyncio.py - Illustrates sharding API used with asyncio.

  • separate_databases.py - Illustrates sharding using distinct SQLite databases.

  • separate_schema_translates.py - Illustrates sharding using a single database with multiple schemas, where a different “schema_translates_map” can be used for each shard.

  • separate_tables.py - Illustrates sharding using a single SQLite database, that will however have multiple tables using a naming convention.

Extending Core

Extending Statements like SELECT, INSERT, etc.

A detailed example of extending the Select construct to include a new non-SQL standard clause QUALIFY.

This example illustrates both the Custom SQL Constructs and Compilation Extension as well as an extension known as SyntaxExtension.

Listing of files:

Extending the ORM

ORM Query Events

Recipes which illustrate augmentation of ORM SELECT behavior as used by Session.execute() with 2.0 style use of select(), as well as the 1.x style Query object.

Examples include demonstrations of the with_loader_criteria() option as well as the SessionEvents.do_orm_execute() hook.

As of SQLAlchemy 1.4, the Query construct is unified with the Select construct, so that these two objects are mostly the same.
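
The shape shared by these recipes can be sketched as follows, under the assumption of a HasPublic mixin whose mapped subclasses carry a boolean public column (names are illustrative; see filter_public.py for the real version):

from sqlalchemy import event
from sqlalchemy.orm import Session, with_loader_criteria


class HasPublic:
    """Assumed mixin: mapped subclasses have a boolean 'public' column."""


@event.listens_for(Session, "do_orm_execute")
def _add_filtering_criteria(execute_state):
    # for top-level ORM SELECTs, attach a global criteria option that
    # filters every HasPublic entity in the statement, aliases included
    if (
        execute_state.is_select
        and not execute_state.is_column_load
        and not execute_state.is_relationship_load
    ):
        execute_state.statement = execute_state.statement.options(
            with_loader_criteria(
                HasPublic,
                lambda cls: cls.public == True,  # noqa: E712
                include_aliases=True,
            )
        )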

Listing of files:

  • temporal_range.py - Illustrates a custom per-query criteria that will be applied to selected entities.

  • filter_public.py - Illustrates a global criteria applied to entities of a particular type.

Dogpile Caching

Illustrates how to embed dogpile.cache functionality with ORM queries, allowing full cache control as well as the ability to pull “lazy loaded” attributes from long term cache.

In this demo, the following techniques are illustrated:

  • Using the SessionEvents.do_orm_execute() event hook

  • Basic technique of circumventing Session.execute() to pull from a custom cache source instead of the database.

  • Rudimental caching with dogpile.cache, using “regions” which allow global control over a fixed set of configurations.

  • Using custom UserDefinedOption objects to configure options in a statement object.

See also

Re-Executing Statements - includes a general example of the technique presented here.

E.g.:

# query for Person objects, specifying cache
stmt = select(Person).options(FromCache("default"))

# specify that each Person's "addresses" collection comes from
# cache too
stmt = stmt.options(RelationshipCache(Person.addresses, "default"))

# execute and results
result = session.execute(stmt)

print(result.scalars().all())

To run, both SQLAlchemy and dogpile.cache must be installed or on the current PYTHONPATH. The demo will create a local directory for datafiles, insert initial data, and run. Running the demo a second time will utilize the cache files already present, and exactly one SQL statement against two tables will be emitted - the displayed result however will utilize dozens of lazyloads that all pull from cache.

The demo scripts themselves, in order of complexity, are run as Python modules so that relative imports work:

$ python -m examples.dogpile_caching.helloworld

$ python -m examples.dogpile_caching.relationship_caching

$ python -m examples.dogpile_caching.advanced

$ python -m examples.dogpile_caching.local_session_caching

Listing of files:

  • environment.py - Establish data / cache file paths, and configurations, bootstrap fixture data if necessary.

  • caching_query.py - Represent functions and classes which allow the usage of Dogpile caching with SQLAlchemy. Introduces a query option called FromCache.

  • model.py - The datamodel, which represents Person that has multiple Address objects, each with PostalCode, City, Country.

  • fixture_data.py - Installs some sample data. Here we have a handful of postal codes for a few US/Canadian cities. Then, 100 Person records are installed, each with a randomly selected postal code.

  • helloworld.py - Illustrate how to load some data, and cache the results.

  • relationship_caching.py - Illustrates how to add cache options on relationship endpoints, so that lazyloads load from cache.

  • advanced.py - Illustrate usage of Query combined with the FromCache option, including front-end loading, cache invalidation and collection caching.

  • local_session_caching.py - This example creates a new dogpile.cache backend that will persist data in a dictionary which is local to the current session. remove() the session and the cache is gone.