# upload_ingest.py
from __future__ import annotations

import os
import json
from typing import Dict, List, Any, Tuple

import pandas as pd
import numpy as np

# Optional parsers
try:
    import pdfplumber  # noqa: F401
    _HAS_PDFPLUMBER = True
except Exception:
    _HAS_PDFPLUMBER = False

NUMERIC_BOUNDS = {
    # key substring -> (lo, hi, unit_hint)
    "a1c": (3.0, 20.0, "%"),
    "sbp": (60.0, 250.0, "mmHg"),
    "dbp": (30.0, 150.0, "mmHg"),
    "bmi": (10.0, 70.0, "kg/m²"),
    "chol": (2.0, 12.0, "mmol/L"),
    "mmhg": (60.0, 250.0, "mmHg"),
}
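# Matching is by substring on the lowercased column name: e.g. a column called
# "baseline_a1c_pct" hits the "a1c" key and is checked against (3.0, 20.0, "%").
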
def _read_text_file(path: str) -> str:
    try:
        with open(path, "r", encoding="utf-8", errors="ignore") as f:
            return f.read()
    except Exception:
        return ""

def _infer_unit(col_name: str) -> str | None:
    n = col_name.lower()
    for k, (_, _, unit) in NUMERIC_BOUNDS.items():
        if k in n:
            return unit
    return None

def _bounds_key(col_name: str) -> str | None:
    n = col_name.lower()
    for k in NUMERIC_BOUNDS:
        if k in n:
            return k
    return None

def _numeric_profile(s: pd.Series, col_name: str) -> Dict[str, Any]:
    x = pd.to_numeric(s, errors="coerce")
    desc = x.dropna().describe(percentiles=[.25, .5, .75])
    out = {
        "count": float(desc["count"]) if "count" in desc else 0.0,
        "mean": float(desc["mean"]) if "mean" in desc else None,
        "std": float(desc["std"]) if "std" in desc else None,
        "min": float(desc["min"]) if "min" in desc else None,
        "p25": float(desc["25%"]) if "25%" in desc else None,
        "p50": float(desc["50%"]) if "50%" in desc else None,
        "p75": float(desc["75%"]) if "75%" in desc else None,
        "max": float(desc["max"]) if "max" in desc else None,
    }
    # out-of-bounds flag (clinical guardrails)
    key = _bounds_key(col_name)
    if key:
        lo, hi, unit = NUMERIC_BOUNDS[key]
        oob = ((x < lo) | (x > hi)).sum()
        out["bounds"] = {"lo": lo, "hi": hi, "unit": unit, "oob_count": int(oob)}
    return out

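# Example: _numeric_profile(pd.Series([5.2, 6.1, 13.4]), "a1c_pct") reports a
# mean of about 8.2 and, via the "a1c" bounds (3.0-20.0 %), an oob_count of 0.
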
def _categorical_profile(s: pd.Series, top_k: int = 10) -> Dict[str, Any]:
    # fillna must precede astype(str); otherwise NaN is stringified to "nan"
    vc = s.fillna("").astype(str).value_counts()
    top = [{"value": k, "count": int(v)} for k, v in vc.head(top_k).items()]
    return {
        "cardinality": int(vc.shape[0]),
        "top_values": top,
    }

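# Example: a column holding ["F", "F", "M", None] profiles to cardinality 3
# with top_values F (2), M (1), and "" (1) for the filled missing entry.
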
def summarize_csv(path: str, profile_row_cap: int = 1_000_000) -> Tuple[Dict[str, Any], str]:
    """
    Return (summary_json, digest_text):
    - summary_json: structured profile
    - digest_text : one-liner for prompt context
    """
    sep = "\t" if path.lower().endswith(".tsv") else ","  # .tsv files are tab-separated
    df = pd.read_csv(path, sep=sep, low_memory=False)
    n_rows, n_cols = df.shape
    # Downsample for speed if extremely large (stats are still decent for an overview)
    if n_rows > profile_row_cap:
        df_sample = df.sample(profile_row_cap, random_state=42)
    else:
        df_sample = df
    cols_summary: List[Dict[str, Any]] = []
    for c in df_sample.columns:
        s = df_sample[c]
        nonnull = int(s.notna().sum())
        missing_pct = float(100 * (1 - nonnull / max(1, len(s))))
        unit = _infer_unit(str(c))
        # dtype inference; bool is checked before numeric because pandas
        # treats bool columns as a numeric dtype
        dtype = (
            "bool" if pd.api.types.is_bool_dtype(s) else
            "numeric" if pd.api.types.is_numeric_dtype(s) else
            "datetime" if pd.api.types.is_datetime64_any_dtype(s) else
            "categorical"
        )
        item: Dict[str, Any] = {"name": str(c), "dtype": dtype, "unit": unit,
                                "nonnull": nonnull, "missing_pct": round(missing_pct, 2)}
        if dtype == "numeric":
            item["stats"] = _numeric_profile(s, str(c))
        else:
            item["category_profile"] = _categorical_profile(s)
        cols_summary.append(item)
    # quick digest numbers
    num_cols = sum(1 for c in cols_summary if c["dtype"] == "numeric")
    cat_cols = sum(1 for c in cols_summary if c["dtype"] == "categorical")
    med_missing = float(np.median([c["missing_pct"] for c in cols_summary])) if cols_summary else 0.0
    summary_json = {
        "file": os.path.basename(path),
        "rows": int(n_rows),
        "cols": int(n_cols),
        "columns": cols_summary,
        # declares the small-cell policy for downstream consumers; suppression
        # itself is not enforced in this module
        "privacy": {"small_cell_threshold": 10, "applied": True},
        "notes": [],
    }
    digest_text = (f"{summary_json['file']}: {n_rows:,} rows; {n_cols} cols "
                   f"({num_cols} numeric, {cat_cols} categorical). "
                   f"Missingness median {med_missing:.1f}%.")
    return summary_json, digest_text

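# Usage sketch (the file name and digest shown here are hypothetical):
#   summary, digest = summarize_csv("cohort.csv")
#   print(digest)  # e.g. "cohort.csv: 12,430 rows; 18 cols (9 numeric, 9 categorical). ..."
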
def _read_csv_artifact(path: str) -> Dict[str, Any]:
    # Lightweight legacy artifact (kept for compatibility with existing flows)
    sep = "\t" if path.lower().endswith(".tsv") else ","
    df = pd.read_csv(path, sep=sep, nrows=1000, dtype=str, low_memory=False)
    cols = list(df.columns.astype(str))
    preview = df.head(3).to_dict(orient="records")
    text_summary = (
        f"CSV FILE: {os.path.basename(path)}\n"
        f"COLUMNS: {', '.join(cols)}\n"
        f"SAMPLE ROWS: {json.dumps(preview)}"
    )
    return {
        "kind": "csv",
        "name": os.path.basename(path),
        "path": path,
        "columns": cols,
        "n_rows_sampled": len(df),
        "preview_rows": preview,
        "text": text_summary,
    }

def _read_pdf_text(path: str) -> str:
    if not _HAS_PDFPLUMBER:
        return ""
    import pdfplumber
    out = []
    try:
        with pdfplumber.open(path) as pdf:
            for page in pdf.pages[:15]:  # cap at 15 pages to bound latency
                t = page.extract_text() or ""
                if t.strip():
                    out.append(t)
    except Exception:
        return ""
    return "\n\n".join(out)

def _read_docx_text(path: str) -> str:
    try:
        import docx  # provided by the python-docx package
    except Exception:
        return ""
    try:
        doc = docx.Document(path)
        return "\n".join(p.text for p in doc.paragraphs if p.text.strip())
    except Exception:
        return ""

def _read_image_text(path: str) -> str:
    try:
        import pytesseract
        from PIL import Image
        img = Image.open(path)
        return pytesseract.image_to_string(img) or ""
    except Exception:
        return ""

def extract_text_from_files(paths: List[str]) -> Dict[str, Any]:
    """
    Returns:
        {
          "chunks":    [str, ...],               # textual chunks for retrieval
          "artifacts": [{structured meta}, ...]  # e.g., CSV columns + CSV summary
        }
    """
    chunks: List[str] = []
    artifacts: List[Dict[str, Any]] = []
    for p in paths or []:
        if not p or not os.path.exists(p):
            continue
        name = os.path.basename(p).lower()
        if name.endswith((".csv", ".tsv")):
            try:
                # New: structured summary + digest
                summary_json, digest_text = summarize_csv(p)
                artifacts.append({
                    "kind": "csv_summary",
                    "name": os.path.basename(p),
                    "path": p,
                    "summary": summary_json,
                    "digest": digest_text,
                })
                # Legacy artifact (columns/preview) kept for compatibility
                artifacts.append(_read_csv_artifact(p))
                # Add a short digest to the text chunks (helps retrieval)
                chunks.append(f"UPLOADED DATA SUMMARY:\n{digest_text}")
            except Exception:
                chunks.append(_read_text_file(p))
        elif name.endswith(".pdf"):
            txt = _read_pdf_text(p)
            if txt.strip():
                chunks.append(txt)
        elif name.endswith(".docx"):
            txt = _read_docx_text(p)
            if txt.strip():
                chunks.append(txt)
        elif name.endswith((".txt", ".md", ".json")):
            txt = _read_text_file(p)
            if txt.strip():
                chunks.append(txt)
        elif name.endswith((".png", ".jpg", ".jpeg")):
            txt = _read_image_text(p)
            if txt.strip():
                chunks.append(f"IMAGE OCR ({os.path.basename(p)}):\n{txt}")
        else:
            # Fallback: treat any other extension as plain text
            txt = _read_text_file(p)
            if txt.strip():
                chunks.append(txt)
    return {"chunks": chunks, "artifacts": artifacts}