Rajan Sharma committed on
Commit
5613174
·
verified ·
1 Parent(s): 13953b9

Update data_registry.py

Browse files
Files changed (1) hide show
  1. data_registry.py +320 -30
data_registry.py CHANGED
@@ -1,41 +1,205 @@
1
  from __future__ import annotations
2
  import os
 
3
  from dataclasses import dataclass, field
4
- from typing import Dict, Any, List, Optional
5
  import pandas as pd
6
 
7
def _safe_read(path: str) -> Optional[pd.DataFrame]:
    """Read a CSV or Excel file into a DataFrame, or return ``None``.

    The reader is chosen from the lower-cased file extension: ``.csv`` uses
    ``pd.read_csv`` and ``.xlsx``/``.xls`` use ``pd.read_excel``.

    Args:
        path: Location of the file. ``None`` or an empty string is treated
            as an unsupported path.

    Returns:
        The loaded DataFrame, or ``None`` when the extension is unsupported
        or the pandas reader raises.
    """
    import logging  # local import keeps this block self-contained

    name = (path or "").lower()
    try:
        if name.endswith(".csv"):
            return pd.read_csv(path, low_memory=False)
        if name.endswith((".xlsx", ".xls")):
            return pd.read_excel(path)
    except Exception as exc:
        # Best-effort loader: callers rely on ``None`` rather than an
        # exception, but the failure should still be visible in the logs.
        logging.getLogger(__name__).warning("Could not read %s: %s", path, exc)
    return None
17
 
18
def _dtype_of_series(s: pd.Series) -> str:
    """Map a Series' pandas dtype to a coarse type label.

    Args:
        s: The column to classify.

    Returns:
        ``"int"``, ``"float"``, ``"bool"`` or ``"datetime"`` when the
        matching ``pd.api.types`` predicate holds, otherwise ``"string"``.
    """
    # Checked in order; the first predicate that holds decides the label.
    checks = (
        (pd.api.types.is_integer_dtype, "int"),
        (pd.api.types.is_float_dtype, "float"),
        (pd.api.types.is_bool_dtype, "bool"),
        (pd.api.types.is_datetime64_any_dtype, "datetime"),
    )
    for predicate, label in checks:
        if predicate(s):
            return label
    return "string"
24
 
25
def _profile_df(df: pd.DataFrame, max_examples: int = 3) -> Dict[str, Any]:
    """Summarise a DataFrame's shape and per-column basics.

    Args:
        df: The table to profile.
        max_examples: Maximum number of non-null example values (rendered
            as strings) recorded per column.

    Returns:
        A dict with ``n_rows``, ``n_cols`` and ``columns``; each column
        entry holds ``name``, ``dtype``, ``n_non_null``, ``n_unique`` and
        ``examples``.
    """
    cols = []
    # Select by position: ``df[label]`` returns a DataFrame rather than a
    # Series when two columns share the same label.
    for position, label in enumerate(df.columns):
        s = df.iloc[:, position]
        cols.append({
            "name": str(label),
            "dtype": _dtype_of_series(s),
            "n_non_null": int(s.notna().sum()),
            "n_unique": int(s.nunique(dropna=True)),
            # Take the first few values before stringifying so only those
            # values are converted, not the whole column.
            "examples": s.dropna().head(max_examples).astype(str).tolist(),
        })
    return {"n_rows": int(len(df)), "n_cols": int(df.shape[1]), "columns": cols}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
39
 
40
  @dataclass
41
  class TableEntry:
@@ -43,47 +207,173 @@ class TableEntry:
43
  path: str
44
  df: pd.DataFrame
45
  profile: Dict[str, Any] = field(default_factory=dict)
 
 
 
 
 
 
 
 
 
 
 
 
46
 
47
class DataRegistry:
    """In-memory registry of loaded tables keyed by a unique display name."""

    def __init__(self):
        # Maps the unique table name to its entry (frame + profile).
        self._tables: Dict[str, "TableEntry"] = {}

    def clear(self) -> None:
        """Remove every registered table."""
        self._tables.clear()

    def add_path(self, path: str) -> Optional[str]:
        """Load the file at *path* and register it.

        Returns:
            The name the table was registered under, or ``None`` when the
            path is empty, does not exist, or could not be read.
        """
        if not path or not os.path.exists(path):
            return None
        df = _safe_read(path)
        if df is None:
            return None
        base = os.path.basename(path)
        key = base
        i = 2
        # Disambiguate repeated basenames as "name (2)", "name (3)", ...
        while key in self._tables:
            key = f"{base} ({i})"
            i += 1
        prof = _profile_df(df)
        self._tables[key] = TableEntry(name=key, path=path, df=df, profile=prof)
        return key

    def names(self) -> List[str]:
        """Return the registered table names in insertion order."""
        return list(self._tables)

    def get(self, name: str) -> Optional[pd.DataFrame]:
        """Return the DataFrame registered as *name*, or ``None``."""
        entry = self._tables.get(name)
        return entry.df if entry is not None else None

    def get_profile(self, name: str) -> Dict[str, Any]:
        """Return the profile of table *name*, or ``{}`` if unknown."""
        entry = self._tables.get(name)
        return entry.profile if entry is not None else {}

    def iter_tables(self) -> List["TableEntry"]:
        """Return a list snapshot of all table entries."""
        return list(self._tables.values())

    def summarize_for_prompt(self, col_cap: int = 600) -> str:
        """Render one line per table with its row count and column names.

        Args:
            col_cap: Maximum length of the joined column-name string; longer
                strings are cut and suffixed with an ellipsis.

        Returns:
            The newline-joined summary, or ``"- <none>"`` when the registry
            is empty.
        """
        lines = []
        for t in self._tables.values():
            cols = ", ".join(c["name"] for c in t.profile.get("columns", []))
            if len(cols) > col_cap:
                cols = cols[:col_cap] + "…"
            lines.append(f"- {t.name}: rows={t.profile.get('n_rows', 0)} cols=[{cols}]")
        return "\n".join(lines) if lines else "- <none>"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  from __future__ import annotations
2
  import os
3
+ import re
4
  from dataclasses import dataclass, field
5
+ from typing import Dict, Any, List, Optional, Set, Tuple
6
  import pandas as pd
7
 
8
def saferead(path: str) -> Optional[pd.DataFrame]:
    """Read a tabular file into a DataFrame, returning ``None`` on failure.

    The reader is chosen from the lower-cased file extension: ``.csv``,
    ``.xlsx``/``.xls``, ``.tsv``, ``.json`` and ``.parquet`` are supported.

    Args:
        path: Location of the file. ``None`` or an empty string is treated
            as an unsupported path.

    Returns:
        The loaded DataFrame, or ``None`` when the extension is not
        supported or the pandas reader raises.
    """
    import logging  # local import keeps this block self-contained

    name = (path or "").lower()
    # Each entry pairs the suffixes it handles with the reader to call.
    readers = (
        ((".csv",), lambda p: pd.read_csv(p, low_memory=False)),
        ((".xlsx", ".xls"), pd.read_excel),
        ((".tsv",), lambda p: pd.read_csv(p, sep="\t", low_memory=False)),
        ((".json",), pd.read_json),
        ((".parquet",), pd.read_parquet),
    )
    for suffixes, reader in readers:
        if not name.endswith(suffixes):
            continue
        try:
            return reader(path)
        except Exception as exc:
            # Best-effort loader: callers rely on ``None`` rather than an
            # exception. Report through logging instead of stdout.
            logging.getLogger(__name__).warning("Could not read %s: %s", path, exc)
            return None
    return None
26
 
27
def dtypeof_series(
    s: pd.Series,
    sample_size: int = 100,
    numeric_threshold: float = 0.7,
) -> str:
    """Determine the semantic type label of a pandas Series.

    Args:
        s: The column to classify.
        sample_size: Number of leading non-null values inspected when
            deciding whether a text column is really numeric.
        numeric_threshold: Fraction of the sample that must parse as a
            number (strictly exceeded) for the column to be labelled
            ``"numeric_as_string"``.

    Returns:
        One of ``"int"``, ``"float"``, ``"bool"``, ``"datetime"``,
        ``"numeric_as_string"`` or ``"string"``.
    """
    if pd.api.types.is_integer_dtype(s):
        return "int"
    if pd.api.types.is_float_dtype(s):
        return "float"
    if pd.api.types.is_bool_dtype(s):
        return "bool"
    if pd.api.types.is_datetime64_any_dtype(s):
        return "datetime"

    # Text may be stored as plain ``object`` or as pandas' dedicated string
    # dtype; both can hide numbers written as strings.
    is_text = pd.api.types.is_object_dtype(s) or isinstance(s.dtype, pd.StringDtype)
    if is_text:
        sample = s.dropna().head(sample_size)
        if len(sample) > 0:
            try:
                parsed = pd.to_numeric(sample, errors="coerce")
            except (TypeError, ValueError):
                # Values pandas cannot even attempt to coerce: treat the
                # column as ordinary text.
                return "string"
            if parsed.notna().sum() > len(sample) * numeric_threshold:
                return "numeric_as_string"

    return "string"
50
 
51
# Ordered (pattern, purpose) rules: the first pattern found in the
# normalised column name decides the purpose.
_PURPOSE_RULES = (
    (re.compile(r'\bid\b|identifier|key|code'), "identifier"),
    (re.compile(r'\btime\b|date|duration|wait|delay|length'), "temporal"),
    (re.compile(r'\bcost\b|price|budget|fee|expense|revenue|income'), "financial_metric"),
    (re.compile(r'\bzone\b|region|area|district|location|address|city|state'), "geographic"),
    (re.compile(r'\bfacility\b|hospital|clinic|organization|company|department'), "entity"),
    (re.compile(r'\btype\b|category|specialty|service|class|group'), "category"),
    (re.compile(r'\bscore\b|rating|quality|performance|satisfaction|outcome'), "performance_metric"),
    (re.compile(r'\bcount\b|number|quantity|volume|total|sum'), "count_metric"),
    (re.compile(r'\brate\b|ratio|percent|frequency|proportion'), "rate_metric"),
    (re.compile(r'\bcapacity\b|beds|seats|slots|availability|utilization'), "capacity_metric"),
)

# dtype labels that count as numeric for purpose detection.
_NUMERIC_DTYPE_LABELS = ("int", "float", "numeric_as_string")


def detect_column_purpose(col_name: str, series: pd.Series) -> str:
    """Detect the likely purpose/semantic meaning of a column.

    The column name is matched against keyword rules first; when no rule
    matches, the decision falls back to the data itself (numeric dtype,
    then the ratio of unique values to rows).

    Args:
        col_name: The column's label.
        series: The column's values.

    Returns:
        A purpose label such as ``"identifier"``, ``"temporal"``,
        ``"time_metric"``, ``"financial_metric"``, ``"geographic"``,
        ``"entity"``, ``"category"``, ``"performance_metric"``,
        ``"count_metric"``, ``"rate_metric"``, ``"capacity_metric"``,
        ``"numeric_metric"``, ``"low_cardinality_category"`` or ``"text"``.
    """
    # ``_`` is a regex word character, so ``\bid\b`` never matches
    # "patient_id". Split camelCase and turn underscores/punctuation into
    # spaces so the ``\b`` anchors see real word boundaries.
    spaced = re.sub(r'(?<=[a-z0-9])(?=[A-Z])', ' ', str(col_name))
    col_lower = re.sub(r'[\W_]+', ' ', spaced).lower()

    for pattern, purpose in _PURPOSE_RULES:
        if not pattern.search(col_lower):
            continue
        # A time-like name holding numbers is a measured duration rather
        # than a point in time.
        if purpose == "temporal" and dtypeof_series(series) in _NUMERIC_DTYPE_LABELS:
            return "time_metric"
        return purpose

    # Generic categorisation based on data characteristics.
    if dtypeof_series(series) in _NUMERIC_DTYPE_LABELS:
        return "numeric_metric"

    unique_ratio = series.nunique() / len(series) if len(series) > 0 else 0
    if unique_ratio < 0.1:
        return "low_cardinality_category"
    if unique_ratio < 0.5:
        return "category"
    return "text"
109
+
110
def _stat_or_none(value: Any) -> Optional[float]:
    """Return *value* as a float, or ``None`` when it is missing (NaN/NA)."""
    return None if pd.isna(value) else float(value)


def profiledf(df: pd.DataFrame, max_examples: int = 3) -> Dict[str, Any]:
    """Generate a comprehensive profile of a DataFrame.

    Args:
        df: The table to profile.
        max_examples: Maximum number of non-null example values (rendered
            as strings) recorded per column.

    Returns:
        A dict with ``n_rows``, ``n_cols``, ``columns`` (one profile dict
        per column), ``numeric_columns``, ``categorical_columns`` and
        ``analysis_potential``. Numeric column profiles additionally carry
        ``min``/``max``/``mean``/``std``; a statistic that is undefined
        (all values missing, or ``std`` of a single value) is ``None``.
    """
    cols = []
    numeric_cols = []
    categorical_cols = []
    n_rows = len(df)

    # Select by position: ``df[label]`` returns a DataFrame rather than a
    # Series when two columns share the same label.
    for position, label in enumerate(df.columns):
        s = df.iloc[:, position]
        name = str(label)
        dtype = dtypeof_series(s)
        purpose = detect_column_purpose(name, s)
        n_missing = int(s.isna().sum())

        col_profile = {
            "name": name,
            "dtype": dtype,
            "purpose": purpose,
            "n_non_null": n_rows - n_missing,
            "n_unique": int(s.nunique(dropna=True)),
            # Take the first few values before stringifying so only those
            # values are converted, not the whole column.
            "examples": s.dropna().head(max_examples).astype(str).tolist(),
            "missing_ratio": round(n_missing / n_rows, 3) if n_rows else 0,
        }

        # Add statistics for numeric columns.
        if dtype in ("int", "float", "numeric_as_string"):
            try:
                if dtype == "numeric_as_string":
                    numeric_series = pd.to_numeric(s, errors="coerce")
                else:
                    numeric_series = s
                col_profile.update({
                    "min": _stat_or_none(numeric_series.min()),
                    "max": _stat_or_none(numeric_series.max()),
                    "mean": _stat_or_none(numeric_series.mean()),
                    "std": _stat_or_none(numeric_series.std()),
                })
                numeric_cols.append(name)
            except (TypeError, ValueError, ArithmeticError):
                # Statistics are best-effort: a column whose values cannot
                # be aggregated keeps its basic profile and is simply not
                # listed as numeric.
                pass

        # Track categorical columns.
        if purpose in ("category", "entity", "geographic", "low_cardinality_category"):
            categorical_cols.append(name)

        cols.append(col_profile)

    return {
        "n_rows": int(n_rows),
        "n_cols": int(df.shape[1]),
        "columns": cols,
        "numeric_columns": numeric_cols,
        "categorical_columns": categorical_cols,
        "analysis_potential": _assess_analysis_potential(cols),
    }
164
+
165
def _assess_analysis_potential(column_profiles: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Assess what types of analysis are possible with this data.

    Args:
        column_profiles: Per-column profile dicts. The keys read here are
            ``name``, ``purpose``, ``n_unique`` and ``n_non_null``.

    Returns:
        A dict with the boolean flags ``can_rank``, ``can_compare_groups``,
        ``can_analyze_trends``, ``has_entities`` and ``has_metrics``, plus
        the lists ``suggested_grouping_cols`` and ``suggested_metric_cols``.
    """
    grouping_purposes = ("entity", "category", "geographic", "low_cardinality_category")

    grouping_cols: List[str] = []
    metric_cols: List[str] = []
    has_entities = False
    has_metrics = False
    has_temporal = False

    for col in column_profiles:
        # Treat a missing or ``None`` purpose as "no purpose" rather than
        # failing on ``None.endswith``.
        purpose = col.get("purpose") or ""

        # Grouping/entity columns: only useful with at least two groups.
        if purpose in grouping_purposes:
            has_entities = True
            if col.get("n_unique", 0) >= 2:
                grouping_cols.append(col["name"])

        # Metric columns: every metric purpose ends in "_metric"
        # (including "numeric_metric"); only suggest ones holding data.
        if purpose.endswith("_metric"):
            has_metrics = True
            if col.get("n_non_null", 0) > 0:
                metric_cols.append(col["name"])

        if purpose == "temporal":
            has_temporal = True

    can_rank = bool(grouping_cols) and bool(metric_cols)
    return {
        "can_rank": can_rank,
        "can_compare_groups": can_rank,
        "can_analyze_trends": has_temporal,
        "has_entities": has_entities,
        "has_metrics": has_metrics,
        "suggested_grouping_cols": grouping_cols,
        "suggested_metric_cols": metric_cols,
    }
203
 
204
  @dataclass
205
  class TableEntry:
 
207
  path: str
208
  df: pd.DataFrame
209
  profile: Dict[str, Any] = field(default_factory=dict)
210
+
211
def get_grouping_columns(self) -> List[str]:
    """Get columns suitable for grouping analysis.

    Returns:
        A new list copied from
        ``profile["analysis_potential"]["suggested_grouping_cols"]``, or an
        empty list when either key is absent. Returning a copy keeps callers
        from mutating the stored profile.
    """
    potential = self.profile.get("analysis_potential") or {}
    return list(potential.get("suggested_grouping_cols") or [])
214
+
215
def get_metric_columns(self) -> List[str]:
    """Get columns suitable as metrics.

    Returns:
        A new list copied from
        ``profile["analysis_potential"]["suggested_metric_cols"]``, or an
        empty list when either key is absent. Returning a copy keeps callers
        from mutating the stored profile.
    """
    potential = self.profile.get("analysis_potential") or {}
    return list(potential.get("suggested_metric_cols") or [])
218
+
219
def can_support_ranking(self) -> bool:
    """Check if this table can support ranking analysis.

    Returns:
        The truthiness of ``profile["analysis_potential"]["can_rank"]``;
        ``False`` when either key is absent.
    """
    potential = self.profile.get("analysis_potential") or {}
    return bool(potential.get("can_rank", False))
222
 
223
class DataRegistry:
    """Registry for managing multiple data tables with analysis capabilities."""

    def __init__(self):
        # Maps the unique table name to its entry (frame + profile).
        self._tables: Dict[str, "TableEntry"] = {}

    def _unique_key(self, base: str) -> str:
        """Return *base*, or ``base_2``, ``base_3``, ... if already taken."""
        key = base
        suffix = 2
        while key in self._tables:
            key = f"{base}_{suffix}"
            suffix += 1
        return key

    def _register(self, df: pd.DataFrame, base: str, path: str) -> str:
        """Profile *df* and store it under a unique name derived from *base*."""
        key = self._unique_key(base)
        self._tables[key] = TableEntry(name=key, path=path, df=df, profile=profiledf(df))
        return key

    def clear(self) -> None:
        """Clear all tables from the registry."""
        self._tables.clear()

    def add_path(self, path: str) -> Optional[str]:
        """Add a data file to the registry.

        Returns:
            The name the table was registered under (the file's basename
            without extension, made unique), or ``None`` when the path is
            empty, does not exist, or could not be read.
        """
        if not path or not os.path.exists(path):
            return None

        df = saferead(path)
        if df is None:
            return None

        # Drop the extension for cleaner table names.
        base = os.path.splitext(os.path.basename(path))[0]
        return self._register(df, base, path)

    def add_dataframe(self, df: pd.DataFrame, name: str) -> str:
        """Add a DataFrame directly to the registry.

        Returns:
            The name the table was registered under (*name*, made unique).
        """
        return self._register(df, name, "")

    def names(self) -> List[str]:
        """Get names of all tables, in insertion order."""
        return list(self._tables)

    def get(self, name: str) -> Optional[pd.DataFrame]:
        """Get a DataFrame by name, or ``None`` if unknown."""
        entry = self._tables.get(name)
        return entry.df if entry is not None else None

    def get_table(self, name: str) -> Optional["TableEntry"]:
        """Get a TableEntry by name, or ``None`` if unknown."""
        return self._tables.get(name)

    def get_profile(self, name: str) -> Dict[str, Any]:
        """Get the profile of a table, or ``{}`` if unknown."""
        entry = self._tables.get(name)
        return entry.profile if entry is not None else {}

    def iter_tables(self) -> List["TableEntry"]:
        """Return a list snapshot of all table entries."""
        return list(self._tables.values())

    def get_analysis_ready_tables(self) -> List["TableEntry"]:
        """Get tables that are ready for analysis (have both grouping and metric columns)."""
        return [t for t in self._tables.values() if t.can_support_ranking()]

    def find_tables_with_column_purpose(self, purpose: str) -> List[Tuple[str, str]]:
        """Find ``(table name, column name)`` pairs whose purpose equals *purpose*."""
        return [
            (table.name, col["name"])
            for table in self._tables.values()
            for col in table.profile.get("columns", [])
            if col.get("purpose") == purpose
        ]

    def get_all_numeric_columns(self) -> Dict[str, List[str]]:
        """Get all numeric columns across all tables, keyed by table name."""
        return {
            table.name: table.profile.get("numeric_columns", [])
            for table in self._tables.values()
        }

    def get_all_categorical_columns(self) -> Dict[str, List[str]]:
        """Get all categorical columns across all tables, keyed by table name."""
        return {
            table.name: table.profile.get("categorical_columns", [])
            for table in self._tables.values()
        }

    def summarize_for_prompt(self, col_cap: int = 600) -> str:
        """Generate a summary suitable for LLM prompts.

        Each table contributes two lines: a header with its row/column
        counts and capability tags, then its columns annotated with their
        detected purpose.

        Args:
            col_cap: Maximum length of the joined column string; longer
                strings are cut and suffixed with an ellipsis.

        Returns:
            The newline-joined summary, or ``"No data tables available."``
            when the registry is empty.
        """
        if not self._tables:
            return "No data tables available."

        # (profile flag, tag shown in the summary)
        capability_tags = (
            ("can_rank", "can_rank"),
            ("can_compare_groups", "can_compare"),
            ("can_analyze_trends", "can_trend"),
        )

        lines = []
        for t in self._tables.values():
            n_rows = t.profile.get('n_rows', 0)
            n_cols = t.profile.get('n_cols', 0)

            # Column info with purposes; generic text columns are listed
            # by bare name for brevity.
            cols_with_purpose = []
            for col in t.profile.get("columns", []):
                name = col["name"]
                purpose = col.get("purpose", "unknown")
                cols_with_purpose.append(name if purpose == "text" else f"{name}({purpose})")

            cols_str = ", ".join(cols_with_purpose)
            if len(cols_str) > col_cap:
                cols_str = cols_str[:col_cap] + "…"

            potential = t.profile.get("analysis_potential", {})
            capabilities = [tag for flag, tag in capability_tags if potential.get(flag)]
            cap_str = f" [{','.join(capabilities)}]" if capabilities else ""

            lines.append(f"- {t.name}: {n_rows} rows, {n_cols} cols{cap_str}")
            lines.append(f" Columns: {cols_str}")

        return "\n".join(lines)

    def get_analysis_suggestions(self) -> Dict[str, List[str]]:
        """Get suggestions for possible analyses based on available data.

        Returns:
            A dict with ``rankings``, ``comparisons`` and ``trends`` lists of
            human-readable suggestions. Per table, at most two grouping
            columns, two metric columns and two temporal columns are
            combined, to avoid overwhelming the reader.
        """
        suggestions: Dict[str, List[str]] = {
            "rankings": [],
            "comparisons": [],
            "trends": [],
        }

        for table in self._tables.values():
            grouping_cols = table.get_grouping_columns()[:2]
            metric_cols = table.get_metric_columns()[:2]
            temporal_cols = [
                col["name"]
                for col in table.profile.get("columns", [])
                if col.get("purpose") == "temporal"
            ][:2]

            for group_col in grouping_cols:
                for metric_col in metric_cols:
                    suggestions["rankings"].append(
                        f"Rank {group_col} by {metric_col} (table: {table.name})"
                    )
                    suggestions["comparisons"].append(
                        f"Compare {metric_col} across {group_col} (table: {table.name})"
                    )

            # Pair each temporal column with the suggested metrics so the
            # "trends" bucket is actually populated.
            for time_col in temporal_cols:
                for metric_col in metric_cols:
                    suggestions["trends"].append(
                        f"Track {metric_col} over {time_col} (table: {table.name})"
                    )

        return suggestions