python 库 下载 ,整合在一个小程序 UIUIUI

发布于:2025-04-20 ⋅ 阅读:(68) ⋅ 点赞:(0)


上图
 



import os
import time
import threading
import requests
import subprocess
import importlib
import tkinter as tk
from tkinter import ttk, messagebox, scrolledtext
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import urljoin

class PackageInstallerApp:
    def __init__(self, root):
        self.root = root
        self.root.title("Design By Tim")
        self.root.geometry("800x600")
        
        # 默认设置
        self.install_dir = r"C:\Users\AAA\Python_Package"
        self.default_packages = ["numpy", "opencv-python", "pyttsx3"]
        self.mirrors = [
            "https://pypi.tuna.tsinghua.edu.cn/simple/",
            "https://mirrors.aliyun.com/pypi/simple/",
            "https://pypi.mirrors.ustc.edu.cn/simple/",
            "https://mirrors.bfsu.edu.cn/pypi/web/simple/",
            "https://mirrors.cloud.tencent.com/pypi/simple/",
            "https://mirrors.nju.edu.cn/pypi/web/simple/",
            "https://mirrors.hit.edu.cn/pypi/web/simple/",
            "https://mirror.sjtu.edu.cn/pypi/web/simple/",
            "https://pypi.doubanio.com/simple/",
            "https://mirrors.zju.edu.cn/pypi/web/simple/",
            "https://mirrors.pku.edu.cn/pypi/simple/",
            "https://mirrors.yun-idc.com/pypi/simple/",
            "https://mirrors.neusoft.edu.cn/pypi/web/simple/",
            "https://mirrors.xjtu.edu.cn/pypi/web/simple/",
            "https://mirrors.huaweicloud.com/repository/pypi/simple/"
        ]
        
        # UI控件引用
        self.start_button = None
        self.cancel_button = None
        
        # 创建UI
        self.create_widgets()
    
    def create_widgets(self):
        # 主框架
        main_frame = ttk.Frame(self.root, padding="10")
        main_frame.pack(fill=tk.BOTH, expand=True)
        
        # 包列表输入
        ttk.Label(main_frame, text="要下载的包(逗号分隔):").grid(row=0, column=0, sticky=tk.W)
        self.pkg_entry = ttk.Entry(main_frame, width=50)
        self.pkg_entry.grid(row=0, column=1, sticky=tk.EW)
        self.pkg_entry.insert(0, ",".join(self.default_packages))
        
        # 安装目录
        ttk.Label(main_frame, text="安装目录:").grid(row=1, column=0, sticky=tk.W)
        self.dir_entry = ttk.Entry(main_frame, width=50)
        self.dir_entry.grid(row=1, column=1, sticky=tk.EW)
        self.dir_entry.insert(0, self.install_dir)
        
        # 按钮框架
        btn_frame = ttk.Frame(main_frame)
        btn_frame.grid(row=2, column=0, columnspan=2, pady=10)
        
        # 开始按钮
        self.start_button = ttk.Button(btn_frame, text="开始下载安装", command=self.start_process)
        self.start_button.pack(side=tk.LEFT, padx=5)
        
        # 取消按钮
        self.cancel_button = ttk.Button(btn_frame, text="取消", command=self.root.quit)
        self.cancel_button.pack(side=tk.LEFT, padx=5)
        
        # 进度条
        self.progress = ttk.Progressbar(main_frame, orient=tk.HORIZONTAL, length=500, mode='determinate')
        self.progress.grid(row=3, column=0, columnspan=2, pady=10)
        
        # 状态标签
        self.status_label = ttk.Label(main_frame, text="准备就绪")
        self.status_label.grid(row=4, column=0, columnspan=2)
        
        # 日志输出
        ttk.Label(main_frame, text="进度日志:").grid(row=5, column=0, sticky=tk.W)
        self.log_text = scrolledtext.ScrolledText(main_frame, width=80, height=20, state='normal')
        self.log_text.grid(row=6, column=0, columnspan=2, sticky=tk.NSEW)
        
        # 配置网格权重
        main_frame.columnconfigure(1, weight=1)
        main_frame.rowconfigure(6, weight=1)
    
    def log_message(self, message):
        self.log_text.config(state='normal')
        self.log_text.insert(tk.END, message + "\n")
        self.log_text.config(state='disabled')
        self.log_text.see(tk.END)
        self.root.update()
    
    def update_progress(self, value):
        self.progress['value'] = value
        self.root.update()
    
    def update_status(self, message):
        self.status_label.config(text=message)
        self.root.update()
    
    def set_ui_state(self, enabled):
        state = tk.NORMAL if enabled else tk.DISABLED
        self.start_button.config(state=state)
        self.cancel_button.config(state=state)
        self.root.update()
    
    def start_process(self):
        # 获取用户输入
        packages = [pkg.strip() for pkg in self.pkg_entry.get().split(",") if pkg.strip()]
        self.install_dir = self.dir_entry.get().strip()
        
        if not packages:
            messagebox.showerror("错误", "请输入至少一个包名")
            return
        
        os.makedirs(self.install_dir, exist_ok=True)
        
        # 禁用UI
        self.set_ui_state(False)
        self.log_text.config(state='normal')
        self.log_text.delete(1.0, tk.END)
        self.log_text.config(state='disabled')
        self.update_progress(0)
        
        # 开始下载安装流程
        threading.Thread(target=self.download_and_install, args=(packages,), daemon=True).start()
    
    def download_and_install(self, packages):
        overall_success = True  # 跟踪整体成功状态
        
        try:
            # 1. 测试镜像源速度
            self.update_status("正在测试镜像源速度...")
            self.log_message("="*50)
            self.log_message("开始测试镜像源速度")
            
            fastest_mirrors = self.find_fastest_mirrors()
            
            # 2. 下载包
            self.update_status("开始下载包...")
            self.log_message("="*50)
            self.log_message("开始下载包")
            
            downloaded_files = []
            total_packages = len(packages)
            download_success = True
            
            for i, package in enumerate(packages):
                self.update_progress((i/total_packages)*50)
                mirror = fastest_mirrors[i % len(fastest_mirrors)]
                success, files = self.download_package(mirror, package)
                if not success:
                    download_success = False
                    overall_success = False
                    self.log_message(f"⚠️ 包 {package} 下载失败,将继续尝试其他包")
                else:
                    downloaded_files.extend(files)
            
            if not download_success:
                self.log_message("警告: 部分包下载失败")
            
            # 3. 安装包
            self.update_status("开始安装包...")
            self.log_message("="*50)
            self.log_message("开始安装包")
            
            install_success = True
            
            for i, file in enumerate(downloaded_files):
                self.update_progress(50 + (i/len(downloaded_files))*40)
                if not self.install_package(file):
                    install_success = False
                    overall_success = False
                    self.log_message(f"⚠️ 文件 {file} 安装失败")
            
            if not install_success:
                self.log_message("警告: 部分包安装失败")
            
            # 4. 验证安装
            self.update_status("验证安装...")
            self.log_message("="*50)
            self.log_message("开始验证安装")
            
            verify_success = True
            
            for i, package in enumerate(packages):
                self.update_progress(90 + (i/len(packages))*10)
                if not self.test_installation(package):
                    verify_success = False
                    overall_success = False
                    self.log_message(f"⚠️ 包 {package} 验证失败")
            
            if not verify_success:
                self.log_message("警告: 部分包验证失败")
            
            self.update_progress(100)
            
            # 显示最终结果
            if overall_success:
                self.update_status("所有操作成功完成!")
                self.log_message("="*50)
                self.log_message("✅ 所有包下载安装完成并验证成功!")
                messagebox.showinfo("完成", "所有包下载安装完成并验证成功!")
            else:
                self.update_status("操作完成,但有错误发生")
                self.log_message("="*50)
                self.log_message("⚠️ 操作完成,但部分步骤失败,请检查日志")
                messagebox.showwarning("完成但有错误", 
                    "操作完成,但部分步骤失败,请检查日志了解详情")
            
        except Exception as e:
            self.log_message(f"❌ 发生严重错误: {str(e)}")
            self.update_status("操作因错误中止")
            messagebox.showerror("错误", f"处理过程中发生严重错误: {str(e)}")
            overall_success = False
        finally:
            # 重新启用UI
            self.set_ui_state(True)
    
    def find_fastest_mirrors(self):
        """找出最快的3个镜像源"""
        with ThreadPoolExecutor(max_workers=15) as executor:
            futures = [executor.submit(self.test_mirror_speed, mirror) for mirror in self.mirrors]
            results = []
            for future in as_completed(futures):
                speed, mirror = future.result()
                if speed != float('inf'):
                    results.append((speed, mirror))
                    self.log_message(f"测试镜像源 {mirror} 速度: {speed:.2f}秒")
        
        results.sort()
        fastest_mirrors = [mirror for speed, mirror in results[:3]]
        self.log_message(f"最快的3个镜像源: {', '.join(fastest_mirrors)}")
        return fastest_mirrors
    
    def test_mirror_speed(self, mirror):
        """测试镜像源速度"""
        try:
            start = time.time()
            response = requests.get(urljoin(mirror, "simple/"), timeout=5)
            if response.status_code == 200:
                return time.time() - start, mirror
        except:
            pass
        return float('inf'), mirror
    
    def download_package(self, mirror, package):
        """下载单个包"""
        try:
            self.log_message(f"正在从 {mirror} 下载 {package}...")
            cmd = [
                "python", "-m", "pip", "download", 
                package, 
                "-d", self.install_dir, 
                "-i", mirror, 
                "--trusted-host", mirror.split('//')[1].split('/')[0]
            ]
            
            process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
            stdout, stderr = process.communicate()
            
            if process.returncode == 0:
                # 获取下载的文件列表
                files = [f for f in os.listdir(self.install_dir) if f.startswith(package.replace("-", "_"))]
                self.log_message(f"✅ 成功下载 {package}: {', '.join(files)}")
                return True, files
            else:
                self.log_message(f"❌ 下载 {package} 失败: {stderr.strip()}")
                return False, []
        except Exception as e:
            self.log_message(f"❌ 下载 {package} 时发生错误: {str(e)}")
            return False, []
    
    def install_package(self, filename):
        """安装单个包"""
        try:
            filepath = os.path.join(self.install_dir, filename)
            self.log_message(f"正在安装 {filename}...")
            
            cmd = ["python", "-m", "pip", "install", filepath]
            process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
            stdout, stderr = process.communicate()
            
            if process.returncode == 0:
                self.log_message(f"✅ 成功安装 {filename}")
                return True
            else:
                self.log_message(f"❌ 安装 {filename} 失败: {stderr.strip()}")
                return False
        except Exception as e:
            self.log_message(f"❌ 安装 {filename} 时发生错误: {str(e)}")
            return False
    
    def test_installation(self, package):
        """测试包是否安装成功"""
        try:
            # 转换包名(如opencv-python -> opencv_python)
            import_name = package.replace("-", "_")
            self.log_message(f"正在验证 {package} 是否可以导入...")
            
            module = importlib.import_module(import_name)
            version = getattr(module, "__version__", "未知版本")
            
            self.log_message(f"✅ 验证成功: {package} (版本: {version})")
            return True
        except Exception as e:
            self.log_message(f"❌ 验证失败: 无法导入 {package} - {str(e)}")
            return False

if __name__ == "__main__":
    root = tk.Tk()
    app = PackageInstallerApp(root)
    root.mainloop()



 


网站公告

今日签到

点亮在社区的每一天
去签到