每篇前言:
🏆🏆作者介绍:【孤寒者】—CSDN全栈领域优质创作者、HDZ核心组成员、华为云享专家Python全栈领域博主、CSDN原力计划作者
- 🔥🔥本文已收录于Flask框架从入门到实战专栏:《Flask框架从入门到实战》
- 🔥🔥热门专栏推荐:《Python全栈系列教程》 | 《爬虫从入门到精通系列教程》 | 《爬虫进阶+实战系列教程》 | 《Scrapy框架从入门到实战》 | 《Flask框架从入门到实战》 | 《Django框架从入门到实战》 | 《Tornado框架从入门到实战》 | 《爬虫必备前端技术栈》
- 🎉🎉订阅专栏后可私聊进一千多人Python全栈交流群(手把手教学,问题解答);进群可领取Python全栈教程视频 + 多得数不过来的计算机书籍:基础、Web、爬虫、数据分析、可视化、机器学习、深度学习、人工智能、算法、面试题等。
- 🚀🚀加入我【博主V信:GuHanZheCoder】一起学习进步,一个人可以走的很快,一群人才能走的更远!
👇 👉 🚔文末扫码关注本人公众号~🚔 👈☝️
SQLAlchemy
SQLAlchemy是一个Python编程语言下的SQL工具和对象关系映射(ORM)库。它提供了一种以更Pythonic的方式与数据库进行交互的方式。通过SQLAlchemy,我们可以使用Python类和对象来表示数据库表和查询,而不是直接使用SQL语句。该框架建立在数据库API之上,使用对象关系映射进行数据库操作,简言之便是:将对象转换成SQL,然后使用数据API执行SQL并获取执行结果。如下第一张图:
该库的主要组件包括:
Core部分: 提供SQL表达式语言,允许你以Python代码的方式构建和执行SQL语句。
ORM部分: 允许你定义数据库模型,将数据库表映射到Python类,并通过这些类来进行数据库操作,而无需直接编写SQL语句。
SQLAlchemy的优势包括灵活性、可移植性和强大的查询功能。它支持多种数据库后端,如SQLite、MySQL、PostgreSQL等。通过使用ORM,可以更轻松地进行数据库操作,而无需深入了解底层的SQL语法。
知识点补给站:
第一个——什么是ORM:
ORM(对象关系映射)是一种编程技术,它将数据库中的数据映射到编程语言中的对象,使得通过对象来操作数据库变得更加直观和方便。ORM框架负责处理数据库操作,开发者可以使用面向对象的方式进行数据库交互,而不必直接编写SQL语句。这提供了一种抽象层,简化了数据库操作的复杂性。
对象关系映射:
- 类 ——> 表
- 对象 ——> 记录(一行数据)
############################################################################################
第二个——ORM和原生SQL哪个好?
两个都好,选择ORM或原生SQL通常取决于项目需求和个人偏好。
ORM的优点:
抽象化: ORM提供了面向对象的抽象,使得数据库操作更接近编程语言的习惯,减少了直接编写SQL的需要。
可维护性: ORM使代码更清晰、易读,减少了手动管理数据库连接和结果集的繁琐细节,提高了代码的可维护性。
跨数据库平台: ORM通常支持多种数据库平台,因此可以轻松切换数据库引擎而无需更改大量代码。
生产力: ORM减少了手动编写SQL语句的时间,提高了开发速度,尤其是在简单到中等复杂度的项目中。
原生SQL的优点:
灵活性: 直接使用原生SQL语句能够更灵活地执行复杂查询、优化和调整数据库操作,适用于一些高度优化的场景。
性能: 对于一些复杂的查询或涉及大量数据的操作,原生SQL通常能够更好地优化执行计划,提高性能。
数据库专业性: 对于有丰富数据库经验的开发者,直接使用原生SQL可能更符合其专业知识,更容易理解和调优。
综合考虑:
在简单项目和迅速开发的场景下,ORM可能更具优势,可以提高开发效率。而在对性能要求极高、需要进行复杂查询和数据库优化的情况下,原生SQL可能更为合适。在实际应用中,也可以根据项目的具体情况,将ORM和原生SQL结合使用,发挥各自的优势。
############################################################################################
第三个——DB First 和 Code First:
DB First(数据库优先):
DB First 是一种开发方法,其中开发人员首先定义数据库结构,然后从数据库中生成相应的类或实体。通常,这涉及使用工具或ORM(对象关系映射)框架,能够自动生成与数据库表相对应的类。
- 步骤:
- 定义数据库表结构,包括表、列、关系等。
- 使用工具或ORM框架,通过数据库连接获取表的元数据。
- 自动生成相应的类或实体,这些类的属性通常映射到数据库表的字段。
- 优点:
- 快速创建实体类,省去手动编写类的步骤。
- 可以确保数据库结构与代码实体的一致性。
- 缺点:
- 不够灵活,对于数据库结构的更改可能需要重新生成类。
- 自动生成的代码可能不符合特定编码风格要求。
Code First(代码优先):
Code First 则是相反的方法,开发人员首先定义应用程序中的类或实体,然后根据这些类创建数据库表。ORM框架负责将类与数据库表进行映射。
- 步骤:
- 定义应用程序中的类,这些类通常表示数据库表。
- 使用工具或ORM框架,根据类的定义生成数据库表。
- 优点:
- 更灵活,可以通过代码轻松调整数据库结构。
- 可以使用面向对象的思维方式设计应用程序。
- 缺点:
- 初始开发可能相对慢,需要手动创建类。
- 需要注意数据库和类的一致性,特别是在复杂的关系映射中。
选择:
选择DB First还是Code First通常取决于项目需求和开发人员的偏好。DB First适用于已有数据库结构的项目,而Code First适用于从零开始创建应用程序的情况。在一些项目中,也可以采用混合的方式,结合两者的优势。
###########################################################################################
第四个——ORM的实现原理(Unit of Work 设计模式):
ORM的实现原理:
ORM(对象关系映射)通过将对象模型和关系数据库之间建立映射,实现了对象与数据库表之间的转换。以下是ORM的主要实现原理:
映射: ORM通过映射将对象的属性映射到数据库表的字段,建立对象与数据库之间的关系。这包括对象的属性类型、关系映射(如一对多、多对多关系)等。
查询语言: ORM提供了一种查询语言,允许开发人员使用面向对象的方式进行数据库查询,而无需直接编写SQL语句。这种查询语言被翻译成对应的SQL查询。
数据操作: ORM负责将对象的状态与数据库同步,包括插入、更新、删除等操作。它提供了一组API,开发人员通过这些API对对象进行CRUD操作,ORM则负责将这些操作映射为对应的SQL语句。
事务管理: ORM通常支持事务管理,确保对数据库的操作是原子的、一致的、隔离的、持久的(ACID特性)。
DDD中的Unit of Work(工作单元):
在领域驱动设计(DDD)中,Unit of Work是一种设计模式,用于管理对象的生命周期和数据的事务性。在ORM中,Unit of Work负责跟踪对象的变化并协调这些变化的持久化。
对象追踪: Unit of Work负责追踪被加载到内存中的对象的状态变化,包括新增、修改、删除等。
事务控制: Unit of Work在一次业务操作中负责管理事务的开始和结束,以确保一系列的操作要么全部成功,要么全部失败,从而维护数据的一致性。
协调持久化: Unit of Work协调将变化同步到数据库,确保对象的状态与数据库的一致性。
在ORM中,Unit of Work与数据映射器(Mapper)协同工作,确保对象与数据库之间的交互是有效且一致的。
1. 安装:
pip install sqlalchemy
注1:SQLAlchemy无法修改表结构,如果需要可以使用开源软件Alembic来完成。
注2:SQLAlchemy本身无法操作数据库,其内部使用pymysql等第三方插件,Dialect用于和数据API进行交流,根据配置文件的不同调用不同的数据库API,从而实现对数据库的操作。
(1)执行原生SQL:
from sqlalchemy import create_engine, text
engine = create_engine("mysql+pymysql://root:123456@127.0.0.1:3306/gset", max_overflow=5)
connection = engine.connect()
result = connection.execute(text('select * from translate_log'))
print(result.fetchall())
拓展1:可指定操作各类数据库,create_engine里的数据库链接URL详见官网:https://docs.sqlalchemy.org/en/20/core/engines.html#sqlalchemy.create_engine
URL的典型形式如下图:
拓展2:SQLAlchemy自带线程池:
(2)建表:
SQLAlchemy内部组件调用顺序为:使用 Schema Type/SQL Expression Language/Engine/ConnectionPooling/Dialect 进行数据库操作。Engine使用Schema Type创建一个特定的结构对象,之后通过SQL Expression Language将该对象转换成SQL语句,然后通过 ConnectionPooling 连接数据库,再然后通过 Dialect 执行SQL,并获取结果。
TIPS:有两种建表的方式可选——使用类的方式和使用metadata方式创建表。二者区别在于metadata可以不指定主键,而是用class方式必须要求有主键。
第一种:使用metadata创建表:
from sqlalchemy import create_engine, Table, Column, Integer, String, MetaData
engine = create_engine("mysql+pymysql://root:123456@127.0.0.1:3306/gset",
echo=True, # echo=True打印输出信息和执行的sql语句 默认Flase
max_overflow=5
)
meta = MetaData() # 生成源类
# 定义表结构
user = Table('user', meta,
Column('id', Integer, nullable=Table, autoincrement=True, primary_key=True),
Column('name', String(20), nullable=True),
Column('age', Integer, nullable=True)
)
host = Table('host', meta,
Column('ip', String(20), nullable=True),
Column('hostname', String(20), nullable=True),
)
meta.create_all(engine) # 创建表,如果存在则忽视
第二种:使用类的方式创建表:
from sqlalchemy import create_engine, Column, Integer, String, Date
from sqlalchemy.ext.declarative import declarative_base
engine = create_engine("mysql+pymysql://root:123456@127.0.0.1:3306/gset",
echo=True,
max_overflow=5
)
base = declarative_base()
# 定义表结构
class User(base):
__tablename__ = 'book'
id = Column(Integer, primary_key=True)
name = Column(String(32))
date = Column(Date)
base.metadata.create_all(engine) # 创建表,如果存在则忽视
# Base.metadata.drop_all(engine) 删除表
(3)增删改查:
增:
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import sessionmaker, declarative_base
engine = create_engine("mysql+pymysql://root:123456@127.0.0.1:3306/gset",
max_overflow=5,
echo=True)
base = declarative_base() # 生成orm基类
class User(base):
__tablename__ = 'user'
id = Column(Integer, autoincrement=True, primary_key=True)
name = Column(String(20))
age = Column(Integer)
sessoion_class = sessionmaker(bind=engine) # 创建与数据库的会话类
Session = sessoion_class() # 生成会话实例
user1 = User(name='wd', age=22) # 生成user对象
Session.add(user1) # 添加user1,可以使用add_all,参数为列表或者tuple
Session.commit() # 提交
# Session.rollback() # 回滚
Session.close() # 关闭会话
删:
data = Session.query(User).filter(User.age == 22).delete() # 会删所有
Session.commit()
Session.close()
改:
# data=Session.query(User).filter(User.age > 20).update({"name": 'jarry'}) # update语法
data = Session.query(User).filter(User.age == 22).first()
data.name = 'GuHanZheCoder'
Session.commit()
Session.close()
查:
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import sessionmaker, declarative_base
engine = create_engine("mysql+pymysql://root:123456@127.0.0.1:3306/gset",
max_overflow=5,
# echo=True
)
base = declarative_base()
class User(base):
__tablename__ = 'user'
id = Column(Integer, autoincrement=True, primary_key=True)
name = Column(String(20))
age = Column(Integer)
def __repr__(self): # 定义
return "(%s, %s, %s)" % (self.id, self.name, self.age)
sessoion_class = sessionmaker(bind=engine)
Session = sessoion_class()
# get语法获取primrykey中的关键字,在这里主键为id,获取id为3的数据
data = Session.get(User, 3)
print(data)
data = Session.query(User).filter(User.age > 22, User.name == 'wd').first()
print(data)
# filter语法(传的是表达式)两个等于号,filter_by语法(传的是参数)一个等于号,可以有多个filter,如果多个数据返回列表,first代表获取第一个,为all()获取所有
data = Session.query(User).filter(User.age == 20, User.name.in_(['GuHanZheCoder', 'wd'])).all() # in语法
print(data)
data = Session.query(User).filter_by(name='wd').first()
print(data)
Session.commit()
Session.close()
(4)拓展几个常用操作:
# 获取所有数据
data = Session.query(User).all() # 获取user表所有数据
for i in data:
print(i)
# 统计
# count = Session.query(User).count() # 获取总条数
count = Session.query(User).filter(User.name.like("w%")).count() # 获取过滤后总条数
print(count)
# 分组
from sqlalchemy import func # 需要导入func函数
res = Session.query(func.count(User.name), User.name).group_by(User.name).all()
print(res)
(5)外键关联相关:
from sqlalchemy import create_engine, Column, Integer, String, ForeignKey
from sqlalchemy.orm import relationship, declarative_base
from sqlalchemy.orm import sessionmaker
engine = create_engine("mysql+pymysql://root:123456@127.0.0.1:3306/gset?charset=gbk", # ?charset是连接数据库的字符集编码(和数据库的编码一样)
encoding="utf-8",
echo=True,
max_overflow=5
)
Base = declarative_base()
class User(Base):
__tablename__ = 'user'
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(20))
age = Column(Integer)
def __repr__(self):
return "<id:%s,name:%s,age:%s>" % (self.id, self.name, self.age)
class Host(Base):
__tablename__ = 'host'
user_id = Column(Integer, ForeignKey('user.id')) # user_id关联user表中的id
hostname = Column(String(20))
ip = Column(String(20), primary_key=True)
# 通过host_user查询host表中关联的user信息,通过user_host,在user表查询关联的host,与生成的表结构无关,只是为了方便查询
host_user = relationship('user', backref='user_host')
def __repr__(self):
return "<user_id:%s,hostname:%s,ip:%s>" % (self.user_id, self.hostname, self.ip)
Base.metadata.create_all(engine)
Session_class = sessionmaker(bind=engine)
Session = Session_class()
host1 = Session.query(Host).first()
print(host1.host_user)
print(host1)
user1 = Session.query(User).first()
print(user1.user_host)
# 骚操作:
# 如果想在增加一条host表数据的同时添加一条user表数据,因为relationship的使用,可以一行代码实现:
Session.add(Host(hostname="百度", host_user=User(name="xiaowang", age=18)))
(6)一对多场景:
应用场景:
当我们购物的时候,有一个收发票地址,和一个收货地址。
关系如下:默认情况下,收发票地址和收获地址是一致的,但是也有可能我想买东西送给别人,而发票要自己留着,那收货的地址和收发票的地址可以不同。
即:一个顾客可以有多个地址,而每个地址只属于一个顾客。
这就需要
Customer
和Address
类之间建立一对多的关系~
from sqlalchemy import Integer, ForeignKey, String, Column
from sqlalchemy.orm import relationship, declarative_base
Base = declarative_base()
class Customer(Base):
__tablename__ = 'customer'
id = Column(Integer, primary_key=True)
name = Column(String)
billing_address_id = Column(Integer, ForeignKey("address.id"))
shipping_address_id = Column(Integer, ForeignKey("address.id"))
billing_address = relationship("Address", foreign_keys=[billing_address_id])
shipping_address = relationship("Address", foreign_keys=[shipping_address_id])
# 同时关联同一个表的一个字段,使用relationship需要指定foreign_keys来说明关联时使用的外键,为了让sqlalchemy清楚表与表之间的关系
class Address(Base):
__tablename__ = 'address'
id = Column(Integer, primary_key=True)
street = Column(String)
city = Column(String)
state = Column(String)
(7)多对多场景:
很多时候,我们会使用多对多外键关联,例如:书和作者,学生和课程,即:书可以有多个作者,而每个作者可以写多本书,orm提供了更简单方式操作多对多关系,在进行删除操作的时候,orm会自动删除相关联的数据。
1. 表结构创建:
from sqlalchemy import Column, Table, String, Integer, ForeignKey
from sqlalchemy import create_engine
from sqlalchemy.orm import relationship, declarative_base
engine = create_engine("mysql+pymysql://root:123456@127.0.0.1:3306/gset?charset=gbk",
echo=True,
max_overflow=5
)
Base = declarative_base()
stu_cour = Table('stu_cour', Base.metadata,
Column('stu_id', Integer, ForeignKey('student.id')),
Column('cour_id', Integer, ForeignKey('course.id'))
)
class student(Base):
__tablename__ = 'student'
id = Column(Integer, autoincrement=True, primary_key=True)
stu_name = Column(String(32))
stu_age = Column(String(32))
# course是关联的表,secondary是中间表,当然,也可以在第三张关联表中使用两个relationship关联student表和course表
courses = relationship('course', secondary=stu_cour, backref='students')
def __repr__(self):
return '<%s>' % self.stu_name
class course(Base):
__tablename__ = 'course'
id = Column(Integer, autoincrement=True, primary_key=True)
cour_name = Column(String(32))
def __repr__(self):
return '<%s>' % self.cour_name
Base.metadata.create_all(engine)
2. 插入数据:
from sqlalchemy import Column, Table, String, Integer, ForeignKey
from sqlalchemy import create_engine
from sqlalchemy.orm import relationship
from sqlalchemy.orm import sessionmaker, declarative_base
engine = create_engine("mysql+pymysql://root:123456@127.0.0.1:3306/gset?charset=gbk",
echo=True,
max_overflow=5
)
Base = declarative_base()
stu_cour = Table('stu_cour', Base.metadata,
Column('stu_id', Integer, ForeignKey('student.id')),
Column('cour_id', Integer, ForeignKey('course.id'))
)
class student(Base):
__tablename__ = 'student'
id = Column(Integer, autoincrement=True, primary_key=True)
stu_name = Column(String(32))
stu_age = Column(String(32))
# course是关联的表,secondary是中间表,当然,也可以在第三张关联表中使用两个relationship关联student表和course表
courses = relationship('course', secondary=stu_cour, backref='students')
def __repr__(self):
return '<%s>' % self.stu_name
class course(Base):
__tablename__ = 'course'
id = Column(Integer, autoincrement=True, primary_key=True)
cour_name = Column(String(32))
def __repr__(self):
return '<%s>' % self.cour_name
session_class = sessionmaker(bind=engine)
session = session_class()
stu1 = student(stu_name='wd', stu_age='22')
stu2 = student(stu_name='jack', stu_age=33)
stu3 = student(stu_name='rose', stu_age=18)
c1 = course(cour_name='linux')
c2 = course(cour_name='python')
c3 = course(cour_name='go')
stu1.courses = [c1, c2] # 添加学生课程关联
stu2.courses = [c1]
stu3.courses = [c1, c2, c3]
session.add_all([stu1, stu2, stu3, c1, c2, c3])
session.commit()
拓展:
- 因为使用了relationship,还可以像下面这样添加数据!
# 正向操作:
obj = student(stu_name="xiaowang", stu_age=18)
obj.courses = [course(cour_name="物理"), course(cour_name="化学")]
session.add(obj)
session.commit()
# 反向操作:
obj = course(cour_name="英语")
obj.students = [sutdent(stu_name="xiaoming", stu_age=11), student(stu_name="xiaozhang", stu_age=16)]
session.add(obj)
session.commit()
4. 查询:
session_class = sessionmaker(bind=engine)
session = session_class()
stu_obj = session.query(student).filter(student.stu_name == 'wd').first()
print(stu_obj.courses) # 查询wd学生所报名的课程
cour_obj = session.query(course).filter(course.cour_name == 'python').first()
print(cour_obj.students) # 查询报名python课程所对应的学生
session.commit()
4. 删除:
session_class = sessionmaker(bind=engine)
session = session_class()
cour_obj = session.query(course).filter(course.cour_name == 'python').first()
session.delete(cour_obj) # 删除python课程
session.commit()
2. 使用:
【单表+一对多+多对多】
(1)建表:
from sqlalchemy.orm import declarative_base
from sqlalchemy import Column, Integer, String, UniqueConstraint, Index, DateTime, ForeignKey
from sqlalchemy import create_engine
import datetime
engine = create_engine("mysql+pymysql://root:123456@127.0.0.1:3306/gset?charset=gbk",
echo=True,
max_overflow=5
)
Base = declarative_base()
class Classes(Base):
__tablename__ = 'classes'
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(32), nullable=False, unique=True)
class Student(Base):
__tablename__ = 'student'
id = Column(Integer, primary_key=True, autoincrement=True)
username = Column(String(32), nullable=False, unique=True)
password = Column(String(64), nullable=False)
ctime = Column(DateTime, default=datetime.datetime.now)
class_id = Column(Integer, ForeignKey("classes.id"))
class Hobby(Base):
__tablename__ = 'hobby'
id = Column(Integer, primary_key=True)
caption = Column(String(50), default='王者荣耀')
# 多对多的第三张表需要手动写
class Student2Hobby(Base):
__tablename__ = 'student2hobby'
id = Column(Integer, primary_key=True, autoincrement=True)
student_id = Column(Integer, ForeignKey('student.id'))
hobby_id = Column(Integer, ForeignKey('hobby.id'))
__table_args__ = (
"""
唯一索引(UniqueConstraint):
在表格的定义中添加一个唯一索引,该索引由 'student_id' 和 'hobby_id' 两个字段组成,并使用名称 'uix_student_id_hobby_id' 标识。唯一索引的目的是确保表 格中的每一行都具有唯一的 ('student_id', 'hobby_id') 组合。
"""
UniqueConstraint('student_id', 'hobby_id', name='uix_student_id_hobby_id'),
"""
索引(Index):
添加一个名为 'ix_sutdent_id_hobby_id'的索引,包含两个字段 'student_id' 和 'hobby_id'。
"""
# Index('ix_sutdent_id_hobby_id', 'student_id', 'hobby_id')
)
Base.metadata.create_all(engine)
知识点补给站:
唯一索引(UniqueConstraint)的作用:
- 确保唯一性: 确保在表格中没有两行记录具有相同的 (‘student_id’, ‘hobby_id’) 组合。如果试图插入或更新记录,导致违反这一唯一性规则,将会引发唯一性冲突的异常。
- 优化查询: 数据库可以使用索引来更有效地执行与这两个字段有关的查询,提高查询性能。
总体而言,通过设置唯一索引,可以在数据库层面强制确保某些字段或字段组合的唯一性。
索引(Index)的作用:
- 提高查询性能: 数据库可以使用这个索引更快地执行基于 ‘student_id’ 和 ‘hobby_id’ 的查询,减少查询的时间复杂度。
- 降低插入、更新、删除性能: 索引的存在可能使插入、更新、删除等写操作的性能略有降低,因为除了对表格的修改外,还需要维护索引的结构。
(2) 拓展的基本增删改查:
增:
改:
(将classes表中id大于0的所有数据的name字段加上后缀111)
注意:
上述是字符串,所以synchronize_session值为False
;
如果是数值的话,synchronize_session值需要改为evaluate
。
查:
(1)label —— 给字段取别名:
# select id, name from classes
result = session.query(Classes.id, Classes.name).all()
for item in result:
# print(item[0], item[1])
print(item.id, item.name)
# select id, name as xx from classes
result_2 = session.query(Classes.id, Classes.name.label('xx')).all()
for item in result_2:
print(item.id, item.xx)
(2)text + params —— 构造复杂查询:
res = session.query(Student).filter(text("id<:value and name=:name")).params(value=22, name='xiaoming').order_by(Student.id).all()
(3)from_statement —— 构造SQL语句:
res = session.query(Student).from_statement(text("SELECT * FROM student where name=:name")).params(name='xiaowang').all()
# SELECT * FROM student where name="xiaowang"
(4)查询打印所有学生信息(包含对应的班级名称):
# 方法一:
# isouter=True 代表左查询
objs = session.query(Student.id, Student.name, Classes.name).join(Classes, isouter=True).all()
print(objs)
# 方法二:
objs = session.query(Student).all()
for item in objs:
print(item.id, item.username, item.class_id, item.cls.name)
方法二需要在Student类中加下述代码:
因为加了上述代码,所以现在Classes类中也多了一个字段stus。
查询课程名为:大学体育的所有学生的信息。
obj = session.query(Classes).filter(Classes.name == '大学体育').first()
student_list = obj.stus
for item in student_list:
print(item.id, item.username)
(3)常用查询:
常用的【条件】查询:
res = session.query(Classes).filter(Classes.id.between(1, 3), Classes.name == '大英').all()
res = session.query(Classes).filter(Classes.id.in_([1, 2, 3])).all()
res = session.query(Classes).filter(~Classes.id.in_([1, 2, 3])).all()
res = session.query(Classes).filter(Classes.id.in_(session.query(Classes.id).filter_by(name='大英'))).all()
from sqlalchemy import and_, or_
res = session.query(Classes).filter(and_(Classes.id > 3, Classes.name == '大英')).all()
res = session.query(Classes).filter(or_(Classes.id < 2, Classes.name == '物理')).all()
res = session.query(Classes).filter(
or_(
Classes.id < 2,
and_(Classes.name == '化学', Classes.id > 3),
Classes.name != ""
)
).all()
通配符:
res = session.query(Classes).filter(Classes.name.like("大%")).all()
res = session.query(Classes).filter(~Classes.name.like("大%")).all()
限制:
res = session.query(Classes)[1: 2]
排序:
res = session.query(Classes).order_by(Classes.name.desc()).all()
res = session.query(Classes).order_by(Classes.name.desc(), Classes.id.asc()).all() # 写多个的话,优先按最左边的排序,相同的话按第二个排序,以次类推
分组:
from sqlalchemy.sql import func
res = session.query(Student).group_by(Student.class_id).all()
res = session.query(
func.max(Student.id),
func.sum(Student.id),
func.min(Student.id)
).group_by(Student.name).all()
res = session.query(
func.max(Student.id),
func.sum(Student.id),
func.min(Student.id)
).group_by(Student.name).having(func.min(Student.id) > 2).all()
连表:
res = session.query(Student, Hobby).filter(Student.id == Hobby.id).all()
res = session.query(Student).join(Hobby).all() # inner join
res = session.query(Student).join(Hobby, isouter=True).all() # left join
组合:
# objs = session.query(A).join(B, and_(A.xid == B.id, A.id > 2), isouter=True).all()
# select * from a left join b on a.xid = b.id and id > 2
q1 = session.query(Student.name).filter(Student.id > 2)
q2 = session.query(Hobby.caption).filter(Hobby.id < 2)
res = q1.union(q2).all() # 有重复只留一个
q1 = session.query(Student.name).filter(Student.id > 2)
q2 = session.query(Hobby.caption).filter(Hobby.id < 2)
res = q1.union_all(q2).all() # 都留
3. 补充——神奇的子查询写法:
普通的子查询:
select * from A where id in (select a_id from B)
select * from A where id in [1, 2, 3]
神奇的子查询:
名为classes的表:
名为hobby的表:
select id, name from classes;
select id, name, 666 from classes;
(这样会给查询结果新增一列,值都为666)
# 神奇的子查询:
select
id,
name,
666,
(select id from hobby where hobby.id = classes.id) as b
from classes;
SQLAlchemy也支持上述“神奇的子查询”:
sub_query = session.query(func.count(Server.id).label("sid")).filter(Server.id == Group.id).correlate(Group).as_scalar()
result = session.query(Group.name, sub_query)
"""
SELECT group.name AS group_name,
(SELECT count(server.id) AS sid
FROM server
WHERE server.id = group.id)
AS anon_1
FROM group
"""
简单讲一下:
目的是检索每个Group
的name
列以及与每个Group
相关的Server
表的id
列的数量。
func.count(Server.id).label("sid")
: 这一部分是一个聚合函数,计算Server
表中id
列的数量,并使用label
方法给这个计数起了一个别名sid
。filter(Server.id == Group.id)
: 这是一个过滤条件,指定了在Server
表和Group
表之间使用id
列进行匹配。correlate(Group)
: 这将子查询与外部查询关联起来,确保子查询中的Server.id
和外部查询中的Group.id
是相关的。correlate(Group)
: 在SQLAlchemy中,correlate
方法用于将子查询与外部查询关联起来,以确保子查询中的条件引用了外部查询的表。在这个例子中,correlate(Group)
确保子查询中的Server.id
引用的是外部查询的Group.id
,这样就建立了关联。这是因为Server.id == Group.id
条件需要在子查询中引用外部查询中的Group
表,而不是创建一个独立的没有关联的子查询。as_scalar()
: 这将子查询转化为标量值,以便在主查询中使用。as_scalar()
: 在SQLAlchemy中,as_scalar()
方法用于将子查询转化为标量值,使其可以在主查询中作为一个单一的值使用。标量值意味着子查询返回的结果是一个单一的数值,而不是一个结果集。在这个例子中,通过调用as_scalar()
,子查询func.count(Server.id).label("sid")
被转换为一个标量值,因此可以在主查询中像普通的列一样使用。sub_query = ...
: 这一行创建了一个子查询,其中计算了与每个Group
相关的Server
表中id
列的数量,并将其命名为sid
。result = ...
: 这一行使用主查询,选择了Group
表中的name
列和上述子查询sub_query
。
最终,通过这个查询,将获得一个结果集,其中包含每个Group
的name
列和与每个Group
相关的Server
表的id
列的数量。
4. 两种创建session的方式:
(1)多线程操作时,为每个线程都创建一个session:
import models
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from threading import Thread
engine = create_engine("mysql+pymysql://root:123456@127.0.0.1:3306/gset?charset=gbk",
# echo=True,
pool_size=2,
max_overflow=0
)
XXXX = sessionmaker(bind=engine)
def task():
session = XXXX()
data = session.query(models.Classes).all()
print(data)
session.close()
for i in range(10):
t = Thread(target=task)
t.start()
这种方法需要注意的是:一旦开线程操作的话,一定要保证在线程里面创建session,如上。
如果在线程外面创建session就会报错!
(2)使用scoped_session创建session【推荐】:
可以猜到——这玩意底层是使用的threading.local()
对象来实现的为每个线程创建及关闭session~
import models
from sqlalchemy.orm import sessionmaker, scoped_session
from sqlalchemy import create_engine
from threading import Thread
engine = create_engine("mysql+pymysql://root:123456@127.0.0.1:3306/gset?charset=gbk",
# echo=True,
pool_size=2,
max_overflow=5
)
XXXX = sessionmaker(bind=engine)
session = scoped_session(XXXX)
def task():
# session = XXXX()
data = session.query(models.Classes).all()
print(data)
session.remove() # scoped_session创建的session要用remove关闭
for i in range(10):
t = Thread(target=task)
t.start()
简单扣一波源码:
先看session = scoped_session(XXXX)
做了什么?
进scoped_session()
:
进ThreadLocalRegistry()
:
回一层:
再会回一层::
继续看执行data = session.query(models.Classes).all()
做了什么?
首先要找到query()方法在哪?
但是scoped_session
类里面没有!而且这个类没有父类!
那么这个方法在哪呢????
切记:类的方法还可以通过反射
setattr
来动态设置!!!class Demo(object): pass for i in ['k1', 'k2']: setattr(Demo, i, lambda self: 1) obj = Demo() v = obj.k1() print(v) # 输出:1
所以继续看scoped_session类这个文件的下面:
需要注意的是:上图中Session类就是第一种方法里创建的session对象所用的类,可以通过打印type证实!
所以上图就是将原本Session里的public_methods
里的所有方法加到scoped_session
类上!
比如add,add_all,query方法,加完之后就是这个样子:
所以当执行data = session.query(models.Classes).all()
的时候就会触发ThreadLocalRegistry
对象的call
方法:
最开始self.registry
里还没有值,所以会执行self.createfunc()
,就是执行XXX()
【第一种方法里的那个sessioin】并放到threading.local()
里,这样就为对应线程创建了session!
然后就会触发getattr
调用原来session对象的query方法~
继续看session.remove()
:
拓展——flask-sqlalchemy中SQLAlchemy().session
默认也是使用的上述第二种方式(scoped_session)
🌟 解决问题,拓展人脉,共同成长!(非诚勿扰)