"""Figment Gradio app scaffold.""" from __future__ import annotations import html import json from pathlib import Path from typing import Any from fastapi.responses import HTMLResponse from figment.audio_intake import confirm_audio_draft as _confirm_audio_draft from figment.audio_intake import draft_audio_intake as _draft_audio_intake from figment.config import FigmentConfig, load_config from figment.model_client import ModelClient, ModelClientError, hosted_audio_limits_text, validate_hosted_audio_file from figment.navigator import run_navigation from figment.parakeet_asr import ( ParakeetAsrError, parakeet_audio_limits_text, transcribe_audio_with_parakeet, validate_parakeet_audio_file, ) from figment.retrieval import load_protocol_cards, query_from_intake, retrieval_source_summary, search_protocol_cards from figment.rules import evaluate_rules, run_red_flag_checks from figment.sbar import render_sbar from figment.trace import normalize_trace_payload, runtime_route_label, stable_hash, write_trace from figment.ui_theme import FIGMENT_CSS from figment.validators import urgency_floor_from_rules, validate_audio_ready try: import gradio as gr from gradio.data_classes import FileData except (ImportError, OSError): # pragma: no cover - lets unit tests import without gradio installed gr = None FileData = Any # type: ignore[misc, assignment] TAB_TITLES = [ "Intake", "Risk Check", "Protocol Guidance", "Navigator Output + Handoff", "Trace", ] PROJECT_ROOT = Path(__file__).resolve().parent DEMO_AUDIO_FILENAMES = ( "case_1_dictated_intake.wav", "case_2_dictated_intake.wav", "case_3_dictated_intake.wav", ) INTAKE_FIELD_KEYS = ( "setting", "patient_age", "pregnancy_status", "chief_concern", "symptoms", "vitals", "allergies", "medications", "available_supplies", "responder_note", ) DEMO_CASES: dict[str, dict[str, str]] = { "Disaster clinic: pediatric dehydration": { "setting": "shelter clinic", "patient_age": "7", "pregnancy_status": "not_applicable", "chief_concern": "vomiting and dehydration concern", "symptoms": "lethargic, very dry mouth, no urine since morning", "vitals": "temperature and blood pressure missing", "allergies": "unknown", "medications": "none reported", "available_supplies": "oral rehydration solution, radio, transport team", "responder_note": "Child after flood cleanup cannot keep fluids down.", }, "Disaster injury: wound infection": { "setting": "mobile clinic", "patient_age": "43", "pregnancy_status": "not_applicable", "chief_concern": "wound getting worse", "symptoms": "spreading redness, swelling, foul drainage", "vitals": "temperature unknown", "allergies": "unknown", "medications": "unknown", "available_supplies": "clean dressings, radio", "responder_note": "Cut from debris three days ago.", }, "Rural clinic: pregnancy danger sign": { "setting": "rural clinic", "patient_age": "29", "pregnancy_status": "pregnant", "chief_concern": "bleeding and severe headache", "symptoms": "vaginal bleeding, severe headache, dizziness", "vitals": "blood pressure not available", "allergies": "unknown", "medications": "prenatal vitamin reported", "available_supplies": "phone, transport contact", "responder_note": "Patient is pregnant and reports bleeding.", }, } def collect_intake( setting: str, patient_age: str, pregnancy_status: str, chief_concern: str, symptoms: str, vitals: str, allergies: str, medications: str, available_supplies: str, responder_note: str, ) -> dict[str, Any]: return { "setting": setting, "patient_age": patient_age, "pregnancy_status": pregnancy_status, "chief_concern": chief_concern, "symptoms": symptoms, "vitals": vitals, "allergies": allergies, "medications": medications, "available_supplies": available_supplies, "responder_note": responder_note, "confirmed": False, } def confirm_intake(intake: dict[str, Any], audio_draft: dict[str, Any] | None = None) -> dict[str, Any]: audio_validation = validate_audio_ready(audio_draft) if not audio_validation.passed: raise ValueError("; ".join(audio_validation.failures)) confirmed = dict(intake) confirmed["confirmed"] = True return confirmed def evaluate_red_flags(intake: dict[str, Any]) -> list[dict[str, Any]]: if not intake.get("confirmed"): return [] return [rule.to_dict() for rule in run_red_flag_checks(intake)] def draft_audio_intake( transcript: str = "", config: FigmentConfig | None = None, audio_file: str | None = None, provider_payload: dict[str, Any] | None = None, ) -> dict[str, Any]: config = (config or load_config()).validated() provider_error = None if audio_file and not transcript.strip() and provider_payload is None and _should_use_hosted_omni_audio(config): try: validate_hosted_audio_file(audio_file) except ModelClientError as exc: provider_error = f"Hosted Omni audio draft skipped; typed transcript or canned fallback required. {exc}" else: try: provider_payload = ModelClient(config).generate_audio_draft(audio_file) except ModelClientError as exc: provider_error = f"Hosted Omni audio draft failed; typed transcript or canned fallback required. {exc}" elif audio_file and not transcript.strip() and provider_payload is None and _should_use_parakeet_audio(config): try: validate_parakeet_audio_file(audio_file) except ParakeetAsrError as exc: provider_error = f"Parakeet ASR draft skipped; typed transcript or canned fallback required. {exc}" else: try: provider_payload = transcribe_audio_with_parakeet(audio_file, config=config) except ParakeetAsrError as exc: provider_error = f"Parakeet ASR draft failed; typed transcript or canned fallback required. {exc}" draft = _draft_audio_intake( transcript=transcript, config=config, provider_payload=provider_payload, audio_file_received=bool(audio_file), ) if audio_file: draft["audio_file_received"] = True draft["audio_filename"] = Path(audio_file).name draft["raw_audio_stored"] = False retention_note = ( "Original clip bytes are not written to Figment traces; Gradio may keep upload/session files " "while the app is running, and committed demo clips stay on disk." ) if _should_use_hosted_omni_audio(config): hosted_disclosure = _hosted_audio_disclosure_text() draft["hosted_audio_disclosure"] = hosted_disclosure retention_note = f"{retention_note} {hosted_disclosure}" elif _should_use_parakeet_audio(config): retention_note = ( f"{retention_note} Parakeet ASR runs on the configured local/ZeroGPU runtime; " f"limit: {parakeet_audio_limits_text()}." ) draft["audio_retention_note"] = retention_note if provider_error and draft.get("audio_intake_path") == "audio_received_needs_transcript_or_model": draft["processing_status"] = provider_error return draft def confirm_audio_draft( intake: dict[str, Any], audio_draft: dict[str, Any], *, accept: bool = True, edits: dict[str, str] | None = None, reject_fields: set[str] | None = None, ) -> tuple[dict[str, Any], dict[str, Any]]: return _confirm_audio_draft(intake, audio_draft, accept=accept, edits=edits, reject_fields=reject_fields) def run_case(intake: dict[str, Any], config: FigmentConfig | None = None, audio_draft: dict[str, Any] | None = None) -> dict[str, Any]: confirmed = confirm_intake(intake, audio_draft=audio_draft) rules = evaluate_red_flags(confirmed) runtime_config = (config or load_config()).validated() retrieved_cards = search_protocol_cards(query_from_intake(confirmed)) output, trace = run_navigation( confirmed, rules, audio_draft=audio_draft, config=runtime_config, retrieved_cards=retrieved_cards, ) evaluation = evaluate_rules(confirmed) trace_payload = normalize_trace_payload(trace.to_dict()) trace_payload["retrieval"] = retrieval_source_summary(retrieved_cards) return { "intake": confirmed, "risk": evaluation, "retrieved_cards": retrieved_cards, "navigator_output": output, "sbar": render_sbar(output, trace.validator_result), "trace": trace_payload, } def trace_download_path(trace: dict[str, Any], config: FigmentConfig | None = None) -> str: config = (config or load_config()).validated() trace_id = stable_hash(trace or {}) path = config.trace_dir / f"figment-trace-{trace_id}.json" return str(write_trace(trace or {}, path)) class _FallbackDemo: def queue(self) -> "_FallbackDemo": return self def launch(self, *args: Any, **kwargs: Any) -> "_FallbackDemo": return self def build_app(config: FigmentConfig | None = None): config = (config or load_config()).validated() if gr is None: return _FallbackDemo() if not hasattr(gr, "Server"): raise RuntimeError("Figment Server mode requires gradio>=6.0 so gradio.Server is available.") server = gr.Server( title="Figment", summary="Protocol navigator for field clinics and disaster response.", version="1.0.0", ) @server.api(name="runtime", concurrency_limit=None) def runtime_api() -> dict[str, Any]: return _runtime_payload(config) @server.api(name="load_demo_case", concurrency_limit=None) def load_demo_case_api(name: str) -> dict[str, Any]: fields = _fields_dict_from_values(_load_demo_case(name)) return { "fields": fields, "intake": collect_intake(*_field_values(fields)), "risk": _empty_risk_result(), "risk_html": _risk_summary_html(_empty_risk_result()), "guidance_html": _protocol_results_html([]), "navigator_html": _navigator_summary_html({}), "trace_audit_html": _trace_audit_html({}), } @server.api(name="draft_audio", concurrency_limit=1) def draft_audio_api(audio_file: FileData | None = None, transcript: str = "") -> dict[str, Any]: path = _file_data_path(audio_file) return draft_audio_intake(transcript=transcript or "", config=config, audio_file=path) @server.api(name="apply_audio_draft", concurrency_limit=None) def apply_audio_draft_api(fields: dict[str, Any], audio_draft: dict[str, Any] | None = None) -> dict[str, Any]: values = _field_values(fields) updated = _apply_audio_draft_ui(*values, audio_draft) updated_fields = _fields_dict_from_values(updated[: len(INTAKE_FIELD_KEYS)]) return { "fields": updated_fields, "audio_draft": updated[-1], "intake": collect_intake(*_field_values(updated_fields)), "risk": _empty_risk_result(), "risk_html": _risk_summary_html(_empty_risk_result()), "guidance_html": _protocol_results_html([]), "navigator_html": _navigator_summary_html({}), "trace_audit_html": _trace_audit_html({}), } @server.api(name="confirm_intake", concurrency_limit=None) def confirm_intake_api(fields: dict[str, Any], audio_draft: dict[str, Any] | None = None) -> dict[str, Any]: confirmed, intake_state, updated_audio = _confirm_ui_intake(*_field_values(fields), audio_draft) return {"intake": confirmed, "intake_state": intake_state, "audio_draft": updated_audio} @server.api(name="risk_check", concurrency_limit=None) def risk_check_api(intake: dict[str, Any]) -> dict[str, Any]: risk, summary = _risk_ui_with_summary(intake) return {"risk": risk, "risk_html": summary} @server.api(name="retrieve_protocol_cards", concurrency_limit=None) def retrieve_protocol_cards_api(intake: dict[str, Any]) -> dict[str, Any]: cards, evidence, summary = _retrieve_with_evidence_and_summary_ui(intake) return {"cards": cards, "evidence": evidence, "guidance_html": summary} @server.api(name="run_navigator", concurrency_limit=1) def run_navigator_api(intake: dict[str, Any], audio_draft: dict[str, Any] | None = None) -> dict[str, Any]: output, sbar, trace, trace_state, summary, audit = _navigate_ui_with_summary(intake, audio_draft, config=config) return { "navigator_output": output, "sbar": sbar, "trace": trace, "trace_state": trace_state, "navigator_html": summary, "trace_audit_html": audit, } @server.get("/", response_class=HTMLResponse) async def homepage() -> str: return _server_homepage_html(config) @server.get("/health") async def health() -> dict[str, str]: return {"status": "ok", "mode": "gradio.Server"} return server def _h(value: Any) -> str: return html.escape("" if value is None else str(value), quote=True) def _app_header_html() -> str: return """
Offline protocol support for field clinics and disaster response
! For trained responders only. Not a substitute for clinical judgment.
""" def _statusline_html(config: FigmentConfig) -> str: audio_chip = "green" if config.enable_audio_intake else "amber" backend_chip = "blue" if config.model_backend in {"hosted_omni", "hf_zerogpu"} else "amber" return f"""
Runtime {_h(_model_mode_label(config))} MODEL_STACK={_h(config.model_stack)} MODEL_BACKEND={_h(config.model_backend)} ENABLE_AUDIO_INTAKE={_h('ON' if config.enable_audio_intake else 'OFF')} Privacy: no raw audio retained in traces
""" def _footer_rail_html(config: FigmentConfig) -> str: return f""" """ def _runtime_payload(config: FigmentConfig) -> dict[str, Any]: return { "model_mode_label": _model_mode_label(config), "model_stack": config.model_stack, "model_backend": config.model_backend, "audio_backend": config.audio_backend, "enable_audio_intake": config.enable_audio_intake, "audio_section_title": _audio_section_title(config), "audio_section_subtitle": _audio_section_subtitle(config), "audio_clip_label": _audio_clip_label(config), "transcript_label": _transcript_label(config), "audio_chips_html": _audio_runtime_chips_html(config), "demo_audio_examples": _demo_audio_examples(), "status_text": _status_text(config), } def _fields_dict_from_values(values: list[Any] | tuple[Any, ...]) -> dict[str, str]: return {key: str(value or "") for key, value in zip(INTAKE_FIELD_KEYS, values, strict=True)} def _field_values(fields: dict[str, Any] | None) -> list[str]: fields = fields or {} return [str(fields.get(key, "") or "") for key in INTAKE_FIELD_KEYS] def _file_data_path(file_data: Any) -> str | None: if not file_data: return None if isinstance(file_data, dict): path = file_data.get("path") return str(path) if path else None path = getattr(file_data, "path", None) return str(path) if path else None def _json_for_script(value: Any) -> str: return json.dumps(value, ensure_ascii=True).replace(" str: initial_data = { "tabTitles": TAB_TITLES, "fieldKeys": INTAKE_FIELD_KEYS, "runtime": _runtime_payload(config), "emptyRisk": _empty_risk_result(), "riskHtml": _risk_summary_html(_empty_risk_result()), "protocolLibraryHtml": _protocol_library_html(), "guidanceHtml": _protocol_results_html([]), "navigatorHtml": _navigator_summary_html({}), "traceAuditHtml": _trace_audit_html({}), } html_doc = """ Figment
Intake

Case intake

Responder-entered facts for the protocol run.

Audio draft

""" return ( html_doc.replace("__FIGMENT_CSS__", FIGMENT_CSS) .replace("__FIGMENT_DATA__", _json_for_script(initial_data)) .replace("__RED_FLAG_CHECKLIST__", _red_flag_checklist_html()) ) def _model_mode_label(config: FigmentConfig) -> str: if config.model_backend == "hosted_omni": return "Configured backend: hosted_omni" if config.model_backend == "hf_zerogpu": return "Configured backend: hf_zerogpu" if config.model_backend == "llama_cpp": return "Configured backend: llama_cpp" return "Configured backend: canned" def _audio_section_title(config: FigmentConfig) -> str: if not config.enable_audio_intake or config.audio_backend == "none": return "Audio draft intake disabled" if config.audio_backend == "omni_native" and config.model_backend == "hosted_omni": return "Hosted Omni audio draft" if config.audio_backend == "parakeet_nemo": return "Parakeet ASR draft" if config.audio_backend == "canned": return "Canned audio demo draft" return "Audio draft intake" def _audio_section_subtitle(config: FigmentConfig) -> str: if not config.enable_audio_intake or config.audio_backend == "none": return "Typed confirmed intake remains the only active source for rules and navigation." if config.audio_backend == "omni_native" and config.model_backend == "hosted_omni": return ( "Record or upload responder dictation for a provisional Omni draft. Audio is sent to the configured " f"hosted endpoint; use only synthetic or de-identified clips. Limit: {hosted_audio_limits_text()}." ) if config.audio_backend == "parakeet_nemo": return "Use configured Parakeet ASR for provisional field suggestions, then confirm fields before rules run." if config.audio_backend == "canned": return "Use canned clips only as repeatable demo input, then confirm fields before rules run." return "Draft suggestions are provisional until the confirmed intake form is reviewed." def _audio_clip_label(config: FigmentConfig) -> str: if not config.enable_audio_intake or config.audio_backend == "none": return "Audio intake disabled" if config.audio_backend == "parakeet_nemo": return "Parakeet audio intake" if config.audio_backend == "canned": return "Demo audio intake" return "Hosted Omni audio intake" def _transcript_label(config: FigmentConfig) -> str: if not config.enable_audio_intake or config.audio_backend == "none": return "Typed transcript heuristic disabled" return "Typed transcript heuristic" def _audio_runtime_chips_html(config: FigmentConfig) -> str: if not config.enable_audio_intake or config.audio_backend == "none": return 'Audio intake disabled' chips = ['Confirm before rules run'] if config.audio_backend == "omni_native" and config.model_backend == "hosted_omni": chips.insert(0, 'Hosted Omni audio') chips.append('Hosted endpoint: synthetic/de-identified only') elif config.audio_backend == "parakeet_nemo": chips.insert(0, 'Parakeet ASR') elif config.audio_backend == "canned": chips.insert(0, 'Canned demo audio') else: chips.insert(0, 'Typed transcript heuristic') return " ".join(chips) def _section_header_html(title: str, subtitle: str = "") -> str: subtitle_html = f'
{_h(subtitle)}
' if subtitle else "" return f'
{_h(title)}
{subtitle_html}' def _red_flag_checklist_html() -> str: categories = { "Airway / Breathing": [ "Unable to speak full sentences", "O2 sat below local threshold", "Stridor or severe wheeze", "RR very high or very low", ], "Circulation": [ "SBP below local threshold", "Cap refill prolonged", "Active bleeding not controlled", "Pulse thready or collapsing", ], "Neurologic": [ "Unresponsive or difficult to arouse", "New confusion or disorientation", "Seizure activity", "Severe headache with danger signs", ], "Pregnancy": [ "Vaginal bleeding", "Severe headache or visual changes", "Convulsions", "Severe abdominal pain", ], "Pediatric": [ "Lethargic or not waking", "Poor feeding or refuses fluids", "Cap refill prolonged", "No urine reported", ], "Infection / Wound": [ "Spreading redness", "Foul drainage", "Suspected sepsis cues", "Rapidly worsening pain", ], "Chest Pain / Stroke": [ "Crushing or pressure pain", "Radiates to arm, jaw, or back", "Face droop or arm weakness", "Speech difficulty", ], } panels = [] for title, items in categories.items(): panels.append( '
' f'
{_h(title)}
' f'' "
" ) return f'
{"".join(panels)}
' def _risk_ui_with_summary(intake: dict[str, Any]) -> tuple[dict[str, Any], str]: result = _risk_ui(intake) return result, _risk_summary_html(result) def _risk_summary_html(result: dict[str, Any]) -> str: urgency = str(result.get("protocol_urgency") or "routine").lower() if urgency not in {"routine", "monitor", "urgent", "emergency"}: urgency = "routine" rules = result.get("red_flags") if isinstance(result.get("red_flags"), list) else [] rows = [] for rule in rules: if not isinstance(rule, dict): continue rows.append( "" f"{_h(rule.get('rule_id'))}" f"{_h(rule.get('evidence'))}" f"{_h(rule.get('card_id'))}" f"{_urgency_chip_html(str(rule.get('urgency') or urgency))}" "" ) if not rows: rows.append('No confirmed intake red flags have fired yet.') source_cards = sorted({str(rule.get("card_id")) for rule in rules if isinstance(rule, dict) and rule.get("card_id")}) if not source_cards: source_cards = ["Run rules after confirming intake"] return f"""
PROTOCOL_URGENCY
{_h(urgency.upper())}
Deterministic safety floor locked
Rules enforce this minimum. AI cannot lower this floor.
Fired Rules (deterministic)
{''.join(rows)}
Rule IDEvidenceProtocol CardUrgency Floor

Validation Messages

Source Protocol Cards

{''.join(f'{_h(card)} ' for card in source_cards)}
""" def _retrieve_with_evidence_and_summary_ui(intake: dict[str, Any]) -> tuple[list[dict[str, Any]], str, str]: cards, evidence = _retrieve_with_evidence_ui(intake) return cards, evidence, _protocol_results_html(cards) def _protocol_library_html() -> str: rows = [] for card in load_protocol_cards()[:10]: card_id = str(card.get("card_id", "")) rows.append( "" f"{_h(card_id)}" f"{_h(_protocol_condition(card))}" f"{_protocol_card_badge_html(card)}" "v1" "" ) return f"""
Search and filters are represented by the confirmed intake query in this prototype.
{''.join(rows)}
Card IDConditionUrgencyVersion
""" def _protocol_results_html(cards: list[dict[str, Any]]) -> str: if not cards: return """
Selected Protocol Card
Confirm intake, then retrieve protocol cards to populate this browser.
""" first = cards[0] card = first.get("card") if isinstance(first.get("card"), dict) else first title = str(card.get("title") or first.get("title") or "Selected protocol card") card_id = str(card.get("card_id") or first.get("card_id") or "") rationale_rows = [] for item in cards: item_card = item.get("card") if isinstance(item.get("card"), dict) else item rationale_rows.append( "" f"{_h(item.get('card_id') or item_card.get('card_id'))}" f"{_h(_protocol_condition(item_card))}" f"{_h(item.get('source') or 'unknown')}" f"{_h(_relevance_text(item))}" "" ) return f"""
Selected Protocol Card Version: v1

{_h(card_id)}

{_h(title)}
{_protocol_detail_card_html("When Relevant", card.get("applies_to"))} {_protocol_detail_card_html("Red Flags (Escalate)", card.get("red_flags"))} {_protocol_detail_card_html("Collect Next", card.get("required_observations"))} {_protocol_detail_card_html("Responder Checklist", card.get("local_actions"))} {_protocol_detail_card_html("Do Not Do", card.get("forbidden_actions"))} {_protocol_detail_card_html("Source Note", [card.get("source_note"), card.get("safety_boundary")])}
Why these cards were retrieved
{''.join(rationale_rows)}
Card IDMatched ContextSourceRelevance Reason
""" def _protocol_detail_card_html(title: str, values: Any) -> str: items = _as_list(values) if not items: items = ["No value available yet."] return ( '
' f'

{_h(title)}

' f'' "
" ) def _protocol_condition(card: dict[str, Any]) -> str: applies_to = _as_list(card.get("applies_to")) if applies_to: return str(applies_to[0]).replace("_", " ").title() card_id = str(card.get("card_id", "")) return card_id.split("-")[0].title() if card_id else "General" def _protocol_card_badge_html(card: dict[str, Any]) -> str: card_text = " ".join(_as_list(card.get("red_flags")) + _as_list(card.get("escalation_criteria"))).lower() if "emergency" in card_text: return 'Emergency' if "urgent" in card_text or card.get("red_flags"): return 'Urgent' return 'All' def _navigate_ui_with_summary( intake: dict[str, Any], audio_draft: dict[str, Any] | None, config: FigmentConfig | None = None, ) -> tuple[dict[str, Any], str, dict[str, Any], dict[str, Any], str, str]: output, sbar, trace, trace_state = _navigate_ui(intake, audio_draft, config=config) return output, sbar, trace, trace_state, _navigator_summary_html(output, trace), _trace_audit_html(trace) def _navigator_summary_html(output: dict[str, Any], trace: dict[str, Any] | None = None) -> str: if not output: return """
Protocol Urgency
Run the navigator after confirming intake and red-flag checks.
""" urgency = str(output.get("protocol_urgency") or "routine").lower() if urgency not in {"routine", "monitor", "urgent", "emergency"}: urgency = "routine" handoff = output.get("handoff_note_sbar") if isinstance(output.get("handoff_note_sbar"), dict) else {} runtime_card = _runtime_contribution_card_html(trace) evidence_card = _harness_evidence_card_html(output, trace) return f"""
Protocol Urgency
{_h(urgency.upper())}
Deterministic safety floor locked
Minimum rules enforced. AI cannot lower this floor.
{runtime_card}
{evidence_card}
{_navigator_list_card_html("Missing Observations", output.get("missing_info_to_collect"))} {_navigator_list_card_html("Responder Checklist", output.get("responder_checklist"), checked=True)} {_navigator_list_card_html("Do-Not-Do", output.get("do_not_do"))} {_navigator_list_card_html("Source Cards", output.get("source_cards"))}

Responder Script (plain language)

{_h(output.get("responder_plain_language_script") or "No script generated yet.")}

SBAR Handoff
SituationBackgroundAssessment Observations OnlyHandoff Request
{_h(handoff.get("situation"))} {_h(handoff.get("background"))} {_h(handoff.get("assessment_observations_only"))} {_h(handoff.get("handoff_request"))}
""" def _runtime_contribution_card_html(trace: dict[str, Any] | None) -> str: if not trace: return "" payload = normalize_trace_payload(trace) route = payload.get("model_route") if isinstance(payload.get("model_route"), dict) else {} retrieval = payload.get("retrieval") if isinstance(payload.get("retrieval"), dict) else {} provenance_summary = payload.get("field_provenance_summary") if isinstance(payload.get("field_provenance_summary"), dict) else {} final_route = str(route.get("final_route") or "unknown") fallback_reason = route.get("fallback_reason") or "none" retrieval_source = retrieval.get("primary_source") or "not traced" return f"""

Runtime contribution

{_h(runtime_route_label(route))} Configured backend: {_h(route.get('raw_route'))} validation={_h(route.get('validation_status'))} retrieval={_h(retrieval_source)} {_field_provenance_counts_html(provenance_summary)} {_repair_metrics_inline_html(route)}
fallback_reason={_h(fallback_reason)}
""" def _harness_evidence_card_html(output: dict[str, Any] | None, trace: dict[str, Any] | None = None) -> str: evidence = _harness_evidence_from(output, trace) if not evidence: return "" retrieved_count = len(_as_list(evidence.get("retrieved_card_ids"))) rule_count = len(_as_list(evidence.get("deterministic_rule_ids"))) source_count = len(_as_list(evidence.get("source_card_ids"))) final_route = str(evidence.get("final_route") or "unknown") return f"""

Harness Evidence

Intake confirmed: {_h(evidence.get('confirmed_intake'))} Validation: {_h(evidence.get('validator_status'))} Retrieved cards: {_h(retrieved_count)} Rule results: {_h(rule_count)} Source cards: {_h(source_count)} Urgency floor: {_h(evidence.get('urgency_floor'))} Route: {_h(runtime_route_label(final_route))} Audio correction: {_h(evidence.get('audio_correction_status'))}
""" def _harness_evidence_from(output: dict[str, Any] | None, trace: dict[str, Any] | None = None) -> dict[str, Any]: if isinstance(output, dict) and isinstance(output.get("harness_evidence"), dict): return output["harness_evidence"] if isinstance(trace, dict): normalized = normalize_trace_payload(trace) if isinstance(normalized.get("harness_evidence"), dict): return normalized["harness_evidence"] navigator_output = normalized.get("navigator_output") if isinstance(navigator_output, dict) and isinstance(navigator_output.get("harness_evidence"), dict): return navigator_output["harness_evidence"] return {} def _navigator_list_card_html(title: str, values: Any, *, checked: bool = False) -> str: items = _as_list(values) or ["No items generated yet."] cls = "figment-checklist checked" if checked else "figment-checklist" return ( '
' f'

{_h(title)}

' f'' "
" ) def _route_chip_class(final_route: str) -> str: return { "live_model_generated": "green", "model_repaired": "blue", "model_with_deterministic_patches": "blue", "validation_fallback": "amber", "canned_backend": "amber", }.get(final_route, "amber") def _field_provenance_counts_html(summary: dict[str, Any]) -> str: counts = summary.get("counts") if isinstance(summary.get("counts"), dict) else {} if not counts: return 'Field provenance: not traced' chips = [ f'Field provenance: {_h(name)}={_h(count)}' for name, count in sorted(counts.items()) ] return "".join(chips) def _repair_metrics_inline_html(route: dict[str, Any]) -> str: attempts = route.get("repair_attempt_count", 0) if not attempts: return 'Repair calls: 0' cap = route.get("repair_attempt_cap", 0) latency = route.get("repair_latency_ms", 0.0) capped = " capped" if route.get("repair_capped") else "" return ( f'Repair calls: {_h(attempts)} / {_h(cap)}{capped}' f'Repair latency: {_h(latency)} ms' ) def _trace_audit_html(trace: dict[str, Any]) -> str: if not trace: return """
Run the navigator to populate timeline, validation, model route, and trace metadata.
""" trace = normalize_trace_payload(trace) events = _as_list(trace.get("events")) if not events: events = ["input captured", "rules evaluated", "cards retrieved", "navigator output generated", "validation complete"] rows = [] for index, event in enumerate(events, start=1): rows.append( "" f"{index}" f"{_h(event)}" 'OK' f"{_h(_trace_event_detail(event, trace))}" "" ) validator = trace.get("validator_result") if isinstance(trace.get("validator_result"), dict) else {} route = trace.get("model_route") if isinstance(trace.get("model_route"), dict) else {} retrieval = trace.get("retrieval") if isinstance(trace.get("retrieval"), dict) else {} provenance_summary = trace.get("field_provenance_summary") if isinstance(trace.get("field_provenance_summary"), dict) else {} evidence = _harness_evidence_from(None, trace) final_route = str(route.get("final_route") or "unknown") return f""" {''.join(rows)}
StepComponentStatusDetails

Audit Summary

Raw audio retained: false Schema valid: {_h(validator.get('passed'))} {_h(runtime_route_label(route))} Retrieval: {_h(retrieval.get('primary_source') or 'not traced')} Harness evidence: {_h('visible' if evidence else 'not traced')}
{_harness_evidence_card_html(None, trace)}

Model & Performance

Raw route{_h(route.get('raw_route'))}
Final route{_h(route.get('final_route'))}
Model ID{_h(route.get('model_id'))}
Fallback tier{_h(route.get('fallback_tier'))}
Fallback reason{_h(route.get('fallback_reason') or 'none')}
Validation status{_h(route.get('validation_status'))}
Repair calls{_h(route.get('repair_attempt_count', 0))} / {_h(route.get('repair_attempt_cap', 0))}
Repair latency ms{_h(route.get('repair_latency_ms', 0.0))}

Field provenance

{_field_provenance_counts_html(provenance_summary)}
Total fields{_h(provenance_summary.get('total_fields', 0))}
Deterministic patches{_h(provenance_summary.get('deterministic_patch_count', 0))}
Model retained{_h(provenance_summary.get('model_retained_count', 0))}
""" def _trace_event_detail(event: str, trace: dict[str, Any]) -> str: if "rules" in event: return f"{len(trace.get('red_flags') or [])} deterministic red-flag result(s)." if "cards" in event: retrieval = trace.get("retrieval") if isinstance(trace.get("retrieval"), dict) else {} source = retrieval.get("primary_source") suffix = f" via {source}" if source else "" return f"{len(trace.get('retrieved_card_ids') or [])} protocol card(s) retrieved{suffix}." if "validation" in event: validator = trace.get("validator_result") if isinstance(trace.get("validator_result"), dict) else {} return "Output conforms to schema." if validator.get("passed") else "Validation failures present." if "model" in event or "navigator" in event: route = trace.get("model_route") if isinstance(trace.get("model_route"), dict) else {} return runtime_route_label(route) return "Trace step recorded." def _urgency_chip_html(urgency: str) -> str: value = urgency.lower() cls = { "routine": "blue", "monitor": "green", "urgent": "amber", "emergency": "red", }.get(value, "blue") return f'{_h(value.upper())}' def _as_list(value: Any) -> list[str]: if value is None: return [] if isinstance(value, list): return [str(item) for item in value if item not in (None, "")] if isinstance(value, tuple | set): return [str(item) for item in value if item not in (None, "")] if isinstance(value, str): return [value] if value else [] return [str(value)] def _status_text(config: FigmentConfig) -> str: audio = "enabled" if config.enable_audio_intake else "disabled" return f"`MODEL_STACK={config.model_stack}` | `MODEL_BACKEND={config.model_backend}` | audio {audio}" def _demo_audio_examples() -> list[list[str]]: examples = [] for filename in DEMO_AUDIO_FILENAMES: path = PROJECT_ROOT / "data" / "demo_audio" / filename if path.exists(): examples.append([str(path), ""]) return examples def _load_demo_case(name: str) -> list[str]: case = DEMO_CASES.get(name or "", next(iter(DEMO_CASES.values()))) return [ case["setting"], case["patient_age"], case["pregnancy_status"], case["chief_concern"], case["symptoms"], case["vitals"], case["allergies"], case["medications"], case["available_supplies"], case["responder_note"], ] def _load_demo_case_and_reset(name: str) -> list[Any]: return [ *_load_demo_case(name), None, "", None, None, _empty_risk_result(), _risk_summary_html(_empty_risk_result()), [], "", _protocol_results_html([]), {}, "", _navigator_summary_html({}), {}, None, _trace_audit_html({}), {}, None, {}, ] def _empty_risk_result() -> dict[str, Any]: return {"red_flags": [], "protocol_urgency": "routine"} def _clear_source_outputs() -> list[Any]: return [ None, _empty_risk_result(), _risk_summary_html(_empty_risk_result()), [], "", _protocol_results_html([]), {}, "", _navigator_summary_html({}), {}, None, _trace_audit_html({}), {}, {}, ] def _clear_audio_outputs() -> list[Any]: return [ None, None, _empty_risk_result(), _risk_summary_html(_empty_risk_result()), [], "", _protocol_results_html([]), {}, "", _navigator_summary_html({}), {}, None, _trace_audit_html({}), {}, None, {}, ] def _draft_audio_ui(audio_file: str | None, transcript: str, config: FigmentConfig | None = None) -> dict[str, Any]: return draft_audio_intake(transcript=transcript, config=config, audio_file=audio_file) def _should_use_hosted_omni_audio(config: FigmentConfig) -> bool: return ( config.enable_audio_intake and config.audio_backend == "omni_native" and config.model_backend == "hosted_omni" ) def _should_use_parakeet_audio(config: FigmentConfig) -> bool: return ( config.enable_audio_intake and config.audio_backend == "parakeet_nemo" and config.allow_local_asr and config.model_stack == "local_4b_parakeet" ) def _hosted_audio_disclosure_text() -> str: return ( "Hosted audio is sent to the configured hosted endpoint for drafting; use only synthetic or " f"de-identified audio. Hosted upload cap: {hosted_audio_limits_text()}." ) def _apply_audio_draft_ui(*values: Any) -> list[Any]: *field_values, audio_draft = values if not audio_draft: return [*field_values, None, None] intake = collect_intake(*field_values) updated_audio_draft = dict(audio_draft) suggestions = [] for suggestion in audio_draft.get("suggested_fields", []): item = dict(suggestion) field = str(suggestion.get("field", "")) value = str(suggestion.get("draft_value", "")).strip() if field in intake and value and not intake.get(field): intake[field] = value item["status"] = "applied_unreviewed" item["needs_confirmation"] = True suggestions.append(item) updated_audio_draft["suggested_fields"] = suggestions if suggestions: updated_audio_draft["confirmed_intake_required"] = True updated_audio_draft["confirmation_status"] = "unconfirmed" fields = [ intake["setting"], intake["patient_age"], intake["pregnancy_status"], intake["chief_concern"], intake["symptoms"], intake["vitals"], intake["allergies"], intake["medications"], intake["available_supplies"], intake["responder_note"], ] return [*fields, updated_audio_draft, updated_audio_draft] def _confirm_ui_intake(*values: Any) -> tuple[dict[str, Any], dict[str, Any], dict[str, Any] | None]: *field_values, audio_draft = values intake = collect_intake(*field_values) if audio_draft and audio_draft.get("confirmation_status") != "confirmed": edits = { str(suggestion.get("field")): str(intake.get(str(suggestion.get("field")), "")) for suggestion in audio_draft.get("suggested_fields", []) if suggestion.get("field") and intake.get(str(suggestion.get("field"))) } intake, audio_draft = confirm_audio_draft(intake, audio_draft, accept=False, edits=edits) confirmed = confirm_intake(intake, audio_draft=audio_draft) return confirmed, confirmed, audio_draft def _risk_ui(intake: dict[str, Any]) -> dict[str, Any]: if not intake: return _empty_risk_result() rules = evaluate_red_flags(intake) return {"red_flags": rules, "protocol_urgency": urgency_floor_from_rules(rules)} def _retrieve_ui(intake: dict[str, Any]) -> list[dict[str, Any]]: if not intake: return [] return search_protocol_cards(query_from_intake(intake)) def _retrieve_with_evidence_ui(intake: dict[str, Any]) -> tuple[list[dict[str, Any]], str]: cards = _retrieve_ui(intake) return cards, protocol_evidence_panel(cards) def protocol_evidence_panel(retrieved_cards: list[dict[str, Any]]) -> str: if not retrieved_cards: return ( "Prototype evidence/source material for trained-responder review only; " "no protocol cards retrieved. Use local protocol, supervisor, clinician, " "or emergency pathway rather than improvising." ) lines = [ "Prototype evidence/source material for trained-responder review only; not medical advice.", "", "| Card ID | Title | Source | Cue / boundary | Relevance |", "| --- | --- | --- | --- | --- |", ] for item in retrieved_cards: card = item.get("card") if isinstance(item.get("card"), dict) else item card_id = _compact_cell(str(item.get("card_id") or card.get("card_id") or "unknown")) title = _compact_cell(str(item.get("title") or card.get("title") or "Untitled card")) source = _compact_cell(str(item.get("source") or "unknown")) cue = _compact_cell(_evidence_cue(card)) relevance = _compact_cell(_relevance_text(item)) lines.append(f"| {card_id} | {title} | {source} | {cue} | {relevance} |") return "\n".join(lines) def _evidence_cue(card: dict[str, Any]) -> str: for field in ("escalation_criteria", "red_flags", "safety_boundary"): value = card.get(field) if isinstance(value, list): for item in value: text = str(item).strip() if text: return text elif value: return str(value).strip() return "No escalation cue or safety boundary summary available." def _relevance_text(result: dict[str, Any]) -> str: parts: list[str] = [] score = result.get("score") if isinstance(score, int | float): parts.append(f"score={float(score):.2f}") elif score not in (None, ""): parts.append(f"score={score}") snippet = result.get("snippet") or result.get("matched_text") or result.get("summary") if snippet: parts.append(str(snippet).strip()) return "; ".join(parts) if parts else "No relevance score or snippet available." def _compact_cell(value: str, max_chars: int = 180) -> str: text = " ".join(value.split()) if len(text) > max_chars: text = text[: max_chars - 3].rstrip() + "..." return text.replace("|", "\\|") def _navigate_ui( intake: dict[str, Any], audio_draft: dict[str, Any] | None, config: FigmentConfig | None = None, ) -> tuple[dict[str, Any], str, dict[str, Any], dict[str, Any]]: if not intake: return {}, "", {}, {} result = run_case(intake, (config or load_config()).validated(), audio_draft=audio_draft) return result["navigator_output"], result["sbar"], result["trace"], result["trace"] if __name__ == "__main__": build_app().launch()