File size: 6,327 Bytes
89b3827
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
import json
import os
import requests
import tempfile
import decimal
from datetime import datetime, date
from statistics import mean
from openpyxl import load_workbook
from frictionless import Resource, Detector
from frictionless.formats import ExcelControl
import logging

logger = logging.getLogger(__name__)

def main(file_info, detail_level="standard", sample_rows=5):
    """
    分析 Excel/CSV 文件并返回结构化画像。
    
    :param file_info: 包含 fileUrl 和 fileKey 的字典
    :param detail_level: 控制返回信息的详细程度
        - 'basic': 仅包含列名和数据类型。
        - 'standard': 包含 basic 内容 + 空值率 + 数值统计(最大/最小/平均)。
        - 'full': 包含 standard 内容 + 枚举值检测 + 自定义数量的数据样本。
    :param sample_rows: 当 detail_level 为 'full' 时,返回的样例行数。
    """
    src_tmp_name = None
    normalized_tmp_name = None

    try:
        file_url = file_info.get("fileUrl")
        file_key = file_info.get("fileKey", "file")

        logger.info(f"正在分析文件: {file_key}, 模式: {detail_level}")

        # 1. 下载文件到临时空间
        resp = requests.get(file_url, timeout=60)
        resp.raise_for_status()
        
        # 使用 .xlsx 后缀确保 openpyxl 和 frictionless 能正确识别格式
        with tempfile.NamedTemporaryFile(suffix=".xlsx", delete=False) as f_in:
            f_in.write(resp.content)
            src_tmp_name = f_in.name

        # 2. 规范化处理 (处理 Excel 公式、合并单元格)
        with tempfile.NamedTemporaryFile(suffix=".xlsx", delete=False) as f_out:
            normalized_tmp_name = f_out.name

        # data_only=True 确保读取的是计算后的值而非公式
        wb = load_workbook(src_tmp_name, data_only=True)
        wb.save(normalized_tmp_name)

        # 3. Frictionless 探测配置
        control = ExcelControl(fill_merged_cells=True)
        # 增加 header_rows 参数尝试自动探测表头
        detector = Detector(sample_size=1000, field_confidence=0.9)
        resource = Resource(path=normalized_tmp_name, control=control, detector=detector)
        resource.infer(stats=True)

        logger.info(f"Frictionless 推断的表头: {resource.schema.field_names}")
        # logger.info(f"Frictionless 样本行: {resource.read_rows(limit=2)}") # read_rows argument issue

        with resource:
            schema = resource.schema
            all_rows = resource.read_rows()
            total_rows = len(all_rows)
            
            fields_analysis = []
            for index, field in enumerate(schema.fields):
                # --- [Basic Level] 基础信息 ---
                analysis = {
                    "name": field.name,
                    "type": field.type
                }

                # 如果级别是 basic,直接跳过后续重型计算
                if detail_level == "basic":
                    fields_analysis.append(analysis)
                    continue

                # 提取非空数据用于后续分析
                col_data = []
                null_count = 0
                for row in all_rows:
                    # Frictionless Row object behaves like a dictionary for field access
                    val = row.get(field.name)
                    if val is not None:
                        col_data.append(val)
                    else:
                        null_count += 1

                # --- [Standard Level] 质量与数值统计 ---
                analysis["missing_rate"] = f"{(null_count / total_rows) * 100:.2f}%" if total_rows > 0 else "0%"
                
                if field.type in ['integer', 'number'] and col_data:
                    # 确保是可计算的数值
                    numeric_values = [
                        float(v) for v in col_data 
                        if isinstance(v, (int, float, decimal.Decimal))
                    ]
                    if numeric_values:
                        analysis["numeric_stats"] = {
                            "min": min(numeric_values),
                            "max": max(numeric_values),
                            "avg": round(mean(numeric_values), 2)
                        }

                # --- [Full Level] 枚举与样本 ---
                if detail_level == "full":
                    # 枚举检测:如果唯一值较少(<15)且有数据,则视为枚举
                    unique_vals = list(set(col_data))
                    if 0 < len(unique_vals) <= 15:
                        analysis["is_enumeration"] = True
                        analysis["enum_values"] = [str(v) for v in unique_vals]
                    else:
                        analysis["is_enumeration"] = False
                    
                    # 样本提取:转换日期等特殊对象为字符串
                    samples = []
                    for v in col_data[:sample_rows]:
                        if isinstance(v, (datetime, date)):
                            samples.append(v.isoformat())
                        else:
                            samples.append(str(v))
                    analysis["samples"] = samples

                fields_analysis.append(analysis)

            # 4. 组装最终报告
            report = {
                "file_summary": {
                    "key": file_key,
                    "total_rows": total_rows,
                    "total_fields": len(fields_analysis),
                    "detail_level_applied": detail_level
                },
                "schema_analysis": fields_analysis
            }

            logger.info(f"分析成功: 识别到 {len(fields_analysis)} 列")
            return report

    except Exception as e:
        logger.error(f"分析异常: {str(e)}")
        return {
            "status": "failed",
            "error_type": type(e).__name__,
            "message": str(e)
        }

    finally:
        # 5. 清理临时文件,防止磁盘溢出
        for path in (src_tmp_name, normalized_tmp_name):
            if path and os.path.exists(path):
                try:
                    os.unlink(path)
                except Exception as cleanup_err:
                    logger.warning(f"清理临时文件失败: {cleanup_err}")