import sys
import json
import pyodbc
import re
import os
import chardet
from PyQt5.QtWidgets import (QApplication, QMainWindow, QPushButton, QFileDialog, QLabel, QHBoxLayout, QVBoxLayout,
QWidget, QMessageBox, QCheckBox, QFormLayout, QDialog, QDialogButtonBox, QScrollArea,
QFrame, QGridLayout, QTextEdit)
from PyQt5.QtCore import Qt
from PyQt5.QtGui import QGuiApplication
class SQLUploader(QMainWindow):
def __init__(self):
super().__init__()
self.setWindowTitle('SQL Server 存储过程上传工具')
self.setGeometry(100, 100, 600, 400)
self.center()
self.init_ui()
self.load_config()
self.source_connection = None
self.target_connections = []
self.uploaded_file_paths = []
self.procedures_list = []
def init_ui(self):
"""初始化用户界面"""
self.connect_source_button = QPushButton('连接源数据库')
self.connect_source_button.clicked.connect(self.connect_source_database)
self.connect_source_button.setFixedSize(120, 40)
self.select_target_button = QPushButton('选择目标数据库')
self.select_target_button.clicked.connect(self.select_and_connect_target_databases)
self.select_target_button.setFixedSize(120, 40)
self.upload_button = QPushButton('上传存储过程')
self.upload_button.clicked.connect(self.upload_procedures)
self.upload_button.setEnabled(False)
self.upload_button.setFixedSize(120, 40)
self.update_button = QPushButton('更新存储过程')
self.update_button.clicked.connect(self.update_procedures)
self.update_button.setEnabled(False)
self.update_button.setFixedSize(120, 40)
self.get_procedures_button = QPushButton('获取源存储过程')
self.get_procedures_button.clicked.connect(self.get_source_procedures)
self.get_procedures_button.setEnabled(False)
self.get_procedures_button.setFixedSize(120, 40)
self.batch_update_button = QPushButton('批量更新存储过程')
self.batch_update_button.clicked.connect(self.batch_update_procedures)
self.batch_update_button.setEnabled(False)
self.batch_update_button.setFixedSize(120, 40)
self.source_db_status_label = QLabel('源数据库连接状态: <br>尚未连接')
self.target_db_status_label = QLabel('目标数据库连接状态: <br>尚未连接')
self.file_status_label = QLabel('文件上传状态: <br>尚未上传')
self.update_status_label = QLabel('更新状态: 尚未更新')
self.status_scroll_area = QScrollArea()
self.status_scroll_area.setWidgetResizable(True)
self.status_scroll_area.setMaximumHeight(150)
self.status_scroll_area.setFrameShape(QFrame.NoFrame)
status_top_layout = QHBoxLayout()
status_top_layout.addWidget(self.target_db_status_label)
status_top_layout.addWidget(self.source_db_status_label)
status_top_layout.addWidget(self.file_status_label)
status_layout = QVBoxLayout()
status_layout.addLayout(status_top_layout)
status_layout.addWidget(self.update_status_label)
status_content = QWidget()
status_content.setLayout(status_layout)
self.status_scroll_area.setWidget(status_content)
button_layout = QGridLayout()
button_layout.addWidget(self.select_target_button, 0, 0)
button_layout.addWidget(self.upload_button, 0, 1)
button_layout.addWidget(self.update_button, 0, 2)
button_layout.addWidget(self.connect_source_button, 1, 0)
button_layout.addWidget(self.get_procedures_button, 1, 1)
button_layout.addWidget(self.batch_update_button, 1, 2)
main_layout = QVBoxLayout()
main_layout.addLayout(button_layout)
main_layout.addWidget(self.status_scroll_area)
container = QWidget()
container.setLayout(main_layout)
self.setCentralWidget(container)
def update_status_labels(self, proc_names):
"""更新状态标签,纵向排列存储过程名称"""
self.file_status_label.setText('文件上传状态: 尚未上传')
if proc_names:
proc_names_text = "<br>".join(proc_names)
self.file_status_label.setText(f'文件上传状态: 已上传存储过程:<br>{proc_names_text}')
def center(self):
"""将窗口居中显示"""
screen = QGuiApplication.primaryScreen()
rect = screen.availableGeometry()
size = self.geometry()
self.move((rect.width() - size.width()) // 2, (rect.height() - size.height()) // 2)
def load_config(self):
"""从 config.json 文件中加载数据库配置"""
try:
if getattr(sys, 'frozen', False):
config_path = os.path.join(sys._MEIPASS, 'config.json')
else:
config_path = 'config.json'
with open(config_path, 'r', encoding='utf-8') as file:
self.config = json.load(file)
self.source_db_config = self.config.get('source_database', {})
self.target_db_configs = self.config.get('target_databases', [])
except Exception as e:
QMessageBox.critical(self, '错误', f'加载配置失败: {e}')
sys.exit(1)
def build_connection_string(self, db_config):
"""根据数据库配置构建连接字符串"""
return f"DRIVER={{SQL Server}};SERVER={db_config['host']};DATABASE={db_config['database']};UID={db_config['user']};PWD={db_config['password']}"
def connect_source_database(self):
"""连接源数据库"""
if self.source_connection:
self.disconnect_source_database()
self.source_db_status_label.setText('源数据库连接状态: 连接中...')
db_config = self.source_db_config
conn_str = self.build_connection_string(db_config)
try:
with pyodbc.connect(conn_str) as conn:
self.source_connection = conn
self.source_db_status_label.setText(
f'源数据库连接状态: <br>{self.source_db_config["name"]} 连接成功')
self.get_procedures_button.setEnabled(True)
except pyodbc.Error as e:
self.source_db_status_label.setText(f'源数据库连接状态: <br>连接失败 - {str(e)}')
QMessageBox.critical(self, '错误', f'连接源数据库失败: {e}')
except Exception as e:
self.source_db_status_label.setText(f'源数据库连接状态: <br>连接失败 - {str(e)}')
QMessageBox.critical(self, '错误', f'连接源数据库失败: {e}')
def select_and_connect_target_databases(self):
"""显示目标数据库选择对话框并连接所选的数据库"""
if not self.target_db_configs:
QMessageBox.warning(self, '警告', '没有目标数据库配置可供选择。')
return
dialog = QDialog(self)
dialog.setWindowTitle('选择目标数据库')
dialog.setGeometry(200, 200, 300, 300)
self.center_dialog(dialog)
grid_layout = QGridLayout()
grid_layout.setHorizontalSpacing(5)
grid_layout.setVerticalSpacing(5)
select_all_button = QPushButton('全选')
select_all_button.clicked.connect(self.select_all_checkboxes)
select_all_button.setFixedSize(100, 40)
clear_all_button = QPushButton('清空')
clear_all_button.clicked.connect(self.clear_all_checkboxes)
clear_all_button.setFixedSize(100, 40)
button_layout = QHBoxLayout()
button_layout.addWidget(select_all_button)
button_layout.addWidget(clear_all_button)
self.checkboxes = {}
max_items_per_column = 10
num_columns = (len(self.target_db_configs) + max_items_per_column - 1) // max_items_per_column
for index, db in enumerate(self.target_db_configs):
row = index % max_items_per_column
col = index // max_items_per_column
checkbox = QCheckBox(db['name'])
grid_layout.addWidget(checkbox, row, col)
self.checkboxes[db['name']] = (db, checkbox)
button_box = QDialogButtonBox(QDialogButtonBox.Ok | QDialogButtonBox.Cancel)
button_box.button(QDialogButtonBox.Ok).setText('连接')
button_box.button(QDialogButtonBox.Cancel).setText('关闭')
button_box.accepted.connect(self.check_and_connect_selected_target_databases)
button_box.rejected.connect(dialog.reject)
main_layout = QVBoxLayout()
main_layout.addLayout(button_layout)
main_layout.addLayout(grid_layout)
main_layout.addWidget(button_box)
dialog.setLayout(main_layout)
dialog.exec_()
def check_and_connect_selected_target_databases(self):
"""检查是否选择了目标数据库,并连接所选的数据库"""
selected_databases = [name for name, (_, checkbox) in self.checkboxes.items() if checkbox.isChecked()]
if not selected_databases:
QMessageBox.warning(self, '警告', '请至少选择一个目标数据库。')
return
self.connect_selected_target_databases(self.sender().parent())
def select_all_checkboxes(self):
"""全选所有复选框"""
for checkbox in self.checkboxes.values():
checkbox[1].setChecked(True)
def clear_all_checkboxes(self):
"""清空所有复选框"""
for checkbox in self.checkboxes.values():
checkbox[1].setChecked(False)
def center_dialog(self, dialog):
"""将对话框居中显示"""
screen = QGuiApplication.primaryScreen()
rect = screen.availableGeometry()
size = dialog.geometry()
dialog.move((rect.width() - size.width()) // 2, (rect.height() - size.height()) // 2)
def connect_selected_target_databases(self, dialog):
"""连接用户选择的目标数据库"""
if self.target_connections:
self.disconnect_all_target_databases()
self.target_db_status_label.setText('目标数据库连接状态: 连接中...')
self.target_connections = []
connection_results = []
for name, (db_config, checkbox) in self.checkboxes.items():
if checkbox.isChecked():
conn_str = self.build_connection_string(db_config)
try:
with pyodbc.connect(conn_str) as conn:
self.target_connections.append((name, conn_str))
connection_results.append(f'{name} 连接成功')
except pyodbc.Error as e:
connection_results.append(f'连接失败 {name} 数据库 - {str(e)}')
except Exception as e:
connection_results.append(f'连接失败 {name} 数据库 - {str(e)}')
dialog.accept()
if self.target_connections:
self.upload_button.setEnabled(True)
self.target_db_status_label.setText('目标数据库连接状态:<br>' + '<br>'.join(connection_results))
else:
self.update_button.setEnabled(False)
self.upload_button.setEnabled(False)
self.batch_update_button.setEnabled(False)
self.target_db_status_label.setText('目标数据库连接状态: 所有目标数据库连接失败。')
QMessageBox.information(self, '连接结果', '\n'.join(connection_results))
def disconnect_all_target_databases(self):
"""断开所有目标数据库连接"""
self.target_connections = []
self.target_db_status_label.setText('目标数据库连接状态: 所有目标数据库连接已断开。')
def detect_file_encoding(self, file_path):
"""检测文件编码"""
try:
with open(file_path, 'rb') as file:
raw_data = file.read(10000)
result = chardet.detect(raw_data)
return result['encoding']
except Exception as e:
QMessageBox.critical(self, '错误', f'检测文件编码失败: {e}')
return None
def read_stored_procedure(self, file_path):
"""从文件中读取 SQL 存储过程"""
try:
encoding = self.detect_file_encoding(file_path)
if encoding is None:
return ''
with open(file_path, 'r', encoding=encoding) as file:
return file.read()
except Exception as e:
QMessageBox.critical(self, '错误', f'读取存储过程文件失败: {e}')
return ''
def clean_sql_script(self, sql_script):
"""清理 SQL 脚本中的不必要的部分"""
cleaned_script = re.sub(r'^USE \[.*?\]\s*', '', sql_script, flags=re.IGNORECASE | re.MULTILINE)
cleaned_script = re.sub(r'\s*GO\s*', '', cleaned_script, flags=re.IGNORECASE)
cleaned_script = re.sub(r'^SET ANSI_NULLS ON\s*', '', cleaned_script, flags=re.IGNORECASE | re.MULTILINE)
cleaned_script = re.sub(r'^SET QUOTED_IDENTIFIER ON\s*', '', cleaned_script, flags=re.IGNORECASE | re.MULTILINE)
match = re.match(r'^(?:CREATE|ALTER)\s+PROCEDURE', cleaned_script, re.IGNORECASE)
if match:
cleaned_script = cleaned_script.strip()
return cleaned_script
def extract_procedure_name(self, proc_sql):
"""从 SQL 脚本中提取存储过程名称"""
match = re.search(r'(?:CREATE|ALTER)\s+PROCEDURE\s+\[?dbo\]?\.\[?([^\]]+)\]?', proc_sql, re.IGNORECASE)
return match.group(1) if match else '未知存储过程'
def upload_procedures(self):
"""处理文件对话框并上传多个存储过程"""
options = QFileDialog.Options()
file_paths, _ = QFileDialog.getOpenFileNames(self, '打开 SQL 文件', '', 'SQL 文件 (*.sql);;所有文件 (*)',
options=options)
if file_paths:
self.proc_names = []
self.update_status_labels(self.proc_names)
self.uploaded_file_paths = file_paths
for file_path in file_paths:
try:
proc_sql = self.read_stored_procedure(file_path)
if not proc_sql:
continue
set_statements = []
set_statements.extend(re.findall(r'^SET ANSI_NULLS ON\s*', proc_sql, re.IGNORECASE | re.MULTILINE))
set_statements.extend(
re.findall(r'^SET QUOTED_IDENTIFIER ON\s*', proc_sql, re.IGNORECASE | re.MULTILINE))
cleaned_sql = self.clean_sql_script(proc_sql)
if not cleaned_sql:
QMessageBox.warning(self, '警告', f'存储过程脚本为空或无效: {file_path}')
continue
final_sql = '\n'.join(set_statements) + '\n' + cleaned_sql
proc_name = self.extract_procedure_name(cleaned_sql)
self.proc_names.append(proc_name)
self.update_status_labels(self.proc_names)
except Exception as e:
QMessageBox.critical(self, '错误', f'上传存储过程失败: {e}')
print(f"上传存储过程失败: {e}")
if self.proc_names:
self.update_button.setEnabled(True)
QMessageBox.information(self, '信息', f'{len(self.proc_names)} 个存储过程已准备好进行更新。')
def upload_stored_procedure(self, connection_str, proc_name, proc_sql):
"""将存储过程上传到数据库"""
try:
with pyodbc.connect(connection_str) as conn:
cursor = conn.cursor()
check_proc_sql = f"""
IF OBJECT_ID('{proc_name}', 'P') IS NOT NULL
SELECT 1
ELSE
SELECT 0
"""
cursor.execute(check_proc_sql)
exists = cursor.fetchone()[0]
if exists:
drop_proc_sql = f"IF OBJECT_ID('{proc_name}', 'P') IS NOT NULL DROP PROCEDURE {proc_name};"
cursor.execute(drop_proc_sql)
conn.commit()
proc_sql = self.replace_alter_with_create(proc_sql)
else:
proc_sql = self.replace_alter_with_create(proc_sql)
cursor.execute(proc_sql)
conn.commit()
return True, ""
except pyodbc.Error as e:
print(f"数据库错误: {e}")
return False, str(e)
except Exception as e:
print(f"其他错误: {e}")
return False, str(e)
def replace_alter_with_create(self, sql_script):
"""将 ALTER PROCEDURE 替换为 CREATE PROCEDURE"""
altered_script = re.sub(
r'\bALTER\s+PROCEDURE\b',
'CREATE PROCEDURE',
sql_script,
flags=re.IGNORECASE
)
return altered_script
def update_procedures(self):
"""更新所有连接的目标数据库中的存储过程"""
if not self.target_connections or not self.uploaded_file_paths:
QMessageBox.warning(self, '警告', '请确保已连接目标数据库并上传存储过程文件。')
return
success = True
update_results = {}
failed_procs = {}
for file_path in self.uploaded_file_paths:
proc_sql = self.read_stored_procedure(file_path)
if not proc_sql:
QMessageBox.warning(self, '警告', f'读取存储过程 SQL 脚本失败: {file_path}')
continue
cleaned_sql = self.clean_sql_script(proc_sql)
if not cleaned_sql:
QMessageBox.warning(self, '警告', f'存储过程脚本为空或无效: {file_path}')
continue
proc_name = self.extract_procedure_name(cleaned_sql)
for name, conn_str in self.target_connections:
result, error_msg = self.upload_stored_procedure(conn_str, proc_name, cleaned_sql)
if result:
if name not in update_results:
update_results[name] = {'success': 0, 'failure': 0}
update_results[name]['success'] += 1
else:
success = False
if name not in failed_procs:
failed_procs[name] = []
failed_procs[name].append(f"存储过程 '{proc_name}': {error_msg}")
if name not in update_results:
update_results[name] = {'success': 0, 'failure': 0}
update_results[name]['failure'] += 1
update_info = []
for name, stats in update_results.items():
update_info.append(f"{name} 数据库: 更新成功 {stats['success']} 条, 更新失败 {stats['failure']} 条")
if failed_procs:
for name, errors in failed_procs.items():
update_info.append(f"{name} 数据库更新失败的存储过程:")
update_info.extend(errors)
QMessageBox.information(self, '更新结果', '\n'.join(update_info))
self.update_status_label.setText('<br>'.join(update_info))
def get_source_procedures(self):
"""获取源数据库中所有存储过程的内容,排除以 sp_ 开头的存储过程"""
if not self.source_connection:
QMessageBox.warning(self, '警告', '请先连接到源数据库。')
return
cursor = self.source_connection.cursor()
try:
cursor.execute("""
SELECT
p.name AS ProcedureName,
sm.definition AS ProcedureDefinition
FROM sys.procedures p
JOIN sys.sql_modules sm ON p.object_id = sm.object_id
WHERE p.name NOT LIKE 'sp_%'
ORDER BY p.name
""")
procedures = cursor.fetchall()
self.procedures_list = [(row.ProcedureName, row.ProcedureDefinition) for row in procedures]
proc_names = [row.ProcedureName for row in procedures]
self.show_procedures_dialog(proc_names)
self.batch_update_button.setEnabled(True)
except pyodbc.Error as e:
QMessageBox.critical(self, '错误', f'获取存储过程失败: {e}')
except Exception as e:
QMessageBox.critical(self, '错误', f'获取存储过程失败: {e}')
def show_procedures_dialog(self, proc_names):
"""显示存储过程名称的对话框"""
dialog = QDialog(self)
dialog.setWindowTitle('存储过程列表')
layout = QVBoxLayout()
text_edit = QTextEdit()
text_edit.setReadOnly(True)
text_edit.setText("\n".join(proc_names))
text_edit.setMaximumHeight(200)
text_edit.setVerticalScrollBarPolicy(Qt.ScrollBarAlwaysOn)
layout.addWidget(text_edit)
button_box = QDialogButtonBox(QDialogButtonBox.Ok)
button_box.accepted.connect(dialog.accept)
layout.addWidget(button_box)
dialog.setLayout(layout)
dialog.exec_()
def batch_update_procedures(self):
"""批量更新存储过程到目标数据库"""
if not self.source_connection:
QMessageBox.warning(self, '警告', '请先连接到源数据库。')
return
if not self.target_connections:
QMessageBox.warning(self, '警告', '请连接到目标数据库。')
return
if not self.procedures_list:
QMessageBox.warning(self, '警告', '请先获取源数据库中的存储过程。')
return
successful_updates = 0
failed_procs = []
for proc_name, proc_definition in self.procedures_list:
proc_successful = True
for _, conn_str in self.target_connections:
try:
with pyodbc.connect(conn_str) as conn:
cursor = conn.cursor()
cursor.execute(f"IF OBJECT_ID('{proc_name}', 'P') IS NOT NULL DROP PROCEDURE {proc_name}")
cursor.execute(proc_definition)
conn.commit()
except pyodbc.Error as e:
proc_successful = False
break
except Exception as e:
proc_successful = False
break
if proc_successful:
successful_updates += 1
else:
failed_procs.append(proc_name)
if len(failed_procs) == 0:
QMessageBox.information(self, '成功', '存储过程已成功批量更新到所有目标数据库。')
else:
message = f'批量更新完成: 成功更新 {successful_updates} 条, 失败 {len(failed_procs)} 条'
failed_procs_str = "<br>".join(failed_procs)
message += f'<br>失败的存储过程:<br>{failed_procs_str}'
QMessageBox.warning(self, '部分成功', message)
self.update_status_label.setText(f'成功批量更新 {successful_updates} 条, 失败 {len(failed_procs)} 条')
def closeEvent(self, event):
"""关闭事件,确保程序退出时断开所有数据库连接"""
self.disconnect_all_target_databases()
event.accept()
if __name__ == '__main__':
app = QApplication(sys.argv)
window = SQLUploader()
window.show()
sys.exit(app.exec_())
config.json配置文件
{
"source_database": {
"name": "源数据库",
"host": "",
"database": "",
"user": "",
"password": ""
},
"target_databases": [
{
"name": "目标数据库",
"host": "",
"database": "",
"user": "",
"password": ""
}
]
}