Medica_DecisionSupportAI / auto_metrics.py
Rajan Sharma
Update auto_metrics.py
1c8ef92 verified
raw
history blame
18.4 kB
from __future__ import annotations
from typing import Dict, Any, Tuple, Optional, List, Union
import pandas as pd
import numpy as np
from data_registry import DataRegistry
from schema_mapper import MappingResult
import re
def _get(reg: DataRegistry, mapping: MappingResult, concept: str) -> Tuple[Optional[pd.DataFrame], Optional[str]]:
if concept not in mapping.resolved:
return None, None
tname, col = mapping.resolved[concept]
return reg.get(tname), col
def _clean_numeric_series(series: pd.Series) -> pd.Series:
"""Clean numeric data, handling various missing value representations."""
cleaned = series.replace(['', '—', '-', 'null', 'NULL', 'N/A', 'n/a', ' ', 'nan'], np.nan)
return pd.to_numeric(cleaned, errors='coerce')
def _fmt_tbl(df: pd.DataFrame, max_rows: int = 20) -> str:
if df is None or df.empty:
return "_<empty table>_"
df2 = df.copy()
if len(df2) > max_rows:
df2 = df2.head(max_rows)
return df2.to_markdown(index=False)
def _detect_numeric_columns(df: pd.DataFrame) -> List[str]:
"""Detect columns that contain numeric data (even if stored as strings)."""
numeric_cols = []
for col in df.columns:
# Try to convert a sample to numeric
sample = df[col].dropna().head(100)
if len(sample) > 0:
numeric_sample = pd.to_numeric(sample, errors='coerce')
# If more than 50% can be converted to numeric, consider it numeric
if numeric_sample.notna().sum() > len(sample) * 0.5:
numeric_cols.append(col)
return numeric_cols
def _detect_categorical_columns(df: pd.DataFrame, max_unique_ratio: float = 0.3) -> List[str]:
"""Detect categorical columns with reasonable number of unique values."""
categorical_cols = []
for col in df.columns:
if df[col].dtype == 'object': # String-like columns
unique_ratio = df[col].nunique() / len(df)
# If unique ratio is low, likely categorical
if 0 < unique_ratio <= max_unique_ratio:
categorical_cols.append(col)
return categorical_cols
def _find_best_grouping_column(df: pd.DataFrame, preferred_patterns: List[str] = None) -> Optional[str]:
"""Find the best column to group by based on healthcare patterns and characteristics."""
if preferred_patterns is None:
preferred_patterns = [
r'facility|hospital|clinic|center|centre|institution|provider|site|location',
r'specialty|service|department|unit|division|program|type|category',
r'zone|region|area|district|network|system|catchment',
r'practitioner|physician|doctor|nurse|staff',
r'procedure|treatment|intervention|therapy|service_type',
r'name|id|identifier'
]
categorical_cols = _detect_categorical_columns(df)
# Score columns based on pattern matching and characteristics
scored_cols = []
for col in categorical_cols:
score = 0
col_lower = col.lower()
# Pattern matching score
for i, pattern in enumerate(preferred_patterns):
if re.search(pattern, col_lower):
score += (len(preferred_patterns) - i) * 10 # Higher score for earlier patterns
break
# Characteristics score
unique_count = df[col].nunique()
total_count = len(df)
# Prefer columns with reasonable number of groups (not too few, not too many)
if 2 <= unique_count <= min(50, total_count // 5):
score += 5
# Prefer columns with less missing data
missing_ratio = df[col].isna().sum() / len(df)
score += (1 - missing_ratio) * 3
scored_cols.append((col, score))
if scored_cols:
scored_cols.sort(key=lambda x: x[1], reverse=True)
return scored_cols[0][0]
return None
def _find_best_metric_column(df: pd.DataFrame, grouping_col: str = None) -> Optional[str]:
"""Find the best numeric column to analyze as a healthcare metric."""
numeric_cols = _detect_numeric_columns(df)
if not numeric_cols:
return None
# Healthcare-relevant metric patterns
healthcare_metric_patterns = [
r'wait|delay|time|duration|length',
r'cost|price|expense|fee|charge|budget',
r'volume|count|number|quantity|throughput|capacity',
r'rate|ratio|percent|percentage|score|index',
r'outcome|result|mortality|morbidity|readmission',
r'satisfaction|quality|performance|efficiency',
r'utilization|occupancy|availability',
r'median|mean|average|percentile|p\d+|90th|95th'
]
# Score numeric columns
scored_cols = []
for col in numeric_cols:
score = 0
col_lower = col.lower()
# Prefer columns with healthcare-relevant names
for pattern in healthcare_metric_patterns:
if re.search(pattern, col_lower):
score += 10
break
# Prefer columns with reasonable variance
try:
clean_series = _clean_numeric_series(df[col])
if not clean_series.isna().all():
std_dev = clean_series.std()
mean_val = clean_series.mean()
if mean_val != 0 and std_dev / abs(mean_val) > 0.1: # Coefficient of variation > 0.1
score += 5
except:
pass
# Prefer columns with less missing data
missing_ratio = df[col].isna().sum() / len(df)
score += (1 - missing_ratio) * 3
scored_cols.append((col, score))
if scored_cols:
scored_cols.sort(key=lambda x: x[1], reverse=True)
return scored_cols[0][0]
return None
def compute_generic_rankings(reg: DataRegistry, mapping: MappingResult,
entity_concept: str, metric_concept: str,
ranking_name: str) -> Optional[pd.DataFrame]:
"""Generic function to compute rankings for any healthcare entity by any metric."""
df, entity_col = _get(reg, mapping, entity_concept)
if df is None or entity_col is None:
return None
# Find metric column
metric_col = None
df_metric, mapped_metric_col = _get(reg, mapping, metric_concept)
if df_metric is not None and mapped_metric_col is not None and df_metric is df:
metric_col = mapped_metric_col
else:
# Fallback: find best numeric column
metric_col = _find_best_metric_column(df, entity_col)
if metric_col is None:
return None
# Clean the data
df_clean = df[df[entity_col].notna() & (df[entity_col] != '') & (df[entity_col].astype(str).str.strip() != '')].copy()
df_clean[metric_col] = _clean_numeric_series(df_clean[metric_col])
df_clean = df_clean[df_clean[metric_col].notna()]
if df_clean.empty:
return None
# Group and calculate statistics
grouped = df_clean.groupby(entity_col, dropna=True)[metric_col].agg(['mean', 'count', 'std']).reset_index()
grouped = grouped.rename(columns={
'mean': f'avg_{metric_concept}',
'count': 'record_count',
'std': f'std_{metric_concept}'
})
# Sort by average metric (adjust based on whether higher or lower is better)
# For healthcare metrics like wait times, errors, costs - higher is typically worse
grouped = grouped.sort_values(f'avg_{metric_concept}', ascending=False)
grouped['rank'] = np.arange(1, len(grouped) + 1)
# Round numeric columns
numeric_cols = grouped.select_dtypes(include=[np.number]).columns
grouped[numeric_cols] = grouped[numeric_cols].round(1)
return grouped
def compute_comparative_analysis(reg: DataRegistry, mapping: MappingResult,
grouping_concept: str, metric_concept: str) -> Optional[pd.DataFrame]:
"""Generic function to compare healthcare metrics across different groups."""
df, group_col = _get(reg, mapping, grouping_concept)
if df is None or group_col is None:
return None
# Find metric column
metric_col = None
df_metric, mapped_metric_col = _get(reg, mapping, metric_concept)
if df_metric is not None and mapped_metric_col is not None and df_metric is df:
metric_col = mapped_metric_col
else:
metric_col = _find_best_metric_column(df, group_col)
if metric_col is None:
return None
# Clean data
df_clean = df[df[group_col].notna() & (df[group_col] != '')].copy()
df_clean[metric_col] = _clean_numeric_series(df_clean[metric_col])
df_clean = df_clean[df_clean[metric_col].notna()]
if df_clean.empty:
return None
# Group and analyze
grouped = df_clean.groupby(group_col, dropna=True)[metric_col].agg(['mean', 'count', 'std']).reset_index()
grouped = grouped.rename(columns={
'mean': f'avg_{metric_concept}',
'count': 'record_count',
'std': f'std_{metric_concept}'
})
# Calculate overall average for comparison
overall_avg = df_clean[metric_col].mean()
grouped['vs_overall_avg'] = (grouped[f'avg_{metric_concept}'] - overall_avg).round(1)
# Sort by average metric
grouped = grouped.sort_values(f'avg_{metric_concept}', ascending=False)
# Round numeric columns
numeric_cols = grouped.select_dtypes(include=[np.number]).columns
grouped[numeric_cols] = grouped[numeric_cols].round(1)
return grouped
def compute_capacity_metrics(reg: DataRegistry, mapping: MappingResult) -> Optional[pd.DataFrame]:
"""Compute healthcare capacity-related metrics if available."""
capacity_concepts = [
'capacity', 'beds', 'staffed_beds', 'occupied_beds', 'available_beds',
'volume', 'throughput', 'utilization', 'occupancy',
'appointments', 'procedures', 'admissions', 'discharges',
'staffing', 'fte', 'personnel'
]
results = []
for concept in capacity_concepts:
df, col = _get(reg, mapping, concept)
if df is not None and col is not None:
clean_series = _clean_numeric_series(df[col])
if not clean_series.isna().all():
results.append({
'metric': f'{concept}_total',
'value': float(np.nansum(clean_series))
})
results.append({
'metric': f'{concept}_average',
'value': float(np.nanmean(clean_series))
})
results.append({
'metric': f'{concept}_records',
'value': int((~clean_series.isna()).sum())
})
if results:
return pd.DataFrame(results)
return None
def compute_cost_metrics(reg: DataRegistry, mapping: MappingResult) -> Optional[pd.DataFrame]:
"""Compute healthcare cost-related metrics if available."""
cost_concepts = [
'cost', 'price', 'expense', 'fee', 'charge', 'budget', 'funding',
'fixed_cost', 'variable_cost', 'operational_cost', 'capital_cost',
'reimbursement', 'revenue', 'billing', 'payment'
]
results = []
for concept in cost_concepts:
df, col = _get(reg, mapping, concept)
if df is not None and col is not None:
clean_series = _clean_numeric_series(df[col])
if not clean_series.isna().all():
results.append({
'component': f'{concept}_total',
'value': float(np.nansum(clean_series))
})
results.append({
'component': f'{concept}_average',
'value': float(np.nanmean(clean_series))
})
if results:
return pd.DataFrame(results)
return None
def auto_discover_healthcare_analysis_opportunities(reg: DataRegistry) -> Dict[str, List[str]]:
"""Automatically discover what healthcare analyses are possible with the available data."""
opportunities = {
'provider_rankings': [],
'service_comparisons': [],
'regional_analysis': [],
'outcome_metrics': [],
'efficiency_metrics': []
}
for table_name, df in reg._tables.items():
if df.empty:
continue
# Find potential healthcare grouping columns
categorical_cols = _detect_categorical_columns(df)
numeric_cols = _detect_numeric_columns(df)
# Healthcare-specific categorization
provider_cols = [col for col in categorical_cols if re.search(r'facility|hospital|clinic|provider', col.lower())]
service_cols = [col for col in categorical_cols if re.search(r'specialty|service|department|procedure', col.lower())]
regional_cols = [col for col in categorical_cols if re.search(r'zone|region|area|district', col.lower())]
outcome_cols = [col for col in numeric_cols if re.search(r'outcome|mortality|readmission|infection|complication', col.lower())]
efficiency_cols = [col for col in numeric_cols if re.search(r'wait|time|throughput|utilization|length_of_stay', col.lower())]
# Suggest healthcare-specific analyses
for provider_col in provider_cols[:2]:
for metric_col in (efficiency_cols + outcome_cols)[:2]:
opportunities['provider_rankings'].append(f"{provider_col} by {metric_col}")
for service_col in service_cols[:2]:
for metric_col in (efficiency_cols + outcome_cols)[:2]:
opportunities['service_comparisons'].append(f"{metric_col} across {service_col}")
for regional_col in regional_cols[:2]:
for metric_col in (efficiency_cols + outcome_cols)[:2]:
opportunities['regional_analysis'].append(f"{metric_col} by {regional_col}")
opportunities['outcome_metrics'].extend(outcome_cols[:3])
opportunities['efficiency_metrics'].extend(efficiency_cols[:3])
return opportunities
def build_data_findings_markdown(reg: DataRegistry, mapping: MappingResult, topn: int = 5):
"""Build generic healthcare data analysis report based on available data and mappings."""
missing: List[str] = []
sections = []
# Auto-discover healthcare analysis opportunities
opportunities = auto_discover_healthcare_analysis_opportunities(reg)
# Healthcare-specific analysis patterns
analysis_patterns = [
('provider rankings', ['facility', 'provider', 'hospital', 'clinic'], ['wait_time', 'wait_median', 'wait_days', 'wait_p90', 'cost', 'outcome']),
('service analysis', ['specialty', 'service', 'department', 'procedure', 'treatment'], ['wait_time', 'wait_median', 'wait_days', 'cost', 'outcome']),
('regional comparison', ['zone', 'region', 'area', 'district', 'network'], ['wait_time', 'wait_median', 'cost', 'outcome']),
('quality metrics', ['facility', 'service'], ['mortality', 'readmission', 'infection', 'complication', 'satisfaction']),
]
for analysis_name, entity_concepts, metric_concepts in analysis_patterns:
found_analysis = False
for entity_concept in entity_concepts:
for metric_concept in metric_concepts:
result = compute_generic_rankings(reg, mapping, entity_concept, metric_concept, analysis_name)
if result is not None and not result.empty:
sections.append(f"**Top {entity_concept.title()} by {metric_concept.replace('_', ' ').title()}**\n\n{_fmt_tbl(result.head(topn))}")
found_analysis = True
break
if found_analysis:
break
if not found_analysis:
missing.append(analysis_name)
# Healthcare-specific comparative analyses
comparison_patterns = [
('regional_performance', ['zone', 'region', 'area', 'district'], ['wait_time', 'wait_median', 'cost', 'outcome']),
('service_performance', ['specialty', 'service', 'department'], ['wait_time', 'wait_median', 'cost', 'outcome']),
('provider_comparison', ['facility', 'hospital', 'clinic'], ['efficiency', 'utilization', 'throughput']),
]
for analysis_name, group_concepts, metric_concepts in comparison_patterns:
found_analysis = False
for group_concept in group_concepts:
for metric_concept in metric_concepts:
result = compute_comparative_analysis(reg, mapping, group_concept, metric_concept)
if result is not None and not result.empty:
sections.append(f"**{group_concept.title()} Performance Comparison**\n\n{_fmt_tbl(result)}")
found_analysis = True
break
if found_analysis:
break
if not found_analysis:
missing.append(analysis_name)
# Healthcare capacity analysis
capacity = compute_capacity_metrics(reg, mapping)
if capacity is not None and not capacity.empty:
sections.append(f"**Healthcare Capacity Analysis**\n\n{_fmt_tbl(capacity)}")
else:
missing.append("capacity_analysis")
# Healthcare cost analysis
costs = compute_cost_metrics(reg, mapping)
if costs is not None and not costs.empty:
sections.append(f"**Healthcare Cost Analysis**\n\n{_fmt_tbl(costs)}")
else:
missing.append("cost_analysis")
# Build final healthcare report
if sections:
md = (
"### Healthcare Data Analysis Results\n\n" +
"\n\n".join(sections) +
"\n\n**Clinical Data Quality Notes**\n"
"- Analysis performed on available healthcare data columns\n"
"- Missing values and empty entries excluded from calculations\n"
"- Numeric values rounded to 1 decimal place for clinical relevance\n"
"- Rankings prioritize areas that may require clinical attention or resource allocation\n"
"- Record counts indicate data volume and statistical reliability\n"
)
else:
md = "### Healthcare Data Analysis Results\n\nNo analyzable healthcare patterns found in the provided data. Consider uploading data with healthcare facility, service, or outcome metrics."
return md, missing