Spaces:
Sleeping
Sleeping
| import os | |
| import decimal | |
| from datetime import datetime, date | |
| from statistics import mean | |
| import requests | |
| import tempfile | |
| from openpyxl import load_workbook | |
| from frictionless import Resource, Detector | |
| from frictionless.formats import ExcelControl | |
| import logging | |
| import gradio as gr | |
# --- 0. Logging configuration ---
logging.basicConfig(level=logging.INFO)
# Module-level logger shared by all functions in this file.
logger = logging.getLogger(__name__)
# --- 2. Core logic functions ---
def get_sheets_logic(file_url):
    """Download an Excel workbook from a URL and return its worksheet names.

    Args:
        file_url: Publicly reachable URL of an .xlsx file.

    Returns:
        list[str]: Names of all worksheets in the workbook.

    Raises:
        requests.HTTPError: If the download responds with a 4xx/5xx status.
    """
    resp = requests.get(file_url, timeout=60)
    # Fail fast on HTTP errors instead of writing an error page to disk
    # and letting openpyxl fail with a confusing message.
    resp.raise_for_status()
    with tempfile.NamedTemporaryFile(suffix=".xlsx", delete=False) as f_in:
        f_in.write(resp.content)
        tmp_path = f_in.name
    try:
        # read_only mode avoids loading cell data; only the sheet index is needed.
        wb = load_workbook(tmp_path, read_only=True)
        try:
            return wb.sheetnames
        finally:
            wb.close()
    finally:
        # Always remove the temp file, even if openpyxl raises.
        os.unlink(tmp_path)
def _profile_field(field, all_rows, total_rows, detail_level, sample_rows):
    """Build the per-column analysis dict for a single schema field.

    Args:
        field: Frictionless field (has .name and .type).
        all_rows: Fully materialized list of Frictionless Row objects.
        total_rows: len(all_rows), precomputed by the caller.
        detail_level: 'basic' | 'standard' | 'full'.
        sample_rows: Number of sample values to include in 'full' mode.

    Returns:
        dict: Analysis for this column; depth depends on detail_level.
    """
    # --- [Basic Level] name and inferred type only ---
    analysis = {"name": field.name, "type": field.type}
    if detail_level == "basic":
        # Skip the heavier per-row scan entirely.
        return analysis

    # Collect non-null values; Row objects support dict-style .get() by
    # field name, which is safer than positional indexing across versions.
    col_data = []
    null_count = 0
    for row in all_rows:
        val = row.get(field.name)
        if val is not None:
            col_data.append(val)
        else:
            null_count += 1

    # --- [Standard Level] quality and numeric statistics ---
    analysis["missing_rate"] = f"{(null_count / total_rows) * 100:.2f}%" if total_rows > 0 else "0%"
    if field.type in ('integer', 'number') and col_data:
        # Keep only values that are actually numeric before computing stats.
        numeric_values = [
            float(v) for v in col_data
            if isinstance(v, (int, float, decimal.Decimal))
        ]
        if numeric_values:
            analysis["numeric_stats"] = {
                "min": min(numeric_values),
                "max": max(numeric_values),
                "avg": round(mean(numeric_values), 2),
            }

    # --- [Full Level] enumeration detection and value samples ---
    if detail_level == "full":
        # Heuristic: few distinct values (<= 15) and non-empty => enumeration.
        unique_vals = list(set(col_data))
        if 0 < len(unique_vals) <= 15:
            analysis["is_enumeration"] = True
            analysis["enum_values"] = [str(v) for v in unique_vals]
        else:
            analysis["is_enumeration"] = False
        # Samples: serialize date-like objects so the report is JSON-safe.
        samples = []
        for v in col_data[:sample_rows]:
            samples.append(v.isoformat() if isinstance(v, (datetime, date)) else str(v))
        analysis["samples"] = samples
    return analysis


def profile_data_logic(file_url, sheet_name, detail_level, sample_rows=5):
    """Download an Excel file, profile one sheet, and return a structured report.

    The source workbook is re-saved through openpyxl with data_only=True so
    that formula cells are replaced by their cached computed values before
    Frictionless inspects the data.

    Args:
        file_url: Public URL of the Excel file.
        sheet_name: Worksheet to analyze; falsy means the default sheet.
        detail_level: 'basic' | 'standard' | 'full'.
        sample_rows: Sample values per column in 'full' mode.

    Returns:
        dict: Profile report, or {"status": ..., "message": ...} on failure.
    """
    src_tmp_name = None
    normalized_tmp_name = None
    try:
        logger.info("正在分析, 模式: %s", detail_level)
        # 1. Download the source file to a temp location.
        resp = requests.get(file_url, timeout=60)
        resp.raise_for_status()
        # The .xlsx suffix lets openpyxl / frictionless detect the format.
        with tempfile.NamedTemporaryFile(suffix=".xlsx", delete=False) as f_in:
            f_in.write(resp.content)
            src_tmp_name = f_in.name
        # 2. Reserve a second temp path for the normalized workbook.
        with tempfile.NamedTemporaryFile(suffix=".xlsx", delete=False) as f_out:
            normalized_tmp_name = f_out.name

        # 3. Normalize: data_only=True loads computed values instead of formulas.
        wb = load_workbook(src_tmp_name, data_only=True)
        try:
            if sheet_name and sheet_name not in wb.sheetnames:
                return {"status": "error", "message": f"Sheet '{sheet_name}' 不存在"}
            wb.save(normalized_tmp_name)
        finally:
            # The original leaked this handle; always release it.
            wb.close()

        # 4. Frictionless detection (merged cells are filled downward/rightward).
        control = ExcelControl(sheet=sheet_name, fill_merged_cells=True) if sheet_name else ExcelControl(fill_merged_cells=True)
        detector = Detector(sample_size=1000, field_confidence=0.9)
        resource = Resource(path=normalized_tmp_name, control=control, detector=detector)
        resource.infer(stats=True)
        with resource:
            schema = resource.schema
            # Materialize all rows once; every field scan below reuses them.
            all_rows = resource.read_rows()
        total_rows = len(all_rows)

        fields_analysis = [
            _profile_field(field, all_rows, total_rows, detail_level, sample_rows)
            for field in schema.fields
        ]

        # 5. Assemble the final report.
        report = {
            "file_summary": {
                "total_rows": total_rows,
                "total_fields": len(fields_analysis),
                "detail_level_applied": detail_level,
            },
            "schema_analysis": fields_analysis,
        }
        logger.info("分析成功: 识别到 %s 列", len(fields_analysis))
        return report
    except Exception as e:
        logger.error("分析异常: %s", e)
        return {
            "status": "failed",
            "error_type": type(e).__name__,
            "message": str(e),
        }
    finally:
        # 6. Clean up temp files to avoid filling the disk.
        for path in (src_tmp_name, normalized_tmp_name):
            if path and os.path.exists(path):
                try:
                    os.unlink(path)
                except Exception as cleanup_err:
                    logger.warning(f"清理临时文件失败: {cleanup_err}")
# --- 3. Gradio Interface (Will be auto-converted to MCP Tools) ---
def get_excel_structure(file_url: str):
    """
    Get the names of every worksheet in an Excel workbook.

    Args:
        file_url: Public URL of the Excel file.

    Returns:
        list: Worksheet names.
    """
    sheet_names = get_sheets_logic(file_url)
    return sheet_names
def profile_sheet_data(file_url: str, sheet_name: str = None, detail_level: str = "standard", sample_rows: int = 5):
    """
    Deeply profile the data of a given worksheet.

    Args:
        file_url: Public URL of the Excel file.
        sheet_name: Worksheet name to analyze (optional).
        detail_level: Analysis depth ('basic', 'standard', 'full').
        sample_rows: Number of sample values returned when detail_level is 'full'.

    Returns:
        dict: JSON-serializable data-profile report.
    """
    # gr.Number delivers floats (e.g. 5.0); the core logic slices a list
    # with this value, and float slice indices raise TypeError — coerce here.
    return profile_data_logic(file_url, sheet_name, detail_level, int(sample_rows))
# Build the UI (layout only; the callbacks are the tool functions above).
with gr.Blocks(title="Excel Analyzer Pro") as demo:
    gr.Markdown("# 📊 Excel Analyzer Pro")
    gr.Markdown("提供 Excel 文件的结构解析与深度数据画像功能。支持 MCP 协议。")

    # Tab 1: list the worksheets of a workbook.
    with gr.Tab("获取工作表结构"):
        with gr.Row():
            structure_url_box = gr.Textbox(label="Excel 文件 URL", placeholder="https://example.com/file.xlsx")
            structure_btn = gr.Button("获取工作表列表", variant="primary")
        structure_output = gr.JSON(label="工作表列表")
        structure_btn.click(get_excel_structure, inputs=[structure_url_box], outputs=[structure_output])

    # Tab 2: generate a data profile for one sheet.
    with gr.Tab("生成数据画像"):
        with gr.Row():
            profile_url_box = gr.Textbox(label="Excel 文件 URL", placeholder="https://example.com/file.xlsx")
            profile_sheet_box = gr.Textbox(label="工作表名称 (可选)")
        with gr.Row():
            profile_detail_dropdown = gr.Dropdown(["basic", "standard", "full"], label="详细程度", value="standard")
            profile_sample_number = gr.Number(value=5, label="样本行数 (仅 full 模式)")
        profile_btn = gr.Button("生成数据画像", variant="primary")
        profile_output = gr.JSON(label="数据画像报告")
        profile_btn.click(
            profile_sheet_data,
            inputs=[profile_url_box, profile_sheet_box, profile_detail_dropdown, profile_sample_number],
            outputs=[profile_output],
        )
# Hugging Face Spaces entry point.
if __name__ == "__main__":
    # 0.0.0.0:7860 is the standard Spaces binding; mcp_server=True exposes
    # the docstring-annotated functions above as MCP tools.
    demo.launch(server_name="0.0.0.0", server_port=7860, mcp_server=True)