# Fix data access logic in profile functions (commit 89b3827)
import os
import decimal
from datetime import datetime, date
from statistics import mean
import requests
import tempfile
from openpyxl import load_workbook
from frictionless import Resource, Detector
from frictionless.formats import ExcelControl
import logging
import gradio as gr
# --- 0. Logging configuration ---
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# --- 2. Core logic functions ---
def get_sheets_logic(file_url):
    """Download an Excel workbook from *file_url* and return its sheet names.

    Args:
        file_url: Public URL of an .xlsx file.

    Returns:
        list: Worksheet names in workbook order.

    Raises:
        requests.HTTPError: If the download responds with a 4xx/5xx status.
    """
    resp = requests.get(file_url, timeout=60)
    # Fail fast on HTTP errors instead of handing an error page to openpyxl.
    resp.raise_for_status()
    with tempfile.NamedTemporaryFile(suffix=".xlsx", delete=False) as f_in:
        f_in.write(resp.content)
        tmp_path = f_in.name
    try:
        wb = load_workbook(tmp_path, read_only=True)
        try:
            return wb.sheetnames
        finally:
            wb.close()
    finally:
        # Always remove the temp file, even when load_workbook raises.
        os.unlink(tmp_path)
def profile_data_logic(file_url, sheet_name, detail_level, sample_rows=5):
    """Download an Excel file and build a structured data profile of one sheet.

    Args:
        file_url: Public URL of the .xlsx file to analyse.
        sheet_name: Worksheet to profile; a falsy value means the default sheet.
        detail_level: Profiling depth — 'basic', 'standard', or 'full'.
        sample_rows: Number of sample values per column returned in 'full'
            mode. Coerced to int because UI widgets may deliver a float.

    Returns:
        dict: The profile report, or an error payload on failure.
    """
    src_tmp_name = None
    normalized_tmp_name = None
    try:
        # gr.Number delivers floats; a float slice bound raises TypeError.
        sample_rows = int(sample_rows)
        logger.info(f"正在分析, 模式: {detail_level}")
        # 1. Download the file to a temporary location.
        resp = requests.get(file_url, timeout=60)
        resp.raise_for_status()
        # An .xlsx suffix lets openpyxl and frictionless detect the format.
        with tempfile.NamedTemporaryFile(suffix=".xlsx", delete=False) as f_in:
            f_in.write(resp.content)
            src_tmp_name = f_in.name
        # 2. Normalisation target (computed formula values written out).
        with tempfile.NamedTemporaryFile(suffix=".xlsx", delete=False) as f_out:
            normalized_tmp_name = f_out.name
        # Load computed cell values rather than formulas (data_only=True).
        wb = load_workbook(src_tmp_name, data_only=True)
        try:
            if sheet_name and sheet_name not in wb.sheetnames:
                return {"status": "error", "message": f"Sheet '{sheet_name}' 不存在"}
            wb.save(normalized_tmp_name)
        finally:
            # Close the workbook even on the early-return / error paths.
            wb.close()
        # 3. Frictionless detection configuration.
        control = (
            ExcelControl(sheet=sheet_name, fill_merged_cells=True)
            if sheet_name
            else ExcelControl(fill_merged_cells=True)
        )
        detector = Detector(sample_size=1000, field_confidence=0.9)
        resource = Resource(path=normalized_tmp_name, control=control, detector=detector)
        resource.infer(stats=True)
        with resource:
            schema = resource.schema
            all_rows = resource.read_rows()
        total_rows = len(all_rows)
        fields_analysis = []
        for field in schema.fields:
            # --- [basic level] name and inferred type only ---
            analysis = {
                "name": field.name,
                "type": field.type
            }
            if detail_level == "basic":
                # Skip the heavier per-column computations entirely.
                fields_analysis.append(analysis)
                continue
            # Collect non-null values; Row objects are dict-like, and keying
            # by field name is robust against column reordering.
            col_data = []
            null_count = 0
            for row in all_rows:
                val = row.get(field.name)
                if val is None:
                    null_count += 1
                else:
                    col_data.append(val)
            # --- [standard level] quality metrics and numeric statistics ---
            analysis["missing_rate"] = f"{(null_count / total_rows) * 100:.2f}%" if total_rows > 0 else "0%"
            if field.type in ('integer', 'number') and col_data:
                # Guard against non-numeric leftovers (e.g. stray strings).
                numeric_values = [
                    float(v) for v in col_data
                    if isinstance(v, (int, float, decimal.Decimal))
                ]
                if numeric_values:
                    analysis["numeric_stats"] = {
                        "min": min(numeric_values),
                        "max": max(numeric_values),
                        "avg": round(mean(numeric_values), 2)
                    }
            # --- [full level] enumeration detection and value samples ---
            if detail_level == "full":
                # Treat a column with few (<=15) distinct values as an enum.
                unique_vals = list(set(col_data))
                if 0 < len(unique_vals) <= 15:
                    analysis["is_enumeration"] = True
                    analysis["enum_values"] = [str(v) for v in unique_vals]
                else:
                    analysis["is_enumeration"] = False
                # Serialise samples; dates become ISO strings for JSON safety.
                analysis["samples"] = [
                    v.isoformat() if isinstance(v, (datetime, date)) else str(v)
                    for v in col_data[:sample_rows]
                ]
            fields_analysis.append(analysis)
        # 4. Assemble the final report.
        report = {
            "file_summary": {
                "total_rows": total_rows,
                "total_fields": len(fields_analysis),
                "detail_level_applied": detail_level
            },
            "schema_analysis": fields_analysis
        }
        logger.info(f"分析成功: 识别到 {len(fields_analysis)} 列")
        return report
    except Exception as e:
        logger.error(f"分析异常: {str(e)}")
        return {
            "status": "failed",
            "error_type": type(e).__name__,
            "message": str(e)
        }
    finally:
        # 5. Remove temp files so repeated calls do not fill the disk.
        for path in (src_tmp_name, normalized_tmp_name):
            if path and os.path.exists(path):
                try:
                    os.unlink(path)
                except Exception as cleanup_err:
                    logger.warning(f"清理临时文件失败: {cleanup_err}")
# --- 3. Gradio Interface (Will be auto-converted to MCP Tools) ---
def get_excel_structure(file_url: str):
    """
    Get the names of all worksheets in an Excel file.
    Args:
        file_url: Public URL of the Excel file
    Returns:
        list: List of worksheet names
    """
    # Thin wrapper so Gradio can expose the core logic as an MCP tool.
    return get_sheets_logic(file_url)
def profile_sheet_data(file_url: str, sheet_name: str = None, detail_level: str = "standard", sample_rows: int = 5):
    """
    Build an in-depth data profile of the specified worksheet.
    Args:
        file_url: Public URL of the Excel file
        sheet_name: Name of the worksheet to analyse (optional)
        detail_level: Profiling depth ('basic', 'standard', 'full')
        sample_rows: Number of sample rows returned when detail_level is 'full'
    Returns:
        dict: JSON object containing the data profile
    """
    # gr.Number hands this in as a float; slicing with a float would raise
    # TypeError inside the profiler, so normalise to int here.
    return profile_data_logic(file_url, sheet_name, detail_level, int(sample_rows))
# --- Build the Gradio UI (auto-exposed as MCP tools via mcp_server=True) ---
with gr.Blocks(title="Excel Analyzer Pro") as demo:
    gr.Markdown("# 📊 Excel Analyzer Pro")
    gr.Markdown("提供 Excel 文件的结构解析与深度数据画像功能。支持 MCP 协议。")
    with gr.Tab("获取工作表结构"):
        with gr.Row():
            # NOTE(review): original indentation was lost; the button is
            # assumed to sit beside the URL box — confirm intended layout.
            url_input_structure = gr.Textbox(label="Excel 文件 URL", placeholder="https://example.com/file.xlsx")
            btn_structure = gr.Button("获取工作表列表", variant="primary")
        output_structure = gr.JSON(label="工作表列表")
        # Wire the button to the sheet-listing tool.
        btn_structure.click(get_excel_structure, inputs=[url_input_structure], outputs=[output_structure])
    with gr.Tab("生成数据画像"):
        with gr.Row():
            url_input_profile = gr.Textbox(label="Excel 文件 URL", placeholder="https://example.com/file.xlsx")
            sheet_input_profile = gr.Textbox(label="工作表名称 (可选)")
        with gr.Row():
            detail_input_profile = gr.Dropdown(["basic", "standard", "full"], label="详细程度", value="standard")
            sample_input_profile = gr.Number(value=5, label="样本行数 (仅 full 模式)")
        btn_profile = gr.Button("生成数据画像", variant="primary")
        output_profile = gr.JSON(label="数据画像报告")
        # Wire the button to the profiling tool.
        btn_profile.click(
            profile_sheet_data,
            inputs=[url_input_profile, sheet_input_profile, detail_input_profile, sample_input_profile],
            outputs=[output_profile]
        )
# Hugging Face Spaces entry point: serve the UI and the MCP endpoint.
if __name__ == "__main__":
    demo.launch(server_name="0.0.0.0", server_port=7860, mcp_server=True)