class ConnectionHandler:
    """
    Unified connection management interface.

    Connection objects are kept in a dictionary (alias -> client) that lives inside a
    ``contextvars.ContextVar``, so the mapping can be swapped per context via
    :meth:`set` and restored via :meth:`reset`.
    """

    # NOTE: the ``default`` dict is a single object shared by every context that has
    # never called ``set``/``_set_storage``; mutations made through ``_get_storage``
    # in such a context are therefore visible to all of them.
    _conn_storage: ContextVar[Dict[str, "BaseDBAsyncClient"]] = contextvars.ContextVar(
        "_conn_storage", default={}
    )

    def __init__(self) -> None:
        """Create a handler with no DB config; ``_init`` supplies it later."""
        self._db_config: Optional["DBConfigType"] = None
        self._create_db: bool = False

    async def _init(self, db_config: "DBConfigType", create_db: bool):
        """
        Store (or merge) the DB config and initialise every configured connection.

        :param db_config: Mapping of connection alias to connection settings.
        :param create_db: If ``True``, ``db_create()`` is awaited on each connection.
        """
        if self._db_config is None:
            self._db_config = db_config
        else:
            # A later call extends/overrides the aliases that are already known.
            self._db_config.update(db_config)
        self._create_db = create_db
        await self._init_connections()

    @property
    def db_config(self) -> "DBConfigType":
        """
        Return the DB config.

        This is the same config passed to the
        :meth:`Tortoise.init<tortoise.Tortoise.init>` method while initialization.

        :raises ConfigurationError:
            If this property is accessed before calling the
            :meth:`Tortoise.init<tortoise.Tortoise.init>` method.
        """
        if self._db_config is None:
            raise ConfigurationError(
                "DB configuration not initialised. Make sure to call "
                "Tortoise.init with a valid configuration before attempting "
                "to create connections."
            )
        return self._db_config

    def _get_storage(self) -> Dict[str, "BaseDBAsyncClient"]:
        """Return the alias -> connection mapping of the current context."""
        return self._conn_storage.get()

    def _set_storage(self, new_storage: Dict[str, "BaseDBAsyncClient"]) -> contextvars.Token:
        """
        Replace the storage of the current context with ``new_storage``.

        :param new_storage: The mapping to install as the storage.
        :return: Token that can be used to restore the previous storage.
        """
        # Should be used only for testing purposes.
        return self._conn_storage.set(new_storage)

    def _copy_storage(self) -> Dict[str, "BaseDBAsyncClient"]:
        """Return a shallow copy of the storage of the current context."""
        return copy(self._get_storage())

    def _clear_storage(self) -> None:
        """Remove every entry from the storage of the current context (in place)."""
        self._get_storage().clear()

    def _discover_client_class(self, engine: str) -> Type["BaseDBAsyncClient"]:
        """
        Import the ``engine`` module and return its ``client_class`` attribute.

        :param engine: Dotted path of the backend module.
        :raises ConfigurationError: If the module does not define ``client_class``.
        """
        # Let exception bubble up for transparency
        engine_module = importlib.import_module(engine)
        try:
            client_class = engine_module.client_class
        except AttributeError as exc:
            raise ConfigurationError(
                f'Backend for engine "{engine}" does not implement db client'
            ) from exc
        return client_class

    def _get_db_info(self, conn_alias: str) -> Union[str, Dict]:
        """
        Return the settings stored in the DB config for ``conn_alias``.

        :param conn_alias: The alias to look up.
        :raises ConfigurationError: If the alias is not present in the DB config.
        """
        try:
            return self.db_config[conn_alias]
        except KeyError as exc:
            raise ConfigurationError(
                f"Unable to get db settings for alias '{conn_alias}'. Please "
                f"check if the config dict contains this alias and try again"
            ) from exc

    async def _init_connections(self) -> None:
        """Fetch (creating if needed) a connection for every configured alias."""
        for alias in self.db_config:
            connection: "BaseDBAsyncClient" = self.get(alias)
            if self._create_db:
                await connection.db_create()

    def _create_connection(self, conn_alias: str) -> "BaseDBAsyncClient":
        """
        Instantiate the client for ``conn_alias`` from its configured settings.

        :param conn_alias: The alias whose settings are used.
        :return: A new client object; it is not added to the storage here.
        """
        db_info = self._get_db_info(conn_alias)
        if isinstance(db_info, str):
            # String settings are treated as a DB URL and expanded into a dict.
            db_info = expand_db_url(db_info)
        client_class = self._discover_client_class(db_info.get("engine", ""))
        # Copy so that adding ``connection_name`` does not mutate the config.
        db_params = db_info["credentials"].copy()
        db_params.update({"connection_name": conn_alias})
        connection: "BaseDBAsyncClient" = client_class(**db_params)
        return connection

    def get(self, conn_alias: str) -> "BaseDBAsyncClient":
        """
        Return the connection object for the given alias, creating it if needed.

        Used for accessing the low-level connection object
        (:class:`BaseDBAsyncClient<tortoise.backends.base.client.BaseDBAsyncClient>`) for the
        given alias.

        :param conn_alias: The alias for which the connection has to be fetched

        :raises ConfigurationError: If the connection alias does not exist.
        """
        storage: Dict[str, "BaseDBAsyncClient"] = self._get_storage()
        try:
            return storage[conn_alias]
        except KeyError:
            # Not cached in this context yet: build it and remember it.
            connection: "BaseDBAsyncClient" = self._create_connection(conn_alias)
            storage[conn_alias] = connection
            return connection

    def set(self, conn_alias: str, conn_obj: "BaseDBAsyncClient") -> contextvars.Token:
        """
        Sets the given alias to the provided connection object.

        :param conn_alias: The alias to set the connection for.
        :param conn_obj: The connection object that needs to be set for this alias.

        .. note::
            This method copies the storage from the `current context`, updates the
            ``conn_alias`` with the provided ``conn_obj`` and sets the updated storage
            in a `new context` and therefore returns a ``contextvars.Token`` in order to restore
            the original context storage.
        """
        storage_copy = self._copy_storage()
        storage_copy[conn_alias] = conn_obj
        return self._conn_storage.set(storage_copy)

    def discard(self, conn_alias: str) -> Optional["BaseDBAsyncClient"]:
        """
        Discards the given alias from the storage in the `current context`.

        :param conn_alias: The alias for which the connection object should be discarded.
        :return: The removed connection object, or ``None`` if the alias was not stored.

        .. important::
            Make sure to have called ``conn.close()`` for the provided alias before calling
            this method else there would be a connection leak (dangling connection).
        """
        return self._get_storage().pop(conn_alias, None)

    def reset(self, token: contextvars.Token) -> None:
        """
        Reset the underlying storage to the previous context state.

        Resets the storage state to the `context` associated with the provided token. After
        resetting storage state, any additional `connections` created in the `old context` are
        copied into the `current context`.

        :param token:
            The token corresponding to the `context` to which the storage state has to
            be reset. Typically, this token is obtained by calling the
            :meth:`set<tortoise.connection.ConnectionHandler.set>` method of this class.
        """
        current_storage = self._get_storage()
        self._conn_storage.reset(token)
        prev_storage = self._get_storage()
        # Carry over aliases that only exist in the storage being left; aliases
        # already present in the restored storage keep their restored value.
        for alias, conn in current_storage.items():
            prev_storage.setdefault(alias, conn)

    def all(self) -> List["BaseDBAsyncClient"]:
        """Returns a list of connection objects from the storage in the `current context`."""
        # The reason this method iterates over db_config and not over `storage` directly is
        # because: assume that someone calls `discard` with a certain alias, and calls this
        # method subsequently. The alias which just got discarded from the storage would not
        # appear in the returned list though it exists as part of the `db_config`.
        return [self.get(alias) for alias in self.db_config]

    async def close_all(self, discard: bool = True) -> None:
        """
        Closes all connections in the storage in the `current context`.

        All closed connections will be removed from the storage by default.

        :param discard:
            If ``False``, all connection objects are closed but `retained` in the storage.
        """
        # ``all()`` goes through ``get()``, so an alias that is configured but not yet
        # stored is instantiated here before being closed.
        tasks = [conn.close() for conn in self.all()]
        await asyncio.gather(*tasks)
        if discard:
            for alias in self.db_config:
                self.discard(alias)