【Python】Python解决阿里云DataWorks导出数据1万条限制的问题
【Python】Python解决阿里云DataWorks导出数据1万条限制的问题
- 一、前言
- 二、脚本功能概述
- 三、核心代码解析
- **1. 环境配置与安全设置**
- **2. 用户配置区**
- **3. 数据清洗函数**
- **4. 核心逻辑**
- 四、完整代码演示
- 五、总结
一、前言
在日常数据分析工作中,团队经常需要从阿里云DataWorks(原MaxCompute)中导出临时表数据进行分析或汇报,但由于受限于阿里云的安全策略,每次只能导出1万条,反复操作会很麻烦。
本文介绍如何用Python脚本解决方案,能够安全高效地将DataWorks中的表数据导出为Excel文件,同时处理数据中的非法字符问题。
二、脚本功能概述
主要用Python脚本实现了以下核心功能:
-
安全连接阿里云DataWorks(MaxCompute)
-
验证目标表及分区配置
-
读取表数据并转换为DataFrame
-
对特定字段进行非法字符清洗,确保不受上游脏数据影响
-
将清洗后的数据导出为Excel文件,可直接分析
三、核心代码解析
1. 环境配置与安全设置
这里要注意,AccessKey ID 和AccessKey Secret 是每个RAM账号独有的,要绝对保密,避免风控问题,在实际使用时,建议通过环境变量或配置文件管理这些敏感信息。
# -*- coding: utf-8 -*-
# AccessKey ID 和AccessKey Secret
# python_version: 3.12
2. 用户配置区
配置基本信息,因为dataworks是基于Hive引擎的,所以会有分区的概念,这里把分区概念也加入到了配置项里。
# === 用户配置区 ===
ACCESS_ID = '填写对应RAM账号的ACCESS_ID'
ACCESS_KEY = '填写对应RAM账号的ACCESS_KEY'
PROJECT_NAME = 'dataoworks项目空间名'
ENDPOINT = 'http://service.cn-shanghai.maxcompute.aliyun.com/api' # 阿里云对应region区域
TABLE_NAME = 'chat_msg_output' # 我这里引用的是客户聊天记录表
OUTPUT_DIR = '/Users/admin/Downloads'
OUTPUT_FILENAME = 'export_data.xlsx'
IS_PARTITIONED = 0 # 是否分区表
PARTITION_NAME = 0 # 分区名称
3. 数据清洗函数
数据清洗时,定义专门用于清洗表字段中的非法字符的函数,主要处理以下问题:
- HTML换行标签<>转换为系统换行符
- 删除所有HTML标签,只保留纯文本内容
- 转义双引号,确保Excel文件兼容性
- 去除不可打印的控制字符
def clean_err_msg(text):"""前面说了是聊天记录表,所以会存在表情包、特殊字符等非法字符,需要进行清洗。功能:处理HTML标签、转义双引号、替换非法换行符"""if pd.isna(text):return texttry:# 替换HTML换行标签为系统换行符text = re.sub(r'<br\s*/?>', '\n', str(text), flags=re.IGNORECASE)# 删除所有HTML标签text = re.sub(r'<.*?>', '', text)# 转义双引号为两个双引号,实现Excel兼容text = text.replace('"', '""')# 去除其他非打印字符text = re.sub(r'[\x00-\x1F\x7F-\x9F]', ' ', text)return text.strip()except Exception as e:print(f"回四叔: 清洗发现异常: {str(e)}")return text
4. 核心逻辑
核心包含以下几个关键步骤:
- 路径校验与创建:确保输出目录存在,如果不存在则自动创建。
OUTPUT_PATH = os.path.join(OUTPUT_DIR, OUTPUT_FILENAME)
os.makedirs(OUTPUT_DIR, exist_ok=True)
- ODPS连接与表验证
使用提供的凭据连接DataWorks,并获取目标表对象。
odps = ODPS(ACCESS_ID, ACCESS_KEY, PROJECT_NAME, endpoint=ENDPOINT)
table = odps.get_table(TABLE_NAME)
- 分区验证
脚本会根据IS_PARTITIONED的值检查表是否为分区表,并验证分区格式是否正确。 - 数据读取与清洗
with table.open_reader(partition=PARTITION_NAME if IS_PARTITIONED else None) as reader:df = reader.to_pandas()if 'err_msg' in df.columns:df['err_msg'] = df['err_msg'].apply(clean_err_msg)
- 数据导出
df.to_excel(OUTPUT_PATH,index=False,engine='openpyxl',sheet_name='数据导出'
)
四、完整代码演示
# -*- coding: utf-8 -*-
# AccessKey ID 和AccessKey Secret 绝对保密
# python_version: 3.12 from odps import ODPS
import pandas as pd
import os
import re# === 用户配置区 ===
ACCESS_ID = '填写对应RAM账号的ACCESS_ID'
ACCESS_KEY = '填写对应RAM账号的ACCESS_KEY'
PROJECT_NAME = 'dataoworks项目空间名'
ENDPOINT = 'http://service.cn-shanghai.maxcompute.aliyun.com/api' # 阿里云对应region区域
TABLE_NAME = 'chat_msg_output' # 我这里引用的是客户聊天记录表
OUTPUT_DIR = '/Users/admin/Downloads'
OUTPUT_FILENAME = 'export_data.xlsx'
IS_PARTITIONED = 0 # 是否分区表
PARTITION_NAME = 0 # 分区名称 # ====== 核心逻辑 ====== #
def clean_err_msg(text):if pd.isna(text):return texttry:# 替换HTML换行标签为系统换行符text = re.sub(r'<br\s*/?>', '\n', str(text), flags=re.IGNORECASE)# 删除所有HTML标签text = re.sub(r'<.*?>', '', text)# 转义双引号为两个双引号(Excel兼容)text = text.replace('"', '""')# 去除其他非打印字符text = re.sub(r'[\x00-\x1F\x7F-\x9F]', ' ', text)return text.strip()except Exception as e:print(f"报告四叔: 清洗发现异常: {str(e)}")return text# 第1步: 路径校验
OUTPUT_PATH = os.path.join(OUTPUT_DIR, OUTPUT_FILENAME)
os.makedirs(OUTPUT_DIR, exist_ok=True) # 第2步: 连接ODPS
try:odps = ODPS(ACCESS_ID, ACCESS_KEY, PROJECT_NAME, endpoint=ENDPOINT)table = odps.get_table(TABLE_NAME)# 第3步: 分区验证if IS_PARTITIONED == 1:if not table.table_schema.partitions:raise ValueError("报告四叔:❌配置错误=>该表不是分区表")if not re.match(r"^\w+='[\w-]+'$", PARTITION_NAME):raise ValueError("报告四叔:❌分区条件格式错误")table.get_partition(PARTITION_NAME)elif IS_PARTITIONED == 0 and table.table_schema.partitions:raise ValueError("报告四叔:❌配置错误:该表是分区表")# 第4步: 数据导出(新增清洗逻辑)with table.open_reader(partition=PARTITION_NAME if IS_PARTITIONED else None) as reader:df = reader.to_pandas()# 新增清洗步骤if 'err_msg' in df.columns:df['err_msg'] = df['err_msg'].apply(clean_err_msg)# 导出Excel(增加编码参数)df.to_excel(OUTPUT_PATH,index=False,engine='openpyxl',sheet_name='数据导出'# encoding='utf-8-sig' # 编码问题处理 )print(f"✅ 数据已成功导出至:{OUTPUT_PATH}")except Exception as e:print(f"报告四叔:❌导出失败:{str(e)}") # 提示失败原因,便于排查问题if os.path.exists(OUTPUT_PATH):os.remove(OUTPUT_PATH)
五、总结
以上脚本解决团队经常从阿里云DataWorks导出表数据受限的问题,特别适合需要定期导出数据进行进一步分析的场景。
脚本中的数据清洗功能确保了导出的数据质量,特别是对包含HTML标签和特殊字符的字段进行了适当处理,能够涵盖大部分数据清洗的场景。
数据开发同学只需要通过简单的配置修改,便可适应不同的表结构和导出需求。