Connection Pooling

A connection pool is a standard technique used to maintain long running connections in memory for efficient re-use, as well as to provide management for the total number of connections an application might use simultaneously.

Particularly for server-side web applications, a connection pool is the standard way to maintain a “pool” of active database connections in memory which are reused across requests.

SQLAlchemy includes several connection pool implementations which integrate with the Engine. They can also be used directly for applications that want to add pooling to an otherwise plain DBAPI approach.

Connection Pool Configuration

The Engine returned by the create_engine() function in most cases has a QueuePool integrated, pre-configured with reasonable pooling defaults. If you’re reading this section only to learn how to enable pooling - congratulations! You’re already done.

The most common QueuePool tuning parameters can be passed directly to create_engine() as keyword arguments: pool_size, max_overflow, pool_recycle and pool_timeout. For example:

engine = create_engine(
    "postgresql+psycopg2://me@localhost/mydb", pool_size=20, max_overflow=0
)

All SQLAlchemy pool implementations have in common that none of them “pre create” connections - all implementations wait until first use before creating a connection. At that point, if no additional concurrent checkout requests for more connections are made, no additional connections are created. This is why it’s perfectly fine for create_engine() to default to using a QueuePool of size five without regard to whether or not the application really needs five connections queued up - the pool would only grow to that size if the application actually used five connections concurrently, in which case the usage of a small pool is an entirely appropriate default behavior.
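
The lazy behavior described above can be observed with a standalone pool. The following is a minimal sketch, assuming an in-memory SQLite database through the standard library sqlite3 module; the `created` list and the `creator()` helper are illustrative names and not part of SQLAlchemy:

```python
import sqlite3

from sqlalchemy.pool import QueuePool

created = []  # every DBAPI connection the pool asks us to open


def creator():
    # illustrative creator function; any DBAPI connect call would do
    conn = sqlite3.connect(":memory:")
    created.append(conn)
    return conn


lazy_pool = QueuePool(creator, pool_size=5, max_overflow=0)
count_after_construct = len(created)  # nothing has been opened yet

c1 = lazy_pool.connect()
count_after_first = len(created)  # first checkout opens one connection
c1.close()

c2 = lazy_pool.connect()
count_after_reuse = len(created)  # the returned connection is reused
c2.close()

print(count_after_construct, count_after_first, count_after_reuse)
```

Because the two checkouts never overlap, the pool stays at a single connection even though pool_size is five.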

Note

The QueuePool class is not compatible with asyncio. When using create_async_engine to create an instance of AsyncEngine, the AsyncAdaptedQueuePool class, which makes use of an asyncio-compatible queue implementation, is used instead.

Switching Pool Implementations

The usual way to use a different kind of pool with create_engine() is to use the poolclass argument. This argument accepts a class imported from the sqlalchemy.pool module, and handles the details of building the pool for you. A common use case here is when connection pooling is to be disabled, which can be achieved by using the NullPool implementation:

from sqlalchemy.pool import NullPool

engine = create_engine(
    "postgresql+psycopg2://scott:tiger@localhost/test", poolclass=NullPool
)
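
To see the effect of NullPool without a database server, the sketch below counts new DBAPI connections through the pool-level "connect" event. It assumes an in-memory SQLite database; the `connects` list and the `on_connect()` listener are names made up for illustration:

```python
from sqlalchemy import create_engine, event
from sqlalchemy.pool import NullPool

engine = create_engine("sqlite://", poolclass=NullPool)

connects = []  # one entry per newly opened DBAPI connection


@event.listens_for(engine, "connect")
def on_connect(dbapi_connection, connection_record):
    connects.append(dbapi_connection)


# three sequential uses of the engine
for _ in range(3):
    with engine.connect():
        pass

print(len(connects))
```

With NullPool each checkout opens a connection and each checkin closes it, so the listener should fire once per loop iteration instead of once overall.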

Using a Custom Connection Function

See the section Custom DBAPI connect() arguments / on-connect routines for a rundown of the various connection customization routines.

Constructing a Pool

To use a Pool by itself, the creator function is the only argument that’s required and is passed first, followed by any additional options:

import sqlalchemy.pool as pool
import psycopg2


def getconn():
    c = psycopg2.connect(user="ed", host="127.0.0.1", dbname="test")
    return c


mypool = pool.QueuePool(getconn, max_overflow=10, pool_size=5)

DBAPI connections can then be procured from the pool using the Pool.connect() function. The return value of this method is a DBAPI connection that’s contained within a transparent proxy:

# get a connection
conn = mypool.connect()

# use it
cursor_obj = conn.cursor()
cursor_obj.execute("select foo")

The purpose of the transparent proxy is to intercept the close() call, such that instead of the DBAPI connection being closed, it is returned to the pool:

# "close" the connection.  Returns
# it to the pool.
conn.close()

The proxy also returns its contained DBAPI connection to the pool when it is garbage collected, though Python does not guarantee that this happens immediately (it typically does with CPython). Relying on this is not recommended, however, and in particular it is not supported with asyncio DBAPI drivers.
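
The checkout and checkin accounting can be inspected directly on the pool. This is a small sketch that swaps psycopg2 for the standard library sqlite3 module so that it runs without a server; the variable names are illustrative:

```python
import sqlite3

from sqlalchemy.pool import QueuePool

mypool = QueuePool(lambda: sqlite3.connect(":memory:"), pool_size=2, max_overflow=0)

conn = mypool.connect()
cursor_obj = conn.cursor()
cursor_obj.execute("select 1")
cursor_obj.close()

out_while_open = mypool.checkedout()  # connections currently checked out

# close() on the proxy returns the DBAPI connection to the pool
conn.close()

out_after_close = mypool.checkedout()
idle_after_close = mypool.checkedin()  # connections waiting in the pool

print(out_while_open, out_after_close, idle_after_close)
```

Pool.checkedout() and Pool.checkedin() are used here only as a convenient way to show that the underlying connection stays open and idle after close().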

Reset On Return

The pool includes “reset on return” behavior which will call the rollback() method of the DBAPI connection when the connection is returned to the pool. This is so that any existing transactional state is removed from the connection, which includes not just uncommitted data but table and row locks as well. For most DBAPIs, the call to rollback() is inexpensive, and if the DBAPI has already completed a transaction, the method should be a no-op.
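
A minimal sketch of this behavior, assuming an in-memory SQLite database and a pool limited to one connection so that both checkouts reach the same DBAPI connection (the table name `t` is arbitrary):

```python
import sqlite3

from sqlalchemy.pool import QueuePool

reset_pool = QueuePool(
    lambda: sqlite3.connect(":memory:"), pool_size=1, max_overflow=0
)

conn = reset_pool.connect()
cur = conn.cursor()
cur.execute("CREATE TABLE t (x INTEGER)")
conn.commit()
cur.execute("INSERT INTO t (x) VALUES (1)")  # deliberately left uncommitted
cur.close()
conn.close()  # checkin: the pool calls rollback() on the DBAPI connection

conn = reset_pool.connect()
cur = conn.cursor()
cur.execute("SELECT COUNT(*) FROM t")
rows_after_checkin = cur.fetchone()[0]
cur.close()
conn.close()

print(rows_after_checkin)
```

The committed table survives, while the uncommitted row is discarded by the rollback that runs at checkin.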

Disabling Reset on Return for non-transactional connections

For very specific cases where this rollback() is not useful, such as when using a connection that is configured for autocommit or when using a database that has no ACID capabilities such as the MyISAM engine of MySQL, the reset-on-return behavior can be disabled, which is typically done for performance reasons. This can be effected by using the Pool.reset_on_return parameter of Pool, which is also available from create_engine() as create_engine.pool_reset_on_return, passing a value of None. This is illustrated in the example below, in conjunction with the create_engine.isolation_level parameter setting of AUTOCOMMIT:

non_acid_engine = create_engine(
    "mysql://scott:tiger@host/db",
    pool_reset_on_return=None,
    isolation_level="AUTOCOMMIT",
)

The above engine won’t actually perform ROLLBACK when connections are returned to the pool; since AUTOCOMMIT is enabled, the driver will also not perform any BEGIN operation.

Custom Reset-on-Return Schemes

“reset on return” consisting of a single rollback() may not be sufficient for some use cases; in particular, applications which make use of temporary tables may wish for these tables to be automatically removed on connection checkin. Some (but notably not all) backends include features that can “reset” such tables within the scope of a database connection, which may be a desirable behavior for connection pool reset. Other server resources such as prepared statement handles and server-side statement caches may persist beyond the checkin process, which may or may not be desirable, depending on specifics. Again, some (but not all) backends may provide a means of resetting this state. The two dialects included with SQLAlchemy that are known to have such reset schemes are Microsoft SQL Server, where an undocumented but widely known stored procedure called sp_reset_connection is often used, and PostgreSQL, which has a well-documented series of commands including DISCARD, RESET, DEALLOCATE, and UNLISTEN.

The following example illustrates how to replace reset on return with the Microsoft SQL Server sp_reset_connection stored procedure, using the PoolEvents.reset() event hook. The create_engine.pool_reset_on_return parameter is set to None so that the custom scheme can replace the default behavior completely. The custom hook implementation calls .rollback() in any case, as it’s usually important that the DBAPI’s own tracking of commit/rollback will remain consistent with the state of the transaction:

from sqlalchemy import create_engine
from sqlalchemy import event

mssql_engine = create_engine(
    "mssql+pyodbc://scott:tiger^5HHH@mssql2017:1433/test?driver=ODBC+Driver+17+for+SQL+Server",
    # disable default reset-on-return scheme
    pool_reset_on_return=None,
)


@event.listens_for(mssql_engine, "reset")
def _reset_mssql(dbapi_connection, connection_record, reset_state):
    if not reset_state.terminate_only:
        dbapi_connection.execute("{call sys.sp_reset_connection}")

    # so that the DBAPI itself knows that the connection has been
    # reset
    dbapi_connection.rollback()

Changed in version 2.0.0b3: Added additional state arguments to the PoolEvents.reset() event and additionally ensured the event is invoked for all “reset” occurrences, so that it’s appropriate as a place for custom “reset” handlers. Previous schemes which use the PoolEvents.checkin() handler remain usable as well.

Logging reset-on-return events

Logging for pool events, including reset on return, can be enabled by setting the sqlalchemy.pool logger to the logging.DEBUG log level, or by setting create_engine.echo_pool to "debug" when using create_engine():

>>> from sqlalchemy import create_engine
>>> engine = create_engine("postgresql://scott:tiger@localhost/test", echo_pool="debug")

The above pool will show verbose logging including reset on return:

>>> c1 = engine.connect()
DEBUG sqlalchemy.pool.impl.QueuePool Created new connection <connection object ...>
DEBUG sqlalchemy.pool.impl.QueuePool Connection <connection object ...> checked out from pool
>>> c1.close()
DEBUG sqlalchemy.pool.impl.QueuePool Connection <connection object ...> being returned to pool
DEBUG sqlalchemy.pool.impl.QueuePool Connection <connection object ...> rollback-on-return
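
The same output can be obtained through the standard logging module instead of echo_pool. The following is a sketch only: it assumes an in-memory SQLite engine with QueuePool selected explicitly, and `ListHandler` is a hypothetical helper that collects messages in memory so they can be inspected; exact message wording may differ between SQLAlchemy versions:

```python
import logging

from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool


class ListHandler(logging.Handler):
    """Collect formatted log messages in a list (illustrative helper)."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


# configure logging before the first checkout takes place
pool_logger = logging.getLogger("sqlalchemy.pool")
pool_logger.setLevel(logging.DEBUG)
handler = ListHandler()
pool_logger.addHandler(handler)

engine = create_engine("sqlite://", poolclass=QueuePool)
with engine.connect():
    pass

for message in handler.messages:
    print(message)
```

In an application, a regular logging.StreamHandler or logging.basicConfig() would normally take the place of the in-memory handler.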

Pool Events

Connection pools support an event interface that allows hooks to execute upon first connect, upon each new connection, and upon checkout and checkin of connections. See PoolEvents for details.
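
As a brief illustration, the sketch below registers listeners for the "connect", "checkout" and "checkin" events and records the order in which they fire. It assumes an in-memory SQLite engine using QueuePool; the `seen` list and the listener function names are illustrative:

```python
from sqlalchemy import create_engine, event
from sqlalchemy.pool import QueuePool

engine = create_engine("sqlite://", poolclass=QueuePool)

seen = []  # names of pool events, in the order they fire


@event.listens_for(engine, "connect")
def on_connect(dbapi_connection, connection_record):
    seen.append("connect")


@event.listens_for(engine, "checkout")
def on_checkout(dbapi_connection, connection_record, connection_proxy):
    seen.append("checkout")


@event.listens_for(engine, "checkin")
def on_checkin(dbapi_connection, connection_record):
    seen.append("checkin")


# two sequential uses of the engine share one pooled connection
for _ in range(2):
    with engine.connect():
        pass

print(seen)
```

The "connect" event should appear once, since the second use of the engine checks out the connection that the first one returned.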

Dealing with Disconnects

The connection pool has the ability to refresh individual connections as well as its entire set of connections, setting the previously pooled connections as “invalid”. A common use case is to allow the connection pool to recover gracefully when the database server has been restarted and all previously established connections are no longer functional. There are two approaches to this.

Disconnect Handling - Pessimistic

The pessimistic approach refers to emitting a test statement on the SQL connection at the start of each connection pool checkout, to test that the database connection is still viable. The implementation is dialect-specific, and uses either a DBAPI-specific ping method or a simple SQL statement like “SELECT 1” to test the connection for liveness.

The approach adds a small amount of overhead to the connection checkout process, but is otherwise the simplest and most reliable way to completely eliminate database errors due to stale pooled connections. The calling application does not need to be concerned about organizing operations to be able to recover from stale connections checked out from the pool.

Pessimistic testing of connections upon checkout is achievable by using the Pool.pre_ping argument, available from create_engine() via the create_engine.pool_pre_ping argument:

engine = create_engine("mysql+pymysql://user:pw@host/db", pool_pre_ping=True)

The “pre ping” feature operates on a per-dialect basis either by invoking a DBAPI-specific “ping” method, or if not available will emit SQL equivalent to “SELECT 1”, catching any errors and detecting the error as a “disconnect” situation. If the ping / error check determines that the connection is not usable, the connection will be immediately recycled, and all other pooled connections older than the current time are invalidated, so that the next time they are checked out, they will also be recycled before use.

If the database is still not available when “pre ping” runs, then the initial connect will fail and the error for failure to connect will be propagated normally. In the uncommon situation that the database is available for connections, but is not able to respond to a “ping”, the “pre_ping” will try up to three times before giving up, propagating the database error last received.

It is critical to note that the pre-ping approach does not accommodate connections dropped in the middle of transactions or other SQL operations. If the database becomes unavailable while a transaction is in progress, the transaction will be lost and the database error will be raised. While the Connection object will detect a “disconnect” situation and recycle the connection as well as invalidate the rest of the connection pool when this condition occurs, the individual operation where the exception was raised will be lost, and it’s up to the application to either abandon the operation, or retry the whole transaction again. If the engine is configured using DBAPI-level autocommit connections, as described at Setting Transaction Isolation Levels including DBAPI Autocommit, a connection may be reconnected transparently mid-operation using events. See the section How Do I “Retry” a Statement Execution Automatically? for an example.

For dialects that make use of “SELECT 1” and catch errors in order to detect disconnects, the disconnection test may be augmented for new backend-specific error messages using the DialectEvents.handle_error() hook.

Custom / Legacy Pessimistic Ping

Before create_engine.pool_pre_ping was added, the “pre-ping” approach historically has been performed manually using the ConnectionEvents.engine_connect() engine event. The most common recipe for this is below, for reference purposes in case an application is already using such a recipe, or special behaviors are needed:

from sqlalchemy import create_engine
from sqlalchemy import exc
from sqlalchemy import event
from sqlalchemy import select

some_engine = create_engine(...)


@event.listens_for(some_engine, "engine_connect")
def ping_connection(connection, branch):
    if branch:
        # this parameter is always False as of SQLAlchemy 2.0,
        # but is still accepted by the event hook.  In 1.x versions
        # of SQLAlchemy, "branched" connections should be skipped.
        return

    try:
        # run a SELECT 1.   use a core select() so that
        # the SELECT of a scalar value without a table is
        # appropriately formatted for the backend
        connection.scalar(select(1))
    except exc.DBAPIError as err:
        # catch SQLAlchemy's DBAPIError, which is a wrapper
        # for the DBAPI's exception.  It includes a .connection_invalidated
        # attribute which specifies if this connection is a "disconnect"
        # condition, which is based on inspection of the original exception
        # by the dialect in use.
        if err.connection_invalidated:
            # run the same SELECT again - the connection will re-validate
            # itself and establish a new connection.  The disconnect detection
            # here also causes the whole connection pool to be invalidated
            # so that all stale connections are discarded.
            connection.scalar(select(1))
        else:
            raise

The above recipe has the advantage that we are making use of SQLAlchemy’s facilities for detecting those DBAPI exceptions that are known to indicate a “disconnect” situation, as well as the Engine object’s ability to correctly invalidate the current connection pool when this condition occurs and allowing the current Connection to re-validate onto a new DBAPI connection.

Disconnect Handling - Optimistic

When pessimistic handling is not employed, as well as when the database is shut down and/or restarted in the middle of a connection’s period of use within a transaction, the other approach to dealing with stale / closed connections is to let SQLAlchemy handle disconnects as they occur, at which point all connections in the pool are invalidated, meaning they are assumed to be stale and will be refreshed upon next checkout. This behavior assumes the Pool is used in conjunction with an Engine. The Engine has logic which can detect disconnection events and refresh the pool automatically.

When the Connection attempts to use a DBAPI connection, and an exception is raised that corresponds to a “disconnect” event, the connection is invalidated. The Connection then calls the Pool.recreate() method, effectively invalidating all connections not currently checked out so that they are replaced with new ones upon next checkout. This flow is illustrated by the code example below:

from sqlalchemy import create_engine, exc, text

e = create_engine(...)
c = e.connect()

try:
    # suppose the database has been restarted.
    c.execute(text("SELECT * FROM table"))
    c.close()
except exc.DBAPIError as err:
    # an exception is raised, Connection is invalidated.
    # the exception is named "err" so that it does not rebind
    # (and then unset) the engine variable "e"
    if err.connection_invalidated:
        print("Connection was invalidated!")

# after the invalidate event, a new connection
# starts with a new Pool
c = e.connect()
c.execute(text("SELECT * FROM table"))

The above example illustrates that no special intervention is needed to refresh the pool, which continues normally after a disconnection event is detected. However, one database exception is raised for each connection that was in use when the database became unavailable. In a typical web application using an ORM Session, the above condition would correspond to a single request failing with a 500 error, then the web application continuing normally beyond that. Hence the approach is “optimistic” in that frequent database restarts are not anticipated.

Setting Pool Recycle

An additional setting that can augment the “optimistic” approach is to set the pool recycle parameter. This parameter prevents the pool from using a particular connection that has passed a certain age, and is appropriate for database backends such as MySQL that automatically close connections that have been stale after a particular period of time:

from sqlalchemy import create_engine

e = create_engine("mysql+mysqldb://scott:tiger@localhost/test", pool_recycle=3600)

Above, any DBAPI connection that has been open for more than one hour will be invalidated and replaced, upon next checkout. Note that the invalidation only occurs during checkout - not on any connections that are held in a checked out state. pool_recycle is a function of the Pool itself, independent of whether or not an Engine is in use.
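
Since recycling belongs to the Pool itself, it can be tried out without an Engine. The sketch below assumes a standalone QueuePool over in-memory SQLite connections and uses the Pool-level `recycle` parameter with an artificially short one-second limit; the `opened` list and `creator()` are illustrative names:

```python
import sqlite3
import time

from sqlalchemy.pool import QueuePool

opened = []  # every DBAPI connection opened on behalf of the pool


def creator():
    conn = sqlite3.connect(":memory:")
    opened.append(conn)
    return conn


# recycle=1: connections older than one second are replaced at checkout
recycling_pool = QueuePool(creator, pool_size=1, max_overflow=0, recycle=1)

recycling_pool.connect().close()
opened_before_wait = len(opened)

time.sleep(1.2)  # let the pooled connection exceed the recycle age

recycling_pool.connect().close()
opened_after_wait = len(opened)

print(opened_before_wait, opened_after_wait)
```

The replacement happens only at the second checkout; nothing closes the aging connection while it sits idle in the pool.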

More on Invalidation

The Pool provides “connection invalidation” services which allow both explicit invalidation of a connection as well as automatic invalidation in response to conditions that are determined to render a connection unusable.

“Invalidation” means that a particular DBAPI connection is removed from the pool and discarded. The .close() method is called on this connection if it is not certain that the connection is already closed; if this method fails, the exception is logged but the operation still proceeds.

When using an Engine, the Connection.invalidate() method is the usual entrypoint to explicit invalidation. Other conditions by which a DBAPI connection might be invalidated include:

  • a DBAPI exception such as OperationalError, raised when a method like connection.execute() is called, is detected as indicating a so-called “disconnect” condition. As the Python DBAPI provides no standard system for determining the nature of an exception, all SQLAlchemy dialects include a system called is_disconnect() which will examine the contents of an exception object, including the string message and any potential error codes included with it, in order to determine if this exception indicates that the connection is no longer usable. If this is the case, the _ConnectionFairy.invalidate() method is called and the DBAPI connection is then discarded.

  • When the connection is returned to the pool, and calling the connection.rollback() or connection.commit() methods, as dictated by the pool’s “reset on return” behavior, throws an exception. A final attempt at calling .close() on the connection will be made, and it is then discarded.

  • When a listener implementing PoolEvents.checkout() raises the DisconnectionError exception, indicating that the connection won’t be usable and a new connection attempt needs to be made.

All invalidations which occur will invoke the PoolEvents.invalidate() event.
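
Explicit invalidation can be sketched as follows, assuming an in-memory SQLite engine using QueuePool; the `connects` and `invalidated` lists and the listener names are illustrative:

```python
from sqlalchemy import create_engine, event, text
from sqlalchemy.pool import QueuePool

engine = create_engine("sqlite://", poolclass=QueuePool)

connects = []  # newly opened DBAPI connections
invalidated = []  # exception objects passed to the invalidate event


@event.listens_for(engine, "connect")
def on_connect(dbapi_connection, connection_record):
    connects.append(dbapi_connection)


@event.listens_for(engine, "invalidate")
def on_invalidate(dbapi_connection, connection_record, exception):
    # exception is None when the invalidation was requested explicitly
    invalidated.append(exception)


with engine.connect() as conn:
    conn.invalidate()  # discard the underlying DBAPI connection

with engine.connect() as conn:
    value = conn.scalar(text("select 1"))  # runs on a fresh DBAPI connection

print(len(connects), invalidated, value)
```

The second checkout has to open a new DBAPI connection because the first one was discarded instead of being returned to the pool in a usable state.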

Supporting new database error codes for disconnect scenarios

SQLAlchemy dialects each include a routine called is_disconnect() that is invoked whenever a DBAPI exception is encountered. The DBAPI exception object is passed to this method, where dialect-specific heuristics will then determine if the error code received indicates that the database connection has been “disconnected”, or is in an otherwise unusable state which indicates it should be recycled. The heuristics applied here may be customized using the DialectEvents.handle_error() event hook, which is typically established via the owning Engine object. Using this hook, all errors which occur are delivered passing along a contextual object known as ExceptionContext. Custom event hooks may control whether or not a particular error should be considered a “disconnect” situation or not, as well as if this disconnect should cause the entire connection pool to be invalidated or not.

For example, to have the Oracle Database driver error codes DPY-1001 and DPY-4011 handled as disconnect codes, apply an event handler to the engine after creation:

import re

from sqlalchemy import create_engine
from sqlalchemy import event
from sqlalchemy.engine import ExceptionContext

engine = create_engine(
    "oracle+oracledb://scott:tiger@localhost:1521?service_name=freepdb1"
)


@event.listens_for(engine, "handle_error")
def handle_exception(context: ExceptionContext) -> None:
    if not context.is_disconnect and re.match(
        r"^(?:DPY-1001|DPY-4011)", str(context.original_exception)
    ):
        context.is_disconnect = True

    return None

The above error processing function will be invoked for all Oracle Database errors raised, including those caught when using the pool pre ping feature for those backends that rely upon disconnect error handling (new in 2.0).

Using FIFO vs. LIFO

The QueuePool class features a flag called QueuePool.use_lifo, which can also be accessed from create_engine() via the flag create_engine.pool_use_lifo. Setting this flag to True causes the pool’s “queue” behavior to instead be that of a “stack”, i.e. the last connection to be returned to the pool is the first one to be used on the next request. In contrast to the pool’s long-standing behavior of first-in-first-out, which produces a round-robin effect of using each connection in the pool in series, LIFO mode allows excess connections to remain idle in the pool, allowing server-side timeout schemes to close these connections out. The difference between FIFO and LIFO is basically whether or not it’s desirable for the pool to keep a full set of connections ready to go even during idle periods:

engine = create_engine("postgresql://", pool_use_lifo=True, pool_pre_ping=True)

Above, we also make use of the create_engine.pool_pre_ping flag so that connections which are closed from the server side are gracefully handled by the connection pool and replaced with a new connection.

Note that the flag only applies to QueuePool use.

Using Connection Pools with Multiprocessing or os.fork()

It’s critical that when using a connection pool, and by extension when using an Engine created via create_engine(), that the pooled connections are not shared to a forked process. TCP connections are represented as file descriptors, which usually work across process boundaries, meaning this will cause concurrent access to the file descriptor on behalf of two or more entirely independent Python interpreter states.

Depending on specifics of the driver and OS, the issues that arise here range from non-working connections to socket connections that are used by multiple processes concurrently, leading to broken messaging (the latter case is typically the most common).

The SQLAlchemy Engine object refers to a connection pool of existing database connections. So when this object is replicated to a child process, the goal is to ensure that no database connections are carried over. There are four general approaches to this:

  1. Disable pooling using NullPool. This is the most simplistic, one shot system that prevents the Engine from using any connection more than once:

    from sqlalchemy import create_engine
    from sqlalchemy.pool import NullPool
    
    engine = create_engine("mysql+mysqldb://user:pass@host/dbname", poolclass=NullPool)
  2. Call Engine.dispose() on any given Engine, passing the Engine.dispose.close parameter with a value of False, within the initialize phase of the child process. This is so that the new process will not touch any of the parent process’ connections and will instead start with new connections. This is the recommended approach:

    from multiprocessing import Pool

    from sqlalchemy import create_engine
    from sqlalchemy import text
    
    engine = create_engine("mysql+mysqldb://user:pass@host/dbname")
    
    
    def run_in_process(some_data_record):
        with engine.connect() as conn:
            conn.execute(text("..."))
    
    
    def initializer():
        """ensure the parent proc's database connections are not touched
        in the new connection pool"""
        engine.dispose(close=False)
    
    
    with Pool(10, initializer=initializer) as p:
        p.map(run_in_process, data)

    New in version 1.4.33: Added the Engine.dispose.close parameter to allow the replacement of a connection pool in a child process without interfering with the connections used by the parent process.

  3. Call Engine.dispose() directly before the child process is created. This will also cause the child process to start with a new connection pool, while ensuring the parent connections are not transferred to the child process:

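    from multiprocessing import Process

    from sqlalchemy import create_engine
    from sqlalchemy import text

    engine = create_engine("mysql://user:pass@host/dbname")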
    
    
    def run_in_process():
        with engine.connect() as conn:
            conn.execute(text("..."))
    
    
    # before process starts, ensure engine.dispose() is called
    engine.dispose()
    p = Process(target=run_in_process)
    p.start()
  4. An event handler can be applied to the connection pool that tests for connections being shared across process boundaries, and invalidates them:

    from sqlalchemy import create_engine
    from sqlalchemy import event
    from sqlalchemy import exc
    import os
    
    engine = create_engine("...")
    
    
    @event.listens_for(engine, "connect")
    def connect(dbapi_connection, connection_record):
        connection_record.info["pid"] = os.getpid()
    
    
    @event.listens_for(engine, "checkout")
    def checkout(dbapi_connection, connection_record, connection_proxy):
        pid = os.getpid()
        if connection_record.info["pid"] != pid:
            connection_record.dbapi_connection = connection_proxy.dbapi_connection = None
            raise exc.DisconnectionError(
                "Connection record belongs to pid %s, "
                "attempting to check out in pid %s" % (connection_record.info["pid"], pid)
            )

    Above, we use an approach similar to that described in Disconnect Handling - Pessimistic to treat a DBAPI connection that originated in a different parent process as an “invalid” connection, coercing the pool to recycle the connection record to make a new connection.

The above strategies will accommodate the case of an Engine being shared among processes. The above steps alone are not sufficient for the case of sharing a specific Connection over a process boundary; prefer to keep the scope of a particular Connection local to a single process (and thread). It’s additionally not supported to share any kind of ongoing transactional state directly across a process boundary, such as an ORM Session object that’s begun a transaction and references active Connection instances; again prefer to create new Session objects in new processes.

Using a pool instance directly

A pool implementation can be used directly without an engine. This could be used in applications that just wish to use the pool behavior without all other SQLAlchemy features. In the example below the default pool for the MySQLdb dialect is obtained using create_pool_from_url():

from sqlalchemy import create_pool_from_url

my_pool = create_pool_from_url(
    "mysql+mysqldb://", max_overflow=5, pool_size=5, pre_ping=True
)

con = my_pool.connect()
# use the connection
...
# then close it
con.close()

If the type of pool to create is not specified, the default one for the dialect will be used. To specify it directly the poolclass argument can be used, like in the following example:

from sqlalchemy import create_pool_from_url
from sqlalchemy import NullPool

my_pool = create_pool_from_url("mysql+mysqldb://", poolclass=NullPool)

API Documentation - Available Pool Implementations

Object Name – Description

  • _ConnectionFairy – Proxies a DBAPI connection and provides return-on-dereference support.

  • _ConnectionRecord – Maintains a position in a connection pool which references a pooled connection.

  • AssertionPool – A Pool that allows at most one checked out connection at any given time.

  • AsyncAdaptedQueuePool – An asyncio-compatible version of QueuePool.

  • ConnectionPoolEntry – Interface for the object that maintains an individual database connection on behalf of a Pool instance.

  • ManagesConnection – Common base for the two connection-management interfaces PoolProxiedConnection and ConnectionPoolEntry.

  • NullPool – A Pool which does not pool connections.

  • Pool – Abstract base class for connection pools.

  • PoolProxiedConnection – A connection-like adapter for a PEP 249 DBAPI connection, which includes additional methods specific to the Pool implementation.

  • QueuePool – A Pool that imposes a limit on the number of open connections.

  • SingletonThreadPool – A Pool that maintains one connection per thread.

  • StaticPool – A Pool of exactly one connection, used for all requests.

class sqlalchemy.pool.Pool

Abstract base class for connection pools.

Class signature

class sqlalchemy.pool.Pool (sqlalchemy.log.Identified, sqlalchemy.event.registry.EventTarget)

method sqlalchemy.pool.Pool.__init__(creator: _CreatorFnType | _CreatorWRecFnType, recycle: int = -1, echo: log._EchoFlagType = None, logging_name: str | None = None, reset_on_return: _ResetStyleArgType = True, events: List[Tuple[_ListenerFnType, str]] | None = None, dialect: _ConnDialect | Dialect | None = None, pre_ping: bool = False, _dispatch: _DispatchCommon[Pool] | None = None)

Construct a Pool.

Parameters:
  • creator – a callable function that returns a DB-API connection object. The function will be called with parameters.

  • recycle – If set to a value other than -1, number of seconds between connection recycling, which means upon checkout, if this timeout is surpassed the connection will be closed and replaced with a newly opened connection. Defaults to -1.

  • logging_name – String identifier which will be used within the “name” field of logging records generated within the “sqlalchemy.pool” logger. Defaults to a hexstring of the object’s id.

  • echo

    If True, the connection pool will log informational output, such as when connections are invalidated or recycled, to the default log handler, which defaults to sys.stdout for output. If set to the string "debug", the logging will include pool checkouts and checkins.

    The Pool.echo parameter can also be set from the create_engine() call by using the create_engine.echo_pool parameter.

    See also

    Configuring Logging - further detail on how to configure logging.

  • reset_on_return

    Determine steps to take on connections as they are returned to the pool, which were not otherwise handled by a Connection. Available from create_engine() via the create_engine.pool_reset_on_return parameter.

    Pool.reset_on_return can have any of these values:

    • "rollback" - call rollback() on the connection, to release locks and transaction resources. This is the default value. The vast majority of use cases should leave this value set.

    • "commit" - call commit() on the connection, to release locks and transaction resources. A commit here may be desirable for databases that cache query plans if a commit is emitted, such as Microsoft SQL Server. However, this value is more dangerous than ‘rollback’ because any data changes present on the transaction are committed unconditionally.

    • None - don’t do anything on the connection. This setting may be appropriate if the database / DBAPI works in pure “autocommit” mode at all times, or if a custom reset handler is established using the PoolEvents.reset() event handler.

    • True - same as ‘rollback’, this is here for backwards compatibility.

    • False - same as None, this is here for backwards compatibility.

    For further customization of reset on return, the PoolEvents.reset() event hook may be used which can perform any connection activity desired on reset.

  • events – a list of 2-tuples, each of the form (callable, target) which will be passed to listen() upon construction. Provided here so that event listeners can be assigned via create_engine() before dialect-level listeners are applied.

  • dialect – a Dialect that will handle the job of calling rollback(), close(), or commit() on DBAPI connections. If omitted, a built-in “stub” dialect is used. Applications that make use of create_engine() should not use this parameter as it is handled by the engine creation strategy.

  • pre_ping – if True, the pool will emit a “ping” (typically “SELECT 1”, but is dialect-specific) on the connection upon checkout, to test if the connection is alive or not. If not, the connection is transparently re-connected and upon success, all other pooled connections established prior to that timestamp are invalidated. Requires that a dialect is passed as well to interpret the disconnection error.

method sqlalchemy.pool.Pool.connect() PoolProxiedConnection

Return a DBAPI connection from the pool.

The connection is instrumented such that when its close() method is called, the connection will be returned to the pool.

method sqlalchemy.pool.Pool.dispose() None

Dispose of this pool.

This method leaves the possibility of checked-out connections remaining open, as it only affects connections that are idle in the pool.

method sqlalchemy.pool.Pool.recreate() Pool

Return a new Pool, of the same class as this one and configured with identical creation arguments.

This method is used in conjunction with dispose() to close out an entire Pool and create a new one in its place.
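
The interplay of dispose() and recreate() can be sketched as follows. This is a minimal illustration, assuming a QueuePool over in-memory SQLite connections from the standard library; the variable names are hypothetical:

```python
import sqlite3

from sqlalchemy.pool import QueuePool

pool = QueuePool(
    lambda: sqlite3.connect(":memory:"), pool_size=2, max_overflow=0
)

conn = pool.connect()
conn.close()  # the connection now sits idle in the pool
idle_before_dispose = pool.checkedin()

pool.dispose()  # closes the connections that are idle in the pool
idle_after_dispose = pool.checkedin()

# a fresh pool of the same class, built from the same creation arguments
new_pool = pool.recreate()
```

Here new_pool starts out empty and opens connections only on first use, like any newly constructed pool.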

method sqlalchemy.pool.Pool.status() str

Returns a brief description of the state of this pool.

class sqlalchemy.pool.QueuePool

A Pool that imposes a limit on the number of open connections.

QueuePool is the default pooling implementation used for all Engine objects other than SQLite with a :memory: database.

The QueuePool class is not compatible with asyncio and create_async_engine(). The AsyncAdaptedQueuePool class is used automatically when using create_async_engine(), if no other kind of pool is specified.

method sqlalchemy.pool.QueuePool.__init__(creator: _CreatorFnType | _CreatorWRecFnType, pool_size: int = 5, max_overflow: int = 10, timeout: float = 30.0, use_lifo: bool = False, **kw: Any)

Construct a QueuePool.

Parameters:
  • creator – a callable function that returns a DB-API connection object, same as that of Pool.creator.

  • pool_size – The size of the pool to be maintained, defaults to 5. This is the largest number of connections that will be kept persistently in the pool. Note that the pool begins with no connections; once this number of connections is requested, that number of connections will remain. pool_size can be set to 0 to indicate no size limit; to disable pooling, use a NullPool instead.

  • max_overflow – The maximum overflow size of the pool. When the number of checked-out connections reaches the size set in pool_size, additional connections will be returned up to this limit. When those additional connections are returned to the pool, they are disconnected and discarded. It follows then that the total number of simultaneous connections the pool will allow is pool_size + max_overflow, and the total number of “sleeping” connections the pool will allow is pool_size. max_overflow can be set to -1 to indicate no overflow limit; no limit will be placed on the total number of concurrent connections. Defaults to 10.

  • timeout – The number of seconds to wait before giving up on returning a connection. Defaults to 30.0. This can be a float but is subject to the limitations of Python time functions which may not be reliable in the tens of milliseconds.

  • use_lifo

    use LIFO (last-in-first-out) when retrieving connections instead of FIFO (first-in-first-out). Using LIFO, a server-side timeout scheme can reduce the number of connections used during non-peak periods of use. When planning for server-side timeouts, ensure that a recycle or pre-ping strategy is in use to gracefully handle stale connections.

  • **kw – Other keyword arguments including Pool.recycle, Pool.echo, Pool.reset_on_return and others are passed to the Pool constructor.
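
How pool_size, max_overflow and timeout interact can be seen in a small experiment. The sketch below assumes in-memory SQLite connections as stand-ins and deliberately tiny values (pool_size=1, max_overflow=1, timeout=0.1) so that the limit is reached quickly; these values are for illustration only:

```python
import sqlite3

from sqlalchemy import exc
from sqlalchemy.pool import QueuePool

pool = QueuePool(
    lambda: sqlite3.connect(":memory:"),
    pool_size=1,
    max_overflow=1,
    timeout=0.1,
)

a = pool.connect()  # fills the single persistent slot
b = pool.connect()  # allowed as an overflow connection
checked_out_at_limit = pool.checkedout()

try:
    # pool_size + max_overflow connections are already checked out,
    # so this waits for `timeout` seconds and then gives up
    pool.connect()
    timed_out = False
except exc.TimeoutError:
    timed_out = True

a.close()  # kept in the pool as an idle connection
b.close()  # the pool is full, so this connection is closed and discarded
idle_after_return = pool.checkedin()
checked_out_after_return = pool.checkedout()
pool.dispose()
```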

method sqlalchemy.pool.QueuePool.dispose() None

Dispose of this pool.

This method leaves the possibility of checked-out connections remaining open, as it only affects connections that are idle in the pool.

method sqlalchemy.pool.QueuePool.recreate() QueuePool

Return a new Pool, of the same class as this one and configured with identical creation arguments.

This method is used in conjunction with dispose() to close out an entire Pool and create a new one in its place.

method sqlalchemy.pool.QueuePool.status() str

Returns a brief description of the state of this pool.

class sqlalchemy.pool.AsyncAdaptedQueuePool

An asyncio-compatible version of QueuePool.

This pool is used by default when using AsyncEngine engines that were generated from create_async_engine(). It uses an asyncio-compatible queue implementation that does not use threading.Lock.

The arguments and operation of AsyncAdaptedQueuePool are otherwise identical to that of QueuePool.

class sqlalchemy.pool.SingletonThreadPool

A Pool that maintains one connection per thread.

Maintains one connection per thread, never moving a connection to a thread other than the one in which it was created.

Warning

The SingletonThreadPool will call .close() on arbitrary connections that exist beyond the size setting of pool_size, e.g. if more unique thread identities than what pool_size states are used. This cleanup is non-deterministic and not sensitive to whether or not the connections linked to those thread identities are currently in use.

SingletonThreadPool may be improved in a future release, however in its current status it is generally used only for test scenarios using a SQLite :memory: database and is not recommended for production use.

The SingletonThreadPool class is not compatible with asyncio and create_async_engine().

Options are the same as those of Pool, as well as:

Parameters:

pool_size – The number of threads in which to maintain connections at once. Defaults to five.

SingletonThreadPool is used by the SQLite dialect automatically when a memory-based database is used. See SQLite.

method sqlalchemy.pool.SingletonThreadPool.connect() PoolProxiedConnection

Return a DBAPI connection from the pool.

The connection is instrumented such that when its close() method is called, the connection will be returned to the pool.

method sqlalchemy.pool.SingletonThreadPool.dispose() None

Dispose of this pool.

method sqlalchemy.pool.SingletonThreadPool.recreate() SingletonThreadPool

Return a new Pool, of the same class as this one and configured with identical creation arguments.

This method is used in conjunction with dispose() to close out an entire Pool and create a new one in its place.

method sqlalchemy.pool.SingletonThreadPool.status() str

Returns a brief description of the state of this pool.

class sqlalchemy.pool.AssertionPool

A Pool that allows at most one checked out connection at any given time.

This will raise an exception if more than one connection is checked out at a time. Useful for debugging code that is using more connections than desired.

The AssertionPool class is compatible with asyncio and create_async_engine().
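
As a debugging aid, this might be used as in the following sketch, which assumes an in-memory SQLite connection as a stand-in for the real database:

```python
import sqlite3

from sqlalchemy.pool import AssertionPool

pool = AssertionPool(lambda: sqlite3.connect(":memory:"))

first = pool.connect()
try:
    # a second simultaneous checkout is what this pool is meant to catch
    pool.connect()
    second_checkout_failed = False
except AssertionError:
    second_checkout_failed = True

first.close()

# once the first connection is returned, checking out again is fine
again = pool.connect()
again.close()
pool.dispose()
```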

method sqlalchemy.pool.AssertionPool.dispose() None

Dispose of this pool.

This method leaves the possibility of checked-out connections remaining open, as it only affects connections that are idle in the pool.

method sqlalchemy.pool.AssertionPool.recreate() AssertionPool

Return a new Pool, of the same class as this one and configured with identical creation arguments.

This method is used in conjunction with dispose() to close out an entire Pool and create a new one in its place.

method sqlalchemy.pool.AssertionPool.status() str

Returns a brief description of the state of this pool.

class sqlalchemy.pool.NullPool

A Pool which does not pool connections.

Instead it literally opens and closes the underlying DB-API connection per each connection open/close.

Reconnect-related functions such as recycle and connection invalidation are not supported by this Pool implementation, since no connections are held persistently.

The NullPool class is compatible with asyncio and create_async_engine().
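
The difference from a pooling implementation can be made visible by counting how often the creator function runs. This is an illustrative sketch, assuming in-memory SQLite connections; count_connects is a hypothetical helper, not a SQLAlchemy function:

```python
import sqlite3

from sqlalchemy.pool import NullPool
from sqlalchemy.pool import QueuePool


def count_connects(pool_class):
    """Check out and return a connection three times; count real connects."""
    opened = []

    def creator():
        conn = sqlite3.connect(":memory:")
        opened.append(conn)
        return conn

    pool = pool_class(creator)
    for _ in range(3):
        conn = pool.connect()
        conn.close()
    pool.dispose()
    return len(opened)


null_pool_connects = count_connects(NullPool)
queue_pool_connects = count_connects(QueuePool)
```

NullPool opens a new DBAPI connection for every checkout, while QueuePool reuses the one it already holds.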

method sqlalchemy.pool.NullPool.dispose() None

Dispose of this pool.

This method leaves the possibility of checked-out connections remaining open, as it only affects connections that are idle in the pool.

method sqlalchemy.pool.NullPool.recreate() NullPool

Return a new Pool, of the same class as this one and configured with identical creation arguments.

This method is used in conjunction with dispose() to close out an entire Pool and create a new one in its place.

method sqlalchemy.pool.NullPool.status() str

Returns a brief description of the state of this pool.

class sqlalchemy.pool.StaticPool

A Pool of exactly one connection, used for all requests.

Reconnect-related functions such as recycle and connection invalidation (which is also used to support auto-reconnect) are only partially supported right now and may not yield good results.

The StaticPool class is compatible with asyncio and create_async_engine().

method sqlalchemy.pool.StaticPool.dispose() None

Dispose of this pool.

This method leaves the possibility of checked-out connections remaining open, as it only affects connections that are idle in the pool.

method sqlalchemy.pool.StaticPool.recreate() StaticPool

Return a new Pool, of the same class as this one and configured with identical creation arguments.

This method is used in conjunction with dispose() to close out an entire Pool and create a new one in its place.

method sqlalchemy.pool.StaticPool.status() str

Returns a brief description of the state of this pool.

class sqlalchemy.pool.ManagesConnection

Common base for the two connection-management interfaces PoolProxiedConnection and ConnectionPoolEntry.

These two objects are typically exposed in the public facing API via the connection pool event hooks, documented at PoolEvents.

New in version 2.0.

attribute sqlalchemy.pool.ManagesConnection.dbapi_connection: DBAPIConnection | None

A reference to the actual DBAPI connection being tracked.

This is a PEP 249-compliant object that for traditional sync-style dialects is provided by the third-party DBAPI implementation in use. For asyncio dialects, the implementation is typically an adapter object provided by the SQLAlchemy dialect itself; the underlying asyncio object is available via the ManagesConnection.driver_connection attribute.

SQLAlchemy’s interface for the DBAPI connection is based on the DBAPIConnection protocol object.

attribute sqlalchemy.pool.ManagesConnection.driver_connection: Any | None

The “driver level” connection object as used by the Python DBAPI or database driver.

For traditional PEP 249 DBAPI implementations, this object will be the same object as that of ManagesConnection.dbapi_connection. For an asyncio database driver, this will be the ultimate “connection” object used by that driver, such as the asyncpg.Connection object which will not have standard PEP 249 methods.

New in version 1.4.24.

attribute sqlalchemy.pool.ManagesConnection.info

Info dictionary associated with the underlying DBAPI connection referred to by this ManagesConnection instance, allowing user-defined data to be associated with the connection.

The data in this dictionary is persistent for the lifespan of the DBAPI connection itself, including across pool checkins and checkouts. When the connection is invalidated and replaced with a new one, this dictionary is cleared.

For a PoolProxiedConnection instance that’s not associated with a ConnectionPoolEntry, such as if it were detached, the attribute returns a dictionary that is local to that ConnectionPoolEntry. Therefore the ManagesConnection.info attribute will always provide a Python dictionary.

method sqlalchemy.pool.ManagesConnection.invalidate(e: BaseException | None = None, soft: bool = False) None

Mark the managed connection as invalidated.

Parameters:
  • e – an exception object indicating a reason for the invalidation.

  • soft – if True, the connection isn’t closed; instead, this connection will be recycled on next checkout.

attribute sqlalchemy.pool.ManagesConnection.record_info

Persistent info dictionary associated with this ManagesConnection.

Unlike the ManagesConnection.info dictionary, the lifespan of this dictionary is that of the ConnectionPoolEntry which owns it; therefore this dictionary will persist across reconnects and connection invalidation for a particular entry in the connection pool.

For a PoolProxiedConnection instance that’s not associated with a ConnectionPoolEntry, such as if it were detached, the attribute returns None. Contrast to the ManagesConnection.info dictionary which is never None.
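
The different lifespans of the two dictionaries can be illustrated as follows. This is a minimal sketch, assuming a one-slot QueuePool over in-memory SQLite connections; the "note" key is arbitrary:

```python
import sqlite3

from sqlalchemy.pool import QueuePool

pool = QueuePool(
    lambda: sqlite3.connect(":memory:"), pool_size=1, max_overflow=0
)

conn = pool.connect()
conn.info["note"] = "tied to the DBAPI connection"
conn.record_info["note"] = "tied to the pool entry"
conn.close()

# same DBAPI connection on the next checkout: info is still there
conn2 = pool.connect()
info_after_checkin = dict(conn2.info)

# hard invalidation closes the DBAPI connection and releases the checkout
conn2.invalidate()

# the pool entry reconnects: info is cleared, record_info survives
conn3 = pool.connect()
info_after_invalidate = dict(conn3.info)
record_info_after_invalidate = dict(conn3.record_info)
conn3.close()
pool.dispose()
```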

class sqlalchemy.pool.ConnectionPoolEntry

Interface for the object that maintains an individual database connection on behalf of a Pool instance.

The ConnectionPoolEntry object represents the long-term maintenance of a particular connection for a pool, including expiring or invalidating that connection to have it replaced with a new one, which will continue to be maintained by that same ConnectionPoolEntry instance. Compared to PoolProxiedConnection, which is the short-term, per-checkout connection manager, this object lasts for the lifespan of a particular “slot” within a connection pool.

The ConnectionPoolEntry object is mostly visible to public-facing API code when it is delivered to connection pool event hooks, such as PoolEvents.connect() and PoolEvents.checkout().

New in version 2.0: ConnectionPoolEntry provides the public facing interface for the _ConnectionRecord internal class.

method sqlalchemy.pool.ConnectionPoolEntry.close() None

Close the DBAPI connection managed by this connection pool entry.

attribute sqlalchemy.pool.ConnectionPoolEntry.dbapi_connection: DBAPIConnection | None

A reference to the actual DBAPI connection being tracked.

This is a PEP 249-compliant object that for traditional sync-style dialects is provided by the third-party DBAPI implementation in use. For asyncio dialects, the implementation is typically an adapter object provided by the SQLAlchemy dialect itself; the underlying asyncio object is available via the ManagesConnection.driver_connection attribute.

SQLAlchemy’s interface for the DBAPI connection is based on the DBAPIConnection protocol object.

attribute sqlalchemy.pool.ConnectionPoolEntry.driver_connection: Any | None

The “driver level” connection object as used by the Python DBAPI or database driver.

For traditional PEP 249 DBAPI implementations, this object will be the same object as that of ManagesConnection.dbapi_connection. For an asyncio database driver, this will be the ultimate “connection” object used by that driver, such as the asyncpg.Connection object which will not have standard PEP 249 methods.

New in version 1.4.24.

attribute sqlalchemy.pool.ConnectionPoolEntry.in_use

Return True if the connection is currently checked out.

attribute sqlalchemy.pool.ConnectionPoolEntry.info

inherited from the ManagesConnection.info attribute of ManagesConnection

Info dictionary associated with the underlying DBAPI connection referred to by this ManagesConnection instance, allowing user-defined data to be associated with the connection.

The data in this dictionary is persistent for the lifespan of the DBAPI connection itself, including across pool checkins and checkouts. When the connection is invalidated and replaced with a new one, this dictionary is cleared.

For a PoolProxiedConnection instance that’s not associated with a ConnectionPoolEntry, such as if it were detached, the attribute returns a dictionary that is local to that ConnectionPoolEntry. Therefore the ManagesConnection.info attribute will always provide a Python dictionary.

method sqlalchemy.pool.ConnectionPoolEntry.invalidate(e: BaseException | None = None, soft: bool = False) None

inherited from the ManagesConnection.invalidate() method of ManagesConnection

Mark the managed connection as invalidated.

Parameters:
  • e – an exception object indicating a reason for the invalidation.

  • soft – if True, the connection isn’t closed; instead, this connection will be recycled on next checkout.

attribute sqlalchemy.pool.ConnectionPoolEntry.record_info

inherited from the ManagesConnection.record_info attribute of ManagesConnection

Persistent info dictionary associated with this ManagesConnection.

Unlike the ManagesConnection.info dictionary, the lifespan of this dictionary is that of the ConnectionPoolEntry which owns it; therefore this dictionary will persist across reconnects and connection invalidation for a particular entry in the connection pool.

For a PoolProxiedConnection instance that’s not associated with a ConnectionPoolEntry, such as if it were detached, the attribute returns None. Contrast to the ManagesConnection.info dictionary which is never None.

class sqlalchemy.pool.PoolProxiedConnection

A connection-like adapter for a PEP 249 DBAPI connection, which includes additional methods specific to the Pool implementation.

PoolProxiedConnection is the public-facing interface for the internal _ConnectionFairy implementation object; users familiar with _ConnectionFairy can consider this object to be equivalent.

New in version 2.0: PoolProxiedConnection provides the public-facing interface for the _ConnectionFairy internal class.

method sqlalchemy.pool.PoolProxiedConnection.close() None

Release this connection back to the pool.

The PoolProxiedConnection.close() method shadows the PEP 249 .close() method, altering its behavior to instead release the proxied connection back to the connection pool.

Upon release to the pool, whether the connection stays “opened” and pooled in the Python process, versus actually closed out and removed from the Python process, is based on the pool implementation in use and its configuration and current state.

attribute sqlalchemy.pool.PoolProxiedConnection.dbapi_connection: DBAPIConnection | None

A reference to the actual DBAPI connection being tracked.

This is a PEP 249-compliant object that for traditional sync-style dialects is provided by the third-party DBAPI implementation in use. For asyncio dialects, the implementation is typically an adapter object provided by the SQLAlchemy dialect itself; the underlying asyncio object is available via the ManagesConnection.driver_connection attribute.

SQLAlchemy’s interface for the DBAPI connection is based on the DBAPIConnection protocol object.

method sqlalchemy.pool.PoolProxiedConnection.detach() None

Separate this connection from its Pool.

This means that the connection will no longer be returned to the pool when closed, and will instead be literally closed. The associated ConnectionPoolEntry is de-associated from this DBAPI connection.

Note that any overall connection limiting constraints imposed by a Pool implementation may be violated after a detach, as the detached connection is removed from the pool’s knowledge and control.

attribute sqlalchemy.pool.PoolProxiedConnection.driver_connection: Any | None

The “driver level” connection object as used by the Python DBAPI or database driver.

For traditional PEP 249 DBAPI implementations, this object will be the same object as that of ManagesConnection.dbapi_connection. For an asyncio database driver, this will be the ultimate “connection” object used by that driver, such as the asyncpg.Connection object, which does not have the standard PEP 249 methods.

New in version 1.4.24.

attribute sqlalchemy.pool.PoolProxiedConnection.info

inherited from the ManagesConnection.info attribute of ManagesConnection

Info dictionary associated with the underlying DBAPI connection referred to by this ManagesConnection instance, allowing user-defined data to be associated with the connection.

The data in this dictionary is persistent for the lifespan of the DBAPI connection itself, including across pool checkins and checkouts. When the connection is invalidated and replaced with a new one, this dictionary is cleared.

For a PoolProxiedConnection instance that’s not associated with a ConnectionPoolEntry, such as if it were detached, the attribute returns a dictionary that is local to that PoolProxiedConnection. Therefore the ManagesConnection.info attribute will always provide a Python dictionary.

method sqlalchemy.pool.PoolProxiedConnection.invalidate(e: BaseException | None = None, soft: bool = False) → None
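A short sketch of the persistence described above, assuming a single-connection QueuePool over sqlite3; the "session_tag" key is a hypothetical name chosen for illustration:

```python
import sqlite3

from sqlalchemy.pool import QueuePool

# Illustrative single-connection pool so both checkouts share one entry.
pool = QueuePool(lambda: sqlite3.connect(":memory:"), pool_size=1, max_overflow=0)

first = pool.connect()
first.info["session_tag"] = "reporting"   # hypothetical user-defined key
first.close()                             # checkin; the DBAPI connection stays open

second = pool.connect()                   # same DBAPI connection, checked out again
tag_after_checkin = second.info.get("session_tag")

# Even once detached from its pool entry, .info is still a dictionary.
second.detach()
info_after_detach = second.info
second.close()
```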

inherited from the ManagesConnection.invalidate() method of ManagesConnection

Mark the managed connection as invalidated.

Parameters:
  • e – an exception object indicating a reason for the invalidation.

  • soft – if True, the connection isn’t closed; instead, this connection will be recycled on next checkout.

attribute sqlalchemy.pool.PoolProxiedConnection.is_detached

Return True if this PoolProxiedConnection is detached from its pool.

attribute sqlalchemy.pool.PoolProxiedConnection.is_valid

Return True if this PoolProxiedConnection still refers to an active DBAPI connection.

attribute sqlalchemy.pool.PoolProxiedConnection.record_info

inherited from the ManagesConnection.record_info attribute of ManagesConnection

Persistent info dictionary associated with this ManagesConnection.

Unlike the ManagesConnection.info dictionary, the lifespan of this dictionary is that of the ConnectionPoolEntry which owns it; therefore this dictionary will persist across reconnects and connection invalidation for a particular entry in the connection pool.

For a PoolProxiedConnection instance that’s not associated with a ConnectionPoolEntry, such as if it were detached, the attribute returns None. Contrast to the ManagesConnection.info dictionary which is never None.
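The difference in lifespan between the two dictionaries can be sketched as follows, assuming a single-connection QueuePool over sqlite3; the "note" key is a hypothetical name used only for illustration:

```python
import sqlite3

from sqlalchemy.pool import QueuePool

# Illustrative single-connection pool, so one pool entry serves every checkout.
pool = QueuePool(lambda: sqlite3.connect(":memory:"), pool_size=1, max_overflow=0)

first = pool.connect()
first.info["note"] = "per-connection"     # tied to the DBAPI connection
first.record_info["note"] = "per-entry"   # tied to the pool entry
first.invalidate()                        # discard the DBAPI connection

second = pool.connect()                   # same entry, new DBAPI connection
info_after = dict(second.info)
record_info_after = dict(second.record_info)

# A detached proxy has no pool entry, so record_info is None.
second.detach()
record_info_detached = second.record_info
second.close()
```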

class sqlalchemy.pool._ConnectionFairy

Proxies a DBAPI connection and provides return-on-dereference support.

This is an internal object used by the Pool implementation to provide context management to a DBAPI connection delivered by that Pool. The public-facing interface for this class is described by the PoolProxiedConnection class. See that class for public API details.

The name “fairy” is inspired by the fact that the _ConnectionFairy object’s lifespan is transitory, as it lasts only for the length of a specific DBAPI connection being checked out from the pool, and additionally that as a transparent proxy, it is mostly invisible.

Class signature

class sqlalchemy.pool._ConnectionFairy (sqlalchemy.pool.base.PoolProxiedConnection)

class sqlalchemy.pool._ConnectionRecord

Maintains a position in a connection pool which references a pooled connection.

This is an internal object used by the Pool implementation to provide context management to a DBAPI connection maintained by that Pool. The public-facing interface for this class is described by the ConnectionPoolEntry class. See that class for public API details.

Class signature

class sqlalchemy.pool._ConnectionRecord (sqlalchemy.pool.base.ConnectionPoolEntry)