Rajan Sharma committed on
Commit
8af0435
·
verified ·
1 Parent(s): 467b40f

Update data_registry.py

Browse files
Files changed (1) hide show
  1. data_registry.py +101 -359
data_registry.py CHANGED
@@ -1,379 +1,121 @@
1
- from __future__ import annotations
2
- import os
3
- import re
4
- from dataclasses import dataclass, field
5
- from typing import Dict, Any, List, Optional, Set, Tuple
6
  import pandas as pd
7
-
8
- def saferead(path: str) -> Optional[pd.DataFrame]:
9
- """Safely read various file formats into DataFrames."""
10
- name = (path or "").lower()
11
- try:
12
- if name.endswith(".csv"):
13
- return pd.read_csv(path, low_memory=False)
14
- if name.endswith(".xlsx") or name.endswith(".xls"):
15
- return pd.read_excel(path)
16
- if name.endswith(".tsv"):
17
- return pd.read_csv(path, sep='\t', low_memory=False)
18
- if name.endswith(".json"):
19
- return pd.read_json(path)
20
- if name.endswith(".parquet"):
21
- return pd.read_parquet(path)
22
- except Exception as e:
23
- print(f"Warning: Could not read {path}: {e}")
24
- return None
25
- return None
26
-
27
- def dtypeof_series(s: pd.Series) -> str:
28
- """Determine the semantic type of a pandas Series."""
29
- if pd.api.types.is_integer_dtype(s):
30
- return "int"
31
- if pd.api.types.is_float_dtype(s):
32
- return "float"
33
- if pd.api.types.is_bool_dtype(s):
34
- return "bool"
35
- if pd.api.types.is_datetime64_any_dtype(s):
36
- return "datetime"
37
-
38
- # Check if string column could be numeric
39
- if s.dtype == 'object':
40
- sample = s.dropna().head(100)
41
- if len(sample) > 0:
42
- try:
43
- numeric_sample = pd.to_numeric(sample, errors='coerce')
44
- if numeric_sample.notna().sum() > len(sample) * 0.7:
45
- return "numeric_as_string"
46
- except:
47
- pass
48
-
49
- return "string"
50
-
51
- def detect_column_purpose(col_name: str, series: pd.Series) -> str:
52
- """Detect the likely purpose/semantic meaning of a column."""
53
- col_lower = col_name.lower()
54
-
55
- # ID/Key patterns
56
- if re.search(r'\bid\b|identifier|key|code', col_lower):
57
- return "identifier"
58
-
59
- # Time/Date patterns
60
- if re.search(r'\btime\b|date|duration|wait|delay|length', col_lower):
61
- if dtypeof_series(series) in ['int', 'float', 'numeric_as_string']:
62
- return "time_metric"
63
- else:
64
- return "temporal"
65
-
66
- # Financial patterns
67
- if re.search(r'\bcost\b|price|budget|fee|expense|revenue|income', col_lower):
68
- return "financial_metric"
69
-
70
- # Location/Geographic patterns
71
- if re.search(r'\bzone\b|region|area|district|location|address|city|state', col_lower):
72
- return "geographic"
73
-
74
- # Entity/Organization patterns
75
- if re.search(r'\bfacility\b|hospital|clinic|organization|company|department', col_lower):
76
- return "entity"
77
-
78
- # Category/Classification patterns
79
- if re.search(r'\btype\b|category|specialty|service|class|group', col_lower):
80
- return "category"
81
-
82
- # Performance/Quality patterns
83
- if re.search(r'\bscore\b|rating|quality|performance|satisfaction|outcome', col_lower):
84
- return "performance_metric"
85
-
86
- # Count/Volume patterns
87
- if re.search(r'\bcount\b|number|quantity|volume|total|sum', col_lower):
88
- return "count_metric"
89
-
90
- # Rate/Percentage patterns
91
- if re.search(r'\brate\b|ratio|percent|frequency|proportion', col_lower):
92
- return "rate_metric"
93
-
94
- # Capacity patterns
95
- if re.search(r'\bcapacity\b|beds|seats|slots|availability|utilization', col_lower):
96
- return "capacity_metric"
97
-
98
- # Generic categorization based on data characteristics
99
- unique_ratio = series.nunique() / len(series) if len(series) > 0 else 0
100
-
101
- if dtypeof_series(series) in ['int', 'float', 'numeric_as_string']:
102
- return "numeric_metric"
103
- elif unique_ratio < 0.1:
104
- return "low_cardinality_category"
105
- elif unique_ratio < 0.5:
106
- return "category"
107
- else:
108
- return "text"
109
-
110
- def profiledf(df: pd.DataFrame, max_examples: int = 3) -> Dict[str, Any]:
111
- """Generate a comprehensive profile of a DataFrame."""
112
- cols = []
113
- numeric_cols = []
114
- categorical_cols = []
115
-
116
- for c in df.columns:
117
- s = df[c]
118
- dtype = dtypeof_series(s)
119
- purpose = detect_column_purpose(str(c), s)
120
- ex_vals = s.dropna().astype(str).head(max_examples).tolist() if len(s) else []
121
-
122
- col_profile = {
123
- "name": str(c),
124
- "dtype": dtype,
125
- "purpose": purpose,
126
- "n_non_null": int(s.notna().sum()),
127
- "n_unique": int(s.nunique(dropna=True)),
128
- "examples": ex_vals,
129
- "missing_ratio": round(s.isna().sum() / len(s), 3) if len(s) > 0 else 0
130
- }
131
-
132
- # Add statistics for numeric columns
133
- if dtype in ['int', 'float', 'numeric_as_string']:
134
- try:
135
- if dtype == 'numeric_as_string':
136
- numeric_series = pd.to_numeric(s, errors='coerce')
137
- else:
138
- numeric_series = s
139
-
140
- col_profile.update({
141
- "min": float(numeric_series.min()) if not numeric_series.isna().all() else None,
142
- "max": float(numeric_series.max()) if not numeric_series.isna().all() else None,
143
- "mean": float(numeric_series.mean()) if not numeric_series.isna().all() else None,
144
- "std": float(numeric_series.std()) if not numeric_series.isna().all() else None
145
- })
146
- numeric_cols.append(str(c))
147
- except:
148
- pass
149
-
150
- # Track categorical columns
151
- if purpose in ['category', 'entity', 'geographic', 'low_cardinality_category']:
152
- categorical_cols.append(str(c))
153
-
154
- cols.append(col_profile)
155
-
156
- return {
157
- "n_rows": int(len(df)),
158
- "n_cols": int(df.shape[1]),
159
- "columns": cols,
160
- "numeric_columns": numeric_cols,
161
- "categorical_columns": categorical_cols,
162
- "analysis_potential": _assess_analysis_potential(cols)
163
- }
164
-
165
- def _assess_analysis_potential(column_profiles: List[Dict[str, Any]]) -> Dict[str, Any]:
166
- """Assess what types of analysis are possible with this data."""
167
- potential = {
168
- "can_rank": False,
169
- "can_compare_groups": False,
170
- "can_analyze_trends": False,
171
- "has_entities": False,
172
- "has_metrics": False,
173
- "suggested_grouping_cols": [],
174
- "suggested_metric_cols": []
175
- }
176
-
177
- entity_cols = []
178
- metric_cols = []
179
-
180
- for col in column_profiles:
181
- purpose = col.get("purpose", "")
182
-
183
- # Identify grouping/entity columns
184
- if purpose in ["entity", "category", "geographic", "low_cardinality_category"]:
185
- entity_cols.append(col["name"])
186
- if col.get("n_unique", 0) >= 2: # At least 2 groups needed
187
- potential["suggested_grouping_cols"].append(col["name"])
188
-
189
- # Identify metric columns
190
- if purpose.endswith("_metric") or purpose in ["numeric_metric"]:
191
- metric_cols.append(col["name"])
192
- if col.get("n_non_null", 0) > 0: # Has actual data
193
- potential["suggested_metric_cols"].append(col["name"])
194
-
195
- # Assess capabilities
196
- potential["has_entities"] = len(entity_cols) > 0
197
- potential["has_metrics"] = len(metric_cols) > 0
198
- potential["can_rank"] = len(potential["suggested_grouping_cols"]) > 0 and len(potential["suggested_metric_cols"]) > 0
199
- potential["can_compare_groups"] = potential["can_rank"]
200
- potential["can_analyze_trends"] = any(col.get("purpose") == "temporal" for col in column_profiles)
201
-
202
- return potential
203
-
204
- @dataclass
205
- class TableEntry:
206
- name: str
207
- path: str
208
- df: pd.DataFrame
209
- profile: Dict[str, Any] = field(default_factory=dict)
210
-
211
- def get_grouping_columns(self) -> List[str]:
212
- """Get columns suitable for grouping analysis."""
213
- return self.profile.get("analysis_potential", {}).get("suggested_grouping_cols", [])
214
-
215
- def get_metric_columns(self) -> List[str]:
216
- """Get columns suitable as metrics."""
217
- return self.profile.get("analysis_potential", {}).get("suggested_metric_cols", [])
218
-
219
- def can_support_ranking(self) -> bool:
220
- """Check if this table can support ranking analysis."""
221
- return self.profile.get("analysis_potential", {}).get("can_rank", False)
222
 
223
  class DataRegistry:
224
- """Registry for managing multiple data tables with analysis capabilities."""
225
-
226
  def __init__(self):
227
- self._tables: Dict[str, TableEntry] = {}
228
-
229
- def clear(self) -> None:
230
- """Clear all tables from the registry."""
231
- self._tables.clear()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
232
 
233
- def add_path(self, path: str) -> Optional[str]:
234
- """Add a data file to the registry."""
235
- if not path or not os.path.exists(path):
236
- return None
237
 
238
- df = saferead(path)
239
- if df is None:
240
- return None
 
 
 
 
241
 
242
- # Generate unique name
243
- base = os.path.splitext(os.path.basename(path))[0] # Remove extension for cleaner names
244
- key = base
245
- i = 2
246
- while key in self._tables:
247
- key = f"{base}_{i}"
248
- i += 1
 
 
 
 
 
 
249
 
250
- # Profile the dataframe
251
- prof = profiledf(df)
252
- self._tables[key] = TableEntry(name=key, path=path, df=df, profile=prof)
253
- return key
254
-
255
- def add_dataframe(self, df: pd.DataFrame, name: str) -> str:
256
- """Add a DataFrame directly to the registry."""
257
- # Ensure unique name
258
- key = name
259
- i = 2
260
- while key in self._tables:
261
- key = f"{name}_{i}"
262
- i += 1
263
 
264
- prof = profiledf(df)
265
- self._tables[key] = TableEntry(name=key, path="", df=df, profile=prof)
266
- return key
267
-
268
- def names(self) -> List[str]:
269
- """Get names of all tables."""
270
- return list(self._tables.keys())
271
-
272
- def get(self, name: str) -> Optional[pd.DataFrame]:
273
- """Get a DataFrame by name."""
274
- return self._tables.get(name).df if name in self._tables else None
275
-
276
- def get_table(self, name: str) -> Optional[TableEntry]:
277
- """Get a TableEntry by name."""
278
- return self._tables.get(name)
279
-
280
- def get_profile(self, name: str) -> Dict[str, Any]:
281
- """Get the profile of a table."""
282
- return self._tables.get(name).profile if name in self._tables else {}
283
 
284
- def iter_tables(self) -> List[TableEntry]:
285
- """Iterate over all table entries."""
286
- return list(self._tables.values())
287
 
288
- def get_analysis_ready_tables(self) -> List[TableEntry]:
289
- """Get tables that are ready for analysis (have both grouping and metric columns)."""
290
- return [t for t in self._tables.values() if t.can_support_ranking()]
 
291
 
292
- def find_tables_with_column_purpose(self, purpose: str) -> List[Tuple[str, str]]:
293
- """Find tables and columns that match a specific purpose."""
294
- matches = []
295
- for table in self._tables.values():
296
- for col in table.profile.get("columns", []):
297
- if col.get("purpose") == purpose:
298
- matches.append((table.name, col["name"]))
299
- return matches
300
 
301
- def get_all_numeric_columns(self) -> Dict[str, List[str]]:
302
- """Get all numeric columns across all tables."""
303
- numeric_cols = {}
304
- for table in self._tables.values():
305
- numeric_cols[table.name] = table.profile.get("numeric_columns", [])
306
- return numeric_cols
307
 
308
- def get_all_categorical_columns(self) -> Dict[str, List[str]]:
309
- """Get all categorical columns across all tables."""
310
- categorical_cols = {}
311
- for table in self._tables.values():
312
- categorical_cols[table.name] = table.profile.get("categorical_columns", [])
313
- return categorical_cols
314
-
315
- def summarize_for_prompt(self, col_cap: int = 600) -> str:
316
- """Generate a summary suitable for LLM prompts."""
317
- if not self._tables:
318
- return "No data tables available."
319
 
320
- lines = []
321
- for t in self.iter_tables():
322
- # Basic info
323
- n_rows = t.profile.get('n_rows', 0)
324
- n_cols = t.profile.get('n_cols', 0)
325
-
326
- # Column info with purposes
327
- cols_with_purpose = []
328
- for col in t.profile.get("columns", []):
329
- name = col["name"]
330
- purpose = col.get("purpose", "unknown")
331
- if purpose != "text": # Skip generic text columns for brevity
332
- cols_with_purpose.append(f"{name}({purpose})")
333
- else:
334
- cols_with_purpose.append(name)
335
 
336
- cols_str = ", ".join(cols_with_purpose)
337
- if len(cols_str) > col_cap:
338
- cols_str = cols_str[:col_cap] + "…"
 
339
 
340
- # Analysis potential
341
- potential = t.profile.get("analysis_potential", {})
342
- capabilities = []
343
- if potential.get("can_rank"):
344
- capabilities.append("can_rank")
345
- if potential.get("can_compare_groups"):
346
- capabilities.append("can_compare")
347
- if potential.get("can_analyze_trends"):
348
- capabilities.append("can_trend")
349
 
350
- cap_str = f" [{','.join(capabilities)}]" if capabilities else ""
351
-
352
- lines.append(f"- {t.name}: {n_rows} rows, {n_cols} cols{cap_str}")
353
- lines.append(f" Columns: {cols_str}")
354
 
355
- return "\n".join(lines)
356
 
357
- def get_analysis_suggestions(self) -> Dict[str, List[str]]:
358
- """Get suggestions for possible analyses based on available data."""
359
- suggestions = {
360
- "rankings": [],
361
- "comparisons": [],
362
- "trends": []
363
- }
364
-
365
- for table in self._tables.values():
366
- grouping_cols = table.get_grouping_columns()
367
- metric_cols = table.get_metric_columns()
368
-
369
- # Ranking suggestions
370
- for group_col in grouping_cols[:2]: # Limit to avoid overwhelming
371
- for metric_col in metric_cols[:2]:
372
- suggestions["rankings"].append(f"Rank {group_col} by {metric_col} (table: {table.name})")
373
-
374
- # Comparison suggestions
375
- for group_col in grouping_cols[:2]:
376
- for metric_col in metric_cols[:2]:
377
- suggestions["comparisons"].append(f"Compare {metric_col} across {group_col} (table: {table.name})")
378
-
379
- return suggestions
 
1
+ # data_registry.py
 
 
 
 
2
  import pandas as pd
3
+ import numpy as np
4
+ from typing import Dict, Any, List, Optional
5
+ import os
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6
 
7
class DataRegistry:
    """Registry for tabular data files with healthcare-specific metadata.

    Tables are keyed by their file name. Alongside the raw DataFrame the
    registry keeps generic metadata (format, columns, shape, sample rows)
    and, when recognizable healthcare columns are present, domain metadata
    such as facility types, bed-capacity zones, and patient-data privacy
    flags.
    """

    # Supported file extensions mapped to pandas readers.
    # CSV is the original behavior; the rest generalize it with the same
    # "return False on anything unreadable" contract.
    _READERS = {
        '.csv': pd.read_csv,
        '.tsv': lambda p: pd.read_csv(p, sep='\t'),
        '.xlsx': pd.read_excel,
        '.xls': pd.read_excel,
        '.json': pd.read_json,
        '.parquet': pd.read_parquet,
    }

    def __init__(self):
        # file name -> loaded DataFrame
        self.data: Dict[str, pd.DataFrame] = {}
        # file name -> generic metadata (type, columns, shape, sample)
        self.metadata: Dict[str, Dict[str, Any]] = {}
        # file name -> healthcare-specific metadata (see _extract_healthcare_metadata)
        self.healthcare_metadata: Dict[str, Dict[str, Any]] = {}

    def add_path(self, path: str) -> bool:
        """Load a data file into the registry with healthcare-specific handling.

        Supports CSV (original behavior) plus TSV, Excel, JSON, and Parquet.
        Column names are normalized to snake_case before storage.

        Returns True on success, False for unsupported extensions or read
        errors (best-effort loading: errors are printed, not raised).
        """
        try:
            file_name = os.path.basename(path)
            ext = os.path.splitext(file_name)[1].lower()
            reader = self._READERS.get(ext)
            if reader is None:
                # Unsupported format — preserves the old False result.
                return False

            df = reader(path)

            # Standardize column names: trim, lowercase, snake_case.
            df.columns = [
                str(col).strip().lower().replace(' ', '_').replace('-', '_')
                for col in df.columns
            ]

            self.data[file_name] = df
            self.metadata[file_name] = {
                'type': ext.lstrip('.'),
                'columns': list(df.columns),
                'shape': df.shape,
                'sample': df.head(3).to_dict('records'),
            }

            # Healthcare-specific metadata extraction (may add derived columns).
            self._extract_healthcare_metadata(file_name, df)
            return True

        except Exception as e:
            # Best-effort loading: report and skip unreadable files.
            print(f"Error adding {path}: {e}")
            return False

    def _extract_healthcare_metadata(self, file_name: str, df: pd.DataFrame) -> None:
        """Populate healthcare_metadata for recognized healthcare datasets.

        Detection is heuristic, keyed on normalized column names. Later
        checks override 'data_type' set by earlier ones (bed_capacity over
        healthcare_facilities, patient_data over both), matching the
        original precedence. May add derived columns (bed_change,
        percent_change) to *df* in place.
        """
        healthcare_meta: Dict[str, Any] = {}

        # Healthcare facility listings.
        if any(col in df.columns for col in ('facility_name', 'facility_type', 'odhf_facility_type')):
            healthcare_meta['data_type'] = 'healthcare_facilities'
            if 'facility_type' in df.columns:
                healthcare_meta['facility_types'] = df['facility_type'].value_counts().to_dict()
            if 'city' in df.columns:
                healthcare_meta['cities'] = df['city'].value_counts().head(10).to_dict()

        # Bed-capacity reports.
        if any(col in df.columns for col in ('beds_current', 'beds_prev', 'bed_count')):
            healthcare_meta['data_type'] = 'bed_capacity'
            if 'zone' in df.columns:
                healthcare_meta['zones'] = df['zone'].unique().tolist()
            if 'teaching_status' in df.columns:
                healthcare_meta['teaching_status_counts'] = df['teaching_status'].value_counts().to_dict()

            # Derived change metrics, stored on the registered DataFrame.
            if 'beds_current' in df.columns and 'beds_prev' in df.columns:
                df['bed_change'] = df['beds_current'] - df['beds_prev']
                # Guard against division by zero: a zero previous count
                # yields NaN rather than +/-inf.
                prev = df['beds_prev'].replace(0, np.nan)
                df['percent_change'] = (df['bed_change'] / prev) * 100
                healthcare_meta['has_derived_metrics'] = True

        # Patient-level data: flag for privacy handling.
        if any(col in df.columns for col in ('patient_id', 'patient_name', 'mrn')):
            healthcare_meta['data_type'] = 'patient_data'
            healthcare_meta['privacy_warning'] = "This file contains patient identifiers. Ensure proper handling."

        if healthcare_meta:
            self.healthcare_metadata[file_name] = healthcare_meta

    def get_healthcare_metadata(self, name: str) -> Dict[str, Any]:
        """Get healthcare-specific metadata for a file ({} if none)."""
        return self.healthcare_metadata.get(name, {})

    def get_data_type(self, name: str) -> str:
        """Get the healthcare data type of a file ('unknown' if not detected)."""
        meta = self.get_healthcare_metadata(name)
        return meta.get('data_type', 'unknown')

    def names(self) -> List[str]:
        """Names (file names) of all registered tables."""
        return list(self.data.keys())

    def get(self, name: str) -> Optional[pd.DataFrame]:
        """Get a registered DataFrame by name, or None if absent."""
        return self.data.get(name)

    def summarize_for_prompt(self) -> str:
        """Generate a summary of all registered data for prompt inclusion.

        The privacy_warning entry is deliberately excluded from the summary.
        """
        if not self.data:
            return "No data files registered."

        summary_parts: List[str] = []
        for file_name in self.names():
            meta = self.metadata.get(file_name, {})
            health_meta = self.get_healthcare_metadata(file_name)

            summary_parts.append(f"File: {file_name}")
            summary_parts.append(f"Type: {meta.get('type', 'unknown')}")
            summary_parts.append(f"Columns: {', '.join(meta.get('columns', []))}")
            summary_parts.append(f"Shape: {meta.get('shape', 'unknown')}")

            if health_meta:
                summary_parts.append("Healthcare Context:")
                for key, value in health_meta.items():
                    if key != 'privacy_warning':  # Don't include warnings in prompt
                        summary_parts.append(f"  {key}: {value}")

            summary_parts.append("")

        return "\n".join(summary_parts)

    def clear(self) -> None:
        """Remove all tables and metadata from the registry."""
        self.data.clear()
        self.metadata.clear()
        self.healthcare_metadata.clear()