import os
import decimal
from datetime import datetime, date
from statistics import mean
import requests
import tempfile
from openpyxl import load_workbook
from frictionless import Resource, Detector
from frictionless.formats import ExcelControl
import logging
import gradio as gr
# --- 0. Logging setup ---
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# --- 2. Core logic functions ---
def get_sheets_logic(file_url):
    """Download an Excel workbook and return its worksheet names.

    Args:
        file_url: Public URL of the .xlsx file.

    Returns:
        list: Worksheet names in workbook order.

    Raises:
        requests.HTTPError: if the download fails (4xx/5xx).
    """
    resp = requests.get(file_url, timeout=60)
    # FIX: fail fast on HTTP errors instead of writing an error page into the .xlsx
    resp.raise_for_status()
    with tempfile.NamedTemporaryFile(suffix=".xlsx", delete=False) as f_in:
        f_in.write(resp.content)
        tmp_path = f_in.name
    try:
        # read_only mode streams the workbook instead of loading it fully
        wb = load_workbook(tmp_path, read_only=True)
        try:
            return list(wb.sheetnames)
        finally:
            wb.close()
    finally:
        # FIX: temp file was leaked when load_workbook raised
        os.unlink(tmp_path)
def profile_data_logic(file_url, sheet_name, detail_level, sample_rows=5):
    """
    Analyze an Excel file and return a structured data profile.

    Args:
        file_url: Public URL of the .xlsx file to analyze.
        sheet_name: Worksheet to profile; falsy means the default sheet.
        detail_level: 'basic' (names/types only), 'standard' (adds missing
            rate and numeric stats) or 'full' (adds enumeration detection
            and value samples).
        sample_rows: Number of sample values per column in 'full' mode.

    Returns:
        dict: Profiling report, or an error dict with "status"/"message"
        keys when the sheet is missing or any step fails.
    """
    src_tmp_name = None
    normalized_tmp_name = None
    try:
        logger.info(f"正在分析, 模式: {detail_level}")
        # 1. Download the file into temporary storage.
        resp = requests.get(file_url, timeout=60)
        resp.raise_for_status()
        # The .xlsx suffix lets openpyxl and frictionless detect the format.
        with tempfile.NamedTemporaryFile(suffix=".xlsx", delete=False) as f_in:
            f_in.write(resp.content)
            src_tmp_name = f_in.name
        # 2. Normalize the workbook: re-save it with computed formula values
        #    so downstream profiling sees data, not formula strings.
        with tempfile.NamedTemporaryFile(suffix=".xlsx", delete=False) as f_out:
            normalized_tmp_name = f_out.name
        # data_only=True loads cached formula results instead of formulas.
        wb = load_workbook(src_tmp_name, data_only=True)
        try:
            if sheet_name and sheet_name not in wb.sheetnames:
                return {"status": "error", "message": f"Sheet '{sheet_name}' 不存在"}
            wb.save(normalized_tmp_name)
        finally:
            # FIX: workbook handle was previously never closed.
            wb.close()
        # 3. Frictionless detection: fill merged cells and infer field types
        #    from a sample of up to 1000 rows.
        control = ExcelControl(sheet=sheet_name, fill_merged_cells=True) if sheet_name else ExcelControl(fill_merged_cells=True)
        detector = Detector(sample_size=1000, field_confidence=0.9)
        resource = Resource(path=normalized_tmp_name, control=control, detector=detector)
        resource.infer(stats=True)
        with resource:
            schema = resource.schema
            all_rows = resource.read_rows()
        total_rows = len(all_rows)
        fields_analysis = []
        for field in schema.fields:
            # --- [Basic level] name and inferred type ---
            analysis = {
                "name": field.name,
                "type": field.type
            }
            # 'basic' skips all the per-column scanning below.
            if detail_level == "basic":
                fields_analysis.append(analysis)
                continue
            # Split the column into non-null values plus a null counter.
            # Row objects support dict-style access by field name, which is
            # safer than positional indexing across frictionless versions.
            col_data = []
            null_count = 0
            for row in all_rows:
                val = row.get(field.name)
                if val is None:
                    null_count += 1
                else:
                    col_data.append(val)
            # --- [Standard level] data quality and numeric statistics ---
            analysis["missing_rate"] = f"{(null_count / total_rows) * 100:.2f}%" if total_rows > 0 else "0%"
            if field.type in ['integer', 'number'] and col_data:
                # Aggregate only genuinely numeric values; strings that
                # slipped past type inference are ignored.
                numeric_values = [
                    float(v) for v in col_data
                    if isinstance(v, (int, float, decimal.Decimal))
                ]
                if numeric_values:
                    analysis["numeric_stats"] = {
                        "min": min(numeric_values),
                        "max": max(numeric_values),
                        "avg": round(mean(numeric_values), 2)
                    }
            # --- [Full level] enumeration detection and samples ---
            if detail_level == "full":
                # A column with few distinct values (<=15) is treated as an enum.
                unique_vals = set(col_data)
                if 0 < len(unique_vals) <= 15:
                    analysis["is_enumeration"] = True
                    # FIX: sort for deterministic output (set order is arbitrary).
                    analysis["enum_values"] = sorted(str(v) for v in unique_vals)
                else:
                    analysis["is_enumeration"] = False
                # Sample values; dates become ISO strings so the report stays
                # JSON-serializable.
                samples = [
                    v.isoformat() if isinstance(v, (datetime, date)) else str(v)
                    for v in col_data[:sample_rows]
                ]
                analysis["samples"] = samples
            fields_analysis.append(analysis)
        # 4. Assemble the final report.
        report = {
            "file_summary": {
                "total_rows": total_rows,
                "total_fields": len(fields_analysis),
                "detail_level_applied": detail_level
            },
            "schema_analysis": fields_analysis
        }
        logger.info(f"分析成功: 识别到 {len(fields_analysis)} 列")
        return report
    except Exception as e:
        logger.error(f"分析异常: {str(e)}")
        return {
            "status": "failed",
            "error_type": type(e).__name__,
            "message": str(e)
        }
    finally:
        # 5. Always remove the temp files so repeated calls can't fill the disk.
        for path in (src_tmp_name, normalized_tmp_name):
            if path and os.path.exists(path):
                try:
                    os.unlink(path)
                except Exception as cleanup_err:
                    logger.warning(f"清理临时文件失败: {cleanup_err}")
# --- 3. Gradio Interface (Will be auto-converted to MCP Tools) ---
def get_excel_structure(file_url: str):
    """
    Retrieve the names of every worksheet in an Excel file.

    Args:
        file_url: Publicly reachable URL of the Excel file
    Returns:
        list: Worksheet name list
    """
    sheet_names = get_sheets_logic(file_url)
    return sheet_names
def profile_sheet_data(file_url: str, sheet_name: str = None, detail_level: str = "standard", sample_rows: int = 5):
    """
    Produce an in-depth data profile of the given worksheet.

    Args:
        file_url: Publicly reachable URL of the Excel file
        sheet_name: Worksheet to analyze (optional)
        detail_level: Profiling depth ('basic', 'standard', 'full')
        sample_rows: Number of sample values returned when detail_level is 'full'
    Returns:
        dict: JSON object containing the data profile
    """
    report = profile_data_logic(file_url, sheet_name, detail_level, sample_rows)
    return report
# Build the Gradio UI (each handler is also exposed as an MCP tool)
with gr.Blocks(title="Excel Analyzer Pro") as demo:
    gr.Markdown("# 📊 Excel Analyzer Pro")
    gr.Markdown("提供 Excel 文件的结构解析与深度数据画像功能。支持 MCP 协议。")
    with gr.Tab("获取工作表结构"):
        with gr.Row():
            url_input_structure = gr.Textbox(label="Excel 文件 URL", placeholder="https://example.com/file.xlsx")
        btn_structure = gr.Button("获取工作表列表", variant="primary")
        output_structure = gr.JSON(label="工作表列表")
        btn_structure.click(get_excel_structure, inputs=[url_input_structure], outputs=[output_structure])
    with gr.Tab("生成数据画像"):
        with gr.Row():
            url_input_profile = gr.Textbox(label="Excel 文件 URL", placeholder="https://example.com/file.xlsx")
            sheet_input_profile = gr.Textbox(label="工作表名称 (可选)")
        with gr.Row():
            detail_input_profile = gr.Dropdown(["basic", "standard", "full"], label="详细程度", value="standard")
            sample_input_profile = gr.Number(value=5, label="样本行数 (仅 full 模式)")
        btn_profile = gr.Button("生成数据画像", variant="primary")
        output_profile = gr.JSON(label="数据画像报告")
        btn_profile.click(
            profile_sheet_data,
            inputs=[url_input_profile, sheet_input_profile, detail_input_profile, sample_input_profile],
            outputs=[output_profile]
        )
# Hugging Face Spaces entry point; mcp_server=True exposes the handlers as MCP tools
if __name__ == "__main__":
    demo.launch(server_name="0.0.0.0", server_port=7860, mcp_server=True)