Flask ORM 查询详解:Model.query vs db.session.query vs db.session.execute

发布于:2025-08-16 ⋅ 阅读:(15) ⋅ 点赞:(0)

Flask ORM 查询详解:Model.query vs db.session.query vs db.session.execute

在 Flask 中使用 SQLAlchemy ORM 进行数据库查询时,主要有三种方式:Model.querydb.session.querydb.session.execute。下面我将详细解释每种方法的使用方式、结果获取和结果类型。

1. Model.query (推荐用于简单查询)

这是最简洁的查询方式,直接通过模型类进行查询。

基本用法

from models import User  # 假设有 User 模型

# 查询所有用户
all_users = User.query.all()

# 根据ID查询单个用户
user = User.query.get(1)

# 带过滤条件的查询
admin_users = User.query.filter_by(role='admin').all()

结果获取方法

方法 返回类型 描述 示例
all() list[Model] 返回所有结果的列表 users = User.query.all()
first() ModelNone 返回第一个结果 user = User.query.first()
get(ident) ModelNone 根据主键查询 user = User.query.get(1)
one() Model 必须有且只有一个结果,否则抛异常 user = User.query.filter_by(email='admin@ex.com').one()
one_or_none() ModelNone 有结果返回结果,无结果返回None user = User.query.filter_by(email='notexist@ex.com').one_or_none()
count() int 返回结果数量 count = User.query.count()
paginate() Pagination 对象 分页查询 page = User.query.paginate(page=1, per_page=20)

结果处理示例

# 获取所有用户列表
users = User.query.all()
print(type(users))  # <class 'list'>
print(type(users[0]))  # <class 'models.User'>

# 遍历用户
for user in users:
    print(user.username, user.email)

# 分页查询
page = User.query.paginate(page=1, per_page=10)
print(page.items)  # 当前页的用户列表
print(page.total)  # 总记录数

2. db.session.query (推荐用于复杂查询)

这是更灵活的查询方式,特别适合需要查询多个表或特定字段的场景。

基本用法

from models import User, Post
from sqlalchemy.orm import aliased

# 查询所有用户
all_users = db.session.query(User).all()

# 查询特定字段
user_emails = db.session.query(User.email).all()

# 多表查询
results = db.session.query(User, Post).join(Post, User.id == Post.user_id).all()

# 使用别名
ua = aliased(User, name='ua')
admin_users = db.session.query(ua).filter(ua.role == 'admin').all()

结果获取方法

方法 返回类型 描述
all() list[Row]list[tuple] 返回所有结果
first() RowtupleNone 返回第一个结果
one() Rowtuple 必须有且只有一个结果
one_or_none() RowtupleNone 最多一个结果
scalar() 标量值 返回第一行第一列的值
count() int 返回结果数量

结果类型处理

# 查询整个模型对象
users = db.session.query(User).all()
# 类型: list[User]
for user in users:
    print(user.username)

# 查询特定字段 - 返回元组
emails = db.session.query(User.email).all()
# 类型: list[Row] (行为类似元组)
for row in emails:
    print(row[0])  # 索引访问
    print(row.email)  # 属性访问

# 查询多个字段
user_data = db.session.query(User.id, User.username).all()
# 类型: list[Row]
for row in user_data:
    print(f"ID: {row.id}, Username: {row.username}")

# 多表查询 - 返回元组
user_posts = db.session.query(User, Post).join(Post).all()
for user, post in user_posts:
    print(f"{user.username} wrote: {post.title}")

# 使用标量值
email_count = db.session.query(func.count(User.email)).scalar()
print(f"Total emails: {email_count}")

3. db.session.execute (用于原生SQL查询)

当需要执行原生SQL时使用此方法,返回结果为 ResultProxy 对象。

基本用法

# 执行原生SQL查询
result = db.session.execute("SELECT * FROM users WHERE role = :role", {'role': 'admin'})

# 使用text构造SQL
from sqlalchemy import text
sql = text("SELECT * FROM users WHERE created_at > :start_date")
result = db.session.execute(sql, {'start_date': '2023-01-01'})

结果获取方法

方法 返回类型 描述
fetchall() list[RowProxy] 返回所有结果
fetchone() RowProxyNone 返回一行结果
fetchmany(size) list[RowProxy] 返回指定数量的结果
scalar() 标量值 返回第一行第一列的值
keys() list[str] 返回列名列表

结果类型处理

# 执行查询
result = db.session.execute("SELECT id, username, email FROM users")

# 获取列名
print(result.keys())  # ['id', 'username', 'email']

# 获取所有行
rows = result.fetchall()
# 类型: list[RowProxy]
for row in rows:
    # 索引访问
    print(row[0], row[1], row[2])
    
    # 列名访问
    print(row['id'], row['username'], row['email'])
    
    # 转换为字典
    row_dict = dict(row)
    print(row_dict)

# 获取单行
row = result.fetchone()
if row:
    print(row.username)

# 遍历结果集(适用于大量数据)
result = db.session.execute("SELECT * FROM large_table")
for row in result:
    process_row(row)  # 逐行处理,减少内存占用

三种方法对比

特性 Model.query db.session.query db.session.execute
易用性 ⭐⭐⭐⭐⭐ ⭐⭐⭐⭐ ⭐⭐
灵活性 ⭐⭐⭐ ⭐⭐⭐⭐⭐ ⭐⭐⭐⭐⭐
返回类型 模型对象 Row对象/元组 RowProxy对象
ORM特性支持 完整 完整 有限
原生SQL支持 不支持 部分支持 完整支持
性能 最高
适用场景 简单CRUD操作 复杂查询、连接查询 高性能需求、复杂SQL

最佳实践建议

  1. 优先使用 Model.query

    • 适合大多数简单查询场景
    • 代码简洁易读
    • 直接返回模型对象
  2. 复杂查询使用 db.session.query

    • 需要多表连接时
    • 需要选择特定字段时
    • 需要使用SQL函数时
    from sqlalchemy import func
    
    # 每月新增用户统计
    monthly_stats = db.session.query(
        func.date_trunc('month', User.created_at).label('month'),
        func.count(User.id).label('count')
    ).group_by('month').all()
    
  3. 只在必要时使用 db.session.execute

    • 执行复杂原生SQL
    • 需要最高性能时
    • 执行数据库特定功能
    # 使用数据库特定函数(PostgreSQL示例)
    result = db.session.execute("""
        SELECT id, username, 
               json_build_object('id', id, 'name', username) AS user_json 
        FROM users
    """)
    
  4. 结果转换技巧

    # 将Row对象转换为字典
    def row_to_dict(row):
        return {column: getattr(row, column) for column in row._fields}
    
    # 将查询结果直接转为字典列表
    users = db.session.query(User.id, User.username).all()
    user_dicts = [dict(zip(u._fields, u)) for u in users]
    
    # 使用ScalarResult获取单一列的值列表
    emails = db.session.query(User.email).scalars().all()
    
  5. 性能优化

    # 使用yield_per处理大数据集
    large_query = User.query.yield_per(100)
    for user in large_query:
        process_user(user)
    
    # 只选择需要的字段
    # 而不是 select * 
    db.session.query(User.id, User.name)
    

事务管理

所有查询都应该在事务上下文中执行:

@app.route('/users')
def get_users():
    try:
        # 查询操作
        users = User.query.all()
        
        # 修改操作
        new_user = User(name="John")
        db.session.add(new_user)
        db.session.commit()
        
        return jsonify([u.serialize() for u in users])
    except Exception as e:
        db.session.rollback()
        return jsonify({"error": str(e)}), 500

总结

  1. 简单查询:优先使用 Model.query,直接返回模型对象列表
  2. 复杂查询:使用 db.session.query,灵活处理多表关联和字段选择
  3. 原生SQL:使用 db.session.execute,处理特殊需求
  4. 结果处理
    • 模型对象:直接访问属性
    • Row/RowProxy 对象:索引或列名访问
    • 标量值:使用 scalar()scalars()
  5. 性能关键:使用 yield_per 处理大数据,只选择必要字段

根据具体需求选择最合适的查询方式,并在开发过程中注意结果类型的处理,可以大大提高 Flask 应用的数据库操作效率和代码可维护性。


网站公告

今日签到

点亮在社区的每一天
去签到