File size: 2,987 Bytes
aff5a07
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
from __future__ import annotations
import os
from dataclasses import dataclass, field
from typing import Dict, Any, List, Optional
import pandas as pd

def _safe_read(path: str) -> Optional[pd.DataFrame]:
    """Load a tabular file into a DataFrame on a best-effort basis.

    The reader is chosen from the (case-insensitive) file extension:
    ``.csv`` goes through ``pd.read_csv`` and ``.xlsx``/``.xls`` through
    ``pd.read_excel``.

    Args:
        path: Location of the file; a falsy value is treated as "".

    Returns:
        The loaded DataFrame, or ``None`` when the extension is not
        recognised or the reader raises any ``Exception``.
    """
    lowered = (path or "").lower()
    wants_csv = lowered.endswith(".csv")
    wants_excel = lowered.endswith((".xlsx", ".xls"))
    if not (wants_csv or wants_excel):
        return None
    try:
        if wants_csv:
            return pd.read_csv(path, low_memory=False)
        return pd.read_excel(path)
    except Exception:
        # Deliberately swallow reader failures: callers only need to know
        # that the file could not be loaded.
        return None

def _dtype_of_series(s: pd.Series) -> str:
    """Map a Series' dtype onto a coarse type label.

    The predicates are tried in a fixed order and the first match wins.

    Args:
        s: Series whose dtype is inspected.

    Returns:
        One of ``"int"``, ``"float"``, ``"bool"``, ``"datetime"``, or
        ``"string"`` when none of the checks match.
    """
    ordered_checks = (
        (pd.api.types.is_integer_dtype, "int"),
        (pd.api.types.is_float_dtype, "float"),
        (pd.api.types.is_bool_dtype, "bool"),
        (pd.api.types.is_datetime64_any_dtype, "datetime"),
    )
    for matches, label in ordered_checks:
        if matches(s):
            return label
    return "string"

def _profile_df(df: pd.DataFrame, max_examples: int = 3) -> Dict[str, Any]:
    """Build a lightweight, per-column summary of a DataFrame.

    Args:
        df: Frame to summarise.
        max_examples: Upper bound on the number of example values kept per
            column; negative values are treated as 0.

    Returns:
        A dict with ``"n_rows"``, ``"n_cols"`` and ``"columns"``. Each item
        of ``"columns"`` is a dict with the column's ``"name"`` (stringified
        label), ``"dtype"`` (label from ``_dtype_of_series``),
        ``"n_non_null"``, ``"n_unique"`` (nulls excluded) and ``"examples"``
        (the first non-null values rendered as strings).
    """
    # A negative count would make ``head`` return "all but the last n" values
    # instead of an empty sample.
    n_examples = max(max_examples, 0)
    cols = []
    for pos, label in enumerate(df.columns):
        # Select by position: with duplicated labels ``df[label]`` returns a
        # DataFrame rather than a Series, which breaks the calls below.
        s = df.iloc[:, pos]
        # Stringify before taking the head so the rendering of each value is
        # decided from the whole column, not just the sampled rows.
        ex_vals = s.dropna().astype(str).head(n_examples).tolist()
        cols.append({
            "name": str(label),
            "dtype": _dtype_of_series(s),
            "n_non_null": int(s.notna().sum()),
            "n_unique": int(s.nunique(dropna=True)),
            "examples": ex_vals,
        })
    return {"n_rows": int(len(df)), "n_cols": int(df.shape[1]), "columns": cols}

@dataclass
class TableEntry:
    """A loaded table together with its origin and summary.

    ``df`` is excluded from the generated ``__eq__``: comparing two distinct
    DataFrames with ``==`` yields an element-wise result whose truth value is
    ambiguous, so leaving it in makes comparing entries raise ``ValueError``.
    Equality therefore rests on ``name``, ``path`` and ``profile``.
    """

    name: str  # identifier of the table
    path: str  # location the table was read from
    df: pd.DataFrame = field(compare=False)  # the data itself; not compared
    # Summary of ``df``; every instance gets its own empty dict by default.
    profile: Dict[str, Any] = field(default_factory=dict)

class DataRegistry:
    """In-memory collection of loaded tables, keyed by a unique name."""

    def __init__(self):
        # Maps registry key -> TableEntry, in insertion order.
        self._tables: Dict[str, TableEntry] = {}

    def clear(self) -> None:
        """Remove every registered table."""
        self._tables.clear()

    def add_path(self, path: str) -> Optional[str]:
        """Load the file at ``path`` and register it.

        The key is the file's base name; if that is already taken,
        `` (2)``, `` (3)``, ... is appended until a free key is found.

        Returns:
            The key the table was stored under, or ``None`` when ``path`` is
            falsy, does not exist, or could not be read.
        """
        if not path or not os.path.exists(path):
            return None
        frame = _safe_read(path)
        if frame is None:
            return None

        stem = os.path.basename(path)
        candidate, counter = stem, 2
        while candidate in self._tables:
            candidate = f"{stem} ({counter})"
            counter += 1

        self._tables[candidate] = TableEntry(
            name=candidate,
            path=path,
            df=frame,
            profile=_profile_df(frame),
        )
        return candidate

    def names(self) -> List[str]:
        """Return the registered keys in insertion order."""
        return [*self._tables]

    def get(self, name: str) -> Optional[pd.DataFrame]:
        """Return the DataFrame stored under ``name``, or ``None`` if absent."""
        if name not in self._tables:
            return None
        return self._tables[name].df

    def get_profile(self, name: str) -> Dict[str, Any]:
        """Return the profile stored under ``name``, or ``{}`` if absent."""
        if name not in self._tables:
            return {}
        return self._tables[name].profile

    def iter_tables(self) -> List[TableEntry]:
        """Return a new list holding every registered entry."""
        return [*self._tables.values()]

    def summarize_for_prompt(self, col_cap: int = 600) -> str:
        """Render one bullet line per table with its row count and columns.

        Args:
            col_cap: Maximum length of the comma-joined column list; longer
                lists are cut at this length and suffixed with an ellipsis.

        Returns:
            The bullet lines joined by newlines, or ``"- <none>"`` when no
            table is registered.
        """
        def render(entry) -> str:
            joined = ", ".join(
                [col["name"] for col in entry.profile.get("columns", [])]
            )
            if len(joined) > col_cap:
                joined = joined[:col_cap] + "…"
            row_count = entry.profile.get("n_rows", 0)
            return f"- {entry.name}: rows={row_count} cols=[{joined}]"

        rendered = [render(entry) for entry in self.iter_tables()]
        if not rendered:
            return "- <none>"
        return "\n".join(rendered)