Rajan Sharma commited on
Commit
13953b9
·
verified ·
1 Parent(s): 1c8ef92

Update schema_mapper.py

Browse files
Files changed (1) hide show
  1. schema_mapper.py +232 -77
schema_mapper.py CHANGED
@@ -1,107 +1,262 @@
1
  from __future__ import annotations
2
  import re
3
  from dataclasses import dataclass, field
4
- from typing import Dict, List, Any, Tuple, Optional
5
  import pandas as pd
6
  from data_registry import DataRegistry
7
 
8
- CONCEPT_HINTS = {
9
- "facility": [r"\bfacilit(y|ies)\b", r"\bhospital\b", r"\bsite\b", r"\bcentre\b", r"\bcenter\b"],
10
- "specialty": [r"\bspecialt(y|ies)\b", r"\bservice\b", r"\bdepartment\b"],
11
- "zone": [r"\bzone\b", r"\bregion\b", r"\bhealth zone\b"],
12
- "wait_median": [r"\bmedian\b.*\bwait", r"\bP50\b.*\bwait", r"\bwait.*\bmedian"],
13
- "wait_p90": [r"\bp90\b.*\bwait", r"\b90(th)? percentile\b.*\bwait", r"\bwait.*p90"],
14
- "wait_days": [r"\bwait\b.*\bdays?\b", r"\bdays?\b.*\bwait\b"],
15
- "capacity_beds": [r"\bstaffed\b.*\bbeds?\b", r"\bbeds?\b"],
16
- "cost_fixed": [r"\bfixed\b.*\bcost", r"\bstartup\b.*\bcost"],
17
- "cost_variable": [r"\bvariable\b.*\bcost", r"\bcost.*per\b(client|case|visit)\b"],
18
- "clients_per_day": [r"\bclients?\b.*\bday\b", r"\bper[-_\s]?day\b.*clients?"],
19
- "teams": [r"\bteams?\b", r"\bscreen(ing)? team\b"],
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
20
  }
21
 
22
- def _score_col(col_name: str, patterns: List[str]) -> int:
23
- c = col_name.lower()
24
- for i, pat in enumerate(patterns):
25
- if re.search(pat, c):
26
- return 100 - i
27
- return 0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
28
 
29
  @dataclass
30
  class MappingResult:
31
  resolved: Dict[str, Tuple[str, str]] = field(default_factory=dict)
32
  ambiguous: Dict[str, List[Tuple[str, str]]] = field(default_factory=dict)
33
  missing: List[str] = field(default_factory=list)
 
34
 
35
  def map_concepts(scenario_text: str, registry: DataRegistry) -> MappingResult:
 
36
  result = MappingResult()
 
37
  if not registry.names():
38
- result.missing = list(CONCEPT_HINTS.keys())
39
  return result
40
 
41
- all_cols = []
42
- for t in registry.iter_tables():
43
- for c in t.df.columns:
44
- all_cols.append((t.name, str(c)))
 
 
 
 
 
 
 
 
 
 
 
45
 
46
- for concept, patterns in CONCEPT_HINTS.items():
47
- scores = [(((tbl, col)), _score_col(col, patterns)) for (tbl, col) in all_cols]
 
 
 
 
 
48
  scores.sort(key=lambda x: x[1], reverse=True)
 
49
  if not scores or scores[0][1] == 0:
50
  result.missing.append(concept)
51
  continue
 
52
  top_score = scores[0][1]
53
- near = [pair for pair, s in scores if s >= max(50, top_score - 5)]
54
- if len(near) == 1:
55
- tbl, col = near[0]
 
 
 
 
56
  result.resolved[concept] = (tbl, col)
57
  else:
58
- result.ambiguous[concept] = near
 
 
59
  return result
60
 
61
- def build_phase1_questions(scenario_text: str, registry: DataRegistry, mapping: MappingResult, max_groups: int = 5) -> str:
62
- groups: List[Tuple[str, str]] = []
63
-
64
- def _ask_disamb(concept: str, pairs: List[Tuple[str, str]], group: str, lead: str):
65
- opts = "; ".join([f"{t}.{c}" for t, c in pairs[:6]])
66
- groups.append((group, f"{lead} **Which column matches** `{concept}`? Options: {opts}"))
67
-
68
- if "facility" in mapping.ambiguous:
69
- _ask_disamb("facility", mapping.ambiguous["facility"], "Prioritization", "We need the facility identifier to aggregate by site.")
70
- elif "facility" in mapping.missing:
71
- groups.append(("Prioritization", "Provide the **facility/site** column to group results (name or ID)."))
72
-
73
- if "specialty" in mapping.ambiguous:
74
- _ask_disamb("specialty", mapping.ambiguous["specialty"], "Prioritization", "We need the specialty/service field to rank by specialty.")
75
- elif "specialty" in mapping.missing:
76
- groups.append(("Prioritization", "Provide the **specialty/service** column (e.g., General Surgery, Ortho)."))
77
-
78
- if "capacity_beds" in mapping.ambiguous:
79
- _ask_disamb("capacity_beds", mapping.ambiguous["capacity_beds"], "Capacity", "To estimate staffed capacity, confirm the **staffed beds** column.")
80
- elif "capacity_beds" in mapping.missing:
81
- groups.append(("Capacity", "Provide **staffed beds** (or equivalent capacity) column."))
82
-
83
- if "clients_per_day" in mapping.missing:
84
- groups.append(("Capacity", "What is the **clients per day** rate per team?"))
85
- if "teams" in mapping.missing:
86
- groups.append(("Capacity", "How many **teams** operate concurrently?"))
87
-
88
- if "cost_fixed" in mapping.missing:
89
- groups.append(("Cost", "Provide **fixed/startup cost** (or confirm none)."))
90
- if "cost_variable" in mapping.missing:
91
- groups.append(("Cost", "Provide **variable cost per client/case** (or unit definition)."))
92
-
93
- any_wait = ("wait_median" in mapping.resolved) or ("wait_p90" in mapping.resolved) or ("wait_days" in mapping.resolved)
94
- if not any_wait:
95
- groups.append(("Clinical", "Which columns capture **wait times** (median/p90 or days)?"))
96
-
97
- groups.append(("Recommendations", "Any operational constraints or equity priorities we should encode (scheduling limits, rural access, partnerships)?"))
98
-
99
- groups = groups[:max_groups]
100
- out = ["**Clarification Questions**"]
101
- cur = None
102
- for grp, q in groups:
103
- if grp != cur:
104
- out.append(f"\n**{grp}:**")
105
- cur = grp
106
- out.append(f"- {q}")
107
- return "\n".join(out)
 
 
 
 
 
 
 
 
 
 
1
  from __future__ import annotations
2
  import re
3
  from dataclasses import dataclass, field
4
+ from typing import Dict, List, Any, Tuple, Optional, Set
5
  import pandas as pd
6
  from data_registry import DataRegistry
7
 
8
+ # Generic concept patterns that work across domains
9
+ UNIVERSAL_CONCEPT_PATTERNS = {
10
+ # Entity/grouping concepts
11
+ "facility": [r"\bfacilit(y|ies)\b", r"\bhospital\b", r"\bsite\b", r"\bcentre\b", r"\bcenter\b", r"\blocation\b", r"\bprovider\b"],
12
+ "organization": [r"\borganization\b", r"\bcompany\b", r"\bbusiness\b", r"\bfirm\b", r"\bentity\b"],
13
+ "department": [r"\bdepartment\b", r"\bdivision\b", r"\bunit\b", r"\bsection\b"],
14
+ "specialty": [r"\bspecialt(y|ies)\b", r"\bservice\b", r"\btype\b", r"\bcategory\b", r"\bkind\b"],
15
+ "region": [r"\bzone\b", r"\bregion\b", r"\barea\b", r"\bdistrict\b", r"\bterritory\b"],
16
+
17
+ # Time-based metrics
18
+ "wait_time": [r"\bwait", r"\bdelay", r"\btime", r"\bduration", r"\blength"],
19
+ "wait_median": [r"\bmedian\b.*\bwait", r"\bP50\b", r"\bwait.*\bmedian", r"median.*time"],
20
+ "wait_p90": [r"\bp90\b", r"\b90(th)?\s*percentile\b", r"\bwait.*p90", r"90.*wait"],
21
+ "response_time": [r"\bresponse\b.*\btime\b", r"\bprocessing\b.*\btime\b"],
22
+
23
+ # Performance metrics
24
+ "score": [r"\bscore\b", r"\brating\b", r"\bindex\b", r"\brank\b"],
25
+ "efficiency": [r"\befficiency\b", r"\bthroughput\b", r"\bproductivity\b"],
26
+ "quality": [r"\bquality\b", r"\bperformance\b", r"\boutcome\b"],
27
+ "satisfaction": [r"\bsatisfaction\b", r"\bfeedback\b", r"\brating\b"],
28
+
29
+ # Capacity metrics
30
+ "capacity": [r"\bcapacity\b", r"\bvolume\b", r"\bsize\b", r"\blimit\b"],
31
+ "utilization": [r"\butilization\b", r"\boccupancy\b", r"\busage\b"],
32
+ "availability": [r"\bavailab\w+", r"\bopen\b", r"\bfree\b"],
33
+
34
+ # Cost/financial metrics
35
+ "cost": [r"\bcost\b", r"\bprice\b", r"\bexpense\b", r"\bfee\b", r"\bcharge\b"],
36
+ "budget": [r"\bbudget\b", r"\bfunding\b", r"\ballocation\b"],
37
+ "revenue": [r"\brevenue\b", r"\bincome\b", r"\bearnings\b"],
38
+
39
+ # Count/volume metrics
40
+ "count": [r"\bcount\b", r"\bnumber\b", r"\bquantity\b", r"\btotal\b"],
41
+ "rate": [r"\brate\b", r"\bratio\b", r"\bpercent\b", r"\bfrequency\b"],
42
+ "volume": [r"\bvolume\b", r"\bamount\b", r"\bquantity\b"]
43
  }
44
 
45
+ def _extract_key_terms_from_scenario(scenario_text: str) -> Set[str]:
46
+ """Extract important terms from scenario text to guide concept detection."""
47
+ if not scenario_text:
48
+ return set()
49
+
50
+ # Extract meaningful words, filtering out common stop words
51
+ stop_words = {
52
+ 'the', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by',
53
+ 'is', 'are', 'was', 'were', 'be', 'been', 'have', 'has', 'had', 'do', 'does', 'did',
54
+ 'a', 'an', 'this', 'that', 'these', 'those', 'i', 'you', 'he', 'she', 'it', 'we', 'they'
55
+ }
56
+
57
+ words = re.findall(r'\b[a-zA-Z]{3,}\b', scenario_text.lower())
58
+ key_terms = {word for word in words if word not in stop_words}
59
+
60
+ return key_terms
61
+
62
+ def _generate_dynamic_patterns(scenario_terms: Set[str], existing_patterns: Dict[str, List[str]]) -> Dict[str, List[str]]:
63
+ """Generate additional concept patterns based on scenario content."""
64
+ dynamic_patterns = existing_patterns.copy()
65
+
66
+ # Add scenario-specific terms as potential concepts
67
+ for term in scenario_terms:
68
+ if len(term) >= 4: # Only meaningful terms
69
+ # Check if term relates to existing concepts
70
+ term_pattern = rf"\b{re.escape(term)}\b"
71
+
72
+ # Add as potential entity if it sounds like one
73
+ if any(indicator in term for indicator in ['hospital', 'clinic', 'school', 'department', 'facility']):
74
+ if 'facility' not in dynamic_patterns:
75
+ dynamic_patterns['facility'] = []
76
+ dynamic_patterns['facility'].append(term_pattern)
77
+
78
+ # Add as potential metric if it sounds like one
79
+ elif any(indicator in term for indicator in ['time', 'score', 'rate', 'cost', 'wait']):
80
+ concept_key = f"metric_{term}"
81
+ dynamic_patterns[concept_key] = [term_pattern]
82
+
83
+ return dynamic_patterns
84
+
85
+ def _score_column_match(col_name: str, patterns: List[str], scenario_terms: Set[str] = None) -> int:
86
+ """Score how well a column matches concept patterns."""
87
+ col_lower = col_name.lower()
88
+ score = 0
89
+
90
+ # Pattern matching
91
+ for i, pattern in enumerate(patterns):
92
+ if re.search(pattern, col_lower):
93
+ score += 100 - (i * 10) # Higher score for earlier patterns
94
+ break
95
+
96
+ # Boost score if column name contains scenario-relevant terms
97
+ if scenario_terms:
98
+ for term in scenario_terms:
99
+ if term in col_lower:
100
+ score += 25
101
+
102
+ return score
103
+
104
+ def _detect_column_types(df: pd.DataFrame) -> Dict[str, str]:
105
+ """Detect the likely type/purpose of each column."""
106
+ column_types = {}
107
+
108
+ for col in df.columns:
109
+ col_lower = col.lower()
110
+
111
+ # Detect numeric columns that could be converted
112
+ sample = df[col].dropna().head(50)
113
+ numeric_convertible = False
114
+ if len(sample) > 0:
115
+ try:
116
+ numeric_sample = pd.to_numeric(sample, errors='coerce')
117
+ if numeric_sample.notna().sum() > len(sample) * 0.7:
118
+ numeric_convertible = True
119
+ except:
120
+ pass
121
+
122
+ # Categorize columns
123
+ if numeric_convertible:
124
+ if any(term in col_lower for term in ['id', 'number', 'code', 'index']):
125
+ column_types[col] = 'identifier'
126
+ elif any(term in col_lower for term in ['time', 'date', 'duration', 'wait', 'delay']):
127
+ column_types[col] = 'time_metric'
128
+ elif any(term in col_lower for term in ['cost', 'price', 'budget', 'fee', 'expense']):
129
+ column_types[col] = 'cost_metric'
130
+ elif any(term in col_lower for term in ['count', 'number', 'quantity', 'volume']):
131
+ column_types[col] = 'count_metric'
132
+ elif any(term in col_lower for term in ['rate', 'ratio', 'percent', 'score']):
133
+ column_types[col] = 'performance_metric'
134
+ else:
135
+ column_types[col] = 'numeric_metric'
136
+ else:
137
+ # String/categorical columns
138
+ unique_ratio = df[col].nunique() / len(df)
139
+ if unique_ratio < 0.1:
140
+ column_types[col] = 'category'
141
+ elif unique_ratio < 0.5:
142
+ column_types[col] = 'grouping'
143
+ else:
144
+ column_types[col] = 'text'
145
+
146
+ return column_types
147
 
148
  @dataclass
149
  class MappingResult:
150
  resolved: Dict[str, Tuple[str, str]] = field(default_factory=dict)
151
  ambiguous: Dict[str, List[Tuple[str, str]]] = field(default_factory=dict)
152
  missing: List[str] = field(default_factory=list)
153
+ discovered: Dict[str, str] = field(default_factory=dict) # Discovered column types
154
 
155
  def map_concepts(scenario_text: str, registry: DataRegistry) -> MappingResult:
156
+ """Dynamically map concepts based on scenario content and available data."""
157
  result = MappingResult()
158
+
159
  if not registry.names():
160
+ result.missing = list(UNIVERSAL_CONCEPT_PATTERNS.keys())
161
  return result
162
 
163
+ # Extract key terms from scenario
164
+ scenario_terms = _extract_key_terms_from_scenario(scenario_text)
165
+
166
+ # Generate dynamic patterns based on scenario
167
+ concept_patterns = _generate_dynamic_patterns(scenario_terms, UNIVERSAL_CONCEPT_PATTERNS)
168
+
169
+ # Collect all available columns
170
+ all_columns = []
171
+ for table in registry.iter_tables():
172
+ # Detect column types for this table
173
+ column_types = _detect_column_types(table.df)
174
+ result.discovered.update({f"{table.name}.{col}": col_type for col, col_type in column_types.items()})
175
+
176
+ for col in table.df.columns:
177
+ all_columns.append((table.name, str(col)))
178
 
179
+ # Map concepts to columns
180
+ for concept, patterns in concept_patterns.items():
181
+ scores = [
182
+ ((tbl, col), _score_column_match(col, patterns, scenario_terms))
183
+ for (tbl, col) in all_columns
184
+ ]
185
+
186
  scores.sort(key=lambda x: x[1], reverse=True)
187
+
188
  if not scores or scores[0][1] == 0:
189
  result.missing.append(concept)
190
  continue
191
+
192
  top_score = scores[0][1]
193
+
194
+ # Find all columns with similar high scores (potential ambiguity)
195
+ threshold = max(50, top_score - 20)
196
+ high_scoring = [pair for pair, score in scores if score >= threshold]
197
+
198
+ if len(high_scoring) == 1:
199
+ tbl, col = high_scoring[0]
200
  result.resolved[concept] = (tbl, col)
201
  else:
202
+ # Multiple good matches - mark as ambiguous
203
+ result.ambiguous[concept] = high_scoring[:5] # Limit to top 5
204
+
205
  return result
206
 
207
+ def build_phase1_questions(scenario_text: str, registry: DataRegistry, mapping: MappingResult, max_questions: int = 6) -> str:
208
+ """Build clarifying questions based on scenario and data gaps."""
209
+ questions = []
210
+ scenario_lower = scenario_text.lower() if scenario_text else ""
211
+
212
+ # Data structure questions
213
+ if not mapping.resolved and not mapping.ambiguous:
214
+ questions.append("**Data Structure**: I don't see clear patterns in your data. Could you describe what each column represents?")
215
+ return "\n".join(questions)
216
+
217
+ # Ambiguous mappings - ask for clarification
218
+ important_concepts = ['facility', 'organization', 'department', 'specialty', 'region']
219
+ for concept in important_concepts:
220
+ if concept in mapping.ambiguous:
221
+ options = [f"{tbl}.{col}" for tbl, col in mapping.ambiguous[concept][:4]]
222
+ questions.append(f"**Entity Identification**: Which column represents the main {concept.replace('_', ' ')}? Options: {', '.join(options)}")
223
+ if len(questions) >= max_questions:
224
+ break
225
+
226
+ # Metric clarification
227
+ metric_concepts = ['wait_time', 'cost', 'score', 'performance', 'quality']
228
+ for concept in metric_concepts:
229
+ if concept in mapping.ambiguous:
230
+ options = [f"{tbl}.{col}" for tbl, col in mapping.ambiguous[concept][:3]]
231
+ questions.append(f"**Metric Clarification**: Which column best represents {concept.replace('_', ' ')}? Options: {', '.join(options)}")
232
+ if len(questions) >= max_questions:
233
+ break
234
+
235
+ # Missing critical data
236
+ if not any(concept in mapping.resolved for concept in ['facility', 'organization', 'department']):
237
+ questions.append("**Grouping Variable**: What should I group the analysis by? (e.g., facilities, departments, regions)")
238
+
239
+ if not any(concept in mapping.resolved for concept in ['wait_time', 'cost', 'score', 'performance']):
240
+ questions.append("**Key Metric**: What is the main metric you want to analyze? (e.g., performance scores, wait times, costs)")
241
+
242
+ # Scenario-specific questions
243
+ if any(term in scenario_lower for term in ['resource', 'allocation', 'priority']):
244
+ questions.append("**Resource Allocation**: What factors should guide resource prioritization? (e.g., volume, urgency, equity)")
245
+
246
+ if any(term in scenario_lower for term in ['comparison', 'benchmark', 'performance']):
247
+ questions.append("**Comparison Criteria**: How should different entities be compared? What constitutes good vs. poor performance?")
248
+
249
+ if any(term in scenario_lower for term in ['recommendation', 'decision', 'strategy']):
250
+ questions.append("**Decision Context**: What constraints or preferences should influence the recommendations? (e.g., budget limits, operational requirements)")
251
+
252
+ # Limit questions and format
253
+ questions = questions[:max_questions]
254
+
255
+ if not questions:
256
+ return "**Data Analysis Ready**: Your data appears well-structured. Please provide any additional context about your analysis goals."
257
+
258
+ formatted_questions = ["**Clarification Questions**", ""]
259
+ for i, q in enumerate(questions, 1):
260
+ formatted_questions.append(f"{i}. {q}")
261
+
262
+ return "\n".join(formatted_questions)