from __future__ import annotations

import re
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Set, Tuple

import pandas as pd

from data_registry import DataRegistry

# Generic concept patterns that work across domains
UNIVERSAL_CONCEPT_PATTERNS = {
    # Entity/grouping concepts
    "facility": [r"\bfacilit(y|ies)\b", r"\bhospital\b", r"\bsite\b", r"\bcentre\b",
                 r"\bcenter\b", r"\blocation\b", r"\bprovider\b"],
    "organization": [r"\borganization\b", r"\bcompany\b", r"\bbusiness\b", r"\bfirm\b", r"\bentity\b"],
    "department": [r"\bdepartment\b", r"\bdivision\b", r"\bunit\b", r"\bsection\b"],
    "specialty": [r"\bspecialt(y|ies)\b", r"\bservice\b", r"\btype\b", r"\bcategory\b", r"\bkind\b"],
    "region": [r"\bzone\b", r"\bregion\b", r"\barea\b", r"\bdistrict\b", r"\bterritory\b"],
    # Time-based metrics
    "wait_time": [r"\bwait", r"\bdelay", r"\btime", r"\bduration", r"\blength"],
    "wait_median": [r"\bmedian\b.*\bwait", r"\bP50\b", r"\bwait.*\bmedian", r"median.*time"],
    "wait_p90": [r"\bp90\b", r"\b90(th)?\s*percentile\b", r"\bwait.*p90", r"90.*wait"],
    "response_time": [r"\bresponse\b.*\btime\b", r"\bprocessing\b.*\btime\b"],
    # Performance metrics
    "score": [r"\bscore\b", r"\brating\b", r"\bindex\b", r"\brank\b"],
    "efficiency": [r"\befficiency\b", r"\bthroughput\b", r"\bproductivity\b"],
    "quality": [r"\bquality\b", r"\bperformance\b", r"\boutcome\b"],
    "satisfaction": [r"\bsatisfaction\b", r"\bfeedback\b", r"\brating\b"],
    # Capacity metrics
    "capacity": [r"\bcapacity\b", r"\bvolume\b", r"\bsize\b", r"\blimit\b"],
    "utilization": [r"\butilization\b", r"\boccupancy\b", r"\busage\b"],
    "availability": [r"\bavailab\w+", r"\bopen\b", r"\bfree\b"],
    # Cost/financial metrics
    "cost": [r"\bcost\b", r"\bprice\b", r"\bexpense\b", r"\bfee\b", r"\bcharge\b"],
    "budget": [r"\bbudget\b", r"\bfunding\b", r"\ballocation\b"],
    "revenue": [r"\brevenue\b", r"\bincome\b", r"\bearnings\b"],
    # Count/volume metrics
    "count": [r"\bcount\b", r"\bnumber\b", r"\bquantity\b", r"\btotal\b"],
    "rate": [r"\brate\b", r"\bratio\b", r"\bpercent\b", r"\bfrequency\b"],
    "volume": [r"\bvolume\b", r"\bamount\b", r"\bquantity\b"],
}


def _extract_key_terms_from_scenario(scenario_text: str) -> Set[str]:
    """Extract important terms from scenario text to guide concept detection."""
    if not scenario_text:
        return set()

    # Extract meaningful words, filtering out common stop words
    stop_words = {
        'the', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by',
        'is', 'are', 'was', 'were', 'be', 'been', 'have', 'has', 'had', 'do', 'does', 'did',
        'a', 'an', 'this', 'that', 'these', 'those', 'i', 'you', 'he', 'she', 'it', 'we', 'they'
    }

    words = re.findall(r'\b[a-zA-Z]{3,}\b', scenario_text.lower())
    return {word for word in words if word not in stop_words}


def _generate_dynamic_patterns(scenario_terms: Set[str],
                               existing_patterns: Dict[str, List[str]]) -> Dict[str, List[str]]:
    """Generate additional concept patterns based on scenario content."""
    # Copy the inner lists as well, so that appending below never mutates the shared
    # UNIVERSAL_CONCEPT_PATTERNS constant across calls.
    dynamic_patterns = {concept: list(patterns) for concept, patterns in existing_patterns.items()}

    # Add scenario-specific terms as potential concepts
    for term in scenario_terms:
        if len(term) < 4:  # Skip very short, likely uninformative terms
            continue
        term_pattern = rf"\b{re.escape(term)}\b"

        # Add as a potential entity pattern if the term sounds like one
        if any(indicator in term for indicator in ['hospital', 'clinic', 'school', 'department', 'facility']):
            dynamic_patterns.setdefault('facility', []).append(term_pattern)
        # Add as a potential metric concept if the term sounds like one
        elif any(indicator in term for indicator in ['time', 'score', 'rate', 'cost', 'wait']):
            dynamic_patterns[f"metric_{term}"] = [term_pattern]

    return dynamic_patterns
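
# Illustrative behaviour of the two helpers above (hypothetical scenario text; the output
# shown is inferred from the rules, not captured from a run):
#
#   terms = _extract_key_terms_from_scenario("Compare median wait times across hospital sites")
#   # -> {"compare", "median", "wait", "times", "across", "hospital", "sites"}
#   patterns = _generate_dynamic_patterns(terms, UNIVERSAL_CONCEPT_PATTERNS)
#   # "hospital" contains an entity indicator, so r"\bhospital\b" is appended to the
#   # "facility" patterns; "wait" and "times" contain metric indicators, so new
#   # "metric_wait" and "metric_times" concepts are added.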

def _score_column_match(col_name: str, patterns: List[str],
                        scenario_terms: Optional[Set[str]] = None) -> int:
    """Score how well a column matches concept patterns."""
    col_lower = col_name.lower()
    score = 0

    # Pattern matching: earlier patterns in the list count as stronger signals
    for i, pattern in enumerate(patterns):
        if re.search(pattern, col_lower):
            score += 100 - (i * 10)
            break

    # Boost the score if the column name contains scenario-relevant terms
    if scenario_terms:
        for term in scenario_terms:
            if term in col_lower:
                score += 25

    return score


def _detect_column_types(df: pd.DataFrame) -> Dict[str, str]:
    """Detect the likely type/purpose of each column."""
    column_types: Dict[str, str] = {}

    for col in df.columns:
        col_lower = col.lower()

        # Detect columns whose sampled values are mostly convertible to numbers
        sample = df[col].dropna().head(50)
        numeric_convertible = False
        if len(sample) > 0:
            try:
                numeric_sample = pd.to_numeric(sample, errors='coerce')
                if numeric_sample.notna().sum() > len(sample) * 0.7:
                    numeric_convertible = True
            except (ValueError, TypeError):
                pass

        # Categorize columns
        if numeric_convertible:
            if any(term in col_lower for term in ['id', 'number', 'code', 'index']):
                column_types[col] = 'identifier'
            elif any(term in col_lower for term in ['time', 'date', 'duration', 'wait', 'delay']):
                column_types[col] = 'time_metric'
            elif any(term in col_lower for term in ['cost', 'price', 'budget', 'fee', 'expense']):
                column_types[col] = 'cost_metric'
            elif any(term in col_lower for term in ['count', 'number', 'quantity', 'volume']):
                column_types[col] = 'count_metric'
            elif any(term in col_lower for term in ['rate', 'ratio', 'percent', 'score']):
                column_types[col] = 'performance_metric'
            else:
                column_types[col] = 'numeric_metric'
        else:
            # String/categorical columns: classify by the share of distinct values
            unique_ratio = df[col].nunique() / len(df) if len(df) else 0.0
            if unique_ratio < 0.1:
                column_types[col] = 'category'
            elif unique_ratio < 0.5:
                column_types[col] = 'grouping'
            else:
                column_types[col] = 'text'

    return column_types


@dataclass
class MappingResult:
    resolved: Dict[str, Tuple[str, str]] = field(default_factory=dict)
    ambiguous: Dict[str, List[Tuple[str, str]]] = field(default_factory=dict)
    missing: List[str] = field(default_factory=list)
    discovered: Dict[str, str] = field(default_factory=dict)  # Discovered column types


def map_concepts(scenario_text: str, registry: DataRegistry) -> MappingResult:
    """Dynamically map concepts based on scenario content and available data."""
    result = MappingResult()

    if not registry.names():
        result.missing = list(UNIVERSAL_CONCEPT_PATTERNS.keys())
        return result

    # Extract key terms from the scenario
    scenario_terms = _extract_key_terms_from_scenario(scenario_text)

    # Generate dynamic patterns based on the scenario
    concept_patterns = _generate_dynamic_patterns(scenario_terms, UNIVERSAL_CONCEPT_PATTERNS)

    # Collect all available columns
    all_columns: List[Tuple[str, str]] = []
    for table in registry.iter_tables():
        # Detect column types for this table
        column_types = _detect_column_types(table.df)
        result.discovered.update({f"{table.name}.{col}": col_type
                                  for col, col_type in column_types.items()})
        for col in table.df.columns:
            all_columns.append((table.name, str(col)))

    # Map concepts to columns
    for concept, patterns in concept_patterns.items():
        scores = [
            ((tbl, col), _score_column_match(col, patterns, scenario_terms))
            for (tbl, col) in all_columns
        ]
        scores.sort(key=lambda x: x[1], reverse=True)

        if not scores or scores[0][1] == 0:
            result.missing.append(concept)
            continue

        top_score = scores[0][1]

        # Find all columns with similarly high scores (potential ambiguity)
        threshold = max(50, top_score - 20)
        high_scoring = [pair for pair, score in scores if score >= threshold]

        if not high_scoring:
            # Only weak, boost-only matches fell below the threshold: treat the concept
            # as unmapped rather than recording an empty "ambiguous" entry.
            result.missing.append(concept)
        elif len(high_scoring) == 1:
            tbl, col = high_scoring[0]
            result.resolved[concept] = (tbl, col)
        else:
            # Multiple good matches - mark as ambiguous, keeping only the top 5
            result.ambiguous[concept] = high_scoring[:5]

    return result
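
# Illustrative behaviour of the scoring/threshold logic above (hypothetical column names,
# not taken from any real dataset): if a table contains both a "cost" and a "price" column,
# "cost" matches the first "cost" pattern (score 100) and "price" matches the second
# (score 90). Both clear the threshold max(50, 100 - 20) = 80, so map_concepts reports the
# "cost" concept as ambiguous with both candidates instead of silently picking one.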

def build_phase1_questions(scenario_text: str, registry: DataRegistry,
                           mapping: MappingResult, max_questions: int = 6) -> str:
    """Build clarifying questions based on the scenario and data gaps."""
    questions: List[str] = []
    scenario_lower = scenario_text.lower() if scenario_text else ""

    # Data structure questions
    if not mapping.resolved and not mapping.ambiguous:
        questions.append("**Data Structure**: I don't see clear patterns in your data. "
                         "Could you describe what each column represents?")
        return "\n".join(questions)

    # Ambiguous mappings - ask for clarification
    important_concepts = ['facility', 'organization', 'department', 'specialty', 'region']
    for concept in important_concepts:
        if concept in mapping.ambiguous:
            options = [f"{tbl}.{col}" for tbl, col in mapping.ambiguous[concept][:4]]
            questions.append(f"**Entity Identification**: Which column represents the main "
                             f"{concept.replace('_', ' ')}? Options: {', '.join(options)}")
            if len(questions) >= max_questions:
                break

    # Metric clarification
    metric_concepts = ['wait_time', 'cost', 'score', 'performance', 'quality']
    for concept in metric_concepts:
        if concept in mapping.ambiguous:
            options = [f"{tbl}.{col}" for tbl, col in mapping.ambiguous[concept][:3]]
            questions.append(f"**Metric Clarification**: Which column best represents "
                             f"{concept.replace('_', ' ')}? Options: {', '.join(options)}")
            if len(questions) >= max_questions:
                break

    # Missing critical data
    if not any(concept in mapping.resolved for concept in ['facility', 'organization', 'department']):
        questions.append("**Grouping Variable**: What should I group the analysis by? "
                         "(e.g., facilities, departments, regions)")

    if not any(concept in mapping.resolved for concept in ['wait_time', 'cost', 'score', 'performance']):
        questions.append("**Key Metric**: What is the main metric you want to analyze? "
                         "(e.g., performance scores, wait times, costs)")

    # Scenario-specific questions
    if any(term in scenario_lower for term in ['resource', 'allocation', 'priority']):
        questions.append("**Resource Allocation**: What factors should guide resource prioritization? "
                         "(e.g., volume, urgency, equity)")

    if any(term in scenario_lower for term in ['comparison', 'benchmark', 'performance']):
        questions.append("**Comparison Criteria**: How should different entities be compared? "
                         "What constitutes good vs. poor performance?")

    if any(term in scenario_lower for term in ['recommendation', 'decision', 'strategy']):
        questions.append("**Decision Context**: What constraints or preferences should influence "
                         "the recommendations? (e.g., budget limits, operational requirements)")

    # Limit questions and format
    questions = questions[:max_questions]
    if not questions:
        return ("**Data Analysis Ready**: Your data appears well-structured. "
                "Please provide any additional context about your analysis goals.")

    formatted_questions = ["**Clarification Questions**", ""]
    for i, q in enumerate(questions, 1):
        formatted_questions.append(f"{i}. {q}")

    return "\n".join(formatted_questions)
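
# Hedged usage sketch, not part of the module's API: DataRegistry's construction is not
# shown in this file, so the demo below uses a small stand-in exposing the duck-typed
# interface the functions above rely on (names(), and iter_tables() yielding objects with
# .name and .df attributes). Adapt it to however DataRegistry is actually populated.
if __name__ == "__main__":
    from types import SimpleNamespace

    class _StubRegistry:
        """Minimal stand-in for DataRegistry, used only by this demo."""

        def __init__(self, tables: Dict[str, pd.DataFrame]):
            self._tables = tables

        def names(self) -> List[str]:
            return list(self._tables)

        def iter_tables(self):
            for name, df in self._tables.items():
                yield SimpleNamespace(name=name, df=df)

    # Hypothetical table and scenario, purely for exercising the pipeline end to end
    demo_df = pd.DataFrame({
        "facility": ["North General", "Riverside Clinic", "East Hospital"],
        "wait_days": [42, 65, 30],
        "annual cost": [1_200_000, 850_000, 2_100_000],
    })
    demo_registry = _StubRegistry({"visits": demo_df})

    scenario = "Compare median wait times and costs across hospital facilities."
    mapping = map_concepts(scenario, demo_registry)
    print("resolved:", mapping.resolved)
    print("ambiguous:", mapping.ambiguous)
    print("missing:", mapping.missing)
    print()
    print(build_phase1_questions(scenario, demo_registry, mapping))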