数据库
Starlette 并不严格依赖于任何特定的数据库实现。
您可以将它与异步 ORM(例如 GINO)一起使用,或者使用常规的非异步端点,并与 SQLAlchemy 集成。
在本指南中,我们将展示如何与 databases
包 集成,databases
包提供了 SQLAlchemy 核心支持,能够与多种不同的数据库驱动程序兼容。
以下是一个完整的示例,包含表定义、配置 database.Database
实例,以及与数据库交互的几个端点。
.env
DATABASE_URL=sqlite:///test.db
app.py
import contextlib
import databases
import sqlalchemy
from starlette.applications import Starlette
from starlette.config import Config
from starlette.responses import JSONResponse
from starlette.routing import Route
# 从环境变量或 '.env' 文件中加载配置。
config = Config('.env')
DATABASE_URL = config('DATABASE_URL')
# 数据库表定义。
metadata = sqlalchemy.MetaData()
notes = sqlalchemy.Table(
"notes",
metadata,
sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
sqlalchemy.Column("text", sqlalchemy.String),
sqlalchemy.Column("completed", sqlalchemy.Boolean),
)
database = databases.Database(DATABASE_URL)
@contextlib.asynccontextmanager
async def lifespan(app):
await database.connect()
yield
await database.disconnect()
# 主要应用代码。
async def list_notes(request):
query = notes.select()
results = await database.fetch_all(query)
content = [
{
"text": result["text"],
"completed": result["completed"]
}
for result in results
]
return JSONResponse(content)
async def add_note(request):
data = await request.json()
query = notes.insert().values(
text=data["text"],
completed=data["completed"]
)
await database.execute(query)
return JSONResponse({
"text": data["text"],
"completed": data["completed"]
})
routes = [
Route("/notes", endpoint=list_notes, methods=["GET"]),
Route("/notes", endpoint=add_note, methods=["POST"]),
]
app = Starlette(
routes=routes,
lifespan=lifespan,
)
最后,您需要创建数据库表。推荐使用 Alembic,关于它的内容我们会在 迁移 部分做简要介绍。
查询
查询可以通过 SQLAlchemy Core 查询 来执行。
以下方法受支持:
rows = await database.fetch_all(query)
row = await database.fetch_one(query)
async for row in database.iterate(query)
await database.execute(query)
await database.execute_many(query)
事务
数据库事务可以通过装饰器、上下文管理器或低级 API 使用。
使用装饰器在端点上:
@database.transaction()
async def populate_note(request):
# 此数据库插入操作在事务中执行。
# 它将被 `RuntimeError` 回滚。
query = notes.insert().values(text="你看不到我", completed=True)
await database.execute(query)
raise RuntimeError()
使用上下文管理器:
async def populate_note(request):
async with database.transaction():
# 此数据库插入操作在事务中执行。
# 它将被 `RuntimeError` 回滚。
query = notes.insert().values(text="你看不到我", completed=True)
await request.database.execute(query)
raise RuntimeError()
使用低级 API:
async def populate_note(request):
transaction = await database.transaction()
try:
# 此数据库插入操作在事务中执行。
# 它将被 `RuntimeError` 回滚。
query = notes.insert().values(text="你看不到我", completed=True)
await database.execute(query)
raise RuntimeError()
except:
await transaction.rollback()
raise
else:
await transaction.commit()
测试隔离
在对使用数据库的服务进行测试时,我们需要确保一些事项。我们的要求应当是:
- 使用一个独立的数据库进行测试。
- 每次运行测试时创建一个新的测试数据库。
- 确保每个测试用例之间数据库的状态是隔离的。
以下是如何组织应用程序和测试,以满足这些要求:
from starlette.applications import Starlette
from starlette.config import Config
import databases
config = Config(".env")
TESTING = config('TESTING', cast=bool, default=False)
DATABASE_URL = config('DATABASE_URL', cast=databases.DatabaseURL)
TEST_DATABASE_URL = DATABASE_URL.replace(database='test_' + DATABASE_URL.database)
# 在测试期间使用 'force_rollback',确保在每个测试用例之间不持久化数据库更改。
if TESTING:
database = databases.Database(TEST_DATABASE_URL, force_rollback=True)
else:
database = databases.Database(DATABASE_URL)
我们仍然需要在测试运行期间设置 TESTING
,并设置测试数据库。假设我们使用 py.test
,以下是我们的 conftest.py
可能的结构:
import pytest
from starlette.config import environ
from starlette.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy_utils import database_exists, create_database, drop_database
# 这会设置 `os.environ`,但提供了一些额外的保护。
# 如果我们将其放在应用程序导入之后,它会引发错误,
# 提示 'TESTING' 已经从环境中读取。
environ['TESTING'] = 'True'
import app
@pytest.fixture(scope="session", autouse=True)
def create_test_database():
"""
每个测试用例创建一个干净的数据库。
为了安全,如果数据库已经存在,我们应当中止测试。
我们在这里使用 `sqlalchemy_utils` 包,提供一些帮助函数
来一致地创建和删除数据库。
"""
url = str(app.TEST_DATABASE_URL)
engine = create_engine(url)
assert not database_exists(url), '测试数据库已经存在,正在中止测试。'
create_database(url) # 创建测试数据库。
metadata.create_all(engine) # 创建表格。
yield # 运行测试。
drop_database(url) # 删除测试数据库。
@pytest.fixture()
def client():
"""
当在测试用例中使用 'client' fixture 时,我们将确保每个测试用例之间
完成数据库回滚:
def test_homepage(client):
url = app.url_path_for('homepage')
response = client.get(url)
assert response.status_code == 200
"""
with TestClient(app) as client:
yield client
数据库迁移
几乎可以肯定,您需要使用数据库迁移来管理数据库的增量更改。为此,我们强烈推荐使用 Alembic,它是由 SQLAlchemy 的作者编写的。
$ pip install alembic
$ alembic init migrations
现在,您需要设置 Alembic 以引用配置的 DATABASE_URL
并使用您的表格元数据。
在 alembic.ini
中,删除以下行:
sqlalchemy.url = driver://user:pass@localhost/dbname
在 migrations/env.py
中,您需要设置 'sqlalchemy.url'
配置项和 target_metadata
变量。它应该像这样:
# Alembic 配置对象
config = context.config
# 配置 Alembic 使用我们的 DATABASE_URL 和表格定义...
import app
config.set_main_option('sqlalchemy.url', str(app.DATABASE_URL))
target_metadata = app.metadata
...
然后,使用我们上面的笔记示例,创建初始修订版本:
alembic revision -m "Create notes table"
并在新文件(在 migrations/versions
目录下)中填充必要的指令:
def upgrade():
op.create_table(
'notes',
sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
sqlalchemy.Column("text", sqlalchemy.String),
sqlalchemy.Column("completed", sqlalchemy.Boolean),
)
def downgrade():
op.drop_table('notes')
然后运行您的第一次迁移。我们的笔记应用现在可以运行了!
alembic upgrade head
在测试期间运行迁移
确保每次创建测试数据库时都运行数据库迁移是一个良好的做法。这有助于捕捉迁移脚本中的任何问题,并确保测试在与生产数据库一致的数据库状态下运行。
我们可以稍微调整 create_test_database
fixture:
from alembic import command
from alembic.config import Config
import app
...
@pytest.fixture(scope="session", autouse=True)
def create_test_database():
url = str(app.DATABASE_URL)
engine = create_engine(url)
assert not database_exists(url), '测试数据库已经存在,正在中止测试。'
create_database(url) # 创建测试数据库。
config = Config("alembic.ini") # 运行迁移。
command.upgrade(config, "head")
yield # 运行测试。
drop_database(url) # 删除测试数据库。