# core/fastpath_high.py
from __future__ import annotations
from dataclasses import dataclass
from typing import Optional, Literal, List, Dict, Any
Stage = Literal["HIGH", "VERY_HIGH"]
@dataclass
class HighPlan:
stage: Stage
priority: int
title: str
reply: str
speak_ssml: str
actions: List[str]
recheck_minutes: int
meta: Dict[str, Any]
def _strip_md(s: str) -> str:
s = str(s or "")
return s.replace("**", "").replace("__", "").replace("`", "")
def _speakable_text(s: str) -> str:
import re
s = _strip_md(s)
s = s.replace("&", " und ").replace("<", "").replace(">", "")
s = s.replace("mg/dL", "Milligramm pro Deziliter").replace("MG/DL", "Milligramm pro Deziliter")
def repl_num(m: re.Match) -> str:
num = m.group(0)
if "." in num:
a, b = num.split(".", 1)
return f"{a} Komma {b}"
return num
s = re.sub(r"\b\d+(?:\.\d+)?\b", repl_num, s)
s = re.sub(r"[ \t]+", " ", s).strip()
return s
def _ssml(parts: List[str], break_ms: int = 450) -> str:
safe_parts = []
for p in parts:
p2 = _speakable_text(p)
if p2:
safe_parts.append(p2)
body = f"".join(safe_parts)
return f"{body}"
def high_fastpath(
bg_mgdl: Optional[float],
*,
session_id: Optional[str] = None,
source: str = "glucose",
high_low: int = 180,
very_high: int = 250,
) -> Optional[HighPlan]:
"""
Deterministisch nach Routing Matrix:
- HIGH: 180–250
- VERY_HIGH: >250
"""
if bg_mgdl is None:
return None
bg = float(bg_mgdl)
if bg < high_low:
return None
if bg <= very_high:
# 180–250
actions = [
"Erhöht – nach eurem Plan vorgehen (CamAPS Vorschlag prüfen/bestätigen).",
"Pumpe/Katheter prüfen, wenn ungewöhnlich.",
"In 60 Minuten erneut kontrollieren (oder nach eurem Plan).",
]
reply = (
f"🟡 **ERHÖHT (180–250)**: {int(bg)} mg/dL.\n"
"• Nach eurem Plan vorgehen (CamAPS Korrekturvorschlag prüfen/bestätigen).\n"
"• Prüfen, ob Katheter/Pumpe/Insulinabgabe passt.\n"
"• Spätestens in **60 Minuten** erneut kontrollieren (oder nach eurem Plan)."
)
speak = _ssml(
[
f"{int(bg)} Milligramm pro Deziliter. Erhöhter Wert.",
"Bitte nach eurem festgelegten Plan vorgehen.",
"Prüfe Pumpe und Katheter, wenn es ungewöhnlich ist.",
"Spätestens in einer Stunde erneut kontrollieren.",
]
)
return HighPlan(
stage="HIGH",
priority=2,
title=f"ERHÖHT {int(bg)} mg/dL",
reply=reply,
speak_ssml=speak,
actions=actions,
recheck_minutes=60,
meta={"bg_mgdl": bg, "source": source, "session_id": session_id, "high_low": high_low, "very_high": very_high},
)
# >250
actions = [
"Sehr hoch – engmaschig kontrollieren.",
"Nach eurem Plan handeln (CamAPS/Korrektur/Checks).",
"Katheter/Pumpe/Insulinabgabe prüfen.",
]
reply = (
f"🔴 **STARK ERHÖHT (>250)**: {int(bg)} mg/dL.\n"
"• **Engmaschig kontrollieren** und nach eurem festgelegten Plan handeln.\n"
"• Prüfen, ob **Katheter/Pumpe/Insulinabgabe** passt.\n"
"• Folge-Check nach eurem Plan (typisch ~60 Min), ggf. früher."
)
speak = _ssml(
[
f"{int(bg)} Milligramm pro Deziliter. Stark erhöht.",
"Bitte engmaschig kontrollieren und nach eurem Plan handeln.",
"Prüfe Katheter, Pumpe und Insulinabgabe.",
]
)
return HighPlan(
stage="VERY_HIGH",
priority=3,
title=f"STARK ERHÖHT {int(bg)} mg/dL",
reply=reply,
speak_ssml=speak,
actions=actions,
recheck_minutes=60,
meta={"bg_mgdl": bg, "source": source, "session_id": session_id, "high_low": high_low, "very_high": very_high},
)
def plan_to_response(plan: HighPlan) -> Dict[str, Any]:
return {
"reply": plan.reply,
"priority": plan.priority,
"fastpath": "high",
"stage": plan.stage,
"title": plan.title,
"actions": plan.actions,
"recheck_minutes": plan.recheck_minutes,
"speak": plan.speak_ssml,
"session_id": plan.meta.get("session_id"),
"sources": [{"type": plan.meta.get("source", "glucose"), "bg_mgdl": plan.meta.get("bg_mgdl")}],
"thresholds": {"high_low": plan.meta.get("high_low"), "very_high": plan.meta.get("very_high")},
}