文章目录
sqlalchemy
视频学习:哔哩哔哩
数据库连接
# Core-style schema bootstrap: build the engine, declare the ``person`` table
# on a MetaData object and create it in the database.
import sqlalchemy

# echo=True is passed so the engine logs the SQL it emits.
engine = sqlalchemy.create_engine(
    'mysql://root:test@localhost/testdb',
    echo=True,
)
meta_data = sqlalchemy.MetaData()

# Column definitions for the ``person`` table, kept separate for readability.
_person_columns = (
    sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
    sqlalchemy.Column("name", sqlalchemy.String(128), nullable=False, unique=True),
    sqlalchemy.Column("birthday", sqlalchemy.Date, nullable=False),
)
person_table = sqlalchemy.Table("person", meta_data, *_person_columns)

# Create every table registered on meta_data.
meta_data.create_all(engine)
数据查询
查询所有记录(单表查询)
查询所有记录有两种方式:使用循环逐行提取,适合结果集很大的场景;或者使用提取函数 fetchall 和 fetchone,其中 fetchall 返回一个列表,结果集放在内存中。
# Three ways to read the rows of ``person``.
#
# A result object can only be consumed once: after the ``for`` loop has walked
# through every row, calling fetchall()/fetchone() on that same object yields
# nothing. Each approach therefore executes the query again so that all three
# demonstrations actually show data.
from db_init import engine, person_table

with engine.connect() as conn:
    query = person_table.select()

    # 1) Iterate over the result; suited to very large result sets.
    result_set = conn.execute(query)
    for row in result_set:
        print(row[0])    # access by position
        print(row.name)  # access by column name

    # 2) fetchall() returns a list; the whole result set is held in memory.
    result_set = conn.execute(query)
    result = result_set.fetchall()
    print(result)

    # 3) fetchone() fetches a single row.
    result_set = conn.execute(query)
    row = result_set.fetchone()
    print(row)
按条件查询(单表查询)
示例:查询人物表中生日大于 2000-10-13 并且 ID 小于 6 的人员,并以列表形式返回。
# Filtered single-table queries against ``person``.
from db_init import engine, person_table
from sqlalchemy.sql import and_, or_

with engine.connect() as conn:
    # Example 1: two chained where() calls.
    # query = person_table.select().where(person_table.c.birthday > '2000-10-13').where(person_table.c.id < 6)

    # Example 2: name == 'Tom' OR (birthday > '2000-10-13' AND id < 7).
    cols = person_table.c
    named_tom = cols.name == 'Tom'
    recent_and_low_id = and_(cols.birthday > '2000-10-13', cols.id < 7)
    query = person_table.select().where(or_(named_tom, recent_and_low_id))

    # Materialise all matching rows as a list and show them.
    result = conn.execute(query).fetchall()
    print(result)
增加
示例如下,增加单个记录数或多个记录数。
# Insert rows into ``person`` with SQLAlchemy Core: one row or several at once.
import sqlalchemy

engine = sqlalchemy.create_engine('mysql://root:test@localhost/testdb', echo=True)
meta_data = sqlalchemy.MetaData()

person = sqlalchemy.Table(
    "person",
    meta_data,
    sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
    sqlalchemy.Column("name", sqlalchemy.String(128), nullable=False, unique=True),
    sqlalchemy.Column("birthday", sqlalchemy.Date, nullable=False),
)
meta_data.create_all(engine)

# Single-record variant, kept for reference:
# person_insert = person.insert()
# print(person_insert)
# insert_tom = person_insert.values(name="TomTom", birthday="2000-10-11")
#
# with engine.connect() as conn:
#     result = conn.execute(insert_tom)
#     print(result.inserted_primary_key)
#     conn.commit()

# Multi-record variant: pass a list of parameter dictionaries to execute().
person_insert = person.insert()
new_people = [
    {"name": "Jack", "birthday": "2000-10-13"},
    {"name": "Mary", "birthday": "2000-10-14"},
    {"name": "Smith", "birthday": "2000-10-15"},
]
with engine.connect() as conn:
    conn.execute(person_insert, new_people)
    conn.commit()
修改
SQL语句:
UPDATE person SET name='Maria' WHERE id=5
# Update one ``person`` row through SQLAlchemy Core.
from db_init import engine, person_table

# NOTE(review): this sets an ``address`` column; confirm that the person_table
# exported by db_init actually declares it.
with engine.connect() as conn:
    is_target_row = person_table.c.id == 6
    update_query = person_table.update().where(is_target_row).values(address="Shanghai")
    conn.execute(update_query)
    conn.commit()
删除
DELETE FROM person WHERE id=5
# Delete one ``person`` row through SQLAlchemy Core.
from db_init import engine, person_table

with engine.connect() as conn:
    is_target_row = person_table.c.id == 7
    delete_query = person_table.delete().where(is_target_row)
    conn.execute(delete_query)
    conn.commit()
关联表定义
一对多关联表
通过 ForeignKey 关联部门表的 ID
# One-to-many Core schema: each employee row references one department row.
import sqlalchemy

engine = sqlalchemy.create_engine('mysql://root:test@localhost/testdb', echo=True)
meta_data = sqlalchemy.MetaData()

department = sqlalchemy.Table(
    "department",
    meta_data,
    sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
    sqlalchemy.Column("name", sqlalchemy.String(255), unique=True, nullable=False),
)

# employee.department_id is the foreign key pointing at department.id.
_department_fk = sqlalchemy.ForeignKey("department.id")
employee = sqlalchemy.Table(
    "employee",
    meta_data,
    sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
    sqlalchemy.Column("department_id", sqlalchemy.Integer, _department_fk, nullable=False),
    sqlalchemy.Column("name", sqlalchemy.String(255), nullable=False),
)

meta_data.create_all(engine)
关联表查询
查询hr部门的所有员工及部门信息
# Join ``employee`` with ``department`` and query across the joined tables.
import sqlalchemy
from db_init import engine, department, employee

with engine.connect() as conn:
    # employee JOIN department ON department.id = employee.department_id
    on_clause = department.c.id == employee.c.department_id
    join = employee.join(department, on_clause)

    # Other queries over the same join:
    # query = sqlalchemy.select(join).where(department.c.name == 'hr')
    # query = sqlalchemy.select(employee).select_from(join).where(department.c.name == 'hr')

    # Select only department columns for the employee named 'Rose'.
    query = (
        sqlalchemy.select(department)
        .select_from(join)
        .where(employee.c.name == 'Rose')
    )
    rows = conn.execute(query).fetchall()
    print(rows)
query = sqlalchemy.select(employee).select_from(join).where(department.c.name == 'hr')
执行结果,只有员工信息,没有部门信息。
query = sqlalchemy.select(department).select_from(join).where(employee.c.name == 'Rose')
执行结果,只有部门信息,没有员工信息。
映射类定义与添加记录
# ORM setup: engine, declarative base, the Person mapped class and a Session
# factory.
from sqlalchemy import create_engine, Column, Integer, String, Date
from sqlalchemy.orm import DeclarativeBase, sessionmaker

engine = create_engine('mysql://root:test@localhost/testdb', echo=True)


class Base(DeclarativeBase):
    """Declarative base class; replaces the deprecated
    ``sqlalchemy.ext.declarative.declarative_base()`` factory."""


class Person(Base):
    """Mapped class for the ``person`` table."""

    __tablename__ = "person"

    id = Column(Integer, primary_key=True)
    name = Column(String(128), unique=True, nullable=False)
    birthday = Column(Date, nullable=False)
    address = Column(String(255), nullable=True)


# Create the tables of all classes mapped on Base.
Base.metadata.create_all(engine)

# Session factory bound to the engine; call Session() to open a session.
Session = sessionmaker(bind=engine)
# Add Person records through the ORM session.
from db_init import Session, Person

# Opening the session as a context manager guarantees it is closed afterwards,
# even if add_all()/commit() raises.
with Session() as session:
    # Single-record variant:
    # p = Person(name="Amy", birthday="2000-9-18", address="unknown")
    # session.add(p)

    person_list = [
        Person(name="Eric", birthday="1998-2-18", address="unknown"),
        Person(name="Samuel", birthday="1997-1-15", address="unknown"),
    ]
    session.add_all(person_list)
    session.commit()
映射类查询和修改
# Query Person records through the ORM session.
from db_init import Session, Person

# The context manager closes the session once the queries are done.
with Session() as session:
    # result = session.query(Person).all()
    result = session.query(Person).filter(Person.address == 'aaa')
    for person in result:
        print(f'name: {person.name}, birthday: {person.birthday}')

    # first(): take the first record when the query matches several rows.
    # person = session.query(Person).filter(Person.address == 'aaa').first()
    # person = session.query(Person).filter(Person.id == 100).first()

    # one(): requires exactly one matching row; none or several is an error.
    # person = session.query(Person).filter(Person.id == 1).one()

    # scalar(): calls one() under the hood.
    person = session.query(Person).filter(Person.id == 1).scalar()
    if person:
        print(f'name: {person.name}, birthday: {person.birthday}')
session.query(Person).filter(Person.address == 'aaa').first()
执行结果为:
修改单条记录:
# Update Person records through the ORM session.
from db_init import Session, Person

# The context manager closes the session after the update is committed.
with Session() as session:
    # Variant 1: load one object and change an attribute.
    # person = session.query(Person).filter(Person.id == 1).one()
    # person.address = 'wwww'

    # Variant 2: update() on a query that targets a single id.
    # session.query(Person).filter(Person.id == 1).update({
    #     Person.address: 'PPPP'
    # })

    # Variant 3: update() applied to every row matched by the filter.
    session.query(Person).filter(Person.id > 14).update({
        Person.address: 'Beijing'
    })
    session.commit()
Python 的对象关系映射工具(ORM),简化数据库交互与查询
# ORM setup using typed (Mapped / mapped_column) declarative mappings.
import datetime
from typing import Optional
from sqlalchemy import create_engine, String
from sqlalchemy.orm import DeclarativeBase, sessionmaker, Mapped, mapped_column
from sqlalchemy.sql import func
from typing_extensions import Annotated

engine = create_engine('mysql://root:test@localhost/testdb', echo=True)


class Base(DeclarativeBase):
    """Declarative base class; replaces the deprecated
    ``sqlalchemy.ext.declarative.declarative_base()`` factory."""


# Reusable column recipes, referenced as Mapped[<alias>] in the model.
int_pk = Annotated[int, mapped_column(primary_key=True)]
required_unique_name = Annotated[str, mapped_column(String(128), unique=True, nullable=False)]
timestamp_default_now = Annotated[datetime.datetime, mapped_column(nullable=False, server_default=func.now())]


class Customer(Base):
    """Mapped class for the ``customers`` table."""

    __tablename__ = "customers"

    id: Mapped[int_pk]
    name: Mapped[required_unique_name]
    birthday: Mapped[datetime.datetime]
    # The column is nullable, so the Python-side type is Optional[str].
    city: Mapped[Optional[str]] = mapped_column(String(128), nullable=True)
    # Filled in by the database through server_default=func.now().
    create_time: Mapped[timestamp_default_now]


Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
一对多ORM
# One-to-many ORM models: a Department has many Employees.
import datetime
from sqlalchemy import create_engine, String, ForeignKey
from sqlalchemy.orm import DeclarativeBase, sessionmaker, Mapped, mapped_column, relationship
from typing_extensions import Annotated
from typing import List

engine = create_engine('mysql://root:test@localhost/testdb', echo=True)


class Base(DeclarativeBase):
    """Declarative base class; replaces the deprecated
    ``sqlalchemy.ext.declarative.declarative_base()`` factory."""


# Reusable column recipes, referenced as Mapped[<alias>] in the models.
int_pk = Annotated[int, mapped_column(primary_key=True)]
required_unique_name = Annotated[str, mapped_column(String(128), unique=True, nullable=False)]
timestamp_not_null = Annotated[datetime.datetime, mapped_column(nullable=False)]


class Department(Base):
    """Mapped class for the ``department`` table (the "one" side)."""

    __tablename__ = "department"

    id: Mapped[int_pk]
    name: Mapped[required_unique_name]
    # Collection of employees; paired with Employee.department.
    employees: Mapped[List["Employee"]] = relationship(back_populates="department")

    def __repr__(self):
        return f'id: {self.id}, name: {self.name}'


class Employee(Base):
    """Mapped class for the ``employee`` table (the "many" side)."""

    __tablename__ = "employee"

    id: Mapped[int_pk]
    # Foreign key to department.id.
    dep_id: Mapped[int] = mapped_column(ForeignKey("department.id"))
    name: Mapped[required_unique_name]
    birthday: Mapped[timestamp_not_null]
    # Owning department; paired with Department.employees.
    department: Mapped[Department] = relationship(lazy=False, back_populates="employees")

    def __repr__(self):
        return f'id: {self.id}, dep_id: {self.dep_id}, name: {self.name}, birthday: {self.birthday}'


Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
# Insert and read back Department / Employee objects (one-to-many).
from db_init import Session, Department, Employee


def insert_records(session):
    """Create department "hr" with employee "Jack" and commit.

    Only the employee is added explicitly; the new department is attached to
    it through the ``department`` relationship.
    """
    d1 = Department(name="hr")
    e1 = Employee(department=d1, name="Jack", birthday="2000-10-1")
    session.add(e1)
    session.commit()


def select_employee(session):
    """Print the employee with id 1 and the department it belongs to."""
    emp = session.query(Employee).filter(Employee.id == 1).one()
    print(emp)
    print(emp.department)


def select_department(session):
    """Print the department with id 1 and its employees."""
    dep = session.query(Department).filter(Department.id == 1).one()
    print(dep)
    print(dep.employees)


# The context manager closes the session when the demo finishes.
with Session() as session:
    # insert_records(session)
    # select_employee(session)
    select_department(session)
多对多ORM
# Many-to-many ORM models: users and roles linked by an association table.
import datetime
from sqlalchemy import create_engine, String, ForeignKey, Table, Column
from sqlalchemy.orm import DeclarativeBase, sessionmaker, Mapped, mapped_column, relationship
from typing_extensions import Annotated
from typing import List, Set

engine = create_engine('mysql://root:test@localhost/testdb', echo=True)


class Base(DeclarativeBase):
    """Declarative base class; replaces the deprecated
    ``sqlalchemy.ext.declarative.declarative_base()`` factory."""


# Reusable column recipes, referenced as Mapped[<alias>] in the models.
int_pk = Annotated[int, mapped_column(primary_key=True)]
required_unique_name = Annotated[str, mapped_column(String(128), unique=True, nullable=False)]
required_string = Annotated[str, mapped_column(String(128), nullable=False)]

# Association table: one row per (user, role) pair; both columns together
# form the primary key.
association_table = Table(
    "user_role",
    Base.metadata,
    Column("user_id", ForeignKey("users.id"), primary_key=True),
    Column("role_id", ForeignKey("roles.id"), primary_key=True),
)


class User(Base):
    """Mapped class for the ``users`` table."""

    __tablename__ = "users"

    id: Mapped[int_pk]
    name: Mapped[required_unique_name]
    password: Mapped[required_string]
    # Roles of this user, resolved through the association table.
    roles: Mapped[List["Role"]] = relationship(secondary=association_table, lazy=False, back_populates="users")

    def __repr__(self):
        return f'id: {self.id}, name: {self.name}'


class Role(Base):
    """Mapped class for the ``roles`` table."""

    __tablename__ = "roles"

    id: Mapped[int_pk]
    name: Mapped[required_unique_name]
    # Users holding this role, resolved through the association table.
    users: Mapped[List["User"]] = relationship(secondary=association_table, lazy=True, back_populates="roles")

    def __repr__(self):
        return f'id: {self.id}, name: {self.name}'


Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
# Insert and read back User / Role objects (many-to-many).
from db_init import Session, User, Role


def insert_records(s):
    """Create two roles and three users, link them, and commit.

    Only the users are added explicitly; the roles are attached to them
    through the ``roles`` relationship.
    """
    role1 = Role(name="Admin")
    role2 = Role(name="Operator")

    user1 = User(name="Jack", password="111")
    user2 = User(name="Tom", password="222")
    user3 = User(name="Mary", password="333")

    # Jack holds both roles, Tom is Admin only, Mary is Operator only.
    user1.roles.append(role1)
    user1.roles.append(role2)
    user2.roles.append(role1)
    user3.roles.append(role2)

    s.add_all([user1, user2, user3])
    s.commit()


def select_user(s):
    """Print the user with id 1 and that user's roles."""
    u = s.query(User).filter(User.id == 1).one()
    print(u)
    print(u.roles)


def select_role(s):
    """Print the role with id 2 and the users holding it."""
    r = s.query(Role).filter(Role.id == 2).one()
    print(r)
    print(r.users)


# The context manager closes the session when the demo finishes.
with Session() as session:
    # insert_records(session)
    # select_user(session)
    select_role(session)
一对一ORM
# One-to-one ORM models: an Employee is assigned at most one Computer.
import datetime
from sqlalchemy import create_engine, String, ForeignKey, Table, Column
from sqlalchemy.orm import DeclarativeBase, sessionmaker, Mapped, mapped_column, relationship
from typing_extensions import Annotated
from typing import List, Optional, Set

engine = create_engine('mysql://root:test@localhost/testdb', echo=True)


class Base(DeclarativeBase):
    """Declarative base class; replaces the deprecated
    ``sqlalchemy.ext.declarative.declarative_base()`` factory."""


# Reusable column recipes, referenced as Mapped[<alias>] in the models.
int_pk = Annotated[int, mapped_column(primary_key=True)]
required_unique_string = Annotated[str, mapped_column(String(128), unique=True, nullable=False)]
required_string = Annotated[str, mapped_column(String(128), nullable=False)]


class Employee(Base):
    """Mapped class for the ``employee`` table; holds the foreign key."""

    __tablename__ = "employee"

    id: Mapped[int_pk]
    name: Mapped[required_unique_string]
    # Nullable foreign key, hence Optional on the Python side.
    # NOTE(review): there is no unique constraint on this column, so the
    # schema itself does not stop two employees sharing a computer.
    computer_id: Mapped[Optional[int]] = mapped_column(ForeignKey("computer.id"), nullable=True)
    computer: Mapped[Optional["Computer"]] = relationship(lazy=False, back_populates="employee")

    def __repr__(self):
        return f'id: {self.id}, name: {self.name}'


class Computer(Base):
    """Mapped class for the ``computer`` table."""

    __tablename__ = "computer"

    id: Mapped[int_pk]
    model: Mapped[required_string]
    number: Mapped[required_unique_string]
    # uselist=False makes this side a single object instead of a list; without
    # it the relationship is one-to-many and ``employee`` would be a list.
    employee: Mapped[Optional["Employee"]] = relationship(
        lazy=True, uselist=False, back_populates="computer"
    )

    def __repr__(self):
        return f'id: {self.id}, model: {self.model}, number: {self.number}'


Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
# Insert, read and re-link Employee / Computer objects (one-to-one).
from db_init import Session, Employee, Computer


def insert(s):
    """Create three computers, assign one to each of three employees, commit.

    Only the employees are added explicitly; each computer is attached to its
    employee through the ``computer`` relationship.
    """
    c1 = Computer(model="Dell", number="1111")
    c2 = Computer(model="Surface", number="2222")
    c3 = Computer(model="MacBook Pro", number="3333")

    e1 = Employee(name="Jack", computer=c1)
    e2 = Employee(name="Mary", computer=c2)
    e3 = Employee(name="Tome", computer=c3)

    s.add_all([e1, e2, e3])
    s.commit()


def select(s):
    """Print employee 1 with its computer, then computer 2 with its employee.

    Each half is skipped when the corresponding row is not found.
    """
    e = s.query(Employee).filter(Employee.id == 1).scalar()
    if e:
        print(e)
        print(e.computer)

    c = s.query(Computer).filter(Computer.id == 2).scalar()
    if c:
        print(c)
        print(c.employee)


def update_1(s):
    """Detach employee 3 from its computer by setting the foreign key to NULL."""
    s.query(Employee).filter(Employee.id == 3).update({Employee.computer_id: None})
    s.commit()


def update_2(s):
    """Assign computer 3 to employee 3 via the relationship attribute.

    Nothing is changed or committed unless both rows exist.
    """
    c = s.query(Computer).filter(Computer.id == 3).scalar()
    e = s.query(Employee).filter(Employee.id == 3).scalar()
    if c and e:
        e.computer = c
        s.commit()


# The context manager closes the session when the demo finishes.
with Session() as session:
    # insert(session)
    # select(session)
    # update_1(session)
    update_2(session)
ORM查询
一对多关系
# Models used by the ORM query examples: Department (one) / Employee (many).
import datetime
from sqlalchemy import create_engine, String, ForeignKey
from sqlalchemy.orm import DeclarativeBase, sessionmaker, Mapped, mapped_column, relationship
from typing_extensions import Annotated
from typing import List


class Base(DeclarativeBase):
    """Declarative base shared by the mapped classes below."""


engine = create_engine('mysql://root:test@localhost/testdb', echo=True)

# Reusable column recipes, referenced as Mapped[<alias>] in the models.
int_pk = Annotated[int, mapped_column(primary_key=True)]
required_unique_string = Annotated[
    str, mapped_column(String(128), nullable=False, unique=True)
]
required_string = Annotated[str, mapped_column(String(128), nullable=False)]
timestamp_not_null = Annotated[datetime.datetime, mapped_column(nullable=False)]


class Department(Base):
    """Mapped class for the ``department`` table."""

    __tablename__ = "department"

    id: Mapped[int_pk]
    name: Mapped[required_unique_string]
    # Employees of this department; paired with Employee.department.
    employees: Mapped[List["Employee"]] = relationship(back_populates="department")

    def __repr__(self):
        return f'id: {self.id}, name: {self.name}'


class Employee(Base):
    """Mapped class for the ``employee`` table."""

    __tablename__ = "employee"

    id: Mapped[int_pk]
    # Foreign key to department.id.
    dep_id: Mapped[int] = mapped_column(ForeignKey("department.id"))
    name: Mapped[required_unique_string]
    birthday: Mapped[timestamp_not_null]
    # Owning department; paired with Department.employees.
    department: Mapped[Department] = relationship(back_populates="employees")

    def __repr__(self):
        return f'id: {self.id}, dep_id: {self.dep_id}, name: {self.name}, birthday: {self.birthday}'


Base.metadata.create_all(bind=engine)
Session = sessionmaker(bind=engine)
# ORM select() examples over Department / Employee.
# Every helper below uses the module-level ``session`` opened at the bottom.
from sqlalchemy import select, insert, update, bindparam, delete
from sqlalchemy.orm import outerjoin, aliased
from db_init import Session, Department, Employee


def execute_query(query):
    """Execute *query* on the module-level session and print each row."""
    result = session.execute(query)
    for row in result:
        print(row)


def select_single_target():
    """Select Department entities ordered by name."""
    query = select(Department).order_by(Department.name)
    execute_query(query)


def select_multiple():
    """Select (Employee, Department) pairs using an outer join."""
    query = select(Employee, Department).join(Employee.department, isouter=True)
    execute_query(query)


def select_with_alias():
    """Select both entities through aliases named "emp" and "dep"."""
    emp_cls = aliased(Employee, name="emp")
    dep_cls = aliased(Department, name="dep")
    query = select(emp_cls, dep_cls).join(emp_cls.department.of_type(dep_cls))
    execute_query(query)


def select_fields():
    """Select only the two name columns, labelled emp_name and dep_name."""
    query = select(
        Employee.name.label('emp_name'),
        Department.name.label('dep_name'),
    ).join_from(Employee, Department)
    execute_query(query)


def select_fields_outer():
    """Same columns as select_fields(), selected from an explicit outer join."""
    query = select(
        Employee.name.label('emp_name'),
        Department.name.label('dep_name'),
    ).select_from(outerjoin(Employee, Department))
    execute_query(query)


def where_obj():
    """Select employees whose dep_id differs from that of department 1."""
    dep = session.get(Department, 1)
    if dep is None:
        # session.get() returns None for a missing primary key; without this
        # guard ``dep.id`` would raise AttributeError.
        print('Department with id 1 not found')
        return
    # query = select(Employee).where(Employee.department == dep)
    query = select(Employee).where(Employee.dep_id != dep.id)
    execute_query(query)


def select_contains():
    """Select the department whose ``employees`` collection contains employee 1."""
    emp = session.get(Employee, 1)
    if emp is None:
        # Nothing to look for when employee 1 does not exist.
        print('Employee with id 1 not found')
        return
    query = select(Department).where(Department.employees.contains(emp))
    execute_query(query)


# The context manager closes the session when the demo finishes.
with Session() as session:
    # select_single_target()
    # select_multiple()
    # select_with_alias()
    # select_fields()
    # select_fields_outer()
    # where_obj()
    select_contains()
基于ORM的批量增加和修改
# ORM-enabled bulk INSERT / UPDATE / DELETE examples.
from sqlalchemy import select, insert, update, bindparam, delete
from sqlalchemy.orm import Session
from db_init import engine, Department, Employee


def batch_insert():
    """Insert the "QA" and "Sales" departments with one insert().values() call."""
    new_departments = [{"name": "QA"}, {"name": "Sales"}]
    stmt = insert(Department).values(new_departments)
    with Session(engine) as db:
        db.execute(stmt)
        db.commit()


def batch_orm_insert():
    """Insert two employees whose dep_id is looked up by department name."""
    hr_id = select(Department.id).where(Department.name == 'hr')
    market_id = select(Department.id).where(Department.name == 'market')
    new_employees = [
        {"dep_id": hr_id, "name": "wwww", "birthday": "2000-1-19"},
        {"dep_id": market_id, "name": "YYY", "birthday": "2000-2-19"},
    ]
    stmt = insert(Employee).values(new_employees)
    with Session(engine) as db:
        db.execute(stmt)
        db.commit()


def batch_update():
    """Update employees 2 and 5; each dict carries the id and the new values."""
    changes = [
        {"id": 2, "birthday": "1999-2-9"},
        {"id": 5, "name": "Samuel"},
    ]
    with Session(engine) as db:
        db.execute(update(Employee), changes)
        db.commit()


def batch_delete():
    """Delete the employees named "wwww" and "YYY"."""
    stmt = delete(Employee).where(Employee.name.in_(['wwww', 'YYY']))
    with Session(engine) as db:
        db.execute(stmt)
        db.commit()


batch_delete()
Session和事务控制
# Two sessions bound to two different engines, each inside its own transaction.
from sqlalchemy import select, insert, update, delete
from sqlalchemy.orm import Session
from db_init import engine, engine2, Department, Employee, User

# The context managers are entered in the same order as in a single combined
# ``with`` statement and exited in reverse, so session2's transaction block
# ends before session1's.
with Session(engine) as session1:
    with session1.begin():
        with Session(engine2) as session2:
            with session2.begin():
                dep = Department(name="pop2")
                session1.add(dep)

                user = User(name="hhh2")
                session2.add(user)