SQLAlchemy核心概念与实战
引言
在Python开发中,与数据库的交互是一个非常常见的需求。直接使用各个数据库驱动(如 psycopg2, mysql-connector-python)虽然可行,但存在一些问题:
1. API不统一: 不同的数据库驱动有不同的API,更换数据库意味着需要修改大量代码。
2. SQL注入风险: 需要开发者手动处理SQL参数,稍有不慎就容易引发SQL注入漏洞。
3. 代码可读性与维护性: SQL语句以字符串形式硬编码在代码中,使得业务逻辑与数据访问逻辑耦合,难以维护和阅读。
SQLAlchemy的出现就是为了解决这些问题。它是一个强大而成熟的Python SQL工具包和对象关系映射器(ORM),为应用程序开发者提供了灵活、高效的数据库操作方式。
SQLAlchemy主要提供两个核心组件:
* SQLAlchemy Core: 提供了一套统一的、与具体数据库无关的SQL构建和执行引擎。它使用Python表达式来构建SQL语句,既保留了SQL的灵活性,又避免了直接拼接字符串带来的风险。
* SQLAlchemy ORM: 在Core的基础上,提供了一个更高层次的对象关系映射。它允许开发者将数据库表映射到Python对象(模型),通过操作这些对象来间接操作数据库表,极大地提升了开发效率和代码的可读性。
本文将从SQLAlchemy Core开始,逐步深入到ORM,并通过实战案例来展示其强大功能。
安装
首先,确保你已经安装了SQLAlchemy。如果还没有,可以通过pip进行安装。同时,我们还需要安装对应数据库的驱动程序,这里以PostgreSQL的psycopg2为例。
bash
pip install sqlalchemy
pip install psycopg2-binary
SQLAlchemy Core 实战
SQLAlchemy Core是ORM的基础,它关注于数据库表的结构(Schema)和SQL表达式的生成与执行。
1. 引擎与连接 (Engine and Connection)
Engine是SQLAlchemy与数据库交互的入口点。它负责管理连接池(Connection Pool)和方言(Dialect),让我们不必关心底层DBAPI的细节。
创建一个引擎非常简单,只需调用create_engine并传入一个数据库连接字符串即可。
“`python
from sqlalchemy import create_engine
连接字符串格式: “dialect+driver://username:password@host:port/database”
DATABASE_URL = “postgresql+psycopg2://user:password@localhost:5432/mydatabase”
创建引擎
echo=True会打印出所有执行的SQL语句,方便调试
engine = create_engine(DATABASE_URL, echo=True)
从引擎获取一个连接
使用 with 语句可以确保连接在使用后被正确关闭
with engine.connect() as connection:
print(f”成功连接到数据库: {connection.info}”)
# 在这里可以执行数据库操作
“`
2. 执行原生SQL
虽然SQLAlchemy鼓励使用其表达式语言,但它也完全支持直接执行原生SQL语句。这对于执行一些复杂的、动态生成的或者特定于数据库的查询非常有用。
“`python
from sqlalchemy import text
with engine.connect() as connection:
# 执行一个简单的查询
result = connection.execute(text(“SELECT ‘hello world'”))
print(result.scalar_one()) # scalar_one() 获取第一行第一列的值
# 执行带参数的查询,可以有效防止SQL注入
result = connection.execute(
text("SELECT * FROM users WHERE id = :user_id"),
{"user_id": 1}
)
for row in result:
print(f"用户ID: {row.id}, 用户名: {row.name}")
“`
3. SQL表达式语言 (SQL Expression Language)
这是SQLAlchemy Core最核心的部分。它允许我们使用Python对象和表达式来构建SQL语句,而不是拼接字符串。
3.1 定义表结构 (Table Metadata)
首先,我们需要用Table对象来描述数据库中的表。所有这些Table对象都依附于一个MetaData容器。
“`python
from sqlalchemy import MetaData, Table, Column, Integer, String, ForeignKey
创建一个MetaData对象
metadata_obj = MetaData()
定义’users’表
user_table = Table(
“users”,
metadata_obj,
Column(“id”, Integer, primary_key=True),
Column(“name”, String(30), nullable=False),
Column(“fullname”, String)
)
定义’addresses’表,并设置外键关联到’users’表
address_table = Table(
“addresses”,
metadata_obj,
Column(“id”, Integer, primary_key=True),
Column(“user_id”, ForeignKey(“users.id”), nullable=False),
Column(“email_address”, String, nullable=False)
)
使用MetaData对象一次性创建所有定义的表
metadata_obj.create_all(engine) 会检查表是否存在,不存在则创建
metadata_obj.create_all(engine)
``create_all会发出CREATE TABLE`语句。如果表已经存在,它不会重复创建。
3.2 插入数据 (Insert)
使用表的.insert()方法来创建一个插入语句对象。
“`python
from sqlalchemy import insert
创建一个插入语句
stmt = insert(user_table).values(name=”spongebob”, fullname=”Spongebob Squarepants”)
print(stmt) # 查看生成的SQL
执行语句
with engine.connect() as connection:
result = connection.execute(stmt)
# 插入后,可以通过 result.inserted_primary_key 获取新插入行的主键
print(f”新插入的用户ID: {result.inserted_primary_key}”)
connection.commit() # 提交事务
“`
注意: SQLAlchemy 2.0 风格的操作(如
connection.execute(stmt))会自动开始一个事务。你需要显式调用connection.commit()来提交它,或者在engine.begin()块中执行。
3.3 查询数据 (Select)
select()函数用于创建查询。
“`python
from sqlalchemy import select
创建一个简单的查询
stmt = select(user_table).where(user_table.c.name == “spongebob”)
with engine.connect() as connection:
result = connection.execute(stmt)
# result 是一个迭代器,可以遍历获取所有行
for row in result:
print(f”查询结果: ID={row.id}, Name={row.name}, Fullname={row.fullname}”)
“`
3.4 更新与删除 (Update and Delete)
更新和删除操作与插入类似。
“`python
from sqlalchemy import update, delete
更新
stmt_update = (
update(user_table)
.where(user_table.c.name == “spongebob”)
.values(fullname=”SpongeBob SquarePants, Esq.”)
)
删除
stmt_delete = delete(user_table).where(user_table.c.name == “sandy”)
with engine.begin() as connection: # engine.begin() 会自动提交或回滚事务
connection.execute(stmt_update)
connection.execute(stmt_delete)
“`
3.5 联结查询 (Joins)
SQLAlchemy使得构建JOIN查询变得非常直观。
“`python
select_from()显式指定FROM子句,然后用.join()连接
stmt_join = (
select(user_table.c.name, address_table.c.email_address)
.select_from(user_table)
.join(address_table, user_table.c.id == address_table.c.user_id)
)
with engine.connect() as connection:
result = connection.execute(stmt_join)
for row in result:
print(f”用户名: {row.name}, 邮箱: {row.email_address}”)
``Table
如果对象之间已经通过ForeignKey定义了关系,join()`方法可以自动推断出连接条件。
“`python
效果同上
stmt_join_auto = select(user_table, address_table).join_from(user_table, address_table)
“`
SQLAlchemy ORM 实战
ORM(对象关系映射)是SQLAlchemy最受欢迎的功能。它允许我们用Python类来代表数据库表,将数据库操作转换为对象操作,从而让代码更加面向对象,逻辑更清晰。
1. 声明式映射 (Declarative Mapping)
在现代SQLAlchemy中,我们使用DeclarativeBase来创建一个基类,我们所有的模型(Mapped Classes)都将继承自这个基类。
“`python
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
from sqlalchemy import String, Integer, ForeignKey
from typing import List, Optional
1. 创建一个声明式基类
class Base(DeclarativeBase):
pass
2. 定义映射类(模型),继承自Base
class User(Base):
tablename = “users” # 关联的数据库表名
# 定义列(字段)
# Mapped 和 mapped_column 提供了详细的类型提示
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(30))
fullname: Mapped[Optional[str]]
# 定义关系
# back_populates 与 Address.user 建立双向关系
addresses: Mapped[List["Address"]] = relationship(
back_populates="user", cascade="all, delete-orphan"
)
def __repr__(self) -> str:
return f"User(id={self.id!r}, name={self.name!r}, fullname={self.fullname!r})"
class Address(Base):
tablename = “addresses”
id: Mapped[int] = mapped_column(primary_key=True)
email_address: Mapped[str]
user_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
# 定义关系
user: Mapped["User"] = relationship(back_populates="addresses")
def __repr__(self) -> str:
return f"Address(id={self.id!r}, email_address={self.email_address!r})"
使用模型的元数据来创建表
之前在Core部分我们手动创建了metadata_obj,这里ORM会自动管理
Base.metadata.create_all(engine)
``User
这里我们定义了和Address两个模型,并使用relationship建立了它们之间的一对多关系。cascade=”all, delete-orphan”是一个非常重要的设置,它意味着对User对象的操作会级联到其关联的Address`对象上(例如,删除一个User时,其所有Address也会被自动删除)。
2. 会话 (Session)
Session是ORM操作的核心。可以把它看作是一个工作单元(Unit of Work),它跟踪了所有从数据库加载或新创建的对象。所有对这些对象的更改都会被记录下来,并在适当的时候(flush或commit)一次性写入数据库。
“`python
from sqlalchemy.orm import Session
Session的典型用法是配合 with 语句
with Session(engine) as session:
# 在这个代码块中,我们可以使用session对象进行数据库操作
# …
session.commit() # 块结束前提交所有更改
“`
3. ORM-CRUD操作
3.1 创建 (Create)
创建数据就是创建Python对象,并将其添加到session中。
“`python
with Session(engine) as session:
# 创建两个User对象和它们关联的Address对象
sandy = User(name=”sandy”, fullname=”Sandy Cheeks”)
patrick = User(name=”patrick”, fullname=”Patrick Star”)
# 直接在创建User时通过构造函数添加Address
squidward = User(
name="squidward",
fullname="Squidward Tentacles",
addresses=[Address(email_address="[email protected]")],
)
# 也可以在创建后添加
patrick.addresses.append(Address(email_address="[email protected]"))
# 将新创建的对象添加到session中
session.add_all([sandy, patrick, squidward])
# 此时,数据还只是在session的内存中,并未写入数据库
# session.commit()会将所有更改写入数据库并提交事务
session.commit()
“`
3.2 读取 (Read)
读取操作同样通过select()语句完成,但这次我们查询的是模型类而不是Table对象。
“`python
from sqlalchemy import select
with Session(engine) as session:
# 查询User表中name为”sandy”的用户
stmt = select(User).where(User.name == “sandy”)
# session.execute() 返回一个 Result 对象
result = session.execute(stmt)
# 从结果中获取单个对象,如果不存在则为None
sandy = result.scalar_one_or_none()
print(f"查询到的用户: {sandy}")
# 查询所有用户
all_users = session.scalars(select(User)).all()
for user in all_users:
print(f"用户: {user}, 关联的地址: {user.addresses}")
``session.scalars()是一个方便的方法,它直接执行select语句并返回一个迭代器,该迭代器产生的是ORM对象实例,而不是Row`对象。
3.3 更新 (Update)
更新数据只需修改从session中获取的对象的属性,然后在commit时,SQLAlchemy会自动生成UPDATE语句。
“`python
with Session(engine) as session:
# 首先,查询要更新的对象
stmt = select(User).where(User.name == “patrick”)
patrick = session.scalars(stmt).one()
# 修改其属性
patrick.fullname = "Patrick Starfish"
# 添加一个新的地址
patrick.addresses.append(Address(email_address="[email protected]"))
# 当session提交时,SQLAlchemy会检测到patrick对象的更改
# 并自动发出 UPDATE 语句
session.commit()
“`
3.4 删除 (Delete)
使用session.delete()来标记一个对象为待删除。
“`python
with Session(engine) as session:
# 查询要删除的对象
stmt = select(User).where(User.name == “squidward”)
squidward = session.scalars(stmt).one()
# 标记为删除
session.delete(squidward)
# 提交后会执行DELETE语句
# 由于我们之前在User.addresses关系上设置了cascade="all, delete-orphan"
# squidward关联的Address对象也会被一并删除
session.commit()
“`
数据库迁移与Alembic
当你的模型发生变化(例如,增加一个新字段,修改列类型)时,你需要相应地更新数据库的表结构。直接修改代码并重新运行Base.metadata.create_all(engine)是行不通的,因为它不会修改已经存在的表。
手动编写ALTER TABLE语句既繁琐又容易出错。这时,我们就需要一个数据库迁移工具,而Alembic正是SQLAlchemy官方推荐的迁移工具。
Alembic可以比对你的模型定义和当前数据库结构之间的差异,并自动生成迁移脚本。通过版本化的迁移脚本,你可以轻松地将数据库升级到任意版本,或者降级回去。
Alembic的使用是一个独立的主题,这里只做简要介绍。基本流程如下:
1. pip install alembic
2. alembic init alembic – 初始化Alembic环境。
3. 配置alembic.ini文件,指定数据库连接。
4. 修改env.py,让Alembic知道你的模型定义(target_metadata = Base.metadata)。
5. alembic revision --autogenerate -m "Add new column to user table" – 自动创建迁移脚本。
6. alembic upgrade head – 应用迁移,更新数据库到最新版本。
结论:Core vs. ORM,如何选择?
-
SQLAlchemy Core:
- 优点: 更接近原生SQL,性能开销极小,对于复杂的报表、数据分析或需要极致性能的批量操作非常理想。它提供了强大的SQL抽象能力,同时不失灵活性。
- 适用场景: 数据仓库(ETL)作业、数据分析应用、需要手写优化SQL的后台服务。
-
SQLAlchemy ORM:
- 优点: 极大地提升了开发效率,代码更加面向对象和可读,业务逻辑清晰。内置的会话管理(Unit of Work)简化了事务和对象状态的管理。
- 适用场景: 大多数Web应用、API服务等业务逻辑复杂的CRUD密集型应用。
在许多项目中,Core和ORM并不是互斥的,而是可以结合使用。例如,你可以使用ORM来处理大部分业务逻辑,同时在需要性能优化的特定查询或批量数据处理时,回退到Core来编写更高效的语句。
总而言之,SQLAlchemy提供了一个从底层到高层的完整数据库工具集。无论你的需求多么简单或复杂,它都能提供一个优雅、高效且健壮的解决方案。