Rajan Sharma committed on
Commit
ef7ab85
·
verified ·
1 Parent(s): b1ec06f

Update data_registry.py

Browse files
Files changed (1) hide show
  1. data_registry.py +67 -163
data_registry.py CHANGED
@@ -1,197 +1,101 @@
1
  # data_registry.py
2
  import pandas as pd
3
- import numpy as np
4
- from typing import Dict, Any, List, Optional, Union
5
  import os
6
- import json
 
 
 
 
7
 
8
  class DataRegistry:
9
  def __init__(self):
10
  self.data = {}
11
  self.metadata = {}
12
- self.healthcare_metadata = {}
13
- self.derived_columns = {} # Track derived columns per file
14
 
15
- def add_path(self, path: str) -> bool:
16
- """Add a data file to the registry with dynamic processing."""
17
  try:
18
- file_name = os.path.basename(path)
19
- file_ext = os.path.splitext(file_name)[1].lower()
20
 
21
- # Read file based on extension
22
  if file_ext == '.csv':
23
- df = pd.read_csv(path)
24
  elif file_ext in ['.xlsx', '.xls']:
25
- df = pd.read_excel(path)
26
  elif file_ext == '.json':
27
- with open(path, 'r') as f:
28
- data = json.load(f)
29
- df = pd.json_normalize(data)
30
- elif file_ext in ['.parquet']:
31
- df = pd.read_parquet(path)
32
  else:
33
- print(f"Unsupported file type: {file_ext}")
34
  return False
35
 
36
- # Standardize column names
37
- df.columns = [col.strip().lower().replace(' ', '_').replace('-', '_').replace('.', '_') for col in df.columns]
38
-
39
- # Store original dataframe
40
- self.data[file_name] = df.copy()
41
-
42
- # Initialize derived columns tracking
43
- self.derived_columns[file_name] = set()
44
 
45
- # Process healthcare data dynamically
46
- self._process_healthcare_data(file_name, df)
47
-
48
- # Basic metadata
49
- self.metadata[file_name] = {
50
- 'type': file_ext,
51
- 'columns': list(df.columns),
52
- 'shape': df.shape,
53
- 'sample': df.head(3).to_dict('records')
54
  }
55
 
56
- # Healthcare-specific metadata extraction
57
- self._extract_healthcare_metadata(file_name, df)
58
-
59
  return True
 
60
  except Exception as e:
61
- print(f"Error adding {path}: {e}")
62
  return False
63
 
64
- def _process_healthcare_data(self, file_name: str, df: pd.DataFrame):
65
- """Dynamically process healthcare data based on available columns."""
66
- # Dynamic column pattern matching
67
- column_patterns = {
68
- 'facility_name': ['facility', 'name', 'hospital', 'site', 'location'],
69
- 'facility_type': ['type', 'category', 'class', 'facility_type', 'odhf_facility_type'],
70
- 'beds_current': ['current', '2023', '2024', 'beds_current', 'staffed_beds', 'capacity'],
71
- 'beds_prev': ['prev', 'previous', '2022', 'beds_prev', 'previous_beds'],
72
- 'zone': ['zone', 'region', 'area', 'district'],
73
- 'province': ['province', 'state', 'territory'],
74
- 'city': ['city', 'municipality', 'town'],
75
- 'teaching_status': ['teaching', 'status', 'type', 'hospital_type']
76
- }
77
-
78
- # Map actual columns to standard names
79
- column_map = {}
80
- for standard_col, patterns in column_patterns.items():
81
- for col in df.columns:
82
- if any(pattern in col for pattern in patterns):
83
- column_map[standard_col] = col
84
- break
85
-
86
- # Create derived columns if we have the necessary base columns
87
- if 'beds_current' in column_map and 'beds_prev' in column_map:
88
- current_col = column_map['beds_current']
89
- prev_col = column_map['beds_prev']
90
-
91
- # Calculate bed change
92
- df['bed_change'] = df[current_col] - df[prev_col]
93
- self.derived_columns[file_name].add('bed_change')
94
-
95
- # Calculate percentage change (avoid division by zero)
96
- df['percent_change'] = df.apply(
97
- lambda row: (row['bed_change'] / row[prev_col] * 100) if row[prev_col] != 0 else 0,
98
- axis=1
99
- )
100
- self.derived_columns[file_name].add('percent_change')
101
-
102
- # If we have facility_type but not in standard form, map it
103
- if 'facility_type' in column_map and column_map['facility_type'] != 'facility_type':
104
- df['facility_type'] = df[column_map['facility_type']]
105
- self.derived_columns[file_name].add('facility_type')
106
-
107
- def _extract_healthcare_metadata(self, file_name: str, df: pd.DataFrame):
108
- """Extract healthcare-specific metadata dynamically."""
109
- healthcare_meta = {}
110
-
111
- # Detect data type based on columns
112
- facility_cols = [col for col in df.columns if any(pattern in col for pattern in ['facility', 'name', 'site'])]
113
- bed_cols = [col for col in df.columns if any(pattern in col for pattern in ['bed', 'capacity'])]
114
-
115
- if facility_cols:
116
- healthcare_meta['data_type'] = 'facility_data'
117
- if 'facility_type' in df.columns:
118
- healthcare_meta['facility_types'] = df['facility_type'].value_counts().to_dict()
119
- if 'city' in df.columns:
120
- healthcare_meta['cities'] = df['city'].value_counts().head(10).to_dict()
121
-
122
- if bed_cols:
123
- healthcare_meta['data_type'] = 'bed_data'
124
- if 'zone' in df.columns:
125
- healthcare_meta['zones'] = df['zone'].unique().tolist()
126
- if 'teaching_status' in df.columns:
127
- healthcare_meta['teaching_status_counts'] = df['teaching_status'].value_counts().to_dict()
128
-
129
- # Check for derived metrics
130
- if 'bed_change' in df.columns:
131
- healthcare_meta['has_derived_metrics'] = True
132
-
133
- if healthcare_meta:
134
- self.healthcare_metadata[file_name] = healthcare_meta
135
-
136
- def get_derived_columns(self, file_name: str) -> set:
137
- """Get derived columns for a file."""
138
- return self.derived_columns.get(file_name, set())
139
 
140
- def find_column(self, file_name: str, patterns: List[str]) -> Optional[str]:
141
- """Find a column matching any of the given patterns."""
142
- df = self.get(file_name)
143
- if df is None:
144
- return None
145
-
146
- for col in df.columns:
147
- if any(pattern.lower() in col.lower() for pattern in patterns):
148
- return col
149
- return None
150
 
151
  def get_data_by_type(self, data_type: str) -> List[str]:
152
- """Get all files of a specific data type."""
153
- return [
154
- file_name for file_name, meta in self.healthcare_metadata.items()
155
- if meta.get('data_type') == data_type
156
- ]
157
-
158
- def names(self):
159
- return list(self.data.keys())
160
 
161
- def get(self, name):
162
- return self.data.get(name)
 
163
 
164
- def summarize_for_prompt(self) -> str:
165
- """Generate a summary of all data for prompt inclusion."""
166
- if not self.data:
167
- return "No data files registered."
168
-
169
- summary_parts = []
170
- for file_name in self.names():
171
- meta = self.metadata.get(file_name, {})
172
- health_meta = self.get_healthcare_metadata(file_name)
173
-
174
- summary_parts.append(f"File: {file_name}")
175
- summary_parts.append(f"Type: {meta.get('type', 'unknown')}")
176
- summary_parts.append(f"Columns: {', '.join(meta.get('columns', []))}")
177
- summary_parts.append(f"Shape: {meta.get('shape', 'unknown')}")
178
 
179
- if health_meta:
180
- summary_parts.append("Healthcare Context:")
181
- for key, value in health_meta.items():
182
- if key != 'privacy_warning':
183
- summary_parts.append(f" {key}: {value}")
 
184
 
185
- summary_parts.append("")
186
-
187
- return "\n".join(summary_parts)
188
-
189
- def get_healthcare_metadata(self, name: str) -> Dict[str, Any]:
190
- """Get healthcare-specific metadata for a file."""
191
- return self.healthcare_metadata.get(name, {})
192
 
193
  def clear(self):
 
194
  self.data.clear()
195
- self.metadata.clear()
196
- self.healthcare_metadata.clear()
197
- self.derived_columns.clear()
 
1
  # data_registry.py
2
  import pandas as pd
 
 
3
  import os
4
+ from typing import Dict, List, Any, Optional, Union
5
+ import logging
6
+
7
# NOTE(review): basicConfig() at import time configures the root logger for the
# whole process. It is kept because existing callers may depend on INFO output
# being enabled, but a library module would normally leave this to the app.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class DataRegistry:
    """In-memory registry of tabular datasets loaded from files.

    Each successfully loaded file is stored as a pandas DataFrame in
    ``self.data`` under its base file name, with a matching descriptive
    entry in ``self.metadata``.
    """

    def __init__(self):
        # base file name -> loaded DataFrame
        self.data: Dict[str, pd.DataFrame] = {}
        # base file name -> descriptive metadata built by add_path()
        self.metadata: Dict[str, Dict[str, Any]] = {}

    def add_path(self, file_path: str) -> bool:
        """Load a file into the registry.

        Supported extensions (case-insensitive) are ``.csv``, ``.xlsx``,
        ``.xls`` and ``.json``. The dataset is keyed by the base name of
        ``file_path``, so loading a second file with the same base name
        replaces the earlier entry.

        Args:
            file_path: Path of the file to load.

        Returns:
            True when the file was loaded and registered. False when the
            extension is unsupported or loading raised an exception (the
            failure is logged, never propagated).
        """
        try:
            file_ext = os.path.splitext(file_path)[1].lower()

            if file_ext == '.csv':
                df = pd.read_csv(file_path)
            elif file_ext in ['.xlsx', '.xls']:
                df = pd.read_excel(file_path)
            elif file_ext == '.json':
                df = pd.read_json(file_path)
            else:
                logger.warning(f"Unsupported file type: {file_ext}")
                return False

            # Store with filename as key
            filename = os.path.basename(file_path)
            self.data[filename] = df

            # Store metadata
            self.metadata[filename] = {
                "path": file_path,
                "type": file_ext,
                "shape": df.shape,
                "columns": list(df.columns),
                "data_types": df.dtypes.to_dict(),
                "null_counts": df.isnull().sum().to_dict(),
                "sample_data": df.head(3).to_dict()
            }

            logger.info(f"Successfully loaded {filename} with shape {df.shape}")
            return True

        except Exception as e:
            # Deliberate best-effort boundary: callers get a boolean, the
            # reason for the failure goes to the log.
            logger.error(f"Error loading {file_path}: {str(e)}")
            return False

    def get(self, name: str) -> Optional[pd.DataFrame]:
        """Return the dataset registered under ``name``, or None if absent."""
        return self.data.get(name)

    def names(self) -> List[str]:
        """Return the names of all registered datasets, in insertion order."""
        return list(self.data)

    def get_data_by_type(self, data_type: str) -> List[str]:
        """Return dataset names that contain ``data_type`` (case-insensitive).

        The match is a substring test against the registered name only; the
        stored metadata is not inspected.
        """
        wanted = data_type.lower()
        return [name for name in self.metadata if wanted in name.lower()]

    def get_data_summary(self) -> Dict[str, Any]:
        """Return the metadata mapping for all loaded datasets.

        This is the registry's live ``metadata`` dict, not a copy, so
        mutating the result mutates the registry.
        """
        return self.metadata

    def find_related_datasets(self, keywords: List[str]) -> List[Dict[str, Any]]:
        """Find datasets that mention any keyword in column names or text cells.

        Matching is a case-insensitive, literal substring test (keywords are
        not interpreted as regular expressions). Blank keywords are ignored,
        and an empty keyword list matches nothing.

        Args:
            keywords: Search terms.

        Returns:
            One dict per matching dataset with keys ``"name"``,
            ``"matching_columns"`` (column labels containing a keyword) and
            ``"has_matching_data"`` (True when any text cell contains one).
        """
        needles = [str(kw).lower() for kw in (keywords or []) if str(kw)]
        if not needles:
            return []

        related = []
        for name in self.names():
            df = self.get(name)
            if df is None:
                continue

            # Column labels may be non-strings (e.g. integer headers), so
            # normalise them through str() before comparing.
            col_matches = [
                col for col in df.columns
                if any(needle in str(col).lower() for needle in needles)
            ]

            # df.items() yields one Series per column even when labels are
            # duplicated, where df[label] would return a DataFrame.
            data_matches = any(
                self._series_mentions(series, needles)
                for _, series in df.items()
            )

            if col_matches or data_matches:
                related.append({
                    "name": name,
                    "matching_columns": col_matches,
                    "has_matching_data": data_matches
                })
        return related

    @staticmethod
    def _series_mentions(series: pd.Series, needles: List[str]) -> bool:
        """Return True when any text value in ``series`` contains a needle."""
        dtype = series.dtype
        if not (pd.api.types.is_object_dtype(dtype)
                or pd.api.types.is_string_dtype(dtype)):
            return False
        try:
            return any(
                bool(series.str.contains(
                    needle, case=False, na=False, regex=False).any())
                for needle in needles
            )
        except (AttributeError, TypeError):
            # Object columns holding non-text values (ints, dates, bytes)
            # reject the .str accessor; they cannot contain a text match.
            return False

    def clear(self):
        """Remove every dataset and its metadata from the registry."""
        self.data.clear()
        self.metadata.clear()