# Medica_DecisionSupportAI / schema_mapper.py
# Last change: "Update schema_mapper.py" by Rajan Sharma (commit 13953b9, verified).
# File size: 12.2 kB.
from __future__ import annotations
import re
from dataclasses import dataclass, field
from typing import Dict, List, Any, Tuple, Optional, Set
import pandas as pd
from data_registry import DataRegistry
# Generic concept patterns that work across domains.
#
# Maps a concept name to an ordered list of regular expressions. Order matters:
# the scorer gives a higher score to a match on an earlier pattern.
#
# NOTE: column names are lower-cased before these patterns are searched, so
# every literal in a pattern must be lower-case or it can never match.
UNIVERSAL_CONCEPT_PATTERNS: Dict[str, List[str]] = {
    # Entity/grouping concepts
    "facility": [
        r"\bfacilit(y|ies)\b",
        r"\bhospital\b",
        r"\bsite\b",
        r"\bcentre\b",
        r"\bcenter\b",
        r"\blocation\b",
        r"\bprovider\b",
    ],
    "organization": [r"\borganization\b", r"\bcompany\b", r"\bbusiness\b", r"\bfirm\b", r"\bentity\b"],
    "department": [r"\bdepartment\b", r"\bdivision\b", r"\bunit\b", r"\bsection\b"],
    "specialty": [r"\bspecialt(y|ies)\b", r"\bservice\b", r"\btype\b", r"\bcategory\b", r"\bkind\b"],
    "region": [r"\bzone\b", r"\bregion\b", r"\barea\b", r"\bdistrict\b", r"\bterritory\b"],

    # Time-based metrics
    "wait_time": [r"\bwait", r"\bdelay", r"\btime", r"\bduration", r"\blength"],
    # Fixed: was r"\bP50\b", which could never match a lower-cased column name.
    "wait_median": [r"\bmedian\b.*\bwait", r"\bp50\b", r"\bwait.*\bmedian", r"median.*time"],
    "wait_p90": [r"\bp90\b", r"\b90(th)?\s*percentile\b", r"\bwait.*p90", r"90.*wait"],
    "response_time": [r"\bresponse\b.*\btime\b", r"\bprocessing\b.*\btime\b"],

    # Performance metrics
    "score": [r"\bscore\b", r"\brating\b", r"\bindex\b", r"\brank\b"],
    "efficiency": [r"\befficiency\b", r"\bthroughput\b", r"\bproductivity\b"],
    "quality": [r"\bquality\b", r"\bperformance\b", r"\boutcome\b"],
    "satisfaction": [r"\bsatisfaction\b", r"\bfeedback\b", r"\brating\b"],

    # Capacity metrics
    "capacity": [r"\bcapacity\b", r"\bvolume\b", r"\bsize\b", r"\blimit\b"],
    "utilization": [r"\butilization\b", r"\boccupancy\b", r"\busage\b"],
    "availability": [r"\bavailab\w+", r"\bopen\b", r"\bfree\b"],

    # Cost/financial metrics
    "cost": [r"\bcost\b", r"\bprice\b", r"\bexpense\b", r"\bfee\b", r"\bcharge\b"],
    "budget": [r"\bbudget\b", r"\bfunding\b", r"\ballocation\b"],
    "revenue": [r"\brevenue\b", r"\bincome\b", r"\bearnings\b"],

    # Count/volume metrics
    "count": [r"\bcount\b", r"\bnumber\b", r"\bquantity\b", r"\btotal\b"],
    "rate": [r"\brate\b", r"\bratio\b", r"\bpercent\b", r"\bfrequency\b"],
    "volume": [r"\bvolume\b", r"\bamount\b", r"\bquantity\b"],
}
def _extract_key_terms_from_scenario(scenario_text: str) -> Set[str]:
"""Extract important terms from scenario text to guide concept detection."""
if not scenario_text:
return set()
# Extract meaningful words, filtering out common stop words
stop_words = {
'the', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by',
'is', 'are', 'was', 'were', 'be', 'been', 'have', 'has', 'had', 'do', 'does', 'did',
'a', 'an', 'this', 'that', 'these', 'those', 'i', 'you', 'he', 'she', 'it', 'we', 'they'
}
words = re.findall(r'\b[a-zA-Z]{3,}\b', scenario_text.lower())
key_terms = {word for word in words if word not in stop_words}
return key_terms
def _generate_dynamic_patterns(scenario_terms: Set[str], existing_patterns: Dict[str, List[str]]) -> Dict[str, List[str]]:
"""Generate additional concept patterns based on scenario content."""
dynamic_patterns = existing_patterns.copy()
# Add scenario-specific terms as potential concepts
for term in scenario_terms:
if len(term) >= 4: # Only meaningful terms
# Check if term relates to existing concepts
term_pattern = rf"\b{re.escape(term)}\b"
# Add as potential entity if it sounds like one
if any(indicator in term for indicator in ['hospital', 'clinic', 'school', 'department', 'facility']):
if 'facility' not in dynamic_patterns:
dynamic_patterns['facility'] = []
dynamic_patterns['facility'].append(term_pattern)
# Add as potential metric if it sounds like one
elif any(indicator in term for indicator in ['time', 'score', 'rate', 'cost', 'wait']):
concept_key = f"metric_{term}"
dynamic_patterns[concept_key] = [term_pattern]
return dynamic_patterns
def _score_column_match(col_name: str, patterns: List[str], scenario_terms: Set[str] = None) -> int:
"""Score how well a column matches concept patterns."""
col_lower = col_name.lower()
score = 0
# Pattern matching
for i, pattern in enumerate(patterns):
if re.search(pattern, col_lower):
score += 100 - (i * 10) # Higher score for earlier patterns
break
# Boost score if column name contains scenario-relevant terms
if scenario_terms:
for term in scenario_terms:
if term in col_lower:
score += 25
return score
def _detect_column_types(df: pd.DataFrame) -> Dict[str, str]:
"""Detect the likely type/purpose of each column."""
column_types = {}
for col in df.columns:
col_lower = col.lower()
# Detect numeric columns that could be converted
sample = df[col].dropna().head(50)
numeric_convertible = False
if len(sample) > 0:
try:
numeric_sample = pd.to_numeric(sample, errors='coerce')
if numeric_sample.notna().sum() > len(sample) * 0.7:
numeric_convertible = True
except:
pass
# Categorize columns
if numeric_convertible:
if any(term in col_lower for term in ['id', 'number', 'code', 'index']):
column_types[col] = 'identifier'
elif any(term in col_lower for term in ['time', 'date', 'duration', 'wait', 'delay']):
column_types[col] = 'time_metric'
elif any(term in col_lower for term in ['cost', 'price', 'budget', 'fee', 'expense']):
column_types[col] = 'cost_metric'
elif any(term in col_lower for term in ['count', 'number', 'quantity', 'volume']):
column_types[col] = 'count_metric'
elif any(term in col_lower for term in ['rate', 'ratio', 'percent', 'score']):
column_types[col] = 'performance_metric'
else:
column_types[col] = 'numeric_metric'
else:
# String/categorical columns
unique_ratio = df[col].nunique() / len(df)
if unique_ratio < 0.1:
column_types[col] = 'category'
elif unique_ratio < 0.5:
column_types[col] = 'grouping'
else:
column_types[col] = 'text'
return column_types
@dataclass
class MappingResult:
    """Outcome of mapping concepts onto the columns of the registered tables."""

    # concept -> the single (table name, column name) chosen for it
    resolved: Dict[str, Tuple[str, str]] = field(default_factory=dict)

    # concept -> competing (table name, column name) candidates
    ambiguous: Dict[str, List[Tuple[str, str]]] = field(default_factory=dict)

    # concepts for which no column was found
    missing: List[str] = field(default_factory=list)

    # Discovered column types, keyed by "<table>.<column>"
    discovered: Dict[str, str] = field(default_factory=dict)
def map_concepts(scenario_text: str, registry: DataRegistry) -> MappingResult:
    """Dynamically map concepts based on scenario content and available data.

    Every concept (the universal ones plus any derived from the scenario) is
    scored against every column of every table in *registry*:

    * exactly one column within 20 points of the best score (and at least 50)
      -> ``resolved``;
    * several such columns -> ``ambiguous`` (top five candidates);
    * no column reaching that bar -> ``missing``.

    Args:
        scenario_text: Free-text description of the analysis scenario.
        registry: Source of tables; must provide ``names()`` and
            ``iter_tables()`` yielding objects with ``name`` and ``df``.

    Returns:
        A ``MappingResult``; ``discovered`` holds the detected type of every
        column keyed by ``"<table>.<column>"``.
    """
    result = MappingResult()

    if not registry.names():
        result.missing = list(UNIVERSAL_CONCEPT_PATTERNS.keys())
        return result

    # Extract key terms from scenario
    scenario_terms = _extract_key_terms_from_scenario(scenario_text)

    # Generate dynamic patterns based on scenario
    concept_patterns = _generate_dynamic_patterns(scenario_terms, UNIVERSAL_CONCEPT_PATTERNS)

    # Collect all available columns
    all_columns: List[Tuple[str, str]] = []
    for table in registry.iter_tables():
        # Detect column types for this table
        column_types = _detect_column_types(table.df)
        result.discovered.update(
            {f"{table.name}.{col}": col_type for col, col_type in column_types.items()}
        )
        all_columns.extend((table.name, str(col)) for col in table.df.columns)

    # Map concepts to columns
    for concept, patterns in concept_patterns.items():
        scores = [
            ((tbl, col), _score_column_match(col, patterns, scenario_terms))
            for (tbl, col) in all_columns
        ]
        # list.sort is stable, so ties keep table/column discovery order.
        scores.sort(key=lambda item: item[1], reverse=True)

        if not scores or scores[0][1] <= 0:
            result.missing.append(concept)
            continue

        top_score = scores[0][1]

        # Find all columns with similar high scores (potential ambiguity).
        # 50 is the minimum score a column needs to be considered at all.
        threshold = max(50, top_score - 20)
        high_scoring = [pair for pair, score in scores if score >= threshold]

        if not high_scoring:
            # The best score is non-zero but below the floor of 50. Without
            # this branch the concept would be stored as "ambiguous" with an
            # empty candidate list.
            result.missing.append(concept)
        elif len(high_scoring) == 1:
            result.resolved[concept] = high_scoring[0]
        else:
            # Multiple good matches - mark as ambiguous
            result.ambiguous[concept] = high_scoring[:5]  # Limit to top 5

    return result
def build_phase1_questions(scenario_text: str, registry: DataRegistry, mapping: MappingResult, max_questions: int = 6) -> str:
    """Build clarifying questions based on scenario and data gaps.

    Args:
        scenario_text: Free-text scenario; may be empty or ``None``.
        registry: Unused by this function; kept so the signature stays stable
            for existing callers.
        mapping: Result of concept mapping; only ``resolved`` and
            ``ambiguous`` are read.
        max_questions: Upper bound on the number of questions returned.

    Returns:
        A markdown string: a single data-structure question when nothing was
        mapped, a numbered "Clarification Questions" list otherwise, or a
        "Data Analysis Ready" message when there is nothing to ask.
    """
    questions: List[str] = []
    scenario_lower = scenario_text.lower() if scenario_text else ""

    # Data structure questions
    if not mapping.resolved and not mapping.ambiguous:
        questions.append("**Data Structure**: I don't see clear patterns in your data. Could you describe what each column represents?")
        return "\n".join(questions)

    # Ambiguous mappings - ask for clarification
    important_concepts = ['facility', 'organization', 'department', 'specialty', 'region']
    for concept in important_concepts:
        # .get(...) or (): an ambiguous entry with no candidates would render
        # a question ending in "Options: " with nothing to choose from.
        options = [f"{tbl}.{col}" for tbl, col in (mapping.ambiguous.get(concept) or ())[:4]]
        if not options:
            continue
        questions.append(f"**Entity Identification**: Which column represents the main {concept.replace('_', ' ')}? Options: {', '.join(options)}")
        if len(questions) >= max_questions:
            break

    # Metric clarification
    metric_concepts = ['wait_time', 'cost', 'score', 'performance', 'quality']
    for concept in metric_concepts:
        options = [f"{tbl}.{col}" for tbl, col in (mapping.ambiguous.get(concept) or ())[:3]]
        if not options:
            continue
        questions.append(f"**Metric Clarification**: Which column best represents {concept.replace('_', ' ')}? Options: {', '.join(options)}")
        if len(questions) >= max_questions:
            break

    # Missing critical data
    if not any(concept in mapping.resolved for concept in ['facility', 'organization', 'department']):
        questions.append("**Grouping Variable**: What should I group the analysis by? (e.g., facilities, departments, regions)")

    if not any(concept in mapping.resolved for concept in ['wait_time', 'cost', 'score', 'performance']):
        questions.append("**Key Metric**: What is the main metric you want to analyze? (e.g., performance scores, wait times, costs)")

    # Scenario-specific questions
    if any(term in scenario_lower for term in ['resource', 'allocation', 'priority']):
        questions.append("**Resource Allocation**: What factors should guide resource prioritization? (e.g., volume, urgency, equity)")

    if any(term in scenario_lower for term in ['comparison', 'benchmark', 'performance']):
        questions.append("**Comparison Criteria**: How should different entities be compared? What constitutes good vs. poor performance?")

    if any(term in scenario_lower for term in ['recommendation', 'decision', 'strategy']):
        questions.append("**Decision Context**: What constraints or preferences should influence the recommendations? (e.g., budget limits, operational requirements)")

    # Limit questions and format. Earlier sections come first, so truncation
    # drops the scenario-specific questions before the data questions.
    questions = questions[:max_questions]

    if not questions:
        return "**Data Analysis Ready**: Your data appears well-structured. Please provide any additional context about your analysis goals."

    formatted_questions = ["**Clarification Questions**", ""]
    formatted_questions.extend(f"{i}. {q}" for i, q in enumerate(questions, 1))

    return "\n".join(formatted_questions)