| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | import os |
| | import io |
| | import json |
| | import uuid |
| | import pathlib |
| | import traceback |
| | from datetime import datetime, timezone |
| | from typing import Any, Dict, List, Tuple |
| |
|
| | import pandas as pd |
| | import gradio as gr |
| |
|
| | |
| | import importlib, importlib.util, importlib.machinery |
| | import sys as __sys, os as __os, inspect as __inspect, hashlib |
| |
|
| | __sys.path.insert(0, __os.path.abspath(".")) |
| |
|
| | def _purge_spine_modules(): |
| | """Remove cached spine_coder modules so we truly reload the file we want.""" |
| | for k in list(__sys.modules): |
| | if k == "spine_coder" or k.startswith("spine_coder."): |
| | __sys.modules.pop(k, None) |
| | importlib.invalidate_caches() |
| |
|
| | def _sha256(path: str) -> str: |
| | try: |
| | h = hashlib.sha256() |
| | with open(path, "rb") as f: |
| | for chunk in iter(lambda: f.read(8192), b""): |
| | h.update(chunk) |
| | return h.hexdigest()[:12] |
| | except Exception: |
| | return "unknown" |
| |
|
def _load_core_from_file(path: str):
    """
    Load spine_coder_core.py directly from ``path``, bypassing the module cache.
    IMPORTANT: the module is registered in sys.modules BEFORE exec_module runs
    (and is never reloaded) so self-references during execution resolve.
    Returns the loaded module, or None when the file does not exist.
    """
    abs_path = __os.path.abspath(path)
    if not __os.path.exists(abs_path):
        return None
    # Content-derived module name so different builds never collide in sys.modules.
    mod_name = f"spine_coder_core_dynamic_{_sha256(abs_path)}"
    loader = importlib.machinery.SourceFileLoader(mod_name, abs_path)
    spec = importlib.util.spec_from_loader(mod_name, loader)
    module = importlib.util.module_from_spec(spec)
    __sys.modules[mod_name] = module
    loader.exec_module(module)
    return module
| |
|
def _force_import_core():
    """Resolve the core module. Order: purge caches → SPINE_CORE_PATH → local files → package modules."""
    _purge_spine_modules()

    # 1) Explicit override via environment variable wins.
    forced = __os.environ.get("SPINE_CORE_PATH")
    if forced and __os.path.exists(forced):
        mod = _load_core_from_file(forced)
        if mod:
            print("[CORE] forced path:", forced, "sha:", _sha256(forced))
            return mod

    # 2) Well-known relative file locations, checked in order.
    for rel in (
        "spine_coder_core.py",
        "spine_coder/spine_coder_core.py",
        "spine_coder/spine_coder/spine_coder_core.py",
    ):
        candidate = __os.path.abspath(rel)
        if not __os.path.exists(candidate):
            continue
        mod = _load_core_from_file(candidate)
        if mod:
            print("[CORE] loaded file:", candidate, "sha:", _sha256(candidate))
            return mod

    # 3) Installed package modules, reloaded to defeat stale caches; a module
    #    without the expected entry point raises and is skipped.
    for modname in (
        "spine_coder.spine_coder.spine_coder_core",
        "spine_coder.spine_coder_core",
    ):
        try:
            mod = importlib.reload(importlib.import_module(modname))
            src = __inspect.getsourcefile(mod.suggest_with_cpt_billing) or "unknown"
            print("[CORE] loaded module:", modname, "from:", src)
            return mod
        except Exception:
            pass

    raise ImportError("Unable to locate spine_coder_core.py via modules or file paths.")
| |
|
# Bind the active core implementation once at import time.
_core = _force_import_core()
suggest_with_cpt_billing = _core.suggest_with_cpt_billing

# Record where the active implementation came from (shown in diagnostics/logs).
try:
    _active_src = __inspect.getsourcefile(suggest_with_cpt_billing) or "unknown"
    print("[CORE] active source:", _active_src)
except Exception:
    _active_src = "unknown"

# Startup smoke probe: run one known note so the container logs show which
# build answered and what its first suggestion was. Failures are non-fatal.
try:
    _probe_note = "Discectomies at C4–C5, C5–C6, and C6–C7. Interbody cages and anterior plate spanning C4–C7."
    _probe = suggest_with_cpt_billing(_probe_note, payer="Medicare", top_k=5)
    print("[PROBE] build:", _probe.get("build"))
    print("[PROBE] first_suggestion:", (_probe.get("suggestions") or [{}])[0])
except Exception as e:
    print("[PROBE] failed:", e)

# App-level configuration.
os.environ.setdefault("GRADIO_ANALYTICS_ENABLED", "False")
DEBUG = os.environ.get("DEBUG", "0") == "1"  # verbose prints + tracebacks in the UI
PAYER_CHOICES = ["Medicare", "BCBS", "Aetna", "Cigna", "UnitedHealthcare", "Other"]
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
# Live log pushing: optionally mirror each log entry to a Hugging Face repo.
LOG_PUSH_ENABLE = os.environ.get("LOG_PUSH_ENABLE", "0") == "1"  # opt-in switch
HF_TARGET_REPO = os.environ.get("HF_TARGET_REPO")  # e.g. "org/name"; required when enabled
HF_REPO_TYPE = os.environ.get("HF_REPO_TYPE", "dataset")
HF_WRITE_TOKEN = os.environ.get("HF_TOKEN") or os.environ.get("HUGGINGFACEHUB_API_TOKEN")
| |
|
# Cached HfApi client; created lazily on first use.
_hf_api = None
def _get_hf_api():
    """Return the shared HfApi client, constructing it on first call."""
    global _hf_api
    if _hf_api is not None:
        return _hf_api
    from huggingface_hub import HfApi
    _hf_api = HfApi(token=HF_WRITE_TOKEN)
    return _hf_api
| |
|
| | |
# Announce the effective logging configuration at startup (never let this raise).
try:
    print(
        "[LOG CFG]",
        "enable=" + str(LOG_PUSH_ENABLE),
        "repo=" + str(HF_TARGET_REPO),
        "type=" + str(HF_REPO_TYPE),
        "token=" + ("set" if HF_WRITE_TOKEN else "missing"),
    )
except Exception:
    pass

# Local log directory: one JSONL file per session, created eagerly.
LOG_DIR = os.environ.get("LOG_DIR", "logs")
pathlib.Path(LOG_DIR).mkdir(parents=True, exist_ok=True)
| |
|
def _log_path(session_id: str) -> str:
    """Path of the per-session JSONL log file inside LOG_DIR."""
    return str(pathlib.Path(LOG_DIR) / f"{session_id}.jsonl")
| |
|
| | def _utcnow_iso() -> str: |
| | return datetime.now(timezone.utc).isoformat(timespec="seconds") |
| |
|
| | |
def _push_log_line_to_repo(entry: Dict[str, Any]) -> None:
    """Best-effort append of one JSONL log entry to a Hugging Face repo.

    No-op unless live log pushing is enabled and both a target repo and a
    write token are configured. Any failure is printed and swallowed so
    logging can never break the request path.
    """
    if not LOG_PUSH_ENABLE:
        return
    if not HF_TARGET_REPO or not HF_WRITE_TOKEN:
        # Pushing enabled but destination/credentials missing: silently skip.
        return
    try:
        from huggingface_hub import CommitOperationAdd, hf_hub_download
        api = _get_hf_api()

        # One file per UTC day. datetime.utcnow() is deprecated since Python
        # 3.12; use an aware UTC datetime — the formatted date is identical,
        # and it matches the aware timestamps produced by _utcnow_iso().
        day = datetime.now(timezone.utc).strftime("%Y-%m-%d")
        path_in_repo = f"logs-live/{day}.jsonl"

        # Fetch the current day-file (if any) so we can append to it.
        # NOTE(review): this read-modify-write is racy across concurrent
        # workers; acceptable for best-effort logging.
        existing = ""
        try:
            local_fp = hf_hub_download(
                repo_id=HF_TARGET_REPO,
                filename=path_in_repo,
                repo_type=HF_REPO_TYPE,
                token=HF_WRITE_TOKEN,
                local_dir="/tmp",
                local_dir_use_symlinks=False,
                force_download=False,
            )
            with open(local_fp, "r", encoding="utf-8") as f:
                existing = f.read()
        except Exception:
            # First entry of the day (file absent) or download failure.
            existing = ""

        # Append the new line, making sure prior content ends with a newline.
        new_line = json.dumps(entry, ensure_ascii=False)
        if existing and not existing.endswith("\n"):
            existing += "\n"
        merged = (existing + new_line + "\n").encode("utf-8")

        api.create_commit(
            repo_id=HF_TARGET_REPO,
            repo_type=HF_REPO_TYPE,
            operations=[CommitOperationAdd(path_in_repo=path_in_repo, path_or_fileobj=io.BytesIO(merged))],
            commit_message=f"logs: append {day} ({entry.get('event','')})",
        )
    except Exception as e:
        # Deliberate best-effort: log push failures must not surface to users.
        print("[LOG PUSH] failed:", e)
| |
|
def _append_log(session_id: str, entry: Dict[str, Any]) -> None:
    """Append one timestamped entry to the local session log, then mirror it
    to the configured Hugging Face repo (best effort)."""
    record = {"ts": _utcnow_iso(), "session_id": session_id, **entry}
    line = json.dumps(record, ensure_ascii=False)
    with open(_log_path(session_id), "a", encoding="utf-8") as sink:
        sink.write(line + "\n")
    _push_log_line_to_repo(record)
| |
|
def export_session(session_id: str) -> str:
    """Bundle the session's JSONL log into one pretty-printed JSON file.

    Returns the path of the export file (empty list if no log exists yet).
    """
    src = _log_path(session_id)
    entries: List[Dict[str, Any]] = []
    if os.path.exists(src):
        with open(src, "r", encoding="utf-8") as fh:
            entries = [json.loads(line) for line in fh if line.strip()]
    out_path = os.path.join(LOG_DIR, f"export_{session_id}.json")
    with open(out_path, "w", encoding="utf-8") as fh:
        json.dump(entries, fh, indent=2, ensure_ascii=False)
    return out_path
| |
|
| | |
| |
|
def _core_path() -> str:
    """Source file of the active core implementation, or "unknown"."""
    try:
        src = __inspect.getsourcefile(suggest_with_cpt_billing)
    except Exception:
        return "unknown"
    return src or "unknown"
| |
|
| | |
# Fixed column order for the suggestions table shown in the UI.
SUGG_COLS = [
    "CPT", "Description", "Rationale",
    "Confidence", "Primary", "Category", "Laterality", "Units"
]
# Empty table rendered before the first run and after Clear.
EMPTY_SUGG_DF = pd.DataFrame(columns=SUGG_COLS)
# Placeholder row shown when a case has no visit-level modifiers.
EMPTY_MODS_DF = pd.DataFrame([{"modifier": "—", "reason": ""}])
| |
|
def _coalesce_rows(result: Dict[str, Any]) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    """Build the suggestions table (no modifier columns) and meta badges."""
    if not isinstance(result, dict):
        return EMPTY_SUGG_DF, {}

    sugg = result.get("suggestions") or []
    rows: List[Dict[str, Any]] = []
    # Case-level laterality, used as a fallback for rows lacking their own.
    case_lat = (result.get("laterality") or "").strip().lower()

    for s in sugg:
        if not isinstance(s, dict):
            continue
        mods = s.get("modifiers", []) or []

        # Row laterality fallback chain: explicit field → LT/RT modifier → case-level.
        row_lat = (s.get("laterality") or "").strip().lower()
        if not row_lat:
            if isinstance(mods, list) and "LT" in mods:
                row_lat = "left"
            elif isinstance(mods, list) and "RT" in mods:
                row_lat = "right"
            elif case_lat in ("left", "right", "bilateral"):
                row_lat = case_lat

        # Numeric confidences are rounded to 2 dp; anything else is shown
        # verbatim, with falsy values collapsing to an empty cell.
        conf_val = s.get("confidence")
        conf_out = round(float(conf_val), 2) if isinstance(conf_val, (int, float)) else (conf_val or "")

        rows.append({
            "CPT": s.get("cpt", ""),
            "Description": s.get("desc", ""),
            "Rationale": s.get("rationale", ""),
            "Confidence": conf_out,
            "Primary": "✓" if s.get("primary") else "",
            "Category": s.get("category", ""),
            "Laterality": row_lat,
            "Units": s.get("units", 1),
        })

    # "levels" may be a dict (segments/interspaces/laterality) or a bare list
    # of segment names; both shapes are accepted.
    segs: List[str] = []
    inters_list: List[str] = []
    lvl_lat = ""
    levels_obj = result.get("levels")
    if isinstance(levels_obj, dict):
        segs = list(levels_obj.get("segments") or [])
        inters_list = list(levels_obj.get("interspaces") or [])
        lvl_lat = levels_obj.get("laterality", "") or ""
    elif isinstance(levels_obj, list):
        segs = [str(x) for x in levels_obj]

    # "flags" may be a list of names or a {name: bool} mapping.
    flags_obj = result.get("flags")
    if isinstance(flags_obj, list):
        flags_list = [str(x) for x in flags_obj]
    elif isinstance(flags_obj, dict):
        flags_list = [k for k, v in flags_obj.items() if v]
    else:
        flags_list = []

    meta = {
        "payer": result.get("payer", ""),
        "region": result.get("region", ""),
        # Case-level laterality wins; fall back to the levels-derived value.
        "laterality": result.get("laterality", "") or lvl_lat,
        "levels_segments": ", ".join(segs),
        # Prefer the core's interspace estimate when the key is present.
        "levels_interspaces": (
            str(result.get("interspaces_est", "")) if "interspaces_est" in result
            else ", ".join(inters_list)
        ),
        "flags": ", ".join(sorted(flags_list)),
        "build": result.get("build", ""),
        "mode": result.get("mode", ""),
        "core_path": _core_path(),
    }

    # Normalize the frame to the fixed column set/order expected by the UI.
    df = EMPTY_SUGG_DF if not rows else pd.DataFrame(rows)
    if not df.empty:
        for col in SUGG_COLS:
            if col not in df.columns:
                df[col] = ""
        df = df[SUGG_COLS]
    return df, meta
| |
|
| | def _case_mods_df(result: Dict[str, Any]) -> pd.DataFrame: |
| | mods = result.get("case_modifiers", []) or [] |
| | if not mods: |
| | return EMPTY_MODS_DF.copy() |
| | return pd.DataFrame([{"modifier": f"-{m.get('modifier','')}", "reason": m.get("reason","")} for m in mods]) |
| |
|
| | def _summary_md(meta: Dict[str, Any]) -> str: |
| | chips = [] |
| | if meta.get("region"): chips.append(f"`Region: {meta['region']}`") |
| | if meta.get("laterality"): chips.append(f"`Laterality: {meta['laterality']}`") |
| | if meta.get("levels_segments"): chips.append(f"`Segments: {meta['levels_segments']}`") |
| | if meta.get("levels_interspaces"): chips.append(f"`Interspaces: {meta['levels_interspaces']}`") |
| | if meta.get("flags"): chips.append(f"`Flags: {meta['flags']}`") |
| | if meta.get("build"): chips.append(f"`Build: {meta['build']}`") |
| | if meta.get("mode"): chips.append(f"`Mode: {meta['mode']}`") |
| | try: |
| | core_base = os.path.basename(meta.get("core_path","")) if meta.get("core_path") else "" |
| | if core_base: |
| | chips.append(f"`Core: {core_base}`") |
| | except Exception: |
| | pass |
| | return " ".join(chips) if chips else "—" |
| |
|
def new_session() -> str:
    """Short (8 hex chars) random session identifier derived from a UUID4."""
    return uuid.uuid4().hex[:8]
| |
|
| | |
| |
|
def run_inference(note: str, payer: str, top_k: int, session_id: str):
    """Run the core coder on ``note`` and shape the result for the UI.

    Returns the 6-tuple bound to the Gradio outputs:
    (suggestions_df, case_mods_df, summary_md, raw_json, warning_md, session_id).
    Errors are caught, logged, and surfaced via the warning slot — never raised.
    """
    # Empty/whitespace note: reset outputs without logging a request.
    if not note.strip():
        return (
            EMPTY_SUGG_DF,
            EMPTY_MODS_DF.copy(),
            "—", "", "", session_id
        )

    _append_log(session_id, {"event": "request", "payer": payer, "top_k": top_k, "note": note})
    try:
        result = suggest_with_cpt_billing(note=note, payer=payer, top_k=top_k)
        if DEBUG:
            print("[DEBUG] build/region/laterality/flags:",
                  result.get("build"), result.get("region"),
                  result.get("laterality"), result.get("flags"))
    except Exception as e:
        # Record the failure and show it in the warning banner;
        # the full traceback is exposed only in DEBUG mode.
        tb = traceback.format_exc()
        _append_log(session_id, {"event": "error", "error": repr(e), "traceback": tb})
        warn = f"⚠️ Error: {e}"
        if DEBUG:
            warn += f"\n\n```traceback\n{tb}\n```"
        return (
            EMPTY_SUGG_DF,
            EMPTY_MODS_DF.copy(),
            "—", "", warn, session_id
        )

    sugg_df, meta = _coalesce_rows(result)
    case_mods_df = _case_mods_df(result)
    summary = _summary_md(meta)
    json_pretty = json.dumps(result, indent=2, ensure_ascii=False)

    # Log a compact response record: meta badges plus a flattened
    # "-MOD, -MOD" string of case-level modifiers.
    _append_log(session_id, {
        "event": "response",
        "meta": {
            **meta,
            "case_modifiers": ", ".join([f"-{m}" for m in [cm.get("modifier","") for cm in (result.get("case_modifiers") or [])] if m]) or ""
        },
        "rows_len": int(len(sugg_df) if hasattr(sugg_df, "__len__") else 0)
    })
    return sugg_df, case_mods_df, summary, json_pretty, "", session_id
| |
|
def record_feedback(session_id: str, vote: str, text: str):
    """Persist a thumbs vote and/or free-text comment; rejects empty feedback."""
    if vote or text:
        _append_log(session_id, {"event": "feedback", "vote": vote, "text": text})
        return "Thanks! Your feedback was recorded."
    return "Please choose 👍/👎 or add a short note."
| |
|
def do_export(session_id: str):
    """Export the session log to a JSON file and record the export event."""
    exported = export_session(session_id)
    _append_log(session_id, {"event": "export", "path": exported})
    return exported
| |
|
def on_clear():
    """Reset every input/output widget to its default and start a fresh session."""
    fresh_sid = new_session()
    return (
        "",                     # operative note
        "Medicare",             # payer default
        10,                     # top-k default
        EMPTY_SUGG_DF,          # suggestions table
        EMPTY_MODS_DF.copy(),   # case modifiers table
        "—",                    # summary badges
        "",                     # raw JSON
        "",                     # warning banner
        fresh_sid,
    )
| |
|
| | |
| |
|
def run_probe(session_id: str) -> Tuple[str, str]:
    """Run a live diagnostic: show core path & build and 3 smoke tests."""
    info_lines = []
    try:
        core_path = _core_path()
        # (label, note, expected CPT codes) triples; a test PASSes when every
        # expected code appears somewhere in the returned suggestions.
        tests = [
            ("Implicit TLIF",
             "Left facetectomy L4–L5 with PEEK interbody cage and pedicle screws; rods secured.",
             ["22633"],
             ),
            ("ACDF chain w/ plate",
             "Discectomies at C4–C5, C5–C6, and C6–C7. Interbody cages and anterior plate spanning C4–C7.",
             ["22551","22552","22846"],
             ),
            ("Exposure-only",
             "Anterior exposure of L4–S1 performed by vascular surgeon for access. No fusion performed.",
             ["00000"],
             ),
        ]
        # One extra call up front just to read the core's build identifier.
        probe = suggest_with_cpt_billing(tests[1][1], payer="Medicare", top_k=10)
        build = probe.get("build","")
        info_lines.append(f"**Core path:** `{core_path}` \n**Build:** `{build}`")

        # Run each smoke test and render a PASS/CHECK line per case.
        for label, note, expect_codes in tests:
            res = suggest_with_cpt_billing(note, payer="Medicare", top_k=10)
            codes = [s.get("cpt") for s in (res.get("suggestions") or [])]
            ok = all(any(ec == c for c in codes) for ec in expect_codes)
            info_lines.append(f"- **{label}** → codes: `{', '.join(codes) or '∅'}` — **{'PASS' if ok else 'CHECK'}**")
        md = "\n".join(info_lines)
        _append_log(session_id, {"event":"probe","details": md})
        return md, ""
    except Exception as e:
        tb = traceback.format_exc()
        _append_log(session_id, {"event":"probe_error","error":repr(e),"traceback":tb})
        return "", f"⚠️ Probe failed: {e}"
| |
|
def run_live_log_selftest(session_id: str) -> Tuple[str, str]:
    """Attempt a small append to the dataset repo to verify live logging config.

    Returns (status_markdown, warning_markdown); exactly one is non-empty.
    """
    try:
        entry = {
            "event": "selftest",
            "note": "hello-from-selftest",
            "ts": _utcnow_iso(),
            "session_id": session_id,
        }
        _push_log_line_to_repo(entry)
        # Same day-file naming as _push_log_line_to_repo. datetime.utcnow()
        # is deprecated since Python 3.12; use an aware UTC datetime — the
        # formatted date string is identical.
        day = datetime.now(timezone.utc).strftime("%Y-%m-%d")
        msg = (
            f"✅ Live log push attempted.\n"
            f"- enable={LOG_PUSH_ENABLE}\n"
            f"- repo={HF_TARGET_REPO}\n"
            f"- type={HF_REPO_TYPE}\n"
            f"- token={'set' if HF_WRITE_TOKEN else 'missing'}\n"
            f"- path=logs-live/{day}.jsonl"
        )
        _append_log(session_id, {"event":"selftest", "details":"manual push executed"})
        return msg, ""
    except Exception as e:
        tb = traceback.format_exc()
        _append_log(session_id, {"event":"selftest_error","error":repr(e),"traceback":tb})
        return "", f"⚠️ Self-test failed: {e}"
| |
|
| | |
| |
|
# One-click example notes for gr.Examples: each row is [note, payer, top_k].
EXAMPLES = [
    ["Left-sided TLIF L4–L5 with pedicle screws and interbody cage; posterolateral fusion performed. Navigation used.", "Medicare", 10],
    ["ACDF C5–C6 with PEEK cage and anterior plate; microscope and neuromonitoring used.", "Medicare", 10],
    ["Posterior cervical foraminotomy right C6–C7; no fusion or instrumentation.", "Medicare", 10],
    ["ALIF L5–S1 with structural allograft; non-segmental instrumentation placed.", "Medicare", 10],
    ["Removal of posterior segmental instrumentation T10–L2; no new hardware placed.", "Medicare", 10],
    # Edge cases: aborted procedure, bilateral levels, assistant staffing, complexity.
    ["TLIF L4–L5 was initiated but aborted midway due to neuromonitoring changes.", "Medicare", 10],
    ["Bilateral decompression and foraminotomy at L4–L5 and L5–S1.", "Medicare", 10],
    ["Assistant surgeon present; resident not available.", "Medicare", 10],
    ["Complex exposure with severe deformity and adhesiolysis.", "Medicare", 10],
]
| |
|
| | |
| |
|
# Light Soft theme with indigo accents; explicit fills keep the palette
# stable even if Gradio's default theme values change between releases.
THEME = gr.themes.Soft(
    primary_hue="indigo",
    secondary_hue="blue",
    neutral_hue="slate",
).set(
    body_text_color="#0f172a",
    background_fill_primary="#ffffff",
    button_primary_background_fill="#4f46e5",
    input_background_fill="#ffffff",
)
| |
|
# App-wide CSS: header card styling, badge chips, and per-column sizing of
# the suggestions table (targeted via the "suggestions_table" elem_id).
CUSTOM_CSS = """
:root { --radius-lg: 16px; }
.gradio-container { font-family: ui-sans-serif, system-ui, -apple-system; }

/* Header card */
.header-card {
  border-radius: 18px;
  padding: 18px;
  border: 1px solid #1f2937;
  background: linear-gradient(180deg,#0f172a,#0b1220);
  color: #e5e7eb;
}
.badge-row code {
  margin-right: 8px;
  border-radius: 12px;
  padding: 2px 8px;
  background: #111827;
  color: #e5e7eb;
}

/* Table container */
.table-wrap { max-height: 520px; overflow: auto; }

/* Suggestions table target */
#suggestions_table .dataframe {
  font-size: 15px;
  width: 100% !important;
  table-layout: auto !important;
  border-collapse: collapse;
}
#suggestions_table .dataframe th,
#suggestions_table .dataframe td {
  white-space: normal;
  word-wrap: break-word;
  text-align: left;
  vertical-align: top;
  padding: 10px 12px;
}

/* Column sizing (1-indexed):
   1=CPT, 2=Description, 3=Rationale, 4=Confidence,
   5=Primary, 6=Category, 7=Laterality, 8=Units */

/* CPT — wider & no wrap, monospace, centered */
#suggestions_table .dataframe th:nth-child(1),
#suggestions_table .dataframe td:nth-child(1) {
  min-width: 120px;
  max-width: 140px;
  white-space: nowrap !important;
  font-family: ui-monospace, SFMono-Regular, Menlo, Monaco, Consolas, "Liberation Mono", monospace;
  text-align: center;
}

/* Description — roomy */
#suggestions_table .dataframe th:nth-child(2),
#suggestions_table .dataframe td:nth-child(2) {
  min-width: 360px;
  max-width: 560px;
}

/* Rationale — roomy */
#suggestions_table .dataframe th:nth-child(3),
#suggestions_table .dataframe td:nth-child(3) {
  min-width: 320px;
  max-width: 520px;
}

/* Category — a bit wider */
#suggestions_table .dataframe th:nth-child(6),
#suggestions_table .dataframe td:nth-child(6) {
  min-width: 180px;
}

/* Keep tiny columns compact */
#suggestions_table .dataframe th:nth-child(4),
#suggestions_table .dataframe td:nth-child(4),
#suggestions_table .dataframe th:nth-child(5),
#suggestions_table .dataframe td:nth-child(5),
#suggestions_table .dataframe th:nth-child(7),
#suggestions_table .dataframe td:nth-child(7),
#suggestions_table .dataframe th:nth-child(8),
#suggestions_table .dataframe td:nth-child(8) {
  min-width: 90px;
}

.footer-note { color:#94a3b8; font-size:12px; }
"""
| |
|
| | |
# ---------- UI layout and event wiring ----------
with gr.Blocks(theme=THEME, css=CUSTOM_CSS, title="Spine Coder — CPT Billing") as demo:
    # Per-tab session id; replaced with a fresh one on every page load (see _on_load).
    session_id = gr.State(new_session())

    with gr.Row():
        with gr.Column():
            gr.Markdown("### 🦴 Spine Coder — CPT Billing & Operative Note NLP")
            gr.Markdown(
                '<div class="header-card">Structured CPT suggestions from spine operative notes — with payer-aware '
                'modifiers, laterality detection, and rationales.<br/>'
                '<span class="footer-note">No PHI is stored; inputs are session-scoped and ephemeral.</span></div>'
            )

    with gr.Row():
        # Left column: inputs, examples, feedback, session tools, diagnostics.
        with gr.Column(scale=5):
            note_in = gr.Textbox(
                label="Operative Note",
                placeholder="Paste an operative note here…",
                lines=14,
                autofocus=True,
            )
            with gr.Row():
                payer_dd = gr.Dropdown(PAYER_CHOICES, value="Medicare", label="Payer")
                topk = gr.Slider(1, 15, value=10, step=1, label="Top-K suggestions")
            with gr.Row():
                run_btn = gr.Button("Analyze Note", variant="primary")
                clear_btn = gr.Button("Clear")

            with gr.Accordion("Quick Examples", open=False):
                gr.Examples(
                    examples=EXAMPLES,
                    inputs=[note_in, payer_dd, topk],
                    label="Click a row to load an example"
                )

            with gr.Accordion("Feedback", open=False):
                fb_choice = gr.Radio(choices=["👍", "👎"], label="Was this helpful?")
                fb_text = gr.Textbox(label="Optional comment", lines=2, placeholder="Tell us what worked or what missed…")
                fb_submit = gr.Button("Submit Feedback")
                fb_status = gr.Markdown("")

            with gr.Accordion("Session", open=False):
                sid_show = gr.Textbox(label="Session ID", value="", interactive=False)
                export_btn = gr.Button("Export Session as JSON")
                export_file = gr.File(label="Download", interactive=False)

            with gr.Accordion("Diagnostics", open=False):
                probe_btn = gr.Button("Run Probe (core path + 3 tests)")
                probe_md = gr.Markdown("")
                selftest_btn = gr.Button("Test Live Log Push")
                selftest_md = gr.Markdown("")

        # Right column: results — badges, tables, raw JSON, warning banner.
        with gr.Column(scale=7):
            gr.Markdown("#### Results")
            summary_md = gr.Markdown("—", elem_classes=["badge-row"])

            table = gr.Dataframe(
                value=EMPTY_SUGG_DF,
                label="CPT Suggestions",
                interactive=False,
                row_count=(0, "dynamic"),
                wrap=True,
                elem_classes=["table-wrap"],
                elem_id="suggestions_table",
            )

            gr.Markdown("### Case Modifiers (visit-level)")
            case_mods_table = gr.Dataframe(
                value=EMPTY_MODS_DF.copy(),
                headers=["modifier","reason"],
                interactive=False,
                wrap=True,
                label="Case Modifiers",
            )

            with gr.Accordion("Raw JSON", open=False):
                json_out = gr.Code(language="json", value="", interactive=False)
            warn_md = gr.Markdown("")

    # Issue a fresh session id on page load and mirror it into the UI.
    def _on_load():
        sid = new_session()
        return sid, sid

    demo.load(_on_load, outputs=[session_id, sid_show])

    run_inputs = [note_in, payer_dd, topk, session_id]
    run_outputs = [table, case_mods_table, summary_md, json_out, warn_md, session_id]

    # Analyze on button click or Enter-submit in the note box.
    run_btn.click(run_inference, inputs=run_inputs, outputs=run_outputs)
    note_in.submit(run_inference, inputs=run_inputs, outputs=run_outputs)

    clear_btn.click(
        on_clear,
        outputs=[note_in, payer_dd, topk, table, case_mods_table, summary_md, json_out, warn_md, session_id]
    )

    fb_submit.click(record_feedback, inputs=[session_id, fb_choice, fb_text], outputs=fb_status)
    export_btn.click(do_export, inputs=[session_id], outputs=[export_file])

    # Diagnostics share the warning slot with the main results pane.
    probe_btn.click(run_probe, inputs=[session_id], outputs=[probe_md, warn_md])
    selftest_btn.click(run_live_log_selftest, inputs=[session_id], outputs=[selftest_md, warn_md])
| |
|
if __name__ == "__main__":
    # Launch the Gradio server with defaults (host/port configurable via env).
    demo.launch()
| |
|