Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import re | |
| from dataclasses import dataclass, field | |
| from typing import Dict, List, Any, Tuple, Optional, Set | |
| import pandas as pd | |
| from data_registry import DataRegistry | |
# Domain-agnostic lookup table: concept name -> ordered list of regex patterns.
# Order matters to consumers that rank a match by the position of the first
# pattern that hits, so the most specific/preferred pattern comes first.
UNIVERSAL_CONCEPT_PATTERNS = {
    # --- Entities / grouping dimensions ---
    "facility": [
        r"\bfacilit(y|ies)\b",
        r"\bhospital\b",
        r"\bsite\b",
        r"\bcentre\b",
        r"\bcenter\b",
        r"\blocation\b",
        r"\bprovider\b",
    ],
    "organization": [
        r"\borganization\b",
        r"\bcompany\b",
        r"\bbusiness\b",
        r"\bfirm\b",
        r"\bentity\b",
    ],
    "department": [
        r"\bdepartment\b",
        r"\bdivision\b",
        r"\bunit\b",
        r"\bsection\b",
    ],
    "specialty": [
        r"\bspecialt(y|ies)\b",
        r"\bservice\b",
        r"\btype\b",
        r"\bcategory\b",
        r"\bkind\b",
    ],
    "region": [
        r"\bzone\b",
        r"\bregion\b",
        r"\barea\b",
        r"\bdistrict\b",
        r"\bterritory\b",
    ],
    # --- Time-based metrics ---
    # These deliberately have no trailing \b so prefixes match ("waiting", "timely").
    "wait_time": [
        r"\bwait",
        r"\bdelay",
        r"\btime",
        r"\bduration",
        r"\blength",
    ],
    "wait_median": [
        r"\bmedian\b.*\bwait",
        # NOTE(review): uppercase "P" -- only matches when the search is
        # case-insensitive or the column name has not been lower-cased first.
        r"\bP50\b",
        r"\bwait.*\bmedian",
        r"median.*time",
    ],
    "wait_p90": [
        r"\bp90\b",
        r"\b90(th)?\s*percentile\b",
        r"\bwait.*p90",
        r"90.*wait",
    ],
    "response_time": [
        r"\bresponse\b.*\btime\b",
        r"\bprocessing\b.*\btime\b",
    ],
    # --- Performance metrics ---
    "score": [
        r"\bscore\b",
        r"\brating\b",
        r"\bindex\b",
        r"\brank\b",
    ],
    "efficiency": [
        r"\befficiency\b",
        r"\bthroughput\b",
        r"\bproductivity\b",
    ],
    "quality": [
        r"\bquality\b",
        r"\bperformance\b",
        r"\boutcome\b",
    ],
    "satisfaction": [
        r"\bsatisfaction\b",
        r"\bfeedback\b",
        r"\brating\b",
    ],
    # --- Capacity metrics ---
    "capacity": [
        r"\bcapacity\b",
        r"\bvolume\b",
        r"\bsize\b",
        r"\blimit\b",
    ],
    "utilization": [
        r"\butilization\b",
        r"\boccupancy\b",
        r"\busage\b",
    ],
    "availability": [
        r"\bavailab\w+",
        r"\bopen\b",
        r"\bfree\b",
    ],
    # --- Cost / financial metrics ---
    "cost": [
        r"\bcost\b",
        r"\bprice\b",
        r"\bexpense\b",
        r"\bfee\b",
        r"\bcharge\b",
    ],
    "budget": [
        r"\bbudget\b",
        r"\bfunding\b",
        r"\ballocation\b",
    ],
    "revenue": [
        r"\brevenue\b",
        r"\bincome\b",
        r"\bearnings\b",
    ],
    # --- Count / volume metrics ---
    "count": [
        r"\bcount\b",
        r"\bnumber\b",
        r"\bquantity\b",
        r"\btotal\b",
    ],
    "rate": [
        r"\brate\b",
        r"\bratio\b",
        r"\bpercent\b",
        r"\bfrequency\b",
    ],
    "volume": [
        r"\bvolume\b",
        r"\bamount\b",
        r"\bquantity\b",
    ],
}
| def _extract_key_terms_from_scenario(scenario_text: str) -> Set[str]: | |
| """Extract important terms from scenario text to guide concept detection.""" | |
| if not scenario_text: | |
| return set() | |
| # Extract meaningful words, filtering out common stop words | |
| stop_words = { | |
| 'the', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by', | |
| 'is', 'are', 'was', 'were', 'be', 'been', 'have', 'has', 'had', 'do', 'does', 'did', | |
| 'a', 'an', 'this', 'that', 'these', 'those', 'i', 'you', 'he', 'she', 'it', 'we', 'they' | |
| } | |
| words = re.findall(r'\b[a-zA-Z]{3,}\b', scenario_text.lower()) | |
| key_terms = {word for word in words if word not in stop_words} | |
| return key_terms | |
| def _generate_dynamic_patterns(scenario_terms: Set[str], existing_patterns: Dict[str, List[str]]) -> Dict[str, List[str]]: | |
| """Generate additional concept patterns based on scenario content.""" | |
| dynamic_patterns = existing_patterns.copy() | |
| # Add scenario-specific terms as potential concepts | |
| for term in scenario_terms: | |
| if len(term) >= 4: # Only meaningful terms | |
| # Check if term relates to existing concepts | |
| term_pattern = rf"\b{re.escape(term)}\b" | |
| # Add as potential entity if it sounds like one | |
| if any(indicator in term for indicator in ['hospital', 'clinic', 'school', 'department', 'facility']): | |
| if 'facility' not in dynamic_patterns: | |
| dynamic_patterns['facility'] = [] | |
| dynamic_patterns['facility'].append(term_pattern) | |
| # Add as potential metric if it sounds like one | |
| elif any(indicator in term for indicator in ['time', 'score', 'rate', 'cost', 'wait']): | |
| concept_key = f"metric_{term}" | |
| dynamic_patterns[concept_key] = [term_pattern] | |
| return dynamic_patterns | |
| def _score_column_match(col_name: str, patterns: List[str], scenario_terms: Set[str] = None) -> int: | |
| """Score how well a column matches concept patterns.""" | |
| col_lower = col_name.lower() | |
| score = 0 | |
| # Pattern matching | |
| for i, pattern in enumerate(patterns): | |
| if re.search(pattern, col_lower): | |
| score += 100 - (i * 10) # Higher score for earlier patterns | |
| break | |
| # Boost score if column name contains scenario-relevant terms | |
| if scenario_terms: | |
| for term in scenario_terms: | |
| if term in col_lower: | |
| score += 25 | |
| return score | |
| def _detect_column_types(df: pd.DataFrame) -> Dict[str, str]: | |
| """Detect the likely type/purpose of each column.""" | |
| column_types = {} | |
| for col in df.columns: | |
| col_lower = col.lower() | |
| # Detect numeric columns that could be converted | |
| sample = df[col].dropna().head(50) | |
| numeric_convertible = False | |
| if len(sample) > 0: | |
| try: | |
| numeric_sample = pd.to_numeric(sample, errors='coerce') | |
| if numeric_sample.notna().sum() > len(sample) * 0.7: | |
| numeric_convertible = True | |
| except: | |
| pass | |
| # Categorize columns | |
| if numeric_convertible: | |
| if any(term in col_lower for term in ['id', 'number', 'code', 'index']): | |
| column_types[col] = 'identifier' | |
| elif any(term in col_lower for term in ['time', 'date', 'duration', 'wait', 'delay']): | |
| column_types[col] = 'time_metric' | |
| elif any(term in col_lower for term in ['cost', 'price', 'budget', 'fee', 'expense']): | |
| column_types[col] = 'cost_metric' | |
| elif any(term in col_lower for term in ['count', 'number', 'quantity', 'volume']): | |
| column_types[col] = 'count_metric' | |
| elif any(term in col_lower for term in ['rate', 'ratio', 'percent', 'score']): | |
| column_types[col] = 'performance_metric' | |
| else: | |
| column_types[col] = 'numeric_metric' | |
| else: | |
| # String/categorical columns | |
| unique_ratio = df[col].nunique() / len(df) | |
| if unique_ratio < 0.1: | |
| column_types[col] = 'category' | |
| elif unique_ratio < 0.5: | |
| column_types[col] = 'grouping' | |
| else: | |
| column_types[col] = 'text' | |
| return column_types | |
@dataclass
class MappingResult:
    """Outcome of mapping abstract concepts onto concrete table columns.

    Must be a dataclass: without the decorator the ``field(...)`` defaults
    would remain class-level ``Field`` objects instead of becoming a fresh
    dict/list per instance.
    """

    # concept -> (table name, column name) for concepts with one clear match
    resolved: Dict[str, Tuple[str, str]] = field(default_factory=dict)
    # concept -> candidate (table name, column name) pairs needing clarification
    ambiguous: Dict[str, List[Tuple[str, str]]] = field(default_factory=dict)
    # concepts for which no suitable column was found
    missing: List[str] = field(default_factory=list)
    # "table.column" -> detected column type tag
    discovered: Dict[str, str] = field(default_factory=dict)  # Discovered column types
def map_concepts(scenario_text: str, registry: DataRegistry) -> MappingResult:
    """Dynamically map concepts based on scenario content and available data.

    Every column of every registered table is scored against every concept
    (the universal patterns plus scenario-derived ones). Per concept:

    * no column scores at least 50 -> the concept is listed in ``missing``;
    * exactly one column is within 20 points of the best score (and >= 50)
      -> ``resolved``;
    * several such columns -> ``ambiguous`` (at most five candidates).

    ``discovered`` is filled with the detected type of each "table.column".

    Args:
        scenario_text: Free-text description of the analysis scenario.
        registry: Source of tables; uses ``names()`` and ``iter_tables()``,
            and each table's ``name`` and ``df``.

    Returns:
        The populated MappingResult.
    """
    result = MappingResult()

    if not registry.names():
        # No data at all: every universal concept is unmapped.
        result.missing = list(UNIVERSAL_CONCEPT_PATTERNS.keys())
        return result

    # Extract key terms from scenario
    scenario_terms = _extract_key_terms_from_scenario(scenario_text)

    # Generate dynamic patterns based on scenario
    concept_patterns = _generate_dynamic_patterns(scenario_terms, UNIVERSAL_CONCEPT_PATTERNS)

    # Collect all available columns
    all_columns = []
    for table in registry.iter_tables():
        # Detect column types for this table
        column_types = _detect_column_types(table.df)
        result.discovered.update(
            {f"{table.name}.{col}": col_type for col, col_type in column_types.items()}
        )
        all_columns.extend((table.name, str(col)) for col in table.df.columns)

    # Map concepts to columns
    for concept, patterns in concept_patterns.items():
        scores = [
            ((tbl, col), _score_column_match(col, patterns, scenario_terms))
            for (tbl, col) in all_columns
        ]
        scores.sort(key=lambda x: x[1], reverse=True)

        if not scores or scores[0][1] <= 0:
            result.missing.append(concept)
            continue

        top_score = scores[0][1]

        # Find all columns with similar high scores (potential ambiguity)
        threshold = max(50, top_score - 20)
        high_scoring = [pair for pair, score in scores if score >= threshold]

        if not high_scoring:
            # Best score is positive but below the minimum of 50 (e.g. only a
            # scenario-term boost). Recording this as "ambiguous" would store
            # an empty candidate list, so report the concept as missing.
            result.missing.append(concept)
        elif len(high_scoring) == 1:
            result.resolved[concept] = high_scoring[0]
        else:
            # Multiple good matches - mark as ambiguous
            result.ambiguous[concept] = high_scoring[:5]  # Limit to top 5

    return result
def build_phase1_questions(scenario_text: str, registry: DataRegistry, mapping: MappingResult, max_questions: int = 6) -> str:
    """Compose the clarifying questions to ask before analysis starts.

    Questions are gathered in a fixed order: ambiguous entity columns,
    ambiguous metric columns, missing grouping variable / key metric, then
    prompts triggered by keywords in the scenario. The list is cut to
    ``max_questions`` and rendered as a numbered list under a heading.

    Args:
        scenario_text: Scenario description; may be empty or ``None``.
        registry: Not used by this function.
        mapping: Concept mapping whose ``resolved`` and ``ambiguous`` are read.
        max_questions: Upper bound on the number of questions returned.

    Returns:
        A single data-structure question when nothing was mapped at all, a
        "ready" message when there is nothing to ask, otherwise the numbered
        question list.
    """
    # Nothing mapped at all: ask only about the data layout.
    if not (mapping.resolved or mapping.ambiguous):
        return "**Data Structure**: I don't see clear patterns in your data. Could you describe what each column represents?"

    text = (scenario_text or "").lower()
    pending = []

    def _candidates(name, limit):
        # Render the first `limit` ambiguous candidates as "table.column".
        return ', '.join(f"{tbl}.{col}" for tbl, col in mapping.ambiguous[name][:limit])

    # Entities whose column could not be pinned down
    for concept in ('facility', 'organization', 'department', 'specialty', 'region'):
        if concept not in mapping.ambiguous:
            continue
        pending.append(f"**Entity Identification**: Which column represents the main {concept.replace('_', ' ')}? Options: {_candidates(concept, 4)}")
        if len(pending) >= max_questions:
            break

    # Metrics whose column could not be pinned down
    for concept in ('wait_time', 'cost', 'score', 'performance', 'quality'):
        if concept not in mapping.ambiguous:
            continue
        pending.append(f"**Metric Clarification**: Which column best represents {concept.replace('_', ' ')}? Options: {_candidates(concept, 3)}")
        if len(pending) >= max_questions:
            break

    # Essentials that were not resolved to any column
    if all(name not in mapping.resolved for name in ('facility', 'organization', 'department')):
        pending.append("**Grouping Variable**: What should I group the analysis by? (e.g., facilities, departments, regions)")
    if all(name not in mapping.resolved for name in ('wait_time', 'cost', 'score', 'performance')):
        pending.append("**Key Metric**: What is the main metric you want to analyze? (e.g., performance scores, wait times, costs)")

    # Prompts triggered by scenario keywords
    if any(word in text for word in ('resource', 'allocation', 'priority')):
        pending.append("**Resource Allocation**: What factors should guide resource prioritization? (e.g., volume, urgency, equity)")
    if any(word in text for word in ('comparison', 'benchmark', 'performance')):
        pending.append("**Comparison Criteria**: How should different entities be compared? What constitutes good vs. poor performance?")
    if any(word in text for word in ('recommendation', 'decision', 'strategy')):
        pending.append("**Decision Context**: What constraints or preferences should influence the recommendations? (e.g., budget limits, operational requirements)")

    # Enforce the cap, then render
    pending = pending[:max_questions]
    if not pending:
        return "**Data Analysis Ready**: Your data appears well-structured. Please provide any additional context about your analysis goals."

    lines = ["**Clarification Questions**", ""]
    lines.extend(f"{number}. {question}" for number, question in enumerate(pending, 1))
    return "\n".join(lines)