python2.7+flask1.1.4+SQLAlchemy1.3.0+Flask-SQLAlchemy2.1连接mysql稳定方式

发布于:2025-06-19 ⋅ 阅读:(13) ⋅ 点赞:(0)

经过了多次尝试,该版本是python2.7比较稳定好用的版本。以下即是完整代码

#coding=utf-8

from config import config
from app.main import platform_app
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy.exc import OperationalError, InvalidRequestError, StatementError
import time
import traceback
import threading
import logging

logger = logging.getLogger(__name__)

platform_app.config['SQLALCHEMY_DATABASE_URI'] = "mysql+pymysql://{username}:{password}@{host}:{port}/{dbname}"
platform_app.config['SQLALCHEMY_POOL_PRE_PING'] = True
platform_app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False

engine_options = {
    'pool_size': 200,
    'max_overflow': 100,
    'pool_timeout': 30,
    'pool_recycle': 600,
    'pool_pre_ping': True,
    'connect_args': {
        'connect_timeout': 5,
        'read_timeout': 30,
        'write_timeout': 30,
        'charset': 'utf8',
    }
}

platform_app.config['SQLALCHEMY_ENGINE_OPTIONS'] = engine_options

db = SQLAlchemy(platform_app)


#安全执行SQL函数
def safe_db_execute(func, *args, **kwargs):
    """安全执行数据库操作,自动处理连接失效和事务问题"""
    max_retries = 3
    for attempt in range(max_retries):
        try:
            return func(*args, **kwargs)
        except (OperationalError, InvalidRequestError, StatementError) as e:
            error_str = str(e).lower()

            # 检查是否是需要回滚的错误
            if "can't reconnect until invalid transaction is rolled back" in error_str or \
                    "mysql server has gone away" in error_str:

                logger.warning("数据库连接问题 (尝试 %d/%d): %s", attempt + 1, max_retries, str(e))

                # 尝试回滚事务
                try:
                    db.session.rollback()
                    logger.info("事务已回滚")
                except Exception as rollback_error:
                    logger.error("回滚失败: %s", str(rollback_error))

                # 尝试关闭当前会话
                try:
                    db.session.close()
                    logger.info("会话已关闭")
                except Exception as close_error:
                    logger.error("关闭会话失败: %s", str(close_error))

                # 刷新连接池
                try:
                    db.engine.dispose()
                    logger.info("连接池已刷新")
                except Exception as dispose_error:
                    logger.error("刷新连接池失败: %s", str(dispose_error))

                # 重新创建会话
                try:
                    # 在旧版本中需要这样重新创建会话
                    db.session = db.create_scoped_session()
                    logger.info("会话已重新创建")
                except:
                    # 如果上面失败,尝试简单重新创建
                    try:
                        db.session = db.sessionmaker(bind=db.engine)
                        logger.info("会话已重新创建(备用方法)")
                    except Exception as session_error:
                        logger.error("重新创建会话失败: %s", str(session_error))

                # 等待一段时间后重试
                time.sleep(1)
            else:
                # 如果是其他错误,直接抛出
                logger.error("数据库操作失败: %s", str(e))
                traceback.print_exc()
                raise
    # 如果重试多次都失败
    logger.error("数据库操作失败,重试 %d 次后放弃", max_retries)
    raise Exception("数据库操作失败,重试 {} 次后放弃".format(max_retries))


#封装常用数据库操作
class SafeDBSession:
    @staticmethod
    def execute(query, params=None):
        return safe_db_execute(db.session.execute, query, params)

    @staticmethod
    def query(*entities):
        return safe_db_execute(db.session.query, *entities)

    @staticmethod
    def add(instance):
        return safe_db_execute(db.session.add, instance)

    @staticmethod
    def commit():
        return safe_db_execute(db.session.commit)

    @staticmethod
    def rollback():
        try:
            db.session.rollback()
            logger.info("事务已回滚")
        except Exception as e:
            logger.error("回滚失败: %s", str(e))

    @staticmethod
    def update(entity, filter_expr, update_values):
        """
        安全更新操作
        :param entity: 模型类 (如 User)
        :param filter_expr: 过滤表达式 (如 User.id == 1)
        :param update_values: 更新值字典 (如 {'name': 'New Name'})
        :return: 更新的行数
        """

        def _update():
            # 构建更新查询
            query = db.session.query(entity).filter(filter_expr)
            # 执行更新并返回影响的行数
            return query.update(update_values, synchronize_session=False)

        return safe_db_execute(_update)

    @staticmethod
    def delete(entity, filter_expr=None, instance=None):
        """
        安全删除操作
        :param entity: 模型类 (如 User)
        :param filter_expr: 过滤表达式 (如 User.id == 1),用于批量删除
        :param instance: 要删除的实例对象,用于单个对象删除
        :return: 删除的行数
        """

        def _delete():
            if instance:
                # 删除单个实例
                db.session.delete(instance)
                return 1  # 单个删除始终返回1
            elif filter_expr:
                # 批量删除
                query = db.session.query(entity).filter(filter_expr)
                return query.delete(synchronize_session=False)
            else:
                raise ValueError("必须提供filter_expr或instance参数")

        return safe_db_execute(_delete)

    @staticmethod
    def get(entity, primary_key):
        """安全获取单个对象"""

        def _get():
            return db.session.query(entity).get(primary_key)

        return safe_db_execute(_get)

    @staticmethod
    def flush():
        """安全刷新会话"""
        return safe_db_execute(db.session.flush)


#确保在请求结束时移除会话
@platform_app.teardown_appcontext
def shutdown_session(exception=None):
    try:
        db.session.remove()
        logger.debug("会话已清理")
    except Exception as e:
        logger.error("清理会话时出错: %s", str(e))

    # 确保所有连接都关闭
    try:
        db.engine.dispose()
        logger.debug("连接池已刷新")
    except Exception as e:
        logger.error("刷新连接池失败: %s", str(e))

def connection_heartbeat():
    """定期执行心跳查询保持连接活跃"""
    while True:
        time.sleep(300)  # 每5分钟一次
        try:
            SafeDBSession.execute("SELECT 1")
            logger.debug("数据库心跳正常")
        except Exception as e:
            logger.error("数据库心跳失败: %s", str(e))

#启动心跳线程
if not platform_app.debug:
    heartbeat_thread = threading.Thread(target=connection_heartbeat)
    heartbeat_thread.daemon = True
    heartbeat_thread.start()

网站公告

今日签到

点亮在社区的每一天
去签到