# Medica_DecisionSupportAI / upload_ingest.py
# Rajan Sharma
# Update upload_ingest.py
# 7ae997f verified
# raw
# history blame
# 8.15 kB
# upload_ingest.py
from __future__ import annotations
import os
import json
from typing import Dict, List, Any, Tuple
import pandas as pd
import numpy as np
# Optional parsers: PDF extraction is enabled only when pdfplumber imports
# cleanly; any failure raised during the import simply switches it off.
try:
    import pdfplumber  # noqa: F401
except Exception:
    _HAS_PDFPLUMBER = False
else:
    _HAS_PDFPLUMBER = True
# Plausibility ranges keyed by a substring of the (lower-cased) column name:
#   key substring -> (lo, hi, unit_hint)
# Lookups walk the mapping in insertion order and stop at the first key that
# matches, so the order of the entries below is significant.
NUMERIC_BOUNDS = dict(
    a1c=(3.0, 20.0, "%"),
    sbp=(60.0, 250.0, "mmHg"),
    dbp=(30.0, 150.0, "mmHg"),
    bmi=(10.0, 70.0, "kg/m²"),
    chol=(2.0, 12.0, "mmol/L"),
    mmhg=(60.0, 250.0, "mmHg"),
)
def _read_text_file(path: str) -> str:
    """Return the contents of ``path`` decoded as UTF-8, or ``""`` on failure.

    Bytes that cannot be decoded are dropped instead of raising. Any
    exception while opening or reading the file results in an empty string.
    """
    content = ""
    try:
        with open(path, mode="r", encoding="utf-8", errors="ignore") as handle:
            content = handle.read()
    except Exception:
        content = ""
    return content
def _infer_unit(col_name: str) -> str | None:
    """Return the unit hint for a column, or ``None`` when nothing matches.

    The column name is lower-cased and compared against the keys of
    ``NUMERIC_BOUNDS`` in insertion order; the unit of the first key that
    occurs as a substring of the name is returned.
    """
    lowered = col_name.lower()
    return next(
        (spec[2] for fragment, spec in NUMERIC_BOUNDS.items() if fragment in lowered),
        None,
    )
def _bounds_key(col_name: str) -> str | None:
    """Return the first ``NUMERIC_BOUNDS`` key contained in the column name.

    Matching is a case-insensitive substring test (the column name is
    lower-cased first). ``None`` is returned when no key matches.
    """
    lowered = col_name.lower()
    hits = [fragment for fragment in NUMERIC_BOUNDS if fragment in lowered]
    return hits[0] if hits else None
def _numeric_profile(s: pd.Series, col_name: str) -> Dict[str, Any]:
    """Summarise a column as numbers and count out-of-range values.

    Values are coerced with ``pd.to_numeric(errors="coerce")``, so entries
    that cannot be parsed become NaN and are left out of the statistics.

    Args:
        s: Column to profile.
        col_name: Column name; used to look up plausibility bounds through
            ``_bounds_key`` / ``NUMERIC_BOUNDS``.

    Returns:
        A dict with ``count`` (float) and ``mean``, ``std``, ``min``,
        ``p25``, ``p50``, ``p75``, ``max``. A statistic that ``describe()``
        does not report, or reports as NaN (every statistic of an empty
        column, ``std`` of a single value), is ``None`` so the profile never
        carries NaN and stays valid JSON. When the column name matches a
        bounds key, ``bounds`` holds ``lo``, ``hi``, ``unit`` and
        ``oob_count`` (number of values below ``lo`` or above ``hi``).
    """
    x = pd.to_numeric(s, errors="coerce")
    desc = x.dropna().describe(percentiles=[0.25, 0.5, 0.75])

    def _stat(label: str) -> float | None:
        # describe() omits numeric statistics for some dtypes (e.g. bool) and
        # yields NaN for undefined ones; both cases are reported as None.
        if label not in desc:
            return None
        value = desc[label]
        return None if pd.isna(value) else float(value)

    out: Dict[str, Any] = {
        "count": float(desc["count"]) if "count" in desc else 0.0,
        "mean": _stat("mean"),
        "std": _stat("std"),
        "min": _stat("min"),
        "p25": _stat("25%"),
        "p50": _stat("50%"),
        "p75": _stat("75%"),
        "max": _stat("max"),
    }
    # out-of-bounds flag (clinical guardrails)
    key = _bounds_key(col_name)
    if key:
        lo, hi, unit = NUMERIC_BOUNDS[key]
        # NaN compares False on both sides, so unparseable cells are not
        # counted as out of bounds.
        oob = ((x < lo) | (x > hi)).sum()
        out["bounds"] = {"lo": lo, "hi": hi, "unit": unit, "oob_count": int(oob)}
    return out
def _categorical_profile(s: pd.Series, top_k: int = 10) -> Dict[str, Any]:
    """Profile a column as categories.

    Every value is converted to ``str``; missing cells are counted under the
    empty string ``""``.

    Args:
        s: Column to profile.
        top_k: Maximum number of most frequent values to list.

    Returns:
        ``{"cardinality": <number of distinct values>,
        "top_values": [{"value": str, "count": int}, ...]}`` with
        ``top_values`` in descending order of frequency.
    """
    # Blank out missing cells using the mask of the ORIGINAL series. Calling
    # fillna("") after astype(str) does nothing when NaN has already been
    # turned into the literal text "nan", which then shows up as a category.
    as_text = s.astype(str).mask(s.isna(), "")
    vc = as_text.value_counts()
    top = [{"value": k, "count": int(v)} for k, v in vc.head(top_k).items()]
    return {
        "cardinality": int(vc.shape[0]),
        "top_values": top,
    }
def summarize_csv(path: str, profile_row_cap: int = 1_000_000) -> Tuple[Dict[str, Any], str]:
    """
    Profile a delimited text file.

    Return (summary_json, digest_text)
    - summary_json: structured profile (file name, row/column counts and one
      entry per column with dtype, unit hint, non-null count, missing
      percentage and either numeric ``stats`` or a ``category_profile``)
    - digest_text : one-liner for prompt context

    Args:
        path: File to read. Names ending in ``.tsv`` (case-insensitive) are
            read tab-separated; everything else uses the comma default.
        profile_row_cap: When the file has more rows than this, per-column
            profiling runs on a reproducible random sample of this many rows.
            ``rows`` / ``cols`` in the summary always describe the full file.

    Raises:
        Whatever ``pd.read_csv`` raises for an unreadable or malformed file.
    """
    # .tsv uploads are routed to this function too; with the default comma
    # separator each line would be read as a single wide column.
    sep = "\t" if os.path.basename(path).lower().endswith(".tsv") else ","
    df = pd.read_csv(path, sep=sep, low_memory=False)
    n_rows, n_cols = df.shape
    # Downsample for speed if extremely large (stats still decent for overview)
    if n_rows > profile_row_cap:
        df_sample = df.sample(profile_row_cap, random_state=42)
    else:
        df_sample = df
    cols_summary: List[Dict[str, Any]] = []
    for c in df_sample.columns:
        s = df_sample[c]
        nonnull = int(s.notna().sum())
        # max(1, ...) avoids dividing by zero for a header-only file.
        missing_pct = float(100 * (1 - nonnull / max(1, len(s))))
        unit = _infer_unit(str(c))
        # dtype inference: bool must be tested before numeric, because
        # is_numeric_dtype() is also true for bool columns and would otherwise
        # make the "bool" label unreachable.
        if pd.api.types.is_bool_dtype(s):
            dtype = "bool"
        elif pd.api.types.is_numeric_dtype(s):
            dtype = "numeric"
        elif pd.api.types.is_datetime64_any_dtype(s):
            dtype = "datetime"
        else:
            dtype = "categorical"
        item: Dict[str, Any] = {"name": str(c), "dtype": dtype, "unit": unit,
                                "nonnull": nonnull, "missing_pct": round(missing_pct, 2)}
        if dtype == "numeric":
            item["stats"] = _numeric_profile(s, str(c))
        else:
            item["category_profile"] = _categorical_profile(s)
        cols_summary.append(item)
    # quick digest numbers
    num_cols = sum(1 for col in cols_summary if col["dtype"] == "numeric")
    cat_cols = sum(1 for col in cols_summary if col["dtype"] == "categorical")
    med_missing = float(np.median([col["missing_pct"] for col in cols_summary])) if cols_summary else 0.0
    summary_json = {
        "file": os.path.basename(path),
        "rows": int(n_rows),
        "cols": int(n_cols),
        "columns": cols_summary,
        # NOTE(review): nothing in this function suppresses small cells; the
        # category counts above are emitted as computed. Confirm suppression
        # happens elsewhere before relying on "applied": True.
        "privacy": {"small_cell_threshold": 10, "applied": True},
        "notes": [],
    }
    digest_text = (f"{summary_json['file']}: {n_rows:,} rows; {n_cols} cols "
                   f"({num_cols} numeric, {cat_cols} categorical). "
                   f"Missingness median {med_missing:.1f}%.")
    return summary_json, digest_text
def _read_csv_artifact(path: str) -> Dict[str, Any]:
    """Build the lightweight legacy artifact for a delimited file.

    Kept for compatibility with existing flows. At most the first 1000 rows
    are read, every cell as a string.

    Args:
        path: File to read. Names ending in ``.tsv`` (case-insensitive) are
            read tab-separated; everything else uses the comma default.

    Returns:
        Dict with ``kind`` (always ``"csv"``), ``name``, ``path``,
        ``columns``, ``n_rows_sampled``, ``preview_rows`` (first three rows
        as records) and ``text`` (a plain-text rendering of the above).

    Raises:
        Whatever ``pd.read_csv`` raises for an unreadable or malformed file.
    """
    base_name = os.path.basename(path)
    # Without an explicit tab separator a .tsv file collapses into one column
    # whose name is the whole header line.
    sep = "\t" if base_name.lower().endswith(".tsv") else ","
    df = pd.read_csv(path, sep=sep, nrows=1000, dtype=str, low_memory=False)
    cols = list(df.columns.astype(str))
    preview = df.head(3).to_dict(orient="records")
    text_summary = f"CSV FILE: {base_name}\nCOLUMNS: {', '.join(cols)}\nSAMPLE ROWS: {json.dumps(preview)}"
    return {
        "kind": "csv",
        "name": base_name,
        "path": path,
        "columns": cols,
        "n_rows_sampled": len(df),
        "preview_rows": preview,
        "text": text_summary,
    }
def _read_pdf_text(path: str) -> str:
    """Extract text from the first 15 pages of a PDF.

    Returns ``""`` when pdfplumber is unavailable or when opening/extracting
    raises. Pages whose text is blank are skipped; the remaining page texts
    are joined with a blank line between them.
    """
    if not _HAS_PDFPLUMBER:
        return ""
    import pdfplumber

    try:
        with pdfplumber.open(path) as pdf:
            page_texts = [page.extract_text() or "" for page in pdf.pages[:15]]
    except Exception:
        return ""
    return "\n\n".join(text for text in page_texts if text.strip())
def _read_docx_text(path: str) -> str:
    """Return the non-blank paragraphs of a .docx file, one per line.

    An empty string is returned when the ``docx`` module cannot be imported
    or when the document cannot be opened or read.
    """
    try:
        import docx

        document = docx.Document(path)
        kept = [para.text for para in document.paragraphs if para.text.strip()]
    except Exception:
        return ""
    return "\n".join(kept)
def _read_image_text(path: str) -> str:
    """OCR an image file and return the recognised text.

    Best effort: returns ``""`` when ``pytesseract`` or ``PIL`` cannot be
    imported, when the image cannot be opened, or when OCR raises.
    """
    try:
        import pytesseract
        from PIL import Image

        # Open via a context manager so the underlying file handle is
        # released as soon as OCR finishes instead of staying open.
        with Image.open(path) as img:
            return pytesseract.image_to_string(img) or ""
    except Exception:
        return ""
def extract_text_from_files(paths: List[str]) -> Dict[str, Any]:
    """
    Turn uploaded files into retrieval text and structured artifacts.

    Empty entries and paths that do not exist are skipped. Each remaining
    file is dispatched on its lower-cased file name:
      - .csv / .tsv  -> a "csv_summary" artifact, the legacy "csv" artifact
                        and a short digest chunk; if that raises, the file is
                        read as plain text instead
      - .pdf / .docx -> extracted text
      - .png / .jpg / .jpeg -> OCR text, prefixed with the file name
      - anything else (including .txt / .md / .json) -> read as plain text
    Blank text is never added to ``chunks``.

    Args:
        paths: File paths to ingest; ``None`` or an empty list is allowed.

    Returns:
        {
          "chunks": [str, ...],                     # textual chunks for retrieval
          "artifacts": [ { structured meta }, ... ] # e.g., CSV columns + CSV summary
        }
    """
    chunks: List[str] = []
    artifacts: List[Dict[str, Any]] = []
    for p in paths or []:
        if not p or not os.path.exists(p):
            continue
        display_name = os.path.basename(p)
        name = display_name.lower()
        if name.endswith((".csv", ".tsv")):
            try:
                # New: structured summary + digest
                summary_json, digest_text = summarize_csv(p)
                artifacts.append({
                    "kind": "csv_summary",
                    "name": display_name,
                    "path": p,
                    "summary": summary_json,
                    "digest": digest_text,
                })
                # Legacy artifact (columns/preview) kept for compatibility
                artifacts.append(_read_csv_artifact(p))
                # Add short digest to text chunks (helps retrieval)
                chunks.append(f"UPLOADED DATA SUMMARY:\n{digest_text}")
            except Exception:
                # Best-effort fallback to raw text. Apply the same blank
                # check as every other branch so an unreadable or empty file
                # does not contribute an empty chunk.
                txt = _read_text_file(p)
                if txt.strip():
                    chunks.append(txt)
        elif name.endswith(".pdf"):
            txt = _read_pdf_text(p)
            if txt.strip():
                chunks.append(txt)
        elif name.endswith(".docx"):
            txt = _read_docx_text(p)
            if txt.strip():
                chunks.append(txt)
        elif name.endswith((".png", ".jpg", ".jpeg")):
            txt = _read_image_text(p)
            if txt.strip():
                chunks.append(f"IMAGE OCR ({display_name}):\n{txt}")
        else:
            # .txt / .md / .json and unrecognised extensions are all read as
            # plain text.
            txt = _read_text_file(p)
            if txt.strip():
                chunks.append(txt)
    return {"chunks": chunks, "artifacts": artifacts}