Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| from typing import Dict, Any, Tuple, Optional, List, Union | |
| import pandas as pd | |
| import numpy as np | |
| from data_registry import DataRegistry | |
| from schema_mapper import MappingResult | |
| import re | |
| def _get(reg: DataRegistry, mapping: MappingResult, concept: str) -> Tuple[Optional[pd.DataFrame], Optional[str]]: | |
| if concept not in mapping.resolved: | |
| return None, None | |
| tname, col = mapping.resolved[concept] | |
| return reg.get(tname), col | |
| def _clean_numeric_series(series: pd.Series) -> pd.Series: | |
| """Clean numeric data, handling various missing value representations.""" | |
| cleaned = series.replace(['', '—', '-', 'null', 'NULL', 'N/A', 'n/a', ' ', 'nan'], np.nan) | |
| return pd.to_numeric(cleaned, errors='coerce') | |
| def _fmt_tbl(df: pd.DataFrame, max_rows: int = 20) -> str: | |
| if df is None or df.empty: | |
| return "_<empty table>_" | |
| df2 = df.copy() | |
| if len(df2) > max_rows: | |
| df2 = df2.head(max_rows) | |
| return df2.to_markdown(index=False) | |
| def _detect_numeric_columns(df: pd.DataFrame) -> List[str]: | |
| """Detect columns that contain numeric data (even if stored as strings).""" | |
| numeric_cols = [] | |
| for col in df.columns: | |
| # Try to convert a sample to numeric | |
| sample = df[col].dropna().head(100) | |
| if len(sample) > 0: | |
| numeric_sample = pd.to_numeric(sample, errors='coerce') | |
| # If more than 50% can be converted to numeric, consider it numeric | |
| if numeric_sample.notna().sum() > len(sample) * 0.5: | |
| numeric_cols.append(col) | |
| return numeric_cols | |
| def _detect_categorical_columns(df: pd.DataFrame, max_unique_ratio: float = 0.3) -> List[str]: | |
| """Detect categorical columns with reasonable number of unique values.""" | |
| categorical_cols = [] | |
| for col in df.columns: | |
| if df[col].dtype == 'object': # String-like columns | |
| unique_ratio = df[col].nunique() / len(df) | |
| # If unique ratio is low, likely categorical | |
| if 0 < unique_ratio <= max_unique_ratio: | |
| categorical_cols.append(col) | |
| return categorical_cols | |
| def _find_best_grouping_column(df: pd.DataFrame, preferred_patterns: List[str] = None) -> Optional[str]: | |
| """Find the best column to group by based on healthcare patterns and characteristics.""" | |
| if preferred_patterns is None: | |
| preferred_patterns = [ | |
| r'facility|hospital|clinic|center|centre|institution|provider|site|location', | |
| r'specialty|service|department|unit|division|program|type|category', | |
| r'zone|region|area|district|network|system|catchment', | |
| r'practitioner|physician|doctor|nurse|staff', | |
| r'procedure|treatment|intervention|therapy|service_type', | |
| r'name|id|identifier' | |
| ] | |
| categorical_cols = _detect_categorical_columns(df) | |
| # Score columns based on pattern matching and characteristics | |
| scored_cols = [] | |
| for col in categorical_cols: | |
| score = 0 | |
| col_lower = col.lower() | |
| # Pattern matching score | |
| for i, pattern in enumerate(preferred_patterns): | |
| if re.search(pattern, col_lower): | |
| score += (len(preferred_patterns) - i) * 10 # Higher score for earlier patterns | |
| break | |
| # Characteristics score | |
| unique_count = df[col].nunique() | |
| total_count = len(df) | |
| # Prefer columns with reasonable number of groups (not too few, not too many) | |
| if 2 <= unique_count <= min(50, total_count // 5): | |
| score += 5 | |
| # Prefer columns with less missing data | |
| missing_ratio = df[col].isna().sum() / len(df) | |
| score += (1 - missing_ratio) * 3 | |
| scored_cols.append((col, score)) | |
| if scored_cols: | |
| scored_cols.sort(key=lambda x: x[1], reverse=True) | |
| return scored_cols[0][0] | |
| return None | |
| def _find_best_metric_column(df: pd.DataFrame, grouping_col: str = None) -> Optional[str]: | |
| """Find the best numeric column to analyze as a healthcare metric.""" | |
| numeric_cols = _detect_numeric_columns(df) | |
| if not numeric_cols: | |
| return None | |
| # Healthcare-relevant metric patterns | |
| healthcare_metric_patterns = [ | |
| r'wait|delay|time|duration|length', | |
| r'cost|price|expense|fee|charge|budget', | |
| r'volume|count|number|quantity|throughput|capacity', | |
| r'rate|ratio|percent|percentage|score|index', | |
| r'outcome|result|mortality|morbidity|readmission', | |
| r'satisfaction|quality|performance|efficiency', | |
| r'utilization|occupancy|availability', | |
| r'median|mean|average|percentile|p\d+|90th|95th' | |
| ] | |
| # Score numeric columns | |
| scored_cols = [] | |
| for col in numeric_cols: | |
| score = 0 | |
| col_lower = col.lower() | |
| # Prefer columns with healthcare-relevant names | |
| for pattern in healthcare_metric_patterns: | |
| if re.search(pattern, col_lower): | |
| score += 10 | |
| break | |
| # Prefer columns with reasonable variance | |
| try: | |
| clean_series = _clean_numeric_series(df[col]) | |
| if not clean_series.isna().all(): | |
| std_dev = clean_series.std() | |
| mean_val = clean_series.mean() | |
| if mean_val != 0 and std_dev / abs(mean_val) > 0.1: # Coefficient of variation > 0.1 | |
| score += 5 | |
| except: | |
| pass | |
| # Prefer columns with less missing data | |
| missing_ratio = df[col].isna().sum() / len(df) | |
| score += (1 - missing_ratio) * 3 | |
| scored_cols.append((col, score)) | |
| if scored_cols: | |
| scored_cols.sort(key=lambda x: x[1], reverse=True) | |
| return scored_cols[0][0] | |
| return None | |
| def compute_generic_rankings(reg: DataRegistry, mapping: MappingResult, | |
| entity_concept: str, metric_concept: str, | |
| ranking_name: str) -> Optional[pd.DataFrame]: | |
| """Generic function to compute rankings for any healthcare entity by any metric.""" | |
| df, entity_col = _get(reg, mapping, entity_concept) | |
| if df is None or entity_col is None: | |
| return None | |
| # Find metric column | |
| metric_col = None | |
| df_metric, mapped_metric_col = _get(reg, mapping, metric_concept) | |
| if df_metric is not None and mapped_metric_col is not None and df_metric is df: | |
| metric_col = mapped_metric_col | |
| else: | |
| # Fallback: find best numeric column | |
| metric_col = _find_best_metric_column(df, entity_col) | |
| if metric_col is None: | |
| return None | |
| # Clean the data | |
| df_clean = df[df[entity_col].notna() & (df[entity_col] != '') & (df[entity_col].astype(str).str.strip() != '')].copy() | |
| df_clean[metric_col] = _clean_numeric_series(df_clean[metric_col]) | |
| df_clean = df_clean[df_clean[metric_col].notna()] | |
| if df_clean.empty: | |
| return None | |
| # Group and calculate statistics | |
| grouped = df_clean.groupby(entity_col, dropna=True)[metric_col].agg(['mean', 'count', 'std']).reset_index() | |
| grouped = grouped.rename(columns={ | |
| 'mean': f'avg_{metric_concept}', | |
| 'count': 'record_count', | |
| 'std': f'std_{metric_concept}' | |
| }) | |
| # Sort by average metric (adjust based on whether higher or lower is better) | |
| # For healthcare metrics like wait times, errors, costs - higher is typically worse | |
| grouped = grouped.sort_values(f'avg_{metric_concept}', ascending=False) | |
| grouped['rank'] = np.arange(1, len(grouped) + 1) | |
| # Round numeric columns | |
| numeric_cols = grouped.select_dtypes(include=[np.number]).columns | |
| grouped[numeric_cols] = grouped[numeric_cols].round(1) | |
| return grouped | |
| def compute_comparative_analysis(reg: DataRegistry, mapping: MappingResult, | |
| grouping_concept: str, metric_concept: str) -> Optional[pd.DataFrame]: | |
| """Generic function to compare healthcare metrics across different groups.""" | |
| df, group_col = _get(reg, mapping, grouping_concept) | |
| if df is None or group_col is None: | |
| return None | |
| # Find metric column | |
| metric_col = None | |
| df_metric, mapped_metric_col = _get(reg, mapping, metric_concept) | |
| if df_metric is not None and mapped_metric_col is not None and df_metric is df: | |
| metric_col = mapped_metric_col | |
| else: | |
| metric_col = _find_best_metric_column(df, group_col) | |
| if metric_col is None: | |
| return None | |
| # Clean data | |
| df_clean = df[df[group_col].notna() & (df[group_col] != '')].copy() | |
| df_clean[metric_col] = _clean_numeric_series(df_clean[metric_col]) | |
| df_clean = df_clean[df_clean[metric_col].notna()] | |
| if df_clean.empty: | |
| return None | |
| # Group and analyze | |
| grouped = df_clean.groupby(group_col, dropna=True)[metric_col].agg(['mean', 'count', 'std']).reset_index() | |
| grouped = grouped.rename(columns={ | |
| 'mean': f'avg_{metric_concept}', | |
| 'count': 'record_count', | |
| 'std': f'std_{metric_concept}' | |
| }) | |
| # Calculate overall average for comparison | |
| overall_avg = df_clean[metric_col].mean() | |
| grouped['vs_overall_avg'] = (grouped[f'avg_{metric_concept}'] - overall_avg).round(1) | |
| # Sort by average metric | |
| grouped = grouped.sort_values(f'avg_{metric_concept}', ascending=False) | |
| # Round numeric columns | |
| numeric_cols = grouped.select_dtypes(include=[np.number]).columns | |
| grouped[numeric_cols] = grouped[numeric_cols].round(1) | |
| return grouped | |
| def compute_capacity_metrics(reg: DataRegistry, mapping: MappingResult) -> Optional[pd.DataFrame]: | |
| """Compute healthcare capacity-related metrics if available.""" | |
| capacity_concepts = [ | |
| 'capacity', 'beds', 'staffed_beds', 'occupied_beds', 'available_beds', | |
| 'volume', 'throughput', 'utilization', 'occupancy', | |
| 'appointments', 'procedures', 'admissions', 'discharges', | |
| 'staffing', 'fte', 'personnel' | |
| ] | |
| results = [] | |
| for concept in capacity_concepts: | |
| df, col = _get(reg, mapping, concept) | |
| if df is not None and col is not None: | |
| clean_series = _clean_numeric_series(df[col]) | |
| if not clean_series.isna().all(): | |
| results.append({ | |
| 'metric': f'{concept}_total', | |
| 'value': float(np.nansum(clean_series)) | |
| }) | |
| results.append({ | |
| 'metric': f'{concept}_average', | |
| 'value': float(np.nanmean(clean_series)) | |
| }) | |
| results.append({ | |
| 'metric': f'{concept}_records', | |
| 'value': int((~clean_series.isna()).sum()) | |
| }) | |
| if results: | |
| return pd.DataFrame(results) | |
| return None | |
| def compute_cost_metrics(reg: DataRegistry, mapping: MappingResult) -> Optional[pd.DataFrame]: | |
| """Compute healthcare cost-related metrics if available.""" | |
| cost_concepts = [ | |
| 'cost', 'price', 'expense', 'fee', 'charge', 'budget', 'funding', | |
| 'fixed_cost', 'variable_cost', 'operational_cost', 'capital_cost', | |
| 'reimbursement', 'revenue', 'billing', 'payment' | |
| ] | |
| results = [] | |
| for concept in cost_concepts: | |
| df, col = _get(reg, mapping, concept) | |
| if df is not None and col is not None: | |
| clean_series = _clean_numeric_series(df[col]) | |
| if not clean_series.isna().all(): | |
| results.append({ | |
| 'component': f'{concept}_total', | |
| 'value': float(np.nansum(clean_series)) | |
| }) | |
| results.append({ | |
| 'component': f'{concept}_average', | |
| 'value': float(np.nanmean(clean_series)) | |
| }) | |
| if results: | |
| return pd.DataFrame(results) | |
| return None | |
| def auto_discover_healthcare_analysis_opportunities(reg: DataRegistry) -> Dict[str, List[str]]: | |
| """Automatically discover what healthcare analyses are possible with the available data.""" | |
| opportunities = { | |
| 'provider_rankings': [], | |
| 'service_comparisons': [], | |
| 'regional_analysis': [], | |
| 'outcome_metrics': [], | |
| 'efficiency_metrics': [] | |
| } | |
| for table_name, df in reg._tables.items(): | |
| if df.empty: | |
| continue | |
| # Find potential healthcare grouping columns | |
| categorical_cols = _detect_categorical_columns(df) | |
| numeric_cols = _detect_numeric_columns(df) | |
| # Healthcare-specific categorization | |
| provider_cols = [col for col in categorical_cols if re.search(r'facility|hospital|clinic|provider', col.lower())] | |
| service_cols = [col for col in categorical_cols if re.search(r'specialty|service|department|procedure', col.lower())] | |
| regional_cols = [col for col in categorical_cols if re.search(r'zone|region|area|district', col.lower())] | |
| outcome_cols = [col for col in numeric_cols if re.search(r'outcome|mortality|readmission|infection|complication', col.lower())] | |
| efficiency_cols = [col for col in numeric_cols if re.search(r'wait|time|throughput|utilization|length_of_stay', col.lower())] | |
| # Suggest healthcare-specific analyses | |
| for provider_col in provider_cols[:2]: | |
| for metric_col in (efficiency_cols + outcome_cols)[:2]: | |
| opportunities['provider_rankings'].append(f"{provider_col} by {metric_col}") | |
| for service_col in service_cols[:2]: | |
| for metric_col in (efficiency_cols + outcome_cols)[:2]: | |
| opportunities['service_comparisons'].append(f"{metric_col} across {service_col}") | |
| for regional_col in regional_cols[:2]: | |
| for metric_col in (efficiency_cols + outcome_cols)[:2]: | |
| opportunities['regional_analysis'].append(f"{metric_col} by {regional_col}") | |
| opportunities['outcome_metrics'].extend(outcome_cols[:3]) | |
| opportunities['efficiency_metrics'].extend(efficiency_cols[:3]) | |
| return opportunities | |
| def build_data_findings_markdown(reg: DataRegistry, mapping: MappingResult, topn: int = 5): | |
| """Build generic healthcare data analysis report based on available data and mappings.""" | |
| missing: List[str] = [] | |
| sections = [] | |
| # Auto-discover healthcare analysis opportunities | |
| opportunities = auto_discover_healthcare_analysis_opportunities(reg) | |
| # Healthcare-specific analysis patterns | |
| analysis_patterns = [ | |
| ('provider rankings', ['facility', 'provider', 'hospital', 'clinic'], ['wait_time', 'wait_median', 'wait_days', 'wait_p90', 'cost', 'outcome']), | |
| ('service analysis', ['specialty', 'service', 'department', 'procedure', 'treatment'], ['wait_time', 'wait_median', 'wait_days', 'cost', 'outcome']), | |
| ('regional comparison', ['zone', 'region', 'area', 'district', 'network'], ['wait_time', 'wait_median', 'cost', 'outcome']), | |
| ('quality metrics', ['facility', 'service'], ['mortality', 'readmission', 'infection', 'complication', 'satisfaction']), | |
| ] | |
| for analysis_name, entity_concepts, metric_concepts in analysis_patterns: | |
| found_analysis = False | |
| for entity_concept in entity_concepts: | |
| for metric_concept in metric_concepts: | |
| result = compute_generic_rankings(reg, mapping, entity_concept, metric_concept, analysis_name) | |
| if result is not None and not result.empty: | |
| sections.append(f"**Top {entity_concept.title()} by {metric_concept.replace('_', ' ').title()}**\n\n{_fmt_tbl(result.head(topn))}") | |
| found_analysis = True | |
| break | |
| if found_analysis: | |
| break | |
| if not found_analysis: | |
| missing.append(analysis_name) | |
| # Healthcare-specific comparative analyses | |
| comparison_patterns = [ | |
| ('regional_performance', ['zone', 'region', 'area', 'district'], ['wait_time', 'wait_median', 'cost', 'outcome']), | |
| ('service_performance', ['specialty', 'service', 'department'], ['wait_time', 'wait_median', 'cost', 'outcome']), | |
| ('provider_comparison', ['facility', 'hospital', 'clinic'], ['efficiency', 'utilization', 'throughput']), | |
| ] | |
| for analysis_name, group_concepts, metric_concepts in comparison_patterns: | |
| found_analysis = False | |
| for group_concept in group_concepts: | |
| for metric_concept in metric_concepts: | |
| result = compute_comparative_analysis(reg, mapping, group_concept, metric_concept) | |
| if result is not None and not result.empty: | |
| sections.append(f"**{group_concept.title()} Performance Comparison**\n\n{_fmt_tbl(result)}") | |
| found_analysis = True | |
| break | |
| if found_analysis: | |
| break | |
| if not found_analysis: | |
| missing.append(analysis_name) | |
| # Healthcare capacity analysis | |
| capacity = compute_capacity_metrics(reg, mapping) | |
| if capacity is not None and not capacity.empty: | |
| sections.append(f"**Healthcare Capacity Analysis**\n\n{_fmt_tbl(capacity)}") | |
| else: | |
| missing.append("capacity_analysis") | |
| # Healthcare cost analysis | |
| costs = compute_cost_metrics(reg, mapping) | |
| if costs is not None and not costs.empty: | |
| sections.append(f"**Healthcare Cost Analysis**\n\n{_fmt_tbl(costs)}") | |
| else: | |
| missing.append("cost_analysis") | |
| # Build final healthcare report | |
| if sections: | |
| md = ( | |
| "### Healthcare Data Analysis Results\n\n" + | |
| "\n\n".join(sections) + | |
| "\n\n**Clinical Data Quality Notes**\n" | |
| "- Analysis performed on available healthcare data columns\n" | |
| "- Missing values and empty entries excluded from calculations\n" | |
| "- Numeric values rounded to 1 decimal place for clinical relevance\n" | |
| "- Rankings prioritize areas that may require clinical attention or resource allocation\n" | |
| "- Record counts indicate data volume and statistical reliability\n" | |
| ) | |
| else: | |
| md = "### Healthcare Data Analysis Results\n\nNo analyzable healthcare patterns found in the provided data. Consider uploading data with healthcare facility, service, or outcome metrics." | |
| return md, missing |