import json
import os
import requests
import tempfile
import decimal
from datetime import datetime, date
from statistics import mean
from openpyxl import load_workbook
from frictionless import Resource, Detector
from frictionless.formats import ExcelControl
import logging

logger = logging.getLogger(__name__)


def _download_to_tmp(file_url):
    """Download the remote file to a temporary path and return that path.

    The caller is responsible for deleting the returned file.
    """
    resp = requests.get(file_url, timeout=60)
    resp.raise_for_status()
    # Use a .xlsx suffix so openpyxl and frictionless recognize the format.
    # NOTE(review): CSV inputs forced through this path would fail in
    # load_workbook downstream — presumably callers only send Excel; confirm.
    with tempfile.NamedTemporaryFile(suffix=".xlsx", delete=False) as f_in:
        f_in.write(resp.content)
        return f_in.name


def _normalize_excel(src_path):
    """Re-save the workbook so downstream readers see plain values.

    data_only=True makes openpyxl return cached computed values instead of
    formula strings. Merged cells are handled later via
    ExcelControl(fill_merged_cells=True). Returns the normalized temp path;
    the caller owns (and must delete) the file.
    """
    with tempfile.NamedTemporaryFile(suffix=".xlsx", delete=False) as f_out:
        normalized_path = f_out.name
    wb = load_workbook(src_path, data_only=True)
    try:
        wb.save(normalized_path)
    finally:
        # Release the source file handle promptly (openpyxl keeps it open).
        wb.close()
    return normalized_path


def _analyze_field(field, all_rows, total_rows, detail_level, sample_rows):
    """Build the analysis dict for one schema field.

    Detail levels are cumulative: basic (name/type) -> standard (+missing
    rate, numeric min/max/avg) -> full (+enumeration detection, samples).
    """
    # --- [Basic level] name and inferred type ---
    analysis = {"name": field.name, "type": field.type}
    if detail_level == "basic":
        # Skip the heavier per-row scans entirely at this level.
        return analysis

    # Split the column into non-null values and a null counter.
    col_data = []
    null_count = 0
    for row in all_rows:
        # A frictionless Row supports dict-style access by field name.
        val = row.get(field.name)
        if val is not None:
            col_data.append(val)
        else:
            null_count += 1

    # --- [Standard level] quality and numeric statistics ---
    analysis["missing_rate"] = (
        f"{(null_count / total_rows) * 100:.2f}%" if total_rows > 0 else "0%"
    )
    if field.type in ("integer", "number") and col_data:
        # Keep only values that are actually numeric before computing stats.
        numeric_values = [
            float(v)
            for v in col_data
            if isinstance(v, (int, float, decimal.Decimal))
        ]
        if numeric_values:
            analysis["numeric_stats"] = {
                "min": min(numeric_values),
                "max": max(numeric_values),
                "avg": round(mean(numeric_values), 2),
            }

    # --- [Full level] enumeration detection and samples ---
    if detail_level == "full":
        # Treat a column with few (<=15) distinct values as an enumeration.
        # Stringify BEFORE deduplicating so unhashable cell values cannot
        # crash set(), and sort so the reported values are deterministic.
        unique_vals = sorted(set(map(str, col_data)))
        if 0 < len(unique_vals) <= 15:
            analysis["is_enumeration"] = True
            analysis["enum_values"] = unique_vals
        else:
            analysis["is_enumeration"] = False

        # Sample extraction: serialize date-like objects as ISO strings.
        samples = [
            v.isoformat() if isinstance(v, (datetime, date)) else str(v)
            for v in col_data[:sample_rows]
        ]
        analysis["samples"] = samples

    return analysis


def main(file_info, detail_level="standard", sample_rows=5):
    """Profile an Excel/CSV file and return a structured report.

    :param file_info: dict with "fileUrl" (download URL) and "fileKey"
        (display label; defaults to "file").
    :param detail_level: verbosity of the returned profile:
        - "basic": column names and inferred data types only.
        - "standard": basic + missing-value rate + numeric min/max/avg.
        - "full": standard + enumeration detection + data samples.
    :param sample_rows: number of sample values per column at "full" level.
    :return: report dict on success, or {"status": "failed", ...} on any
        error — this function never raises.
    """
    src_tmp_name = None
    normalized_tmp_name = None
    try:
        file_url = file_info.get("fileUrl")
        file_key = file_info.get("fileKey", "file")
        logger.info(f"正在分析文件: {file_key}, 模式: {detail_level}")

        # 1. Download the file to temporary storage.
        src_tmp_name = _download_to_tmp(file_url)

        # 2. Normalize (resolve formulas; merged cells handled below).
        normalized_tmp_name = _normalize_excel(src_tmp_name)

        # 3. Configure frictionless detection.
        control = ExcelControl(fill_merged_cells=True)
        detector = Detector(sample_size=1000, field_confidence=0.9)
        resource = Resource(
            path=normalized_tmp_name, control=control, detector=detector
        )
        resource.infer(stats=True)
        logger.info(f"Frictionless 推断的表头: {resource.schema.field_names}")

        with resource:
            schema = resource.schema
            # Materialize all rows once; every field analysis reuses them.
            all_rows = resource.read_rows()
            total_rows = len(all_rows)
            fields_analysis = [
                _analyze_field(
                    field, all_rows, total_rows, detail_level, sample_rows
                )
                for field in schema.fields
            ]

        # 4. Assemble the final report.
        report = {
            "file_summary": {
                "key": file_key,
                "total_rows": total_rows,
                "total_fields": len(fields_analysis),
                "detail_level_applied": detail_level,
            },
            "schema_analysis": fields_analysis,
        }
        logger.info(f"分析成功: 识别到 {len(fields_analysis)} 列")
        return report

    except Exception as e:
        # Top-level boundary: log and return a structured failure payload
        # instead of raising, so callers always get a dict back.
        logger.error(f"分析异常: {str(e)}")
        return {
            "status": "failed",
            "error_type": type(e).__name__,
            "message": str(e),
        }
    finally:
        # 5. Best-effort cleanup of temp files to avoid filling the disk.
        for path in (src_tmp_name, normalized_tmp_name):
            if path and os.path.exists(path):
                try:
                    os.unlink(path)
                except Exception as cleanup_err:
                    logger.warning(f"清理临时文件失败: {cleanup_err}")