# upload_ingest.py
from __future__ import annotations

import os
import json
from typing import Dict, List, Any, Tuple

import pandas as pd
import numpy as np

# Optional parsers
try:
    import pdfplumber  # noqa: F401
    _HAS_PDFPLUMBER = True
except Exception:
    _HAS_PDFPLUMBER = False

NUMERIC_BOUNDS = {
    # key substring -> (lo, hi, unit_hint)
    "a1c": (3.0, 20.0, "%"),
    "sbp": (60.0, 250.0, "mmHg"),
    "dbp": (30.0, 150.0, "mmHg"),
    "bmi": (10.0, 70.0, "kg/m²"),
    "chol": (2.0, 12.0, "mmol/L"),
    "mmhg": (60.0, 250.0, "mmHg"),
}


def _read_text_file(path: str) -> str:
    """Return the contents of *path* decoded as UTF-8, or "" if it cannot be read.

    Undecodable bytes are dropped (``errors="ignore"``). Failures are swallowed
    on purpose: callers treat an empty string as "nothing to ingest".
    """
    try:
        with open(path, "r", encoding="utf-8", errors="ignore") as f:
            return f.read()
    except Exception:
        return ""


def _delimiter_for(path: str) -> str:
    """Return the field delimiter implied by the extension of *path*."""
    return "\t" if path.lower().endswith(".tsv") else ","


def _bounds_key(col_name: str) -> str | None:
    """Return the first NUMERIC_BOUNDS key that is a substring of *col_name*.

    Matching is case-insensitive and follows the dict's insertion order, so
    earlier keys win when several match. Returns None when nothing matches.
    """
    lowered = col_name.lower()
    for key in NUMERIC_BOUNDS:
        if key in lowered:
            return key
    return None


def _infer_unit(col_name: str) -> str | None:
    """Return the unit hint for *col_name* from NUMERIC_BOUNDS, or None."""
    key = _bounds_key(col_name)
    return NUMERIC_BOUNDS[key][2] if key is not None else None


def _numeric_profile(s: pd.Series, col_name: str) -> Dict[str, Any]:
    """Summarise a numeric column.

    Values are coerced with ``pd.to_numeric(errors="coerce")`` and missing
    values are dropped before computing the statistics.

    Returns a dict with ``count``, ``mean``, ``std``, ``min``, ``p25``,
    ``p50``, ``p75`` and ``max``. Statistics that are undefined (empty column,
    ``std`` of a single value) are reported as None rather than NaN so the
    result stays valid under strict JSON encoding. When the column name
    matches a NUMERIC_BOUNDS key, a ``bounds`` entry with the limits, the unit
    and the number of out-of-range values is added.
    """
    x = pd.to_numeric(s, errors="coerce")
    desc = x.dropna().describe(percentiles=[.25, .5, .75])

    def _stat(label: str) -> float | None:
        value = desc.get(label)
        if value is None or pd.isna(value):
            return None
        return float(value)

    count = _stat("count")
    out: Dict[str, Any] = {
        "count": count if count is not None else 0.0,
        "mean": _stat("mean"),
        "std": _stat("std"),
        "min": _stat("min"),
        "p25": _stat("25%"),
        "p50": _stat("50%"),
        "p75": _stat("75%"),
        "max": _stat("max"),
    }

    # out-of-bounds flag (clinical guardrails)
    key = _bounds_key(col_name)
    if key is not None:
        lo, hi, unit = NUMERIC_BOUNDS[key]
        # NaN compares False on both sides, so missing values are never counted.
        oob = ((x < lo) | (x > hi)).sum()
        out["bounds"] = {"lo": lo, "hi": hi, "unit": unit, "oob_count": int(oob)}
    return out


def _categorical_profile(s: pd.Series, top_k: int = 10) -> Dict[str, Any]:
    """Summarise a non-numeric column by value frequency.

    Missing values are counted under the empty string "". They are replaced
    *before* stringification; doing it afterwards is a no-op because
    ``astype(str)`` has already turned them into the literal text "nan".

    Returns ``cardinality`` (number of distinct values, including "") and
    ``top_values`` (up to *top_k* ``{"value", "count"}`` entries, most
    frequent first).
    """
    as_text = s.astype(object).where(s.notna(), "").astype(str)
    vc = as_text.value_counts()
    top = [{"value": k, "count": int(v)} for k, v in vc.head(top_k).items()]
    return {
        "cardinality": int(vc.shape[0]),
        "top_values": top,
    }


def _column_kind(s: pd.Series) -> str:
    """Classify a column as "bool", "numeric", "datetime" or "categorical"."""
    # bool must be tested first: pandas reports bool dtypes as numeric, which
    # would otherwise make the "bool" label unreachable.
    if pd.api.types.is_bool_dtype(s):
        return "bool"
    if pd.api.types.is_numeric_dtype(s):
        return "numeric"
    if pd.api.types.is_datetime64_any_dtype(s):
        return "datetime"
    return "categorical"


def summarize_csv(
    path: str,
    profile_row_cap: int = 1_000_000,
    sep: str | None = None,
) -> Tuple[Dict[str, Any], str]:
    """
    Return (summary_json, digest_text)
    - summary_json: structured profile
    - digest_text : one-liner for prompt context

    Args:
        path: delimited text file to profile.
        profile_row_cap: when the file has more rows than this, per-column
            statistics are computed on a fixed-seed random sample of this size.
        sep: field delimiter. When None it is inferred from the extension
            (tab for ``.tsv``, comma otherwise).

    Raises:
        Whatever ``pandas.read_csv`` raises for unreadable or malformed input.
    """
    delimiter = sep if sep is not None else _delimiter_for(path)
    df = pd.read_csv(path, sep=delimiter, low_memory=False)
    n_rows, n_cols = df.shape
    notes: List[str] = []

    # Downsample for speed if extremely large (stats still decent for overview)
    if n_rows > profile_row_cap:
        df_sample = df.sample(profile_row_cap, random_state=42)
        notes.append(
            f"Column statistics computed on a random sample of "
            f"{len(df_sample):,} of {n_rows:,} rows."
        )
    else:
        df_sample = df

    cols_summary: List[Dict[str, Any]] = []
    for c in df_sample.columns:
        s = df_sample[c]
        col_name = str(c)
        nonnull = int(s.notna().sum())
        # max(1, ...) guards the division for a header-only file.
        missing_pct = float(100 * (1 - nonnull / max(1, len(s))))
        dtype = _column_kind(s)

        item: Dict[str, Any] = {
            "name": col_name,
            "dtype": dtype,
            "unit": _infer_unit(col_name),
            "nonnull": nonnull,
            "missing_pct": round(missing_pct, 2),
        }
        if dtype == "numeric":
            item["stats"] = _numeric_profile(s, col_name)
        else:
            item["category_profile"] = _categorical_profile(s)
        cols_summary.append(item)

    # quick digest numbers
    num_cols = sum(1 for c in cols_summary if c["dtype"] == "numeric")
    cat_cols = sum(1 for c in cols_summary if c["dtype"] == "categorical")
    med_missing = (
        float(np.median([c["missing_pct"] for c in cols_summary]))
        if cols_summary
        else 0.0
    )

    summary_json = {
        "file": os.path.basename(path),
        "rows": int(n_rows),
        "cols": int(n_cols),
        "columns": cols_summary,
        # NOTE(review): nothing in this module suppresses counts below the
        # threshold (top_values may contain small cells) — confirm that the
        # suppression is enforced downstream before relying on "applied".
        "privacy": {"small_cell_threshold": 10, "applied": True},
        "notes": notes,
    }

    digest_text = (
        f"{summary_json['file']}: {n_rows:,} rows; {n_cols} cols "
        f"({num_cols} numeric, {cat_cols} categorical). "
        f"Missingness median {med_missing:.1f}%."
    )
    return summary_json, digest_text


def _read_csv_artifact(path: str) -> Dict[str, Any]:
    """Build the legacy CSV artifact: column names plus a three-row preview.

    Only the first 1000 rows are read, all as strings. The delimiter is
    inferred from the extension (tab for ``.tsv``, comma otherwise).
    """
    # Lightweight legacy artifact (kept for compatibility with existing flows)
    df = pd.read_csv(
        path, sep=_delimiter_for(path), nrows=1000, dtype=str, low_memory=False
    )
    cols = list(df.columns.astype(str))
    preview = df.head(3).to_dict(orient="records")
    file_name = os.path.basename(path)
    text_summary = (
        f"CSV FILE: {file_name}\n"
        f"COLUMNS: {', '.join(cols)}\n"
        f"SAMPLE ROWS: {json.dumps(preview)}"
    )
    return {
        "kind": "csv",
        "name": file_name,
        "path": path,
        "columns": cols,
        "n_rows_sampled": len(df),
        "preview_rows": preview,
        "text": text_summary,
    }


def _read_pdf_text(path: str, max_pages: int = 15) -> str:
    """Return the text of the first *max_pages* pages of a PDF.

    Pages are joined with a blank line; pages without text are skipped.
    Returns "" when pdfplumber is not installed or the file cannot be parsed.
    """
    if not _HAS_PDFPLUMBER:
        return ""
    import pdfplumber

    out = []
    try:
        with pdfplumber.open(path) as pdf:
            for page in pdf.pages[:max_pages]:
                t = page.extract_text() or ""
                if t.strip():
                    out.append(t)
    except Exception:
        # Best-effort: an unreadable PDF contributes no text.
        return ""
    return "\n\n".join(out)


def _read_docx_text(path: str) -> str:
    """Return the non-blank paragraphs of a .docx file joined by newlines.

    Returns "" when python-docx is not installed or the file cannot be parsed.
    """
    try:
        import docx
    except Exception:
        return ""
    try:
        doc = docx.Document(path)
        return "\n".join(p.text for p in doc.paragraphs if p.text.strip())
    except Exception:
        return ""


def _read_image_text(path: str) -> str:
    """Return OCR text for an image, or "" when OCR is unavailable or fails."""
    try:
        import pytesseract
        from PIL import Image

        # The context manager closes the underlying file handle promptly.
        with Image.open(path) as img:
            return pytesseract.image_to_string(img) or ""
    except Exception:
        return ""


def extract_text_from_files(paths: List[str]) -> Dict[str, Any]:
    """
    Extract retrieval text and structured metadata from uploaded files.

    Returns:
      {
        "chunks": [str, ...],                  # textual chunks for retrieval
        "artifacts": [ { structured meta }, ... ]  # e.g., CSV columns + CSV summary
      }

    Empty or non-existent paths are skipped. Files are dispatched on their
    lower-cased extension; unknown extensions are read as plain text. Parsers
    are best-effort: a file that cannot be parsed contributes nothing.
    """
    chunks: List[str] = []
    artifacts: List[Dict[str, Any]] = []

    for p in paths or []:
        if not p or not os.path.exists(p):
            continue
        base = os.path.basename(p)
        name = base.lower()

        if name.endswith((".csv", ".tsv")):
            try:
                # New: structured summary + digest
                summary_json, digest_text = summarize_csv(p)
                # Legacy artifact (columns/preview) kept for compatibility
                legacy = _read_csv_artifact(p)
            except Exception:
                # Not parseable as a table: fall back to raw text, and skip
                # it when blank, like every other branch.
                txt = _read_text_file(p)
                if txt.strip():
                    chunks.append(txt)
            else:
                # Register only after both reads succeeded so a failure never
                # leaves a half-registered file behind.
                artifacts.append({
                    "kind": "csv_summary",
                    "name": base,
                    "path": p,
                    "summary": summary_json,
                    "digest": digest_text,
                })
                artifacts.append(legacy)
                # Add short digest to text chunks (helps retrieval)
                chunks.append(f"UPLOADED DATA SUMMARY:\n{digest_text}")
        elif name.endswith(".pdf"):
            txt = _read_pdf_text(p)
            if txt.strip():
                chunks.append(txt)
        elif name.endswith(".docx"):
            txt = _read_docx_text(p)
            if txt.strip():
                chunks.append(txt)
        elif name.endswith((".png", ".jpg", ".jpeg")):
            txt = _read_image_text(p)
            if txt.strip():
                chunks.append(f"IMAGE OCR ({base}):\n{txt}")
        else:
            # .txt / .md / .json and any unrecognised extension: plain text.
            txt = _read_text_file(p)
            if txt.strip():
                chunks.append(txt)

    return {"chunks": chunks, "artifacts": artifacts}