File size: 8,943 Bytes
2ccb4a8
 
 
 
 
 
 
 
 
 
78cd7f8
2ccb4a8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
89b3827
 
 
 
2ccb4a8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
78cd7f8
 
 
 
 
 
 
 
 
 
 
 
 
2ccb4a8
78cd7f8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2ccb4a8
78cd7f8
2ccb4a8
78cd7f8
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
import os
import decimal
from datetime import datetime, date
from statistics import mean
import requests
import tempfile
from openpyxl import load_workbook
from frictionless import Resource, Detector
from frictionless.formats import ExcelControl
import logging
import gradio as gr

# --- 1. Logging configuration ---
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# --- 2. Core logic functions ---
def get_sheets_logic(file_url):
    """Download an Excel workbook from *file_url* and return its sheet names.

    Args:
        file_url: Public URL of the .xlsx file.

    Returns:
        list[str]: Worksheet names in workbook order.

    Raises:
        requests.HTTPError: If the download returns an HTTP error status.
    """
    resp = requests.get(file_url, timeout=60)
    # Fail fast on HTTP errors; otherwise an error page would be written
    # to disk and then fail (confusingly) inside openpyxl.
    resp.raise_for_status()

    tmp_path = None
    try:
        # .xlsx suffix so openpyxl recognizes the format.
        with tempfile.NamedTemporaryFile(suffix=".xlsx", delete=False) as f_in:
            f_in.write(resp.content)
            tmp_path = f_in.name

        # read_only avoids loading cell data we don't need for sheet names.
        wb = load_workbook(tmp_path, read_only=True)
        try:
            return wb.sheetnames
        finally:
            wb.close()
    finally:
        # Always remove the temp file, even if parsing raised.
        if tmp_path and os.path.exists(tmp_path):
            os.unlink(tmp_path)

def _analyze_field(field, all_rows, total_rows, detail_level, sample_rows):
    """Build the analysis dict for a single schema field.

    Args:
        field: Frictionless field (has .name and .type).
        all_rows: Materialized list of frictionless Row objects.
        total_rows: len(all_rows), passed in to avoid recomputation.
        detail_level: 'basic' | 'standard' | 'full'.
        sample_rows: Number of sample values to return in 'full' mode.

    Returns:
        dict: Per-field analysis, richer at higher detail levels.
    """
    # --- [Basic level] name and inferred type only ---
    analysis = {
        "name": field.name,
        "type": field.type
    }

    # 'basic' skips all per-value computation below.
    if detail_level == "basic":
        return analysis

    # Partition values: non-null collected for stats, nulls counted.
    col_data = []
    null_count = 0
    for row in all_rows:
        # Row objects support dict-style access by field name, which is
        # more robust than positional indexing across frictionless versions.
        val = row.get(field.name)
        if val is not None:
            col_data.append(val)
        else:
            null_count += 1

    # --- [Standard level] quality and numeric statistics ---
    analysis["missing_rate"] = f"{(null_count / total_rows) * 100:.2f}%" if total_rows > 0 else "0%"

    if field.type in ['integer', 'number'] and col_data:
        # Keep only values that are actually computable numbers.
        numeric_values = [
            float(v) for v in col_data
            if isinstance(v, (int, float, decimal.Decimal))
        ]
        if numeric_values:
            analysis["numeric_stats"] = {
                "min": min(numeric_values),
                "max": max(numeric_values),
                "avg": round(mean(numeric_values), 2)
            }

    # --- [Full level] enumeration detection and value samples ---
    if detail_level == "full":
        # Treat a column with few (<=15) distinct values as an enumeration.
        unique_vals = list(set(col_data))
        if 0 < len(unique_vals) <= 15:
            analysis["is_enumeration"] = True
            analysis["enum_values"] = [str(v) for v in unique_vals]
        else:
            analysis["is_enumeration"] = False

        # Samples: dates are not JSON-serializable, emit ISO strings.
        samples = []
        for v in col_data[:sample_rows]:
            if isinstance(v, (datetime, date)):
                samples.append(v.isoformat())
            else:
                samples.append(str(v))
        analysis["samples"] = samples

    return analysis


def profile_data_logic(file_url, sheet_name, detail_level, sample_rows=5):
    """Download an Excel/CSV file and return a structured data profile.

    Args:
        file_url: Public URL of the file.
        sheet_name: Worksheet to profile; falsy profiles the default sheet.
        detail_level: 'basic' (names/types), 'standard' (+missing rate,
            numeric stats) or 'full' (+enum detection, value samples).
        sample_rows: Sample values per column in 'full' mode.

    Returns:
        dict: Profile report on success, or an error payload with
            'status'/'message' keys on failure.
    """
    src_tmp_name = None
    normalized_tmp_name = None

    try:
        logger.info(f"正在分析, 模式: {detail_level}")

        # Gradio's Number component delivers floats; list slicing below
        # requires an int (float slice raises TypeError).
        sample_rows = int(sample_rows)

        # 1. Download the file to temporary storage.
        resp = requests.get(file_url, timeout=60)
        resp.raise_for_status()

        # .xlsx suffix so openpyxl and frictionless detect the format.
        with tempfile.NamedTemporaryFile(suffix=".xlsx", delete=False) as f_in:
            f_in.write(resp.content)
            src_tmp_name = f_in.name

        # 2. Reserve a path for the normalized copy.
        with tempfile.NamedTemporaryFile(suffix=".xlsx", delete=False) as f_out:
            normalized_tmp_name = f_out.name

        # 3. Re-save with computed values (data_only=True) so frictionless
        #    sees formula results instead of formula strings.
        wb = load_workbook(src_tmp_name, data_only=True)
        try:
            if sheet_name and sheet_name not in wb.sheetnames:
                return {"status": "error", "message": f"Sheet '{sheet_name}' 不存在"}
            wb.save(normalized_tmp_name)
        finally:
            # Close even on the early error return above.
            wb.close()

        # 4. Frictionless detection; fill merged cells so no value is lost.
        control = ExcelControl(sheet=sheet_name, fill_merged_cells=True) if sheet_name else ExcelControl(fill_merged_cells=True)

        detector = Detector(sample_size=1000, field_confidence=0.9)
        resource = Resource(path=normalized_tmp_name, control=control, detector=detector)
        resource.infer(stats=True)

        with resource:
            schema = resource.schema
            all_rows = resource.read_rows()
            total_rows = len(all_rows)

            fields_analysis = [
                _analyze_field(field, all_rows, total_rows, detail_level, sample_rows)
                for field in schema.fields
            ]

            # 5. Assemble the final report.
            report = {
                "file_summary": {
                    "total_rows": total_rows,
                    "total_fields": len(fields_analysis),
                    "detail_level_applied": detail_level
                },
                "schema_analysis": fields_analysis
            }

            logger.info(f"分析成功: 识别到 {len(fields_analysis)} 列")
            return report

    except Exception as e:
        logger.error(f"分析异常: {str(e)}")
        return {
            "status": "failed",
            "error_type": type(e).__name__,
            "message": str(e)
        }

    finally:
        # 6. Clean up temp files so the disk doesn't fill up.
        for path in (src_tmp_name, normalized_tmp_name):
            if path and os.path.exists(path):
                try:
                    os.unlink(path)
                except Exception as cleanup_err:
                    logger.warning(f"清理临时文件失败: {cleanup_err}")

# --- 3. Gradio interface (automatically exposed as MCP tools) ---

def get_excel_structure(file_url: str):
    """Return the names of all worksheets in an Excel workbook.

    Args:
        file_url: Public URL of the Excel file.

    Returns:
        list: Worksheet names, in workbook order.
    """
    sheet_names = get_sheets_logic(file_url)
    return sheet_names

def profile_sheet_data(file_url: str, sheet_name: str = None, detail_level: str = "standard", sample_rows: int = 5):
    """Produce a deep data profile of one worksheet.

    Args:
        file_url: Public URL of the Excel file.
        sheet_name: Worksheet to analyze (optional; default sheet if omitted).
        detail_level: One of 'basic', 'standard', 'full'.
        sample_rows: Sample values returned per column when detail_level
            is 'full'.

    Returns:
        dict: JSON-serializable profile report.
    """
    report = profile_data_logic(file_url, sheet_name, detail_level, sample_rows)
    return report

# Build the UI
with gr.Blocks(title="Excel Analyzer Pro") as demo:
    gr.Markdown("# 📊 Excel Analyzer Pro")
    gr.Markdown("提供 Excel 文件的结构解析与深度数据画像功能。支持 MCP 协议。")

    # Tab 1: list the worksheets of a workbook.
    with gr.Tab("获取工作表结构"):
        with gr.Row():
            structure_url_box = gr.Textbox(label="Excel 文件 URL", placeholder="https://example.com/file.xlsx")
            structure_btn = gr.Button("获取工作表列表", variant="primary")
        structure_output = gr.JSON(label="工作表列表")

        structure_btn.click(get_excel_structure, inputs=[structure_url_box], outputs=[structure_output])

    # Tab 2: deep profile of a single worksheet.
    with gr.Tab("生成数据画像"):
        with gr.Row():
            profile_url_box = gr.Textbox(label="Excel 文件 URL", placeholder="https://example.com/file.xlsx")
            profile_sheet_box = gr.Textbox(label="工作表名称 (可选)")
        with gr.Row():
            profile_detail_dd = gr.Dropdown(["basic", "standard", "full"], label="详细程度", value="standard")
            profile_sample_num = gr.Number(value=5, label="样本行数 (仅 full 模式)")

        profile_btn = gr.Button("生成数据画像", variant="primary")
        profile_output = gr.JSON(label="数据画像报告")

        profile_btn.click(
            profile_sheet_data,
            inputs=[profile_url_box, profile_sheet_box, profile_detail_dd, profile_sample_num],
            outputs=[profile_output],
        )

# Hugging Face Spaces entry point. The module-level name `demo` is kept
# unchanged — Spaces auto-detects it.
if __name__ == "__main__":
    demo.launch(server_name="0.0.0.0", server_port=7860, mcp_server=True)