Python项目源码57:数据格式转换工具1.0(csv+json+excel+sqlite3)

发布于:2025-05-08 ⋅ 阅读:(17) ⋅ 点赞:(0)

1.智能路径处理:自动识别并修正文件扩展名,根据转换类型自动建议目标路径,实时路径格式验证,自动补全缺失的文件扩展名。

2.增强型预览功能:使用pandastable库实现表格预览,第三方模块自己安装一下,自动加载前10行数据,支持实时预览更新,自动调整列宽,SQLite表名自动检测。

pip install pandastable

3.改进的UI交互:分栏式布局(左侧控制/右侧预览),自动保存路径建议,智能表名检测(SQLite),实时错误反馈,一键清空预览。

4.使用说明:选择转换类型,浏览选择源文件(自动生成目标路径建议),查看右侧数据预览确认数据正确性,(数据库转换需要输入/选择表名),点击"开始转换"按钮
在这里插入图片描述

# -*- coding: utf-8 -*-
# @Author : 小红牛
# 微信公众号:WdPython
import os
import tkinter as tk
from tkinter import ttk, filedialog, messagebox
from tkinter.simpledialog import askstring
import sqlite3
import pandas as pd
from pandastable import Table

class DataConverterApp:
    def __init__(self, root):
        self.root = root
        self.root.title("智能数据转换工具1.0")
        self.root.geometry("1200x700")

        # 初始化变量
        self.source_path = tk.StringVar()
        self.target_path = tk.StringVar()
        self.table_name = tk.StringVar()
        self.conversion_type = tk.StringVar(value="excel2json")

        # 文件类型映射
        self.file_extensions = {
            "excel": ("Excel文件", ".xlsx"),
            "json": ("JSON文件", ".json"),
            "csv": ("CSV文件", ".csv"),
            "sqlite": ("SQLite数据库", ".db")
        }

        # 创建界面
        self.create_widgets()
        self.setup_preview_table()

    def setup_preview_table(self):
        """初始化数据预览表格"""
        self.preview_frame = ttk.Frame(self.preview_container)
        self.preview_frame.pack(fill=tk.BOTH, expand=True)
        self.ptable = Table(self.preview_frame, showtoolbar=False, showstatusbar=True)
        self.ptable.show()

    def create_widgets(self):
        # 主容器
        main_frame = ttk.Frame(self.root, padding=20)
        main_frame.pack(fill=tk.BOTH, expand=True)

        # 左侧控制面板
        control_frame = ttk.Frame(main_frame, width=400)
        control_frame.pack(side=tk.LEFT, fill=tk.Y)

        # 右侧预览面板
        self.preview_container = ttk.LabelFrame(main_frame, text="数据预览", padding=10)
        self.preview_container.pack(side=tk.RIGHT, fill=tk.BOTH, expand=True)

        # 转换类型选择
        type_frame = ttk.LabelFrame(control_frame, text="转换类型", padding=10)
        type_frame.pack(fill=tk.X, pady=5)

        conversion_types = [
            ("Excel → JSON", "excel2json"),
            ("JSON → Excel", "json2excel"),
            ("CSV → JSON", "csv2json"),
            ("JSON → CSV", "json2csv"),
            ("SQLite → Excel", "sqlite2excel"),
            ("Excel → SQLite", "excel2sqlite"),
            ("SQLite → CSV", "sqlite2csv"),
            ("CSV → SQLite", "csv2sqlite")
        ]

        for text, value in conversion_types:
            rb = ttk.Radiobutton(type_frame, text=text, variable=self.conversion_type,
                                 value=value, command=self.update_ui)
            rb.pack(anchor=tk.W, pady=2)

        # 源文件设置
        source_frame = ttk.LabelFrame(control_frame, text="源文件设置", padding=10)
        source_frame.pack(fill=tk.X, pady=5)

        ttk.Label(source_frame, text="源文件路径:").pack(anchor=tk.W)
        ttk.Entry(source_frame, textvariable=self.source_path, width=50).pack(side=tk.LEFT, fill=tk.X, expand=True)
        ttk.Button(source_frame, text="浏览...", command=self.browse_source).pack(side=tk.RIGHT)

        # 目标文件设置
        target_frame = ttk.LabelFrame(control_frame, text="目标设置", padding=10)
        target_frame.pack(fill=tk.X, pady=5)

        ttk.Label(target_frame, text="目标路径:").pack(anchor=tk.W)
        ttk.Entry(target_frame, textvariable=self.target_path, width=50).pack(side=tk.LEFT, fill=tk.X, expand=True)
        ttk.Button(target_frame, text="浏览...", command=self.browse_target).pack(side=tk.RIGHT)

        # 数据库表名设置
        self.table_frame = ttk.LabelFrame(control_frame, text="数据库设置", padding=10)
        ttk.Label(self.table_frame, text="表名:").pack(side=tk.LEFT)
        ttk.Entry(self.table_frame, textvariable=self.table_name).pack(side=tk.LEFT, fill=tk.X, expand=True)

        # 操作按钮
        btn_frame = ttk.Frame(control_frame)
        btn_frame.pack(fill=tk.X, pady=10)

        ttk.Button(btn_frame, text="开始转换", command=self.start_conversion).pack(side=tk.LEFT, padx=5)
        ttk.Button(btn_frame, text="清空预览", command=self.clear_preview).pack(side=tk.LEFT, padx=5)
        ttk.Button(btn_frame, text="退出", command=self.root.quit).pack(side=tk.RIGHT, padx=5)

        # 绑定路径变化事件
        self.source_path.trace_add("write", self.update_preview)
        self.table_name.trace_add("write", self.update_preview)

        self.update_ui()

    def get_conversion_info(self):
        """获取当前转换类型信息"""
        ct = self.conversion_type.get()
        source_type = ct.split("2")[0]
        target_type = ct.split("2")[1]
        return source_type, target_type

    def update_ui(self):
        """更新界面元素"""
        source_type, target_type = self.get_conversion_info()

        # 更新数据库设置可见性
        if "sqlite" in self.conversion_type.get():
            self.table_frame.pack(fill=tk.X, pady=5)
        else:
            self.table_frame.pack_forget()

        # 自动更新目标路径后缀
        current_target = self.target_path.get()
        if current_target:
            base, _ = os.path.splitext(current_target)
            new_ext = self.file_extensions[target_type][1]
            self.target_path.set(f"{base}{new_ext}")

    def browse_source(self):
        """选择源文件"""
        source_type, _ = self.get_conversion_info()
        file_type = self.file_extensions[source_type]
        path = filedialog.askopenfilename(
            title="选择源文件",
            filetypes=[file_type, ("所有文件", "*.*")]
        )
        if path:
            self.source_path.set(path)
            self.auto_suggest_target_path(path)

    def auto_suggest_target_path(self, source_path):
        """自动生成目标路径建议"""
        source_type, target_type = self.get_conversion_info()
        base = os.path.splitext(source_path)[0]
        new_ext = self.file_extensions[target_type][1]
        suggested_path = f"{base}_converted{new_ext}"
        self.target_path.set(suggested_path)

    def browse_target(self):
        """选择目标路径"""
        _, target_type = self.get_conversion_info()
        file_type = self.file_extensions[target_type]
        path = filedialog.asksaveasfilename(
            title="保存目标文件",
            defaultextension=file_type[1],
            filetypes=[file_type, ("所有文件", "*.*")]
        )
        if path:
            self.target_path.set(path)

    def clear_preview(self):
        """清空预览"""
        self.ptable.clearTable()
        self.ptable.model.df = pd.DataFrame()
        self.ptable.redraw()

    def load_preview_data(self):
        """加载预览数据"""
        source_path = self.source_path.get()
        if not source_path:
            return None

        try:
            source_type, _ = self.get_conversion_info()
            if source_type == "excel":
                return pd.read_excel(source_path, nrows=10)
            elif source_type == "csv":
                return pd.read_csv(source_path, nrows=10)
            elif source_type == "json":
                return pd.read_json(source_path).head(10)
            elif source_type == "sqlite":
                conn = sqlite3.connect(source_path)
                table_name = self.table_name.get() or self.detect_table_name(conn)
                if table_name:
                    return pd.read_sql_query(f"SELECT * FROM {table_name} LIMIT 10", conn)
            return None
        except Exception as e:
            messagebox.showerror("预览错误", f"无法加载数据: {str(e)}")
            return None

    def detect_table_name(self, conn):
        """自动检测数据库表名"""
        cursor = conn.cursor()
        cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
        tables = cursor.fetchall()
        if tables:
            return askstring("选择表", "检测到多个表,请选择:", initialvalue=tables[0][0])
        return None

    def update_preview(self, *args):
        """更新数据预览"""
        df = self.load_preview_data()
        if df is not None:
            self.ptable.model.df = df
            self.ptable.redraw()
            self.ptable.autoResizeColumns()

    def start_conversion(self):
        """执行转换操作"""
        source_path = self.source_path.get()
        target_path = self.target_path.get()
        conversion_type = self.conversion_type.get()
        table_name = self.table_name.get()

        try:
            # 自动修正目标路径后缀
            _, target_type = self.get_conversion_info()
            target_ext = self.file_extensions[target_type][1]
            if not target_path.endswith(target_ext):
                target_path = f"{os.path.splitext(target_path)[0]}{target_ext}"
                self.target_path.set(target_path)

            # 执行转换逻辑
            if conversion_type == "excel2json":
                df = pd.read_excel(source_path)
                df.to_json(target_path, orient='records', indent=4)
            elif conversion_type == "json2excel":
                df = pd.read_json(source_path)
                df.to_excel(target_path, index=False)
            elif conversion_type == "csv2json":
                df = pd.read_csv(source_path)
                df.to_json(target_path, orient='records', indent=4)
            elif conversion_type == "json2csv":
                df = pd.read_json(source_path)
                df.to_csv(target_path, index=False)
            elif conversion_type == "sqlite2excel":
                conn = sqlite3.connect(source_path)
                table_name = table_name or self.detect_table_name(conn)
                df = pd.read_sql_query(f"SELECT * FROM {table_name}", conn)
                df.to_excel(target_path, index=False)
            elif conversion_type == "excel2sqlite":
                df = pd.read_excel(source_path)
                conn = sqlite3.connect(target_path)
                df.to_sql(table_name or "Sheet1", conn, if_exists='replace', index=False)
            elif conversion_type == "sqlite2csv":
                conn = sqlite3.connect(source_path)
                table_name = table_name or self.detect_table_name(conn)
                df = pd.read_sql_query(f"SELECT * FROM {table_name}", conn)
                df.to_csv(target_path, index=False)
            elif conversion_type == "csv2sqlite":
                df = pd.read_csv(source_path)
                conn = sqlite3.connect(target_path)
                df.to_sql(table_name or "CSV_Data", conn, if_exists='replace', index=False)

            messagebox.showinfo("成功", f"文件已成功保存到:\n{target_path}")
            self.update_preview()
        except Exception as e:
            messagebox.showerror("错误", f"转换失败: {str(e)}")


if __name__ == "__main__":
    root = tk.Tk()
    app = DataConverterApp(root)
    root.mainloop()

完毕!!感谢您的收看

----------★★跳转到历史博文集合★★----------
我的零基础Python教程,Python入门篇 进阶篇 视频教程 Py安装py项目 Python模块 Python爬虫 Json Xpath 正则表达式 Selenium Etree CssGui程序开发 Tkinter Pyqt5 列表元组字典数据可视化 matplotlib 词云图 Pyecharts 海龟画图 Pandas Bug处理 电脑小知识office自动化办公 编程工具 NumPy Pygame


网站公告

今日签到

点亮在社区的每一天
去签到