Medica_DecisionSupportAI / schema_mapper.py
Rajan Sharma
Create schema_mapper.py
49f10c8 verified
raw
history blame
5.03 kB
from __future__ import annotations
import re
from dataclasses import dataclass, field
from typing import Dict, List, Any, Tuple, Optional
import pandas as pd
from data_registry import DataRegistry
# Regex hints used to recognise each analytic concept from a column header.
#
# Keys are concept names; each value lists patterns in priority order (an
# earlier pattern yields a higher score in _score_col). Patterns are searched
# against the LOWER-CASED column name, so every literal here must itself be
# lower-case or it can never match.
CONCEPT_HINTS: Dict[str, List[str]] = {
    "facility": [r"\bfacilit(y|ies)\b", r"\bhospital\b", r"\bsite\b", r"\bcentre\b", r"\bcenter\b"],
    "specialty": [r"\bspecialt(y|ies)\b", r"\bservice\b", r"\bdepartment\b"],
    "zone": [r"\bzone\b", r"\bregion\b", r"\bhealth zone\b"],
    # "p50" is written lower-case: the previous "P50" spelling was unreachable
    # because the column name is lower-cased before matching.
    "wait_median": [r"\bmedian\b.*\bwait", r"\bp50\b.*\bwait", r"\bwait.*\bmedian"],
    "wait_p90": [r"\bp90\b.*\bwait", r"\b90(th)? percentile\b.*\bwait", r"\bwait.*p90"],
    "wait_days": [r"\bwait\b.*\bdays?\b", r"\bdays?\b.*\bwait\b"],
    "capacity_beds": [r"\bstaffed\b.*\bbeds?\b", r"\bbeds?\b"],
    "cost_fixed": [r"\bfixed\b.*\bcost", r"\bstartup\b.*\bcost"],
    # A separator is required between "per" and the unit. The previous form,
    # r"per\b(client|...)", asked for a word boundary between two letters,
    # which cannot exist, so "cost per client" style headers never matched.
    "cost_variable": [r"\bvariable\b.*\bcost", r"\bcost.*per[-_\s]+(client|case|visit)\b"],
    "clients_per_day": [r"\bclients?\b.*\bday\b", r"\bper[-_\s]?day\b.*clients?"],
    "teams": [r"\bteams?\b", r"\bscreen(ing)? team\b"],
}
def _score_col(col_name: str, patterns: List[str]) -> int:
c = col_name.lower()
for i, pat in enumerate(patterns):
if re.search(pat, c):
return 100 - i
return 0
@dataclass
class MappingResult:
    """Outcome of matching concept names to (table, column) pairs.

    Every instance starts with three empty, independent containers; callers
    such as ``map_concepts`` fill them in.
    """

    # concept -> the single (table name, column name) chosen for it
    resolved: Dict[str, Tuple[str, str]] = field(default_factory=dict)

    # concept -> candidate (table name, column name) pairs needing a choice
    ambiguous: Dict[str, List[Tuple[str, str]]] = field(default_factory=dict)

    # concepts for which no candidate column was found
    missing: List[str] = field(default_factory=list)
def map_concepts(scenario_text: str, registry: DataRegistry) -> MappingResult:
    """Match every concept in ``CONCEPT_HINTS`` against the registry's columns.

    Args:
        scenario_text: Accepted for interface compatibility; not read here.
        registry: Source of tables; must provide ``names()`` and
            ``iter_tables()``, whose items expose ``name`` and ``df.columns``.

    Returns:
        A ``MappingResult`` in which each concept is recorded as resolved
        (one candidate column), ambiguous (several candidates), or missing
        (no column scored above zero, or the registry has no tables).
    """
    mapping = MappingResult()

    # No tables registered: nothing can be matched.
    if not registry.names():
        mapping.missing = list(CONCEPT_HINTS.keys())
        return mapping

    # Flatten the registry into (table name, column name) candidates.
    candidates = [
        (table.name, str(column))
        for table in registry.iter_tables()
        for column in table.df.columns
    ]

    for concept, patterns in CONCEPT_HINTS.items():
        # Stable descending sort: equally scored columns keep registry order.
        ranked = sorted(
            ((pair, _score_col(pair[1], patterns)) for pair in candidates),
            key=lambda item: item[1],
            reverse=True,
        )
        if not ranked or ranked[0][1] == 0:
            mapping.missing.append(concept)
            continue

        # Keep every candidate scoring within 5 points of the best one,
        # but never below an absolute floor of 50.
        cutoff = max(50, ranked[0][1] - 5)
        shortlist = [pair for pair, score in ranked if score >= cutoff]

        if len(shortlist) == 1:
            table_name, column_name = shortlist[0]
            mapping.resolved[concept] = (table_name, column_name)
        else:
            mapping.ambiguous[concept] = shortlist

    return mapping
def build_phase1_questions(scenario_text: str, registry: DataRegistry, mapping: MappingResult, max_groups: int = 5) -> str:
    """Render clarification questions (Markdown) for unresolved concepts.

    Args:
        scenario_text: Accepted for interface compatibility; not read here.
        registry: Accepted for interface compatibility; not read here.
        mapping: Result whose ``ambiguous``, ``missing`` and ``resolved``
            members decide which questions are asked.
        max_groups: Upper bound applied to the list of questions via slicing
            (``questions[:max_groups]``) before rendering.

    Returns:
        A Markdown string: a title line, then each section heading followed
        by its bulleted questions.
    """
    # Concepts that can be disambiguated: (concept, section, lead-in shown
    # with the candidate list, prompt shown when nothing matched at all).
    column_checks = (
        (
            "facility",
            "Prioritization",
            "We need the facility identifier to aggregate by site.",
            "Provide the **facility/site** column to group results (name or ID).",
        ),
        (
            "specialty",
            "Prioritization",
            "We need the specialty/service field to rank by specialty.",
            "Provide the **specialty/service** column (e.g., General Surgery, Ortho).",
        ),
        (
            "capacity_beds",
            "Capacity",
            "To estimate staffed capacity, confirm the **staffed beds** column.",
            "Provide **staffed beds** (or equivalent capacity) column.",
        ),
    )
    # Concepts that are only asked about when missing: (concept, section, prompt).
    missing_only = (
        ("clients_per_day", "Capacity", "What is the **clients per day** rate per team?"),
        ("teams", "Capacity", "How many **teams** operate concurrently?"),
        ("cost_fixed", "Cost", "Provide **fixed/startup cost** (or confirm none)."),
        ("cost_variable", "Cost", "Provide **variable cost per client/case** (or unit definition)."),
    )

    questions: List[Tuple[str, str]] = []

    for concept, section, lead, fallback in column_checks:
        if concept in mapping.ambiguous:
            # Offer at most six candidate columns.
            options = "; ".join(
                f"{tbl}.{col}" for tbl, col in mapping.ambiguous[concept][:6]
            )
            questions.append(
                (section, f"{lead} **Which column matches** `{concept}`? Options: {options}")
            )
        elif concept in mapping.missing:
            questions.append((section, fallback))

    for concept, section, prompt in missing_only:
        if concept in mapping.missing:
            questions.append((section, prompt))

    # A single resolved wait metric is enough to skip the wait-time question.
    wait_concepts = ("wait_median", "wait_p90", "wait_days")
    if not any(name in mapping.resolved for name in wait_concepts):
        questions.append(("Clinical", "Which columns capture **wait times** (median/p90 or days)?"))

    questions.append(
        (
            "Recommendations",
            "Any operational constraints or equity priorities we should encode (scheduling limits, rural access, partnerships)?",
        )
    )

    # Emit a heading whenever the section changes, then the question bullet.
    lines = ["**Clarification Questions**"]
    previous_section = None
    for section, text in questions[:max_groups]:
        if section != previous_section:
            lines.append(f"\n**{section}:**")
            previous_section = section
        lines.append(f"- {text}")
    return "\n".join(lines)