sqlalchemy

发布于:2025-09-09 ⋅ 阅读:(16) ⋅ 点赞:(0)

sqlalchemy

视频学习:哔哩哔哩

数据库连接

import sqlalchemy

# Engine for the local MySQL "testdb" database; echo=True asks SQLAlchemy
# to log the SQL it emits.
engine = sqlalchemy.create_engine(
    'mysql://root:test@localhost/testdb',
    echo=True,
)

# Container that collects the Table definitions declared below.
meta_data = sqlalchemy.MetaData()

# Core (non-ORM) description of the "person" table.
person_table = sqlalchemy.Table(
    "person",
    meta_data,
    sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
    sqlalchemy.Column("name", sqlalchemy.String(128), nullable=False, unique=True),
    sqlalchemy.Column("birthday", sqlalchemy.Date, nullable=False),
)

# Create the tables registered on meta_data using the engine above.
meta_data.create_all(engine)

数据查询

查询所有记录(单表查询)

查询所有记录:可以使用循环逐行提取(适合结果集很大的场景),也可以使用提取函数 fetchall / fetchone;其中 fetchall 返回一个列表,整个结果集都放在内存中。

from db_init import engine, person_table

with engine.connect() as conn:
    query = person_table.select()

    # A result can only be consumed once: after it has been iterated,
    # fetchall() returns an empty list and fetchone() returns None.
    # Each demonstration below therefore executes the query again.

    # 1) Iterate row by row - suited to very large result sets.
    for row in conn.execute(query):
        print(row[0])     # access a column by position
        print(row.name)   # access a column by name

    # 2) fetchall(): returns a list; the whole result set is held in memory.
    result = conn.execute(query).fetchall()
    print(result)

    # 3) fetchone(): fetch a single row.
    row = conn.execute(query).fetchone()
    print(row)

按条件查询(单表查询)

示例:查询人物表中生日晚于 2000-10-13 并且 ID 小于 6 的记录,并以列表形式返回。

from db_init import engine, person_table
from sqlalchemy.sql import and_, or_

with engine.connect() as conn:
    # Example 1: two chained where() clauses
    # query = person_table.select().where(person_table.c.birthday > '2000-10-13').where(person_table.c.id < 6)

    # Example 2: name is 'Tom', OR (birthday after 2000-10-13 AND id below 7)
    named_tom = person_table.c.name == 'Tom'
    born_after = person_table.c.birthday > '2000-10-13'
    id_below_seven = person_table.c.id < 7
    query = person_table.select().where(
        or_(named_tom, and_(born_after, id_below_seven))
    )

    # Load every matching row into a list and show it.
    result = conn.execute(query).fetchall()
    print(result)

在这里插入图片描述

增加

示例如下,插入单条记录或多条记录。

import sqlalchemy

# Engine for the local MySQL "testdb" database; echo=True asks SQLAlchemy
# to log the SQL it emits.
engine = sqlalchemy.create_engine(
    'mysql://root:test@localhost/testdb',
    echo=True,
)

meta_data = sqlalchemy.MetaData()

# Core description of the "person" table used by the inserts below.
person = sqlalchemy.Table(
    "person",
    meta_data,
    sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
    sqlalchemy.Column("name", sqlalchemy.String(128), nullable=False, unique=True),
    sqlalchemy.Column("birthday", sqlalchemy.Date, nullable=False),
)

meta_data.create_all(engine)

# insert a record
# person_insert = person.insert()
# print(person_insert)
# insert_tom = person_insert.values(name="TomTom", birthday="2000-10-11")
#
# with engine.connect() as conn:
#     result = conn.execute(insert_tom)
#     print(result.inserted_primary_key)
#     conn.commit()

# insert multiple records: one parameter dict per row
new_people = [
    {"name": "Jack", "birthday": "2000-10-13"},
    {"name": "Mary", "birthday": "2000-10-14"},
    {"name": "Smith", "birthday": "2000-10-15"},
]
with engine.connect() as conn:
    conn.execute(person.insert(), new_people)
    conn.commit()

修改

SQL语句:
UPDATE person SET name='Maria' WHERE id=5

from db_init import engine, person_table

with engine.connect() as conn:
    # Set address to "Shanghai" on the row whose id is 6, then commit.
    # NOTE(review): the Core person_table shown earlier in this article
    # declares no "address" column; this presumably relies on db_init
    # having been extended with one - confirm.
    stmt = person_table.update().where(person_table.c.id == 6).values(address="Shanghai")
    conn.execute(stmt)
    conn.commit()

删除

DELETE FROM person WHERE id=5

from db_init import engine, person_table

with engine.connect() as conn:
    # Delete the row whose id is 7, then commit the change.
    conn.execute(person_table.delete().where(person_table.c.id == 7))
    conn.commit()

关联表定义

一对多关联表

在这里插入图片描述

通过ForeignKey关联部门表的ID

import sqlalchemy

# Engine for the local MySQL "testdb" database; echo=True asks SQLAlchemy
# to log the SQL it emits.
engine = sqlalchemy.create_engine(
    'mysql://root:test@localhost/testdb',
    echo=True,
)

meta_data = sqlalchemy.MetaData()

# "One" side of the relationship: departments with a unique name.
department = sqlalchemy.Table(
    "department",
    meta_data,
    sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
    sqlalchemy.Column("name", sqlalchemy.String(255), unique=True, nullable=False),
)

# "Many" side: every employee row points at a department through
# the department_id foreign key.
employee = sqlalchemy.Table(
    "employee",
    meta_data,
    sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
    sqlalchemy.Column(
        "department_id",
        sqlalchemy.Integer,
        sqlalchemy.ForeignKey("department.id"),
        nullable=False,
    ),
    sqlalchemy.Column("name", sqlalchemy.String(255), nullable=False),
)

# Create both tables using the engine above.
meta_data.create_all(engine)

关联表查询

查询hr部门的所有员工及部门信息

from db_init import engine, department, employee
import sqlalchemy

with engine.connect() as conn:
    # employee JOIN department ON department.id = employee.department_id
    on_clause = department.c.id == employee.c.department_id
    join = employee.join(department, on_clause)

    # Alternatives: select every column of the join, or only the employee columns.
    # query = sqlalchemy.select(join).where(department.c.name == 'hr')
    # query = sqlalchemy.select(employee).select_from(join).where(department.c.name == 'hr')

    # Select only the department columns for the employee named 'Rose'.
    department_of_rose = sqlalchemy.select(department).select_from(join)
    query = department_of_rose.where(employee.c.name == 'Rose')

    rows = conn.execute(query).fetchall()
    print(rows)

query = sqlalchemy.select(employee).select_from(join).where(department.c.name == 'hr') 执行结果,只有员工信息,没有部门信息。
在这里插入图片描述
query = sqlalchemy.select(department).select_from(join).where(employee.c.name == 'Rose') 执行结果,只有部门信息,没有员工信息。
在这里插入图片描述

映射类定义与添加记录

from sqlalchemy import create_engine, Column, Integer, String, Date
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker


# Engine for the local MySQL "testdb" database, created with echo=True.
engine = create_engine('mysql://root:test@localhost/testdb', echo=True)
# Declarative base class that the mapped classes below inherit from.
Base = declarative_base()


class Person(Base):
    """ORM mapping for the "person" table."""

    __tablename__ = "person"

    # Primary key.
    id = Column(Integer, primary_key=True)
    # Required and unique.
    name = Column(String(128), nullable=False, unique=True)
    # Required.
    birthday = Column(Date, nullable=False)
    # Optional.
    address = Column(String(255), nullable=True)


# Create the tables for every class mapped on Base, using the engine above.
Base.metadata.create_all(engine)
# Session factory bound to that engine; call Session() to obtain a session.
Session = sessionmaker(bind=engine)
from db_init import Session, Person


# Open the session in a ``with`` block so it is closed (and its connection
# released) even if the insert fails.
with Session() as session:
    # Single-record variant:
    # p = Person(name="Amy", birthday="2000-9-18", address="unknown")
    # session.add(p)

    # Several records at once.
    person_list = [
        Person(name="Eric", birthday="1998-2-18", address="unknown"),
        Person(name="Samuel", birthday="1997-1-15", address="unknown"),
    ]
    session.add_all(person_list)

    session.commit()

映射类查询和修改

from db_init import Session, Person


# Run the queries inside a ``with`` block so the session is always closed
# afterwards, even if one of them raises.
with Session() as session:
    # result = session.query(Person).all()
    result = session.query(Person).filter(Person.address == 'aaa')

    for person in result:
        print(f'name: {person.name}, birthday: {person.birthday}')

    # first(): take the first record when the query may match several rows
    # person = session.query(Person).filter(Person.address == 'aaa').first()
    # person = session.query(Person).filter(Person.id == 100).first()

    # one(): requires exactly one matching row - none is an error, and so is
    # more than one
    # person = session.query(Person).filter(Person.id == 1).one()

    # scalar(): built on top of one() (per the original author's note); the
    # check below covers the case where nothing was returned
    person = session.query(Person).filter(Person.id == 1).scalar()

    if person:
        print(f'name: {person.name}, birthday: {person.birthday}')

session.query(Person).filter(Person.address == 'aaa').first() 执行结果为:
在这里插入图片描述
修改单条记录:

from db_init import Session, Person


# Use the session as a context manager so it is closed even if the
# update or the commit fails.
with Session() as session:
    # Variant 1: load one object and change an attribute on it.
    # person = session.query(Person).filter(Person.id == 1).one()
    # person.address = 'wwww'

    # Variant 2: issue an UPDATE for a single row through the query.
    # session.query(Person).filter(Person.id == 1).update({
    #     Person.address: 'PPPP'
    # })

    # Variant 3: update every row whose id is greater than 14.
    session.query(Person).filter(Person.id > 14).update({
        Person.address: 'Beijing'
    })

    session.commit()

Python 的对象关系映射工具(ORM),简化数据库交互与查询

import datetime

from sqlalchemy import create_engine, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Mapped, mapped_column
from sqlalchemy.sql import func
from typing_extensions import Annotated


# Engine for the local MySQL "testdb" database with SQL echoing switched on,
# plus the declarative base shared by the mapped classes.
engine = create_engine(
    'mysql://root:test@localhost/testdb',
    echo=True,
)
Base = declarative_base()


# Reusable annotated column types for the mapped classes below.
int_pk = Annotated[int, mapped_column(primary_key=True)]
required_unique_name = Annotated[
    str,
    mapped_column(String(128), nullable=False, unique=True),
]
# Non-null timestamp whose default is produced by the database (func.now()).
timestamp_default_now = Annotated[
    datetime.datetime,
    mapped_column(server_default=func.now(), nullable=False),
]


class Customer(Base):
    """ORM mapping for the "customers" table, declared with Mapped[] annotations."""

    __tablename__ = "customers"

    # Primary key (see the int_pk alias).
    id: Mapped[int_pk]
    # Required, unique name (see the required_unique_name alias).
    name: Mapped[required_unique_name]
    birthday: Mapped[datetime.datetime]
    # Explicitly nullable even though the annotation is plain ``str``.
    city: Mapped[str] = mapped_column(String(128), nullable=True)
    # Non-null timestamp with a server-side default (see timestamp_default_now).
    create_time: Mapped[timestamp_default_now]


# Create the tables for every class mapped on Base, using the engine above.
Base.metadata.create_all(engine)
# Session factory bound to that engine.
Session = sessionmaker(bind=engine)

一对多ORM

import datetime

from sqlalchemy import create_engine, String, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Mapped, mapped_column, relationship
from typing_extensions import Annotated
from typing import List


# Engine for the local MySQL "testdb" database with SQL echoing switched on,
# plus the declarative base shared by the mapped classes.
engine = create_engine(
    'mysql://root:test@localhost/testdb',
    echo=True,
)
Base = declarative_base()


# Reusable annotated column types for the mapped classes below.
int_pk = Annotated[int, mapped_column(primary_key=True)]
required_unique_name = Annotated[
    str,
    mapped_column(String(128), nullable=False, unique=True),
]
timestamp_not_null = Annotated[
    datetime.datetime,
    mapped_column(nullable=False),
]


class Department(Base):
    """ORM mapping for the "department" table ("one" side of one-to-many)."""

    __tablename__ = "department"

    id: Mapped[int_pk]
    name: Mapped[required_unique_name]

    # Employees of this department; paired with Employee.department.
    employees: Mapped[List["Employee"]] = relationship(
        back_populates="department",
    )

    def __repr__(self):
        """Return a short text form showing id and name."""
        return f'id: {self.id}, name: {self.name}'


class Employee(Base):
    """ORM mapping for the "employee" table ("many" side of one-to-many)."""

    __tablename__ = "employee"

    id: Mapped[int_pk]
    # Foreign key to department.id.
    dep_id: Mapped[int] = mapped_column(ForeignKey("department.id"))
    name: Mapped[required_unique_name]
    birthday: Mapped[timestamp_not_null]

    # The owning department; paired with Department.employees.
    department: Mapped[Department] = relationship(
        back_populates="employees",
        lazy=False,
    )

    def __repr__(self):
        """Return a short text form showing id, dep_id, name and birthday."""
        return f'id: {self.id}, dep_id: {self.dep_id}, name: {self.name}, birthday: {self.birthday}'


# Create the tables for every class mapped on Base, using the engine above.
Base.metadata.create_all(engine)
# Session factory bound to that engine.
Session = sessionmaker(bind=engine)
from db_init import Session, Department, Employee


def insert_records(session):
    """Create the "hr" department with one employee and commit.

    Only the employee is added to the session explicitly; the department is
    attached to it through ``Employee.department``.
    """
    hr_department = Department(name="hr")
    jack = Employee(department=hr_department, name="Jack", birthday="2000-10-1")

    session.add(jack)
    session.commit()


def select_employee(session):
    """Print the employee whose id is 1, followed by its department."""
    by_id = session.query(Employee).filter(Employee.id == 1)
    found = by_id.one()

    print(found)
    print(found.department)


def select_department(session):
    """Print the department whose id is 1, followed by its employees."""
    by_id = session.query(Department).filter(Department.id == 1)
    found = by_id.one()

    print(found)
    print(found.employees)


# Use the session as a context manager so it is always closed when the demo
# finishes, even if a query raises.
with Session() as session:
    # insert_records(session)
    # select_employee(session)
    select_department(session)

多对多ORM

在这里插入图片描述

import datetime

from sqlalchemy import create_engine, String, ForeignKey, Table, Column
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Mapped, mapped_column, relationship
from typing_extensions import Annotated
from typing import List, Set


# Engine for the local MySQL "testdb" database with SQL echoing switched on,
# plus the declarative base shared by the mapped classes.
engine = create_engine(
    'mysql://root:test@localhost/testdb',
    echo=True,
)
Base = declarative_base()


# Reusable annotated column types for the mapped classes below.
int_pk = Annotated[int, mapped_column(primary_key=True)]
required_unique_name = Annotated[
    str,
    mapped_column(String(128), nullable=False, unique=True),
]
required_string = Annotated[
    str,
    mapped_column(String(128), nullable=False),
]


# Link table for the many-to-many relation between users and roles; the
# two foreign keys together form its primary key.
association_table = Table(
    "user_role",
    Base.metadata,
    Column("user_id", ForeignKey("users.id"), primary_key=True),
    Column("role_id", ForeignKey("roles.id"), primary_key=True),
)


class User(Base):
    """ORM mapping for the "users" table."""

    __tablename__ = "users"

    id: Mapped[int_pk]
    name: Mapped[required_unique_name]
    password: Mapped[required_string]

    # Roles held by this user, linked through association_table;
    # paired with Role.users.
    roles: Mapped[List["Role"]] = relationship(
        secondary=association_table,
        back_populates="users",
        lazy=False,
    )

    def __repr__(self):
        """Return a short text form showing id and name."""
        return f'id: {self.id}, name: {self.name}'


class Role(Base):
    """ORM mapping for the "roles" table."""

    __tablename__ = "roles"

    id: Mapped[int_pk]
    name: Mapped[required_unique_name]

    # Users holding this role, linked through association_table;
    # paired with User.roles.
    users: Mapped[List["User"]] = relationship(
        secondary=association_table,
        back_populates="roles",
        lazy=True,
    )

    def __repr__(self):
        """Return a short text form showing id and name."""
        return f'id: {self.id}, name: {self.name}'


# Create the tables registered on Base.metadata (mapped classes and the
# association table), using the engine above.
Base.metadata.create_all(engine)
# Session factory bound to that engine.
Session = sessionmaker(bind=engine)
from db_init import Session, User, Role


def insert_records(s):
    """Create two roles and three users, link them, and commit.

    Jack receives both roles, Tom only "Admin" and Mary only "Operator".
    Only the users are added to the session explicitly.
    """
    admin_role = Role(name="Admin")
    operator_role = Role(name="Operator")

    jack = User(name="Jack", password="111")
    tom = User(name="Tom", password="222")
    mary = User(name="Mary", password="333")

    jack.roles.extend([admin_role, operator_role])
    tom.roles.append(admin_role)
    mary.roles.append(operator_role)

    s.add_all([jack, tom, mary])
    s.commit()


def select_user(s):
    """Print the user whose id is 1, followed by that user's roles."""
    by_id = s.query(User).filter(User.id == 1)
    found = by_id.one()
    print(found)
    print(found.roles)


def select_role(s):
    """Print the role whose id is 2, followed by the users holding it."""
    by_id = s.query(Role).filter(Role.id == 2)
    found = by_id.one()
    print(found)
    print(found.users)


# Use the session as a context manager so it is always closed when the demo
# finishes, even if a query raises.
with Session() as session:
    # insert_records(session)
    # select_user(session)
    select_role(session)

一对一ORM

在这里插入图片描述

import datetime

from sqlalchemy import create_engine, String, ForeignKey, Table, Column
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Mapped, mapped_column, relationship
from typing_extensions import Annotated
from typing import List, Set


# Engine for the local MySQL "testdb" database with SQL echoing switched on,
# plus the declarative base shared by the mapped classes.
engine = create_engine(
    'mysql://root:test@localhost/testdb',
    echo=True,
)
Base = declarative_base()


# Reusable annotated column types for the mapped classes below.
int_pk = Annotated[int, mapped_column(primary_key=True)]
required_unique_string = Annotated[
    str,
    mapped_column(String(128), nullable=False, unique=True),
]
required_string = Annotated[
    str,
    mapped_column(String(128), nullable=False),
]


class Employee(Base):
    """ORM mapping for the "employee" table; holds the foreign key to Computer."""

    __tablename__ = "employee"

    id: Mapped[int_pk]
    name: Mapped[required_unique_string]
    # Nullable foreign key to computer.id.
    computer_id: Mapped[int] = mapped_column(
        ForeignKey("computer.id"),
        nullable=True,
    )

    # The computer assigned to this employee; paired with Computer.employee.
    computer = relationship(
        "Computer",
        back_populates="employee",
        lazy=False,
    )

    def __repr__(self):
        """Return a short text form showing id and name."""
        return f'id: {self.id}, name: {self.name}'


class Computer(Base):
    """ORM mapping for the "computer" table.

    Intended to be paired one-to-one with an Employee (this section of the
    article demonstrates the one-to-one pattern).
    """

    __tablename__ = "computer"

    id: Mapped[int_pk]
    model: Mapped[required_string]
    number: Mapped[required_unique_string]

    # The foreign key lives on Employee.computer_id, so this is the side that
    # would otherwise be configured as a one-to-many collection. The
    # relationship carries no Mapped[] annotation, so uselist=False is what
    # makes ``computer.employee`` a single object instead of a list.
    employee = relationship(
        "Employee",
        lazy=True,
        back_populates="computer",
        uselist=False,
    )

    def __repr__(self):
        """Return a short text form showing id, model and number."""
        return f'id: {self.id}, model: {self.model}, number: {self.number}'


# Create the tables for every class mapped on Base, using the engine above.
Base.metadata.create_all(engine)
# Session factory bound to that engine.
Session = sessionmaker(bind=engine)
from db_init import Session, Employee, Computer


def insert(s):
    """Create three computers, give one to each of three employees, and commit.

    Only the employees are added to the session explicitly.
    """
    computers = [
        Computer(model=model, number=number)
        for model, number in (
            ("Dell", "1111"),
            ("Surface", "2222"),
            ("MacBook Pro", "3333"),
        )
    ]
    employees = [
        Employee(name=name, computer=machine)
        for name, machine in zip(("Jack", "Mary", "Tome"), computers)
    ]

    s.add_all(employees)
    s.commit()


def select(s):
    """Print employee 1 with its computer, then computer 2 with its employee.

    Each half is skipped when its lookup returns nothing.
    """
    employee = s.query(Employee).filter(Employee.id == 1).scalar()
    if employee:
        print(employee)
        print(employee.computer)

    computer = s.query(Computer).filter(Computer.id == 2).scalar()
    if computer:
        print(computer)
        print(computer.employee)


def update_1(s):
    """Set computer_id to NULL for the employee whose id is 3, then commit."""
    employee_three = s.query(Employee).filter(Employee.id == 3)
    employee_three.update({Employee.computer_id: None})
    s.commit()


def update_2(s):
    """Assign computer 3 to employee 3 and commit, if both lookups succeed."""
    computer = s.query(Computer).filter(Computer.id == 3).scalar()
    employee = s.query(Employee).filter(Employee.id == 3).scalar()
    if not (computer and employee):
        # One of the two lookups returned nothing: leave the data untouched.
        return
    employee.computer = computer
    s.commit()


# Use the session as a context manager so it is always closed when the demo
# finishes, even if a query raises.
with Session() as session:
    # insert(session)
    # select(session)
    # update_1(session)
    update_2(session)

ORM查询

一对多关系

import datetime

from sqlalchemy import create_engine, String, ForeignKey
from sqlalchemy.orm import DeclarativeBase, sessionmaker, Mapped, mapped_column, relationship
from typing_extensions import Annotated
from typing import List


class Base(DeclarativeBase):
    """Declarative base class that the mapped classes in this module inherit from."""
    pass


# Engine for the local MySQL "testdb" database with SQL echoing switched on.
engine = create_engine(
    'mysql://root:test@localhost/testdb',
    echo=True,
)


# Reusable annotated column types for the mapped classes below.
int_pk = Annotated[int, mapped_column(primary_key=True)]
required_unique_string = Annotated[
    str,
    mapped_column(String(128), nullable=False, unique=True),
]
required_string = Annotated[
    str,
    mapped_column(String(128), nullable=False),
]
timestamp_not_null = Annotated[
    datetime.datetime,
    mapped_column(nullable=False),
]


class Department(Base):
    """ORM mapping for the "department" table ("one" side of one-to-many)."""

    __tablename__ = "department"

    id: Mapped[int_pk]
    name: Mapped[required_unique_string]

    # Employees of this department; paired with Employee.department.
    employees: Mapped[List["Employee"]] = relationship(
        back_populates="department",
    )

    def __repr__(self):
        """Return a short text form showing id and name."""
        return f'id: {self.id}, name: {self.name}'


class Employee(Base):
    """ORM mapping for the "employee" table ("many" side of one-to-many)."""

    __tablename__ = "employee"

    id: Mapped[int_pk]
    # Foreign key to department.id.
    dep_id: Mapped[int] = mapped_column(ForeignKey("department.id"))
    name: Mapped[required_unique_string]
    birthday: Mapped[timestamp_not_null]

    # The owning department; paired with Department.employees.
    department: Mapped[Department] = relationship(
        back_populates="employees",
    )

    def __repr__(self):
        """Return a short text form showing id, dep_id, name and birthday."""
        return f'id: {self.id}, dep_id: {self.dep_id}, name: {self.name}, birthday: {self.birthday}'


# Create the tables for every class mapped on Base, using the engine above.
Base.metadata.create_all(engine)
# Session factory bound to that engine.
Session = sessionmaker(bind=engine)
from sqlalchemy import select, insert, update, bindparam, delete
from sqlalchemy.orm import outerjoin, aliased

from db_init import Session, Department, Employee


def execute_query(query):
    """Execute *query* on the module-level ``session`` and print every row."""
    for record in session.execute(query):
        print(record)


def select_single_target():
    """Print all departments ordered by name."""
    departments_by_name = select(Department).order_by(Department.name)
    execute_query(departments_by_name)


def select_multiple():
    """Print Employee/Department pairs, outer-joined along Employee.department."""
    pairs = select(Employee, Department)
    execute_query(pairs.join(Employee.department, isouter=True))


def select_with_alias():
    """Print employee/department pairs selected through aliased entities."""
    emp = aliased(Employee, name="emp")
    dep = aliased(Department, name="dep")
    # Join along the employee -> department relationship, targeting the alias.
    link = emp.department.of_type(dep)
    execute_query(select(emp, dep).join(link))


def select_fields():
    """Print employee and department names (as emp_name / dep_name) via a join."""
    emp_name = Employee.name.label('emp_name')
    dep_name = Department.name.label('dep_name')
    execute_query(select(emp_name, dep_name).join_from(Employee, Department))


def select_fields_outer():
    """Print employee and department names, outer-joining the two entities."""
    emp_name = Employee.name.label('emp_name')
    dep_name = Department.name.label('dep_name')
    employee_with_department = outerjoin(Employee, Department)
    execute_query(
        select(emp_name, dep_name).select_from(employee_with_department)
    )


def where_obj():
    """Print the employees whose dep_id differs from the id of department 1."""
    first_dep = session.get(Department, 1)
    # Alternative: compare the relationship against the loaded object.
    # query = select(Employee).where(Employee.department == first_dep)
    outside_first_dep = Employee.dep_id != first_dep.id
    execute_query(select(Employee).where(outside_first_dep))


def select_contains():
    """Print the department(s) whose employees collection contains employee 1."""
    first_emp = session.get(Employee, 1)
    has_first_emp = Department.employees.contains(first_emp)
    execute_query(select(Department).where(has_first_emp))


# The query helpers above read the module-level name ``session``. Binding it
# with a ``with`` statement keeps that name available to them and closes the
# session once the demo has finished, even if a query raises.
with Session() as session:
    # select_single_target()
    # select_multiple()
    # select_with_alias()
    # select_fields()
    # select_fields_outer()
    # where_obj()
    select_contains()

基于ORM的批量增加和修改

from sqlalchemy import select, insert, update, bindparam, delete
from sqlalchemy.orm import Session

from db_init import engine, Department, Employee


def batch_insert():
    """Insert the "QA" and "Sales" departments with one INSERT, then commit."""
    new_departments = [
        {"name": "QA"},
        {"name": "Sales"},
    ]
    with Session(engine) as db:
        db.execute(insert(Department).values(new_departments))
        db.commit()


def batch_orm_insert():
    """Insert two employees whose dep_id comes from a SELECT, then commit.

    Each row's ``dep_id`` is a SELECT of the department id by name
    ('hr' and 'market').
    """
    hr_id = select(Department.id).where(Department.name == 'hr')
    market_id = select(Department.id).where(Department.name == 'market')
    new_employees = [
        {"dep_id": hr_id, "name": "wwww", "birthday": "2000-1-19"},
        {"dep_id": market_id, "name": "YYY", "birthday": "2000-2-19"},
    ]
    with Session(engine) as db:
        db.execute(insert(Employee).values(new_employees))
        db.commit()


def batch_update():
    """Apply per-row changes to employees, then commit.

    Each dict carries the ``id`` of the row together with the new values
    (presumably matched by primary key - see the ORM bulk UPDATE docs).
    """
    changes = [
        {"id": 2, "birthday": "1999-2-9"},
        {"id": 5, "name": "Samuel"},
    ]
    with Session(engine) as db:
        db.execute(update(Employee), changes)
        db.commit()


def batch_delete():
    """Delete the employees named 'wwww' or 'YYY', then commit."""
    doomed_names = ['wwww', 'YYY']
    with Session(engine) as db:
        stmt = delete(Employee).where(Employee.name.in_(doomed_names))
        db.execute(stmt)
        db.commit()


# Run the delete demo when this script is executed.
batch_delete()

Session和事务控制

from sqlalchemy import select, insert, update, delete
from sqlalchemy.orm import Session

from db_init import engine, engine2, Department, Employee, User


# One session (and one transaction) per engine. The nested ``with`` blocks
# are entered and left in the same order as the original single-line form.
# NOTE(review): as far as this code shows, the two transactions are committed
# independently - confirm whether cross-database atomicity is required.
with Session(engine) as session1:
    with session1.begin():
        with Session(engine2) as session2:
            with session2.begin():
                dep = Department(name="pop2")
                session1.add(dep)

                user = User(name="hhh2")
                session2.add(user)

网站公告

今日签到

点亮在社区的每一天
去签到