Excel常用功能函数

发布于:2025-08-14 ⋅ 阅读:(14) ⋅ 点赞:(0)

Excel数据处理工具集

功能:合并文件夹、合并文件、去重处理、自定义排序等

1. 合并文件夹中所有excel

from pathlib import Path
import pandas as pd

def merge_excels(folder: str | Path) -> pd.DataFrame:
    """
    合并指定文件夹下所有 Excel 文件(支持 .xls/.xlsx/.xlsm)。
    返回一个 DataFrame;若文件夹为空或无可识别文件,返回空 DataFrame。
    """
    folder = Path(folder)
    files = sorted(folder.glob("*.xls*"))  # 通配 *.xls、*.xlsx、*.xlsm

    if not files:
        return pd.DataFrame()

    frames = [pd.read_excel(f, sheet_name=None) for f in files]  # 读取所有 sheet
    # 扁平化:把每个 sheet 拼起来,并附加来源文件名
    dfs = [
        df.assign(_src=file.name)
        for file, book in zip(files, frames)
        for sheet_name, df in book.items()
    ]
    return pd.concat(dfs, ignore_index=True)

# 使用示例
if __name__ == "__main__":
    merged = merge_excels("./data")
    merged.to_excel("./data/merged.xlsx", index=False)

2. 合并两个Excel

import pandas as pd
from pathlib import Path

def merge_excel_by_key(left_file: str | Path,
                       right_file: str | Path,
                       left_key: str,
                       right_key: str,
                       how: str = 'inner') -> pd.DataFrame:
    """
    以 left_file.x 与 right_file.y 为键合并两个 Excel
    """
    left  = pd.read_excel(left_file)
    right = pd.read_excel(right_file)
    return pd.merge(left, right, left_on=left_key, right_on=right_key, how=how)

# 使用示例
if __name__ == '__main__':
    df = merge_excel_by_key('a.xlsx', 'b.xlsx', left_key='x', right_key='y', how='left')
    df.to_excel('merged.xlsx', index=False)

3.去除Excel重复数据

import pandas as pd

def drop_duplicates(file: str,new_file: str) -> None:
    """读取 Excel → 去重 → 覆盖保存(原地修改)"""
    df = pd.read_excel(file)
    df.drop_duplicates(inplace=True)
    df.to_excel(new_file, index=False)

# 使用示例
drop_duplicates('merged.xlsx')

4.去除指定列的重复数据

import pandas as pd

def drop_duplicates_by_cols(file: str, cols: list[str], new_file: str) -> None:
    """按指定列去重后原地保存"""
    df = pd.read_excel(file)
    df.drop_duplicates(subset=cols, inplace=True)
    df.to_excel(new_file, index=False)

# 使用示例:按 ['x', 'y'] 去重
drop_duplicates_by_cols('merged.xlsx', ['x', 'y'],'merged_clean.xlsx')

5. A Excel的X列按照B Excel的Y列排序

import pandas as pd

def sort_by_filter(a_file: str, b_file: str, out_file: str) -> None:
    """
    用 B 的 name_filter 顺序对 A 的 name 列排序,并输出新文件。
    若 A 中存在 B 没有的名字,放在最后。
    """
    a = pd.read_excel(a_file)
    b = pd.read_excel(b_file)

    # 建立排序映射
    order = {name: idx for idx, name in enumerate(b['name_filter'])}
    a['_rank'] = a['name'].map(order).fillfillna(len(order))

    a.sort_values('_rank').drop(columns='_rank').to_excel(out_file, index=False)

# 用法
sort_by_filter('A.xlsx', 'B.xlsx', 'sorted_A.xlsx')
"""
Excel数据处理工具集
功能:合并文件夹、合并文件、去重处理、自定义排序等
设计原则:高内聚、低耦合、单一职责、类型提示、错误处理
"""

from pathlib import Path
import pandas as pd
from typing import Union, List, Optional, Dict


def merge_excels_from_folder(
    folder_path: Union[str, Path],
    output_file: Optional[Union[str, Path]] = None,
    include_sheets: bool = True
) -> pd.DataFrame:
    """
    合并文件夹中的所有Excel文件(包括所有工作表)
    
    Args:
        folder_path: 包含Excel文件的文件夹路径
        output_file: 合并结果输出路径(可选)
        include_sheets: 是否包含所有工作表(True包含所有,False只包含第一个工作表)
    
    Returns:
        合并后的DataFrame
        
    Raises:
        FileNotFoundError: 如果文件夹不存在
        ValueError: 如果文件夹中没有Excel文件
    """
    folder = Path(folder_path)
    if not folder.exists():
        raise FileNotFoundError(f"文件夹不存在: {folder_path}")
    
    # 支持所有Excel扩展名
    files = sorted(folder.glob("*.[xX][lL][sS]*"))
    if not files:
        raise ValueError(f"文件夹中没有Excel文件: {folder_path}")
    
    # 读取所有文件
    frames = []
    for file in files:
        try:
            if include_sheets:
                # 读取所有工作表
                sheets = pd.read_excel(file, sheet_name=None)
                for sheet_name, df in sheets.items():
                    df['_source_file'] = file.name
                    df['_source_sheet'] = sheet_name
                    frames.append(df)
            else:
                # 只读取第一个工作表
                df = pd.read_excel(file)
                df['_source_file'] = file.name
                frames.append(df)
        except Exception as e:
            print(f"警告: 文件 {file.name} 读取失败 - {str(e)}")
            continue
    
    # 合并所有数据
    if not frames:
        return pd.DataFrame()
    
    merged_df = pd.concat(frames, ignore_index=True)
    
    # 输出结果
    if output_file:
        output_file = Path(output_file)
        output_file.parent.mkdir(parents=True, exist_ok=True)
        merged_df.to_excel(output_file, index=False)
        print(f"合并完成! 共处理 {len(files)} 个文件,输出到: {output_file}")
    
    return merged_df


def merge_two_excels(
    left_file: Union[str, Path],
    right_file: Union[str, Path],
    left_key: Union[str, List[str]],
    right_key: Union[str, List[str]],
    output_file: Optional[Union[str, Path]] = None,
    merge_method: str = 'inner',
    suffixes: tuple = ('_left', '_right')
) -> pd.DataFrame:
    """
    合并两个Excel文件(类似SQL JOIN操作)
    
    Args:
        left_file: 左侧文件路径
        right_file: 右侧文件路径
        left_key: 左侧文件合并键(单列名或列名列表)
        right_key: 右侧文件合并键(单列名或列名列表)
        output_file: 输出文件路径(可选)
        merge_method: 合并方式 ('inner', 'left', 'right', 'outer')
        suffixes: 重名列后缀
        
    Returns:
        合并后的DataFrame
        
    Raises:
        FileNotFoundError: 如果输入文件不存在
        ValueError: 如果键列不存在
    """
    # 验证文件存在
    left_path = Path(left_file)
    right_path = Path(right_file)
    if not left_path.exists():
        raise FileNotFoundError(f"左侧文件不存在: {left_file}")
    if not right_path.exists():
        raise FileNotFoundError(f"右侧文件不存在: {right_file}")
    
    # 读取数据
    left_df = pd.read_excel(left_path)
    right_df = pd.read_excel(right_path)
    
    # 验证键列存在
    for key in ([left_key] if isinstance(left_key, str) else left_key):
        if key not in left_df.columns:
            raise ValueError(f"左侧文件缺少键列: {key}")
    
    for key in ([right_key] if isinstance(right_key, str) else right_key):
        if key not in right_df.columns:
            raise ValueError(f"右侧文件缺少键列: {key}")
    
    # 执行合并
    merged_df = pd.merge(
        left_df, 
        right_df, 
        left_on=left_key,
        right_on=right_key,
        how=merge_method,
        suffixes=suffixes
    )
    
    # 输出结果
    if output_file:
        output_path = Path(output_file)
        output_path.parent.mkdir(parents=True, exist_ok=True)
        merged_df.to_excel(output_path, index=False)
        print(f"文件合并完成! 左侧: {left_path.name}, 右侧: {right_path.name}, 输出到: {output_path}")
    
    return merged_df


def remove_duplicates(
    input_file: Union[str, Path],
    output_file: Optional[Union[str, Path]] = None,
    subset: Optional[Union[str, List[str]]] = None,
    keep: str = 'first',
    inplace: bool = False
) -> pd.DataFrame:
    """
    移除Excel中的重复行
    
    Args:
        input_file: 输入文件路径
        output_file: 输出文件路径(可选)
        subset: 去重依据的列(默认所有列)
        keep: 保留策略 ('first', 'last', False)
        inplace: 是否替换原文件(需要output_file=None)
        
    Returns:
        去重后的DataFrame
        
    Raises:
        ValueError: 如果inplace=True但未提供output_file
    """
    input_path = Path(input_file)
    if not input_path.exists():
        raise FileNotFoundError(f"输入文件不存在: {input_file}")
    
    # 读取数据
    df = pd.read_excel(input_path)
    
    # 执行去重
    cleaned_df = df.drop_duplicates(subset=subset, keep=keep)
    dup_count = len(df) - len(cleaned_df)
    
    # 处理输出
    if inplace:
        if output_file:
            raise ValueError("inplace模式不能同时指定output_file")
        cleaned_df.to_excel(input_path, index=False)
        print(f"去重完成! 移除 {dup_count} 行重复数据,已覆盖原文件")
        return cleaned_df
    
    output_path = output_file or input_path.with_name(f"{input_path.stem}_cleaned{input_path.suffix}")
    output_path = Path(output_path)
    output_path.parent.mkdir(parents=True, exist_ok=True)
    cleaned_df.to_excel(output_path, index=False)
    
    print(f"去重完成! 移除 {dup_count} 行重复数据,输出到: {output_path}")
    return cleaned_df


def sort_by_template(
    data_file: Union[str, Path],
    template_file: Union[str, Path],
    data_column: str,
    template_column: str,
    output_file: Optional[Union[str, Path]] = None,
    unmatched_placement: str = 'top'
) -> pd.DataFrame:
    """
    按照模板文件顺序对数据文件排序
    
    Args:
        data_file: 待排序数据文件
        template_file: 排序模板文件
        data_column: 待排序列名
        template_column: 模板列名
        output_file: 输出文件路径(可选)
        unmatched_placement: 未匹配项位置 ('top'或'bottom')
        
    Returns:
        排序后的DataFrame
        
    Raises:
        FileNotFoundError: 如果文件不存在
        ValueError: 如果列名不存在
    """
    # 验证文件存在
    data_path = Path(data_file)
    template_path = Path(template_file)
    if not data_path.exists():
        raise FileNotFoundError(f"数据文件不存在: {data_file}")
    if not template_path.exists():
        raise FileNotFoundError(f"模板文件不存在: {template_file}")
    
    # 读取数据
    df_data = pd.read_excel(data_path)
    df_template = pd.read_excel(template_path)
    
    # 验证列存在
    if data_column not in df_data.columns:
        raise ValueError(f"数据文件缺少列: {data_column}")
    if template_column not in df_template.columns:
        raise ValueError(f"模板文件缺少列: {template_column}")
    
    # 创建排序映射
    order_map = {}
    template_values = df_template[template_column].tolist()
    for idx, value in enumerate(template_values):
        if value not in order_map:  # 避免重复值
            order_map[value] = idx
    
    # 创建排序列
    max_rank = len(order_map)
    df_data['_sort_rank'] = df_data[data_column].map(order_map)
    
    # 处理未匹配值
    if unmatched_placement == 'top':
        df_data['_sort_rank'] = df_data['_sort_rank'].fillna(-1)
    else:  # bottom
        df_data['_sort_rank'] = df_data['_sort_rank'].fillna(max_rank)
    
    # 排序并清理
    sorted_df = df_data.sort_values('_sort_rank').drop(columns='_sort_rank')
    
    # 输出结果
    if output_file:
        output_path = Path(output_file)
        output_path.parent.mkdir(parents=True, exist_ok=True)
        sorted_df.to_excel(output_path, index=False)
        print(f"排序完成! 输出到: {output_path}")
    else:
        print("排序完成! 未输出文件")
    
    return sorted_df


if __name__ == "__main__":
    """功能测试示例"""
    # 示例1: 合并文件夹
    # merge_excels_from_folder('./data', './output/merged.xlsx')
    
    # 示例2: 合并两个文件
    # merge_two_excels(
    #     'sales.xlsx', 
    #     'products.xlsx',
    #     left_key='product_id',
    #     right_key='id',
    #     output_file='sales_with_products.xlsx',
    #     merge_method='left'
    # )
    
    # 示例3: 去重处理
    # remove_duplicates('raw_data.xlsx', subset=['id'], keep='last', inplace=True)
    
    # 示例4: 按模板排序
    # sort_by_template(
    #     'employees.xlsx',
    #     'department_order.xlsx',
    #     data_column='department',
    #     template_column='dept_order',
    #     output_file='sorted_employees.xlsx',
    #     unmatched_placement='bottom'
    # )

网站公告

今日签到

点亮在社区的每一天
去签到