Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import re | |
| from dataclasses import dataclass, field | |
| from typing import Dict, List, Any, Tuple, Optional | |
| import pandas as pd | |
| from data_registry import DataRegistry | |
# Regex hints used to recognise each concept from a column name.
#
# Keys are concept identifiers; each value lists patterns in priority order
# (`_score_col` returns 100 minus the index of the first pattern that matches,
# so earlier patterns rank higher).
#
# Patterns are written in lowercase because `_score_col` lowercases the column
# name before searching; an uppercase literal could never match.
CONCEPT_HINTS: Dict[str, List[str]] = {
    "facility": [r"\bfacilit(y|ies)\b", r"\bhospital\b", r"\bsite\b", r"\bcentre\b", r"\bcenter\b"],
    "specialty": [r"\bspecialt(y|ies)\b", r"\bservice\b", r"\bdepartment\b"],
    "zone": [r"\bzone\b", r"\bregion\b", r"\bhealth zone\b"],
    # "p50" was previously spelled "P50", which cannot match lowercased input.
    "wait_median": [r"\bmedian\b.*\bwait", r"\bp50\b.*\bwait", r"\bwait.*\bmedian"],
    "wait_p90": [r"\bp90\b.*\bwait", r"\b90(th)? percentile\b.*\bwait", r"\bwait.*p90"],
    "wait_days": [r"\bwait\b.*\bdays?\b", r"\bdays?\b.*\bwait\b"],
    "capacity_beds": [r"\bstaffed\b.*\bbeds?\b", r"\bbeds?\b"],
    "cost_fixed": [r"\bfixed\b.*\bcost", r"\bstartup\b.*\bcost"],
    # The second pattern used to be r"...per\b(client|case|visit)\b": a word
    # boundary immediately followed by a letter after "per" is impossible, so
    # it never matched. An optional separator is allowed instead, mirroring
    # the "per[-_\s]?day" pattern used for clients_per_day.
    "cost_variable": [r"\bvariable\b.*\bcost", r"\bcost.*per[-_\s]?(client|case|visit)\b"],
    "clients_per_day": [r"\bclients?\b.*\bday\b", r"\bper[-_\s]?day\b.*clients?"],
    "teams": [r"\bteams?\b", r"\bscreen(ing)? team\b"],
}
def _score_col(col_name: str, patterns: List[str]) -> int:
    """Score how well a column name matches an ordered list of regex hints.

    Args:
        col_name: Column name to test.
        patterns: Regular expressions in priority order (best first).

    Returns:
        ``100 - i`` where ``i`` is the index of the first pattern that
        matches, or ``0`` when no pattern matches (including an empty
        ``patterns`` list).
    """
    lowered = col_name.lower()
    # `\b` treats "_" as a word character, so a snake_case name such as
    # "wait_days" can never satisfy r"\bwait\b". Try the name as given and
    # also a variant with underscores turned into spaces; keeping the
    # unmodified form means anything that matched before still matches.
    variants = (lowered, lowered.replace("_", " "))
    for rank, pattern in enumerate(patterns):
        # IGNORECASE lets patterns containing uppercase literals match the
        # lowercased name.
        if any(re.search(pattern, text, re.IGNORECASE) for text in variants):
            return 100 - rank
    return 0
@dataclass
class MappingResult:
    """Outcome of matching concepts to ``(table, column)`` pairs.

    The ``@dataclass`` decorator is required: without it the
    ``field(default_factory=...)`` defaults stay as raw ``Field`` objects on
    the class, and every instance shares them instead of getting its own
    empty dict/list.
    """

    # concept -> the single (table, column) pair chosen for it
    resolved: Dict[str, Tuple[str, str]] = field(default_factory=dict)
    # concept -> all candidate (table, column) pairs when no single pair was chosen
    ambiguous: Dict[str, List[Tuple[str, str]]] = field(default_factory=dict)
    # concepts for which no column was found
    missing: List[str] = field(default_factory=list)
def map_concepts(scenario_text: str, registry: DataRegistry) -> MappingResult:
    """Match every concept in ``CONCEPT_HINTS`` against the registry's columns.

    Args:
        scenario_text: Accepted for interface compatibility; not read here.
        registry: Source of tables. Only ``names()``, ``iter_tables()`` and
            each table's ``name`` / ``df.columns`` are used.

    Returns:
        A ``MappingResult`` in which each concept lands in exactly one of
        ``resolved`` (one qualifying column), ``ambiguous`` (several
        qualifying columns) or ``missing`` (no column scored above zero).
        When ``registry.names()`` is empty, every concept is reported missing.
    """
    mapping = MappingResult()
    if not registry.names():
        mapping.missing = list(CONCEPT_HINTS.keys())
        return mapping

    # Flatten the registry into (table name, column name) pairs once.
    columns = [
        (table.name, str(column))
        for table in registry.iter_tables()
        for column in table.df.columns
    ]

    for concept, patterns in CONCEPT_HINTS.items():
        # Stable descending sort: equal scores keep registry order.
        ranked = sorted(
            ((pair, _score_col(pair[1], patterns)) for pair in columns),
            key=lambda entry: entry[1],
            reverse=True,
        )
        best = ranked[0][1] if ranked else 0
        if best == 0:
            mapping.missing.append(concept)
            continue

        # Keep every column scoring within 5 points of the best (floor of 50).
        cutoff = max(50, best - 5)
        candidates = [pair for pair, score in ranked if score >= cutoff]
        if len(candidates) == 1:
            mapping.resolved[concept] = candidates[0]
        else:
            mapping.ambiguous[concept] = candidates
    return mapping
def build_phase1_questions(scenario_text: str, registry: DataRegistry, mapping: MappingResult, max_groups: int = 5) -> str:
    """Render the clarification questions implied by a concept mapping.

    Args:
        scenario_text: Accepted for interface compatibility; not read here.
        registry: Accepted for interface compatibility; not read here.
        mapping: Object exposing ``ambiguous``, ``missing`` and ``resolved``.
        max_groups: Maximum number of questions kept. The cap is applied to
            individual questions, not to group headings.

    Returns:
        Markdown text: a title line, then the kept questions as bullets, with
        a bold heading emitted whenever the group changes.
    """
    ambiguous = mapping.ambiguous
    missing = mapping.missing
    questions: List[Tuple[str, str]] = []

    # Concepts that get a disambiguation prompt when ambiguous, or a plain
    # request when missing: (concept, group, lead-in, missing prompt).
    column_prompts = (
        (
            "facility",
            "Prioritization",
            "We need the facility identifier to aggregate by site.",
            "Provide the **facility/site** column to group results (name or ID).",
        ),
        (
            "specialty",
            "Prioritization",
            "We need the specialty/service field to rank by specialty.",
            "Provide the **specialty/service** column (e.g., General Surgery, Ortho).",
        ),
        (
            "capacity_beds",
            "Capacity",
            "To estimate staffed capacity, confirm the **staffed beds** column.",
            "Provide **staffed beds** (or equivalent capacity) column.",
        ),
    )
    for concept, group, lead, fallback in column_prompts:
        if concept in ambiguous:
            # At most six candidate columns are offered.
            options = "; ".join(f"{table}.{column}" for table, column in ambiguous[concept][:6])
            questions.append((group, f"{lead} **Which column matches** `{concept}`? Options: {options}"))
        elif concept in missing:
            questions.append((group, fallback))

    # Concepts that are only asked about when missing.
    missing_prompts = (
        ("clients_per_day", "Capacity", "What is the **clients per day** rate per team?"),
        ("teams", "Capacity", "How many **teams** operate concurrently?"),
        ("cost_fixed", "Cost", "Provide **fixed/startup cost** (or confirm none)."),
        ("cost_variable", "Cost", "Provide **variable cost per client/case** (or unit definition)."),
    )
    for concept, group, prompt in missing_prompts:
        if concept in missing:
            questions.append((group, prompt))

    # Ask about wait times unless at least one wait concept is resolved.
    resolved = mapping.resolved
    if not any(name in resolved for name in ("wait_median", "wait_p90", "wait_days")):
        questions.append(("Clinical", "Which columns capture **wait times** (median/p90 or days)?"))

    questions.append(("Recommendations", "Any operational constraints or equity priorities we should encode (scheduling limits, rural access, partnerships)?"))

    lines = ["**Clarification Questions**"]
    previous_group = None
    for group, text in questions[:max_groups]:
        if group != previous_group:
            lines.append(f"\n**{group}:**")
            previous_group = group
        lines.append(f"- {text}")
    return "\n".join(lines)