Configuring how Relationship Joins
relationship() will normally create a join between two tables by examining the foreign key relationship between the two tables to determine which columns should be compared. There are a variety of situations where this behavior needs to be customized.
Handling Multiple Join Paths
One of the most common situations to deal with is when there is more than one foreign key path between two tables.
Consider a Customer class that contains two foreign keys to an Address class:
from sqlalchemy import Integer, ForeignKey, String
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import mapped_column
from sqlalchemy.orm import relationship


class Base(DeclarativeBase):
    pass


class Customer(Base):
    __tablename__ = "customer"
    id = mapped_column(Integer, primary_key=True)
    name = mapped_column(String)
    billing_address_id = mapped_column(Integer, ForeignKey("address.id"))
    shipping_address_id = mapped_column(Integer, ForeignKey("address.id"))
    # Both relationships target Address with no hint about which
    # foreign key column each one should use.
    billing_address = relationship("Address")
    shipping_address = relationship("Address")


class Address(Base):
    __tablename__ = "address"
    id = mapped_column(Integer, primary_key=True)
    street = mapped_column(String)
    city = mapped_column(String)
    state = mapped_column(String)
    zip = mapped_column(String)
The above mapping, when we attempt to use it, will produce the error:
sqlalchemy.exc.AmbiguousForeignKeysError: Could not determine join
condition between parent/child tables on relationship
Customer.billing_address - there are multiple foreign key
paths linking the tables. Specify the 'foreign_keys' argument,
providing a list of those columns which should be
counted as containing a foreign key reference to the parent table.
The above message is pretty long. There are many potential messages that relationship() can return, which have been carefully tailored to detect a variety of common configuration issues; most will suggest the additional configuration that’s needed to resolve the ambiguity or other missing information.
In this case, the message wants us to qualify each relationship() by indicating which foreign key column each one should consider. The appropriate form is as follows:
class Customer(Base):
    __tablename__ = "customer"
    id = mapped_column(Integer, primary_key=True)
    name = mapped_column(String)
    billing_address_id = mapped_column(Integer, ForeignKey("address.id"))
    shipping_address_id = mapped_column(Integer, ForeignKey("address.id"))
    billing_address = relationship("Address", foreign_keys=[billing_address_id])
    shipping_address = relationship("Address", foreign_keys=[shipping_address_id])
Above, we specify the foreign_keys argument, which is a Column or list of Column objects which indicate those columns to be considered “foreign”, or in other words, the columns that contain a value referring to a parent table. Loading the Customer.billing_address relationship from a Customer object will use the value present in billing_address_id in order to identify the row in Address to be loaded; similarly, shipping_address_id is used for the shipping_address relationship. The linkage of the two columns also plays a role during persistence; the newly generated primary key of a just-inserted Address object will be copied into the appropriate foreign key column of an associated Customer object during a flush.
When specifying foreign_keys with Declarative, we can also use string names; however, if a list is used, it is important that the list is part of the string:
billing_address = relationship("Address", foreign_keys="[Customer.billing_address_id]")
In this specific example, the list is not necessary in any case, as there’s only one Column we need:
billing_address = relationship("Address", foreign_keys="Customer.billing_address_id")
Warning
When passed as a Python-evaluable string, the relationship.foreign_keys argument is interpreted using Python’s eval() function. DO NOT PASS UNTRUSTED INPUT TO THIS STRING. See Evaluation of relationship arguments for details on declarative evaluation of relationship() arguments.
Specifying Alternate Join Conditions
The default behavior of relationship() when constructing a join is that it equates the value of primary key columns on one side to that of foreign-key-referring columns on the other. We can change this criterion to be anything we’d like using the relationship.primaryjoin argument, as well as the relationship.secondaryjoin argument in the case when a “secondary” table is used.
In the example below, using the User class as well as an Address class which stores a street address, we create a relationship boston_addresses which will only load those Address objects which specify a city of “Boston”:
from sqlalchemy import Integer, ForeignKey, String
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import mapped_column
from sqlalchemy.orm import relationship


class Base(DeclarativeBase):
    pass


class User(Base):
    __tablename__ = "user"
    id = mapped_column(Integer, primary_key=True)
    name = mapped_column(String)
    # Join on the foreign key AND restrict the target rows to Boston.
    boston_addresses = relationship(
        "Address",
        primaryjoin="and_(User.id==Address.user_id, Address.city=='Boston')",
    )


class Address(Base):
    __tablename__ = "address"
    id = mapped_column(Integer, primary_key=True)
    user_id = mapped_column(Integer, ForeignKey("user.id"))
    street = mapped_column(String)
    city = mapped_column(String)
    state = mapped_column(String)
    zip = mapped_column(String)
Within this string SQL expression, we made use of the and_() conjunction construct to establish two distinct predicates for the join condition - joining both the User.id and Address.user_id columns to each other, as well as limiting rows in Address to just city='Boston'. When using Declarative, rudimentary SQL functions like and_() are automatically available in the evaluated namespace of a string relationship() argument.
Warning
When passed as a Python-evaluable string, the relationship.primaryjoin argument is interpreted using Python’s eval() function. DO NOT PASS UNTRUSTED INPUT TO THIS STRING. See Evaluation of relationship arguments for details on declarative evaluation of relationship() arguments.
The custom criteria we use in a relationship.primaryjoin are generally only significant when SQLAlchemy is rendering SQL in order to load or represent this relationship. That is, they are used in the SQL statement that’s emitted in order to perform a per-attribute lazy load, or when a join is constructed at query time, such as via Select.join(), or via the eager “joined” or “subquery” styles of loading. When in-memory objects are being manipulated, we can place any Address object we’d like into the boston_addresses collection, regardless of what the value of the .city attribute is. The objects will remain present in the collection until the attribute is expired and re-loaded from the database, at which point the criterion is applied. When a flush occurs, the objects inside of boston_addresses will be flushed unconditionally, assigning the value of the primary key user.id column onto the foreign-key-holding address.user_id column for each row. The city criterion has no effect here, as the flush process only cares about synchronizing primary key values into referencing foreign key values.
Creating Custom Foreign Conditions
Another element of the primary join condition is how those columns considered “foreign” are determined. Usually, some subset of Column objects will specify ForeignKey, or otherwise be part of a ForeignKeyConstraint that’s relevant to the join condition. relationship() looks to this foreign key status as it decides how it should load and persist data for this relationship. However, the relationship.primaryjoin argument can be used to create a join condition that doesn’t involve any “schema” level foreign keys. We can combine relationship.primaryjoin along with relationship.foreign_keys and relationship.remote_side explicitly in order to establish such a join.
Below, a class HostEntry joins to itself, equating the string content column to the ip_address column, which is a PostgreSQL type called INET. We need to use cast() in order to cast one side of the join to the type of the other:
from sqlalchemy import cast, String, Integer
from sqlalchemy.orm import mapped_column
from sqlalchemy.orm import relationship
from sqlalchemy.dialects.postgresql import INET
from sqlalchemy.orm import DeclarativeBase


class Base(DeclarativeBase):
    pass


class HostEntry(Base):
    __tablename__ = "host_entry"
    id = mapped_column(Integer, primary_key=True)
    ip_address = mapped_column(INET)
    content = mapped_column(String(50))
    # relationship() using explicit foreign_keys, remote_side
    parent_host = relationship(
        "HostEntry",
        primaryjoin=ip_address == cast(content, INET),
        foreign_keys=content,
        remote_side=ip_address,
    )
The above relationship will produce a join like:
SELECT host_entry.id, host_entry.ip_address, host_entry.content
FROM host_entry JOIN host_entry AS host_entry_1
ON host_entry_1.ip_address = CAST(host_entry.content AS INET)
An alternative syntax to the above is to use the foreign() and remote() annotations, inline within the relationship.primaryjoin expression. This syntax represents the annotations that relationship() normally applies by itself to the join condition given the relationship.foreign_keys and relationship.remote_side arguments. These functions may be more succinct when an explicit join condition is present, and additionally serve to mark exactly the column that is “foreign” or “remote” independent of whether that column is stated multiple times or within complex SQL expressions:
from sqlalchemy.orm import foreign, remote


class HostEntry(Base):
    __tablename__ = "host_entry"
    id = mapped_column(Integer, primary_key=True)
    ip_address = mapped_column(INET)
    content = mapped_column(String(50))
    # relationship() using explicit foreign() and remote() annotations
    # in lieu of separate arguments
    parent_host = relationship(
        "HostEntry",
        primaryjoin=remote(ip_address) == cast(foreign(content), INET),
    )
Using custom operators in join conditions
Another use case for relationships is the use of custom operators, such as PostgreSQL’s “is contained within” << operator when joining with types such as INET and CIDR. For custom boolean operators we use the Operators.bool_op() function:
inet_column.bool_op("<<")(cidr_column)
A comparison like the above may be used directly with relationship.primaryjoin when constructing a relationship():
# INET and CIDR are PostgreSQL-specific types
from sqlalchemy.dialects.postgresql import INET, CIDR


class IPA(Base):
    __tablename__ = "ip_address"
    id = mapped_column(Integer, primary_key=True)
    v4address = mapped_column(INET)
    network = relationship(
        "Network",
        primaryjoin="IPA.v4address.bool_op('<<')(foreign(Network.v4representation))",
        viewonly=True,
    )


class Network(Base):
    __tablename__ = "network"
    id = mapped_column(Integer, primary_key=True)
    v4representation = mapped_column(CIDR)
Above, a query such as:
select(IPA).join(IPA.network)
will render as:
SELECT ip_address.id AS ip_address_id, ip_address.v4address AS ip_address_v4address
FROM ip_address JOIN network ON ip_address.v4address << network.v4representation
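The operator expression can also be examined on its own, apart from any mapping. The short sketch below uses two lightweight column() constructs as stand-ins for the mapped columns (an assumption made for brevity) and renders the comparison as a string.

```python
from sqlalchemy import Boolean, column

# Stand-ins for the mapped INET and CIDR columns.
inet_column = column("v4address")
cidr_column = column("v4representation")

# bool_op() builds a custom operator whose result is treated as boolean.
expr = inet_column.bool_op("<<")(cidr_column)

rendered = str(expr)
is_boolean = isinstance(expr.type, Boolean)
```

Because the expression has a boolean type, it can be used wherever a comparison is expected, including as a relationship.primaryjoin.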
Custom operators based on SQL functions
A variant to the use case for Operators.op.is_comparison is when we aren’t using an operator, but a SQL function. The typical example of this use case is the PostgreSQL PostGIS functions; however, any SQL function on any database that resolves to a binary condition may apply. To suit this use case, the FunctionElement.as_comparison() method can modify any SQL function, such as those invoked from the func namespace, to indicate to the ORM that the function produces a comparison of two expressions. The example below illustrates this with the Geoalchemy2 library:
from geoalchemy2 import Geometry
from sqlalchemy import Integer, func
from sqlalchemy.orm import mapped_column, relationship, foreign


class Polygon(Base):
    __tablename__ = "polygon"
    id = mapped_column(Integer, primary_key=True)
    geom = mapped_column(Geometry("POLYGON", srid=4326))
    # as_comparison(1, 2) marks arguments 1 and 2 as the compared expressions
    points = relationship(
        "Point",
        primaryjoin="func.ST_Contains(foreign(Polygon.geom), Point.geom).as_comparison(1, 2)",
        viewonly=True,
    )


class Point(Base):
    __tablename__ = "point"
    id = mapped_column(Integer, primary_key=True)
    geom = mapped_column(Geometry("POINT", srid=4326))
Above, the FunctionElement.as_comparison() indicates that the func.ST_Contains() SQL function is comparing the Polygon.geom and Point.geom expressions. The foreign() annotation additionally notes which column takes on the “foreign key” role in this particular relationship.
Overlapping Foreign Keys
A rare scenario can arise when composite foreign keys are used, such that a single column may be the subject of more than one column referred to via foreign key constraint.
Consider an (admittedly complex) mapping such as the Magazine object, referred to both by the Writer object and the Article object using a composite primary key scheme that includes magazine_id for both; then to make Article refer to Writer as well, Article.magazine_id is involved in two separate relationships: Article.magazine and Article.writer:
from sqlalchemy import ForeignKey, ForeignKeyConstraint
from sqlalchemy import Integer, PrimaryKeyConstraint
from sqlalchemy.orm import mapped_column, relationship


class Magazine(Base):
    __tablename__ = "magazine"
    id = mapped_column(Integer, primary_key=True)


class Article(Base):
    __tablename__ = "article"
    article_id = mapped_column(Integer)
    magazine_id = mapped_column(ForeignKey("magazine.id"))
    writer_id = mapped_column()
    magazine = relationship("Magazine")
    writer = relationship("Writer")
    __table_args__ = (
        PrimaryKeyConstraint("article_id", "magazine_id"),
        # composite foreign key; magazine_id is shared with the FK above
        ForeignKeyConstraint(
            ["writer_id", "magazine_id"], ["writer.id", "writer.magazine_id"]
        ),
    )


class Writer(Base):
    __tablename__ = "writer"
    id = mapped_column(Integer, primary_key=True)
    magazine_id = mapped_column(ForeignKey("magazine.id"), primary_key=True)
    magazine = relationship("Magazine")
When the above mapping is configured, we will see this warning emitted:
SAWarning: relationship 'Article.writer' will copy column
writer.magazine_id to column article.magazine_id,
which conflicts with relationship(s): 'Article.magazine'
(copies magazine.id to article.magazine_id). Consider applying
viewonly=True to read-only relationships, or provide a primaryjoin
condition marking writable columns with the foreign() annotation.
This warning originates from the fact that Article.magazine_id is the subject of two different foreign key constraints; it refers to Magazine.id directly as a source column, but also refers to Writer.magazine_id as a source column in the context of the composite key to Writer. If we associate an Article with a particular Magazine, but then associate the Article with a Writer that’s associated with a different Magazine, the ORM will overwrite Article.magazine_id non-deterministically, silently changing which magazine we refer to; it may also attempt to place NULL into this column if we de-associate a Writer from an Article. The warning lets us know this is the case.
To solve this, we need to break out the behavior of Article to include all three of the following features:
1. Article first and foremost writes to Article.magazine_id based on data persisted in the Article.magazine relationship only, that is, a value copied from Magazine.id.
2. Article can write to Article.writer_id on behalf of data persisted in the Article.writer relationship, but only the Writer.id column; the Writer.magazine_id column should not be written into Article.magazine_id, as it ultimately is sourced from Magazine.id.
3. Article takes Article.magazine_id into account when loading Article.writer, even though it doesn’t write to it on behalf of this relationship.
To get just #1 and #2, we could specify only Article.writer_id as the “foreign keys” for Article.writer:
class Article(Base):
    # ...
    writer = relationship("Writer", foreign_keys="Article.writer_id")
However, this has the effect of Article.writer not taking Article.magazine_id into account when querying against Writer:
SELECT article.article_id AS article_article_id,
article.magazine_id AS article_magazine_id,
article.writer_id AS article_writer_id
FROM article
JOIN writer ON writer.id = article.writer_id
Therefore, to get all of #1, #2, and #3, we express the join condition fully using relationship.primaryjoin, and indicate which columns are to be written using either the relationship.foreign_keys argument or, more succinctly, the foreign() annotation:
class Article(Base):
    # ...
    writer = relationship(
        "Writer",
        primaryjoin="and_(Writer.id == foreign(Article.writer_id), "
        "Writer.magazine_id == Article.magazine_id)",
    )
Non-relational Comparisons / Materialized Path
Warning
This section details an experimental feature.
Using custom expressions means we can produce unorthodox join conditions that don’t obey the usual primary/foreign key model. One such example is the materialized path pattern, where we compare strings for overlapping path tokens in order to produce a tree structure.
Through careful use of foreign() and remote(), we can build a relationship that effectively produces a rudimentary materialized path system. Essentially, when foreign() and remote() are on the same side of the comparison expression, the relationship is considered to be “one to many”; when they are on different sides, the relationship is considered to be “many to one”. For the comparison we’ll use here, we’ll be dealing with collections so we keep things configured as “one to many”:
class Element(Base):
    __tablename__ = "element"
    path = mapped_column(String, primary_key=True)
    descendants = relationship(
        "Element",
        primaryjoin=remote(foreign(path)).like(path.concat("/%")),
        viewonly=True,
        order_by=path,
    )
Above, if given an Element object with a path attribute of "/foo/bar2", we seek for a load of Element.descendants to look like:
SELECT element.path AS element_path
FROM element
WHERE element.path LIKE ('/foo/bar2' || '/%') ORDER BY element.path
Self-Referential Many-to-Many Relationship
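Many to many relationships can be customized by one or both of relationship.primaryjoin and relationship.secondaryjoin - the latter is significant for a relationship that specifies a many-to-many reference using the relationship.secondary argument.
A common situation which involves the usage of relationship.primaryjoin and relationship.secondaryjoin is when establishing a many-to-many relationship from a class to itself, as shown below: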
from typing import List

from sqlalchemy import Integer, ForeignKey, Column, Table
from sqlalchemy.orm import DeclarativeBase, Mapped
from sqlalchemy.orm import mapped_column, relationship


class Base(DeclarativeBase):
    pass


node_to_node = Table(
    "node_to_node",
    Base.metadata,
    Column("left_node_id", Integer, ForeignKey("node.id"), primary_key=True),
    Column("right_node_id", Integer, ForeignKey("node.id"), primary_key=True),
)


class Node(Base):
    __tablename__ = "node"
    id: Mapped[int] = mapped_column(primary_key=True)
    label: Mapped[str]
    right_nodes: Mapped[List["Node"]] = relationship(
        "Node",
        secondary=node_to_node,
        primaryjoin=id == node_to_node.c.left_node_id,
        secondaryjoin=id == node_to_node.c.right_node_id,
        back_populates="left_nodes",
    )
    left_nodes: Mapped[List["Node"]] = relationship(
        "Node",
        secondary=node_to_node,
        primaryjoin=id == node_to_node.c.right_node_id,
        secondaryjoin=id == node_to_node.c.left_node_id,
        back_populates="right_nodes",
    )
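Above, SQLAlchemy can’t know automatically which columns should connect to which for the right_nodes and left_nodes relationships. The relationship.primaryjoin and relationship.secondaryjoin arguments establish how we’d like to join to the association table. In the Declarative form above, as we are declaring these conditions within the Python block that corresponds to the Node class, the id variable is available directly as the Column object we wish to join with.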
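Alternatively, we can define the relationship.primaryjoin and relationship.secondaryjoin arguments using strings, which is suitable in the case that our configuration does not have either the Node.id column object available yet or the node_to_node table perhaps isn’t yet available. When referring to a plain Table object in a declarative string, we use the string name of the table as it is present in the MetaData: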
class Node(Base):
    __tablename__ = "node"
    id = mapped_column(Integer, primary_key=True)
    label = mapped_column(String)
    right_nodes = relationship(
        "Node",
        secondary="node_to_node",
        primaryjoin="Node.id==node_to_node.c.left_node_id",
        secondaryjoin="Node.id==node_to_node.c.right_node_id",
        backref="left_nodes",
    )
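Warning
When passed as a Python-evaluable string, the relationship.primaryjoin and relationship.secondaryjoin arguments are interpreted using Python’s eval() function. DO NOT PASS UNTRUSTED INPUT TO THESE STRINGS. See Evaluation of relationship arguments for details on declarative evaluation of relationship() arguments.
A classical mapping situation here is similar, where node_to_node can be joined to node.c.id: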
from sqlalchemy import Integer, ForeignKey, String, Column, Table, MetaData
from sqlalchemy.orm import relationship, registry

metadata_obj = MetaData()
mapper_registry = registry()

node_to_node = Table(
    "node_to_node",
    metadata_obj,
    Column("left_node_id", Integer, ForeignKey("node.id"), primary_key=True),
    Column("right_node_id", Integer, ForeignKey("node.id"), primary_key=True),
)

node = Table(
    "node",
    metadata_obj,
    Column("id", Integer, primary_key=True),
    Column("label", String),
)


class Node:
    pass


mapper_registry.map_imperatively(
    Node,
    node,
    properties={
        "right_nodes": relationship(
            Node,
            secondary=node_to_node,
            primaryjoin=node.c.id == node_to_node.c.left_node_id,
            secondaryjoin=node.c.id == node_to_node.c.right_node_id,
            backref="left_nodes",
        )
    },
)
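Note that in the two preceding examples, the relationship.backref keyword specifies a left_nodes backref - when relationship() creates the second relationship in the reverse direction, it’s smart enough to reverse the relationship.primaryjoin and relationship.secondaryjoin arguments.
See also
This section documents a two-table variant of the “adjacency list” pattern, which is documented at Adjacency List Relationships. Be sure to review the self-referential querying patterns in subsections Self-Referential Query Strategies and Configuring Self-Referential Eager Loading, which apply equally well to the mapping pattern discussed here.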
Many to many relationships can be customized by one or both of relationship.primaryjoin
and relationship.secondaryjoin
- the latter is significant for a relationship that
specifies a many-to-many reference using the relationship.secondary
argument.
A common situation which involves the usage of relationship.primaryjoin
and relationship.secondaryjoin
is when establishing a many-to-many relationship from a class to itself, as shown below:
from typing import List

from sqlalchemy import Integer, ForeignKey, Column, Table
from sqlalchemy.orm import DeclarativeBase, Mapped
from sqlalchemy.orm import mapped_column, relationship


class Base(DeclarativeBase):
    pass


node_to_node = Table(
    "node_to_node",
    Base.metadata,
    Column("left_node_id", Integer, ForeignKey("node.id"), primary_key=True),
    Column("right_node_id", Integer, ForeignKey("node.id"), primary_key=True),
)


class Node(Base):
    __tablename__ = "node"
    id: Mapped[int] = mapped_column(primary_key=True)
    label: Mapped[str]
    right_nodes: Mapped[List["Node"]] = relationship(
        "Node",
        secondary=node_to_node,
        primaryjoin=id == node_to_node.c.left_node_id,
        secondaryjoin=id == node_to_node.c.right_node_id,
        back_populates="left_nodes",
    )
    left_nodes: Mapped[List["Node"]] = relationship(
        "Node",
        secondary=node_to_node,
        primaryjoin=id == node_to_node.c.right_node_id,
        secondaryjoin=id == node_to_node.c.left_node_id,
        back_populates="right_nodes",
    )
Where above, SQLAlchemy can’t know automatically which columns should connect
to which for the right_nodes
and left_nodes
relationships. The relationship.primaryjoin
and relationship.secondaryjoin
arguments establish how we’d like to join to the association table.
In the Declarative form above, as we are declaring these conditions within the Python
block that corresponds to the Node
class, the id
variable is available directly
as the Column
object we wish to join with.
Alternatively, we can define the relationship.primaryjoin
and relationship.secondaryjoin
arguments using strings, which is suitable
in the case that our configuration does not have either the Node.id
column
object available yet or the node_to_node
table perhaps isn’t yet available.
When referring to a plain Table
object in a declarative string, we
use the string name of the table as it is present in the MetaData
:
class Node(Base):
    __tablename__ = "node"
    id = mapped_column(Integer, primary_key=True)
    label = mapped_column(String)
    right_nodes = relationship(
        "Node",
        secondary="node_to_node",
        primaryjoin="Node.id==node_to_node.c.left_node_id",
        secondaryjoin="Node.id==node_to_node.c.right_node_id",
        backref="left_nodes",
    )
Warning
When passed as a Python-evaluable string, the relationship.primaryjoin and relationship.secondaryjoin arguments are interpreted using Python’s eval() function. DO NOT PASS UNTRUSTED INPUT TO THESE STRINGS. See Evaluation of relationship arguments for details on declarative evaluation of relationship() arguments.
A classical mapping situation here is similar, where node_to_node
can be joined
to node.c.id
:
from sqlalchemy import Integer, ForeignKey, String, Column, Table, MetaData
from sqlalchemy.orm import relationship, registry

metadata_obj = MetaData()
mapper_registry = registry()

node_to_node = Table(
    "node_to_node",
    metadata_obj,
    Column("left_node_id", Integer, ForeignKey("node.id"), primary_key=True),
    Column("right_node_id", Integer, ForeignKey("node.id"), primary_key=True),
)

node = Table(
    "node",
    metadata_obj,
    Column("id", Integer, primary_key=True),
    Column("label", String),
)


class Node:
    pass


mapper_registry.map_imperatively(
    Node,
    node,
    properties={
        "right_nodes": relationship(
            Node,
            secondary=node_to_node,
            primaryjoin=node.c.id == node_to_node.c.left_node_id,
            secondaryjoin=node.c.id == node_to_node.c.right_node_id,
            backref="left_nodes",
        )
    },
)
Note that in both examples, the relationship.backref
keyword specifies a left_nodes
backref - when
relationship()
creates the second relationship in the reverse
direction, it’s smart enough to reverse the
relationship.primaryjoin
and
relationship.secondaryjoin
arguments.
Composite “Secondary” Joins
Note
This section features far edge cases that are somewhat supported by SQLAlchemy, however it is recommended to solve problems like these in simpler ways whenever possible, by using reasonable relational layouts and / or in-Python attributes.
Sometimes, when one seeks to build a relationship()
between two tables
there is a need for more than just two or three tables to be involved in
order to join them. This is an area of relationship()
where one seeks
to push the boundaries of what’s possible, and often the ultimate solution to
many of these exotic use cases needs to be hammered out on the SQLAlchemy mailing
list.
In more recent versions of SQLAlchemy, the relationship.secondary
parameter can be used in some of these cases in order to provide a composite
target consisting of multiple tables. Below is an example of such a
join condition (requires version 0.9.2 at least to function as is):
class A(Base):
    __tablename__ = "a"
    id = mapped_column(Integer, primary_key=True)
    b_id = mapped_column(ForeignKey("b.id"))
    d = relationship(
        "D",
        secondary="join(B, D, B.d_id == D.id).join(C, C.d_id == D.id)",
        primaryjoin="and_(A.b_id == B.id, A.id == C.a_id)",
        secondaryjoin="D.id == B.d_id",
        uselist=False,
        viewonly=True,
    )


class B(Base):
    __tablename__ = "b"
    id = mapped_column(Integer, primary_key=True)
    d_id = mapped_column(ForeignKey("d.id"))


class C(Base):
    __tablename__ = "c"
    id = mapped_column(Integer, primary_key=True)
    a_id = mapped_column(ForeignKey("a.id"))
    d_id = mapped_column(ForeignKey("d.id"))


class D(Base):
    __tablename__ = "d"
    id = mapped_column(Integer, primary_key=True)
In the above example, we provide all three of relationship.secondary
,
relationship.primaryjoin
, and relationship.secondaryjoin
,
in the declarative style referring to the named tables a
, b
, c
, d
directly. A query from A
to D
looks like:
sess.scalars(select(A).join(A.d)).all()
SELECT a.id AS a_id, a.b_id AS a_b_id
FROM a JOIN (
b AS b_1 JOIN d AS d_1 ON b_1.d_id = d_1.id
JOIN c AS c_1 ON c_1.d_id = d_1.id)
ON a.b_id = b_1.id AND a.id = c_1.a_id JOIN d ON d.id = b_1.d_id
In the above example, we take advantage of being able to stuff multiple
tables into a “secondary” container, so that we can join across many
tables while still keeping things “simple” for relationship()
, in that
there’s just “one” table on both the “left” and the “right” side; the
complexity is kept within the middle.
Warning
A relationship like the above is typically marked as viewonly=True
, using relationship.viewonly
, and should be considered as read-only. While there are sometimes ways to make relationships like the above writable, this is generally complicated and error prone.
Relationship to Aliased Class
In the previous section, we illustrated a technique where we used
relationship.secondary
in order to place additional
tables within a join condition. There is one complex join case where
even this technique is not sufficient; when we seek to join from A
to B
, making use of any number of C
, D
, etc. in between,
however there are also join conditions between A
and B
directly. In this case, the join from A
to B
may be
difficult to express with just a complex
relationship.primaryjoin
condition, as the intermediary
tables may need special handling, and it is also not expressible with
a relationship.secondary
object, since the
A->secondary->B
pattern does not support any references between
A
and B
directly. When this extremely advanced case
arises, we can resort to creating a second mapping as a target for the
relationship. This is where we use AliasedClass
in order to make a
mapping to a class that includes all the additional tables we need for
this join. In order to produce this mapper as an “alternative” mapping
for our class, we use the aliased()
function to produce the new
construct, then use relationship()
against the object as though it
were a plain mapped class.
Below illustrates a relationship()
with a simple join from A
to
B
, however the primaryjoin condition is augmented with two additional
entities C
and D
, which also must have rows that line up with
the rows in both A
and B
simultaneously:
class A(Base):
    __tablename__ = "a"
    id = mapped_column(Integer, primary_key=True)
    b_id = mapped_column(ForeignKey("b.id"))


class B(Base):
    __tablename__ = "b"
    id = mapped_column(Integer, primary_key=True)


class C(Base):
    __tablename__ = "c"
    id = mapped_column(Integer, primary_key=True)
    a_id = mapped_column(ForeignKey("a.id"))
    some_c_value = mapped_column(String)


class D(Base):
    __tablename__ = "d"
    id = mapped_column(Integer, primary_key=True)
    c_id = mapped_column(ForeignKey("c.id"))
    b_id = mapped_column(ForeignKey("b.id"))
    some_d_value = mapped_column(String)


# 1. set up the join() as a variable, so we can refer
# to it in the mapping multiple times.
j = join(B, D, D.b_id == B.id).join(C, C.id == D.c_id)

# 2. Create an AliasedClass to B
B_viacd = aliased(B, j, flat=True)

A.b = relationship(B_viacd, primaryjoin=A.b_id == j.c.b_id)
With the above mapping, a simple join looks like:
sess.scalars(select(A).join(A.b)).all()
SELECT a.id AS a_id, a.b_id AS a_b_id
FROM a JOIN (b JOIN d ON d.b_id = b.id JOIN c ON c.id = d.c_id) ON a.b_id = b.id
Integrating AliasedClass Mappings with Typing and Avoiding Early Mapper Configuration
The creation of the aliased()
construct against a mapped class
forces the configure_mappers()
step to proceed, which will resolve
all current classes and their relationships. This may be problematic if
unrelated mapped classes needed by the current mappings have not yet been
declared, or if the configuration of the relationship itself needs access
to as-yet undeclared classes. Additionally, SQLAlchemy’s Declarative pattern
works with Python typing most effectively when relationships are declared
up front.
To organize the construction of the relationship to work with these issues, a
configure level event hook like MapperEvents.before_mapper_configured()
may be used, which will invoke the configuration code only when all mappings
are ready for configuration:
from sqlalchemy import event


class A(Base):
    __tablename__ = "a"
    id = mapped_column(Integer, primary_key=True)
    b_id = mapped_column(ForeignKey("b.id"))


@event.listens_for(A, "before_mapper_configured")
def _configure_ab_relationship(mapper, cls):
    # do the above configuration in a configuration hook
    j = join(B, D, D.b_id == B.id).join(C, C.id == D.c_id)
    B_viacd = aliased(B, j, flat=True)
    A.b = relationship(B_viacd, primaryjoin=A.b_id == j.c.b_id)
Above, the function _configure_ab_relationship()
will be invoked only
when a fully configured version of A
is requested, at which point the
classes B
, D
and C
would be available.
For an approach that integrates with inline typing, a similar technique can be used to effectively generate a “singleton” creation pattern for the aliased class where it is late-initialized as a global variable, which can then be used in the relationship inline:
from typing import Any

B_viacd: Any = None
b_viacd_join: Any = None


class A(Base):
    __tablename__ = "a"
    id: Mapped[int] = mapped_column(primary_key=True)
    b_id: Mapped[int] = mapped_column(ForeignKey("b.id"))

    # 1. the relationship can be declared using lambdas, allowing it to resolve
    #    to targets that are late-configured
    b: Mapped[B] = relationship(
        lambda: B_viacd, primaryjoin=lambda: A.b_id == b_viacd_join.c.b_id
    )


# 2. configure the targets of the relationship using a before_mapper_configured
#    hook.
@event.listens_for(A, "before_mapper_configured")
def _configure_ab_relationship(mapper, cls):
    # 3. set up the join() and AliasedClass as globals from within
    #    the configuration hook.
    global B_viacd, b_viacd_join

    b_viacd_join = join(B, D, D.b_id == B.id).join(C, C.id == D.c_id)
    B_viacd = aliased(B, b_viacd_join, flat=True)
Using the AliasedClass target in Queries
In the previous example, the A.b
relationship refers to the B_viacd
entity as the target, and not the B
class directly. To add additional
criteria involving the A.b
relationship, it’s typically necessary to
reference the B_viacd
directly rather than using B
, especially in a
case where the target entity of A.b
is to be transformed into an alias or a
subquery. Below illustrates the same relationship using a subquery, rather than
a join:
subq = select(B).join(D, D.b_id == B.id).join(C, C.id == D.c_id).subquery()
B_viacd_subquery = aliased(B, subq)
A.b = relationship(B_viacd_subquery, primaryjoin=A.b_id == subq.c.id)
A query using the above A.b
relationship will render a subquery:
sess.scalars(select(A).join(A.b)).all()
SELECT a.id AS a_id, a.b_id AS a_b_id
FROM a JOIN (SELECT b.id AS id, b.some_b_column AS some_b_column
FROM b JOIN d ON d.b_id = b.id JOIN c ON c.id = d.c_id) AS anon_1 ON a.b_id = anon_1.id
If we want to add additional criteria based on the A.b
join, we must do
so in terms of B_viacd_subquery
rather than B
directly:
sess.scalars(
select(A)
.join(A.b)
.where(B_viacd_subquery.some_b_column == "some b")
.order_by(B_viacd_subquery.id)
).all()
SELECT a.id AS a_id, a.b_id AS a_b_id
FROM a JOIN (SELECT b.id AS id, b.some_b_column AS some_b_column
FROM b JOIN d ON d.b_id = b.id JOIN c ON c.id = d.c_id) AS anon_1 ON a.b_id = anon_1.id
WHERE anon_1.some_b_column = ? ORDER BY anon_1.id
Row-Limited Relationships with Window Functions
Another interesting use case for relationships to AliasedClass objects is the situation where the relationship needs to join to a specialized SELECT of any form. One scenario is when the use of a window function is desired, such as to limit how many rows should be returned for a relationship. The example below illustrates a non-primary mapper relationship that will load the first ten items for each collection:
class A(Base):
    __tablename__ = "a"
    id = mapped_column(Integer, primary_key=True)


class B(Base):
    __tablename__ = "b"
    id = mapped_column(Integer, primary_key=True)
    a_id = mapped_column(ForeignKey("a.id"))


partition = select(
    B, func.row_number().over(order_by=B.id, partition_by=B.a_id).label("index")
).alias()

partitioned_b = aliased(B, partition)

# row_number() starts at 1, so "<= 10" keeps the first ten rows per parent
A.partitioned_bs = relationship(
    partitioned_b, primaryjoin=and_(partitioned_b.a_id == A.id, partition.c.index <= 10)
)
We can use the above partitioned_bs relationship with most of the loader strategies, such as selectinload():
for a1 in session.scalars(select(A).options(selectinload(A.partitioned_bs))):
    print(a1.partitioned_bs)  # <-- will be no more than ten objects
Where above, the “selectinload” query looks like:
SELECT
    a_1.id AS a_1_id, anon_1.id AS anon_1_id, anon_1.a_id AS anon_1_a_id,
    anon_1.data AS anon_1_data, anon_1.index AS anon_1_index
FROM a AS a_1
JOIN (
    SELECT b.id AS id, b.a_id AS a_id, b.data AS data,
    row_number() OVER (PARTITION BY b.a_id ORDER BY b.id) AS index
    FROM b) AS anon_1
ON anon_1.a_id = a_1.id AND anon_1.index <= %(index_1)s
WHERE a_1.id IN ( ... primary key collection ...)
ORDER BY a_1.id
Above, for each matching primary key in “a”, we will get the first ten “bs” as ordered by “b.id”. By partitioning on “a_id” we ensure that each “row number” is local to the parent “a_id”.
Such a mapping would ordinarily also include a “plain” relationship from “A” to “B”, for persistence operations as well as when the full set of “B” objects per “A” is desired.
Building Query-Enabled Properties
Very ambitious custom join conditions may fail to be directly persistable, and
in some cases may not even load correctly. To remove the persistence part of
the equation, use the flag relationship.viewonly
on the
relationship()
, which establishes it as a read-only
attribute (data written to the collection will be ignored on flush()).
However, in extreme cases, consider using a regular Python property in
conjunction with Query
as follows:
class User(Base):
    __tablename__ = "user"
    id = mapped_column(Integer, primary_key=True)

    @property
    def addresses(self):
        return object_session(self).query(Address).with_parent(self).filter(...).all()
In other cases, the descriptor can be built to make use of existing in-Python data. See the section on Using Descriptors and Hybrids for more general discussion of special Python attributes.
Notes on using the viewonly relationship parameter
The relationship.viewonly
parameter when applied to a
relationship()
construct indicates that this relationship()
will not take part in any ORM unit of work operations, and additionally
that the attribute does not expect to participate within in-Python mutations
of its represented collection. This means
that while the viewonly relationship may refer to a mutable Python collection
like a list or set, making changes to that list or set as present on a
mapped instance will have no effect on the ORM flush process.
To explore this scenario consider this mapping:
from __future__ import annotations

import datetime

from sqlalchemy import and_
from sqlalchemy import ForeignKey
from sqlalchemy import func
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import mapped_column
from sqlalchemy.orm import relationship


class Base(DeclarativeBase):
    pass


class User(Base):
    __tablename__ = "user_account"
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str | None]
    all_tasks: Mapped[list[Task]] = relationship()
    current_week_tasks: Mapped[list[Task]] = relationship(
        primaryjoin=lambda: and_(
            User.id == Task.user_account_id,
            # this expression works on PostgreSQL but may not be supported
            # by other database engines
            Task.task_date >= func.now() - datetime.timedelta(days=7),
        ),
        viewonly=True,
    )


class Task(Base):
    __tablename__ = "task"
    id: Mapped[int] = mapped_column(primary_key=True)
    user_account_id: Mapped[int] = mapped_column(ForeignKey("user_account.id"))
    description: Mapped[str | None]
    task_date: Mapped[datetime.datetime] = mapped_column(server_default=func.now())
    user: Mapped[User] = relationship(back_populates="current_week_tasks")
The following sections will note different aspects of this configuration.
In-Python mutations including backrefs are not appropriate with viewonly=True
The above mapping targets the User.current_week_tasks viewonly relationship as the backref target of the Task.user attribute. SQLAlchemy’s ORM configuration process does not currently flag this, but it is a configuration error. Changing the .user attribute on a Task will not affect the .current_week_tasks attribute:
>>> u1 = User()
>>> t1 = Task(task_date=datetime.datetime.now())
>>> t1.user = u1
>>> u1.current_week_tasks
[]
There is another parameter called relationship.sync_backrefs
which can be turned on here to allow .current_week_tasks
to be mutated in this
case, however this is not considered to be a best practice with a viewonly
relationship, which instead should not be relied upon for in-Python mutations.
In this mapping, backrefs can be configured between User.all_tasks
and
Task.user
, as these are both not viewonly and will synchronize normally.
Beyond the issue of backref mutations being disabled for viewonly relationships,
plain changes to the User.all_tasks
collection in Python
are also not reflected in the User.current_week_tasks
collection until
changes have been flushed to the database.
Overall, for a use case where a custom collection should respond immediately to
in-Python mutations, the viewonly relationship is generally not appropriate. A
better approach is to use the Hybrid Attributes feature of SQLAlchemy, or
for instance-only cases to use a Python @property
, where a user-defined
collection that is generated in terms of the current Python instance can be
implemented. To change our example to work this way, we repair the
relationship.back_populates
parameter on Task.user
to
reference User.all_tasks
, and
then illustrate a simple @property
that will deliver results in terms of
the immediate User.all_tasks
collection:
class User(Base):
    __tablename__ = "user_account"
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str | None]
    all_tasks: Mapped[list[Task]] = relationship(back_populates="user")

    @property
    def current_week_tasks(self) -> list[Task]:
        past_seven_days = datetime.datetime.now() - datetime.timedelta(days=7)
        return [t for t in self.all_tasks if t.task_date >= past_seven_days]


class Task(Base):
    __tablename__ = "task"
    id: Mapped[int] = mapped_column(primary_key=True)
    user_account_id: Mapped[int] = mapped_column(ForeignKey("user_account.id"))
    description: Mapped[str | None]
    task_date: Mapped[datetime.datetime] = mapped_column(server_default=func.now())
    user: Mapped[User] = relationship(back_populates="all_tasks")
Using an in-Python collection calculated on the fly each time, we are guaranteed to have the correct answer at all times, without the need to use a database at all:
>>> u1 = User()
>>> t1 = Task(task_date=datetime.datetime.now())
>>> t1.user = u1
>>> u1.current_week_tasks
[<__main__.Task object at 0x7f3d699523c0>]
viewonly=True collections / attributes do not get re-queried until expired
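Continuing with the original viewonly attribute, if we do make changes to the User.all_tasks collection on a persistent object, the viewonly collection can show the net result of that change only after two things occur. The first is that the change to User.all_tasks is flushed, so that the new data is available in the database, at least within the scope of the local transaction. The second is that the User.current_week_tasks attribute is expired and reloaded via a new SQL query to the database.
To support this requirement, the simplest flow is one where the viewonly relationship is consumed only in operations that are primarily read-only to begin with. For example, if we retrieve a User fresh from the database, the collection will be current:
>>> with Session(e) as sess:
...     u1 = sess.scalar(select(User).where(User.id == 1))
...     print(u1.current_week_tasks)
[<__main__.Task object at 0x7f8711b906b0>]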
When we make modifications to u1.all_tasks and want to see these changes reflected in the u1.current_week_tasks viewonly relationship, the changes need to be flushed and the u1.current_week_tasks attribute needs to be expired, so that it will lazy load on next access. The simplest approach is to use Session.commit(), keeping the Session.expire_on_commit parameter at its default of True:
>>> with Session(e) as sess:
...     u1 = sess.scalar(select(User).where(User.id == 1))
...     u1.all_tasks.append(Task(task_date=datetime.datetime.now()))
...     sess.commit()
...     print(u1.current_week_tasks)
[<__main__.Task object at 0x7f8711b90ec0>, <__main__.Task object at 0x7f8711b90a10>]
如上所述,调用 Session.commit()
将 u1.all_tasks
的更改刷新到数据库,然后使所有对象过期,因此当我们访问 u1.current_week_tasks
时,发生了 lazy load,从数据库中重新获取此属性的内容。
要拦截操作而不实际提交事务,需要先显式使属性 expired 。一种简单的方法是直接调用它。在下面的示例中,Session.flush()
将待处理的更改发送到数据库,然后使用 Session.expire()
使 u1.current_week_tasks
集合过期,以便在下一次访问时重新获取:
>>> with Session(e) as sess:
... u1 = sess.scalar(select(User).where(User.id == 1))
... u1.all_tasks.append(Task(task_date=datetime.datetime.now()))
... sess.flush()
... sess.expire(u1, ["current_week_tasks"])
... print(u1.current_week_tasks)
[<__main__.Task object at 0x7fd95a4c8c50>, <__main__.Task object at 0x7fd95a4c8c80>]
我们实际上可以跳过调用 Session.flush()
,假设 Session
保持 Session.autoflush
为默认值 True
,因为过期的 current_week_tasks
属性将在过期后访问时触发自动刷新:
>>> with Session(e) as sess:
... u1 = sess.scalar(select(User).where(User.id == 1))
... u1.all_tasks.append(Task(task_date=datetime.datetime.now()))
... sess.expire(u1, ["current_week_tasks"])
... print(u1.current_week_tasks) # 触发查询前的自动刷新
[<__main__.Task object at 0x7fd95a4c8c50>, <__main__.Task object at 0x7fd95a4c8c80>]
继续上述方法更复杂一点,我们可以在相关的 User.all_tasks
集合更改时使用 event hooks 程序化地应用过期。这是一种 高级技术,首先应该考虑更简单的架构,如 @property
或坚持只读用例。在我们的简单示例中,这将配置为:
from sqlalchemy import event, inspect
@event.listens_for(User.all_tasks, “append”) @event.listens_for(User.all_tasks, “remove”) @event.listens_for(User.all_tasks, “bulk_replace”) def _expire_User_current_week_tasks(target, value, initiator):
inspect(target).session.expire(target, [“current_week_tasks”])
通过上述钩子,变异操作会被拦截,并导致 User.current_week_tasks
集合自动过期:
>>> with Session(e) as sess:
... u1 = sess.scalar(select(User).where(User.id == 1))
... u1.all_tasks.append(Task(task_date=datetime.datetime.now()))
... print(u1.current_week_tasks)
[<__main__.Task object at 0x7f66d093ccb0>, <__main__.Task object at 0x7f66d093cce0>]
上述 AttributeEvents
事件钩子也会被 backref 变异触发,因此通过上述钩子对 Task.user
的更改也会被拦截:
>>> with Session(e) as sess:
... u1 = sess.scalar(select(User).where(User.id == 1))
... t1 = Task(task_date=datetime.datetime.now())
... t1.user = u1
... sess.add(t1)
... print(u1.current_week_tasks)
[<__main__.Task object at 0x7f3b0c070d10>, <__main__.Task object at 0x7f3b0c057d10>]
Continuing with the original viewonly attribute, if we do in fact make changes to the User.all_tasks collection on a persistent object, the viewonly collection can only show the net result of this change after two things occur. The first is that the change to User.all_tasks is flushed, so that the new data is available in the database, at least within the scope of the local transaction. The second is that the User.current_week_tasks attribute is expired and reloaded via a new SQL query to the database.
To support this requirement, the simplest flow is one where the viewonly relationship is consumed only in operations that are primarily read-only to begin with. For example, if we retrieve a User fresh from the database, the collection will be current:
>>> with Session(e) as sess:
...     u1 = sess.scalar(select(User).where(User.id == 1))
...     print(u1.current_week_tasks)
[<__main__.Task object at 0x7f8711b906b0>]
When we make modifications to u1.all_tasks, if we want to see these changes reflected in the u1.current_week_tasks viewonly relationship, these changes need to be flushed and the u1.current_week_tasks attribute needs to be expired, so that it will lazy load on next access. The simplest approach to this is to use Session.commit(), keeping the Session.expire_on_commit parameter set at its default of True:
>>> with Session(e) as sess:
...     u1 = sess.scalar(select(User).where(User.id == 1))
...     u1.all_tasks.append(Task(task_date=datetime.datetime.now()))
...     sess.commit()
...     print(u1.current_week_tasks)
[<__main__.Task object at 0x7f8711b90ec0>, <__main__.Task object at 0x7f8711b90a10>]
Above, the call to Session.commit() flushed the changes to u1.all_tasks to the database, then expired all objects, so that when we accessed u1.current_week_tasks, a lazy load occurred which fetched the contents of this attribute freshly from the database.
To intercept operations without actually committing the transaction, the attribute needs to be explicitly expired first. The simplest way to do this is to call Session.expire() directly. In the example below, Session.flush() sends pending changes to the database, then Session.expire() is used to expire the u1.current_week_tasks collection so that it re-fetches on next access:
>>> with Session(e) as sess:
...     u1 = sess.scalar(select(User).where(User.id == 1))
...     u1.all_tasks.append(Task(task_date=datetime.datetime.now()))
...     sess.flush()
...     sess.expire(u1, ["current_week_tasks"])
...     print(u1.current_week_tasks)
[<__main__.Task object at 0x7fd95a4c8c50>, <__main__.Task object at 0x7fd95a4c8c80>]
We can in fact skip the call to Session.flush(), assuming a Session that keeps Session.autoflush at its default value of True, as the expired current_week_tasks attribute will trigger autoflush when accessed after expiration:
>>> with Session(e) as sess:
...     u1 = sess.scalar(select(User).where(User.id == 1))
...     u1.all_tasks.append(Task(task_date=datetime.datetime.now()))
...     sess.expire(u1, ["current_week_tasks"])
...     print(u1.current_week_tasks)  # triggers autoflush before querying
[<__main__.Task object at 0x7fd95a4c8c50>, <__main__.Task object at 0x7fd95a4c8c80>]
Taking the above approach a step further, we can apply the expiration programmatically whenever the related User.all_tasks collection changes, using event hooks. This is an advanced technique; simpler architectures like @property, or sticking to read-only use cases, should be examined first. In our simple example, this would be configured as:
from sqlalchemy import event, inspect


@event.listens_for(User.all_tasks, "append")
@event.listens_for(User.all_tasks, "remove")
@event.listens_for(User.all_tasks, "bulk_replace")
def _expire_User_current_week_tasks(target, value, initiator):
    # expire the viewonly collection on the User whose all_tasks changed
    inspect(target).session.expire(target, ["current_week_tasks"])
With the above hooks, mutation operations are intercepted and cause the User.current_week_tasks collection to be expired automatically:
>>> with Session(e) as sess:
...     u1 = sess.scalar(select(User).where(User.id == 1))
...     u1.all_tasks.append(Task(task_date=datetime.datetime.now()))
...     print(u1.current_week_tasks)
[<__main__.Task object at 0x7f66d093ccb0>, <__main__.Task object at 0x7f66d093cce0>]
The AttributeEvents event hooks used above are also triggered by backref mutations, so with the above hooks a change to Task.user is also intercepted:
>>> with Session(e) as sess:
...     u1 = sess.scalar(select(User).where(User.id == 1))
...     t1 = Task(task_date=datetime.datetime.now())
...     t1.user = u1
...     sess.add(t1)
...     print(u1.current_week_tasks)
[<__main__.Task object at 0x7f3b0c070d10>, <__main__.Task object at 0x7f3b0c057d10>]