from __future__ import annotations import os from dataclasses import dataclass, field from typing import Dict, Any, List, Optional import pandas as pd def _safe_read(path: str) -> Optional[pd.DataFrame]: name = (path or "").lower() try: if name.endswith(".csv"): return pd.read_csv(path, low_memory=False) if name.endswith(".xlsx") or name.endswith(".xls"): return pd.read_excel(path) except Exception: return None return None def _dtype_of_series(s: pd.Series) -> str: if pd.api.types.is_integer_dtype(s): return "int" if pd.api.types.is_float_dtype(s): return "float" if pd.api.types.is_bool_dtype(s): return "bool" if pd.api.types.is_datetime64_any_dtype(s): return "datetime" return "string" def _profile_df(df: pd.DataFrame, max_examples: int = 3) -> Dict[str, Any]: cols = [] for c in df.columns: s = df[c] dtype = _dtype_of_series(s) ex_vals = s.dropna().astype(str).head(max_examples).tolist() if len(s) else [] cols.append({ "name": str(c), "dtype": dtype, "n_non_null": int(s.notna().sum()), "n_unique": int(s.nunique(dropna=True)), "examples": ex_vals }) return {"n_rows": int(len(df)), "n_cols": int(df.shape[1]), "columns": cols} @dataclass class TableEntry: name: str path: str df: pd.DataFrame profile: Dict[str, Any] = field(default_factory=dict) class DataRegistry: def __init__(self): self._tables: Dict[str, TableEntry] = {} def clear(self) -> None: self._tables.clear() def add_path(self, path: str) -> Optional[str]: if not path or not os.path.exists(path): return None df = _safe_read(path) if df is None: return None base = os.path.basename(path) key = base i = 2 while key in self._tables: key = f"{base} ({i})" i += 1 prof = _profile_df(df) self._tables[key] = TableEntry(name=key, path=path, df=df, profile=prof) return key def names(self) -> List[str]: return list(self._tables.keys()) def get(self, name: str) -> Optional[pd.DataFrame]: return self._tables.get(name).df if name in self._tables else None def get_profile(self, name: str) -> Dict[str, Any]: return self._tables.get(name).profile if name in self._tables else {} def iter_tables(self) -> List[TableEntry]: return list(self._tables.values()) def summarize_for_prompt(self, col_cap: int = 600) -> str: lines = [] for t in self.iter_tables(): cols = ", ".join([c["name"] for c in t.profile.get("columns", [])]) if len(cols) > col_cap: cols = cols[:col_cap] + "…" lines.append(f"- {t.name}: rows={t.profile.get('n_rows', 0)} cols=[{cols}]") return "\n".join(lines) if lines else "- "