SQLAlchemy核心概念与实战 – wiki大全

SQLAlchemy核心概念与实战

引言

在Python开发中,与数据库的交互是一个非常常见的需求。直接使用各个数据库驱动(如 psycopg2, mysql-connector-python)虽然可行,但存在一些问题:
1. API不统一: 不同的数据库驱动有不同的API,更换数据库意味着需要修改大量代码。
2. SQL注入风险: 需要开发者手动处理SQL参数,稍有不慎就容易引发SQL注入漏洞。
3. 代码可读性与维护性: SQL语句以字符串形式硬编码在代码中,使得业务逻辑与数据访问逻辑耦合,难以维护和阅读。

SQLAlchemy的出现就是为了解决这些问题。它是一个强大而成熟的Python SQL工具包和对象关系映射器(ORM),为应用程序开发者提供了灵活、高效的数据库操作方式。

SQLAlchemy主要提供两个核心组件:
* SQLAlchemy Core: 提供了一套统一的、与具体数据库无关的SQL构建和执行引擎。它使用Python表达式来构建SQL语句,既保留了SQL的灵活性,又避免了直接拼接字符串带来的风险。
* SQLAlchemy ORM: 在Core的基础上,提供了一个更高层次的对象关系映射。它允许开发者将数据库表映射到Python对象(模型),通过操作这些对象来间接操作数据库表,极大地提升了开发效率和代码的可读性。

本文将从SQLAlchemy Core开始,逐步深入到ORM,并通过实战案例来展示其强大功能。

安装

首先,确保你已经安装了SQLAlchemy。如果还没有,可以通过pip进行安装。同时,我们还需要安装对应数据库的驱动程序,这里以PostgreSQL的psycopg2为例。

bash
pip install sqlalchemy
pip install psycopg2-binary

SQLAlchemy Core 实战

SQLAlchemy Core是ORM的基础,它关注于数据库表的结构(Schema)和SQL表达式的生成与执行。

1. 引擎与连接 (Engine and Connection)

Engine是SQLAlchemy与数据库交互的入口点。它负责管理连接池(Connection Pool)和方言(Dialect),让我们不必关心底层DBAPI的细节。

创建一个引擎非常简单,只需调用create_engine并传入一个数据库连接字符串即可。

“`python
from sqlalchemy import create_engine

连接字符串格式: “dialect+driver://username:password@host:port/database”

DATABASE_URL = “postgresql+psycopg2://user:password@localhost:5432/mydatabase”

创建引擎

echo=True会打印出所有执行的SQL语句,方便调试

engine = create_engine(DATABASE_URL, echo=True)

从引擎获取一个连接

使用 with 语句可以确保连接在使用后被正确关闭

with engine.connect() as connection:
print(f”成功连接到数据库: {connection.info}”)
# 在这里可以执行数据库操作
“`

2. 执行原生SQL

虽然SQLAlchemy鼓励使用其表达式语言,但它也完全支持直接执行原生SQL语句。这对于执行一些复杂的、动态生成的或者特定于数据库的查询非常有用。

“`python
from sqlalchemy import text

with engine.connect() as connection:
# 执行一个简单的查询
result = connection.execute(text(“SELECT ‘hello world'”))
print(result.scalar_one()) # scalar_one() 获取第一行第一列的值

# 执行带参数的查询,可以有效防止SQL注入
result = connection.execute(
    text("SELECT * FROM users WHERE id = :user_id"),
    {"user_id": 1}
)
for row in result:
    print(f"用户ID: {row.id}, 用户名: {row.name}")

“`

3. SQL表达式语言 (SQL Expression Language)

这是SQLAlchemy Core最核心的部分。它允许我们使用Python对象和表达式来构建SQL语句,而不是拼接字符串。

3.1 定义表结构 (Table Metadata)

首先,我们需要用Table对象来描述数据库中的表。所有这些Table对象都依附于一个MetaData容器。

“`python
from sqlalchemy import MetaData, Table, Column, Integer, String, ForeignKey

创建一个MetaData对象

metadata_obj = MetaData()

定义’users’表

user_table = Table(
“users”,
metadata_obj,
Column(“id”, Integer, primary_key=True),
Column(“name”, String(30), nullable=False),
Column(“fullname”, String)
)

定义’addresses’表,并设置外键关联到’users’表

address_table = Table(
“addresses”,
metadata_obj,
Column(“id”, Integer, primary_key=True),
Column(“user_id”, ForeignKey(“users.id”), nullable=False),
Column(“email_address”, String, nullable=False)
)

使用MetaData对象一次性创建所有定义的表

metadata_obj.create_all(engine) 会检查表是否存在,不存在则创建

metadata_obj.create_all(engine)
``create_all会发出CREATE TABLE`语句。如果表已经存在,它不会重复创建。

3.2 插入数据 (Insert)

使用表的.insert()方法来创建一个插入语句对象。

“`python
from sqlalchemy import insert

创建一个插入语句

stmt = insert(user_table).values(name=”spongebob”, fullname=”Spongebob Squarepants”)
print(stmt) # 查看生成的SQL

执行语句

with engine.connect() as connection:
result = connection.execute(stmt)
# 插入后,可以通过 result.inserted_primary_key 获取新插入行的主键
print(f”新插入的用户ID: {result.inserted_primary_key}”)
connection.commit() # 提交事务
“`

注意: SQLAlchemy 2.0 风格的操作(如connection.execute(stmt))会自动开始一个事务。你需要显式调用connection.commit()来提交它,或者在engine.begin()块中执行。

3.3 查询数据 (Select)

select()函数用于创建查询。

“`python
from sqlalchemy import select

创建一个简单的查询

stmt = select(user_table).where(user_table.c.name == “spongebob”)

with engine.connect() as connection:
result = connection.execute(stmt)
# result 是一个迭代器,可以遍历获取所有行
for row in result:
print(f”查询结果: ID={row.id}, Name={row.name}, Fullname={row.fullname}”)
“`

3.4 更新与删除 (Update and Delete)

更新和删除操作与插入类似。

“`python
from sqlalchemy import update, delete

更新

stmt_update = (
update(user_table)
.where(user_table.c.name == “spongebob”)
.values(fullname=”SpongeBob SquarePants, Esq.”)
)

删除

stmt_delete = delete(user_table).where(user_table.c.name == “sandy”)

with engine.begin() as connection: # engine.begin() 会自动提交或回滚事务
connection.execute(stmt_update)
connection.execute(stmt_delete)
“`

3.5 联结查询 (Joins)

SQLAlchemy使得构建JOIN查询变得非常直观。

“`python

select_from()显式指定FROM子句,然后用.join()连接

stmt_join = (
select(user_table.c.name, address_table.c.email_address)
.select_from(user_table)
.join(address_table, user_table.c.id == address_table.c.user_id)
)

with engine.connect() as connection:
result = connection.execute(stmt_join)
for row in result:
print(f”用户名: {row.name}, 邮箱: {row.email_address}”)
``
如果
Table对象之间已经通过ForeignKey定义了关系,join()`方法可以自动推断出连接条件。

“`python

效果同上

stmt_join_auto = select(user_table, address_table).join_from(user_table, address_table)
“`

SQLAlchemy ORM 实战

ORM(对象关系映射)是SQLAlchemy最受欢迎的功能。它允许我们用Python类来代表数据库表,将数据库操作转换为对象操作,从而让代码更加面向对象,逻辑更清晰。

1. 声明式映射 (Declarative Mapping)

在现代SQLAlchemy中,我们使用DeclarativeBase来创建一个基类,我们所有的模型(Mapped Classes)都将继承自这个基类。

“`python
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
from sqlalchemy import String, Integer, ForeignKey
from typing import List, Optional

1. 创建一个声明式基类

class Base(DeclarativeBase):
pass

2. 定义映射类(模型),继承自Base

class User(Base):
tablename = “users” # 关联的数据库表名

# 定义列(字段)
# Mapped 和 mapped_column 提供了详细的类型提示
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(30))
fullname: Mapped[Optional[str]]

# 定义关系
# back_populates 与 Address.user 建立双向关系
addresses: Mapped[List["Address"]] = relationship(
    back_populates="user", cascade="all, delete-orphan"
)

def __repr__(self) -> str:
    return f"User(id={self.id!r}, name={self.name!r}, fullname={self.fullname!r})"

class Address(Base):
tablename = “addresses”

id: Mapped[int] = mapped_column(primary_key=True)
email_address: Mapped[str]
user_id: Mapped[int] = mapped_column(ForeignKey("users.id"))

# 定义关系
user: Mapped["User"] = relationship(back_populates="addresses")

def __repr__(self) -> str:
    return f"Address(id={self.id!r}, email_address={self.email_address!r})"

使用模型的元数据来创建表

之前在Core部分我们手动创建了metadata_obj,这里ORM会自动管理

Base.metadata.create_all(engine)
``
这里我们定义了
UserAddress两个模型,并使用relationship建立了它们之间的一对多关系。cascade=”all, delete-orphan”是一个非常重要的设置,它意味着对User对象的操作会级联到其关联的Address`对象上(例如,删除一个User时,其所有Address也会被自动删除)。

2. 会话 (Session)

Session是ORM操作的核心。可以把它看作是一个工作单元(Unit of Work),它跟踪了所有从数据库加载或新创建的对象。所有对这些对象的更改都会被记录下来,并在适当的时候(flushcommit)一次性写入数据库。

“`python
from sqlalchemy.orm import Session

Session的典型用法是配合 with 语句

with Session(engine) as session:
# 在这个代码块中,我们可以使用session对象进行数据库操作
# …
session.commit() # 块结束前提交所有更改
“`

3. ORM-CRUD操作

3.1 创建 (Create)

创建数据就是创建Python对象,并将其添加到session中。

“`python
with Session(engine) as session:
# 创建两个User对象和它们关联的Address对象
sandy = User(name=”sandy”, fullname=”Sandy Cheeks”)
patrick = User(name=”patrick”, fullname=”Patrick Star”)

# 直接在创建User时通过构造函数添加Address
squidward = User(
    name="squidward",
    fullname="Squidward Tentacles",
    addresses=[Address(email_address="[email protected]")],
)

# 也可以在创建后添加
patrick.addresses.append(Address(email_address="[email protected]"))

# 将新创建的对象添加到session中
session.add_all([sandy, patrick, squidward])

# 此时,数据还只是在session的内存中,并未写入数据库
# session.commit()会将所有更改写入数据库并提交事务
session.commit()

“`

3.2 读取 (Read)

读取操作同样通过select()语句完成,但这次我们查询的是模型类而不是Table对象。

“`python
from sqlalchemy import select

with Session(engine) as session:
# 查询User表中name为”sandy”的用户
stmt = select(User).where(User.name == “sandy”)

# session.execute() 返回一个 Result 对象
result = session.execute(stmt)

# 从结果中获取单个对象,如果不存在则为None
sandy = result.scalar_one_or_none()
print(f"查询到的用户: {sandy}")

# 查询所有用户
all_users = session.scalars(select(User)).all()
for user in all_users:
    print(f"用户: {user}, 关联的地址: {user.addresses}")

``session.scalars()是一个方便的方法,它直接执行select语句并返回一个迭代器,该迭代器产生的是ORM对象实例,而不是Row`对象。

3.3 更新 (Update)

更新数据只需修改从session中获取的对象的属性,然后在commit时,SQLAlchemy会自动生成UPDATE语句。

“`python
with Session(engine) as session:
# 首先,查询要更新的对象
stmt = select(User).where(User.name == “patrick”)
patrick = session.scalars(stmt).one()

# 修改其属性
patrick.fullname = "Patrick Starfish"

# 添加一个新的地址
patrick.addresses.append(Address(email_address="[email protected]"))

# 当session提交时,SQLAlchemy会检测到patrick对象的更改
# 并自动发出 UPDATE 语句
session.commit()

“`

3.4 删除 (Delete)

使用session.delete()来标记一个对象为待删除。

“`python
with Session(engine) as session:
# 查询要删除的对象
stmt = select(User).where(User.name == “squidward”)
squidward = session.scalars(stmt).one()

# 标记为删除
session.delete(squidward)

# 提交后会执行DELETE语句
# 由于我们之前在User.addresses关系上设置了cascade="all, delete-orphan"
# squidward关联的Address对象也会被一并删除
session.commit()

“`

数据库迁移与Alembic

当你的模型发生变化(例如,增加一个新字段,修改列类型)时,你需要相应地更新数据库的表结构。直接修改代码并重新运行Base.metadata.create_all(engine)是行不通的,因为它不会修改已经存在的表。

手动编写ALTER TABLE语句既繁琐又容易出错。这时,我们就需要一个数据库迁移工具,而Alembic正是SQLAlchemy官方推荐的迁移工具。

Alembic可以比对你的模型定义和当前数据库结构之间的差异,并自动生成迁移脚本。通过版本化的迁移脚本,你可以轻松地将数据库升级到任意版本,或者降级回去。

Alembic的使用是一个独立的主题,这里只做简要介绍。基本流程如下:
1. pip install alembic
2. alembic init alembic – 初始化Alembic环境。
3. 配置alembic.ini文件,指定数据库连接。
4. 修改env.py,让Alembic知道你的模型定义(target_metadata = Base.metadata)。
5. alembic revision --autogenerate -m "Add new column to user table" – 自动创建迁移脚本。
6. alembic upgrade head – 应用迁移,更新数据库到最新版本。

结论:Core vs. ORM,如何选择?

  • SQLAlchemy Core:

    • 优点: 更接近原生SQL,性能开销极小,对于复杂的报表、数据分析或需要极致性能的批量操作非常理想。它提供了强大的SQL抽象能力,同时不失灵活性。
    • 适用场景: 数据仓库(ETL)作业、数据分析应用、需要手写优化SQL的后台服务。
  • SQLAlchemy ORM:

    • 优点: 极大地提升了开发效率,代码更加面向对象和可读,业务逻辑清晰。内置的会话管理(Unit of Work)简化了事务和对象状态的管理。
    • 适用场景: 大多数Web应用、API服务等业务逻辑复杂的CRUD密集型应用。

在许多项目中,Core和ORM并不是互斥的,而是可以结合使用。例如,你可以使用ORM来处理大部分业务逻辑,同时在需要性能优化的特定查询或批量数据处理时,回退到Core来编写更高效的语句。

总而言之,SQLAlchemy提供了一个从底层到高层的完整数据库工具集。无论你的需求多么简单或复杂,它都能提供一个优雅、高效且健壮的解决方案。

滚动至顶部