经过了多次尝试,该版本是python2.7比较稳定好用的版本。以下即是完整代码
#coding=utf-8
from config import config
from app.main import platform_app
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy.exc import OperationalError, InvalidRequestError, StatementError
import time
import traceback
import threading
import logging
logger = logging.getLogger(__name__)
platform_app.config['SQLALCHEMY_DATABASE_URI'] = "mysql+pymysql://{username}:{password}@{host}:{port}/{dbname}"
platform_app.config['SQLALCHEMY_POOL_PRE_PING'] = True
platform_app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
engine_options = {
'pool_size': 200,
'max_overflow': 100,
'pool_timeout': 30,
'pool_recycle': 600,
'pool_pre_ping': True,
'connect_args': {
'connect_timeout': 5,
'read_timeout': 30,
'write_timeout': 30,
'charset': 'utf8',
}
}
platform_app.config['SQLALCHEMY_ENGINE_OPTIONS'] = engine_options
db = SQLAlchemy(platform_app)
#安全执行SQL函数
def safe_db_execute(func, *args, **kwargs):
"""安全执行数据库操作,自动处理连接失效和事务问题"""
max_retries = 3
for attempt in range(max_retries):
try:
return func(*args, **kwargs)
except (OperationalError, InvalidRequestError, StatementError) as e:
error_str = str(e).lower()
# 检查是否是需要回滚的错误
if "can't reconnect until invalid transaction is rolled back" in error_str or \
"mysql server has gone away" in error_str:
logger.warning("数据库连接问题 (尝试 %d/%d): %s", attempt + 1, max_retries, str(e))
# 尝试回滚事务
try:
db.session.rollback()
logger.info("事务已回滚")
except Exception as rollback_error:
logger.error("回滚失败: %s", str(rollback_error))
# 尝试关闭当前会话
try:
db.session.close()
logger.info("会话已关闭")
except Exception as close_error:
logger.error("关闭会话失败: %s", str(close_error))
# 刷新连接池
try:
db.engine.dispose()
logger.info("连接池已刷新")
except Exception as dispose_error:
logger.error("刷新连接池失败: %s", str(dispose_error))
# 重新创建会话
try:
# 在旧版本中需要这样重新创建会话
db.session = db.create_scoped_session()
logger.info("会话已重新创建")
except:
# 如果上面失败,尝试简单重新创建
try:
db.session = db.sessionmaker(bind=db.engine)
logger.info("会话已重新创建(备用方法)")
except Exception as session_error:
logger.error("重新创建会话失败: %s", str(session_error))
# 等待一段时间后重试
time.sleep(1)
else:
# 如果是其他错误,直接抛出
logger.error("数据库操作失败: %s", str(e))
traceback.print_exc()
raise
# 如果重试多次都失败
logger.error("数据库操作失败,重试 %d 次后放弃", max_retries)
raise Exception("数据库操作失败,重试 {} 次后放弃".format(max_retries))
#封装常用数据库操作
class SafeDBSession:
@staticmethod
def execute(query, params=None):
return safe_db_execute(db.session.execute, query, params)
@staticmethod
def query(*entities):
return safe_db_execute(db.session.query, *entities)
@staticmethod
def add(instance):
return safe_db_execute(db.session.add, instance)
@staticmethod
def commit():
return safe_db_execute(db.session.commit)
@staticmethod
def rollback():
try:
db.session.rollback()
logger.info("事务已回滚")
except Exception as e:
logger.error("回滚失败: %s", str(e))
@staticmethod
def update(entity, filter_expr, update_values):
"""
安全更新操作
:param entity: 模型类 (如 User)
:param filter_expr: 过滤表达式 (如 User.id == 1)
:param update_values: 更新值字典 (如 {'name': 'New Name'})
:return: 更新的行数
"""
def _update():
# 构建更新查询
query = db.session.query(entity).filter(filter_expr)
# 执行更新并返回影响的行数
return query.update(update_values, synchronize_session=False)
return safe_db_execute(_update)
@staticmethod
def delete(entity, filter_expr=None, instance=None):
"""
安全删除操作
:param entity: 模型类 (如 User)
:param filter_expr: 过滤表达式 (如 User.id == 1),用于批量删除
:param instance: 要删除的实例对象,用于单个对象删除
:return: 删除的行数
"""
def _delete():
if instance:
# 删除单个实例
db.session.delete(instance)
return 1 # 单个删除始终返回1
elif filter_expr:
# 批量删除
query = db.session.query(entity).filter(filter_expr)
return query.delete(synchronize_session=False)
else:
raise ValueError("必须提供filter_expr或instance参数")
return safe_db_execute(_delete)
@staticmethod
def get(entity, primary_key):
"""安全获取单个对象"""
def _get():
return db.session.query(entity).get(primary_key)
return safe_db_execute(_get)
@staticmethod
def flush():
"""安全刷新会话"""
return safe_db_execute(db.session.flush)
#确保在请求结束时移除会话
@platform_app.teardown_appcontext
def shutdown_session(exception=None):
try:
db.session.remove()
logger.debug("会话已清理")
except Exception as e:
logger.error("清理会话时出错: %s", str(e))
# 确保所有连接都关闭
try:
db.engine.dispose()
logger.debug("连接池已刷新")
except Exception as e:
logger.error("刷新连接池失败: %s", str(e))
def connection_heartbeat():
"""定期执行心跳查询保持连接活跃"""
while True:
time.sleep(300) # 每5分钟一次
try:
SafeDBSession.execute("SELECT 1")
logger.debug("数据库心跳正常")
except Exception as e:
logger.error("数据库心跳失败: %s", str(e))
#启动心跳线程
if not platform_app.debug:
heartbeat_thread = threading.Thread(target=connection_heartbeat)
heartbeat_thread.daemon = True
heartbeat_thread.start()