SQLAlchemy 2.0 查询使用指南

发布于:2025-05-25 ⋅ 阅读:(20) ⋅ 点赞:(0)

SQLAlchemy 2.0 查询使用指南

1. 环境设置

首先,需要安装 SQLAlchemy 2.0 版本。假设你使用的是 SQLite 数据库,可以通过以下命令安装 SQLAlchemy:

pip install sqlalchemy

接着,我们创建数据库连接并初始化会话:

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

# 创建数据库引擎
engine = create_engine('sqlite:///example.db', echo=True)

# 创建会话
Session = sessionmaker(bind=engine)
session = Session()

2. 定义数据模型

SQLAlchemy 使用 ORM (对象关系映射) 使得 Python 对象和数据库表之间实现映射。以下是定义 User​ 和 Address​ 表的模型示例:

from sqlalchemy import Column, Integer, String, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship

Base = declarative_base()

class User(Base):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    name = Column(String)
    age = Column(Integer)

    # 一对多关系,用户可以有多个地址
    addresses = relationship('Address', back_populates='user', lazy='select')
    
class Address(Base):
    __tablename__ = 'addresses'

    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, ForeignKey('users.id'))
    email = Column(String)

    # 反向关系
    user = relationship('User', back_populates='addresses')

在这个模型中,User​ 表和 Address​ 表通过 user_id​ 建立了外键关联,用户可以有多个地址。

3. 基本查询操作

SQLAlchemy 2.0 提供了一个更简洁和一致的查询接口。以下是一些常见的查询操作。

3.1 查询所有记录

from sqlalchemy import select

stmt = select(User)
result = session.execute(stmt).scalars().all()

for user in result:
    print(user.name, user.age)

SQL 原生查询:

SELECT * FROM users;

3.2 查询特定字段

查询 name​ 和 age​ 字段:

stmt = select(User.name, User.age)
result = session.execute(stmt).all()

for name, age in result:
    print(name, age)

SQL 原生查询:

SELECT name, age FROM users;

3.3 查询带条件的记录

查询年龄大于 30 的用户:

stmt = select(User).where(User.age > 30)
result = session.execute(stmt).scalars().all()

for user in result:
    print(user.name, user.age)

SQL 原生查询:

SELECT * FROM users WHERE age > 30;

3.4 排序查询

根据年龄降序排列查询用户:

stmt = select(User).order_by(User.age.desc())
result = session.execute(stmt).scalars().all()

for user in result:
    print(user.name, user.age)

SQL 原生查询:

SELECT * FROM users ORDER BY age DESC;

3.5 限制查询结果

限制返回的记录数为 5:

stmt = select(User).limit(5)
result = session.execute(stmt).scalars().all()

for user in result:
    print(user.name, user.age)

SQL 原生查询:

SELECT * FROM users LIMIT 5;

4. 关联查询(Join)

4.1 懒查询 (Lazy Loading)

懒查询是 SQLAlchemy 默认的加载方式,只有在访问关联属性时才会执行查询。它通过延迟加载来避免不必要的查询,但是可能导致 “N+1 查询问题”。

# 查询用户,并访问用户的地址
user = session.query(User).first()  # 执行查询,不查询 addresses
print(user.name)  # 输出用户名称

addresses = user.addresses  # 当访问 addresses 时,SQLAlchemy 会查询 addresses 表
for address in addresses:
    print(address.email)

SQL 原生查询:

SELECT * FROM users WHERE id = 1;
-- 查询用户的地址
SELECT * FROM addresses WHERE user_id = 1;

4.2 预加载(Eager Loading)

预加载是通过 JOIN 或子查询一次性加载所有相关数据。常见的预加载方法有 joinedload​、subqueryload​ 和 selectinload​。

4.2.1 joinedload​

​joinedload​ 会通过 JOIN 一次性加载所有相关数据,适用于关联数据量少的情况。

from sqlalchemy.orm import joinedload

stmt = select(User).options(joinedload(User.addresses))
result = session.execute(stmt).scalars().all()

for user in result:
    print(user.name)
    for address in user.addresses:
        print(address.email)

SQL 原生查询:

SELECT * FROM users
JOIN addresses ON users.id = addresses.user_id;
4.2.2 subqueryload​

​subqueryload​ 使用子查询来加载关联数据,适用于关联数据较多的情况。

from sqlalchemy.orm import subqueryload

stmt = select(User).options(subqueryload(User.addresses))
result = session.execute(stmt).scalars().all()

for user in result:
    print(user.name)
    for address in user.addresses:
        print(address.email)

SQL 原生查询:

SELECT * FROM users;
-- 查询 addresses 表的所有数据
SELECT * FROM addresses WHERE user_id IN (1, 2, 3, ...);
4.2.3 selectinload​

​selectinload​ 适用于多对一和一对多的关系,能够通过一次查询批量加载所有关联数据。

from sqlalchemy.orm import selectinload

stmt = select(User).options(selectinload(User.addresses))
result = session.execute(stmt).scalars().all()

for user in result:
    print(user.name)
    for address in user.addresses:
        print(address.email)

SQL 原生查询:

SELECT * FROM users;
-- 查询 addresses 表
SELECT * FROM addresses WHERE user_id IN (1, 2, 3, ...);

4.3 延迟加载与预加载的比较

特性 懒查询 (Lazy Loading) 预加载 (Eager Loading)
数据加载方式 延迟加载,直到访问关联属性时才查询 一次性加载所有关联数据
查询次数 可能多次查询(N+1 查询问题) 一次性查询,通常只发出少量查询
性能 对于小数据量高效 对于复杂查询避免 N+1 问题,适用于大量关联数据
常用方法 ​joinedload​、subqueryload​、selectinload​

5. 高级查询

5.1 聚合查询

SQLAlchemy 支持聚合函数,如 count​、sum​、avg​ 等。以下是计算用户数量的例子:

from sqlalchemy import func

stmt = select(func.count(User.id))
result = session.execute(stmt).scalar()
print(f"User Count: {result}")

SQL 原生查询:

SELECT COUNT(id) FROM users;

5.2 分组查询

通过 group_by​ 方法进行分组查询:

stmt = select(User.age, func.count(User.id)).group_by(User.age)
result = session.execute(stmt).all()

for age, count in result:
    print(f"Age: {age}, Count: {count}")

SQL 原生查询:

SELECT age, COUNT(id) FROM users GROUP BY age;

5.3 子查询

通过子查询来执行更复杂的查询:

subquery = select(func.max(User.age)).scalar_subquery()
stmt = select(User).where(User.age == subquery)
result = session.execute(stmt).scalars().all()

for user in result:
    print(user.name, user.age)

SQL 原生查询:

SELECT * FROM users WHERE age = (SELECT MAX(age) FROM users);

6. 事务管理

SQLAlchemy 2.0 自动管理事务,但你也可以显式地管理事务。例如:

from sqlalchemy.exc import SQLAlchemyError

try:
    new_user = User(name="David", age=28)
    session.add(new_user)
    session.commit()  # 提交事务
except SQLAlchemyError:
    session.rollback()  # 回滚事务
    print("事务失败,已回滚")

7. 总结

SQLAlchemy 2.0 提供了强大的 ORM 功能和灵活的查询接口。合理选择懒查询和预加载策略可以有效避免性能问题,特别是对于关系复杂的数据模型,预加载能帮助避免 N+1 查询问题。通过使用 select()​、joinedload​、subqueryload​ 等方法,我们可以优化查询性能,提高数据操作的效率。

当然,以下是更详细的 SQLAlchemy 2.0 查询使用指南,包括懒查询(Lazy Loading)、预加载(Eager Loading)以及异步 session​ 使用示例。我们将通过详细的案例来解释这些概念,帮助团队成员全面理解 SQLAlchemy 2.0 的使用方式。


SQLAlchemy 2.0 查询使用指南

1. 环境设置

首先,确保安装了 SQLAlchemy 2.0:

pip install sqlalchemy

对于异步支持,确保你安装了 asyncpg​(适用于 PostgreSQL)或 aiomysql​(适用于 MySQL)等异步数据库驱动。

pip install sqlalchemy[asyncio] asyncpg

1.1 创建数据库引擎和会话

数据库连接使用 create_engine()​ 来创建数据库引擎,会话使用 sessionmaker()​ 来创建。

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

# 创建数据库引擎
engine = create_engine('sqlite+aiosqlite:///example.db', echo=True, future=True)

# 创建异步会话
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine

# 异步引擎
engine = create_async_engine('sqlite+aiosqlite:///example.db', echo=True, future=True)

# 异步会话
AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

2. 定义数据模型

SQLAlchemy ORM 通过 declarative_base()​ 定义数据库模型。这里我们定义 User​ 和 Address​ 模型。

from sqlalchemy import Column, Integer, String, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship

Base = declarative_base()

class User(Base):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    name = Column(String)
    age = Column(Integer)

    # 一对多关系
    addresses = relationship('Address', back_populates='user', lazy='select')

class Address(Base):
    __tablename__ = 'addresses'

    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, ForeignKey('users.id'))
    email = Column(String)

    user = relationship('User', back_populates='addresses')

2.1 解释模型

在这个模型中,User​ 表有一个一对多关系与 Address​ 表关联,User​ 表的 addresses​ 属性代表用户的多个地址,而 Address​ 表的 user​ 属性反向关联用户。

3. 基本查询操作

SQLAlchemy 2.0 提供了更加直观的查询接口。以下是一些常见的查询操作。

3.1 查询所有记录

查询 User​ 表中的所有用户数据:

from sqlalchemy import select

# 创建查询语句
stmt = select(User)

# 执行查询
with session.begin():
    result = session.execute(stmt).scalars().all()

# 输出结果
for user in result:
    print(user.name, user.age)

原生 SQL 查询:

SELECT * FROM users;

3.2 查询特定字段

查询 name​ 和 age​ 字段的用户数据:

stmt = select(User.name, User.age)

with session.begin():
    result = session.execute(stmt).all()

for name, age in result:
    print(name, age)

原生 SQL 查询:

SELECT name, age FROM users;

3.3 查询带条件的记录

查询年龄大于 30 的用户:

stmt = select(User).where(User.age > 30)

with session.begin():
    result = session.execute(stmt).scalars().all()

for user in result:
    print(user.name, user.age)

原生 SQL 查询:

SELECT * FROM users WHERE age > 30;

3.4 排序查询

按年龄降序排列查询:

stmt = select(User).order_by(User.age.desc())

with session.begin():
    result = session.execute(stmt).scalars().all()

for user in result:
    print(user.name, user.age)

原生 SQL 查询:

SELECT * FROM users ORDER BY age DESC;

3.5 限制查询结果

限制返回前 5 条记录:

stmt = select(User).limit(5)

with session.begin():
    result = session.execute(stmt).scalars().all()

for user in result:
    print(user.name, user.age)

原生 SQL 查询:

SELECT * FROM users LIMIT 5;

4. 关联查询(Join)

SQLAlchemy 提供了强大的关联查询功能。我们可以使用 joinedload​、subqueryload​ 等方法来优化查询。

4.1 懒查询 (Lazy Loading)

懒查询是 SQLAlchemy 默认的加载方式。只有当你访问某个关联属性时,SQLAlchemy 会延迟加载数据。

user = session.query(User).first()  # 执行查询,不查询 addresses
print(user.name)  # 输出用户名称
addresses = user.addresses  # 此时会查询 addresses 表
for address in addresses:
    print(address.email)

原生 SQL 查询:

SELECT * FROM users WHERE id = 1;
-- 查询地址
SELECT * FROM addresses WHERE user_id = 1;

4.2 预加载(Eager Loading)

为了避免 N+1 查询问题,可以使用预加载。以下是几种常用的预加载方式。

4.2.1 joinedload​

​joinedload​ 使用 SQL JOIN 一次性加载所有关联数据,适用于数据量较小的场景。

from sqlalchemy.orm import joinedload

stmt = select(User).options(joinedload(User.addresses))

with session.begin():
    result = session.execute(stmt).scalars().all()

for user in result:
    print(user.name)
    for address in user.addresses:
        print(address.email)

原生 SQL 查询:

SELECT * FROM users
JOIN addresses ON users.id = addresses.user_id;
4.2.2 subqueryload​

​subqueryload​ 使用子查询加载关联数据,适用于关联数据较多的情况。

from sqlalchemy.orm import subqueryload

stmt = select(User).options(subqueryload(User.addresses))

with session.begin():
    result = session.execute(stmt).scalars().all()

for user in result:
    print(user.name)
    for address in user.addresses:
        print(address.email)

原生 SQL 查询:

SELECT * FROM users;
-- 查询 addresses 表的所有数据
SELECT * FROM addresses WHERE user_id IN (1, 2, 3, ...);
4.2.3 selectinload​

​selectinload​ 通过批量查询一次性加载关联数据,适用于多对一和一对多的关系。

from sqlalchemy.orm import selectinload

stmt = select(User).options(selectinload(User.addresses))

with session.begin():
    result = session.execute(stmt).scalars().all()

for user in result:
    print(user.name)
    for address in user.addresses:
        print(address.email)

原生 SQL 查询:

SELECT * FROM users;
-- 查询 addresses 表
SELECT * FROM addresses WHERE user_id IN (1, 2, 3, ...);

4.3 延迟加载与预加载的比较

特性 懒查询 (Lazy Loading) 预加载 (Eager Loading)
数据加载方式 延迟加载,直到访问关联属性时才查询 一次性加载所有关联数据
查询次数 可能多次查询(N+1 查询问题) 一次性查询,通常只发出少量查询
性能 对于小数据量高效 对于复杂查询避免 N+1 问题,适用于大量关联数据
常用方法 ​joinedload​、subqueryload​、selectinload​

5. 高级查询

5.1 聚合查询

SQLAlchemy 支持聚合函数,如 count​、sum​、avg​ 等。以下是计算用户数量的例子:

from sqlalchemy import func

stmt = select(func.count(User.id))

with session.begin():
    result = session.execute(stmt).scalar()

print(f"User Count: {result}")

SQL 原生查询:

SELECT COUNT(id) FROM users;

5.2 分组查询

通过 group_by​ 方法进行分组查询:

stmt = select(User.age, func.count(User.id)).group_by(User.age)

with session.begin():
    result = session.execute(stmt).all()

for age, count in result:
    print(f"Age: {age}, Count: {count}")

SQL 原生查询:

SELECT age, COUNT(id) FROM users GROUP BY age;

5.3 子查询

通过子查询来执行更复杂的查询:

subquery = select(func.max(User.age)).scalar_subquery()

stmt = select(User).where(User.age == subquery)

with session.begin():
    result = session.execute(stmt).scalars().all()

for user in result:
    print(user.name, user.age)

SQL 原生查询:

SELECT * FROM users WHERE age = (SELECT MAX(age) FROM users);

6. 异步查询使用示例

SQLAlchemy 支持异步查询,适用于需要处理高并发操作的场景。以下是使用异步 session​ 进行查询的示例。

from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.future import select

# 创建异步引擎和会话
engine = create_async_engine('sqlite+aiosqlite:///example.db', echo=True, future=True)
async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

async def async_query():
    async with async_session() as session:
        stmt = select(User)
        result = await session.execute(stmt)
        users = result.scalars().all()
        for user in users:
            print(user.name)

# 执行异步查询
import asyncio
asyncio.run(async_query())

在异步查询中,我们使用 await​ 来执行查询,确保数据库操作不会阻塞主线程。

7. 总结

SQLAlchemy 2.0 提供了灵活且高效的查询功能,支持懒查询、预加载和异步操作。通过合理使用懒查询和预加载,可以有效地避免 N+1 查询问题,提高应用性能。在异步操作方面,SQLAlchemy 的异步会话使得我们可以在高并发环境下高效地进行数据库查询。


网站公告

今日签到

点亮在社区的每一天
去签到