Spaces:
Running
Running
| # core/fastpath_high.py | |
| from __future__ import annotations | |
| from dataclasses import dataclass | |
| from typing import Optional, Literal, List, Dict, Any | |
# Severity stage reported by the high-glucose fast path.
Stage = Literal["HIGH", "VERY_HIGH"]


# NOTE: the decorator is required. Without it the class only carries
# annotations and has no generated __init__, so the keyword construction
# used by high_fastpath() raises TypeError.
@dataclass
class HighPlan:
    """Plan produced by ``high_fastpath`` for an elevated glucose reading."""

    # Which band the reading fell into.
    stage: Stage
    # Numeric priority; high_fastpath uses 2 for HIGH and 3 for VERY_HIGH.
    priority: int
    # Short headline containing the stage label and the reading.
    title: str
    # Markdown-formatted text reply.
    reply: str
    # SSML document with the spoken version of the reply.
    speak_ssml: str
    # Recommended actions as individual sentences.
    actions: List[str]
    # Minutes until the value should be checked again.
    recheck_minutes: int
    # Context of the decision (reading, source, session id, thresholds).
    meta: Dict[str, Any]
| def _strip_md(s: str) -> str: | |
| s = str(s or "") | |
| return s.replace("**", "").replace("__", "").replace("`", "") | |
def _speakable_text(s: str) -> str:
    """Turn display text into plain text suitable for German speech output.

    Steps, in order:
    1. Remove markdown markers via ``_strip_md``.
    2. Replace ``&`` with " und " and drop ``<`` / ``>``.
    3. Expand the unit "mg/dl" (any letter casing) to
       "Milligramm pro Deziliter".
    4. Spell the decimal point of numbers as " Komma " (``5.5`` ->
       ``5 Komma 5``).
    5. Collapse runs of spaces/tabs to one space and strip both ends.
       Newlines are kept.

    Args:
        s: Text to convert; falsy values result in an empty string.

    Returns:
        The cleaned text.
    """
    import re

    text = _strip_md(s)
    # These characters would otherwise end up inside the SSML markup that
    # _ssml() wraps around the returned text.
    text = text.replace("&", " und ").replace("<", "").replace(">", "")
    # Case-insensitive so that spellings such as "mg/dl" or "Mg/dL" are
    # expanded as well, not only "mg/dL" and "MG/DL".
    text = re.sub(r"mg/dl", "Milligramm pro Deziliter", text, flags=re.IGNORECASE)

    def repl_num(m: re.Match) -> str:
        # The pattern allows at most one "." per match; integers pass through.
        return m.group(0).replace(".", " Komma ", 1)

    text = re.sub(r"\b\d+(?:\.\d+)?\b", repl_num, text)
    return re.sub(r"[ \t]+", " ", text).strip()
def _ssml(parts: List[str], break_ms: int = 450) -> str:
    """Build one SSML document from several text fragments.

    Each fragment is cleaned with ``_speakable_text``; fragments that are
    empty after cleaning are dropped. The remaining fragments are joined
    with a ``<break>`` of ``break_ms`` milliseconds and wrapped in
    ``<speak><prosody ...>``.

    Args:
        parts: Text fragments in speaking order.
        break_ms: Pause between fragments in milliseconds.

    Returns:
        The SSML string.
    """
    spoken = [text for text in map(_speakable_text, parts) if text]
    pause = f"<break time='{break_ms}ms'/>"
    return "<speak><prosody rate='medium' pitch='+0%'>" + pause.join(spoken) + "</prosody></speak>"
def high_fastpath(
    bg_mgdl: Optional[float],
    *,
    session_id: Optional[str] = None,
    source: str = "glucose",
    high_low: int = 180,
    very_high: int = 250,
) -> Optional[HighPlan]:
    """Build a deterministic plan for an elevated glucose reading.

    Stages:
    - HIGH: ``high_low <= bg <= very_high`` (180–250 with the defaults)
    - VERY_HIGH: ``bg > very_high`` (>250 with the defaults)

    Args:
        bg_mgdl: Glucose reading in mg/dL, or ``None`` if unavailable.
        session_id: Optional identifier copied into the plan's ``meta``.
        source: Label of the data source, copied into ``meta``.
        high_low: Lower bound (inclusive) of the HIGH stage.
        very_high: Upper bound (inclusive) of the HIGH stage; anything above
            is VERY_HIGH.

    Returns:
        A ``HighPlan`` for readings at or above ``high_low``; ``None`` when
        the reading is missing, not a finite number, or below ``high_low``.

    Raises:
        TypeError, ValueError: If ``bg_mgdl`` cannot be converted by
            ``float()``.
    """
    import math

    if bg_mgdl is None:
        return None

    bg = float(bg_mgdl)
    # NaN compares False against both thresholds and would otherwise fall
    # through to the VERY_HIGH branch, where int(bg) raises; +inf raises
    # there as well. Neither is a usable reading, so no plan is produced.
    if not math.isfinite(bg) or bg < high_low:
        return None

    shown = int(bg)  # the texts show the reading truncated to an integer
    meta = {
        "bg_mgdl": bg,
        "source": source,
        "session_id": session_id,
        "high_low": high_low,
        "very_high": very_high,
    }

    if bg <= very_high:
        # HIGH stage: high_low .. very_high
        actions = [
            "Erhöht – nach eurem Plan vorgehen (CamAPS Vorschlag prüfen/bestätigen).",
            "Pumpe/Katheter prüfen, wenn ungewöhnlich.",
            "In 60 Minuten erneut kontrollieren (oder nach eurem Plan).",
        ]
        # The range label follows the thresholds actually in effect instead
        # of a hard-coded "180–250".
        reply = (
            f"🟡 **ERHÖHT ({high_low}–{very_high})**: {shown} mg/dL.\n"
            "• Nach eurem Plan vorgehen (CamAPS Korrekturvorschlag prüfen/bestätigen).\n"
            "• Prüfen, ob Katheter/Pumpe/Insulinabgabe passt.\n"
            "• Spätestens in **60 Minuten** erneut kontrollieren (oder nach eurem Plan)."
        )
        speak = _ssml(
            [
                f"{shown} Milligramm pro Deziliter. Erhöhter Wert.",
                "Bitte nach eurem festgelegten Plan vorgehen.",
                "Prüfe Pumpe und Katheter, wenn es ungewöhnlich ist.",
                "Spätestens in einer Stunde erneut kontrollieren.",
            ]
        )
        return HighPlan(
            stage="HIGH",
            priority=2,
            title=f"ERHÖHT {shown} mg/dL",
            reply=reply,
            speak_ssml=speak,
            actions=actions,
            recheck_minutes=60,
            meta=meta,
        )

    # VERY_HIGH stage: above very_high
    actions = [
        "Sehr hoch – engmaschig kontrollieren.",
        "Nach eurem Plan handeln (CamAPS/Korrektur/Checks).",
        "Katheter/Pumpe/Insulinabgabe prüfen.",
    ]
    reply = (
        f"🔴 **STARK ERHÖHT (>{very_high})**: {shown} mg/dL.\n"
        "• **Engmaschig kontrollieren** und nach eurem festgelegten Plan handeln.\n"
        "• Prüfen, ob **Katheter/Pumpe/Insulinabgabe** passt.\n"
        "• Folge-Check nach eurem Plan (typisch ~60 Min), ggf. früher."
    )
    speak = _ssml(
        [
            f"{shown} Milligramm pro Deziliter. Stark erhöht.",
            "Bitte engmaschig kontrollieren und nach eurem Plan handeln.",
            "Prüfe Katheter, Pumpe und Insulinabgabe.",
        ]
    )
    return HighPlan(
        stage="VERY_HIGH",
        priority=3,
        title=f"STARK ERHÖHT {shown} mg/dL",
        reply=reply,
        speak_ssml=speak,
        actions=actions,
        recheck_minutes=60,
        meta=meta,
    )
def plan_to_response(plan: HighPlan) -> Dict[str, Any]:
    """Flatten a plan into the response dictionary of the "high" fast path.

    The plan's own fields are copied as they are (``speak_ssml`` is exposed
    under the key ``"speak"``). Session id, source and thresholds are read
    from ``plan.meta``; a missing source falls back to ``"glucose"``, other
    missing entries become ``None``.

    Args:
        plan: The plan to convert.

    Returns:
        A dictionary with the keys reply, priority, fastpath, stage, title,
        actions, recheck_minutes, speak, session_id, sources, thresholds.
    """
    meta = plan.meta
    source_entry = {
        "type": meta.get("source", "glucose"),
        "bg_mgdl": meta.get("bg_mgdl"),
    }
    threshold_info = {key: meta.get(key) for key in ("high_low", "very_high")}

    # Keyword order defines the key order of the resulting dictionary.
    return dict(
        reply=plan.reply,
        priority=plan.priority,
        fastpath="high",
        stage=plan.stage,
        title=plan.title,
        actions=plan.actions,
        recheck_minutes=plan.recheck_minutes,
        speak=plan.speak_ssml,
        session_id=meta.get("session_id"),
        sources=[source_entry],
        thresholds=threshold_info,
    )