from __future__ import annotations import re from dataclasses import dataclass, field from typing import Dict, List, Any, Tuple, Optional import pandas as pd from data_registry import DataRegistry CONCEPT_HINTS = { "facility": [r"\bfacilit(y|ies)\b", r"\bhospital\b", r"\bsite\b", r"\bcentre\b", r"\bcenter\b"], "specialty": [r"\bspecialt(y|ies)\b", r"\bservice\b", r"\bdepartment\b"], "zone": [r"\bzone\b", r"\bregion\b", r"\bhealth zone\b"], "wait_median": [r"\bmedian\b.*\bwait", r"\bP50\b.*\bwait", r"\bwait.*\bmedian"], "wait_p90": [r"\bp90\b.*\bwait", r"\b90(th)? percentile\b.*\bwait", r"\bwait.*p90"], "wait_days": [r"\bwait\b.*\bdays?\b", r"\bdays?\b.*\bwait\b"], "capacity_beds": [r"\bstaffed\b.*\bbeds?\b", r"\bbeds?\b"], "cost_fixed": [r"\bfixed\b.*\bcost", r"\bstartup\b.*\bcost"], "cost_variable": [r"\bvariable\b.*\bcost", r"\bcost.*per\b(client|case|visit)\b"], "clients_per_day": [r"\bclients?\b.*\bday\b", r"\bper[-_\s]?day\b.*clients?"], "teams": [r"\bteams?\b", r"\bscreen(ing)? team\b"], } def _score_col(col_name: str, patterns: List[str]) -> int: c = col_name.lower() for i, pat in enumerate(patterns): if re.search(pat, c): return 100 - i return 0 @dataclass class MappingResult: resolved: Dict[str, Tuple[str, str]] = field(default_factory=dict) ambiguous: Dict[str, List[Tuple[str, str]]] = field(default_factory=dict) missing: List[str] = field(default_factory=list) def map_concepts(scenario_text: str, registry: DataRegistry) -> MappingResult: result = MappingResult() if not registry.names(): result.missing = list(CONCEPT_HINTS.keys()) return result all_cols = [] for t in registry.iter_tables(): for c in t.df.columns: all_cols.append((t.name, str(c))) for concept, patterns in CONCEPT_HINTS.items(): scores = [(((tbl, col)), _score_col(col, patterns)) for (tbl, col) in all_cols] scores.sort(key=lambda x: x[1], reverse=True) if not scores or scores[0][1] == 0: result.missing.append(concept) continue top_score = scores[0][1] near = [pair for pair, s in scores if s >= max(50, top_score - 5)] if len(near) == 1: tbl, col = near[0] result.resolved[concept] = (tbl, col) else: result.ambiguous[concept] = near return result def build_phase1_questions(scenario_text: str, registry: DataRegistry, mapping: MappingResult, max_groups: int = 5) -> str: groups: List[Tuple[str, str]] = [] def _ask_disamb(concept: str, pairs: List[Tuple[str, str]], group: str, lead: str): opts = "; ".join([f"{t}.{c}" for t, c in pairs[:6]]) groups.append((group, f"{lead} **Which column matches** `{concept}`? Options: {opts}")) if "facility" in mapping.ambiguous: _ask_disamb("facility", mapping.ambiguous["facility"], "Prioritization", "We need the facility identifier to aggregate by site.") elif "facility" in mapping.missing: groups.append(("Prioritization", "Provide the **facility/site** column to group results (name or ID).")) if "specialty" in mapping.ambiguous: _ask_disamb("specialty", mapping.ambiguous["specialty"], "Prioritization", "We need the specialty/service field to rank by specialty.") elif "specialty" in mapping.missing: groups.append(("Prioritization", "Provide the **specialty/service** column (e.g., General Surgery, Ortho).")) if "capacity_beds" in mapping.ambiguous: _ask_disamb("capacity_beds", mapping.ambiguous["capacity_beds"], "Capacity", "To estimate staffed capacity, confirm the **staffed beds** column.") elif "capacity_beds" in mapping.missing: groups.append(("Capacity", "Provide **staffed beds** (or equivalent capacity) column.")) if "clients_per_day" in mapping.missing: groups.append(("Capacity", "What is the **clients per day** rate per team?")) if "teams" in mapping.missing: groups.append(("Capacity", "How many **teams** operate concurrently?")) if "cost_fixed" in mapping.missing: groups.append(("Cost", "Provide **fixed/startup cost** (or confirm none).")) if "cost_variable" in mapping.missing: groups.append(("Cost", "Provide **variable cost per client/case** (or unit definition).")) any_wait = ("wait_median" in mapping.resolved) or ("wait_p90" in mapping.resolved) or ("wait_days" in mapping.resolved) if not any_wait: groups.append(("Clinical", "Which columns capture **wait times** (median/p90 or days)?")) groups.append(("Recommendations", "Any operational constraints or equity priorities we should encode (scheduling limits, rural access, partnerships)?")) groups = groups[:max_groups] out = ["**Clarification Questions**"] cur = None for grp, q in groups: if grp != cur: out.append(f"\n**{grp}:**") cur = grp out.append(f"- {q}") return "\n".join(out)