django的数据库原生操作sql

发布于:2025-08-03 ⋅ 阅读:(7) ⋅ 点赞:(0)
from django.db import connection
from django.db import transaction
from django.db.utils import (
    IntegrityError,
    OperationalError,
    ProgrammingError,
    DataError
)
from django.utils import timezone

class Db(object):
    """数据库操作工具类,封装了CRUD、批量操作、事务和软删除功能
        数据表必须要有:
        updated_at-更新时间
        created_at-创建时间
        is_deleted-软删除标志 0为删除,1已删除
        deleted_at-删除时间
    """

    # ------------------------------
    # 核心查询方法
    # ------------------------------
    def execute_query(self, sql, params=None, fetchone=False, return_last_id=False):
        """
        执行SQL查询
        :param sql: SQL语句
        :param params: 查询参数(元组)
        :param fetchone: 是否只返回第一条记录(SELECT专用)
        :param return_last_id: 是否返回最后插入的ID(INSERT/UPDATE/DELETE专用)
        :return: SELECT返回字典列表/单条字典,其他语句返回受影响行数或最后插入ID
        """
        try:
            with connection.cursor() as cursor:
                cursor.execute(sql, params or ())

                # 打印sql
                # print(f"{sql}====\n===={params}")
                if sql.strip().lower().startswith(('select', 'show')):
                    columns = [col[0] for col in cursor.description]
                    results = [dict(zip(columns, row)) for row in cursor.fetchall()]

                    return results[0] if fetchone and results else results
                else:
                    if return_last_id:
                        try:
                            return cursor.lastrowid
                        except AttributeError:
                            # 如果数据库后端不支持lastrowid,则返回受影响行数
                            return cursor.rowcount
                    else:
                        return cursor.rowcount

        except IntegrityError as e:
            error_msg = f"数据完整性错误(可能原因:主键冲突、唯一约束重复):{str(e)}"
            print(error_msg)
            raise ValueError(error_msg) from e
        except OperationalError as e:
            error_msg = f"数据库连接/操作失败(可能原因:数据库未启动、网络中断):{str(e)}"
            print(error_msg)
            raise ConnectionError(error_msg) from e
        except ProgrammingError as e:
            error_msg = f"SQL语法/表结构错误(可能原因:表/字段不存在、SQL拼写错误):{str(e)}"
            print(error_msg)
            raise SyntaxError(error_msg) from e
        except DataError as e:
            error_msg = f"数据格式错误(可能原因:类型不匹配、长度超限):{str(e)}"
            print(error_msg)
            raise TypeError(error_msg) from e
        except Exception as e:
            error_msg = f"意外错误:{str(e)}"
            print(error_msg)
            raise

    # ------------------------------
    # 插入操作
    # ------------------------------
    def insert_data(self, table, data, auto_time=True, return_last_id=True):
        """
        插入单条数据
        :param table: 表名字符串
        :param data: 数据字典,格式 {字段名: 值}
        :param auto_time: 是否自动添加时间字段(默认True)
        :param return_last_id: 是否返回最后插入的ID
        :return: 最后插入的ID或受影响的行数
        """
        fields = list(data.keys())
        values = list(data.values())

        if auto_time:
            now = timezone.now()
            fields.extend(['created_at', 'updated_at'])
            values.extend([now, now])

        placeholders = ', '.join(['%s'] * len(fields))
        sql = f"INSERT INTO {table} ({', '.join(fields)}) VALUES ({placeholders})"

        return self.execute_query(sql, values, return_last_id=return_last_id)

    def batch_insert(self, table, data_list, auto_time=True, batch_size=1000):
        """
        批量插入数据(自动分批次)
        :param table: 表名字符串
        :param data_list: 数据字典列表,格式 [{字段名: 值}, ...]
        :param auto_time: 是否自动添加时间字段(默认True)
        :param batch_size: 每批次处理的记录数(默认1000)
        :return: 总共插入的行数
        """
        if not data_list:
            return 0

        total_rows = 0
        total_batches = (len(data_list) + batch_size - 1) // batch_size

        for i in range(total_batches):
            start = i * batch_size
            end = start + batch_size
            batch_data = data_list[start:end]

            first_item = batch_data[0].copy()
            fields = list(first_item.keys())

            if auto_time:
                now = timezone.now()
                fields.extend(['created_at', 'updated_at'])
                for item in batch_data:
                    item['created_at'] = now
                    item['updated_at'] = now

            placeholders = ', '.join(['%s'] * len(fields))
            value_groups = [tuple(item.values()) for item in batch_data]
            flat_values = [val for group in value_groups for val in group]

            sql = f"INSERT INTO {table} ({', '.join(fields)}) VALUES "
            sql += ', '.join([f"({placeholders})"] * len(batch_data))

            rows = self.execute_query(sql, flat_values)
            total_rows += rows

        return total_rows

    # ------------------------------
    # 更新操作
    # ------------------------------
    def update_data(self, table, data, where, params=None, auto_time=True, return_last_id=False):
        """
        更新数据
        :param table: 表名字符串
        :param data: 要更新的数据字典,格式 {字段名: 值}
        :param where: WHERE条件(不带WHERE关键字),例如 "id = %s"
        :param params: WHERE条件的参数列表
        :param auto_time: 是否自动更新updated_at字段(默认True)
        :param return_last_id: 是否返回最后插入的ID(在支持的数据库中可能返回更新的行ID)
        :return: 受影响的行数或最后插入ID
        """
        set_items = []
        values = list(data.values())

        if auto_time:
            data['updated_at'] = timezone.now()
            set_items = [f"{field} = %s" for field in data.keys()]
            values = list(data.values())
        else:
            set_items = [f"{field} = %s" for field in data.keys()]

        set_clause = ', '.join(set_items)
        sql = f"UPDATE {table} SET {set_clause} WHERE {where}"

        if params:
            values.extend(params)
        return self.execute_query(sql, values, return_last_id=return_last_id)

    def batch_update(self, table, data_list, where_field='id', auto_time=True, batch_size=1000):
        """
        批量更新数据(使用CASE WHEN优化)
        :param table: 表名字符串
        :param data_list: 数据字典列表,每个字典必须包含where_field字段
        :param where_field: 用于匹配记录的字段(默认id)
        :param auto_time: 是否自动更新updated_at字段(默认True)
        :param batch_size: 每批次处理的记录数(默认1000)
        :return: 总共更新的行数
        """
        if not data_list:
            return 0

        total_rows = 0
        total_batches = (len(data_list) + batch_size - 1) // batch_size

        for i in range(total_batches):
            start = i * batch_size
            end = start + batch_size
            batch_data = data_list[start:end]

            case_clauses = []
            values = []

            update_fields = {k for item in batch_data for k in item.keys() if k != where_field}
            for field in update_fields:
                case_sql = f"{field} = CASE {where_field} "
                for item in batch_data:
                    case_sql += f"WHEN %s THEN %s "
                    values.extend([item[where_field], item[field]])
                case_sql += "END"
                case_clauses.append(case_sql)

            where_ids = [item[where_field] for item in batch_data]
            values.extend(where_ids)

            if auto_time:
                now = timezone.now()
                case_clauses.append(f"updated_at = %s")
                values.append(now)

            set_clause = ', '.join(case_clauses)
            where_placeholders = ', '.join(['%s'] * len(where_ids))
            sql = f"UPDATE {table} SET {set_clause} WHERE {where_field} IN ({where_placeholders})"

            rows = self.execute_query(sql, values)
            total_rows += rows

        return total_rows

    # ------------------------------
    # 删除/软删除操作
    # ------------------------------
    def soft_delete(self, table, where, params=None, return_last_id=False):
        """
        软删除数据(标记is_deleted=1)
        :param table: 表名字符串
        :param where: WHERE条件(不带WHERE关键字),例如 "id = %s"
        :param params: WHERE条件的参数列表
        :param return_last_id: 是否返回最后插入的ID(在支持的数据库中可能返回删除的行ID)
        :return: 受影响的行数或最后插入ID
        """
        data = {'is_deleted': 1, 'deleted_at': timezone.now()}
        return self.update_data(table, data, where, params, auto_time=False, return_last_id=return_last_id)

    def delete_data(self, table, where, params=None, hard_delete=False, return_last_id=False):
        """
        删除数据(默认软删除)
        :param table: 表名字符串
        :param where: WHERE条件(不带WHERE关键字),例如 "id = %s"
        :param params: WHERE条件的参数列表
        :param hard_delete: 是否执行硬删除(物理删除)
        :param return_last_id: 是否返回最后插入的ID(在支持的数据库中可能返回删除的行ID)
        :return: 受影响的行数或最后插入ID
        """
        if hard_delete:
            sql = f"DELETE FROM {table} WHERE {where}"
            return self.execute_query(sql, params, return_last_id=return_last_id)
        else:
            return self.soft_delete(table, where, params, return_last_id=return_last_id)

    # ------------------------------
    # 查询操作
    # ------------------------------
    def get_list(self, table, where=None, params=None, order_by=None, limit=None, offset=None,
                 with_deleted=False, only_deleted=False, fields=None):
        """
        查询列表数据
        :param table: 表名字符串
        :param where: WHERE条件(不带WHERE关键字)
        :param params: WHERE条件的参数列表
        :param order_by: 排序字段,例如 "created_at DESC"
        :param limit: 返回记录数限制
        :param offset: 偏移量(用于分页)
        :param with_deleted: 是否包含已删除数据
        :param only_deleted: 是否只返回已删除数据
        :param fields: 要返回的字段列表,默认返回所有字段
        :return: 符合条件的记录列表
        """
        if fields:
            select_str = ', '.join(fields)
        else:
            select_str = '*'

        conditions = []
        query_params = []
        if where:
            conditions.append(where)
            if params:
                query_params.extend(params)

        if not with_deleted:
            if only_deleted:
                conditions.append("is_deleted = 1")
            else:
                conditions.append("is_deleted = 0")

        order_str = f"ORDER BY {order_by}" if order_by else ""
        limit_str = f"LIMIT {limit}" if limit is not None else ""
        offset_str = f"OFFSET {offset}" if offset is not None else ""

        where_clause = " AND ".join(conditions) if conditions else ""
        where_str = f"WHERE {where_clause}" if where_clause else ""
        sql = f"SELECT {select_str} FROM {table} {where_str} {order_str} {limit_str} {offset_str}"

        return self.execute_query(sql, query_params)

    def get_one(self, table, where, params=None, with_deleted=False, only_deleted=False, fields=None):
        """
        查询单条数据
        :param table: 表名字符串
        :param where: WHERE条件(不带WHERE关键字)
        :param params: WHERE条件的参数列表
        :param with_deleted: 是否包含已删除数据
        :param only_deleted: 是否只返回已删除数据
        :param fields: 要返回的字段列表,默认返回所有字段
        :return: 符合条件的单条记录或None
        """
        return self.get_list(
            table, where, params,
            with_deleted=with_deleted,
            only_deleted=only_deleted,
            fields=fields,
            limit=1
        )

    def get_count(self, table, where=None, params=None, with_deleted=False, only_deleted=False):
        """
        查询记录总数
        :param table: 表名字符串
        :param where: WHERE条件(不带WHERE关键字)
        :param params: WHERE条件的参数列表
        :param with_deleted: 是否包含已删除数据
        :param only_deleted: 是否只返回已删除数据
        :return: 记录总数
        """
        conditions = []
        query_params = []
        if where:
            conditions.append(where)
            if params:
                query_params.extend(params)

        if not with_deleted:
            if only_deleted:
                conditions.append("is_deleted = 1")
            else:
                conditions.append("is_deleted = 0")

        where_clause = " AND ".join(conditions) if conditions else ""
        where_str = f"WHERE {where_clause}" if where_clause else ""
        sql = f"SELECT COUNT(*) AS count FROM {table} {where_str}"

        result = self.execute_query(sql, query_params, fetchone=True)
        return result['count'] if result else 0

    # ------------------------------
    # 事务操作
    # ------------------------------
    def execute_transaction(self, operations):
        """
        执行一组SQL操作作为事务
        :param operations: SQL操作列表,格式 [(SQL语句, 参数元组), ...]
        :return: 每个操作的返回结果列表
        :raises: 任何操作失败时抛出异常,所有操作回滚
        """
        with transaction.atomic():
            results = []
            for idx, (sql, params) in enumerate(operations):
                try:
                    result = self.execute_query(sql, params)
                    results.append(result)
                except Exception as e:
                    raise RuntimeError(f"事务中第{idx + 1}条SQL执行失败:{str(e)}") from e
            return results

    # ------------------------------
    # 数据恢复
    # ------------------------------
    def restore_data(self, table, where, params=None):
        """
        恢复软删除的数据(is_deleted=0)
        :param table: 表名字符串
        :param where: WHERE条件(不带WHERE关键字),例如 "id = %s"
        :param params: WHERE条件的参数列表
        :return: 受影响的行数
        """
        data = {'is_deleted': 0, 'deleted_at': None}
        return self.update_data(table, data, where, params, auto_time=True)
    def get_count(self, table, where=None, params=None, with_deleted=False, only_deleted=False):
        """
        查询记录总数
        :param table: 表名字符串
        :param where: WHERE条件(不带WHERE关键字)
        :param params: WHERE条件的参数列表
        :param with_deleted: 是否包含已删除数据
        :param only_deleted: 是否只返回已删除数据
        :return: 记录总数
        """
        conditions = []
        query_params = []
        if where:
            conditions.append(where)
            if params:
                query_params.extend(params)

        # if not with_deleted:
        #     if only_deleted:
        #         conditions.append("is_deleted = 1")
        #     else:
        #         conditions.append("is_deleted = 0")

        where_clause = " AND ".join(conditions) if conditions else ""
        where_str = f"WHERE {where_clause}" if where_clause else ""
        sql = f"SELECT COUNT(*) AS count FROM {table} {where_str}"

        result = self.execute_query(sql, query_params, fetchone=True)
        return result['count'] if result else 0

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述