在原有的zip文件内容基础上批量生成增量zip压缩包
每个zip包含原始文件内容和一个新生成的文本文件
代码示例
import os
import zipfile
import shutil
import concurrent.futures
from time import perf_counter
def process_single_zip(args):
"""处理单个ZIP文件的生成(线程工作函数)"""
i, original_zip, original_path, temp_dir, compression = args
txt_name = os.path.join(temp_dir, f"{i:03d}.txt")
new_zip = f"{original_zip}-{i:03d}.zip"
try:
# 1. 创建临时文本文件
with open(txt_name, 'wb') as f:
f.write(f"这是独立添加的第{i:03d}个文件\n".encode('utf-8'))
# 2. 创建新ZIP文件
with zipfile.ZipFile(new_zip, 'w', **compression) as zip_out:
# 2.1 复制原始ZIP内容
with zipfile.ZipFile(original_path, 'r') as zip_in:
for item in zip_in.infolist():
with zip_in.open(item) as src_file:
zip_out.writestr(item, src_file.read())
# 2.2 添加新文件
zip_out.write(txt_name, os.path.basename(txt_name))
return (True, new_zip)
except Exception as e:
return (False, f"{new_zip} 生成失败: {str(e)}")
def non_cumulative_zip_mt(
original_zip="source",
num_files=10,
compression_level=9,
max_workers=4
):
"""多线程版本增量ZIP生成器
参数:
original_zip: 原始ZIP名称
num_files: 生成文件数量
compression_level: 压缩级别(0-9)
max_workers: 最大线程数
"""
start_time = perf_counter()
original_path = f"{original_zip}.zip"
temp_dir = "temp_zip_files"
# 1. 准备环境
os.makedirs(temp_dir, exist_ok=True)
compression = {
'compression': zipfile.ZIP_DEFLATED if compression_level > 0 else zipfile.ZIP_STORED,
'compresslevel': min(9, max(0, compression_level))
}
# 2. 多线程处理
success_count = 0
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
# 2.1 准备任务参数
tasks = [(i, original_zip, original_path, temp_dir, compression)
for i in range(1, num_files + 1)]
# 2.2 提交并处理任务
futures = [executor.submit(process_single_zip, task) for task in tasks]
# 2.3 获取结果
for future in concurrent.futures.as_completed(futures):
status, message = future.result()
if status:
print(f"✓ {message}")
success_count += 1
else:
print(f"✗ {message}")
# 3. 清理临时文件
shutil.rmtree(temp_dir, ignore_errors=True)
# 4. 性能报告
elapsed = perf_counter() - start_time
print(f"\n处理完成 | 成功: {success_count}/{num_files} | 耗时: {elapsed:.2f}s")
print(f"线程数: {max_workers} | 压缩级别: {compression_level}")
if __name__ == "__main__":
file_name = '1D-ZB-1270OBJ'
# ▶ 参数配置
non_cumulative_zip_mt(
original_zip=file_name,
num_files=10, # 生成10个文件
compression_level=6, # 平衡压缩率和速度
max_workers=10 # 使用10个线程
)