File size: 8,207 Bytes
8af0435
aff5a07
8af0435
4073913
8af0435
4073913
aff5a07
 
 
8af0435
 
 
4073913
8af0435
 
4073913
8af0435
 
4073913
8af0435
4073913
 
8af0435
4073913
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8af0435
 
 
5613174
4073913
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8af0435
4073913
8af0435
5613174
4073913
 
 
 
 
 
8af0435
 
 
 
5613174
4073913
 
8af0435
 
 
 
 
4073913
 
8af0435
5613174
8af0435
 
5613174
4073913
 
 
5613174
4073913
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5613174
8af0435
 
5613174
8af0435
 
5613174
8af0435
 
 
 
5613174
8af0435
 
 
 
5613174
8af0435
 
 
 
5613174
8af0435
 
 
4073913
8af0435
5613174
8af0435
5613174
8af0435
5613174
4073913
 
 
 
8af0435
 
 
4073913
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
# data_registry.py
import pandas as pd
import numpy as np
from typing import Dict, Any, List, Optional, Union
import os
import json

class DataRegistry:
    """In-memory registry of tabular data files and metadata derived from them.

    Files are keyed by their base name (``os.path.basename``), so registering
    two paths with the same base name replaces the earlier entry.

    Attributes:
        data: Maps file name to the loaded, column-normalised DataFrame,
            including any derived columns.
        metadata: Maps file name to a dict with ``type``, ``columns``,
            ``shape`` and a three-row ``sample``.
        healthcare_metadata: Maps file name to heuristically detected
            healthcare context (only present when something was detected).
        derived_columns: Maps file name to the set of column names that were
            added by the registry rather than read from the file.
    """

    # Substring patterns used to guess which column plays which role.
    # Matching is done against the normalised (lower-case, underscored) names.
    _COLUMN_PATTERNS = {
        'facility_name': ['facility', 'name', 'hospital', 'site', 'location'],
        'facility_type': ['type', 'category', 'class', 'facility_type', 'odhf_facility_type'],
        'beds_current': ['current', '2023', '2024', 'beds_current', 'staffed_beds', 'capacity'],
        'beds_prev': ['prev', 'previous', '2022', 'beds_prev', 'previous_beds'],
        'zone': ['zone', 'region', 'area', 'district'],
        'province': ['province', 'state', 'territory'],
        'city': ['city', 'municipality', 'town'],
        'teaching_status': ['teaching', 'status', 'type', 'hospital_type'],
    }

    def __init__(self):
        self.data = {}
        self.metadata = {}
        self.healthcare_metadata = {}
        self.derived_columns = {}  # Track derived columns per file

    @staticmethod
    def _standardize_column(col: Any) -> str:
        """Return *col* as a lower-case identifier-like string.

        Labels are passed through ``str()`` first because readers such as
        ``read_excel`` can produce non-string labels (e.g. integers).
        """
        name = str(col).strip().lower()
        return name.replace(' ', '_').replace('-', '_').replace('.', '_')

    def add_path(self, path: str) -> bool:
        """Load a data file and register it under its base name.

        Supported extensions are ``.csv``, ``.xlsx``/``.xls``, ``.json`` and
        ``.parquet``. Column names are normalised, derived healthcare columns
        are added when the needed base columns are found, and both generic
        and healthcare metadata are recorded.

        Args:
            path: Path of the file to load.

        Returns:
            True when the file was registered; False when the extension is
            unsupported or any error occurred while loading or processing
            (the error is printed, not raised).
        """
        try:
            file_name = os.path.basename(path)
            file_ext = os.path.splitext(file_name)[1].lower()

            # Read file based on extension
            if file_ext == '.csv':
                df = pd.read_csv(path)
            elif file_ext in ('.xlsx', '.xls'):
                df = pd.read_excel(path)
            elif file_ext == '.json':
                # JSON is UTF-8 by specification; "utf-8-sig" also accepts
                # files that start with a byte-order mark.
                with open(path, 'r', encoding='utf-8-sig') as f:
                    data = json.load(f)
                df = pd.json_normalize(data)
            elif file_ext == '.parquet':
                df = pd.read_parquet(path)
            else:
                print(f"Unsupported file type: {file_ext}")
                return False

            # Standardize column names
            df.columns = [self._standardize_column(col) for col in df.columns]

            # Reset derived-column tracking for this file
            self.derived_columns[file_name] = set()

            # Add derived columns in place, then store the processed frame so
            # that get() exposes every column advertised in the metadata.
            self._process_healthcare_data(file_name, df)
            self.data[file_name] = df

            # Basic metadata
            self.metadata[file_name] = {
                'type': file_ext,
                'columns': list(df.columns),
                'shape': df.shape,
                'sample': df.head(3).to_dict('records'),
            }

            # Healthcare-specific metadata extraction
            self._extract_healthcare_metadata(file_name, df)

            return True
        except Exception as e:
            # Boundary of the registry: report and signal failure to the caller.
            print(f"Error adding {path}: {e}")
            return False

    def _process_healthcare_data(self, file_name: str, df: pd.DataFrame):
        """Add derived healthcare columns to *df* in place.

        Columns are mapped to standard roles using ``_COLUMN_PATTERNS``; a
        column whose name equals the standard name wins over a substring
        match, otherwise the first column containing any pattern is used.

        Derived columns (each recorded in ``derived_columns[file_name]``):
            bed_change: current beds minus previous beds.
            percent_change: ``bed_change / previous * 100``; 0 where the
                previous value is 0.
            facility_type: copy of the matched type column, only when the
                frame has no column literally named ``facility_type``.

        Args:
            file_name: Registry key of the file being processed.
            df: Frame with normalised column names; modified in place.
        """
        derived = self.derived_columns.setdefault(file_name, set())
        columns = list(df.columns)

        # Map actual columns to standard names
        column_map = {}
        for standard_col, patterns in self._COLUMN_PATTERNS.items():
            if standard_col in columns:
                # Exact name beats any substring guess.
                column_map[standard_col] = standard_col
                continue
            for col in columns:
                if any(pattern in str(col) for pattern in patterns):
                    column_map[standard_col] = col
                    break

        current_col = column_map.get('beds_current')
        prev_col = column_map.get('beds_prev')

        # Both roles must be found and must be different columns; one column
        # matching both roles would only yield an all-zero change.
        if current_col is not None and prev_col is not None and current_col != prev_col:
            try:
                prev = df[prev_col]
                change = df[current_col] - prev
                # Divide only by non-zero previous values, then force rows
                # whose previous value is exactly 0 to 0 percent.
                percent = (change / prev.where(prev != 0) * 100).mask(prev == 0, 0)
            except (TypeError, ValueError) as e:
                # The name heuristics picked a non-numeric column; keep the
                # file loadable and just skip the derived metrics.
                print(f"Skipping derived bed metrics for {file_name}: {e}")
            else:
                df['bed_change'] = change
                df['percent_change'] = percent
                derived.update(('bed_change', 'percent_change'))

        # If we have a facility type column under another name, alias it
        type_col = column_map.get('facility_type')
        if type_col is not None and type_col != 'facility_type':
            df['facility_type'] = df[type_col]
            derived.add('facility_type')

    def _extract_healthcare_metadata(self, file_name: str, df: pd.DataFrame):
        """Record healthcare-specific metadata for *file_name*.

        ``data_type`` is ``'facility_data'`` when a column name contains
        'facility', 'name' or 'site', and is overwritten with ``'bed_data'``
        when a column name contains 'bed' or 'capacity'. Any entry from an
        earlier registration is dropped when nothing is detected.

        Args:
            file_name: Registry key of the file.
            df: Processed frame for the file.
        """
        healthcare_meta = {}
        columns = [str(col) for col in df.columns]

        # Detect data type based on columns
        has_facility_cols = any(
            pattern in col for col in columns for pattern in ('facility', 'name', 'site')
        )
        has_bed_cols = any(
            pattern in col for col in columns for pattern in ('bed', 'capacity')
        )

        if has_facility_cols:
            healthcare_meta['data_type'] = 'facility_data'
            if 'facility_type' in df.columns:
                healthcare_meta['facility_types'] = df['facility_type'].value_counts().to_dict()
            if 'city' in df.columns:
                healthcare_meta['cities'] = df['city'].value_counts().head(10).to_dict()

        if has_bed_cols:
            # Bed data takes precedence when both kinds of columns exist.
            healthcare_meta['data_type'] = 'bed_data'
            if 'zone' in df.columns:
                healthcare_meta['zones'] = df['zone'].unique().tolist()
            if 'teaching_status' in df.columns:
                healthcare_meta['teaching_status_counts'] = df['teaching_status'].value_counts().to_dict()

            # Check for derived metrics
            if 'bed_change' in df.columns:
                healthcare_meta['has_derived_metrics'] = True

        if healthcare_meta:
            self.healthcare_metadata[file_name] = healthcare_meta
        else:
            # Avoid leaving a stale entry when a file is re-registered.
            self.healthcare_metadata.pop(file_name, None)

    def get_derived_columns(self, file_name: str) -> set:
        """Return the set of derived column names for *file_name* (empty if unknown)."""
        return self.derived_columns.get(file_name, set())

    def find_column(self, file_name: str, patterns: List[str]) -> Optional[str]:
        """Return the first column containing any of *patterns*, ignoring case.

        Args:
            file_name: Registry key of the file to search.
            patterns: Substrings to look for in the column names.

        Returns:
            The first matching column in frame order, or None when the file
            is not registered or no column matches.
        """
        df = self.get(file_name)
        if df is None:
            return None

        lowered = [pattern.lower() for pattern in patterns]
        for col in df.columns:
            col_name = str(col).lower()
            if any(pattern in col_name for pattern in lowered):
                return col
        return None

    def get_data_by_type(self, data_type: str) -> List[str]:
        """Return the names of all files whose detected ``data_type`` equals *data_type*."""
        return [
            file_name for file_name, meta in self.healthcare_metadata.items()
            if meta.get('data_type') == data_type
        ]

    def names(self):
        """Return the registered file names in registration order."""
        return list(self.data.keys())

    def get(self, name):
        """Return the DataFrame registered as *name*, or None if absent."""
        return self.data.get(name)

    def summarize_for_prompt(self) -> str:
        """Build a plain-text summary of every registered file.

        Each file contributes its name, type, columns and shape, followed by
        its healthcare metadata (except a ``privacy_warning`` key) and a
        blank separator line.

        Returns:
            The summary text, or "No data files registered." when empty.
        """
        if not self.data:
            return "No data files registered."

        summary_parts = []
        for file_name in self.names():
            meta = self.metadata.get(file_name, {})
            health_meta = self.get_healthcare_metadata(file_name)
            column_names = ', '.join(str(col) for col in meta.get('columns', []))

            summary_parts.append(f"File: {file_name}")
            summary_parts.append(f"Type: {meta.get('type', 'unknown')}")
            summary_parts.append(f"Columns: {column_names}")
            summary_parts.append(f"Shape: {meta.get('shape', 'unknown')}")

            if health_meta:
                summary_parts.append("Healthcare Context:")
                summary_parts.extend(
                    f"  {key}: {value}"
                    for key, value in health_meta.items()
                    if key != 'privacy_warning'
                )

            summary_parts.append("")

        return "\n".join(summary_parts)

    def get_healthcare_metadata(self, name: str) -> Dict[str, Any]:
        """Return the healthcare metadata for *name* (empty dict if none)."""
        return self.healthcare_metadata.get(name, {})

    def clear(self):
        """Remove every registered file together with all of its metadata."""
        self.data.clear()
        self.metadata.clear()
        self.healthcare_metadata.clear()
        self.derived_columns.clear()