Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import os | |
| from dataclasses import dataclass, field | |
| from typing import Dict, Any, List, Optional | |
| import pandas as pd | |
| def _safe_read(path: str) -> Optional[pd.DataFrame]: | |
| name = (path or "").lower() | |
| try: | |
| if name.endswith(".csv"): | |
| return pd.read_csv(path, low_memory=False) | |
| if name.endswith(".xlsx") or name.endswith(".xls"): | |
| return pd.read_excel(path) | |
| except Exception: | |
| return None | |
| return None | |
| def _dtype_of_series(s: pd.Series) -> str: | |
| if pd.api.types.is_integer_dtype(s): return "int" | |
| if pd.api.types.is_float_dtype(s): return "float" | |
| if pd.api.types.is_bool_dtype(s): return "bool" | |
| if pd.api.types.is_datetime64_any_dtype(s): return "datetime" | |
| return "string" | |
| def _profile_df(df: pd.DataFrame, max_examples: int = 3) -> Dict[str, Any]: | |
| cols = [] | |
| for c in df.columns: | |
| s = df[c] | |
| dtype = _dtype_of_series(s) | |
| ex_vals = s.dropna().astype(str).head(max_examples).tolist() if len(s) else [] | |
| cols.append({ | |
| "name": str(c), | |
| "dtype": dtype, | |
| "n_non_null": int(s.notna().sum()), | |
| "n_unique": int(s.nunique(dropna=True)), | |
| "examples": ex_vals | |
| }) | |
| return {"n_rows": int(len(df)), "n_cols": int(df.shape[1]), "columns": cols} | |
| class TableEntry: | |
| name: str | |
| path: str | |
| df: pd.DataFrame | |
| profile: Dict[str, Any] = field(default_factory=dict) | |
| class DataRegistry: | |
| def __init__(self): | |
| self._tables: Dict[str, TableEntry] = {} | |
| def clear(self) -> None: | |
| self._tables.clear() | |
| def add_path(self, path: str) -> Optional[str]: | |
| if not path or not os.path.exists(path): | |
| return None | |
| df = _safe_read(path) | |
| if df is None: | |
| return None | |
| base = os.path.basename(path) | |
| key = base | |
| i = 2 | |
| while key in self._tables: | |
| key = f"{base} ({i})" | |
| i += 1 | |
| prof = _profile_df(df) | |
| self._tables[key] = TableEntry(name=key, path=path, df=df, profile=prof) | |
| return key | |
| def names(self) -> List[str]: | |
| return list(self._tables.keys()) | |
| def get(self, name: str) -> Optional[pd.DataFrame]: | |
| return self._tables.get(name).df if name in self._tables else None | |
| def get_profile(self, name: str) -> Dict[str, Any]: | |
| return self._tables.get(name).profile if name in self._tables else {} | |
| def iter_tables(self) -> List[TableEntry]: | |
| return list(self._tables.values()) | |
| def summarize_for_prompt(self, col_cap: int = 600) -> str: | |
| lines = [] | |
| for t in self.iter_tables(): | |
| cols = ", ".join([c["name"] for c in t.profile.get("columns", [])]) | |
| if len(cols) > col_cap: | |
| cols = cols[:col_cap] + "…" | |
| lines.append(f"- {t.name}: rows={t.profile.get('n_rows', 0)} cols=[{cols}]") | |
| return "\n".join(lines) if lines else "- <none>" | |