# smart_table_parsing/profile_sheet_data.py
# Commit 89b3827 — "Fix data access logic in profile functions"
import json
import os
import requests
import tempfile
import decimal
from datetime import datetime, date
from statistics import mean
from openpyxl import load_workbook
from frictionless import Resource, Detector
from frictionless.formats import ExcelControl
import logging
logger = logging.getLogger(__name__)
def main(file_info, detail_level="standard", sample_rows=5):
    """
    Analyze an Excel/CSV file and return a structured profile.

    :param file_info: dict containing "fileUrl" (download location) and
        "fileKey" (display name used in logs and in the report).
    :param detail_level: controls how much detail is returned:
        - 'basic': column names and inferred data types only.
        - 'standard': 'basic' plus missing-value rate and numeric
          statistics (min / max / mean).
        - 'full': 'standard' plus enumeration detection and a
          configurable number of sample values per column.
    :param sample_rows: number of sample values per column returned when
        detail_level == 'full'.
    :return: the report dict on success; on any failure, a dict with
        "status", "error_type" and "message" keys (never raises).
    """
    src_tmp_name = None
    normalized_tmp_name = None
    try:
        file_url = file_info.get("fileUrl")
        file_key = file_info.get("fileKey", "file")
        # Lazy %-args: interpolation is skipped when the level is disabled.
        logger.info("正在分析文件: %s, 模式: %s", file_key, detail_level)

        # 1. Download the payload into temporary storage.
        src_tmp_name = _download_to_temp(file_url)

        # 2. Normalize the workbook (resolve formulas, prepare for re-read).
        normalized_tmp_name = _normalize_workbook(src_tmp_name)

        # 3. Configure frictionless detection.
        # NOTE(review): the ".xlsx" suffix and ExcelControl assume the payload
        # is an Excel workbook; a raw CSV download will fail inside
        # load_workbook and be reported through the error path below —
        # confirm whether CSV inputs must actually be supported.
        control = ExcelControl(fill_merged_cells=True)
        detector = Detector(sample_size=1000, field_confidence=0.9)
        resource = Resource(path=normalized_tmp_name, control=control, detector=detector)
        resource.infer(stats=True)
        logger.info("Frictionless 推断的表头: %s", resource.schema.field_names)

        with resource:
            schema = resource.schema
            # Materialize all rows once; every field profile reuses this list.
            all_rows = resource.read_rows()
        total_rows = len(all_rows)

        fields_analysis = [
            _profile_field(field, all_rows, total_rows, detail_level, sample_rows)
            for field in schema.fields
        ]

        # 4. Assemble the final report.
        report = {
            "file_summary": {
                "key": file_key,
                "total_rows": total_rows,
                "total_fields": len(fields_analysis),
                "detail_level_applied": detail_level
            },
            "schema_analysis": fields_analysis
        }
        logger.info("分析成功: 识别到 %d 列", len(fields_analysis))
        return report
    except Exception as e:
        # Top-level boundary: logger.exception preserves the traceback,
        # which logger.error(f"...") did not.
        logger.exception("分析异常: %s", str(e))
        return {
            "status": "failed",
            "error_type": type(e).__name__,
            "message": str(e)
        }
    finally:
        # 5. Always remove temp files so repeated calls cannot fill the disk.
        _cleanup_temp_files(src_tmp_name, normalized_tmp_name)


def _download_to_temp(file_url):
    """Download *file_url* and return the path of the temp file holding it."""
    resp = requests.get(file_url, timeout=60)
    resp.raise_for_status()
    # The .xlsx suffix lets openpyxl and frictionless recognize the format.
    with tempfile.NamedTemporaryFile(suffix=".xlsx", delete=False) as f_in:
        f_in.write(resp.content)
        return f_in.name


def _normalize_workbook(src_path):
    """Re-save the workbook so formulas become cached values; return the new path."""
    with tempfile.NamedTemporaryFile(suffix=".xlsx", delete=False) as f_out:
        normalized_path = f_out.name
    # data_only=True reads the last computed cell values instead of formulas.
    wb = load_workbook(src_path, data_only=True)
    wb.save(normalized_path)
    return normalized_path


def _profile_field(field, all_rows, total_rows, detail_level, sample_rows):
    """Build the analysis dict for one schema field at the requested detail level."""
    # --- [Basic level] name and inferred type ---
    analysis = {
        "name": field.name,
        "type": field.type
    }
    # Basic mode skips all heavier per-column computation.
    if detail_level == "basic":
        return analysis

    # Collect non-null values for the statistics below.
    col_data = []
    null_count = 0
    for row in all_rows:
        # Frictionless Row objects support dict-style access by field name.
        val = row.get(field.name)
        if val is not None:
            col_data.append(val)
        else:
            null_count += 1

    # --- [Standard level] quality and numeric statistics ---
    analysis["missing_rate"] = (
        f"{(null_count / total_rows) * 100:.2f}%" if total_rows > 0 else "0%"
    )
    if field.type in ['integer', 'number'] and col_data:
        # Keep only genuinely numeric values before computing stats.
        numeric_values = [
            float(v) for v in col_data
            if isinstance(v, (int, float, decimal.Decimal))
        ]
        if numeric_values:
            analysis["numeric_stats"] = {
                "min": min(numeric_values),
                "max": max(numeric_values),
                "avg": round(mean(numeric_values), 2)
            }

    # --- [Full level] enumeration detection and samples ---
    if detail_level == "full":
        # Treat a column with few (<=15) distinct non-null values as an enum.
        unique_vals = list(set(col_data))
        if 0 < len(unique_vals) <= 15:
            analysis["is_enumeration"] = True
            analysis["enum_values"] = [str(v) for v in unique_vals]
        else:
            analysis["is_enumeration"] = False
        # Sample extraction: dates/datetimes serialized as ISO strings.
        samples = []
        for v in col_data[:sample_rows]:
            if isinstance(v, (datetime, date)):
                samples.append(v.isoformat())
            else:
                samples.append(str(v))
        analysis["samples"] = samples
    return analysis


def _cleanup_temp_files(*paths):
    """Best-effort removal of temp files; failures are logged, never raised."""
    for path in paths:
        if path and os.path.exists(path):
            try:
                os.unlink(path)
            except Exception as cleanup_err:
                logger.warning("清理临时文件失败: %s", cleanup_err)