import os
import decimal
import logging
import tempfile
from datetime import datetime, date
from statistics import mean

import requests
import gradio as gr
from openpyxl import load_workbook
from frictionless import Resource, Detector
from frictionless.formats import ExcelControl

# --- 0. Logging setup ---
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


# --- 2. Core logic ---
def _download_to_tempfile(file_url):
    """Download *file_url* into a temp file and return its path.

    The .xlsx suffix ensures openpyxl and frictionless detect the format.
    Raises requests.HTTPError on a non-2xx response (an HTML error page
    must not be handed to the Excel parsers).
    """
    resp = requests.get(file_url, timeout=60)
    resp.raise_for_status()
    with tempfile.NamedTemporaryFile(suffix=".xlsx", delete=False) as f_in:
        f_in.write(resp.content)
        return f_in.name


def get_sheets_logic(file_url):
    """Return the list of worksheet names of the Excel file at *file_url*."""
    tmp_path = _download_to_tempfile(file_url)
    try:
        # read_only: we only need sheet names, not cell data.
        wb = load_workbook(tmp_path, read_only=True)
        try:
            return wb.sheetnames
        finally:
            wb.close()
    finally:
        # Always remove the temp file, even when loading fails.
        os.unlink(tmp_path)


def profile_data_logic(file_url, sheet_name, detail_level, sample_rows=5):
    """Analyze an Excel file and return a structured data profile.

    Args:
        file_url: public URL of the Excel file.
        sheet_name: worksheet to analyze; falsy means the default sheet.
        detail_level: 'basic' (schema only), 'standard' (adds missing-rate
            and numeric stats) or 'full' (adds enumeration detection and
            sample values).
        sample_rows: number of sample values per column in 'full' mode.

    Returns:
        dict: the profile report, or an error payload with 'status'
        and 'message' keys when analysis fails.
    """
    src_tmp_name = None
    normalized_tmp_name = None
    try:
        logger.info("正在分析, 模式: %s", detail_level)
        # gr.Number delivers floats; list slicing below needs an int.
        sample_rows = int(sample_rows)

        # 1. Download the file to a temp path (.xlsx suffix so openpyxl
        # and frictionless recognize the format).
        resp = requests.get(file_url, timeout=60)
        resp.raise_for_status()
        with tempfile.NamedTemporaryFile(suffix=".xlsx", delete=False) as f_in:
            f_in.write(resp.content)
            src_tmp_name = f_in.name

        # 2. Normalize the workbook: data_only=True loads cached formula
        # results instead of formula strings, so downstream type detection
        # sees values, not "=SUM(...)" text.
        with tempfile.NamedTemporaryFile(suffix=".xlsx", delete=False) as f_out:
            normalized_tmp_name = f_out.name
        wb = load_workbook(src_tmp_name, data_only=True)
        try:
            if sheet_name and sheet_name not in wb.sheetnames:
                return {"status": "error", "message": f"Sheet '{sheet_name}' 不存在"}
            wb.save(normalized_tmp_name)
        finally:
            wb.close()

        # 3. Frictionless detection; fill_merged_cells propagates merged
        # cell values so no row ends up with spurious nulls.
        control = (
            ExcelControl(sheet=sheet_name, fill_merged_cells=True)
            if sheet_name
            else ExcelControl(fill_merged_cells=True)
        )
        detector = Detector(sample_size=1000, field_confidence=0.9)
        resource = Resource(path=normalized_tmp_name, control=control, detector=detector)
        resource.infer(stats=True)

        with resource:
            schema = resource.schema
            all_rows = resource.read_rows()
            total_rows = len(all_rows)

            fields_analysis = []
            for field in schema.fields:
                # --- [basic] schema information ---
                analysis = {"name": field.name, "type": field.type}
                # Basic level skips all heavier per-column computation.
                if detail_level == "basic":
                    fields_analysis.append(analysis)
                    continue

                # Collect non-null values; frictionless Row supports
                # dict-style access by field name, which is safer than
                # positional indexing across versions.
                col_data = []
                null_count = 0
                for row in all_rows:
                    val = row.get(field.name)
                    if val is None:
                        null_count += 1
                    else:
                        col_data.append(val)

                # --- [standard] quality + numeric statistics ---
                analysis["missing_rate"] = (
                    f"{(null_count / total_rows) * 100:.2f}%" if total_rows > 0 else "0%"
                )
                if field.type in ("integer", "number") and col_data:
                    # Keep only genuinely numeric values before computing.
                    numeric_values = [
                        float(v)
                        for v in col_data
                        if isinstance(v, (int, float, decimal.Decimal))
                    ]
                    if numeric_values:
                        analysis["numeric_stats"] = {
                            "min": min(numeric_values),
                            "max": max(numeric_values),
                            "avg": round(mean(numeric_values), 2),
                        }

                # --- [full] enumeration detection + samples ---
                if detail_level == "full":
                    # Treat a column with few (<=15) distinct values as
                    # an enumeration.
                    unique_vals = list(set(col_data))
                    if 0 < len(unique_vals) <= 15:
                        analysis["is_enumeration"] = True
                        analysis["enum_values"] = [str(v) for v in unique_vals]
                    else:
                        analysis["is_enumeration"] = False

                    # Samples: serialize dates/datetimes via ISO 8601.
                    analysis["samples"] = [
                        v.isoformat() if isinstance(v, (datetime, date)) else str(v)
                        for v in col_data[:sample_rows]
                    ]

                fields_analysis.append(analysis)

        # 4. Assemble the final report.
        report = {
            "file_summary": {
                "total_rows": total_rows,
                "total_fields": len(fields_analysis),
                "detail_level_applied": detail_level,
            },
            "schema_analysis": fields_analysis,
        }
        logger.info("分析成功: 识别到 %d 列", len(fields_analysis))
        return report

    except Exception as e:
        # Boundary handler: the Gradio/MCP caller gets a structured error
        # payload; logger.exception preserves the traceback for operators.
        logger.exception("分析异常: %s", e)
        return {"status": "failed", "error_type": type(e).__name__, "message": str(e)}
    finally:
        # 5. Remove temp files to keep the disk from filling up.
        for path in (src_tmp_name, normalized_tmp_name):
            if path and os.path.exists(path):
                try:
                    os.unlink(path)
                except OSError as cleanup_err:
                    logger.warning("清理临时文件失败: %s", cleanup_err)


# --- 3. Gradio Interface (will be auto-converted to MCP tools) ---
def get_excel_structure(file_url: str):
    """Get the names of all worksheets in an Excel file.

    Args:
        file_url: public URL of the Excel file.

    Returns:
        list: worksheet names.
    """
    return get_sheets_logic(file_url)


def profile_sheet_data(file_url: str, sheet_name: str = None,
                       detail_level: str = "standard", sample_rows: int = 5):
    """Produce a deep data profile of the given worksheet.

    Args:
        file_url: public URL of the Excel file.
        sheet_name: worksheet to analyze (optional).
        detail_level: profiling depth ('basic', 'standard', 'full').
        sample_rows: number of sample rows returned in 'full' mode.

    Returns:
        dict: JSON-serializable data-profile report.
    """
    return profile_data_logic(file_url, sheet_name, detail_level, sample_rows)


# Build the UI.
with gr.Blocks(title="Excel Analyzer Pro") as demo:
    gr.Markdown("# 📊 Excel Analyzer Pro")
    gr.Markdown("提供 Excel 文件的结构解析与深度数据画像功能。支持 MCP 协议。")

    with gr.Tab("获取工作表结构"):
        with gr.Row():
            url_input_structure = gr.Textbox(label="Excel 文件 URL",
                                             placeholder="https://example.com/file.xlsx")
        btn_structure = gr.Button("获取工作表列表", variant="primary")
        output_structure = gr.JSON(label="工作表列表")
        btn_structure.click(get_excel_structure,
                            inputs=[url_input_structure],
                            outputs=[output_structure])

    with gr.Tab("生成数据画像"):
        with gr.Row():
            url_input_profile = gr.Textbox(label="Excel 文件 URL",
                                           placeholder="https://example.com/file.xlsx")
            sheet_input_profile = gr.Textbox(label="工作表名称 (可选)")
        with gr.Row():
            detail_input_profile = gr.Dropdown(["basic", "standard", "full"],
                                               label="详细程度", value="standard")
            sample_input_profile = gr.Number(value=5, label="样本行数 (仅 full 模式)")
        btn_profile = gr.Button("生成数据画像", variant="primary")
        output_profile = gr.JSON(label="数据画像报告")
        btn_profile.click(
            profile_sheet_data,
            inputs=[url_input_profile, sheet_input_profile,
                    detail_input_profile, sample_input_profile],
            outputs=[output_profile],
        )

# Hugging Face Spaces entry point.
if __name__ == "__main__":
    demo.launch(server_name="0.0.0.0", server_port=7860, mcp_server=True)