Excel数据处理工具集
功能:合并文件夹、合并文件、去重处理、自定义排序等
1. 合并文件夹中所有excel
from pathlib import Path
import pandas as pd
def merge_excels(folder: str | Path) -> pd.DataFrame:
"""
合并指定文件夹下所有 Excel 文件(支持 .xls/.xlsx/.xlsm)。
返回一个 DataFrame;若文件夹为空或无可识别文件,返回空 DataFrame。
"""
folder = Path(folder)
files = sorted(folder.glob("*.xls*")) # 通配 *.xls、*.xlsx、*.xlsm
if not files:
return pd.DataFrame()
frames = [pd.read_excel(f, sheet_name=None) for f in files] # 读取所有 sheet
# 扁平化:把每个 sheet 拼起来,并附加来源文件名
dfs = [
df.assign(_src=file.name)
for file, book in zip(files, frames)
for sheet_name, df in book.items()
]
return pd.concat(dfs, ignore_index=True)
# 使用示例
if __name__ == "__main__":
merged = merge_excels("./data")
merged.to_excel("./data/merged.xlsx", index=False)
2. 合并两个Excel
import pandas as pd
from pathlib import Path
def merge_excel_by_key(left_file: str | Path,
right_file: str | Path,
left_key: str,
right_key: str,
how: str = 'inner') -> pd.DataFrame:
"""
以 left_file.x 与 right_file.y 为键合并两个 Excel
"""
left = pd.read_excel(left_file)
right = pd.read_excel(right_file)
return pd.merge(left, right, left_on=left_key, right_on=right_key, how=how)
# 使用示例
if __name__ == '__main__':
df = merge_excel_by_key('a.xlsx', 'b.xlsx', left_key='x', right_key='y', how='left')
df.to_excel('merged.xlsx', index=False)
3.去除Excel重复数据
import pandas as pd
def drop_duplicates(file: str,new_file: str) -> None:
"""读取 Excel → 去重 → 覆盖保存(原地修改)"""
df = pd.read_excel(file)
df.drop_duplicates(inplace=True)
df.to_excel(new_file, index=False)
# 使用示例
drop_duplicates('merged.xlsx')
4.去除指定列的重复数据
import pandas as pd
def drop_duplicates_by_cols(file: str, cols: list[str], new_file: str) -> None:
"""按指定列去重后原地保存"""
df = pd.read_excel(file)
df.drop_duplicates(subset=cols, inplace=True)
df.to_excel(new_file, index=False)
# 使用示例:按 ['x', 'y'] 去重
drop_duplicates_by_cols('merged.xlsx', ['x', 'y'],'merged_clean.xlsx')
5. A Excel的X列按照B Excel的Y列排序
import pandas as pd
def sort_by_filter(a_file: str, b_file: str, out_file: str) -> None:
"""
用 B 的 name_filter 顺序对 A 的 name 列排序,并输出新文件。
若 A 中存在 B 没有的名字,放在最后。
"""
a = pd.read_excel(a_file)
b = pd.read_excel(b_file)
# 建立排序映射
order = {name: idx for idx, name in enumerate(b['name_filter'])}
a['_rank'] = a['name'].map(order).fillfillna(len(order))
a.sort_values('_rank').drop(columns='_rank').to_excel(out_file, index=False)
# 用法
sort_by_filter('A.xlsx', 'B.xlsx', 'sorted_A.xlsx')
"""
Excel数据处理工具集
功能:合并文件夹、合并文件、去重处理、自定义排序等
设计原则:高内聚、低耦合、单一职责、类型提示、错误处理
"""
from pathlib import Path
import pandas as pd
from typing import Union, List, Optional, Dict
def merge_excels_from_folder(
folder_path: Union[str, Path],
output_file: Optional[Union[str, Path]] = None,
include_sheets: bool = True
) -> pd.DataFrame:
"""
合并文件夹中的所有Excel文件(包括所有工作表)
Args:
folder_path: 包含Excel文件的文件夹路径
output_file: 合并结果输出路径(可选)
include_sheets: 是否包含所有工作表(True包含所有,False只包含第一个工作表)
Returns:
合并后的DataFrame
Raises:
FileNotFoundError: 如果文件夹不存在
ValueError: 如果文件夹中没有Excel文件
"""
folder = Path(folder_path)
if not folder.exists():
raise FileNotFoundError(f"文件夹不存在: {folder_path}")
# 支持所有Excel扩展名
files = sorted(folder.glob("*.[xX][lL][sS]*"))
if not files:
raise ValueError(f"文件夹中没有Excel文件: {folder_path}")
# 读取所有文件
frames = []
for file in files:
try:
if include_sheets:
# 读取所有工作表
sheets = pd.read_excel(file, sheet_name=None)
for sheet_name, df in sheets.items():
df['_source_file'] = file.name
df['_source_sheet'] = sheet_name
frames.append(df)
else:
# 只读取第一个工作表
df = pd.read_excel(file)
df['_source_file'] = file.name
frames.append(df)
except Exception as e:
print(f"警告: 文件 {file.name} 读取失败 - {str(e)}")
continue
# 合并所有数据
if not frames:
return pd.DataFrame()
merged_df = pd.concat(frames, ignore_index=True)
# 输出结果
if output_file:
output_file = Path(output_file)
output_file.parent.mkdir(parents=True, exist_ok=True)
merged_df.to_excel(output_file, index=False)
print(f"合并完成! 共处理 {len(files)} 个文件,输出到: {output_file}")
return merged_df
def merge_two_excels(
left_file: Union[str, Path],
right_file: Union[str, Path],
left_key: Union[str, List[str]],
right_key: Union[str, List[str]],
output_file: Optional[Union[str, Path]] = None,
merge_method: str = 'inner',
suffixes: tuple = ('_left', '_right')
) -> pd.DataFrame:
"""
合并两个Excel文件(类似SQL JOIN操作)
Args:
left_file: 左侧文件路径
right_file: 右侧文件路径
left_key: 左侧文件合并键(单列名或列名列表)
right_key: 右侧文件合并键(单列名或列名列表)
output_file: 输出文件路径(可选)
merge_method: 合并方式 ('inner', 'left', 'right', 'outer')
suffixes: 重名列后缀
Returns:
合并后的DataFrame
Raises:
FileNotFoundError: 如果输入文件不存在
ValueError: 如果键列不存在
"""
# 验证文件存在
left_path = Path(left_file)
right_path = Path(right_file)
if not left_path.exists():
raise FileNotFoundError(f"左侧文件不存在: {left_file}")
if not right_path.exists():
raise FileNotFoundError(f"右侧文件不存在: {right_file}")
# 读取数据
left_df = pd.read_excel(left_path)
right_df = pd.read_excel(right_path)
# 验证键列存在
for key in ([left_key] if isinstance(left_key, str) else left_key):
if key not in left_df.columns:
raise ValueError(f"左侧文件缺少键列: {key}")
for key in ([right_key] if isinstance(right_key, str) else right_key):
if key not in right_df.columns:
raise ValueError(f"右侧文件缺少键列: {key}")
# 执行合并
merged_df = pd.merge(
left_df,
right_df,
left_on=left_key,
right_on=right_key,
how=merge_method,
suffixes=suffixes
)
# 输出结果
if output_file:
output_path = Path(output_file)
output_path.parent.mkdir(parents=True, exist_ok=True)
merged_df.to_excel(output_path, index=False)
print(f"文件合并完成! 左侧: {left_path.name}, 右侧: {right_path.name}, 输出到: {output_path}")
return merged_df
def remove_duplicates(
input_file: Union[str, Path],
output_file: Optional[Union[str, Path]] = None,
subset: Optional[Union[str, List[str]]] = None,
keep: str = 'first',
inplace: bool = False
) -> pd.DataFrame:
"""
移除Excel中的重复行
Args:
input_file: 输入文件路径
output_file: 输出文件路径(可选)
subset: 去重依据的列(默认所有列)
keep: 保留策略 ('first', 'last', False)
inplace: 是否替换原文件(需要output_file=None)
Returns:
去重后的DataFrame
Raises:
ValueError: 如果inplace=True但未提供output_file
"""
input_path = Path(input_file)
if not input_path.exists():
raise FileNotFoundError(f"输入文件不存在: {input_file}")
# 读取数据
df = pd.read_excel(input_path)
# 执行去重
cleaned_df = df.drop_duplicates(subset=subset, keep=keep)
dup_count = len(df) - len(cleaned_df)
# 处理输出
if inplace:
if output_file:
raise ValueError("inplace模式不能同时指定output_file")
cleaned_df.to_excel(input_path, index=False)
print(f"去重完成! 移除 {dup_count} 行重复数据,已覆盖原文件")
return cleaned_df
output_path = output_file or input_path.with_name(f"{input_path.stem}_cleaned{input_path.suffix}")
output_path = Path(output_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
cleaned_df.to_excel(output_path, index=False)
print(f"去重完成! 移除 {dup_count} 行重复数据,输出到: {output_path}")
return cleaned_df
def sort_by_template(
data_file: Union[str, Path],
template_file: Union[str, Path],
data_column: str,
template_column: str,
output_file: Optional[Union[str, Path]] = None,
unmatched_placement: str = 'top'
) -> pd.DataFrame:
"""
按照模板文件顺序对数据文件排序
Args:
data_file: 待排序数据文件
template_file: 排序模板文件
data_column: 待排序列名
template_column: 模板列名
output_file: 输出文件路径(可选)
unmatched_placement: 未匹配项位置 ('top'或'bottom')
Returns:
排序后的DataFrame
Raises:
FileNotFoundError: 如果文件不存在
ValueError: 如果列名不存在
"""
# 验证文件存在
data_path = Path(data_file)
template_path = Path(template_file)
if not data_path.exists():
raise FileNotFoundError(f"数据文件不存在: {data_file}")
if not template_path.exists():
raise FileNotFoundError(f"模板文件不存在: {template_file}")
# 读取数据
df_data = pd.read_excel(data_path)
df_template = pd.read_excel(template_path)
# 验证列存在
if data_column not in df_data.columns:
raise ValueError(f"数据文件缺少列: {data_column}")
if template_column not in df_template.columns:
raise ValueError(f"模板文件缺少列: {template_column}")
# 创建排序映射
order_map = {}
template_values = df_template[template_column].tolist()
for idx, value in enumerate(template_values):
if value not in order_map: # 避免重复值
order_map[value] = idx
# 创建排序列
max_rank = len(order_map)
df_data['_sort_rank'] = df_data[data_column].map(order_map)
# 处理未匹配值
if unmatched_placement == 'top':
df_data['_sort_rank'] = df_data['_sort_rank'].fillna(-1)
else: # bottom
df_data['_sort_rank'] = df_data['_sort_rank'].fillna(max_rank)
# 排序并清理
sorted_df = df_data.sort_values('_sort_rank').drop(columns='_sort_rank')
# 输出结果
if output_file:
output_path = Path(output_file)
output_path.parent.mkdir(parents=True, exist_ok=True)
sorted_df.to_excel(output_path, index=False)
print(f"排序完成! 输出到: {output_path}")
else:
print("排序完成! 未输出文件")
return sorted_df
if __name__ == "__main__":
"""功能测试示例"""
# 示例1: 合并文件夹
# merge_excels_from_folder('./data', './output/merged.xlsx')
# 示例2: 合并两个文件
# merge_two_excels(
# 'sales.xlsx',
# 'products.xlsx',
# left_key='product_id',
# right_key='id',
# output_file='sales_with_products.xlsx',
# merge_method='left'
# )
# 示例3: 去重处理
# remove_duplicates('raw_data.xlsx', subset=['id'], keep='last', inplace=True)
# 示例4: 按模板排序
# sort_by_template(
# 'employees.xlsx',
# 'department_order.xlsx',
# data_column='department',
# template_column='dept_order',
# output_file='sorted_employees.xlsx',
# unmatched_placement='bottom'
# )