Rajan Sharma committed on
Commit
7ae997f
·
verified ·
1 Parent(s): c0550c0

Update upload_ingest.py

Browse files
Files changed (1) hide show
  1. upload_ingest.py +132 -15
upload_ingest.py CHANGED
@@ -2,8 +2,9 @@
2
  from __future__ import annotations
3
  import os
4
  import json
5
- from typing import Dict, List, Any
6
  import pandas as pd
 
7
 
8
  # Optional parsers
9
  try:
@@ -12,6 +13,16 @@ try:
12
  except Exception:
13
  _HAS_PDFPLUMBER = False
14
 
 
 
 
 
 
 
 
 
 
 
15
  def _read_text_file(path: str) -> str:
16
  try:
17
  with open(path, "r", encoding="utf-8", errors="ignore") as f:
@@ -19,11 +30,112 @@ def _read_text_file(path: str) -> str:
19
  except Exception:
20
  return ""
21
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
22
  def _read_csv_artifact(path: str) -> Dict[str, Any]:
23
- # Read a manageable slice, treat everything as string to avoid dtype issues
24
  df = pd.read_csv(path, nrows=1000, dtype=str, low_memory=False)
25
  cols = list(df.columns.astype(str))
26
- # Build a short textual summary to help retrieval too
27
  preview = df.head(3).to_dict(orient="records")
28
  text_summary = f"CSV FILE: {os.path.basename(path)}\nCOLUMNS: {', '.join(cols)}\nSAMPLE ROWS: {json.dumps(preview)}"
29
  return {
@@ -37,14 +149,13 @@ def _read_csv_artifact(path: str) -> Dict[str, Any]:
37
  }
38
 
39
  def _read_pdf_text(path: str) -> str:
40
- # Keep it simple; if pdfplumber missing, skip gracefully
41
  if not _HAS_PDFPLUMBER:
42
  return ""
43
  import pdfplumber
44
  out = []
45
  try:
46
  with pdfplumber.open(path) as pdf:
47
- for page in pdf.pages[:15]: # cap pages for speed
48
  t = page.extract_text() or ""
49
  if t.strip():
50
  out.append(t)
@@ -64,7 +175,6 @@ def _read_docx_text(path: str) -> str:
64
  return ""
65
 
66
  def _read_image_text(path: str) -> str:
67
- # Best-effort OCR
68
  try:
69
  import pytesseract
70
  from PIL import Image
@@ -75,12 +185,11 @@ def _read_image_text(path: str) -> str:
75
 
76
  def extract_text_from_files(paths: List[str]) -> Dict[str, Any]:
77
  """
78
- Returns a dict:
79
  {
80
- "chunks": [str, ...], # text chunks for retrieval
81
- "artifacts": [ { structured meta }, ... ] # e.g., CSV columns
82
  }
83
- Backward compatible: callers expecting a list of strings can use ["chunks"].
84
  """
85
  chunks: List[str] = []
86
  artifacts: List[Dict[str, Any]] = []
@@ -89,14 +198,23 @@ def extract_text_from_files(paths: List[str]) -> Dict[str, Any]:
89
  if not p or not os.path.exists(p):
90
  continue
91
  name = os.path.basename(p).lower()
92
- if name.endswith(".csv"):
93
  try:
 
 
 
 
 
 
 
 
 
 
94
  art = _read_csv_artifact(p)
95
  artifacts.append(art)
96
- # also add the textual summary to chunks
97
- chunks.append(art["text"])
98
  except Exception:
99
- # fall back to raw text if any
100
  chunks.append(_read_text_file(p))
101
  elif name.endswith(".pdf"):
102
  txt = _read_pdf_text(p)
@@ -115,7 +233,6 @@ def extract_text_from_files(paths: List[str]) -> Dict[str, Any]:
115
  if txt.strip():
116
  chunks.append(f"IMAGE OCR ({os.path.basename(p)}):\n{txt}")
117
  else:
118
- # unknown type: try to read as text
119
  txt = _read_text_file(p)
120
  if txt.strip():
121
  chunks.append(txt)
 
2
  from __future__ import annotations
3
  import os
4
  import json
5
+ from typing import Dict, List, Any, Tuple
6
  import pandas as pd
7
+ import numpy as np
8
 
9
  # Optional parsers
10
  try:
 
13
  except Exception:
14
  _HAS_PDFPLUMBER = False
15
 
16
# Plausibility guardrails for numeric columns.
# Each entry maps a lowercase key, looked up inside column names by the
# helpers in this module, to a (lo, hi, unit_hint) triple. Insertion order
# matters: the first key that matches a column name wins.
NUMERIC_BOUNDS = dict(
    a1c=(3.0, 20.0, "%"),
    sbp=(60.0, 250.0, "mmHg"),
    dbp=(30.0, 150.0, "mmHg"),
    bmi=(10.0, 70.0, "kg/m²"),
    chol=(2.0, 12.0, "mmol/L"),
    mmhg=(60.0, 250.0, "mmHg"),
)
25
+
26
  def _read_text_file(path: str) -> str:
27
  try:
28
  with open(path, "r", encoding="utf-8", errors="ignore") as f:
 
30
  except Exception:
31
  return ""
32
 
33
def _infer_unit(col_name: str) -> str | None:
    """Return the unit hint for ``col_name``, or ``None`` if no key matches.

    Delegates to ``_bounds_key`` so the unit and the bounds reported for a
    column always come from the same ``NUMERIC_BOUNDS`` entry.
    """
    key = _bounds_key(col_name)
    if key is None:
        return None
    return NUMERIC_BOUNDS[key][2]


def _bounds_key(col_name: str) -> str | None:
    """Return the first ``NUMERIC_BOUNDS`` key that ``col_name`` refers to.

    The column name is lowercased and split into alphanumeric tokens; a key
    matches when some token starts or ends with it. This keeps the intended
    hits ("hba1c_pct", "cholesterol_total", "sbp_mmhg", "BMI") while avoiding
    the false positives of a raw substring search, e.g. "submitted_at" and
    "submission_id" both contain "bmi" in the middle of a word.

    Keys are tried in ``NUMERIC_BOUNDS`` insertion order, so when several
    keys match the earliest one wins. Returns ``None`` when nothing matches.
    """
    lowered = col_name.lower()
    # Treat every non-alphanumeric character as a token separator.
    tokens = "".join(ch if ch.isalnum() else " " for ch in lowered).split()
    for key in NUMERIC_BOUNDS:
        if any(tok.startswith(key) or tok.endswith(key) for tok in tokens):
            return key
    return None
46
+
47
def _numeric_profile(s: pd.Series, col_name: str) -> Dict[str, Any]:
    """Build descriptive statistics for a column interpreted as numbers.

    Args:
        s: Column values; anything that cannot be parsed as a number is
            coerced to NaN and ignored by the statistics.
        col_name: Column name, used to look up an optional bounds entry.

    Returns:
        A dict with ``count``, ``mean``, ``std``, ``min``, ``p25``, ``p50``,
        ``p75`` and ``max``. A statistic that is unavailable or undefined
        (for example ``std`` of a single value, or every statistic of an
        empty column) is ``None`` rather than NaN, so the profile stays
        valid JSON. When the column name matches a ``NUMERIC_BOUNDS`` key, a
        ``bounds`` dict with ``lo``, ``hi``, ``unit`` and ``oob_count`` (the
        number of values outside ``[lo, hi]``) is added.
    """
    x = pd.to_numeric(s, errors="coerce")
    desc = x.dropna().describe(percentiles=[.25, .5, .75])

    def _stat(label: str) -> float | None:
        # describe() omits numeric labels for non-numeric input and yields
        # NaN for undefined statistics; report both cases as None.
        if label not in desc:
            return None
        value = desc[label]
        return None if pd.isna(value) else float(value)

    count = _stat("count")
    out: Dict[str, Any] = {
        "count": 0.0 if count is None else count,
        "mean": _stat("mean"),
        "std": _stat("std"),
        "min": _stat("min"),
        "p25": _stat("25%"),
        "p50": _stat("50%"),
        "p75": _stat("75%"),
        "max": _stat("max"),
    }

    # Out-of-bounds flag (clinical guardrails). Comparisons against NaN are
    # False, so unparseable/missing values are never counted as violations.
    key = _bounds_key(col_name)
    if key:
        lo, hi, unit = NUMERIC_BOUNDS[key]
        oob = ((x < lo) | (x > hi)).sum()
        out["bounds"] = {"lo": lo, "hi": hi, "unit": unit, "oob_count": int(oob)}
    return out
67
+
68
+ def _categorical_profile(s: pd.Series, top_k: int = 10) -> Dict[str, Any]:
69
+ vc = s.astype(str).fillna("").value_counts()
70
+ top = [{"value": k, "count": int(v)} for k, v in vc.head(top_k).items()]
71
+ return {
72
+ "cardinality": int(vc.shape[0]),
73
+ "top_values": top
74
+ }
75
+
76
def _classify_column(s: pd.Series) -> str:
    """Map a column's pandas dtype to the coarse label used in the summary."""
    # bool has to be tested first: is_numeric_dtype() is also true for bool
    # columns, which would otherwise make the "bool" label unreachable.
    if pd.api.types.is_bool_dtype(s):
        return "bool"
    if pd.api.types.is_numeric_dtype(s):
        return "numeric"
    if pd.api.types.is_datetime64_any_dtype(s):
        return "datetime"
    return "categorical"


def summarize_csv(
    path: str,
    profile_row_cap: int = 1_000_000,
    *,
    small_cell_threshold: int = 10,
) -> Tuple[Dict[str, Any], str]:
    """Profile a delimited text file.

    Args:
        path: File to read. A ``.tsv`` extension is parsed as tab-separated;
            anything else is parsed as comma-separated.
        profile_row_cap: When the file has more rows than this, per-column
            statistics are computed on a reproducible random sample of this
            many rows. ``rows`` in the summary is always the full row count.
        small_cell_threshold: Category values occurring fewer times than
            this are removed from ``top_values`` and only counted in
            ``suppressed_small_cells``. A value of 1 or less disables
            suppression.

    Returns:
        ``(summary_json, digest_text)`` where
        - summary_json: structured profile (file, rows, cols, columns,
          privacy, notes)
        - digest_text : one-liner for prompt context

    Raises:
        Whatever ``pandas.read_csv`` raises for unreadable or empty input.
    """
    sep = "\t" if path.lower().endswith(".tsv") else ","
    df = pd.read_csv(path, sep=sep, low_memory=False)
    n_rows, n_cols = df.shape
    notes: List[str] = []

    # Downsample for speed if extremely large (stats still decent for overview)
    if n_rows > profile_row_cap:
        df_sample = df.sample(profile_row_cap, random_state=42)
        notes.append(
            f"Column statistics computed on a random sample of "
            f"{profile_row_cap:,} of {n_rows:,} rows."
        )
    else:
        df_sample = df

    cols_summary: List[Dict[str, Any]] = []
    for c in df_sample.columns:
        s = df_sample[c]
        nonnull = int(s.notna().sum())
        # max(1, ...) guards the division for a header-only file.
        missing_pct = float(100 * (1 - nonnull / max(1, len(s))))
        dtype = _classify_column(s)

        item: Dict[str, Any] = {
            "name": str(c),
            "dtype": dtype,
            "unit": _infer_unit(str(c)),
            "nonnull": nonnull,
            "missing_pct": round(missing_pct, 2),
        }

        if dtype == "numeric":
            item["stats"] = _numeric_profile(s, str(c))
        else:
            profile = _categorical_profile(s)
            # Enforce the small-cell rule the summary declares under
            # "privacy": rare values can identify individual records.
            kept = [e for e in profile["top_values"]
                    if e["count"] >= small_cell_threshold]
            item["category_profile"] = {
                **profile,
                "top_values": kept,
                "suppressed_small_cells": len(profile["top_values"]) - len(kept),
            }

        cols_summary.append(item)

    # quick digest numbers
    num_cols = sum(1 for col in cols_summary if col["dtype"] == "numeric")
    cat_cols = sum(1 for col in cols_summary if col["dtype"] == "categorical")
    med_missing = (
        float(np.median([col["missing_pct"] for col in cols_summary]))
        if cols_summary else 0.0
    )

    summary_json = {
        "file": os.path.basename(path),
        "rows": int(n_rows),
        "cols": int(n_cols),
        "columns": cols_summary,
        "privacy": {
            "small_cell_threshold": small_cell_threshold,
            "applied": small_cell_threshold > 1,
        },
        "notes": notes,
    }

    digest_text = (f"{summary_json['file']}: {n_rows:,} rows; {n_cols} cols "
                   f"({num_cols} numeric, {cat_cols} categorical). "
                   f"Missingness median {med_missing:.1f}%.")

    return summary_json, digest_text
134
+
135
  def _read_csv_artifact(path: str) -> Dict[str, Any]:
136
+ # Lightweight legacy artifact (kept for compatibility with existing flows)
137
  df = pd.read_csv(path, nrows=1000, dtype=str, low_memory=False)
138
  cols = list(df.columns.astype(str))
 
139
  preview = df.head(3).to_dict(orient="records")
140
  text_summary = f"CSV FILE: {os.path.basename(path)}\nCOLUMNS: {', '.join(cols)}\nSAMPLE ROWS: {json.dumps(preview)}"
141
  return {
 
149
  }
150
 
151
  def _read_pdf_text(path: str) -> str:
 
152
  if not _HAS_PDFPLUMBER:
153
  return ""
154
  import pdfplumber
155
  out = []
156
  try:
157
  with pdfplumber.open(path) as pdf:
158
+ for page in pdf.pages[:15]:
159
  t = page.extract_text() or ""
160
  if t.strip():
161
  out.append(t)
 
175
  return ""
176
 
177
  def _read_image_text(path: str) -> str:
 
178
  try:
179
  import pytesseract
180
  from PIL import Image
 
185
 
186
  def extract_text_from_files(paths: List[str]) -> Dict[str, Any]:
187
  """
188
+ Returns:
189
  {
190
+ "chunks": [str, ...], # textual chunks for retrieval
191
+ "artifacts": [ { structured meta }, ... ] # e.g., CSV columns + CSV summary
192
  }
 
193
  """
194
  chunks: List[str] = []
195
  artifacts: List[Dict[str, Any]] = []
 
198
  if not p or not os.path.exists(p):
199
  continue
200
  name = os.path.basename(p).lower()
201
+ if name.endswith(".csv") or name.endswith(".tsv"):
202
  try:
203
+ # New: structured summary + digest
204
+ summary_json, digest_text = summarize_csv(p)
205
+ artifacts.append({
206
+ "kind": "csv_summary",
207
+ "name": os.path.basename(p),
208
+ "path": p,
209
+ "summary": summary_json,
210
+ "digest": digest_text,
211
+ })
212
+ # Legacy artifact (columns/preview) kept for compatibility
213
  art = _read_csv_artifact(p)
214
  artifacts.append(art)
215
+ # Add short digest to text chunks (helps retrieval)
216
+ chunks.append(f"UPLOADED DATA SUMMARY:\n{digest_text}")
217
  except Exception:
 
218
  chunks.append(_read_text_file(p))
219
  elif name.endswith(".pdf"):
220
  txt = _read_pdf_text(p)
 
233
  if txt.strip():
234
  chunks.append(f"IMAGE OCR ({os.path.basename(p)}):\n{txt}")
235
  else:
 
236
  txt = _read_text_file(p)
237
  if txt.strip():
238
  chunks.append(txt)