事务和连接管理

Transactions and Connection Management

管理事务

Managing Transactions

在 1.4 版本发生变更: Session 的事务管理逻辑已被重新设计,使其更加清晰和易于使用。特别是现在引入了 “autobegin” 操作,意味着可以控制事务的开始点,而无需依赖传统的 “autocommit” 模式。

Session 一次只追踪一个“虚拟”事务(virtual transaction)的状态, 它使用一个名为 SessionTransaction 的对象来实现这一点。 该对象会在需要时通过底层的 Engine 或绑定到 Session 的多个 engine, 来使用 Connection 启动真正的连接级事务。

这个“虚拟”事务会在需要时自动创建, 也可以通过显式调用 Session.begin() 方法来启动。 在尽可能的范围内,Python 的上下文管理器语法可用于创建 Session 对象, 也可以用于维持 SessionTransaction 的作用域。

以下是一个基本的 Session 使用示例:

from sqlalchemy.orm import Session

session = Session(engine)

我们可以使用上下文管理器语法,在明确的事务作用域中执行操作:

with session.begin():
    session.add(some_object())
    session.add(some_other_object())
# 上下文结束时提交事务;如果有异常抛出则会回滚事务

在上述代码块结束时,如果没有异常抛出,任何待处理的对象都会被 flush 到数据库中, 随后数据库事务会被提交;若代码块中抛出了异常,则事务会被回滚。 无论是哪种情况,该 Session 在离开代码块之后都可以继续用于下一个事务。

Session.begin() 方法是可选的, 也可以采用“边执行边提交”的模式来使用 Session, 这种模式下事务会按需自动开启;用户只需在合适的时机进行提交或回滚:

session = Session(engine)

session.add(some_object())
session.add(some_other_object())

session.commit()  # 提交事务

# 会自动重新开始新事务
result = session.execute(text("< some select statement >"))
session.add_all([more_objects, ...])
session.commit()  # 再次提交

session.add(still_another_object)
session.flush()  # flush still_another_object
session.rollback()  # 回滚 still_another_object 的更改

Session 本身提供了一个 Session.close() 方法。 如果此时仍处于一个尚未提交或回滚的事务中,调用该方法将取消(即回滚)该事务, 并从该 Session 的状态中移除所有对象。 如果 Session 的使用方式不能保证一定会调用 Session.commit()Session.close() 来确保所有资源被正确释放:

# 清除所有对象、无条件回滚事务并释放连接资源
session.close()

最后,Session 的创建和关闭过程本身也可以使用上下文管理器语法管理。 这是确保 Session 使用范围受控的最佳方式。 首先通过 Session 构造器来演示:

with Session(engine) as session:
    session.add(some_object())
    session.add(some_other_object())

    session.commit()  # 提交事务

    session.add(still_another_object)
    session.flush()  # flush still_another_object

    session.commit()  # 再次提交

    result = session.execute(text("<some SELECT statement>"))

# 上述 execute() 调用产生的剩余事务状态将被丢弃

同样,sessionmaker 也可以以相同方式使用:

Session = sessionmaker(engine)

with Session() as session:
    with session.begin():
        session.add(some_object)
    # 提交事务

# 关闭 Session

sessionmaker 本身也包含一个 sessionmaker.begin() 方法, 用于一次性执行创建和开启事务两个操作:

with Session.begin() as session:
    session.add(some_object)

在 1.4 版本发生变更: Session transaction management has been revised to be clearer and easier to use. In particular, it now features “autobegin” operation, which means the point at which a transaction begins may be controlled, without using the legacy “autocommit” mode.

The Session tracks the state of a single “virtual” transaction at a time, using an object called SessionTransaction. This object then makes use of the underlying Engine or engines to which the Session object is bound in order to start real connection-level transactions using the Connection object as needed.

This “virtual” transaction is created automatically when needed, or can alternatively be started using the Session.begin() method. To as great a degree as possible, Python context manager use is supported both at the level of creating Session objects as well as to maintain the scope of the SessionTransaction.

Below, assume we start with a Session:

from sqlalchemy.orm import Session

session = Session(engine)

We can now run operations within a demarcated transaction using a context manager:

with session.begin():
    session.add(some_object())
    session.add(some_other_object())
# commits transaction at the end, or rolls back if there
# was an exception raised

At the end of the above context, assuming no exceptions were raised, any pending objects will be flushed to the database and the database transaction will be committed. If an exception was raised within the above block, then the transaction would be rolled back. In both cases, the above Session subsequent to exiting the block is ready to be used in subsequent transactions.

The Session.begin() method is optional, and the Session may also be used in a commit-as-you-go approach, where it will begin transactions automatically as needed; these only need be committed or rolled back:

session = Session(engine)

session.add(some_object())
session.add(some_other_object())

session.commit()  # commits

# will automatically begin again
result = session.execute(text("< some select statement >"))
session.add_all([more_objects, ...])
session.commit()  # commits

session.add(still_another_object)
session.flush()  # flush still_another_object
session.rollback()  # rolls back still_another_object

The Session itself features a Session.close() method. If the Session is begun within a transaction that has not yet been committed or rolled back, this method will cancel (i.e. rollback) that transaction, and also expunge all objects contained within the Session object’s state. If the Session is being used in such a way that a call to Session.commit() or Session.rollback() is not guaranteed (e.g. not within a context manager or similar), the close method may be used to ensure all resources are released:

# expunges all objects, releases all transactions unconditionally
# (with rollback), releases all database connections back to their
# engines
session.close()

Finally, the session construction / close process can itself be run via context manager. This is the best way to ensure that the scope of a Session object’s use is scoped within a fixed block. Illustrated via the Session constructor first:

with Session(engine) as session:
    session.add(some_object())
    session.add(some_other_object())

    session.commit()  # commits

    session.add(still_another_object)
    session.flush()  # flush still_another_object

    session.commit()  # commits

    result = session.execute(text("<some SELECT statement>"))

# remaining transactional state from the .execute() call is
# discarded

Similarly, the sessionmaker can be used in the same way:

Session = sessionmaker(engine)

with Session() as session:
    with session.begin():
        session.add(some_object)
    # commits

# closes the Session

sessionmaker itself includes a sessionmaker.begin() method to allow both operations to take place at once:

with Session.begin() as session:
    session.add(some_object)

使用 SAVEPOINT

Using SAVEPOINT

如果底层数据库引擎支持 SAVEPOINT 事务,则可以使用 Session.begin_nested() 方法来划定一个保存点(SAVEPOINT):

Session = sessionmaker()

with Session.begin() as session:
    session.add(u1)
    session.add(u2)

    nested = session.begin_nested()  # 创建一个保存点
    session.add(u3)
    nested.rollback()  # 回滚 u3,保留 u1 和 u2

# 提交 u1 和 u2

每次调用 Session.begin_nested() 时,都会向数据库发出一个新的 “BEGIN SAVEPOINT” 命令(如果尚未处于事务中,则会先开始一个事务), 并返回一个 SessionTransaction 类型的对象,作为该保存点的句柄。 调用该对象的 .commit() 方法将向数据库发出 “RELEASE SAVEPOINT” 命令; 而调用 .rollback() 方法则发出 “ROLLBACK TO SAVEPOINT” 命令。 外层数据库事务将继续保持进行中状态。

Session.begin_nested() 通常结合上下文管理器一起使用, 可用于捕捉每个记录级别的错误并针对当前事务的一部分状态进行回滚, 而不影响整个事务,例如:

for record in records:
    try:
        with session.begin_nested():
            session.merge(record)
    except:
        print("Skipped record %s" % record)
session.commit()

Session.begin_nested() 所返回的上下文管理器退出时, 该保存点将被“提交”,并执行常规的 flush 行为以同步所有挂起状态。 如果在上下文块中抛出异常,该保存点将被回滚, 同时与此次嵌套事务有关的 Session 中的对象状态将被过期(expired)。

这种模式非常适用于如 PostgreSQL 场景下捕捉 IntegrityError 异常来检测主键冲突; PostgreSQL 默认在遇到此类错误时会中止整个事务, 但使用 SAVEPOINT 机制时,外层事务可以保持继续进行。 下面的示例中,一组数据将尝试写入数据库,对于偶尔出现的“主键重复”记录将跳过, 而不影响整个操作:

from sqlalchemy import exc

with session.begin():
    for record in records:
        try:
            with session.begin_nested():
                obj = SomeRecord(id=record["identifier"], name=record["name"])
                session.add(obj)
        except exc.IntegrityError:
            print(f"Skipped record {record} - row already exists")

调用 Session.begin_nested() 时,Session 会首先将当前所有挂起状态 flush 到数据库中; 这个行为是无条件的,即使指定了 Session.autoflush=False 也不会跳过。 其设计目的是:当嵌套事务回滚时,能够明确识别并过期掉保存点范围内所修改的内存对象状态, 同时确保这些对象在刷新时能从数据库重新加载其在保存点开始前的状态。

在 SQLAlchemy 的现代版本中,当使用 Session.begin_nested() 创建的保存点被回滚时, 自该保存点起被修改过的内存对象状态将被标记为过期(expired), 而未被更改的对象状态则会保留, 这样后续操作可以继续使用这些未受影响的数据,而无需重新从数据库加载。

参见

Connection.begin_nested() - Core 层级的 SAVEPOINT 接口

SAVEPOINT transactions, if supported by the underlying engine, may be delineated using the Session.begin_nested() method:

Session = sessionmaker()

with Session.begin() as session:
    session.add(u1)
    session.add(u2)

    nested = session.begin_nested()  # establish a savepoint
    session.add(u3)
    nested.rollback()  # rolls back u3, keeps u1 and u2

# commits u1 and u2

Each time Session.begin_nested() is called, a new “BEGIN SAVEPOINT” command is emitted to the database within the scope of the current database transaction (starting one if not already in progress), and an object of type SessionTransaction is returned, which represents a handle to this SAVEPOINT. When the .commit() method on this object is called, “RELEASE SAVEPOINT” is emitted to the database, and if instead the .rollback() method is called, “ROLLBACK TO SAVEPOINT” is emitted. The enclosing database transaction remains in progress.

Session.begin_nested() is typically used as a context manager where specific per-instance errors may be caught, in conjunction with a rollback emitted for that portion of the transaction’s state, without rolling back the whole transaction, as in the example below:

for record in records:
    try:
        with session.begin_nested():
            session.merge(record)
    except:
        print("Skipped record %s" % record)
session.commit()

When the context manager yielded by Session.begin_nested() completes, it “commits” the savepoint, which includes the usual behavior of flushing all pending state. When an error is raised, the savepoint is rolled back and the state of the Session local to the objects that were changed is expired.

This pattern is ideal for situations such as using PostgreSQL and catching IntegrityError to detect duplicate rows; PostgreSQL normally aborts the entire transaction when such an error is raised, however when using SAVEPOINT, the outer transaction is maintained. In the example below a list of data is persisted into the database, with the occasional “duplicate primary key” record skipped, without rolling back the entire operation:

from sqlalchemy import exc

with session.begin():
    for record in records:
        try:
            with session.begin_nested():
                obj = SomeRecord(id=record["identifier"], name=record["name"])
                session.add(obj)
        except exc.IntegrityError:
            print(f"Skipped record {record} - row already exists")

When Session.begin_nested() is called, the Session first flushes all currently pending state to the database; this occurs unconditionally, regardless of the value of the Session.autoflush parameter which normally may be used to disable automatic flush. The rationale for this behavior is so that when a rollback on this nested transaction occurs, the Session may expire any in-memory state that was created within the scope of the SAVEPOINT, while ensuring that when those expired objects are refreshed, the state of the object graph prior to the beginning of the SAVEPOINT will be available to re-load from the database.

In modern versions of SQLAlchemy, when a SAVEPOINT initiated by Session.begin_nested() is rolled back, in-memory object state that was modified since the SAVEPOINT was created is expired, however other object state that was not altered since the SAVEPOINT began is maintained. This is so that subsequent operations can continue to make use of the otherwise unaffected data without the need for refreshing it from the database.

参见

Connection.begin_nested() - Core SAVEPOINT API

会话级与引擎级事务控制

Session-level vs. Engine level transaction control

sessionmakerEngine 的层级, 还是 SessionConnection 的层级。 以下章节将根据如下对照表详细说明这些场景:

ORM                                           Core
-----------------------------------------     -----------------------------------
sessionmaker                                  Engine
Session                                       Connection
sessionmaker.begin()                          Engine.begin()
some_session.commit()                         some_connection.commit()
with some_sessionmaker() as session:          with some_engine.connect() as conn:
with some_sessionmaker.begin() as session:    with some_engine.begin() as conn:
with some_session.begin_nested() as sp:       with some_connection.begin_nested() as sp:

The Connection in Core and _session.Session in ORM feature equivalent transactional semantics, both at the level of the sessionmaker vs. the Engine, as well as the Session vs. the Connection. The following sections detail these scenarios based on the following scheme:

ORM                                           Core
-----------------------------------------     -----------------------------------
sessionmaker                                  Engine
Session                                       Connection
sessionmaker.begin()                          Engine.begin()
some_session.commit()                         some_connection.commit()
with some_sessionmaker() as session:          with some_engine.connect() as conn:
with some_sessionmaker.begin() as session:    with some_engine.begin() as conn:
with some_session.begin_nested() as sp:       with some_connection.begin_nested() as sp:

随时提交

Commit as you go

SessionConnection 都提供了 Connection.commit()Connection.rollback() 方法。 在 SQLAlchemy 2.0 风格的操作中,这些方法始终作用于 最外层事务。 对于 Session,假定 Session.autobegin 参数保留默认值 True

Engine 示例:

engine = create_engine("postgresql+psycopg2://user:pass@host/dbname")

with engine.connect() as conn:
    conn.execute(
        some_table.insert(),
        [
            {"data": "some data one"},
            {"data": "some data two"},
            {"data": "some data three"},
        ],
    )
    conn.commit()

Session 示例:

Session = sessionmaker(engine)

with Session() as session:
    session.add_all(
        [
            SomeClass(data="some data one"),
            SomeClass(data="some data two"),
            SomeClass(data="some data three"),
        ]
    )
    session.commit()

Both Session and Connection feature Connection.commit() and Connection.rollback() methods. Using SQLAlchemy 2.0-style operation, these methods affect the outermost transaction in all cases. For the Session, it is assumed that Session.autobegin is left at its default value of True.

Engine:

engine = create_engine("postgresql+psycopg2://user:pass@host/dbname")

with engine.connect() as conn:
    conn.execute(
        some_table.insert(),
        [
            {"data": "some data one"},
            {"data": "some data two"},
            {"data": "some data three"},
        ],
    )
    conn.commit()

Session:

Session = sessionmaker(engine)

with Session() as session:
    session.add_all(
        [
            SomeClass(data="some data one"),
            SomeClass(data="some data two"),
            SomeClass(data="some data three"),
        ]
    )
    session.commit()

开始一次

Begin Once

sessionmakerEngine 都提供了 Engine.begin() 方法, 用于创建一个新的 SQL 执行对象(ORM 中为 Session,Core 中为 Connection), 并返回一个上下文管理器,用于封装该对象的事务处理(开始、提交或回滚)。

Engine 示例:

engine = create_engine("postgresql+psycopg2://user:pass@host/dbname")

with engine.begin() as conn:
    conn.execute(
        some_table.insert(),
        [
            {"data": "some data one"},
            {"data": "some data two"},
            {"data": "some data three"},
        ],
    )
# 自动提交并关闭连接

Session 示例:

Session = sessionmaker(engine)

with Session.begin() as session:
    session.add_all(
        [
            SomeClass(data="some data one"),
            SomeClass(data="some data two"),
            SomeClass(data="some data three"),
        ]
    )
# 自动提交并关闭会话

Both sessionmaker and Engine feature a Engine.begin() method that will both procure a new object with which to execute SQL statements (the Session and Connection, respectively) and then return a context manager that will maintain a begin/commit/rollback context for that object.

Engine:

engine = create_engine("postgresql+psycopg2://user:pass@host/dbname")

with engine.begin() as conn:
    conn.execute(
        some_table.insert(),
        [
            {"data": "some data one"},
            {"data": "some data two"},
            {"data": "some data three"},
        ],
    )
# commits and closes automatically

Session:

Session = sessionmaker(engine)

with Session.begin() as session:
    session.add_all(
        [
            SomeClass(data="some data one"),
            SomeClass(data="some data two"),
            SomeClass(data="some data three"),
        ]
    )
# commits and closes automatically

嵌套事务

Nested Transaction

当使用 Session.begin_nested()Connection.begin_nested() 方法时,返回的事务对象必须用于提交或回滚该 SAVEPOINT。 调用 Session.commit()Connection.commit() 方法将始终提交 最外层 事务;这是 SQLAlchemy 2.0 特有的行为,与 1.x 版本系列相反。

Engine 示例:

engine = create_engine("postgresql+psycopg2://user:pass@host/dbname")

with engine.begin() as conn:
    savepoint = conn.begin_nested()
    conn.execute(
        some_table.insert(),
        [
            {"data": "some data one"},
            {"data": "some data two"},
            {"data": "some data three"},
        ],
    )
    savepoint.commit()  # 或者回滚

# 自动提交

Session 示例:

Session = sessionmaker(engine)

with Session.begin() as session:
    savepoint = session.begin_nested()
    session.add_all(
        [
            SomeClass(data="some data one"),
            SomeClass(data="some data two"),
            SomeClass(data="some data three"),
        ]
    )
    savepoint.commit()  # 或者回滚
# 自动提交

When using a SAVEPOINT via the Session.begin_nested() or Connection.begin_nested() methods, the transaction object returned must be used to commit or rollback the SAVEPOINT. Calling the Session.commit() or Connection.commit() methods will always commit the outermost transaction; this is a SQLAlchemy 2.0 specific behavior that is reversed from the 1.x series.

Engine:

engine = create_engine("postgresql+psycopg2://user:pass@host/dbname")

with engine.begin() as conn:
    savepoint = conn.begin_nested()
    conn.execute(
        some_table.insert(),
        [
            {"data": "some data one"},
            {"data": "some data two"},
            {"data": "some data three"},
        ],
    )
    savepoint.commit()  # or rollback

# commits automatically

Session:

Session = sessionmaker(engine)

with Session.begin() as session:
    savepoint = session.begin_nested()
    session.add_all(
        [
            SomeClass(data="some data one"),
            SomeClass(data="some data two"),
            SomeClass(data="some data three"),
        ]
    )
    savepoint.commit()  # or rollback
# commits automatically

显式开始

Explicit Begin

Session 具有 “autobegin” 行为,意味着一旦开始执行操作,它会确保有一个 SessionTransaction 用来追踪正在进行的操作。 该事务会在调用 Session.commit() 时完成。

在框架集成中,通常希望控制“开始”操作的时机。为此, Session 使用了一个 “autobegin” 策略,使得即使是尚未开始事务的 Session, 也可以直接调用 Session.begin() 方法:

Session = sessionmaker(bind=engine)
session = Session()
session.begin()
try:
    item1 = session.get(Item, 1)
    item2 = session.get(Item, 2)
    item1.foo = "bar"
    item2.bar = "foo"
    session.commit()
except:
    session.rollback()
    raise

上述模式通常通过上下文管理器以更符合惯例的方式调用:

Session = sessionmaker(bind=engine)
session = Session()
with session.begin():
    item1 = session.get(Item, 1)
    item2 = session.get(Item, 2)
    item1.foo = "bar"
    item2.bar = "foo"

Session.begin() 方法和会话的 “autobegin” 过程使用相同的步骤来启动事务。 这包括在启动事务时会调用 SessionEvents.after_transaction_create() 事件;该钩子允许框架在 ORM Session 的事务处理流程中集成其自定义的事务处理过程。

The Session features “autobegin” behavior, meaning that as soon as operations begin to take place, it ensures a SessionTransaction is present to track ongoing operations. This transaction is completed when Session.commit() is called.

It is often desirable, particularly in framework integrations, to control the point at which the “begin” operation occurs. To suit this, the Session uses an “autobegin” strategy, such that the Session.begin() method may be called directly for a Session that has not already had a transaction begun:

Session = sessionmaker(bind=engine)
session = Session()
session.begin()
try:
    item1 = session.get(Item, 1)
    item2 = session.get(Item, 2)
    item1.foo = "bar"
    item2.bar = "foo"
    session.commit()
except:
    session.rollback()
    raise

The above pattern is more idiomatically invoked using a context manager:

Session = sessionmaker(bind=engine)
session = Session()
with session.begin():
    item1 = session.get(Item, 1)
    item2 = session.get(Item, 2)
    item1.foo = "bar"
    item2.bar = "foo"

The Session.begin() method and the session’s “autobegin” process use the same sequence of steps to begin the transaction. This includes that the SessionEvents.after_transaction_create() event is invoked when it occurs; this hook is used by frameworks in order to integrate their own transactional processes with that of the ORM Session.

启用两阶段提交

Enabling Two-Phase Commit

对于支持两阶段操作的后端(目前为 MySQL 和 PostgreSQL),可以指示会话使用两阶段提交语义。这将协调跨数据库的事务提交,以确保事务在所有数据库中要么提交,要么回滚。你还可以通过 Session.prepare() 方法准备会话与 SQLAlchemy 未管理的事务进行交互。要使用两阶段事务,在会话中设置标志 twophase=True:

engine1 = create_engine("postgresql+psycopg2://db1")
engine2 = create_engine("postgresql+psycopg2://db2")

Session = sessionmaker(twophase=True)

# 将 User 操作绑定到 engine 1,将 Account 操作绑定到 engine 2
Session.configure(binds={User: engine1, Account: engine2})

session = Session()

# .... 处理账户和用户

# 提交。会话将向所有数据库发出刷新,并对所有数据库执行准备步骤,
# 然后提交两个事务
session.commit()

For backends which support two-phase operation (currently MySQL and PostgreSQL), the session can be instructed to use two-phase commit semantics. This will coordinate the committing of transactions across databases so that the transaction is either committed or rolled back in all databases. You can also Session.prepare() the session for interacting with transactions not managed by SQLAlchemy. To use two phase transactions set the flag twophase=True on the session:

engine1 = create_engine("postgresql+psycopg2://db1")
engine2 = create_engine("postgresql+psycopg2://db2")

Session = sessionmaker(twophase=True)

# bind User operations to engine 1, Account operations to engine 2
Session.configure(binds={User: engine1, Account: engine2})

session = Session()

# .... work with accounts and users

# commit.  session will issue a flush to all DBs, and a prepare step to all DBs,
# before committing both transactions
session.commit()

设置事务隔离级别/DBAPI AUTOCOMMIT

Setting Transaction Isolation Levels / DBAPI AUTOCOMMIT

大多数 DBAPI 支持可配置的事务隔离级别。传统上有四个级别:“READ UNCOMMITTED”(读未提交)、“READ COMMITTED”(读已提交)、“REPEATABLE READ”(可重复读)和“SERIALIZABLE”(可串行化)。 这些通常在 DBAPI 连接开始新事务之前应用,注意大多数 DBAPI 在首次发出 SQL 语句时会隐式开始该事务。

支持隔离级别的 DBAPI 通常还支持真正的“自动提交”概念,这意味着 DBAPI 连接将被置于非事务性自动提交模式。 这通常意味着 DBAPI 的典型行为(自动发出 “BEGIN” 语句到数据库)不再发生,但也可能包括其他指令。 在使用此模式时, DBAPI 在任何情况下都不使用事务。像 .begin(), .commit().rollback() 等 SQLAlchemy 方法将默默通过,不做任何操作。

SQLAlchemy 的方言支持在每个 EngineConnection 上设置隔离模式, 通过在 create_engine()Connection.execution_options() 级别上的标志进行配置。

当使用 ORM Session 时,它作为引擎和连接的 外观,但不会直接暴露事务隔离。 因此,为了影响事务隔离级别,我们需要根据需要操作 EngineConnection

参见

设置事务隔离级别(包括 DBAPI 自动提交) - 请确保查看 SQLAlchemy Connection 对象级别的隔离级别如何工作。

Most DBAPIs support the concept of configurable transaction isolation levels. These are traditionally the four levels “READ UNCOMMITTED”, “READ COMMITTED”, “REPEATABLE READ” and “SERIALIZABLE”. These are usually applied to a DBAPI connection before it begins a new transaction, noting that most DBAPIs will begin this transaction implicitly when SQL statements are first emitted.

DBAPIs that support isolation levels also usually support the concept of true “autocommit”, which means that the DBAPI connection itself will be placed into a non-transactional autocommit mode. This usually means that the typical DBAPI behavior of emitting “BEGIN” to the database automatically no longer occurs, but it may also include other directives. When using this mode, the DBAPI does not use a transaction under any circumstances. SQLAlchemy methods like .begin(), .commit() and .rollback() pass silently.

SQLAlchemy’s dialects support settable isolation modes on a per-Engine or per-Connection basis, using flags at both the create_engine() level as well as at the Connection.execution_options() level.

When using the ORM Session, it acts as a facade for engines and connections, but does not expose transaction isolation directly. So in order to affect transaction isolation level, we need to act upon the Engine or Connection as appropriate.

参见

设置事务隔离级别(包括 DBAPI 自动提交) - be sure to review how isolation levels work at the level of the SQLAlchemy Connection object as well.

为 Sessionmaker/引擎范围设置隔离

Setting Isolation For A Sessionmaker / Engine Wide

要全局设置特定隔离级别的 Sessionsessionmaker,第一种方法是,可以通过在所有情况下构造一个具有特定隔离级别的 Engine,然后将其用作 Session 和/或 sessionmaker 的连接源:

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

eng = create_engine(
    "postgresql+psycopg2://scott:tiger@localhost/test",
    isolation_level="REPEATABLE READ",
)

Session = sessionmaker(eng)

另一种方法,适用于同时有两个不同隔离级别的引擎,是使用 Engine.execution_options() 方法,它将生成原始 Engine 的浅拷贝,并共享与父引擎相同的连接池。 当操作被分成“事务性”和“自动提交”操作时,这种方法通常更合适:

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

eng = create_engine("postgresql+psycopg2://scott:tiger@localhost/test")

autocommit_engine = eng.execution_options(isolation_level="AUTOCOMMIT")

transactional_session = sessionmaker(eng)
autocommit_session = sessionmaker(autocommit_engine)

如上所示,”eng” 和 “autocommit_engine” 共享相同的方言和连接池。 然而,当从 autocommit_engine 获取连接时,将设置“AUTOCOMMIT”模式。 这两个 sessionmaker 对象 “transactional_session” 和 “autocommit_session” 在与数据库连接交互时,将继承这些特性。

autocommit_session仍然具有事务语义,包括 Session.commit()Session.rollback() 仍然将自己视为“提交”和“回滚”对象, 但是事务将默默缺失。正因为如此, 通常(尽管不是严格要求)建议使用 AUTOCOMMIT 隔离级别的会话时以只读方式使用,即:

with autocommit_session() as session:
    some_objects = session.execute(text("<statement>"))
    some_other_objects = session.execute(text("<statement>"))

# 连接关闭

To set up a Session or sessionmaker with a specific isolation level globally, the first technique is that an Engine can be constructed against a specific isolation level in all cases, which is then used as the source of connectivity for a Session and/or sessionmaker:

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

eng = create_engine(
    "postgresql+psycopg2://scott:tiger@localhost/test",
    isolation_level="REPEATABLE READ",
)

Session = sessionmaker(eng)

Another option, useful if there are to be two engines with different isolation levels at once, is to use the Engine.execution_options() method, which will produce a shallow copy of the original Engine which shares the same connection pool as the parent engine. This is often preferable when operations will be separated into “transactional” and “autocommit” operations:

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

eng = create_engine("postgresql+psycopg2://scott:tiger@localhost/test")

autocommit_engine = eng.execution_options(isolation_level="AUTOCOMMIT")

transactional_session = sessionmaker(eng)
autocommit_session = sessionmaker(autocommit_engine)

Above, both “eng” and "autocommit_engine" share the same dialect and connection pool. However the “AUTOCOMMIT” mode will be set upon connections when they are acquired from the autocommit_engine. The two sessionmaker objects “transactional_session” and “autocommit_session" then inherit these characteristics when they work with database connections.

The “autocommit_sessioncontinues to have transactional semantics, including that Session.commit() and Session.rollback() still consider themselves to be “committing” and “rolling back” objects, however the transaction will be silently absent. For this reason, it is typical, though not strictly required, that a Session with AUTOCOMMIT isolation be used in a read-only fashion, that is:

with autocommit_session() as session:
    some_objects = session.execute(text("<statement>"))
    some_other_objects = session.execute(text("<statement>"))

# closes connection

为单个会话设置隔离

Setting Isolation for Individual Sessions

当我们创建新的 Session 时,既可以直接使用构造函数,也可以使用 sessionmaker 生成的可调用对象,我们可以直接传递 bind 参数,覆盖预先存在的绑定。 例如,我们可以从默认的 sessionmaker 创建 Session,并传递设置为自动提交的引擎:

plain_engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test")

autocommit_engine = plain_engine.execution_options(isolation_level="AUTOCOMMIT")

# 通常使用 plain_engine
Session = sessionmaker(plain_engine)

# 创建一个特定的会话,将使用 "autocommit" 引擎
with Session(bind=autocommit_engine) as session:
    # 与会话一起工作
    ...

如果 Sessionsessionmaker 配置了多个“bind”,我们可以完全重新指定 binds 参数, 或者如果我们只想替换特定的绑定,可以使用 Session.bind_mapper()Session.bind_table() 方法:

with Session() as session:
    session.bind_mapper(User, autocommit_engine)

When we make a new Session, either using the constructor directly or when we call upon the callable produced by a sessionmaker, we can pass the bind argument directly, overriding the pre-existing bind. We can for example create our Session from a default sessionmaker and pass an engine set for autocommit:

plain_engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test")

autocommit_engine = plain_engine.execution_options(isolation_level="AUTOCOMMIT")

# will normally use plain_engine
Session = sessionmaker(plain_engine)

# make a specific Session that will use the "autocommit" engine
with Session(bind=autocommit_engine) as session:
    # work with session
    ...

For the case where the Session or sessionmaker is configured with multiple “binds”, we can either re-specify the binds argument fully, or if we want to only replace specific binds, we can use the Session.bind_mapper() or Session.bind_table() methods:

with Session() as session:
    session.bind_mapper(User, autocommit_engine)

为单个事务设置隔离

Setting Isolation for Individual Transactions

关于隔离级别的一个关键注意事项是,不能安全地在已经启动事务的 Connection 上修改隔离级别。 数据库不能更改正在进行的事务的隔离级别,并且某些 DBAPIs 和 SQLAlchemy 方言在这方面的行为不一致。

因此,最好使用一个预先绑定到所需隔离级别的 Session。 但是,可以通过在事务开始时使用 Session.connection() 方法来在每个连接级别上影响隔离级别:

from sqlalchemy.orm import Session

# 假设会话刚刚构建
sess = Session(bind=engine)

# 在任何其他操作之前调用 connection() 并设置选项
# 这将从绑定的引擎中获取一个新连接并开始一个真实的数据库事务。
sess.connection(execution_options={"isolation_level": "SERIALIZABLE"})

# ... 在 SERIALIZABLE 隔离级别下与会话一起工作...

# 提交事务。连接将被释放
# 并恢复到先前的隔离级别。
sess.commit()

# 在上面的 commit() 之后,可以根据需要开始新的事务,
# 该事务将继续使用先前的默认隔离级别,除非再次设置。

如上所示,我们首先通过构造函数或 sessionmaker 创建一个 Session。 然后,我们显式地通过调用 Session.connection() 设置数据库级别事务的开始,该方法提供的执行选项将在数据库级事务开始之前传递给连接。 该事务以选定的隔离级别继续进行。当事务完成后,连接的隔离级别将被重置为其默认值,然后返回到连接池。

同样,也可以使用 Session.begin() 方法开始 Session 级别的事务; 在此调用之后调用 Session.connection() 可以用于设置每个连接事务的隔离级别:

sess = Session(bind=engine)

with sess.begin():
    # 在任何其他操作之前调用 connection() 并设置选项
    # 这将从绑定的引擎中获取一个新连接并开始一个真实的数据库事务。
    sess.connection(execution_options={"isolation_level": "SERIALIZABLE"})

    # ... 在 SERIALIZABLE 隔离级别下与会话一起工作...

# 在块外,事务已提交。连接被释放,并恢复到先前的隔离级别。

A key caveat regarding isolation level is that the setting cannot be safely modified on a Connection where a transaction has already started. Databases cannot change the isolation level of a transaction in progress, and some DBAPIs and SQLAlchemy dialects have inconsistent behaviors in this area.

Therefore it is preferable to use a Session that is up front bound to an engine with the desired isolation level. However, the isolation level on a per-connection basis can be affected by using the Session.connection() method at the start of a transaction:

from sqlalchemy.orm import Session

# assume session just constructed
sess = Session(bind=engine)

# call connection() with options before any other operations proceed.
# this will procure a new connection from the bound engine and begin a real
# database transaction.
sess.connection(execution_options={"isolation_level": "SERIALIZABLE"})

# ... work with session in SERIALIZABLE isolation level...

# commit transaction.  the connection is released
# and reverted to its previous isolation level.
sess.commit()

# subsequent to commit() above, a new transaction may be begun if desired,
# which will proceed with the previous default isolation level unless
# it is set again.

Above, we first produce a Session using either the constructor or a sessionmaker. Then we explicitly set up the start of a database-level transaction by calling upon Session.connection(), which provides for execution options that will be passed to the connection before the database-level transaction is begun. The transaction proceeds with this selected isolation level. When the transaction completes, the isolation level is reset on the connection to its default before the connection is returned to the connection pool.

The Session.begin() method may also be used to begin the Session level transaction; calling upon Session.connection() subsequent to that call may be used to set up the per-connection-transaction isolation level:

sess = Session(bind=engine)

with sess.begin():
    # call connection() with options before any other operations proceed.
    # this will procure a new connection from the bound engine and begin a
    # real database transaction.
    sess.connection(execution_options={"isolation_level": "SERIALIZABLE"})

    # ... work with session in SERIALIZABLE isolation level...

# outside the block, the transaction has been committed.  the connection is
# released and reverted to its previous isolation level.

使用事件跟踪事务状态

Tracking Transaction State with Events

请参阅 事务事件 一节,了解会话事务状态改变的可用事件挂钩的概述。

See the section 事务事件 for an overview of the available event hooks for session transaction state changes.

将会话加入外部事务(例如测试套件)

Joining a Session into an External Transaction (such as for test suites)

如果正在使用的 Connection 已经处于事务状态(即已经建立了 Transaction), 可以通过将 Session 绑定到该 Connection 来让 Session 参与该事务。 这种做法通常适用于测试套件,允许 ORM 代码自由地使用 Session,包括能够调用 Session.commit(), 然后将整个数据库交互回滚。

在 2.0 版本发生变更: “加入外部事务” 的方法在 2.0 中得到了改进;不再需要事件处理程序来“重置”嵌套事务。

该方法的工作原理是:首先在事务中建立一个 Connection,并可选地创建一个 SAVEPOINT,然后将其传递给 Session 作为“绑定”; 将 Session.join_transaction_mode 参数设置为 "create_savepoint",这表示在 Session 中应创建新的 SAVEPOINT,以便实现 BEGIN/COMMIT/ROLLBACK, 并确保外部事务保持原样。

当测试结束时,外部事务将被回滚,因此测试期间的所有数据更改都会被恢复:

from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from unittest import TestCase

# 全局应用范围。创建 Session 类和引擎
Session = sessionmaker()

engine = create_engine("postgresql+psycopg2://...")


class SomeTest(TestCase):
    def setUp(self):
        # 连接到数据库
        self.connection = engine.connect()

        # 开始一个非 ORM 事务
        self.trans = self.connection.begin()

        # 将个别 Session 绑定到连接,选择 "create_savepoint" join_transaction_mode
        self.session = Session(
            bind=self.connection, join_transaction_mode="create_savepoint"
        )

    def test_something(self):
        # 在测试中使用会话。

        self.session.add(Foo())
        self.session.commit()

    def test_something_with_rollbacks(self):
        self.session.add(Bar())
        self.session.flush()
        self.session.rollback()

        self.session.add(Foo())
        self.session.commit()

    def tearDown(self):
        self.session.close()

        # 回滚 - 上面使用 Session 所做的所有操作(包括调用 commit())将被回滚。
        self.trans.rollback()

        # 将连接返回给引擎
        self.connection.close()

上述方法是 SQLAlchemy 自身 CI 的一部分,用于确保其按预期工作。

If a Connection is being used which is already in a transactional state (i.e. has a Transaction established), a Session can be made to participate within that transaction by just binding the Session to that Connection. The usual rationale for this is a test suite that allows ORM code to work freely with a Session, including the ability to call Session.commit(), where afterwards the entire database interaction is rolled back.

在 2.0 版本发生变更: The “join into an external transaction” recipe is newly improved again in 2.0; event handlers to “reset” the nested transaction are no longer required.

The recipe works by establishing a Connection within a transaction and optionally a SAVEPOINT, then passing it to a Session as the “bind”; the Session.join_transaction_mode parameter is passed with the setting "create_savepoint", which indicates that new SAVEPOINTs should be created in order to implement BEGIN/COMMIT/ROLLBACK for the Session, which will leave the external transaction in the same state in which it was passed.

When the test tears down, the external transaction is rolled back so that any data changes throughout the test are reverted:

from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from unittest import TestCase

# global application scope.  create Session class, engine
Session = sessionmaker()

engine = create_engine("postgresql+psycopg2://...")


class SomeTest(TestCase):
    def setUp(self):
        # connect to the database
        self.connection = engine.connect()

        # begin a non-ORM transaction
        self.trans = self.connection.begin()

        # bind an individual Session to the connection, selecting
        # "create_savepoint" join_transaction_mode
        self.session = Session(
            bind=self.connection, join_transaction_mode="create_savepoint"
        )

    def test_something(self):
        # use the session in tests.

        self.session.add(Foo())
        self.session.commit()

    def test_something_with_rollbacks(self):
        self.session.add(Bar())
        self.session.flush()
        self.session.rollback()

        self.session.add(Foo())
        self.session.commit()

    def tearDown(self):
        self.session.close()

        # rollback - everything that happened with the
        # Session above (including calls to commit())
        # is rolled back.
        self.trans.rollback()

        # return connection to the Engine
        self.connection.close()

The above recipe is part of SQLAlchemy’s own CI to ensure that it remains working as expected.