# NOTE: removed non-code artifact (web-page header, commit-hash row, and
# line-number gutter left over from a copy/paste of a hosted file view).
from __future__ import annotations
import re
from dataclasses import dataclass, field
from typing import Dict, List, Any, Tuple, Optional, Set
import pandas as pd
from data_registry import DataRegistry
# Generic concept patterns that work across domains.
# Maps concept name -> ordered list of regex patterns. Order matters:
# _score_column_match treats earlier patterns as more specific (first match
# wins, scored 100 - 10 * position). Patterns are searched against
# lower-cased column names.
UNIVERSAL_CONCEPT_PATTERNS = {
    # Entity/grouping concepts
    "facility": [r"\bfacilit(y|ies)\b", r"\bhospital\b", r"\bsite\b", r"\bcentre\b", r"\bcenter\b", r"\blocation\b", r"\bprovider\b"],
    "organization": [r"\borganization\b", r"\bcompany\b", r"\bbusiness\b", r"\bfirm\b", r"\bentity\b"],
    "department": [r"\bdepartment\b", r"\bdivision\b", r"\bunit\b", r"\bsection\b"],
    "specialty": [r"\bspecialt(y|ies)\b", r"\bservice\b", r"\btype\b", r"\bcategory\b", r"\bkind\b"],
    "region": [r"\bzone\b", r"\bregion\b", r"\barea\b", r"\bdistrict\b", r"\bterritory\b"],
    # Time-based metrics (some patterns are deliberately unanchored prefixes,
    # e.g. r"\bwait" also matches "waiting"/"wait_days")
    "wait_time": [r"\bwait", r"\bdelay", r"\btime", r"\bduration", r"\blength"],
    "wait_median": [r"\bmedian\b.*\bwait", r"\bP50\b", r"\bwait.*\bmedian", r"median.*time"],
    "wait_p90": [r"\bp90\b", r"\b90(th)?\s*percentile\b", r"\bwait.*p90", r"90.*wait"],
    "response_time": [r"\bresponse\b.*\btime\b", r"\bprocessing\b.*\btime\b"],
    # Performance metrics
    "score": [r"\bscore\b", r"\brating\b", r"\bindex\b", r"\brank\b"],
    "efficiency": [r"\befficiency\b", r"\bthroughput\b", r"\bproductivity\b"],
    "quality": [r"\bquality\b", r"\bperformance\b", r"\boutcome\b"],
    "satisfaction": [r"\bsatisfaction\b", r"\bfeedback\b", r"\brating\b"],
    # Capacity metrics
    "capacity": [r"\bcapacity\b", r"\bvolume\b", r"\bsize\b", r"\blimit\b"],
    "utilization": [r"\butilization\b", r"\boccupancy\b", r"\busage\b"],
    "availability": [r"\bavailab\w+", r"\bopen\b", r"\bfree\b"],
    # Cost/financial metrics
    "cost": [r"\bcost\b", r"\bprice\b", r"\bexpense\b", r"\bfee\b", r"\bcharge\b"],
    "budget": [r"\bbudget\b", r"\bfunding\b", r"\ballocation\b"],
    "revenue": [r"\brevenue\b", r"\bincome\b", r"\bearnings\b"],
    # Count/volume metrics
    "count": [r"\bcount\b", r"\bnumber\b", r"\bquantity\b", r"\btotal\b"],
    "rate": [r"\brate\b", r"\bratio\b", r"\bpercent\b", r"\bfrequency\b"],
    "volume": [r"\bvolume\b", r"\bamount\b", r"\bquantity\b"]
}
def _extract_key_terms_from_scenario(scenario_text: str) -> Set[str]:
"""Extract important terms from scenario text to guide concept detection."""
if not scenario_text:
return set()
# Extract meaningful words, filtering out common stop words
stop_words = {
'the', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by',
'is', 'are', 'was', 'were', 'be', 'been', 'have', 'has', 'had', 'do', 'does', 'did',
'a', 'an', 'this', 'that', 'these', 'those', 'i', 'you', 'he', 'she', 'it', 'we', 'they'
}
words = re.findall(r'\b[a-zA-Z]{3,}\b', scenario_text.lower())
key_terms = {word for word in words if word not in stop_words}
return key_terms
def _generate_dynamic_patterns(scenario_terms: Set[str], existing_patterns: Dict[str, List[str]]) -> Dict[str, List[str]]:
"""Generate additional concept patterns based on scenario content."""
dynamic_patterns = existing_patterns.copy()
# Add scenario-specific terms as potential concepts
for term in scenario_terms:
if len(term) >= 4: # Only meaningful terms
# Check if term relates to existing concepts
term_pattern = rf"\b{re.escape(term)}\b"
# Add as potential entity if it sounds like one
if any(indicator in term for indicator in ['hospital', 'clinic', 'school', 'department', 'facility']):
if 'facility' not in dynamic_patterns:
dynamic_patterns['facility'] = []
dynamic_patterns['facility'].append(term_pattern)
# Add as potential metric if it sounds like one
elif any(indicator in term for indicator in ['time', 'score', 'rate', 'cost', 'wait']):
concept_key = f"metric_{term}"
dynamic_patterns[concept_key] = [term_pattern]
return dynamic_patterns
def _score_column_match(col_name: str, patterns: List[str], scenario_terms: Set[str] = None) -> int:
"""Score how well a column matches concept patterns."""
col_lower = col_name.lower()
score = 0
# Pattern matching
for i, pattern in enumerate(patterns):
if re.search(pattern, col_lower):
score += 100 - (i * 10) # Higher score for earlier patterns
break
# Boost score if column name contains scenario-relevant terms
if scenario_terms:
for term in scenario_terms:
if term in col_lower:
score += 25
return score
def _detect_column_types(df: pd.DataFrame) -> Dict[str, str]:
"""Detect the likely type/purpose of each column."""
column_types = {}
for col in df.columns:
col_lower = col.lower()
# Detect numeric columns that could be converted
sample = df[col].dropna().head(50)
numeric_convertible = False
if len(sample) > 0:
try:
numeric_sample = pd.to_numeric(sample, errors='coerce')
if numeric_sample.notna().sum() > len(sample) * 0.7:
numeric_convertible = True
except:
pass
# Categorize columns
if numeric_convertible:
if any(term in col_lower for term in ['id', 'number', 'code', 'index']):
column_types[col] = 'identifier'
elif any(term in col_lower for term in ['time', 'date', 'duration', 'wait', 'delay']):
column_types[col] = 'time_metric'
elif any(term in col_lower for term in ['cost', 'price', 'budget', 'fee', 'expense']):
column_types[col] = 'cost_metric'
elif any(term in col_lower for term in ['count', 'number', 'quantity', 'volume']):
column_types[col] = 'count_metric'
elif any(term in col_lower for term in ['rate', 'ratio', 'percent', 'score']):
column_types[col] = 'performance_metric'
else:
column_types[col] = 'numeric_metric'
else:
# String/categorical columns
unique_ratio = df[col].nunique() / len(df)
if unique_ratio < 0.1:
column_types[col] = 'category'
elif unique_ratio < 0.5:
column_types[col] = 'grouping'
else:
column_types[col] = 'text'
return column_types
@dataclass
class MappingResult:
    """Outcome of mapping scenario concepts onto dataset columns."""
    # concept name -> the single (table, column) pair chosen with confidence
    resolved: Dict[str, Tuple[str, str]] = field(default_factory=dict)
    # concept name -> candidate (table, column) pairs needing clarification
    ambiguous: Dict[str, List[Tuple[str, str]]] = field(default_factory=dict)
    # concept names for which no column matched at all
    missing: List[str] = field(default_factory=list)
    discovered: Dict[str, str] = field(default_factory=dict)  # "table.column" -> detected column-type label
def map_concepts(scenario_text: str, registry: DataRegistry) -> MappingResult:
    """Dynamically map analysis concepts onto concrete dataset columns.

    Scores every (table, column) pair against each concept's regex patterns
    (augmented with scenario-derived terms) and buckets each concept into
    resolved, ambiguous, or missing on the returned MappingResult.

    Args:
        scenario_text: Free-text scenario description used to bias scoring.
        registry: Source of tables to scan; when it exposes no tables, every
            universal concept is reported as missing.

    Returns:
        MappingResult with resolved/ambiguous/missing/discovered populated.
    """
    result = MappingResult()
    if not registry.names():
        result.missing = list(UNIVERSAL_CONCEPT_PATTERNS.keys())
        return result
    # Extract key terms from the scenario and fold them into the patterns.
    scenario_terms = _extract_key_terms_from_scenario(scenario_text)
    concept_patterns = _generate_dynamic_patterns(scenario_terms, UNIVERSAL_CONCEPT_PATTERNS)
    # Collect all candidate columns, recording detected column types as we go.
    all_columns: List[Tuple[str, str]] = []
    for table in registry.iter_tables():
        column_types = _detect_column_types(table.df)
        result.discovered.update({f"{table.name}.{col}": col_type for col, col_type in column_types.items()})
        for col in table.df.columns:
            all_columns.append((table.name, str(col)))
    # Map each concept to the best-scoring column(s).
    for concept, patterns in concept_patterns.items():
        scores = [
            ((tbl, col), _score_column_match(col, patterns, scenario_terms))
            for (tbl, col) in all_columns
        ]
        scores.sort(key=lambda x: x[1], reverse=True)
        if not scores or scores[0][1] == 0:
            result.missing.append(concept)
            continue
        top_score = scores[0][1]
        # Columns scoring close to the top are potential ambiguity.
        # BUGFIX: clamp the threshold to top_score. Previously
        # max(50, top_score - 20) could exceed a sub-50 top score (e.g. a
        # 25-point scenario-term-only match), filtering out every candidate
        # and recording an empty "ambiguous" list instead of a resolution.
        threshold = min(max(50, top_score - 20), top_score)
        high_scoring = [pair for pair, score in scores if score >= threshold]
        if len(high_scoring) == 1:
            result.resolved[concept] = high_scoring[0]
        else:
            # Multiple good matches - mark as ambiguous, capped at top 5.
            result.ambiguous[concept] = high_scoring[:5]
    return result
def build_phase1_questions(scenario_text: str, registry: DataRegistry, mapping: MappingResult, max_questions: int = 6) -> str:
    """Compose clarifying questions for the user from mapping gaps.

    Asks about ambiguous entity/metric mappings first, then about missing
    grouping variables and key metrics, then adds scenario-cued questions,
    capping the total at ``max_questions``.  ``registry`` is accepted for
    interface symmetry but not consulted here.
    """
    questions = []
    scenario_lower = scenario_text.lower() if scenario_text else ""
    # Nothing mapped at all: ask one structural question and stop.
    if not (mapping.resolved or mapping.ambiguous):
        return "**Data Structure**: I don't see clear patterns in your data. Could you describe what each column represents?"
    # Ambiguous entity concepts -> ask which column identifies the entity.
    for concept in ('facility', 'organization', 'department', 'specialty', 'region'):
        if concept not in mapping.ambiguous:
            continue
        choices = ", ".join(f"{tbl}.{col}" for tbl, col in mapping.ambiguous[concept][:4])
        questions.append(f"**Entity Identification**: Which column represents the main {concept.replace('_', ' ')}? Options: {choices}")
        if len(questions) >= max_questions:
            break
    # Ambiguous metric concepts -> ask which column carries the metric.
    for concept in ('wait_time', 'cost', 'score', 'performance', 'quality'):
        if concept not in mapping.ambiguous:
            continue
        choices = ", ".join(f"{tbl}.{col}" for tbl, col in mapping.ambiguous[concept][:3])
        questions.append(f"**Metric Clarification**: Which column best represents {concept.replace('_', ' ')}? Options: {choices}")
        if len(questions) >= max_questions:
            break
    # Critical concepts that never resolved.
    if all(c not in mapping.resolved for c in ('facility', 'organization', 'department')):
        questions.append("**Grouping Variable**: What should I group the analysis by? (e.g., facilities, departments, regions)")
    if all(c not in mapping.resolved for c in ('wait_time', 'cost', 'score', 'performance')):
        questions.append("**Key Metric**: What is the main metric you want to analyze? (e.g., performance scores, wait times, costs)")
    # Scenario-cued questions: one per cue group whose words appear.
    cue_questions = (
        (('resource', 'allocation', 'priority'),
         "**Resource Allocation**: What factors should guide resource prioritization? (e.g., volume, urgency, equity)"),
        (('comparison', 'benchmark', 'performance'),
         "**Comparison Criteria**: How should different entities be compared? What constitutes good vs. poor performance?"),
        (('recommendation', 'decision', 'strategy'),
         "**Decision Context**: What constraints or preferences should influence the recommendations? (e.g., budget limits, operational requirements)"),
    )
    for cues, question in cue_questions:
        if any(cue in scenario_lower for cue in cues):
            questions.append(question)
    # Cap, then format as a numbered list under a header.
    questions = questions[:max_questions]
    if not questions:
        return "**Data Analysis Ready**: Your data appears well-structured. Please provide any additional context about your analysis goals."
    lines = ["**Clarification Questions**", ""]
    lines.extend(f"{i}. {q}" for i, q in enumerate(questions, 1))
    return "\n".join(lines)