# core/fastpath_high.py
"""Deterministic fast path for elevated blood-glucose readings.

Maps a reading in mg/dL to a ``HighPlan`` (stage, display text, speech
markup and follow-up actions) and converts such a plan into a plain
response dictionary.  All user-facing strings are German.
"""
from __future__ import annotations

import math
import re
from dataclasses import dataclass
from typing import Optional, Literal, List, Dict, Any

Stage = Literal["HIGH", "VERY_HIGH"]

# Compiled once at import time; both are applied to every spoken sentence.
_NUMBER_RE = re.compile(r"\b\d+(?:\.\d+)?\b")
_BLANKS_RE = re.compile(r"[ \t]+")


@dataclass
class HighPlan:
    """Result of the high-glucose fast path."""

    stage: Stage  # "HIGH" or "VERY_HIGH"
    priority: int  # 2 for HIGH, 3 for VERY_HIGH
    title: str  # short headline including the truncated reading
    reply: str  # markdown-formatted text reply
    speak_ssml: str  # SSML document for speech output
    actions: List[str]  # recommended steps, one sentence each
    recheck_minutes: int  # minutes until the suggested follow-up check
    meta: Dict[str, Any]  # reading, source, session id and thresholds used


def _strip_md(s: str) -> str:
    """Return *s* as a string without ``**``, ``__`` and backtick markers.

    Falsy input (``None``, ``""``, ``0``) yields an empty string.
    """
    s = str(s or "")
    return s.replace("**", "").replace("__", "").replace("`", "")


def _decimal_to_words(m: "re.Match[str]") -> str:
    """Rewrite a matched ``a.b`` number as ``a Komma b``; keep integers."""
    num = m.group(0)
    if "." in num:
        whole, frac = num.split(".", 1)
        return f"{whole} Komma {frac}"
    return num


def _speakable_text(s: str) -> str:
    """Turn display text into plain text that is safe inside SSML.

    Removes markdown markers, replaces ``&`` with " und ", drops ``<`` and
    ``>`` so no markup can leak into the SSML body, expands the unit
    ``mg/dL``, spells decimal points as "Komma" and collapses runs of
    spaces/tabs.
    """
    s = _strip_md(s)
    s = s.replace("&", " und ").replace("<", "").replace(">", "")
    s = s.replace("mg/dL", "Milligramm pro Deziliter").replace("MG/DL", "Milligramm pro Deziliter")
    s = _NUMBER_RE.sub(_decimal_to_words, s)
    return _BLANKS_RE.sub(" ", s).strip()


def _ssml(parts: List[str], break_ms: int = 450) -> str:
    """Build an SSML document from *parts*.

    Each part is sanitised with ``_speakable_text``; parts that end up
    empty are skipped.  The remaining parts are separated by a
    ``<break>`` of *break_ms* milliseconds and wrapped in ``<speak>``.
    """
    spoken = [text for text in map(_speakable_text, parts) if text]
    # Without an explicit separator the sentences would run into each
    # other ("...Wert.Bitte ...") and ``break_ms`` would have no effect.
    pause = f'<break time="{int(break_ms)}ms"/>'
    return f"<speak>{pause.join(spoken)}</speak>"


def high_fastpath(
    bg_mgdl: Optional[float],
    *,
    session_id: Optional[str] = None,
    source: str = "glucose",
    high_low: int = 180,
    very_high: int = 250,
) -> Optional[HighPlan]:
    """Classify a glucose reading and build the matching plan.

    Deterministic, following the routing matrix:

    - HIGH: ``high_low`` <= reading <= ``very_high`` (default 180–250)
    - VERY_HIGH: reading > ``very_high`` (default >250)

    Args:
        bg_mgdl: Reading in mg/dL; anything ``float()`` accepts, or ``None``.
        session_id: Opaque identifier copied into ``meta``.
        source: Origin label of the reading, copied into ``meta``.
        high_low: Lower bound (inclusive) of the HIGH band.
        very_high: Upper bound (inclusive) of the HIGH band.

    Returns:
        A ``HighPlan``, or ``None`` when the reading is missing, not a
        finite number, or below ``high_low``.

    Raises:
        ValueError, TypeError: If ``bg_mgdl`` cannot be converted by ``float()``.
    """
    if bg_mgdl is None:
        return None

    bg = float(bg_mgdl)
    # NaN compares false against every threshold and would otherwise fall
    # through to VERY_HIGH and crash in int(); infinity crashes there too.
    if not math.isfinite(bg) or bg < high_low:
        return None

    shown = int(bg)  # the reading is displayed truncated, not rounded
    meta = {
        "bg_mgdl": bg,
        "source": source,
        "session_id": session_id,
        "high_low": high_low,
        "very_high": very_high,
    }

    if bg <= very_high:
        # HIGH band (180–250 with the default thresholds)
        actions = [
            "Erhöht – nach eurem Plan vorgehen (CamAPS Vorschlag prüfen/bestätigen).",
            "Pumpe/Katheter prüfen, wenn ungewöhnlich.",
            "In 60 Minuten erneut kontrollieren (oder nach eurem Plan).",
        ]
        reply = (
            f"🟡 **ERHÖHT ({high_low}–{very_high})**: {shown} mg/dL.\n"
            "• Nach eurem Plan vorgehen (CamAPS Korrekturvorschlag prüfen/bestätigen).\n"
            "• Prüfen, ob Katheter/Pumpe/Insulinabgabe passt.\n"
            "• Spätestens in **60 Minuten** erneut kontrollieren (oder nach eurem Plan)."
        )
        speak = _ssml(
            [
                f"{shown} Milligramm pro Deziliter. Erhöhter Wert.",
                "Bitte nach eurem festgelegten Plan vorgehen.",
                "Prüfe Pumpe und Katheter, wenn es ungewöhnlich ist.",
                "Spätestens in einer Stunde erneut kontrollieren.",
            ]
        )
        return HighPlan(
            stage="HIGH",
            priority=2,
            title=f"ERHÖHT {shown} mg/dL",
            reply=reply,
            speak_ssml=speak,
            actions=actions,
            recheck_minutes=60,
            meta=meta,
        )

    # VERY_HIGH band (>250 with the default thresholds)
    actions = [
        "Sehr hoch – engmaschig kontrollieren.",
        "Nach eurem Plan handeln (CamAPS/Korrektur/Checks).",
        "Katheter/Pumpe/Insulinabgabe prüfen.",
    ]
    reply = (
        f"🔴 **STARK ERHÖHT (>{very_high})**: {shown} mg/dL.\n"
        "• **Engmaschig kontrollieren** und nach eurem festgelegten Plan handeln.\n"
        "• Prüfen, ob **Katheter/Pumpe/Insulinabgabe** passt.\n"
        "• Folge-Check nach eurem Plan (typisch ~60 Min), ggf. früher."
    )
    speak = _ssml(
        [
            f"{shown} Milligramm pro Deziliter. Stark erhöht.",
            "Bitte engmaschig kontrollieren und nach eurem Plan handeln.",
            "Prüfe Katheter, Pumpe und Insulinabgabe.",
        ]
    )
    return HighPlan(
        stage="VERY_HIGH",
        priority=3,
        title=f"STARK ERHÖHT {shown} mg/dL",
        reply=reply,
        speak_ssml=speak,
        actions=actions,
        recheck_minutes=60,
        meta=meta,
    )


def plan_to_response(plan: HighPlan) -> Dict[str, Any]:
    """Flatten *plan* into the response dictionary for this fast path.

    The ``fastpath`` key is always ``"high"``; session id, source, reading
    and thresholds are read from ``plan.meta`` (missing keys become
    ``None``, a missing source becomes ``"glucose"``).
    """
    meta = plan.meta
    return {
        "reply": plan.reply,
        "priority": plan.priority,
        "fastpath": "high",
        "stage": plan.stage,
        "title": plan.title,
        "actions": plan.actions,
        "recheck_minutes": plan.recheck_minutes,
        "speak": plan.speak_ssml,
        "session_id": meta.get("session_id"),
        "sources": [{"type": meta.get("source", "glucose"), "bg_mgdl": meta.get("bg_mgdl")}],
        "thresholds": {"high_low": meta.get("high_low"), "very_high": meta.get("very_high")},
    }