【Python】Python解决阿里云DataWorks导出数据1万条限制的问题

发布于:2025-09-15 ⋅ 阅读:(22) ⋅ 点赞:(0)

一、前言

在日常数据分析工作中,团队经常需要从阿里云DataWorks(原MaxCompute)中导出临时表数据进行分析或汇报,但由于受限于阿里云的安全策略,每次只能导出1万条,反复操作会很麻烦。
在这里插入图片描述

本文介绍如何用Python脚本解决方案,能够安全高效地将DataWorks中的表数据导出为Excel文件,同时处理数据中的非法字符问题。

二、脚本功能概述

主要用Python脚本实现了以下核心功能:

  1. 安全连接阿里云DataWorks(MaxCompute)

  2. 验证目标表及分区配置

  3. 读取表数据并转换为DataFrame

  4. 对特定字段进行非法字符清洗,确保不受上游脏数据影响

  5. 将清洗后的数据导出为Excel文件,可直接分析

三、核心代码解析

1. 环境配置与安全设置

这里要注意,AccessKey ID 和AccessKey Secret 是每个RAM账号独有的,要绝对保密,避免风控问题,在实际使用时,建议通过环境变量或配置文件管理这些敏感信息。

# -*- coding: utf-8 -*-
# AccessKey ID 和AccessKey Secret  
# python_version: 3.12 

2. 用户配置区

配置基本信息,因为dataworks是基于Hive引擎的,所以会有分区的概念,这里把分区概念也加入到了配置项里。

# === 用户配置区 ===
ACCESS_ID = '填写对应RAM账号的ACCESS_ID'
ACCESS_KEY = '填写对应RAM账号的ACCESS_KEY'
PROJECT_NAME = 'dataoworks项目空间名'          
ENDPOINT = 'http://service.cn-shanghai.maxcompute.aliyun.com/api'  # 阿里云对应region区域
TABLE_NAME = 'chat_msg_output'            # 我这里引用的是客户聊天记录表
OUTPUT_DIR = '/Users/admin/Downloads'  
OUTPUT_FILENAME = 'export_data.xlsx'  
IS_PARTITIONED = 0                        # 是否分区表           
PARTITION_NAME = 0                        # 分区名称 

3. 数据清洗函数

数据清洗时,定义专门用于清洗表字段中的非法字符的函数,主要处理以下问题:
- HTML换行标签<>转换为系统换行符
- 删除所有HTML标签,只保留纯文本内容
- 转义双引号,确保Excel文件兼容性
- 去除不可打印的控制字符

def clean_err_msg(text):
    """
    前面说了是聊天记录表,所以会存在表情包、特殊字符等非法字符,需要进行清洗。
    功能:处理HTML标签、转义双引号、替换非法换行符
    """
    if pd.isna(text):
        return text
    try:
        # 替换HTML换行标签为系统换行符
        text = re.sub(r'<br\s*/?>', '\n', str(text), flags=re.IGNORECASE)
        # 删除所有HTML标签
        text = re.sub(r'<.*?>', '', text)
        # 转义双引号为两个双引号,实现Excel兼容
        text = text.replace('"', '""')
        # 去除其他非打印字符
        text = re.sub(r'[\x00-\x1F\x7F-\x9F]', ' ', text)
        return text.strip()
    except Exception as e:
        print(f"回四叔: 清洗发现异常: {str(e)}")
        return text

4. 核心逻辑

核心包含以下几个关键步骤:

  • 路径校验与创建:确保输出目录存在,如果不存在则自动创建。
OUTPUT_PATH = os.path.join(OUTPUT_DIR, OUTPUT_FILENAME)
os.makedirs(OUTPUT_DIR, exist_ok=True)

  • ODPS连接与表验证
    使用提供的凭据连接DataWorks,并获取目标表对象。
odps = ODPS(ACCESS_ID, ACCESS_KEY, PROJECT_NAME, endpoint=ENDPOINT)
table = odps.get_table(TABLE_NAME)

  • 分区验证
    脚本会根据IS_PARTITIONED的值检查表是否为分区表,并验证分区格式是否正确。
  • 数据读取与清洗
with table.open_reader(partition=PARTITION_NAME if IS_PARTITIONED else None) as reader:
    df = reader.to_pandas()
    
    if 'err_msg' in df.columns:
        df['err_msg'] = df['err_msg'].apply(clean_err_msg)

  • 数据导出
df.to_excel(
    OUTPUT_PATH,
    index=False,
    engine='openpyxl',
    sheet_name='数据导出'
)

四、完整代码演示

# -*- coding: utf-8 -*-
# AccessKey ID 和AccessKey Secret 绝对保密 
# python_version: 3.12 

from odps import ODPS
import pandas as pd
import os
import re

# === 用户配置区 ===
ACCESS_ID = '填写对应RAM账号的ACCESS_ID'
ACCESS_KEY = '填写对应RAM账号的ACCESS_KEY'
PROJECT_NAME = 'dataoworks项目空间名'          
ENDPOINT = 'http://service.cn-shanghai.maxcompute.aliyun.com/api'  # 阿里云对应region区域
TABLE_NAME = 'chat_msg_output'            # 我这里引用的是客户聊天记录表
OUTPUT_DIR = '/Users/admin/Downloads'  
OUTPUT_FILENAME = 'export_data.xlsx'  
IS_PARTITIONED = 0                        # 是否分区表           
PARTITION_NAME = 0                        # 分区名称       

# ====== 核心逻辑 ====== #
def clean_err_msg(text):
    if pd.isna(text):
        return text
    try:
        # 替换HTML换行标签为系统换行符
        text = re.sub(r'<br\s*/?>', '\n', str(text), flags=re.IGNORECASE)
        # 删除所有HTML标签
        text = re.sub(r'<.*?>', '', text)
        # 转义双引号为两个双引号(Excel兼容)
        text = text.replace('"', '""')
        # 去除其他非打印字符
        text = re.sub(r'[\x00-\x1F\x7F-\x9F]', ' ', text)
        return text.strip()
    except Exception as e:
        print(f"报告四叔: 清洗发现异常: {str(e)}")
        return text

# 第1步: 路径校验
OUTPUT_PATH = os.path.join(OUTPUT_DIR, OUTPUT_FILENAME)
os.makedirs(OUTPUT_DIR, exist_ok=True) 

# 第2步: 连接ODPS
try:
    odps = ODPS(ACCESS_ID, ACCESS_KEY, PROJECT_NAME, endpoint=ENDPOINT)
    table = odps.get_table(TABLE_NAME)

    # 第3步: 分区验证
    if IS_PARTITIONED == 1:
        if not table.table_schema.partitions:
            raise ValueError("报告四叔:❌配置错误=>该表不是分区表")
        if not re.match(r"^\w+='[\w-]+'$", PARTITION_NAME):
            raise ValueError("报告四叔:❌分区条件格式错误")
        table.get_partition(PARTITION_NAME)
    elif IS_PARTITIONED == 0 and table.table_schema.partitions:
        raise ValueError("报告四叔:❌配置错误:该表是分区表")

    # 第4步: 数据导出(新增清洗逻辑)
    with table.open_reader(partition=PARTITION_NAME if IS_PARTITIONED else None) as reader:
        df = reader.to_pandas()
        
        # 新增清洗步骤
        if 'err_msg' in df.columns:
            df['err_msg'] = df['err_msg'].apply(clean_err_msg)
            
        # 导出Excel(增加编码参数)
        df.to_excel(
            OUTPUT_PATH,
            index=False,
            engine='openpyxl',
            sheet_name='数据导出'
          #  encoding='utf-8-sig'  # 编码问题处理 
        )
    
    print(f"✅ 数据已成功导出至:{OUTPUT_PATH}")

except Exception as e:
    print(f"报告四叔:❌导出失败:{str(e)}")    # 提示失败原因,便于排查问题
    if os.path.exists(OUTPUT_PATH):
        os.remove(OUTPUT_PATH)

五、总结

以上脚本解决团队经常从阿里云DataWorks导出表数据受限的问题,特别适合需要定期导出数据进行进一步分析的场景。
脚本中的数据清洗功能确保了导出的数据质量,特别是对包含HTML标签和特殊字符的字段进行了适当处理,能够涵盖大部分数据清洗的场景。

数据开发同学只需要通过简单的配置修改,便可适应不同的表结构和导出需求。