# app.py — Spine Coder (Chatbot + Feedback + Session Logs) — Gradio 4.x
# ------------------------------------------------------------------------------
# FINAL-v2.2 UI build (hardened) + LIVE DATASET LOGGING (no restarts):
# - Purge caches, dynamic file loader (SPINE_CORE_PATH override) + file SHA print
# - Startup PROBE to logs + in-UI Diagnostics/Probe + Live Log Push Self-Test
# - Clean CPT table (no per-row modifier columns) + Case Modifiers panel
# - Build/Core chips, structured logs/export, modern Gradio usage
# - Per-request log commits to a separate DATASET repo (no Space rebuilds)
# ------------------------------------------------------------------------------

import os
import io
import json
import uuid
import pathlib
import traceback
from datetime import datetime, timezone
from typing import Any, Dict, List, Tuple

import pandas as pd
import gradio as gr

# ==== Bulletproof Core Import ==================================================
import importlib
import importlib.util
import importlib.machinery
import sys as __sys
import os as __os
import inspect as __inspect
import hashlib

__sys.path.insert(0, __os.path.abspath("."))  # ensure repo root is on sys.path


def _purge_spine_modules():
    """Remove cached spine_coder modules so we truly reload the file we want."""
    for k in list(__sys.modules):
        if k == "spine_coder" or k.startswith("spine_coder."):
            __sys.modules.pop(k, None)
    importlib.invalidate_caches()


def _sha256(path: str) -> str:
    """Return the first 12 hex chars of the file's SHA-256, or "unknown" if unreadable."""
    try:
        h = hashlib.sha256()
        with open(path, "rb") as f:
            for chunk in iter(lambda: f.read(8192), b""):
                h.update(chunk)
        return h.hexdigest()[:12]
    except (OSError, ValueError):
        # Missing/unreadable file or an invalid path string: the hash is only
        # used for display and for naming the dynamic module.
        return "unknown"


def _load_core_from_file(path: str):
    """
    Load spine_coder_core.py directly from a given path (bypass module cache).

    IMPORTANT: insert into sys.modules BEFORE exec_module; do NOT reload.

    Returns the loaded module, or None when the path does not exist. Any
    exception raised while executing the file propagates to the caller; in
    that case the partially initialised module is removed from sys.modules
    so a broken module object is never left registered.
    """
    path = __os.path.abspath(path)
    if not __os.path.exists(path):
        return None
    # The content hash in the name keeps different file versions apart.
    name = f"spine_coder_core_dynamic_{_sha256(path)}"
    loader = importlib.machinery.SourceFileLoader(name, path)
    spec = importlib.util.spec_from_loader(name, loader)
    if spec is None:
        return None
    mod = importlib.util.module_from_spec(spec)
    __sys.modules[name] = mod  # register first
    try:
        loader.exec_module(mod)  # then exec
    except BaseException:
        __sys.modules.pop(name, None)
        raise
    return mod


def _force_import_core():
    """Order: purge caches → SPINE_CORE_PATH → local files → package modules.

    Returns the first core module that loads. Raises ImportError when no
    candidate can be loaded; failures of the package-module candidates are
    printed so the final error can be diagnosed from the logs.
    """
    _purge_spine_modules()

    # 1) Explicit path override (Space secret/variable)
    forced = __os.environ.get("SPINE_CORE_PATH")
    if forced and __os.path.exists(forced):
        mod = _load_core_from_file(forced)
        if mod:
            print("[CORE] forced path:", forced, "sha:", _sha256(forced))
            return mod

    # 2) Likely local paths (edited copies near app)
    for rel in (
        "spine_coder_core.py",
        "spine_coder/spine_coder_core.py",
        "spine_coder/spine_coder/spine_coder_core.py",  # package-style tree
    ):
        p = __os.path.abspath(rel)
        if __os.path.exists(p):
            mod = _load_core_from_file(p)
            if mod:
                print("[CORE] loaded file:", p, "sha:", _sha256(p))
                return mod

    # 3) Package modules (may be stale)
    for modname in (
        "spine_coder.spine_coder.spine_coder_core",
        "spine_coder.spine_coder_core",
    ):
        try:
            mod = importlib.import_module(modname)
            mod = importlib.reload(mod)  # ok for package import
            src = __inspect.getsourcefile(mod.suggest_with_cpt_billing) or "unknown"
        except Exception as exc:
            # Keep trying the next candidate, but leave a trace of why this one failed.
            print("[CORE] module import failed:", modname, "->", repr(exc))
            continue
        print("[CORE] loaded module:", modname, "from:", src)
        return mod

    raise ImportError("Unable to locate spine_coder_core.py via modules or file paths.")


_core = _force_import_core()
suggest_with_cpt_billing = _core.suggest_with_cpt_billing

try:
    _active_src = __inspect.getsourcefile(suggest_with_cpt_billing) or "unknown"
    print("[CORE] active source:", _active_src)
except Exception:
    _active_src = "unknown"

# ---- One-time startup probe (prints to Space logs) ----------------------------
try:
    _probe_note = (
        "Discectomies at C4–C5, C5–C6, and C6–C7. "
        "Interbody cages and anterior plate spanning C4–C7."
    )
    _probe = suggest_with_cpt_billing(_probe_note, payer="Medicare", top_k=5)
    print("[PROBE] build:", _probe.get("build"))
    print("[PROBE] first_suggestion:", (_probe.get("suggestions") or [{}])[0])
except Exception as e:
    # The probe is informational only; a failure must not stop the app from starting.
    print("[PROBE] failed:", e)

# ==== Config ==================================================================
os.environ.setdefault("GRADIO_ANALYTICS_ENABLED", "False")
DEBUG = os.environ.get("DEBUG", "0") == "1"
PAYER_CHOICES = ["Medicare", "BCBS", "Aetna", "Cigna", "UnitedHealthcare", "Other"]

# ==== Remote log push settings (dataset repo; avoids Space rebuilds) ==========
# Required env (set in Space Settings → Variables & secrets):
#   LOG_PUSH_ENABLE=1
#   HF_TARGET_REPO=Slaiwala/spinecoder-logs
#   HF_REPO_TYPE=dataset
#   HF_TOKEN (or HUGGINGFACEHUB_API_TOKEN) with WRITE access to that repo
LOG_PUSH_ENABLE = os.environ.get("LOG_PUSH_ENABLE", "0") == "1"
HF_TARGET_REPO = os.environ.get("HF_TARGET_REPO")  # e.g., "Slaiwala/spinecoder-logs"
HF_REPO_TYPE = os.environ.get("HF_REPO_TYPE", "dataset")  # keep 'dataset' to prevent Space rebuilds
HF_WRITE_TOKEN = os.environ.get("HF_TOKEN") or os.environ.get("HUGGINGFACEHUB_API_TOKEN")

_hf_api = None


def _get_hf_api():
    """Return the process-wide HfApi client, creating it on first use."""
    global _hf_api
    if _hf_api is None:
        # Imported lazily so the app starts even when log pushing is unused.
        from huggingface_hub import HfApi

        _hf_api = HfApi(token=HF_WRITE_TOKEN)
    return _hf_api


# Log config banner (for quick sanity in Space logs); the token value itself is never printed.
print(
    "[LOG CFG]",
    "enable=" + str(LOG_PUSH_ENABLE),
    "repo=" + str(HF_TARGET_REPO),
    "type=" + str(HF_REPO_TYPE),
    "token=" + ("set" if HF_WRITE_TOKEN else "missing"),
)

# ==== Local logging ============================================================
LOG_DIR = os.environ.get("LOG_DIR", "logs")
pathlib.Path(LOG_DIR).mkdir(parents=True, exist_ok=True)


def _log_path(session_id: str) -> str:
    """Return the path of the JSONL log file for *session_id* inside LOG_DIR."""
    return os.path.join(LOG_DIR, f"{session_id}.jsonl")


def _utcnow_iso() -> str:
    """Return the current UTC time as an ISO-8601 string with second precision."""
    return datetime.now(timezone.utc).isoformat(timespec="seconds")
# ---- push a single line to dataset repo (no Space restart) -------------------
def _push_log_line_to_repo(entry: Dict[str, Any]) -> None:
    """Append *entry* as one JSON line to ``logs-live/<UTC date>.jsonl`` in the dataset repo.

    Best-effort: does nothing when pushing is disabled or misconfigured, and
    prints (never raises) on failure. The remote file is downloaded, extended
    by one line and committed back. Only a confirmed "file does not exist
    yet" is treated as an empty file; any other download problem aborts the
    push so a transient error cannot overwrite the day's existing log.
    """
    if not LOG_PUSH_ENABLE:
        return
    if not HF_TARGET_REPO or not HF_WRITE_TOKEN:
        # Misconfigured: silently skip rather than touching the Space repo
        return
    try:
        from huggingface_hub import CommitOperationAdd, hf_hub_download

        try:
            from huggingface_hub.errors import EntryNotFoundError, LocalEntryNotFoundError
        except ImportError:  # older huggingface_hub releases expose them via utils
            from huggingface_hub.utils import EntryNotFoundError, LocalEntryNotFoundError

        api = _get_hf_api()
        # Timezone-aware UTC date, consistent with the "ts" field of log entries.
        day = datetime.now(timezone.utc).strftime("%Y-%m-%d")
        path_in_repo = f"logs-live/{day}.jsonl"

        # Try to download existing file (may not exist yet)
        try:
            local_fp = hf_hub_download(
                repo_id=HF_TARGET_REPO,
                filename=path_in_repo,
                repo_type=HF_REPO_TYPE,  # 'dataset'
                token=HF_WRITE_TOKEN,
                local_dir="/tmp",
                local_dir_use_symlinks=False,
                force_download=False,
            )
        except LocalEntryNotFoundError:
            # Hub unreachable and nothing cached: we cannot know the remote
            # content, so give up instead of replacing it with a single line.
            raise
        except EntryNotFoundError:
            existing = ""  # first entry of the day
        else:
            with open(local_fp, "r", encoding="utf-8") as f:
                existing = f.read()

        # Append one line
        new_line = json.dumps(entry, ensure_ascii=False)
        if existing and not existing.endswith("\n"):
            existing += "\n"
        merged = (existing + new_line + "\n").encode("utf-8")

        api.create_commit(
            repo_id=HF_TARGET_REPO,
            repo_type=HF_REPO_TYPE,  # 'dataset'
            operations=[
                CommitOperationAdd(path_in_repo=path_in_repo, path_or_fileobj=io.BytesIO(merged))
            ],
            commit_message=f"logs: append {day} ({entry.get('event','')})",
        )
    except Exception as e:
        print("[LOG PUSH] failed:", e)


def _append_log(session_id: str, entry: Dict[str, Any]) -> None:
    """Stamp *entry* with time and session id, append it to the local JSONL, then push it.

    Keys already present in *entry* (including "ts"/"session_id") win over the
    stamped defaults.
    """
    entry = {"ts": _utcnow_iso(), "session_id": session_id, **entry}
    # Local JSONL (unchanged)
    with open(_log_path(session_id), "a", encoding="utf-8") as f:
        f.write(json.dumps(entry, ensure_ascii=False) + "\n")
    # Dataset push (no restart)
    _push_log_line_to_repo(entry)


def export_session(session_id: str) -> str:
    """Write the session's log entries to ``export_<session_id>.json`` and return its path.

    A missing log file yields an empty JSON list. Lines that are not valid
    JSON are reported on stdout and skipped so one corrupt line cannot block
    the whole export.
    """
    src = _log_path(session_id)
    data: List[Dict[str, Any]] = []
    if os.path.exists(src):
        with open(src, "r", encoding="utf-8") as f:
            for lineno, line in enumerate(f, start=1):
                if not line.strip():
                    continue
                try:
                    data.append(json.loads(line))
                except json.JSONDecodeError as exc:
                    print(f"[EXPORT] skipping malformed line {lineno} in {src}: {exc}")
    out_path = os.path.join(LOG_DIR, f"export_{session_id}.json")
    with open(out_path, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=2, ensure_ascii=False)
    return out_path


# ==== UI Helpers ==============================================================
def _core_path() -> str:
    """Return the source file of the active core function, or "unknown"."""
    try:
        return __inspect.getsourcefile(suggest_with_cpt_billing) or "unknown"
    except Exception:
        return "unknown"


# Per-row modifiers intentionally removed from CPT table.
SUGG_COLS = [
    "CPT", "Description", "Rationale", "Confidence",
    "Primary", "Category", "Laterality", "Units",
]
EMPTY_SUGG_DF = pd.DataFrame(columns=SUGG_COLS)
EMPTY_MODS_DF = pd.DataFrame([{"modifier": "—", "reason": ""}])


def _coalesce_rows(result: Dict[str, Any]) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    """Build the suggestions table (no modifier columns) and meta badges.

    Returns ``(df, meta)``. ``df`` always has exactly the SUGG_COLS columns
    and is a fresh object on every call. ``meta`` is ``{}`` when *result* is
    not a dict.
    """
    if not isinstance(result, dict):
        return EMPTY_SUGG_DF.copy(), {}

    case_lat = str(result.get("laterality") or "").strip().lower()
    rows: List[Dict[str, Any]] = []
    for s in result.get("suggestions") or []:
        if not isinstance(s, dict):
            continue
        mods = s.get("modifiers", []) or []

        # Derive row laterality from LT/RT or case laterality
        row_lat = str(s.get("laterality") or "").strip().lower()
        if not row_lat:
            if isinstance(mods, list) and "LT" in mods:
                row_lat = "left"
            elif isinstance(mods, list) and "RT" in mods:
                row_lat = "right"
            elif case_lat in ("left", "right", "bilateral"):
                row_lat = case_lat

        # Numeric confidences are rounded; anything else is passed through as-is.
        conf_val = s.get("confidence")
        if isinstance(conf_val, (int, float)):
            conf_out = round(float(conf_val), 2)
        else:
            conf_out = conf_val or ""

        rows.append({
            "CPT": s.get("cpt", ""),
            "Description": s.get("desc", ""),
            "Rationale": s.get("rationale", ""),
            "Confidence": conf_out,
            "Primary": "✓" if s.get("primary") else "",
            "Category": s.get("category", ""),
            "Laterality": row_lat,
            "Units": s.get("units", 1),
        })

    # Normalize levels (stringified so ", ".join never fails on non-string items)
    segs: List[str] = []
    inters_list: List[str] = []
    lvl_lat = ""
    levels_obj = result.get("levels")
    if isinstance(levels_obj, dict):
        segs = [str(x) for x in (levels_obj.get("segments") or [])]
        inters_list = [str(x) for x in (levels_obj.get("interspaces") or [])]
        lvl_lat = levels_obj.get("laterality", "") or ""
    elif isinstance(levels_obj, list):
        segs = [str(x) for x in levels_obj]

    # Normalize flags: a list is used as-is, a dict contributes its truthy keys.
    flags_obj = result.get("flags")
    if isinstance(flags_obj, list):
        flags_list = [str(x) for x in flags_obj]
    elif isinstance(flags_obj, dict):
        flags_list = [str(k) for k, v in flags_obj.items() if v]
    else:
        flags_list = []

    if "interspaces_est" in result:
        interspaces = str(result.get("interspaces_est", ""))
    else:
        interspaces = ", ".join(inters_list)

    meta = {
        "payer": result.get("payer", ""),
        "region": result.get("region", ""),
        "laterality": result.get("laterality", "") or lvl_lat,
        "levels_segments": ", ".join(segs),
        "levels_interspaces": interspaces,
        "flags": ", ".join(sorted(flags_list)),
        "build": result.get("build", ""),
        "mode": result.get("mode", ""),
        "core_path": _core_path(),
    }

    # columns= fixes both the column set and order, for empty and non-empty rows alike.
    df = pd.DataFrame(rows, columns=SUGG_COLS)
    return df, meta


def _case_mods_df(result: Dict[str, Any]) -> pd.DataFrame:
    """Return the case-level modifiers as a two-column frame (modifier, reason).

    Each modifier is shown with a leading "-". Entries may be dicts with
    "modifier"/"reason" keys or bare modifier strings. A placeholder row is
    returned when there are none or *result* is not a dict.
    """
    mods = (result.get("case_modifiers") if isinstance(result, dict) else None) or []
    rows = []
    for m in mods:
        if isinstance(m, dict):
            rows.append({"modifier": f"-{m.get('modifier','')}", "reason": m.get("reason", "")})
        elif m:
            rows.append({"modifier": f"-{m}", "reason": ""})
    if not rows:
        return EMPTY_MODS_DF.copy()
    return pd.DataFrame(rows)


def _summary_md(meta: Dict[str, Any]) -> str:
    """Render the non-empty meta fields as inline-code "chips"; "—" when there are none."""
    labelled = (
        ("Region", "region"),
        ("Laterality", "laterality"),
        ("Segments", "levels_segments"),
        ("Interspaces", "levels_interspaces"),
        ("Flags", "flags"),
        ("Build", "build"),
        ("Mode", "mode"),
    )
    chips = [f"`{label}: {meta[key]}`" for label, key in labelled if meta.get(key)]

    # Only the file name of the core path is shown, not the full path.
    core_path = meta.get("core_path")
    core_base = os.path.basename(str(core_path)) if core_path else ""
    if core_base:
        chips.append(f"`Core: {core_base}`")

    return " ".join(chips) if chips else "—"


def new_session() -> str:
    """Return a new 8-character session id (prefix of a random UUID4)."""
    return str(uuid.uuid4())[:8]
# ==== Core actions ============================================================
def _blank_outputs(warn: str, session_id: str):
    """Return the six run_inference outputs for the "nothing to show" state."""
    # Copies, so the UI never holds a reference to the shared module-level frames.
    return (EMPTY_SUGG_DF.copy(), EMPTY_MODS_DF.copy(), "—", "", warn, session_id)


def _case_modifier_codes(result: Any) -> str:
    """Return the case modifiers of *result* as a "-XX, -YY" string ("" when none)."""
    raw = result.get("case_modifiers") if isinstance(result, dict) else None
    codes = []
    for cm in raw or []:
        code = cm.get("modifier", "") if isinstance(cm, dict) else cm
        if code:
            codes.append(f"-{code}")
    return ", ".join(codes)


def run_inference(note: str, payer: str, top_k: int, session_id: str):
    """Run the core on *note* and return the values for the result widgets.

    Returns a 6-tuple: (suggestions DataFrame, case-modifiers DataFrame,
    summary markdown, pretty-printed JSON, warning markdown, session_id).
    An empty or whitespace-only note returns the blank state without logging.
    Any failure in the core call or in shaping its result is logged as an
    "error" event and reported through the warning output instead of raising.
    """
    note = note or ""
    if not note.strip():
        return _blank_outputs("", session_id)

    # A slider may deliver a float even with an integer step.
    if isinstance(top_k, float):
        top_k = int(top_k)

    # NOTE(review): this persists the full note text to the local session log
    # and, when live log push is enabled, to the dataset repo. The UI header
    # states "No PHI is stored" — confirm which of the two is intended.
    _append_log(session_id, {"event": "request", "payer": payer, "top_k": top_k, "note": note})

    try:
        result = suggest_with_cpt_billing(note=note, payer=payer, top_k=top_k)
        if DEBUG:
            print("[DEBUG] build/region/laterality/flags:",
                  result.get("build"), result.get("region"),
                  result.get("laterality"), result.get("flags"))

        sugg_df, meta = _coalesce_rows(result)
        case_mods_df = _case_mods_df(result)
        summary = _summary_md(meta)
        # default=str: the JSON is display-only, so an unexpected value type
        # should be shown as text rather than abort the whole response.
        json_pretty = json.dumps(result, indent=2, ensure_ascii=False, default=str)
        case_mods_txt = _case_modifier_codes(result)
    except Exception as e:
        tb = traceback.format_exc()
        _append_log(session_id, {"event": "error", "error": repr(e), "traceback": tb})
        warn = f"⚠️ Error: {e}"
        if DEBUG:
            warn += f"\n\n```traceback\n{tb}\n```"
        return _blank_outputs(warn, session_id)

    _append_log(session_id, {
        "event": "response",
        "meta": {**meta, "case_modifiers": case_mods_txt},
        "rows_len": int(len(sugg_df)),
    })

    return sugg_df, case_mods_df, summary, json_pretty, "", session_id


def record_feedback(session_id: str, vote: str, text: str):
    """Log a feedback event and return the status message for the UI.

    Nothing is logged when neither a vote nor a non-blank comment is given;
    the comment is stored with surrounding whitespace removed.
    """
    text = (text or "").strip()
    if not vote and not text:
        return "Please choose 👍/👎 or add a short note."
    _append_log(session_id, {"event": "feedback", "vote": vote, "text": text})
    return "Thanks! Your feedback was recorded."
def do_export(session_id: str):
    """Export the session log to a JSON file, log the export, and return the file path."""
    path = export_session(session_id)
    _append_log(session_id, {"event": "export", "path": path})
    return path


def on_clear():
    """Return reset values for the inputs and result widgets plus a fresh session id.

    Order: note, payer, top-k, suggestions table, case-modifiers table,
    summary, raw JSON, warning, session id.
    """
    return (
        "", "Medicare", 10,
        EMPTY_SUGG_DF.copy(), EMPTY_MODS_DF.copy(),
        "—", "", "",
        new_session(),
    )


# ==== Diagnostics / Probe =====================================================
def run_probe(session_id: str) -> Tuple[str, str]:
    """Run a live diagnostic: show core path & build and 3 smoke tests.

    Returns ``(markdown_report, warning)``; exactly one of the two is
    non-empty. A test is marked PASS when every expected code appears among
    the returned suggestions, otherwise CHECK. The build shown is taken from
    the ACDF test's result.
    """
    try:
        core_path = _core_path()
        tests = [
            (
                "Implicit TLIF",
                "Left facetectomy L4–L5 with PEEK interbody cage and pedicle screws; rods secured.",
                ["22633"],  # expect
            ),
            (
                "ACDF chain w/ plate",
                "Discectomies at C4–C5, C5–C6, and C6–C7. Interbody cages and anterior plate spanning C4–C7.",
                ["22551", "22552", "22846"],
            ),
            (
                "Exposure-only",
                "Anterior exposure of L4–S1 performed by vascular surgeon for access. No fusion performed.",
                # NOTE(review): "00000" looks like a placeholder code — confirm the intended expectation.
                ["00000"],
            ),
        ]

        build = ""
        test_lines = []
        for idx, (label, note, expect_codes) in enumerate(tests):
            res = suggest_with_cpt_billing(note, payer="Medicare", top_k=10)
            if idx == 1:
                build = res.get("build", "")
            # Skip entries without a code so the join below cannot fail on None.
            codes = [
                str(s.get("cpt"))
                for s in (res.get("suggestions") or [])
                if isinstance(s, dict) and s.get("cpt")
            ]
            ok = set(expect_codes).issubset(codes)
            test_lines.append(
                f"- **{label}** → codes: `{', '.join(codes) or '∅'}` — **{'PASS' if ok else 'CHECK'}**"
            )

        # Two trailing spaces before the newline = Markdown hard line break.
        header = f"**Core path:** `{core_path}`  \n**Build:** `{build}`"
        md = "\n".join([header] + test_lines)
        _append_log(session_id, {"event": "probe", "details": md})
        return md, ""
    except Exception as e:
        tb = traceback.format_exc()
        _append_log(session_id, {"event": "probe_error", "error": repr(e), "traceback": tb})
        return "", f"⚠️ Probe failed: {e}"


def run_live_log_selftest(session_id: str) -> Tuple[str, str]:
    """Attempt a small append to dataset repo to verify live logging config.

    Returns ``(markdown_report, warning)``. The report states whether a push
    was actually attempted or skipped because pushing is disabled or
    misconfigured. A push that was attempted can still fail; such failures
    are only printed to the logs by the push helper.
    """
    try:
        entry = {
            "event": "selftest",
            "note": "hello-from-selftest",
            "ts": _utcnow_iso(),
            "session_id": session_id,
        }
        _push_log_line_to_repo(entry)

        # Mirror the push helper's early-return conditions so the report is truthful.
        if not LOG_PUSH_ENABLE:
            headline = "⏭️ Live log push skipped: LOG_PUSH_ENABLE is not 1."
        elif not HF_TARGET_REPO or not HF_WRITE_TOKEN:
            headline = "⚠️ Live log push skipped: target repo or write token is missing."
        else:
            headline = "✅ Live log push attempted."

        day = datetime.now(timezone.utc).strftime("%Y-%m-%d")
        msg = (
            f"{headline}\n"
            f"- enable={LOG_PUSH_ENABLE}\n"
            f"- repo={HF_TARGET_REPO}\n"
            f"- type={HF_REPO_TYPE}\n"
            f"- token={'set' if HF_WRITE_TOKEN else 'missing'}\n"
            f"- path=logs-live/{day}.jsonl"
        )
        _append_log(session_id, {"event": "selftest", "details": "manual push executed"})
        return msg, ""
    except Exception as e:
        tb = traceback.format_exc()
        _append_log(session_id, {"event": "selftest_error", "error": repr(e), "traceback": tb})
        return "", f"⚠️ Self-test failed: {e}"


# ==== Examples ================================================================
EXAMPLES = [
    ["Left-sided TLIF L4–L5 with pedicle screws and interbody cage; posterolateral fusion performed. Navigation used.", "Medicare", 10],
    ["ACDF C5–C6 with PEEK cage and anterior plate; microscope and neuromonitoring used.", "Medicare", 10],
    ["Posterior cervical foraminotomy right C6–C7; no fusion or instrumentation.", "Medicare", 10],
    ["ALIF L5–S1 with structural allograft; non-segmental instrumentation placed.", "Medicare", 10],
    ["Removal of posterior segmental instrumentation T10–L2; no new hardware placed.", "Medicare", 10],
    # Case-modifier smoke tests:
    ["TLIF L4–L5 was initiated but aborted midway due to neuromonitoring changes.", "Medicare", 10],  # -53
    ["Bilateral decompression and foraminotomy at L4–L5 and L5–S1.", "Medicare", 10],  # -50
    ["Assistant surgeon present; resident not available.", "Medicare", 10],  # -82
    ["Complex exposure with severe deformity and adhesiolysis.", "Medicare", 10],  # -22
]

# ==== Theme / CSS =============================================================
THEME = gr.themes.Soft(
    primary_hue="indigo",
    secondary_hue="blue",
    neutral_hue="slate",
).set(
    body_text_color="#0f172a",
    background_fill_primary="#ffffff",
    button_primary_background_fill="#4f46e5",
    input_background_fill="#ffffff",
)

CUSTOM_CSS = """
:root { --radius-lg: 16px; }
.gradio-container { font-family: ui-sans-serif, system-ui, -apple-system; }

/* Header card */
.header-card {
  border-radius: 18px;
  padding: 18px;
  border: 1px solid #1f2937;
  background: linear-gradient(180deg,#0f172a,#0b1220);
  color: #e5e7eb;
}
.badge-row code {
  margin-right: 8px;
  border-radius: 12px;
  padding: 2px 8px;
  background: #111827;
  color: #e5e7eb;
}

/* Table container */
.table-wrap { max-height: 520px; overflow: auto; }

/* Suggestions table target */
#suggestions_table .dataframe {
  font-size: 15px;
  width: 100% !important;
  table-layout: auto !important;
  border-collapse: collapse;
}
#suggestions_table .dataframe th,
#suggestions_table .dataframe td {
  white-space: normal;
  word-wrap: break-word;
  text-align: left;
  vertical-align: top;
  padding: 10px 12px;
}

/* Column sizing (1-indexed):
   1=CPT, 2=Description, 3=Rationale, 4=Confidence, 5=Primary, 6=Category, 7=Laterality, 8=Units */

/* CPT — wider & no wrap, monospace, centered */
#suggestions_table .dataframe th:nth-child(1),
#suggestions_table .dataframe td:nth-child(1) {
  min-width: 120px;
  max-width: 140px;
  white-space: nowrap !important;
  font-family: ui-monospace, SFMono-Regular, Menlo, Monaco, Consolas, "Liberation Mono", monospace;
  text-align: center;
}

/* Description — roomy */
#suggestions_table .dataframe th:nth-child(2),
#suggestions_table .dataframe td:nth-child(2) { min-width: 360px; max-width: 560px; }

/* Rationale — roomy */
#suggestions_table .dataframe th:nth-child(3),
#suggestions_table .dataframe td:nth-child(3) { min-width: 320px; max-width: 520px; }

/* Category — a bit wider */
#suggestions_table .dataframe th:nth-child(6),
#suggestions_table .dataframe td:nth-child(6) { min-width: 180px; }

/* Keep tiny columns compact */
#suggestions_table .dataframe th:nth-child(4),
#suggestions_table .dataframe td:nth-child(4),
#suggestions_table .dataframe th:nth-child(5),
#suggestions_table .dataframe td:nth-child(5),
#suggestions_table .dataframe th:nth-child(7),
#suggestions_table .dataframe td:nth-child(7),
#suggestions_table .dataframe th:nth-child(8),
#suggestions_table .dataframe td:nth-child(8) { min-width: 90px; }

.footer-note { color:#94a3b8; font-size:12px; }
"""

# ==== App Layout ==============================================================
with gr.Blocks(theme=THEME, css=CUSTOM_CSS, title="Spine Coder — CPT Billing") as demo:
    # Initial value only; every page load gets its own id via _on_load below.
    session_id = gr.State(new_session())

    with gr.Row():
        with gr.Column():
            gr.Markdown("### 🦴 Spine Coder — CPT Billing & Operative Note NLP")
            # NOTE(review): the HTML tags of this banner were reconstructed from the
            # .header-card / .footer-note CSS classes — confirm against the deployed markup.
            # NOTE(review): the request log stores the note text, which appears to
            # contradict the "No PHI is stored" statement — confirm.
            gr.Markdown(
                '<div class="header-card">'
                'Structured CPT suggestions from spine operative notes — with payer-aware '
                'modifiers, laterality detection, and rationales.<br>'
                '<span class="footer-note">No PHI is stored; inputs are session-scoped and ephemeral.</span>'
                '</div>'
            )

    with gr.Row():
        # Inputs
        with gr.Column(scale=5):
            note_in = gr.Textbox(
                label="Operative Note",
                placeholder="Paste an operative note here…",
                lines=14,
                autofocus=True,
            )
            with gr.Row():
                payer_dd = gr.Dropdown(PAYER_CHOICES, value="Medicare", label="Payer")
                topk = gr.Slider(1, 15, value=10, step=1, label="Top-K suggestions")
            with gr.Row():
                run_btn = gr.Button("Analyze Note", variant="primary")
                clear_btn = gr.Button("Clear")

            with gr.Accordion("Quick Examples", open=False):
                gr.Examples(
                    examples=EXAMPLES,
                    inputs=[note_in, payer_dd, topk],
                    label="Click a row to load an example",
                )

            with gr.Accordion("Feedback", open=False):
                fb_choice = gr.Radio(choices=["👍", "👎"], label="Was this helpful?")
                fb_text = gr.Textbox(
                    label="Optional comment",
                    lines=2,
                    placeholder="Tell us what worked or what missed…",
                )
                fb_submit = gr.Button("Submit Feedback")
                fb_status = gr.Markdown("")

            with gr.Accordion("Session", open=False):
                sid_show = gr.Textbox(label="Session ID", value="", interactive=False)
                export_btn = gr.Button("Export Session as JSON")
                export_file = gr.File(label="Download", interactive=False)

            with gr.Accordion("Diagnostics", open=False):
                probe_btn = gr.Button("Run Probe (core path + 3 tests)")
                probe_md = gr.Markdown("")
                selftest_btn = gr.Button("Test Live Log Push")
                selftest_md = gr.Markdown("")

        # Results
        with gr.Column(scale=7):
            gr.Markdown("#### Results")
            summary_md = gr.Markdown("—", elem_classes=["badge-row"])

            # Suggestions table (no modifier columns)
            table = gr.Dataframe(
                value=EMPTY_SUGG_DF,
                label="CPT Suggestions",
                interactive=False,
                row_count=(0, "dynamic"),
                wrap=True,
                elem_classes=["table-wrap"],
                elem_id="suggestions_table",
            )

            # Case-level modifiers table
            gr.Markdown("### Case Modifiers (visit-level)")
            case_mods_table = gr.Dataframe(
                value=EMPTY_MODS_DF.copy(),
                headers=["modifier", "reason"],
                interactive=False,
                wrap=True,
                label="Case Modifiers",
            )

            with gr.Accordion("Raw JSON", open=False):
                json_out = gr.Code(language="json", value="", interactive=False)

            warn_md = gr.Markdown("")

    # ---- Events / Wiring ----
    def _on_load():
        """Create a per-page-load session id and show it in the Session panel."""
        sid = new_session()
        return sid, sid

    demo.load(_on_load, outputs=[session_id, sid_show])

    run_inputs = [note_in, payer_dd, topk, session_id]
    run_outputs = [table, case_mods_table, summary_md, json_out, warn_md, session_id]

    run_btn.click(run_inference, inputs=run_inputs, outputs=run_outputs)
    note_in.submit(run_inference, inputs=run_inputs, outputs=run_outputs)

    # Clear rotates the session id; the follow-up step keeps the displayed
    # Session ID in sync with the id that will actually be logged/exported.
    clear_btn.click(
        on_clear,
        outputs=[note_in, payer_dd, topk, table, case_mods_table, summary_md, json_out, warn_md, session_id],
    ).then(lambda sid: sid, inputs=[session_id], outputs=[sid_show])

    fb_submit.click(record_feedback, inputs=[session_id, fb_choice, fb_text], outputs=fb_status)
    export_btn.click(do_export, inputs=[session_id], outputs=[export_file])
    probe_btn.click(run_probe, inputs=[session_id], outputs=[probe_md, warn_md])
    selftest_btn.click(run_live_log_selftest, inputs=[session_id], outputs=[selftest_md, warn_md])

if __name__ == "__main__":
    # If running locally: set server_name to 0.0.0.0 for external access; Space ignores.
    demo.launch()