Medica_DecisionSupportAI / data_registry.py
Rajan Sharma
Create data_registry.py
aff5a07 verified
raw
history blame
2.99 kB
from __future__ import annotations
import os
from dataclasses import dataclass, field
from typing import Dict, Any, List, Optional
import pandas as pd
def _safe_read(path: str) -> Optional[pd.DataFrame]:
name = (path or "").lower()
try:
if name.endswith(".csv"):
return pd.read_csv(path, low_memory=False)
if name.endswith(".xlsx") or name.endswith(".xls"):
return pd.read_excel(path)
except Exception:
return None
return None
def _dtype_of_series(s: pd.Series) -> str:
if pd.api.types.is_integer_dtype(s): return "int"
if pd.api.types.is_float_dtype(s): return "float"
if pd.api.types.is_bool_dtype(s): return "bool"
if pd.api.types.is_datetime64_any_dtype(s): return "datetime"
return "string"
def _profile_df(df: pd.DataFrame, max_examples: int = 3) -> Dict[str, Any]:
    """Build a lightweight, JSON-friendly description of a DataFrame.

    Args:
        df: The table to describe.
        max_examples: How many leading non-null values to keep per column
            as example strings.

    Returns:
        A dict with ``"n_rows"``, ``"n_cols"`` and ``"columns"``. Each entry
        of ``"columns"`` holds the column's ``"name"`` (as ``str``), coarse
        ``"dtype"`` label, ``"n_non_null"``, ``"n_unique"`` (nulls excluded)
        and ``"examples"`` (non-null values rendered as strings).
    """
    cols = []
    # Iterate by position via items(): label-based ``df[label]`` yields a
    # DataFrame rather than a Series when column labels are duplicated,
    # which would break the per-column statistics below.
    for label, s in df.items():
        non_null = s.dropna()
        cols.append({
            "name": str(label),
            "dtype": _dtype_of_series(s),
            "n_non_null": int(len(non_null)),
            "n_unique": int(s.nunique(dropna=True)),
            # Slice first so only the few example values are stringified.
            "examples": non_null.head(max_examples).astype(str).tolist(),
        })
    return {"n_rows": int(len(df)), "n_cols": int(df.shape[1]), "columns": cols}
@dataclass
class TableEntry:
    """One registered table: its data plus descriptive metadata."""

    # Key under which the table is stored in the registry.
    name: str
    # Filesystem path the table was loaded from.
    path: str
    # The loaded table contents.
    df: pd.DataFrame
    # Column/row summary; every instance gets its own empty dict by default.
    profile: Dict[str, Any] = field(default_factory=dict)
class DataRegistry:
    """In-memory catalogue of loaded tables, keyed by a unique display name."""

    def __init__(self):
        # Display name -> TableEntry, in registration order.
        self._tables: Dict[str, TableEntry] = {}

    def clear(self) -> None:
        """Drop every registered table."""
        self._tables.clear()

    def add_path(self, path: str) -> Optional[str]:
        """Load the file at *path* and register it.

        Returns the key the table was stored under, or ``None`` when *path*
        is empty, does not exist, or could not be read.
        """
        if not (path and os.path.exists(path)):
            return None

        frame = _safe_read(path)
        if frame is None:
            return None

        filename = os.path.basename(path)
        # Repeated basenames are disambiguated as "name (2)", "name (3)", ...
        unique_name = filename
        counter = 2
        while unique_name in self._tables:
            unique_name = f"{filename} ({counter})"
            counter += 1

        summary = _profile_df(frame)
        self._tables[unique_name] = TableEntry(
            name=unique_name,
            path=path,
            df=frame,
            profile=summary,
        )
        return unique_name

    def names(self) -> List[str]:
        """Return the registered table names in registration order."""
        return list(self._tables)

    def get(self, name: str) -> Optional[pd.DataFrame]:
        """Return the DataFrame stored under *name*, or ``None`` if unknown."""
        entry = self._tables.get(name)
        if entry is None:
            return None
        return entry.df

    def get_profile(self, name: str) -> Dict[str, Any]:
        """Return the profile stored under *name*, or ``{}`` if unknown."""
        entry = self._tables.get(name)
        if entry is None:
            return {}
        return entry.profile

    def iter_tables(self) -> List[TableEntry]:
        """Return a snapshot list of all registered entries."""
        return [*self._tables.values()]

    def summarize_for_prompt(self, col_cap: int = 600) -> str:
        """Render one bullet line per table: name, row count and column names.

        The comma-joined column list is cut to *col_cap* characters and
        suffixed with an ellipsis when longer. An empty registry yields
        ``"- <none>"``.
        """
        bullets = []
        for entry in self.iter_tables():
            column_names = [col["name"] for col in entry.profile.get("columns", [])]
            joined = ", ".join(column_names)
            if len(joined) > col_cap:
                joined = joined[:col_cap] + "…"
            row_count = entry.profile.get("n_rows", 0)
            bullets.append(f"- {entry.name}: rows={row_count} cols=[{joined}]")
        if not bullets:
            return "- <none>"
        return "\n".join(bullets)