"""Generic healthcare analytics helpers.

Provides column-detection heuristics, per-group rankings and comparisons,
capacity/cost summaries, and a markdown report builder on top of a
``DataRegistry`` and a ``MappingResult``.
"""

from __future__ import annotations

import re
from typing import Dict, Any, Tuple, Optional, List, Union

import numpy as np
import pandas as pd

from data_registry import DataRegistry
from schema_mapper import MappingResult

# String tokens treated as missing values before numeric coercion.
_MISSING_TOKENS = ['', '—', '-', 'null', 'NULL', 'N/A', 'n/a', ' ', 'nan']

# Column-name patterns for grouping candidates, ordered from most to least
# preferred; earlier entries earn a higher score.
_DEFAULT_GROUPING_PATTERNS = [
    r'facility|hospital|clinic|center|centre|institution|provider|site|location',
    r'specialty|service|department|unit|division|program|type|category',
    r'zone|region|area|district|network|system|catchment',
    r'practitioner|physician|doctor|nurse|staff',
    r'procedure|treatment|intervention|therapy|service_type',
    r'name|id|identifier',
]

# Column-name patterns that mark a numeric column as a healthcare metric.
_HEALTHCARE_METRIC_PATTERNS = [
    r'wait|delay|time|duration|length',
    r'cost|price|expense|fee|charge|budget',
    r'volume|count|number|quantity|throughput|capacity',
    r'rate|ratio|percent|percentage|score|index',
    r'outcome|result|mortality|morbidity|readmission',
    r'satisfaction|quality|performance|efficiency',
    r'utilization|occupancy|availability',
    r'median|mean|average|percentile|p\d+|90th|95th',
]


def _get(
    reg: DataRegistry, mapping: MappingResult, concept: str
) -> Tuple[Optional[pd.DataFrame], Optional[str]]:
    """Look up the table and column that ``mapping`` resolves ``concept`` to.

    Returns ``(None, None)`` when the concept is not in ``mapping.resolved``;
    otherwise ``(reg.get(table_name), column_name)``.
    """
    if concept not in mapping.resolved:
        return None, None
    tname, col = mapping.resolved[concept]
    return reg.get(tname), col


def _clean_numeric_series(series: pd.Series) -> pd.Series:
    """Clean numeric data, handling various missing value representations.

    Known missing-value tokens are replaced by NaN, then everything is coerced
    with ``pd.to_numeric(errors='coerce')`` so unparseable entries become NaN.
    """
    cleaned = series.replace(_MISSING_TOKENS, np.nan)
    return pd.to_numeric(cleaned, errors='coerce')


def _fmt_tbl(df: Optional[pd.DataFrame], max_rows: int = 20) -> str:
    """Render at most ``max_rows`` rows of ``df`` as a markdown table.

    Returns the placeholder ``"__"`` when ``df`` is ``None`` or empty.
    """
    if df is None or df.empty:
        return "__"
    return df.head(max_rows).to_markdown(index=False)


def _detect_numeric_columns(df: pd.DataFrame) -> List[str]:
    """Detect columns that contain numeric data (even if stored as strings).

    A column qualifies when more than half of its first 100 non-null values
    can be converted by ``pd.to_numeric``.
    """
    numeric_cols = []
    for col in df.columns:
        sample = df[col].dropna().head(100)
        if len(sample) == 0:
            continue
        converted = pd.to_numeric(sample, errors='coerce')
        if converted.notna().sum() > len(sample) * 0.5:
            numeric_cols.append(col)
    return numeric_cols


def _detect_categorical_columns(
    df: pd.DataFrame, max_unique_ratio: float = 0.3
) -> List[str]:
    """Detect categorical columns with reasonable number of unique values.

    Only ``object``-dtype columns are considered. A column qualifies when its
    ratio of distinct values to rows is in ``(0, max_unique_ratio]``.
    An empty frame yields an empty list.
    """
    row_count = len(df)
    if row_count == 0:
        # Avoid dividing by zero below; nothing can be categorical anyway.
        return []

    categorical_cols = []
    for col in df.columns:
        if df[col].dtype != 'object':
            continue
        unique_ratio = df[col].nunique() / row_count
        if 0 < unique_ratio <= max_unique_ratio:
            categorical_cols.append(col)
    return categorical_cols


def _find_best_grouping_column(
    df: pd.DataFrame, preferred_patterns: Optional[List[str]] = None
) -> Optional[str]:
    """Find the best column to group by based on healthcare patterns and characteristics.

    Each categorical column is scored on: the first matching name pattern
    (earlier patterns score higher), a bonus for having between 2 and
    ``min(50, rows // 5)`` distinct values, and up to 3 points for
    completeness. Returns the top-scoring column, or ``None`` if there are no
    categorical columns.
    """
    if preferred_patterns is None:
        preferred_patterns = _DEFAULT_GROUPING_PATTERNS

    total_count = len(df)
    scored_cols = []
    for col in _detect_categorical_columns(df):
        score = 0
        col_lower = str(col).lower()

        # Only the first (highest-priority) matching pattern counts.
        for i, pattern in enumerate(preferred_patterns):
            if re.search(pattern, col_lower):
                score += (len(preferred_patterns) - i) * 10
                break

        # Prefer a reasonable number of groups (not too few, not too many).
        unique_count = df[col].nunique()
        if 2 <= unique_count <= min(50, total_count // 5):
            score += 5

        # Prefer columns with less missing data.
        score += (1 - df[col].isna().mean()) * 3
        scored_cols.append((col, score))

    if not scored_cols:
        return None
    # max() returns the first of equally scored columns, i.e. column order
    # breaks ties.
    return max(scored_cols, key=lambda item: item[1])[0]


def _find_best_metric_column(
    df: pd.DataFrame, grouping_col: Optional[str] = None
) -> Optional[str]:
    """Find the best numeric column to analyze as a healthcare metric.

    ``grouping_col`` (when given) is excluded from the candidates so that the
    column being grouped on is never chosen as its own metric. Candidates are
    scored on a healthcare-related name, a coefficient of variation above 0.1,
    and completeness. Returns ``None`` when no numeric candidate exists.
    """
    candidates = [
        col for col in _detect_numeric_columns(df) if col != grouping_col
    ]
    if not candidates:
        return None

    scored_cols = []
    for col in candidates:
        score = 0
        col_lower = str(col).lower()

        if any(re.search(p, col_lower) for p in _HEALTHCARE_METRIC_PATTERNS):
            score += 10

        # Variance is a best-effort bonus: a column that cannot be cleaned or
        # summarised simply does not earn it.
        try:
            clean_series = _clean_numeric_series(df[col])
            if not clean_series.isna().all():
                std_dev = clean_series.std()
                mean_val = clean_series.mean()
                if mean_val != 0 and std_dev / abs(mean_val) > 0.1:
                    score += 5
        except (TypeError, ValueError, ArithmeticError):
            pass

        # Prefer columns with less missing data.
        score += (1 - df[col].isna().mean()) * 3
        scored_cols.append((col, score))

    return max(scored_cols, key=lambda item: item[1])[0]


def _resolve_metric_column(
    reg: DataRegistry,
    mapping: MappingResult,
    df: pd.DataFrame,
    metric_concept: str,
    grouping_col: str,
) -> Optional[str]:
    """Pick the metric column of ``df`` for ``metric_concept``.

    Uses the mapped column only when the mapping resolves to this very table
    object; otherwise falls back to the best-scoring numeric column.
    """
    df_metric, mapped_col = _get(reg, mapping, metric_concept)
    if df_metric is not None and mapped_col is not None and df_metric is df:
        return mapped_col
    return _find_best_metric_column(df, grouping_col)


def _rows_with_group_and_metric(
    df: pd.DataFrame, group_col: str, metric_col: str
) -> pd.DataFrame:
    """Return a copy of ``df`` restricted to rows usable for group statistics.

    Rows whose group label is null, empty or whitespace-only are dropped, the
    metric column is coerced to numeric, and rows whose metric is NaN after
    coercion are dropped.
    """
    labels = df[group_col]
    has_label = labels.notna() & (labels.astype(str).str.strip() != '')
    cleaned = df[has_label].copy()
    cleaned[metric_col] = _clean_numeric_series(cleaned[metric_col])
    return cleaned[cleaned[metric_col].notna()]


def _group_stats(
    cleaned: pd.DataFrame, group_col: str, metric_col: str, metric_concept: str
) -> pd.DataFrame:
    """Aggregate mean, count and std of ``metric_col`` per ``group_col`` value."""
    stats = (
        cleaned.groupby(group_col, dropna=True)[metric_col]
        .agg(['mean', 'count', 'std'])
        .reset_index()
    )
    return stats.rename(columns={
        'mean': f'avg_{metric_concept}',
        'count': 'record_count',
        'std': f'std_{metric_concept}',
    })


def _round_numeric(frame: pd.DataFrame) -> pd.DataFrame:
    """Round every numeric column of ``frame`` to 1 decimal place, in place."""
    numeric_cols = frame.select_dtypes(include=[np.number]).columns
    frame[numeric_cols] = frame[numeric_cols].round(1)
    return frame


def compute_generic_rankings(
    reg: DataRegistry,
    mapping: MappingResult,
    entity_concept: str,
    metric_concept: str,
    ranking_name: str,
) -> Optional[pd.DataFrame]:
    """Generic function to compute rankings for any healthcare entity by any metric.

    Args:
        reg: Registry the mapped table is fetched from.
        mapping: Concept-to-(table, column) mapping.
        entity_concept: Concept whose column is grouped on.
        metric_concept: Concept whose column is averaged; when it does not map
            to the entity's table, the best numeric column is used instead.
        ranking_name: Currently unused; kept for interface compatibility.

    Returns:
        One row per entity with ``avg_<metric>``, ``record_count``,
        ``std_<metric>`` and ``rank`` (1 = highest average), numeric columns
        rounded to 1 decimal; ``None`` when the entity or a metric column
        cannot be resolved or no usable rows remain.
    """
    df, entity_col = _get(reg, mapping, entity_concept)
    if df is None or entity_col is None:
        return None

    metric_col = _resolve_metric_column(
        reg, mapping, df, metric_concept, entity_col
    )
    if metric_col is None:
        return None

    df_clean = _rows_with_group_and_metric(df, entity_col, metric_col)
    if df_clean.empty:
        return None

    grouped = _group_stats(df_clean, entity_col, metric_col, metric_concept)

    # Highest average first: for metrics like wait times, errors and costs a
    # higher value is typically worse.
    grouped = grouped.sort_values(f'avg_{metric_concept}', ascending=False)
    grouped['rank'] = np.arange(1, len(grouped) + 1)

    return _round_numeric(grouped)


def compute_comparative_analysis(
    reg: DataRegistry,
    mapping: MappingResult,
    grouping_concept: str,
    metric_concept: str,
) -> Optional[pd.DataFrame]:
    """Generic function to compare healthcare metrics across different groups.

    Returns one row per group with ``avg_<metric>``, ``record_count``,
    ``std_<metric>`` and ``vs_overall_avg`` (group average minus the average
    over all usable rows), sorted by average descending and rounded to
    1 decimal. Returns ``None`` when the group or a metric column cannot be
    resolved or no usable rows remain.
    """
    df, group_col = _get(reg, mapping, grouping_concept)
    if df is None or group_col is None:
        return None

    metric_col = _resolve_metric_column(
        reg, mapping, df, metric_concept, group_col
    )
    if metric_col is None:
        return None

    df_clean = _rows_with_group_and_metric(df, group_col, metric_col)
    if df_clean.empty:
        return None

    avg_col = f'avg_{metric_concept}'
    grouped = _group_stats(df_clean, group_col, metric_col, metric_concept)

    overall_avg = df_clean[metric_col].mean()
    grouped['vs_overall_avg'] = (grouped[avg_col] - overall_avg).round(1)

    grouped = grouped.sort_values(avg_col, ascending=False)
    return _round_numeric(grouped)


def _summarize_concepts(
    reg: DataRegistry,
    mapping: MappingResult,
    concepts: List[str],
    label_key: str,
    include_records: bool,
) -> Optional[pd.DataFrame]:
    """Build total/average (and optionally record-count) rows per mapped concept.

    Concepts that are unmapped, or whose column has no numeric values after
    cleaning, are skipped. Returns ``None`` when no concept produced rows.
    """
    rows = []
    for concept in concepts:
        df, col = _get(reg, mapping, concept)
        if df is None or col is None:
            continue
        values = _clean_numeric_series(df[col])
        if values.isna().all():
            continue
        rows.append({
            label_key: f'{concept}_total',
            'value': float(np.nansum(values)),
        })
        rows.append({
            label_key: f'{concept}_average',
            'value': float(np.nanmean(values)),
        })
        if include_records:
            rows.append({
                label_key: f'{concept}_records',
                'value': int(values.notna().sum()),
            })
    return pd.DataFrame(rows) if rows else None


def compute_capacity_metrics(
    reg: DataRegistry, mapping: MappingResult
) -> Optional[pd.DataFrame]:
    """Compute healthcare capacity-related metrics if available.

    Returns a frame with ``metric`` and ``value`` columns holding the total,
    average and non-null record count of each mapped capacity concept, or
    ``None`` when none of them is available.
    """
    capacity_concepts = [
        'capacity', 'beds', 'staffed_beds', 'occupied_beds', 'available_beds',
        'volume', 'throughput', 'utilization', 'occupancy',
        'appointments', 'procedures', 'admissions', 'discharges',
        'staffing', 'fte', 'personnel',
    ]
    return _summarize_concepts(
        reg, mapping, capacity_concepts, 'metric', include_records=True
    )


def compute_cost_metrics(
    reg: DataRegistry, mapping: MappingResult
) -> Optional[pd.DataFrame]:
    """Compute healthcare cost-related metrics if available.

    Returns a frame with ``component`` and ``value`` columns holding the total
    and average of each mapped cost concept, or ``None`` when none of them is
    available.
    """
    cost_concepts = [
        'cost', 'price', 'expense', 'fee', 'charge', 'budget', 'funding',
        'fixed_cost', 'variable_cost', 'operational_cost', 'capital_cost',
        'reimbursement', 'revenue', 'billing', 'payment',
    ]
    return _summarize_concepts(
        reg, mapping, cost_concepts, 'component', include_records=False
    )


def _columns_matching(columns: List[str], pattern: str) -> List[str]:
    """Return the columns whose lower-cased name matches ``pattern``."""
    return [col for col in columns if re.search(pattern, str(col).lower())]


def auto_discover_healthcare_analysis_opportunities(
    reg: DataRegistry,
) -> Dict[str, List[str]]:
    """Automatically discover what healthcare analyses are possible with the available data.

    Scans every non-empty table in ``reg._tables`` and pairs up to two
    provider/service/regional columns with up to two efficiency or outcome
    metric columns (efficiency columns first). Returns a dict of suggestion
    strings and metric column names keyed by analysis category.
    """
    opportunities: Dict[str, List[str]] = {
        'provider_rankings': [],
        'service_comparisons': [],
        'regional_analysis': [],
        'outcome_metrics': [],
        'efficiency_metrics': [],
    }

    for table_name, df in reg._tables.items():
        if df.empty:
            continue

        categorical_cols = _detect_categorical_columns(df)
        numeric_cols = _detect_numeric_columns(df)

        provider_cols = _columns_matching(
            categorical_cols, r'facility|hospital|clinic|provider')
        service_cols = _columns_matching(
            categorical_cols, r'specialty|service|department|procedure')
        regional_cols = _columns_matching(
            categorical_cols, r'zone|region|area|district')
        outcome_cols = _columns_matching(
            numeric_cols, r'outcome|mortality|readmission|infection|complication')
        efficiency_cols = _columns_matching(
            numeric_cols, r'wait|time|throughput|utilization|length_of_stay')

        # The same top two metrics are paired with every grouping column.
        top_metrics = (efficiency_cols + outcome_cols)[:2]

        for provider_col in provider_cols[:2]:
            for metric_col in top_metrics:
                opportunities['provider_rankings'].append(
                    f"{provider_col} by {metric_col}")

        for service_col in service_cols[:2]:
            for metric_col in top_metrics:
                opportunities['service_comparisons'].append(
                    f"{metric_col} across {service_col}")

        for regional_col in regional_cols[:2]:
            for metric_col in top_metrics:
                opportunities['regional_analysis'].append(
                    f"{metric_col} by {regional_col}")

        opportunities['outcome_metrics'].extend(outcome_cols[:3])
        opportunities['efficiency_metrics'].extend(efficiency_cols[:3])

    return opportunities


def _first_nonempty_result(compute, primary_concepts, metric_concepts):
    """Return ``(primary, metric, result)`` for the first non-empty result.

    Tries ``compute(primary, metric)`` for every combination, primary concepts
    in the outer loop, and returns ``None`` when every call yields ``None`` or
    an empty frame.
    """
    for primary in primary_concepts:
        for metric in metric_concepts:
            result = compute(primary, metric)
            if result is not None and not result.empty:
                return primary, metric, result
    return None


def build_data_findings_markdown(
    reg: DataRegistry, mapping: MappingResult, topn: int = 5
) -> Tuple[str, List[str]]:
    """Build generic healthcare data analysis report based on available data and mappings.

    Args:
        reg: Registry the mapped tables are fetched from.
        mapping: Concept-to-(table, column) mapping.
        topn: Number of rows shown in each ranking table.

    Returns:
        ``(markdown, missing)`` where ``markdown`` is the report text and
        ``missing`` lists the names of analyses that produced no result.
    """
    missing: List[str] = []
    sections = []

    # Ranking analyses: (name, entity concepts, metric concepts).
    analysis_patterns = [
        ('provider rankings',
         ['facility', 'provider', 'hospital', 'clinic'],
         ['wait_time', 'wait_median', 'wait_days', 'wait_p90', 'cost', 'outcome']),
        ('service analysis',
         ['specialty', 'service', 'department', 'procedure', 'treatment'],
         ['wait_time', 'wait_median', 'wait_days', 'cost', 'outcome']),
        ('regional comparison',
         ['zone', 'region', 'area', 'district', 'network'],
         ['wait_time', 'wait_median', 'cost', 'outcome']),
        ('quality metrics',
         ['facility', 'service'],
         ['mortality', 'readmission', 'infection', 'complication', 'satisfaction']),
    ]

    for analysis_name, entity_concepts, metric_concepts in analysis_patterns:
        found = _first_nonempty_result(
            lambda entity, metric: compute_generic_rankings(
                reg, mapping, entity, metric, analysis_name),
            entity_concepts,
            metric_concepts,
        )
        if found is None:
            missing.append(analysis_name)
            continue
        entity_concept, metric_concept, result = found
        sections.append(
            f"**Top {entity_concept.title()} by "
            f"{metric_concept.replace('_', ' ').title()}**"
            f"\n\n{_fmt_tbl(result.head(topn))}"
        )

    # Comparative analyses: (name, group concepts, metric concepts).
    comparison_patterns = [
        ('regional_performance',
         ['zone', 'region', 'area', 'district'],
         ['wait_time', 'wait_median', 'cost', 'outcome']),
        ('service_performance',
         ['specialty', 'service', 'department'],
         ['wait_time', 'wait_median', 'cost', 'outcome']),
        ('provider_comparison',
         ['facility', 'hospital', 'clinic'],
         ['efficiency', 'utilization', 'throughput']),
    ]

    for analysis_name, group_concepts, metric_concepts in comparison_patterns:
        found = _first_nonempty_result(
            lambda group, metric: compute_comparative_analysis(
                reg, mapping, group, metric),
            group_concepts,
            metric_concepts,
        )
        if found is None:
            missing.append(analysis_name)
            continue
        group_concept, _, result = found
        sections.append(
            f"**{group_concept.title()} Performance Comparison**"
            f"\n\n{_fmt_tbl(result)}"
        )

    capacity = compute_capacity_metrics(reg, mapping)
    if capacity is not None and not capacity.empty:
        sections.append(
            f"**Healthcare Capacity Analysis**\n\n{_fmt_tbl(capacity)}")
    else:
        missing.append("capacity_analysis")

    costs = compute_cost_metrics(reg, mapping)
    if costs is not None and not costs.empty:
        sections.append(f"**Healthcare Cost Analysis**\n\n{_fmt_tbl(costs)}")
    else:
        missing.append("cost_analysis")

    if sections:
        md = (
            "### Healthcare Data Analysis Results\n\n"
            + "\n\n".join(sections)
            + "\n\n**Clinical Data Quality Notes**\n"
            "- Analysis performed on available healthcare data columns\n"
            "- Missing values and empty entries excluded from calculations\n"
            "- Numeric values rounded to 1 decimal place for clinical relevance\n"
            "- Rankings prioritize areas that may require clinical attention or resource allocation\n"
            "- Record counts indicate data volume and statistical reliability\n"
        )
    else:
        md = (
            "### Healthcare Data Analysis Results\n\n"
            "No analyzable healthcare patterns found in the provided data. "
            "Consider uploading data with healthcare facility, service, or "
            "outcome metrics."
        )

    return md, missing