ollama调用千问2.5-vl视频图片UI界面小程序分享

发布于:2025-05-19 ⋅ 阅读:(19) ⋅ 点赞:(0)

1、问题描述:

ollama调用千问2.5-vl视频图片内容,通常用命令行工具不方便,于是做了一个python UI界面与大家分享。需要提前安装ollama,并下载千问qwen2.5vl:7b 模型,在ollama官网即可下载。 (8G-6G 显卡可用),用于识别图片信息。之前还下载了 qwen3:8b版,发现也可以此程序调用,比图片识别更快,用qwen3:8b文字直接提问,随便输入张图片即可。图片不起作用。 不知为何qwen2.5vl:7b 默认只支持cpu预处理图片,所以,图片推理的过程非常慢。qwen3:8b 默认支持gpu,速度快100倍,反应迅速,机会秒回复。这就是gpu 与cpu,推理的天壤之别吧,哈哈。南无阿弥陀佛。

如下图:
在这里插入图片描述
使用方法:很简单,

2、图片推理:

模型管理列表栏,选择相应的qwen2.5vl:7b模型,点击选择模型按钮,之后,直接在最下面,点击选择图片按钮,支持三张,太多图片推理太慢了。单张最快,cpu推理就是这样,之后,在提示词栏,输入要对图片做的推理要求,默认是描述图片内容,也可以问图片里的特殊的人事物,等等。也可以指定要求推理输出的文字字数,1000字以内没啥问题。
在这里插入图片描述

3、文字推理:

同理,在模型管理列表栏,选择相应的qwen3:8b模型,点击选择模型按钮,之后,直接在最下面,点击选择图片按钮,随便选张图,如果之前已经选了,就忽略此步。之后,在提示词栏,输入要提问的提示词即可,几千字以内似乎没啥问题。南无阿弥陀佛。

4、程序免费下载地址:

程序下载地址:https://download.csdn.net/download/tian0000hai/90856287
南无阿弥陀佛。

5、源码分享:ollama_qwen25vl_gui.py

import os
import sys
import base64
import json
import requests

import concurrent.futures

import numpy as np
import cv2  # 需要安装OpenCV库

# 确保打包后能找到证书
if getattr(sys, 'frozen', False):
    # 如果是打包后的可执行文件
    requests.utils.DEFAULT_CA_BUNDLE_PATH = os.path.join(sys._MEIPASS, 'certifi', 'cacert.pem')





import threading
import queue
import tkinter as tk
from tkinter import filedialog, ttk, messagebox, scrolledtext
from PIL import Image, ImageTk, PngImagePlugin
import subprocess
import platform
import time

class OllamaQwenGUI:
    def __init__(self, root):
        self.root = root
        self.root.title("Ollama Qwen2.5-VL 图片识别工具 - 南无阿弥陀佛")
        self.root.geometry("1000x750")
        self.root.minsize(900, 900)
        
        # 设置中文字体支持
        self.font_family = "SimHei" if sys.platform.startswith("win") else "WenQuanYi Micro Hei"
        
        # 创建Ollama API配置
        self.api_url = "http://localhost:11434/api"
        self.model_name = "qwen2.5vl:7b"
        
        # API服务进程
        self.api_process = None
        self.console_output_queue = queue.Queue()
        
        # 识别进度相关
        self.is_recognizing = False
        self.progress_queue = queue.Queue()
        
        # 创建GUI组件
        self.create_widgets()
        
        # 创建线程间通信队列
        self.queue = queue.Queue()
        
        # 初始化模型列表和API状态
        self.check_api_status()
        self.refresh_model_list()
        
        # 启动控制台输出监控线程
        self.monitor_console_output = True
        threading.Thread(target=self.read_console_output, daemon=True).start()
        
        # 启动进度更新线程
        threading.Thread(target=self.update_progress, daemon=True).start()
    
    def create_widgets(self):
        # 创建主框架
        main_frame = ttk.Frame(self.root, padding="10")
        main_frame.pack(fill=tk.BOTH, expand=True)
        
        # 创建顶部API配置区域
        config_frame = ttk.LabelFrame(main_frame, text="Ollama API 配置", padding="10")
        config_frame.pack(fill=tk.X, pady=5)
        
        ttk.Label(config_frame, text="API 地址:", font=(self.font_family, 10)).grid(row=0, column=0, sticky=tk.W, padx=5, pady=5)
        self.api_entry = ttk.Entry(config_frame, width=40, font=(self.font_family, 10))
        self.api_entry.insert(0, self.api_url)
        self.api_entry.grid(row=0, column=1, sticky=tk.W, padx=5, pady=5)
        
        ttk.Label(config_frame, text="模型名称:", font=(self.font_family, 10)).grid(row=0, column=2, sticky=tk.W, padx=5, pady=5)
        self.model_entry = ttk.Entry(config_frame, width=20, font=(self.font_family, 10))
        self.model_entry.insert(0, self.model_name)
        self.model_entry.grid(row=0, column=3, sticky=tk.W, padx=5, pady=5)
        
        # 创建API服务控制区域
        api_frame = ttk.LabelFrame(main_frame, text="API 服务控制", padding="10")
        api_frame.pack(fill=tk.X, pady=5)
        
        self.api_status_var = tk.StringVar(value="API 状态: 未知")
        self.api_status_label = ttk.Label(api_frame, textvariable=self.api_status_var, font=(self.font_family, 10))
        self.api_status_label.grid(row=0, column=0, sticky=tk.W, padx=5, pady=5)
        
        self.start_api_button = ttk.Button(api_frame, text="启动 API", command=self.start_api, width=12)
        self.start_api_button.grid(row=0, column=1, padx=5, pady=5)
        
        self.stop_api_button = ttk.Button(api_frame, text="停止 API", command=self.stop_api, width=12)
        self.stop_api_button.grid(row=0, column=2, padx=5, pady=5)
        
        self.check_api_button = ttk.Button(api_frame, text="检查状态", command=self.check_api_status, width=12)
        self.check_api_button.grid(row=0, column=3, padx=5, pady=5)
        
        # 创建模型管理区域
        model_frame = ttk.LabelFrame(main_frame, text="模型管理 -(鼠标中键滚轮可查看更多内容)", padding="10")
        model_frame.pack(fill=tk.X, pady=5)
        
        self.model_listbox = tk.Listbox(model_frame, width=40, height=5, font=(self.font_family, 10))
        self.model_listbox.grid(row=0, column=0, rowspan=2, sticky=tk.W+tk.E, padx=5, pady=5)
             
        # 添加选择按钮
        self.select_button = ttk.Button(model_frame, text="选择模型", command=self.select_model, width=12)
        self.select_button.grid(row=1, column=1, padx=5, pady=5)
        
        self.download_button = ttk.Button(model_frame, text="下载模型", command=self.download_model, width=12)
        self.download_button.grid(row=0, column=1, padx=5, pady=5)
        
        self.start_button = ttk.Button(model_frame, text="启动模型", command=self.start_model, width=12)
        self.start_button.grid(row=0, column=2, padx=5, pady=5)
        
        self.stop_button = ttk.Button(model_frame, text="停止模型", command=self.stop_model, width=12)
        self.stop_button.grid(row=0, column=3, padx=5, pady=5)
        
        self.refresh_button = ttk.Button(model_frame, text="刷新列表", command=self.refresh_model_list, width=12)
        self.refresh_button.grid(row=1, column=2, padx=5, pady=5)
        
        # 添加删除模型按钮
        self.delete_button = ttk.Button(model_frame, text="删除模型", command=self.delete_model, width=12)
        self.delete_button.grid(row=1, column=3, padx=5, pady=5)

        self.model_status_var = tk.StringVar(value="模型状态: 未知")
        self.model_status_label = ttk.Label(model_frame, textvariable=self.model_status_var, font=(self.font_family, 10))
        self.model_status_label.grid(row=1, column=4, sticky=tk.W, padx=5, pady=5)
        
        # 创建中间图片和结果区域
        middle_frame = ttk.Frame(main_frame, padding="5")
        middle_frame.pack(fill=tk.BOTH, expand=True)
        
        # 左侧图片区域
        image_frame = ttk.LabelFrame(middle_frame, text="图片预览", padding="10")
        image_frame.pack(side=tk.LEFT, fill=tk.BOTH, expand=True, padx=5, pady=5)
        
        self.image_container = ttk.Frame(image_frame)
        self.image_container.pack(fill=tk.BOTH, expand=True)
        
        self.image_label = ttk.Label(self.image_container, text="请选择一张或多张图片(建议:不超超过三张)", font=(self.font_family, 12))
        self.image_label.pack(expand=True)
        
        # 右侧结果区域
        result_frame = ttk.LabelFrame(middle_frame, text="识别结果", padding="10")
        result_frame.pack(side=tk.RIGHT, fill=tk.BOTH, expand=True, padx=5, pady=5)
        
        # 创建结果和控制台输出的标签页
        self.result_notebook = ttk.Notebook(result_frame)
        self.result_notebook.pack(fill=tk.BOTH, expand=True)
        
        # 识别结果标签页
        self.result_tab = ttk.Frame(self.result_notebook)
        self.result_notebook.add(self.result_tab, text="识别结果")
        
        # 创建进度指示器框架
        self.progress_frame = ttk.Frame(self.result_tab)
        self.progress_frame.pack(fill=tk.X, pady=5)
        
        self.progress_var = tk.StringVar(value="准备就绪")
        self.progress_label = ttk.Label(self.progress_frame, textvariable=self.progress_var, font=(self.font_family, 10))
        self.progress_label.pack(side=tk.LEFT, padx=5)
        
        self.progress_bar = ttk.Progressbar(self.progress_frame, orient=tk.HORIZONTAL, length=100, mode='indeterminate')
        self.progress_bar.pack(side=tk.LEFT, padx=5)
        
        self.result_text = scrolledtext.ScrolledText(self.result_tab, wrap=tk.WORD, font=(self.font_family, 11), height=18)
        self.result_text.pack(fill=tk.BOTH, expand=True)
        
        # 控制台输出标签页
        self.console_tab = ttk.Frame(self.result_notebook)
        self.result_notebook.add(self.console_tab, text="控制台输出")
        
        self.console_text = scrolledtext.ScrolledText(self.console_tab, wrap=tk.WORD, font=(self.font_family, 10), height=20)
        self.console_text.pack(fill=tk.BOTH, expand=True)
        
        # 创建底部控制区域
        control_frame = ttk.Frame(main_frame, padding="10")
        control_frame.pack(fill=tk.X, pady=5)
        
        ttk.Label(control_frame, text="提示词:", font=(self.font_family, 10)).grid(row=0, column=0, sticky=tk.W, padx=5, pady=5)
        self.prompt_var = tk.StringVar(value="描述这些图片的内容")
        self.prompt_entry = ttk.Entry(control_frame, textvariable=self.prompt_var, width=60, font=(self.font_family, 10))
        self.prompt_entry.grid(row=0, column=1, sticky=tk.W, padx=5, pady=5)
        
        self.browse_button = ttk.Button(control_frame, text="选择图片", command=self.browse_image, width=12)
        self.browse_button.grid(row=0, column=2, padx=5, pady=5)
        
        self.recognize_button = ttk.Button(control_frame, text="开始识别", command=self.start_recognition, width=12)
        self.recognize_button.grid(row=0, column=3, padx=5, pady=5)
        
        # 创建状态栏
        self.status_var = tk.StringVar(value="就绪")
        self.status_bar = ttk.Label(self.root, textvariable=self.status_var, relief=tk.SUNKEN, anchor=tk.W)
        self.status_bar.pack(side=tk.BOTTOM, fill=tk.X)
    
    def browse_image(self):
        file_paths = filedialog.askopenfilenames(
            title="选择图片",
            filetypes=[("选择图片", "*.jpg;*.jpeg;*.png;*.bmp;*.webp;*.tiff;*.tif;*.gif")]
        )
        
        if file_paths:
            self.image_paths = file_paths
            self.display_images(file_paths)
            self.status_var.set(f"已选择 {len(file_paths)} 张图片")
    
    def display_images(self, file_paths):
        # 清空之前的图片显示
        for widget in self.image_container.winfo_children():
            widget.destroy()
        
        try:
            max_width = 200
            max_height = 200
            for file_path in file_paths:
                # 增加PNG文本块的最大大小限制,防止iCCP警告
                PngImagePlugin.MAX_TEXT_CHUNK = 100 * (1024**2)  # 100MB
                
                image = Image.open(file_path)
                
                # 检查图片格式,如果是PNG则尝试处理iCCP警告
                if image.format == 'PNG':
                    # 尝试移除iCCP chunk
                    try:
                        image = image.convert('RGB')
                    except Exception as e:
                        print(f"处理PNG图片时出错: {e}")
                
                # 计算调整后的尺寸
                width, height = image.size
                if width > max_width or height > max_height:
                    ratio = min(max_width/width, max_height/height)
                    width = int(width * ratio)
                    height = int(height * ratio)
                    image = image.resize((width, height), Image.LANCZOS)
                
                # 显示图片
                photo = ImageTk.PhotoImage(image)
                label = ttk.Label(self.image_container, image=photo)
                label.image = photo
                label.pack(side=tk.LEFT, padx=5, pady=5)
        
        except Exception as e:
            self.image_label.config(text=f"无法显示图片: {str(e)}")
            print(f"加载图片时出错: {e}")
    
    def start_recognition(self):
        if not hasattr(self, 'image_paths'):
            messagebox.showwarning("警告", "请先选择一张或多张图片")
            return
        
        # 检查API是否运行
        if not self.check_api_status(show_message=False):
            messagebox.showwarning("警告", "API服务未启动,请先启动API服务")
            return
        
        # 清空结果区域
        self.result_text.delete(1.0, tk.END)
        
        # 更新状态
        self.is_recognizing = True
        self.progress_var.set("准备识别...")
        self.progress_bar.start()
        
        # 更新状态栏并禁用按钮
        self.status_var.set("正在识别...")
        self.browse_button.config(state=tk.DISABLED)
        self.recognize_button.config(state=tk.DISABLED)
        
        # 在新线程中执行识别任务
        threading.Thread(target=self.perform_recognition, daemon=True).start()
    
    
            
    def encode_image_to_base64(self, image_path):
        """使用OpenCV和NumPy优化Base64编码"""
        try:
            img = cv2.imread(image_path)
            if img is None:
                # 回退到标准方法
                with open(image_path, "rb") as f:
                    return base64.b64encode(f.read()).decode('utf-8')
            
            # 检查图像编码是否成功
            success, buffer = cv2.imencode('.jpg', img, [cv2.IMWRITE_JPEG_QUALITY, 80])
            if success:
                return base64.b64encode(buffer).decode('utf-8')
            else:
                raise Exception("图像编码失败")
                
        except Exception as e:
            print(f"使用OpenCV编码失败: {e}")
            # 出错时回退到标准方法
            with open(image_path, "rb") as f:
                return base64.b64encode(f.read()).decode('utf-8')
    
    
    
    def perform_recognition(self):
        try:
            
             # 优化1: 使用线程池并行处理多个图像的Base64编码
            with concurrent.futures.ThreadPoolExecutor() as executor:
                future_to_path = {executor.submit(self.encode_image_to_base64, path): path for path in self.image_paths}
                base64_images = []
                for future in concurrent.futures.as_completed(future_to_path):
                    try:
                        base64_images.append(future.result())
                    except Exception as e:
                        print(f"编码图像时出错: {e}")
            
            
            
            # 准备API请求数据
            api_url = f"{self.api_entry.get().strip()}/generate"
            model_name = self.model_entry.get().strip()
            prompt = self.prompt_var.get().strip()
            
            # 发送进度更新
            self.progress_queue.put("正在初始化模型...")
            
            payload = {
                "model": model_name,
                "prompt": prompt,
                "stream": True,  # 启用流式输出
                "images": base64_images
            }
            
            # 发送请求到Ollama API
            self.progress_queue.put("正在发送请求...")
            response = requests.post(api_url, json=payload, stream=True)
            
            if response.status_code == 200:
                self.progress_queue.put("开始接收识别结果...")
                self.result_text.insert(tk.END, "识别结果:\n\n")
                
                # 处理流式响应
                full_response = ""
                for line in response.iter_lines():
                    if line:
                        # 解析JSON行
                        try:
                            chunk = json.loads(line)
                            if 'response' in chunk:
                                text_chunk = chunk['response']
                                full_response += text_chunk
                                self.result_text.insert(tk.END, text_chunk)
                                self.result_text.see(tk.END)  # 滚动到底部
                        except json.JSONDecodeError as e:
                            self.progress_queue.put(f"解析响应错误: {str(e)}")
                
                self.progress_queue.put("识别完成")
                self.queue.put(("success", "识别完成"))
            else:
                self.queue.put(("error", f"API请求失败: HTTP {response.status_code} - {response.text}"))
        
        
        
        
        except Exception as e:
            self.queue.put(("error", f"发生错误: {str(e)}"))
        
        finally:
            # 无论成功或失败,都要恢复UI状态
            self.is_recognizing = False
            self.progress_bar.stop()
            self.queue.put(("update_ui",))
        
        # 安排下一次队列检查
        self.root.after(100, self.check_queue)
    
    def download_model(self):
        model_name = self.model_entry.get().strip()
        if not model_name:
            messagebox.showwarning("警告", "请输入要下载的模型名称")
            return
        
        # 检查API是否运行
        if not self.check_api_status(show_message=False):
            messagebox.showwarning("警告", "API服务未启动,请先启动API服务")
            return
        
        # 更新状态栏并禁用按钮
        self.status_var.set(f"正在下载模型: {model_name}")
        self.download_button.config(state=tk.DISABLED)
        
        # 在新线程中执行下载任务
        threading.Thread(target=self.perform_model_download, args=(model_name,), daemon=True).start()
    
    def perform_model_download(self, model_name):
        try:
            api_url = f"{self.api_entry.get().strip()}/pull"
            
            payload = {
                "name": model_name,
                "stream": True  # 启用流式输出获取进度
            }
            
            # 发送请求到Ollama API
            response = requests.post(api_url, json=payload, stream=True)
            
            if response.status_code == 200:
                self.queue.put(("success", f"开始下载模型: {model_name}"))
                
                # 处理流式响应获取进度
                for line in response.iter_lines():
                    if line:
                        try:
                            chunk = json.loads(line)
                            if 'status' in chunk:
                                status = chunk['status']
                                self.queue.put(("success", f"下载状态: {status}"))
                            if 'completed' in chunk and 'total' in chunk:
                                progress = chunk['completed'] / chunk['total'] * 100
                                self.queue.put(("success", f"下载进度: {progress:.1f}%"))
                        except json.JSONDecodeError:
                            pass
                
                self.queue.put(("success", f"模型 {model_name} 下载成功"))
                self.refresh_model_list()
            else:
                self.queue.put(("error", f"模型下载失败: HTTP {response.status_code} - {response.text}"))
        
        except Exception as e:
            self.queue.put(("error", f"发生错误: {str(e)}"))
        
        finally:
            # 恢复UI状态
            self.queue.put(("update_download_ui",))
        
        # 安排下一次队列检查
        self.root.after(100, self.check_queue)
    
    def start_model(self):
        model_name = self.model_entry.get().strip()
        if not model_name:
            messagebox.showwarning("警告", "请选择要启动的模型")
            return
        
        # 检查API是否运行
        if not self.check_api_status(show_message=False):
            messagebox.showwarning("警告", "API服务未启动,请先启动API服务")
            return
        
        # 更新状态栏并禁用按钮
        self.status_var.set(f"正在启动模型: {model_name}")
        self.start_button.config(state=tk.DISABLED)
        
        # 在新线程中执行启动任务
        threading.Thread(target=self.perform_model_start, args=(model_name,), daemon=True).start()
    
    def perform_model_start(self, model_name):
        try:
            # 简单模拟启动模型 - Ollama通常在调用时自动启动模型
            # 实际可能需要调用特定API
            self.queue.put(("success", f"正在加载模型: {model_name}"))
            
            # 发送一个简单请求来触发模型加载
            api_url = f"{self.api_entry.get().strip()}/generate"
            payload = {
                "model": model_name,
                "prompt": "加载模型",
                "stream": False
            }
            
            response = requests.post(api_url, json=payload)
            
            if response.status_code == 200:
                self.queue.put(("success", f"模型 {model_name} 已启动"))
            else:
                self.queue.put(("error", f"启动模型失败: HTTP {response.status_code}"))
            
            self.update_model_status(model_name)
        
        except Exception as e:
            self.queue.put(("error", f"启动模型失败: {str(e)}"))
        
        finally:
            # 恢复UI状态
            self.queue.put(("update_start_ui",))
        
        # 安排下一次队列检查
        self.root.after(100, self.check_queue)
    
    def stop_model(self):
        model_name = self.model_entry.get().strip()
        if not model_name:
            messagebox.showwarning("警告", "请选择要停止的模型")
            return
        
        # 检查API是否运行
        if not self.check_api_status(show_message=False):
            messagebox.showwarning("警告", "API服务未启动,模型可能已停止")
            return
        
        # 更新状态栏并禁用按钮
        self.status_var.set(f"正在停止模型: {model_name} 并清空显存...")
        self.stop_button.config(state=tk.DISABLED)
        
        # 在新线程中执行停止任务
        threading.Thread(target=self.perform_model_stop, args=(model_name,), daemon=True).start()
    
    def perform_model_stop(self, model_name):
        try:
            # 执行停止模型程序
            stop_model_proc = "ollama stop " + model_name
            return_code = os.system(stop_model_proc)
            if return_code == 0:
                self.queue.put(("success", f"模型 {model_name} 已停止"))
                self.refresh_model_list()
            else:
                self.queue.put(("error", f"模型停止失败: HTTP {response.status_code} - {response.text}"))
        
        except Exception as e:
            self.queue.put(("error", f"停止模型失败: {str(e)}"))
        
        finally:
            # 恢复UI状态
            self.queue.put(("update_stop_ui",))
        
        # 安排下一次队列检查
        self.root.after(100, self.check_queue)
    
    def refresh_model_list(self):
        # 清空列表
        self.model_listbox.delete(0, tk.END)
        
        # 检查API是否运行
        if not self.check_api_status(show_message=False):
            self.queue.put(("error", "无法获取模型列表:API服务未启动"))
            return
        
        # 在新线程中获取模型列表
        threading.Thread(target=self.fetch_model_list, daemon=True).start()
    
    def fetch_model_list(self):
        try:
            api_url = f"{self.api_entry.get().strip()}/tags"
            
            # 发送请求到Ollama API
            response = requests.get(api_url)
            
            if response.status_code == 200:
                models = response.json().get("models", [])
                model_names = [model.get("name", "") for model in models]
                
                # 将模型名称添加到队列中更新UI
                self.queue.put(("update_model_list", model_names))
                
                # 更新当前模型状态
                current_model = self.model_entry.get().strip()
                self.update_model_status(current_model)
            else:
                self.queue.put(("error", f"获取模型列表失败: HTTP {response.status_code} - {response.text}"))
        
        except Exception as e:
            self.queue.put(("error", f"发生错误: {str(e)}"))
        
        # 安排下一次队列检查
        self.root.after(100, self.check_queue)
    
    def update_model_status(self, model_name):
        # 简单实现,实际应通过API查询模型状态
        # 这里假设如果模型在列表中,则认为是可用状态
        model_names = [self.model_listbox.get(i) for i in range(self.model_listbox.size())]
        
        status = "已安装" if model_name in model_names else "未安装"
        self.model_status_var.set(f"模型状态: {status}")
    
    def select_model(self):
        """从列表中选择模型并填充到模型名称输入框"""
        selected_indices = self.model_listbox.curselection()
        if not selected_indices:
            messagebox.showwarning("警告", "请先从列表中选择一个模型")
            return
        
        model_name = self.model_listbox.get(selected_indices[0])
        self.model_entry.delete(0, tk.END)
        self.model_entry.insert(0, model_name)
        self.status_var.set(f"已选择模型: {model_name}")
        self.update_model_status(model_name)
    
    def start_api(self):
        if self.api_process is not None and self.api_process.poll() is None:
            messagebox.showwarning("警告", "API服务已在运行中")
            return
        
        # 清空控制台输出
        self.console_text.delete(1.0, tk.END)
        
        # 更新状态栏并禁用按钮
        self.status_var.set("正在启动API服务...")
        self.start_api_button.config(state=tk.DISABLED)
        
        # 在新线程中启动API
        threading.Thread(target=self.perform_api_start, daemon=True).start()
    
    def perform_api_start(self):
        try:
            #os.system("set OLLAMA_CUDA=1  # Windows cmd")
            # 设置CUDA环境变量
            os.environ["OLLAMA_CUDA"] = "1"  # 明确启用CUDA
            # 获取API地址和端口
            api_full_url = self.api_entry.get().strip()
            base_url = api_full_url.rsplit('/', 1)[0]
            host, port = base_url.replace('http://', '').split(':')
            
            # 根据操作系统执行不同的命令
            if platform.system() == "Windows":
                self.api_process = subprocess.Popen(
                    f"ollama serve -h {host} -p {port}",
                    shell=True,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    text=True,
                    bufsize=1
                )
            else:
                self.api_process = subprocess.Popen(
                    f"ollama serve -h {host} -p {port}",
                    shell=True,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    preexec_fn=os.setsid,
                    text=True,
                    bufsize=1
                )
            
            # 等待服务启动(尝试连接)
            max_attempts = 10
            for attempt in range(max_attempts):
                try:
                    response = requests.get(f"{api_full_url}/tags", timeout=2)
                    if response.status_code == 200:
                        self.queue.put(("success", "API服务已启动"))
                        self.check_api_status()
                        return
                except:
                    time.sleep(1)
            
            self.queue.put(("error", "API服务启动超时"))
            
        except Exception as e:
            self.queue.put(("error", f"启动API服务失败: {str(e)}"))
        
        finally:
            # 恢复UI状态
            self.queue.put(("update_api_start_ui",))
        
        # 安排下一次队列检查
        self.root.after(100, self.check_queue)
    
    def stop_api(self):
        if self.api_process is None or self.api_process.poll() is not None:
            messagebox.showwarning("警告", "API服务未在运行中")
            self.api_process = None
            self.api_status_var.set("API 状态: 未运行")
            return
        
        # 更新状态栏并禁用按钮
        self.status_var.set("正在停止API服务...")
        self.stop_api_button.config(state=tk.DISABLED)
        
        # 在新线程中停止API
        threading.Thread(target=self.perform_api_stop, daemon=True).start()
    
    def perform_api_stop(self):
        try:
            if self.api_process is not None:
                # 终止进程
                if platform.system() == "Windows":
                    self.api_process.terminate()
                else:
                    import os
                    import signal
                    os.killpg(os.getpgid(self.api_process.pid), signal.SIGTERM)
                
                # 等待进程结束
                try:
                    self.api_process.wait(timeout=5)
                except subprocess.TimeoutExpired:
                    self.api_process.kill()
                    self.api_process.wait()
                
                self.api_process = None
            
            self.queue.put(("success", "API服务已停止"))
            self.api_status_var.set("API 状态: 未运行")
            
        except Exception as e:
            self.queue.put(("error", f"停止API服务失败: {str(e)}"))
        
        finally:
            # 恢复UI状态
            self.queue.put(("update_api_stop_ui",))
        
        # 安排下一次队列检查
        self.root.after(100, self.check_queue)
    
    def check_api_status(self, show_message=True):
        try:
            api_url = f"{self.api_entry.get().strip()}/tags"
            response = requests.get(api_url, timeout=2)
            
            if response.status_code == 200:
                self.api_status_var.set("API 状态: 运行中")
                if show_message:
                    self.queue.put(("success", "API服务正在运行"))
                return True
            else:
                self.api_status_var.set("API 状态: 未运行")
                if show_message:
                    self.queue.put(("error", f"API服务未响应: HTTP {response.status_code}"))
                return False
                
        except Exception as e:
            self.api_status_var.set("API 状态: 未运行")
            if show_message:
                self.queue.put(("error", f"无法连接到API服务: {str(e)}"))
            return False
    
    def read_console_output(self):
        """读取API进程的控制台输出并添加到队列中"""
        while self.monitor_console_output:
            if self.api_process is not None and self.api_process.poll() is None:
                # 读取标准输出
                if self.api_process.stdout:
                    for line in iter(self.api_process.stdout.readline, ''):
                        if line:
                            self.console_output_queue.put(line.strip())
                
                # 读取错误输出
                if self.api_process.stderr:
                    for line in iter(self.api_process.stderr.readline, ''):
                        if line:
                            self.console_output_queue.put(f"[错误] {line.strip()}")
            
            # 检查队列并更新UI
            self.root.after(100, self.update_console_text)
            time.sleep(0.1)
    
    def update_console_text(self):
        """从队列中获取控制台输出并更新UI"""
        while not self.console_output_queue.empty():
            try:
                line = self.console_output_queue.get()
                self.console_text.insert(tk.END, line + "\n")
                self.console_text.see(tk.END)  # 滚动到底部
            except queue.Empty:
                pass
    
    def update_progress(self):
        """更新识别进度"""
        while True:
            if not self.progress_queue.empty():
                try:
                    progress_msg = self.progress_queue.get()
                    self.progress_var.set(progress_msg)
                except queue.Empty:
                    pass
            time.sleep(0.1)
    
    def check_queue(self):
        while not self.queue.empty():
            try:
                msg = self.queue.get()
                if msg[0] == "success":
                    self.result_text.insert(tk.END, msg[1] + "\n")
                    self.status_var.set(msg[1])
                elif msg[0] == "error":
                    self.result_text.insert(tk.END, f"错误: {msg[1]}\n")
                    self.status_var.set(f"错误: {msg[1]}")
                elif msg[0] == "update_ui":
                    self.browse_button.config(state=tk.NORMAL)
                    self.recognize_button.config(state=tk.NORMAL)
                elif msg[0] == "update_download_ui":
                    self.download_button.config(state=tk.NORMAL)
                elif msg[0] == "update_start_ui":
                    self.start_button.config(state=tk.NORMAL)
                elif msg[0] == "update_stop_ui":
                    self.stop_button.config(state=tk.NORMAL)
                elif msg[0] == "update_model_list":
                    for model in msg[1]:
                        self.model_listbox.insert(tk.END, model)
                elif msg[0] == "update_api_start_ui":
                    self.start_api_button.config(state=tk.NORMAL)
                elif msg[0] == "update_api_stop_ui":
                    self.stop_api_button.config(state=tk.NORMAL)
                elif msg[0] == "update_delete_ui":
                    self.delete_button.config(state=tk.NORMAL)
            except queue.Empty:
                pass
        # 继续检查队列
        self.root.after(100, self.check_queue)

    def delete_model(self):
        model_name = self.model_entry.get().strip()
        if not model_name:
            messagebox.showwarning("警告", "请输入要删除的模型名称")
            return

        # 检查API是否运行
        if not self.check_api_status(show_message=False):
            messagebox.showwarning("警告", "API服务未启动,请先启动API服务")
            return

        # 更新状态栏并禁用按钮
        self.status_var.set(f"正在删除模型: {model_name}")
        self.delete_button.config(state=tk.DISABLED)

        # 在新线程中执行删除任务
        threading.Thread(target=self.perform_model_delete, args=(model_name,), daemon=True).start()

    def perform_model_delete(self, model_name):
        try:
            # 执行删除模型程序
            delete_model_proc = "ollama rm " + model_name
            return_code = os.system(delete_model_proc)
            if return_code == 0:
                self.queue.put(("success", f"模型 {model_name} 已删除"))
                self.refresh_model_list()
            else:
                self.queue.put(("error", f"模型删除失败: HTTP {response.status_code} - {response.text}"))
                
        except Exception as e:
            self.queue.put(("error", f"发生错误: {str(e)}"))

        finally:
            # 恢复UI状态
            self.queue.put(("update_delete_ui",))

        # 安排下一次队列检查
        self.root.after(100, self.check_queue)

if __name__ == "__main__":
    root = tk.Tk()
    app = OllamaQwenGUI(root)
    
    # 窗口关闭时确保API进程被终止
    def on_closing():
        app.monitor_console_output = False
        if app.api_process is not None and app.api_process.poll() is None:
            app.perform_api_stop()
        root.destroy()
    
    root.protocol("WM_DELETE_WINDOW", on_closing)
    root.mainloop()    

南无阿弥陀佛,哈哈。


网站公告

今日签到

点亮在社区的每一天
去签到