Spaces:
Sleeping
Sleeping
| import json | |
| import os | |
| import requests | |
| import tempfile | |
| import decimal | |
| from datetime import datetime, date | |
| from statistics import mean | |
| from openpyxl import load_workbook | |
| from frictionless import Resource, Detector | |
| from frictionless.formats import ExcelControl | |
| import logging | |
| logger = logging.getLogger(__name__) | |
| def main(file_info, detail_level="standard", sample_rows=5): | |
| """ | |
| 分析 Excel/CSV 文件并返回结构化画像。 | |
| :param file_info: 包含 fileUrl 和 fileKey 的字典 | |
| :param detail_level: 控制返回信息的详细程度 | |
| - 'basic': 仅包含列名和数据类型。 | |
| - 'standard': 包含 basic 内容 + 空值率 + 数值统计(最大/最小/平均)。 | |
| - 'full': 包含 standard 内容 + 枚举值检测 + 自定义数量的数据样本。 | |
| :param sample_rows: 当 detail_level 为 'full' 时,返回的样例行数。 | |
| """ | |
| src_tmp_name = None | |
| normalized_tmp_name = None | |
| try: | |
| file_url = file_info.get("fileUrl") | |
| file_key = file_info.get("fileKey", "file") | |
| logger.info(f"正在分析文件: {file_key}, 模式: {detail_level}") | |
| # 1. 下载文件到临时空间 | |
| resp = requests.get(file_url, timeout=60) | |
| resp.raise_for_status() | |
| # 使用 .xlsx 后缀确保 openpyxl 和 frictionless 能正确识别格式 | |
| with tempfile.NamedTemporaryFile(suffix=".xlsx", delete=False) as f_in: | |
| f_in.write(resp.content) | |
| src_tmp_name = f_in.name | |
| # 2. 规范化处理 (处理 Excel 公式、合并单元格) | |
| with tempfile.NamedTemporaryFile(suffix=".xlsx", delete=False) as f_out: | |
| normalized_tmp_name = f_out.name | |
| # data_only=True 确保读取的是计算后的值而非公式 | |
| wb = load_workbook(src_tmp_name, data_only=True) | |
| wb.save(normalized_tmp_name) | |
| # 3. Frictionless 探测配置 | |
| control = ExcelControl(fill_merged_cells=True) | |
| # 增加 header_rows 参数尝试自动探测表头 | |
| detector = Detector(sample_size=1000, field_confidence=0.9) | |
| resource = Resource(path=normalized_tmp_name, control=control, detector=detector) | |
| resource.infer(stats=True) | |
| logger.info(f"Frictionless 推断的表头: {resource.schema.field_names}") | |
| # logger.info(f"Frictionless 样本行: {resource.read_rows(limit=2)}") # read_rows argument issue | |
| with resource: | |
| schema = resource.schema | |
| all_rows = resource.read_rows() | |
| total_rows = len(all_rows) | |
| fields_analysis = [] | |
| for index, field in enumerate(schema.fields): | |
| # --- [Basic Level] 基础信息 --- | |
| analysis = { | |
| "name": field.name, | |
| "type": field.type | |
| } | |
| # 如果级别是 basic,直接跳过后续重型计算 | |
| if detail_level == "basic": | |
| fields_analysis.append(analysis) | |
| continue | |
| # 提取非空数据用于后续分析 | |
| col_data = [] | |
| null_count = 0 | |
| for row in all_rows: | |
| # Frictionless Row object behaves like a dictionary for field access | |
| val = row.get(field.name) | |
| if val is not None: | |
| col_data.append(val) | |
| else: | |
| null_count += 1 | |
| # --- [Standard Level] 质量与数值统计 --- | |
| analysis["missing_rate"] = f"{(null_count / total_rows) * 100:.2f}%" if total_rows > 0 else "0%" | |
| if field.type in ['integer', 'number'] and col_data: | |
| # 确保是可计算的数值 | |
| numeric_values = [ | |
| float(v) for v in col_data | |
| if isinstance(v, (int, float, decimal.Decimal)) | |
| ] | |
| if numeric_values: | |
| analysis["numeric_stats"] = { | |
| "min": min(numeric_values), | |
| "max": max(numeric_values), | |
| "avg": round(mean(numeric_values), 2) | |
| } | |
| # --- [Full Level] 枚举与样本 --- | |
| if detail_level == "full": | |
| # 枚举检测:如果唯一值较少(<15)且有数据,则视为枚举 | |
| unique_vals = list(set(col_data)) | |
| if 0 < len(unique_vals) <= 15: | |
| analysis["is_enumeration"] = True | |
| analysis["enum_values"] = [str(v) for v in unique_vals] | |
| else: | |
| analysis["is_enumeration"] = False | |
| # 样本提取:转换日期等特殊对象为字符串 | |
| samples = [] | |
| for v in col_data[:sample_rows]: | |
| if isinstance(v, (datetime, date)): | |
| samples.append(v.isoformat()) | |
| else: | |
| samples.append(str(v)) | |
| analysis["samples"] = samples | |
| fields_analysis.append(analysis) | |
| # 4. 组装最终报告 | |
| report = { | |
| "file_summary": { | |
| "key": file_key, | |
| "total_rows": total_rows, | |
| "total_fields": len(fields_analysis), | |
| "detail_level_applied": detail_level | |
| }, | |
| "schema_analysis": fields_analysis | |
| } | |
| logger.info(f"分析成功: 识别到 {len(fields_analysis)} 列") | |
| return report | |
| except Exception as e: | |
| logger.error(f"分析异常: {str(e)}") | |
| return { | |
| "status": "failed", | |
| "error_type": type(e).__name__, | |
| "message": str(e) | |
| } | |
| finally: | |
| # 5. 清理临时文件,防止磁盘溢出 | |
| for path in (src_tmp_name, normalized_tmp_name): | |
| if path and os.path.exists(path): | |
| try: | |
| os.unlink(path) | |
| except Exception as cleanup_err: | |
| logger.warning(f"清理临时文件失败: {cleanup_err}") |