python&Flask 使用 SQLAlchemy 的连接池

发布于:2025-09-11 ⋅ 阅读:(20) ⋅ 点赞:(0)

相比于上篇写的DBUtils,SQLAlchemy 是一个功能更为强大的 Python 库,它不仅支持 ORM,还支持与关系数据库的直接交互。通过 SQLAlchemy,我们可以非常方便地创建和管理数据库连接池,大多数情况下,连接池是默认启用的,并且可以根据需要进行配置。下面是用 Python 和 SQLAlchemy 创建并配置连接池的一些步骤:

目录

1、安装 SQLAlchemy

2、进行连接池基本配置

2.1 连接池参数详解

3、基本使用方式

3.1 直接执行sql

3.2 使用session

3、Flask集成示例


1、安装 SQLAlchemy

首先,确保安装了 SQLAlchemy 和你所需的数据库驱动程序,例如 pymysql 用于 MySQL。

pip install sqlalchemy pymysql

2、进行连接池基本配置

以下是使用 SQLAlchemy 创建连接池的基本代码示例:

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

# 创建数据库连接 URI
DATABASE_URI = 'mysql+pymysql://user:password@localhost/dbname'

# 创建 SQLAlchemy 引擎,并通过参数配置连接池
engine = create_engine(
    DATABASE_URI,
    pool_size=5,          # 连接池的大小,默认为 5
    max_overflow=10,      # 连接池中允许超过 pool_size 的最大连接数,超出部分会被释放
    pool_timeout=30,      # 等待连接的超时时间,默认是 30 秒
    pool_recycle=1800     # 连接回收时间,断开重连之前的时间,以秒为单位;可以防止过期连接使用
)
2.1 连接池参数详解

参数

说明

推荐值

pool_size

连接池保持的连接数

5-20

max_overflow

允许超过pool_size的连接数

5-10

pool_recycle

连接回收时间(秒)

3600

pool_timeout

获取连接的超时时间(秒)

30

pool_pre_ping

执行前检查连接有效性

True

pool_use_lifo

使用LIFO(后进先出)策略

True

3、基本使用方式

3.1 直接执行sql
  • 直接操作数据库连接,适合执行原生SQL,返回原始数据行(类似DB-API接口)
  • 需要精细控制SQL或高性能场景 → 选择 engine.connect()
with engine.connect() as connection:
    result = connection.execute("SELECT * FROM users")
    for row in result:
        print(row)
3.2 使用session
  • 提供对象关系映射(ORM)功能,返回模型对象,自动管理事务和对象状态
  • 需要对象关系映射或复杂业务逻辑​ → 选择 Session
from sqlalchemy.orm import sessionmaker

# 创建会话
Session = sessionmaker(bind=engine)

# 使用会话进行数据库操作
def perform_query():
    session = Session()  # 创建会话实例

    try:
        result = session.execute("SELECT * FROM your_table").fetchall()
        print(result)
    finally:
        session.close()  # 会话使用完毕后关闭

# 执行查询
perform_query()

3、Flask集成示例

from flask import Flask, g
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

app = Flask(__name__)

engine = create_engine(
    'mysql+pymysql://user:pass@localhost/db',
    pool_size=10,
    max_overflow=5,
    pool_recycle=3600
)

# 使用scoped_session确保线程安全
db_session = scoped_session(
    sessionmaker(
        autocommit=False,
        autoflush=False,
        bind=engine
    )
)

@app.teardown_appcontext
def shutdown_session(exception=None):
    db_session.remove()

@app.route('/users')
def get_users():
    """使用ORM的query方法查询所有用户"""
    users = db_session.query(User).all()
    return {'users': [user.to_dict() for user in users]}


网站公告

今日签到

点亮在社区的每一天
去签到