Flask ORM 查询详解:Model.query vs db.session.query vs db.session.execute
在 Flask 中使用 SQLAlchemy ORM 进行数据库查询时,主要有三种方式:Model.query
、db.session.query
和 db.session.execute
。下面我将详细解释每种方法的使用方式、结果获取和结果类型。
1. Model.query (推荐用于简单查询)
这是最简洁的查询方式,直接通过模型类进行查询。
基本用法
from models import User # 假设有 User 模型
# 查询所有用户
all_users = User.query.all()
# 根据ID查询单个用户
user = User.query.get(1)
# 带过滤条件的查询
admin_users = User.query.filter_by(role='admin').all()
结果获取方法
方法 | 返回类型 | 描述 | 示例 |
---|---|---|---|
all() |
list[Model] |
返回所有结果的列表 | users = User.query.all() |
first() |
Model 或 None |
返回第一个结果 | user = User.query.first() |
get(ident) |
Model 或 None |
根据主键查询 | user = User.query.get(1) |
one() |
Model |
必须有且只有一个结果,否则抛异常 | user = User.query.filter_by(email='admin@ex.com').one() |
one_or_none() |
Model 或 None |
有结果返回结果,无结果返回None | user = User.query.filter_by(email='notexist@ex.com').one_or_none() |
count() |
int |
返回结果数量 | count = User.query.count() |
paginate() |
Pagination 对象 |
分页查询 | page = User.query.paginate(page=1, per_page=20) |
结果处理示例
# 获取所有用户列表
users = User.query.all()
print(type(users)) # <class 'list'>
print(type(users[0])) # <class 'models.User'>
# 遍历用户
for user in users:
print(user.username, user.email)
# 分页查询
page = User.query.paginate(page=1, per_page=10)
print(page.items) # 当前页的用户列表
print(page.total) # 总记录数
2. db.session.query (推荐用于复杂查询)
这是更灵活的查询方式,特别适合需要查询多个表或特定字段的场景。
基本用法
from models import User, Post
from sqlalchemy.orm import aliased
# 查询所有用户
all_users = db.session.query(User).all()
# 查询特定字段
user_emails = db.session.query(User.email).all()
# 多表查询
results = db.session.query(User, Post).join(Post, User.id == Post.user_id).all()
# 使用别名
ua = aliased(User, name='ua')
admin_users = db.session.query(ua).filter(ua.role == 'admin').all()
结果获取方法
方法 | 返回类型 | 描述 |
---|---|---|
all() |
list[Row] 或 list[tuple] |
返回所有结果 |
first() |
Row 或 tuple 或 None |
返回第一个结果 |
one() |
Row 或 tuple |
必须有且只有一个结果 |
one_or_none() |
Row 或 tuple 或 None |
最多一个结果 |
scalar() |
标量值 | 返回第一行第一列的值 |
count() |
int |
返回结果数量 |
结果类型处理
# 查询整个模型对象
users = db.session.query(User).all()
# 类型: list[User]
for user in users:
print(user.username)
# 查询特定字段 - 返回元组
emails = db.session.query(User.email).all()
# 类型: list[Row] (行为类似元组)
for row in emails:
print(row[0]) # 索引访问
print(row.email) # 属性访问
# 查询多个字段
user_data = db.session.query(User.id, User.username).all()
# 类型: list[Row]
for row in user_data:
print(f"ID: {row.id}, Username: {row.username}")
# 多表查询 - 返回元组
user_posts = db.session.query(User, Post).join(Post).all()
for user, post in user_posts:
print(f"{user.username} wrote: {post.title}")
# 使用标量值
email_count = db.session.query(func.count(User.email)).scalar()
print(f"Total emails: {email_count}")
3. db.session.execute (用于原生SQL查询)
当需要执行原生SQL时使用此方法,返回结果为 ResultProxy
对象。
基本用法
# 执行原生SQL查询
result = db.session.execute("SELECT * FROM users WHERE role = :role", {'role': 'admin'})
# 使用text构造SQL
from sqlalchemy import text
sql = text("SELECT * FROM users WHERE created_at > :start_date")
result = db.session.execute(sql, {'start_date': '2023-01-01'})
结果获取方法
方法 | 返回类型 | 描述 |
---|---|---|
fetchall() |
list[RowProxy] |
返回所有结果 |
fetchone() |
RowProxy 或 None |
返回一行结果 |
fetchmany(size) |
list[RowProxy] |
返回指定数量的结果 |
scalar() |
标量值 | 返回第一行第一列的值 |
keys() |
list[str] |
返回列名列表 |
结果类型处理
# 执行查询
result = db.session.execute("SELECT id, username, email FROM users")
# 获取列名
print(result.keys()) # ['id', 'username', 'email']
# 获取所有行
rows = result.fetchall()
# 类型: list[RowProxy]
for row in rows:
# 索引访问
print(row[0], row[1], row[2])
# 列名访问
print(row['id'], row['username'], row['email'])
# 转换为字典
row_dict = dict(row)
print(row_dict)
# 获取单行
row = result.fetchone()
if row:
print(row.username)
# 遍历结果集(适用于大量数据)
result = db.session.execute("SELECT * FROM large_table")
for row in result:
process_row(row) # 逐行处理,减少内存占用
三种方法对比
特性 | Model.query | db.session.query | db.session.execute |
---|---|---|---|
易用性 | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐ |
灵活性 | ⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ |
返回类型 | 模型对象 | Row对象/元组 | RowProxy对象 |
ORM特性支持 | 完整 | 完整 | 有限 |
原生SQL支持 | 不支持 | 部分支持 | 完整支持 |
性能 | 高 | 高 | 最高 |
适用场景 | 简单CRUD操作 | 复杂查询、连接查询 | 高性能需求、复杂SQL |
最佳实践建议
优先使用 Model.query
- 适合大多数简单查询场景
- 代码简洁易读
- 直接返回模型对象
复杂查询使用 db.session.query
- 需要多表连接时
- 需要选择特定字段时
- 需要使用SQL函数时
from sqlalchemy import func # 每月新增用户统计 monthly_stats = db.session.query( func.date_trunc('month', User.created_at).label('month'), func.count(User.id).label('count') ).group_by('month').all()
只在必要时使用 db.session.execute
- 执行复杂原生SQL
- 需要最高性能时
- 执行数据库特定功能
# 使用数据库特定函数(PostgreSQL示例) result = db.session.execute(""" SELECT id, username, json_build_object('id', id, 'name', username) AS user_json FROM users """)
结果转换技巧
# 将Row对象转换为字典 def row_to_dict(row): return {column: getattr(row, column) for column in row._fields} # 将查询结果直接转为字典列表 users = db.session.query(User.id, User.username).all() user_dicts = [dict(zip(u._fields, u)) for u in users] # 使用ScalarResult获取单一列的值列表 emails = db.session.query(User.email).scalars().all()
性能优化
# 使用yield_per处理大数据集 large_query = User.query.yield_per(100) for user in large_query: process_user(user) # 只选择需要的字段 # 而不是 select * db.session.query(User.id, User.name)
事务管理
所有查询都应该在事务上下文中执行:
@app.route('/users')
def get_users():
try:
# 查询操作
users = User.query.all()
# 修改操作
new_user = User(name="John")
db.session.add(new_user)
db.session.commit()
return jsonify([u.serialize() for u in users])
except Exception as e:
db.session.rollback()
return jsonify({"error": str(e)}), 500
总结
- 简单查询:优先使用
Model.query
,直接返回模型对象列表 - 复杂查询:使用
db.session.query
,灵活处理多表关联和字段选择 - 原生SQL:使用
db.session.execute
,处理特殊需求 - 结果处理:
- 模型对象:直接访问属性
- Row/RowProxy 对象:索引或列名访问
- 标量值:使用
scalar()
或scalars()
- 性能关键:使用
yield_per
处理大数据,只选择必要字段
根据具体需求选择最合适的查询方式,并在开发过程中注意结果类型的处理,可以大大提高 Flask 应用的数据库操作效率和代码可维护性。