import gradio as gr
import asyncio
import httpx
import tempfile
import os
import requests
import time
import threading
import json
import re
import csv
import html
import markdown
import base64
from datetime import datetime, timedelta

# Hugging Face Hub storage for evaluations
try:
    from hf_storage import save_to_huggingface
    HF_STORAGE_AVAILABLE = True
except ImportError:
    HF_STORAGE_AVAILABLE = False

import shutil
import zipfile

# Optional Whisper ASR support (for local use); not required on Hugging Face Space
try:
    import whisper  # type: ignore
    WHISPER_AVAILABLE = True
except ImportError:
    whisper = None  # type: ignore
    WHISPER_AVAILABLE = False

# ICD-10 code to description mapping (common codes)
ICD10_DESCRIPTIONS = {
    "I130": "Hypertensive heart and chronic kidney disease",
    "I5033": "Acute on chronic diastolic heart failure",
    "E872": "Hypokalemia",
    "N184": "Chronic kidney disease, stage 4",
    "E1122": "Type 2 diabetes with chronic kidney disease",
    "N2581": "Secondary hyperparathyroidism of renal origin",
    "I2510": "Atherosclerotic heart disease without angina",
    "E11319": "Type 2 diabetes with unspecified diabetic retinopathy",
    "D6489": "Other specified anemias",
    "E785": "Hyperlipidemia, unspecified",
    "Z955": "Presence of coronary angioplasty implant and graft",
    "Z86718": "Personal history of other venous thrombosis",
    "I252": "Old myocardial infarction",
    "Z2239": "Encounter for screening for other suspected endocrine disorder",
    "G4700": "Insomnia, unspecified",
    "M1A9XX0": "Chronic gout, unspecified, without tophus",
    "R0902": "Hypoxemia",
    "E1151": "Type 2 diabetes with diabetic peripheral angiopathy without gangrene",
    "Z794": "Long term use of insulin",
    "E669": "Obesity, unspecified",
    "Z6831": "Body mass index 31.0-31.9, adult",
    "V4571": "Renal dialysis status",
    "I10": "Essential hypertension",
    "E119": "Type 2 diabetes without complications",
    "J449": "Chronic obstructive pulmonary disease, unspecified",
    "N18": "Chronic kidney disease",
}

# MIMIC-IV Lab Test Item ID to name mapping (common tests)
LAB_TEST_NAMES = {
    "50934": "H-Hematocrit",
    "50947": "I-Ionized Calcium",
    "51678": "L-Lymphocytes",
    "50868": "Anion Gap",
    "50882": "Bicarbonate",
    "50912": "Creatinine",
    "50971": "Potassium",
    "50983": "Sodium",
    "51006": "Urea Nitrogen",
    "50902": "Chloride",
    "50931": "Glucose",
    "51221": "Hematocrit",
    "51222": "Hemoglobin",
    "51265": "Platelet Count",
    "51301": "White Blood Cells",
    "50820": "pH",
    "50821": "pO2",
    "50818": "pCO2",
    "51237": "INR",
    "51274": "PT",
    "51275": "PTT",
    "50813": "Lactate",
    "50960": "Magnesium",
    "50970": "Phosphate",
    "50893": "Calcium Total",
}

session = requests.Session()

if WHISPER_AVAILABLE:
    model = whisper.load_model("base")
else:
    model = None

base_url = "http://localhost:8080"
timeout = 60
concurrency_count = 10


def start_backend_server():
    """Start the Flask backend (src.server) in a background thread for Spaces."""
    try:
        from src.server import create_app, configure_routes

        class Args:
            counselor_config_path = './src/configs/counselor_config.yaml'
            store_dir = './user_data'

        app = create_app()
        configure_routes(app, Args())

        def run():
            app.run(port=8080, host='0.0.0.0', debug=False)

        threading.Thread(target=run, daemon=True).start()
        print("Backend server started on http://localhost:8080")
    except Exception as e:
        print(f"Failed to start backend server: {e}")


async def initialization(api_key, username):
    url = f"{base_url}/api/initialization"
    headers = {'Content-Type': 'application/json'}
    data = {
        'api_key': api_key,
        'username': username,
    }
    async with httpx.AsyncClient(timeout=timeout) as client:
        try:
            response = await client.post(url, json=data, headers=headers)
            if response.status_code == 200:
                return "Initialization successful."
            else:
                return f"Initialization failed: {response.text}"
        except httpx.TimeoutException:  # httpx raises its own timeout type, not asyncio.TimeoutError
            print("The request timed out")
            return "Request timed out during initialization."
        except Exception as e:
            return f"Error in initialization: {str(e)}"


# def fetch_default_prompts(chatbot_type):
#     url = f"{base_url}?chatbot_type={chatbot_type}"
#     try:
#         response = httpx.get(url, timeout=timeout)
#         if response.status_code == 200:
#             prompts = response.json()
#             print(prompts)
#             return prompts
#         else:
#             print(f"Failed to fetch prompts: {response.status_code} - {response.text}")
#             return {}
#     except Exception as e:
#         print(f"Error fetching prompts: {str(e)}")
#         return {}


async def get_backend_response(api_key, patient_prompt, username, chatbot_type):
    url = f"{base_url}/responses/doctor"
    headers = {'Content-Type': 'application/json'}
    data = {
        'username': username,
        'patient_prompt': patient_prompt,
        'chatbot_type': chatbot_type
    }
    async with httpx.AsyncClient(timeout=timeout) as client:
        try:
            response = await client.post(url, json=data, headers=headers)
            if response.status_code == 200:
                response_data = response.json()
                return response_data
            else:
                return f"Failed to fetch response from backend: {response.text}"
        except Exception as e:
            return f"Error contacting backend service: {str(e)}"


async def save_conversation_and_memory(username, chatbot_type):
    url = f"{base_url}/save/end_and_save"
    headers = {'Content-Type': 'application/json'}
    data = {
        'username': username,
        'chatbot_type': chatbot_type
    }
    async with httpx.AsyncClient(timeout=timeout) as client:
        try:
            response = await client.post(url, json=data, headers=headers)
            if response.status_code == 200:
                response_data = response.json()
                return response_data.get('message', 'Saving Error!')
            else:
                return f"Failed to save conversations and memory graph: {response.text}"
        except Exception as e:
            return f"Error contacting backend service: {str(e)}"


async def get_conversation_histories(username, chatbot_type):
    url = f"{base_url}/save/download_conversations"
    headers = {'Content-Type': 'application/json'}
    data = {
        'username': username,
        'chatbot_type': chatbot_type
    }
    async with httpx.AsyncClient(timeout=timeout) as client:
        try:
            response = await client.post(url, json=data, headers=headers)
            if response.status_code == 200:
                conversation_data = response.json()
                return conversation_data
            else:
                return []
        except Exception as e:
            return []


def download_conversations(username, chatbot_type):
    conversation_histories = asyncio.run(get_conversation_histories(username, chatbot_type))
    files = []
    temp_dir = tempfile.mkdtemp()
    for conversation_entry in conversation_histories:
        file_name = conversation_entry.get('file_name', f"Conversation_{len(files)+1}.txt")
        conversation = conversation_entry.get('conversation', [])
        conversation_text = ""
        for message_pair in conversation:
            if isinstance(message_pair, list) and len(message_pair) == 2:
                speaker, message = message_pair
                conversation_text += f"{speaker.capitalize()}: {message}\n\n"
            else:
                conversation_text += f"Unknown format: {message_pair}\n\n"
        temp_file_path = os.path.join(temp_dir, file_name)
        with open(temp_file_path, 'w') as temp_file:
            temp_file.write(conversation_text)
        files.append(temp_file_path)
    return files


# async def get_biography(username, chatbot_type):
#     url = f"{base_url}/save/generate_autobiography"
#     headers = {'Content-Type': 'application/json'}
#     data = {
#         'username': username,
#         'chatbot_type': chatbot_type
#     }
#     async with httpx.AsyncClient(timeout=timeout) as client:
#         try:
#             response = await client.post(url, json=data, headers=headers)
#             if response.status_code == 200:
#                 biography_data = response.json()
#                 biography_text = biography_data.get('biography', '')
#                 return biography_text
#             else:
#                 return "Failed to generate biography."
#         except Exception as e:
#             return f"Error contacting backend service: {str(e)}"


# def download_biography(username, chatbot_type):
#     biography_text = asyncio.run(get_biography(username, chatbot_type))
#     if not biography_text or "Failed" in biography_text or "Error" in biography_text:
#         return gr.update(value=None, visible=False), gr.update(value=biography_text, visible=True)
#     temp_dir = tempfile.mkdtemp()
#     temp_file_path = os.path.join(temp_dir, "biography.txt")
#     with open(temp_file_path, 'w') as temp_file:
#         temp_file.write(biography_text)
#     return temp_file_path, gr.update(value=biography_text, visible=True)


def transcribe_audio(audio_file):
    if not WHISPER_AVAILABLE or model is None:
        raise RuntimeError("Audio transcription model is not available in this deployment.")
    transcription = model.transcribe(audio_file)["text"]
    return transcription


def submit_text_and_respond(edited_text, api_key, username, selected_title, history, chatbot_type):
    content = ''
    for opt in options:
        if opt['title'] == selected_title:
            content = opt['text']
    if content == '':
        query_with_context = edited_text
    else:
        query_with_context = f'Given the following context information:\n {content}\n\n Answer the following question: {edited_text}'
    print(query_with_context)
    response = asyncio.run(get_backend_response(api_key, query_with_context, username, chatbot_type))
    print('------')
    print(response)
    if isinstance(response, str):
        history.append((edited_text, response))
        return history, ""
    doctor_response = response['doctor_response']
    history.append((edited_text, doctor_response))
    return history, ""


def set_initialize_button(api_key_input, username_input):
    message = asyncio.run(initialization(api_key_input, username_input))
    print(message)
    return message, api_key_input


def save_conversation(username, chatbot_type):
    response = asyncio.run(save_conversation_and_memory(username, chatbot_type))
    return response


def start_recording(audio_file):
    if not audio_file:
        return ""
    try:
        transcription = transcribe_audio(audio_file)
        return transcription
    except Exception as e:
        return f"Failed to transcribe: {str(e)}"


# Patient Sample Evaluation functions
def save_patient_evaluation(patient_id, patient_input, ai_summary, rating, feedback, expert_name, categories, comments=None):
    """Save patient sample evaluation data to a JSON file"""
    timestamp = datetime.now().isoformat()
    # Handle comments - can be dict with hallucination_comments and critical_omission_comments, or old format
    if isinstance(comments, dict):
        hallucination_comments = comments.get('hallucination_comments', '')
        critical_omission_comments = comments.get('critical_omission_comments', '')
    else:
        hallucination_comments = ''
        critical_omission_comments = ''
    evaluation = {
        "timestamp": timestamp,
        "patient_id": patient_id,
        "expert_name": expert_name,
        "patient_input": patient_input,
        "ai_summary": ai_summary,
        "overall_rating": rating,
        "feedback": feedback,
        "categories": categories,
        "hallucination_comments": hallucination_comments,
        "critical_omission_comments": critical_omission_comments
    }
    # Create evaluations directory if it doesn't exist
    eval_dir = "patient_evaluations"
    if not os.path.exists(eval_dir):
        os.makedirs(eval_dir)
    # Save to JSON file
    eval_file = os.path.join(eval_dir, f"patient_eval_{patient_id}_{timestamp.replace(':', '-')}.json")
    with open(eval_file, 'w', encoding='utf-8') as f:
        json.dump(evaluation, f, ensure_ascii=False, indent=2)
    # Also append to a master CSV file for easier analysis
    csv_file = os.path.join(eval_dir, "patient_evaluations_master.csv")
    file_exists = os.path.isfile(csv_file)
    with open(csv_file, 'a', newline='', encoding='utf-8') as f:
        writer = csv.writer(f)
        if not file_exists:
            writer.writerow(['timestamp', 'patient_id', 'expert_name', 'overall_rating',
                             'clinical_accuracy', 'completeness_coverage', 'clinical_relevance',
                             'clarity_structure', 'reasoning_risk', 'actionability',
                             'hallucination', 'critical_omission', 'feedback',
                             'hallucination_comments', 'critical_omission_comments'])
        writer.writerow([
            timestamp, patient_id, expert_name, rating,
            categories.get('clinical_accuracy', ''),
            categories.get('completeness_coverage', ''),
            categories.get('clinical_relevance', ''),
            categories.get('clarity_structure', ''),
            categories.get('reasoning_risk', ''),
            categories.get('actionability', ''),
            categories.get('hallucination', ''),
            categories.get('critical_omission', ''),
            feedback, hallucination_comments, critical_omission_comments
        ])
        f.flush()
        os.fsync(f.fileno())
    # Save to local files (for immediate access)
    # Also try to save to Hugging Face Hub if available
    save_messages = []
    if HF_STORAGE_AVAILABLE:
        csv_row = [
            timestamp, patient_id, expert_name, rating,
            categories.get('clinical_accuracy', ''),
            categories.get('completeness_coverage', ''),
            categories.get('clinical_relevance', ''),
            categories.get('clarity_structure', ''),
            categories.get('reasoning_risk', ''),
            categories.get('actionability', ''),
            categories.get('hallucination', ''),
            categories.get('critical_omission', ''),
            feedback, hallucination_comments, critical_omission_comments
        ]
        print(f"[HF Upload] Starting upload for patient {patient_id}...")
        try:
            success, msg = save_to_huggingface(evaluation, csv_row)
            print(f"[HF Upload] Result: success={success}, msg={msg}")
            if success:
                save_messages.append(f"Uploaded to Hugging Face Hub")
            else:
                save_messages.append(f"HF upload failed: {msg}")
        except Exception as e:
            import traceback
            error_detail = f"{str(e)}\n{traceback.format_exc()}"
            print(f"[HF Upload] Exception occurred: {error_detail}")
            save_messages.append(f"HF upload error: {str(e)}")
    else:
        print(f"[HF Upload] HF_STORAGE_AVAILABLE is False, skipping upload")
    # Combine messages
    base_msg = f"Patient evaluation saved successfully at {timestamp}"
    if save_messages:
        return f"{base_msg}\n" + "\n".join(save_messages)
    return base_msg


def submit_patient_evaluation(patient_id, patient_input, ai_summary, overall_rating,
                              clinical_accuracy, completeness_coverage, clinical_relevance,
                              clarity_structure, reasoning_risk, actionability,
                              hallucination, critical_omission, feedback, expert_name,
                              hallucination_comments, critical_omission_comments):
    """Process and save the patient evaluation"""
    if not expert_name.strip():
        error_message = "⚠️ ERROR: Please enter your Clinician ID before submitting evaluation."
        # Trigger JavaScript alert by including a special marker
        return (f"__ALERT__{error_message}", feedback, overall_rating, clinical_accuracy,
                completeness_coverage, clinical_relevance, clarity_structure, reasoning_risk,
                actionability, hallucination, critical_omission, hallucination_comments,
                critical_omission_comments, expert_name)
    # Check all ratings and collect missing ones
    rating_checks = {
        'Overall Quality': overall_rating,
        'Clinical Accuracy': clinical_accuracy,
        'Completeness / Coverage': completeness_coverage,
        'Clinical Relevance': clinical_relevance,
        'Clarity and Structure': clarity_structure,
        'Reasoning / Risk Stratification': reasoning_risk,
        'Actionability': actionability,
        'Hallucination': hallucination,
        'Critical Omission': critical_omission
    }
    missing_ratings = [name for name, value in rating_checks.items() if value == 0]
    if missing_ratings:
        missing_list = "\n".join([f" • {name}" for name in missing_ratings])
        error_message = (f"⚠️ WARNING: Please rate all evaluation dimensions before submitting."
                         f"\n\nMissing ratings:\n{missing_list}"
                         f"\n\nPlease provide ratings for all items above.")
        # Trigger JavaScript alert by including a special marker
        return (f"__ALERT__{error_message}", feedback, overall_rating, clinical_accuracy,
                completeness_coverage, clinical_relevance, clarity_structure, reasoning_risk,
                actionability, hallucination, critical_omission, hallucination_comments,
                critical_omission_comments, expert_name)
    categories = {
        'clinical_accuracy': clinical_accuracy,
        'completeness_coverage': completeness_coverage,
        'clinical_relevance': clinical_relevance,
        'clarity_structure': clarity_structure,
        'reasoning_risk': reasoning_risk,
        'actionability': actionability,
        'hallucination': hallucination,
        'critical_omission': critical_omission
    }
    # Store hallucination and critical omission comments
    comments = {
        'hallucination_comments': hallucination_comments.strip() if hallucination_comments else '',
        'critical_omission_comments': critical_omission_comments.strip() if critical_omission_comments else ''
    }
    result = save_patient_evaluation(patient_id, patient_input, ai_summary, overall_rating,
                                     feedback, expert_name, categories, comments)
    # Reset form after successful submission (keep expert_name)
    return result, "", 0, 0, 0, 0, 0, 0, 0, 0, 0, "", "", expert_name


def get_conversation_for_evaluation(history):
    """Get the last conversation pair for evaluation"""
    if not history or len(history) == 0:
        return "", ""
    last_conversation = history[-1]
    user_input = last_conversation[0] if len(last_conversation) > 0 else ""
    bot_response = last_conversation[1] if len(last_conversation) > 1 else ""
    return user_input, bot_response


def export_patient_evaluations():
    """Export patient evaluation data for analysis"""
    eval_dir = "patient_evaluations"
    csv_file = os.path.join(eval_dir, "patient_evaluations_master.csv")
    if not os.path.exists(csv_file):
        return None, "No patient evaluation data found."
    return csv_file, f"Patient evaluation data exported."
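

# Illustrative sketch (not wired into the UI): reading back the master CSV
# written by save_patient_evaluation() for ad-hoc analysis. The path and header
# match the ones used above; the helper name itself is ours, not part of the app.
def iter_patient_evaluations():
    """Yield each saved evaluation row as a dict keyed by the CSV header."""
    csv_file = os.path.join("patient_evaluations", "patient_evaluations_master.csv")
    if not os.path.isfile(csv_file):
        return
    with open(csv_file, newline='', encoding='utf-8') as f:
        for row in csv.DictReader(f):
            yield row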
def get_patient_evaluation_stats():
    """Get basic statistics about patient evaluations"""
    eval_dir = "patient_evaluations"
    csv_file = os.path.join(eval_dir, "patient_evaluations_master.csv")
    if not os.path.exists(csv_file):
        return "No patient evaluation data available."
    try:
        import pandas as pd
        # Force reload from disk
        df = pd.read_csv(csv_file)
        if df.empty:
            return "No patient evaluation data available."
        total_evaluations = len(df)
        # Column names below match the header written by save_patient_evaluation
        avg_overall_rating = df['overall_rating'].mean() if 'overall_rating' in df.columns else 0
        avg_clinical_accuracy = df['clinical_accuracy'].mean() if 'clinical_accuracy' in df.columns else 0
        avg_completeness = df['completeness_coverage'].mean() if 'completeness_coverage' in df.columns else 0
        expert_count = df['expert_name'].nunique() if 'expert_name' in df.columns else 0
        patient_count = df['patient_id'].nunique() if 'patient_id' in df.columns else 0
        total_sentence_comments = df['sentence_comments_count'].sum() if 'sentence_comments_count' in df.columns else 0
        stats = f"""
**Patient Evaluation Statistics**

- **Total Evaluations**: {total_evaluations}
- **Patients Evaluated**: {patient_count}
- **Average Overall Rating**: {avg_overall_rating:.2f}/5
- **Average Clinical Accuracy**: {avg_clinical_accuracy:.2f}/5
- **Average Completeness**: {avg_completeness:.2f}/5
- **Number of Experts**: {expert_count}
- **Total Sentence Comments**: {total_sentence_comments}
- **Latest Evaluation**: {df['timestamp'].iloc[-1] if not df.empty else 'N/A'}
"""
        return stats
    except ImportError:
        # Fallback if pandas is not available
        with open(csv_file, 'r', encoding='utf-8') as f:
            lines = f.readlines()
        total_evaluations = len(lines) - 1  # Subtract header
        return f"Total patient evaluations: {total_evaluations} (Install pandas for detailed stats)"
    except Exception as e:
        return f"Error reading patient evaluation data: {str(e)}"


def create_backup_zip():
    """Create a backup zip file of all evaluation data"""
    try:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_filename = f"patient_evaluations_backup_{timestamp}.zip"
        with zipfile.ZipFile(backup_filename, 'w') as backup_zip:
            # Add CSV file if it exists
            csv_file = "patient_evaluations/patient_evaluations_master.csv"
            if os.path.exists(csv_file):
                backup_zip.write(csv_file, f"patient_evaluations_master_{timestamp}.csv")
            # Add JSON files
            eval_dir = "patient_evaluations"
            if os.path.exists(eval_dir):
                for filename in os.listdir(eval_dir):
                    if filename.endswith('.json'):
                        file_path = os.path.join(eval_dir, filename)
                        backup_zip.write(file_path, f"json_files/{filename}")
        return backup_filename, f"Backup created successfully: {backup_filename}"
    except Exception as e:
        return None, f"Error creating backup: {str(e)}"


def update_methods(chapter):
    return gr.update(choices=interview_protocols[chapter], value=interview_protocols[chapter][0])


# def update_memory_graph(memory_data):
#     table_data = []
#     for node in memory_data:
#         table_data.append([
#             node.get('date', ''),
#             node.get('topic', ''),
#             node.get('event_description', ''),
#             node.get('people_involved', '')
#         ])
#     return table_data


# def update_prompts(chatbot_display_name):
#     chatbot_type = display_to_value.get(chatbot_display_name, 'enhanced')
#     prompts = fetch_default_prompts(chatbot_type)
#     return (
#         gr.update(value=prompts.get('system_prompt', '')),
#         gr.update(value=prompts.get('conv_instruction_prompt', '')),
#         gr.update(value=prompts.get('therapy_prompt', '')),
#         gr.update(value=prompts.get('autobio_generation_prompt', '')),
#     )


# def update_chatbot_type(chatbot_display_name):
#     chatbot_type = display_to_value.get(chatbot_display_name, 'enhanced')
#     return chatbot_type


# CSS to keep the buttons small
css = """
#start_button, #reset_button {
    padding: 4px 10px !important;
    font-size: 12px !important;
    width: auto !important;
}
/* Force scrollable areas for long text */
#patient_input_display textarea {
    max-height: 420px !important;
    overflow-y: auto !important;
}
#ai_summary_display {
    max-height: 420px !important;
    overflow-y: auto !important;
}
#ai_summary_markdown {
    max-height: 480px !important;
    overflow-y: auto !important;
}
"""

# Add CSS for clickable summary sentences
css = css + """
.sum-sent {
    cursor: pointer;
    padding: 2px 4px;
    border-radius: 3px;
    transition: background-color 0.2s;
    user-select: none; /* Prevent text selection */
    -webkit-user-select: none;
    -moz-user-select: none;
    -ms-user-select: none;
}
.sum-sent:hover {
    background-color: #e0e0e0;
    text-decoration: underline;
}
.sum-sent:active {
    background-color: #b0b0b0;
}
"""

# Add global JavaScript to fix form validation issues and ensure click handlers work
global_js = """
"""

with gr.Blocks() as app:
    # Add CSS via HTML component (Gradio 3.50.2+ compatibility)
    gr.HTML(f"<style>{css}</style>")
    # Add JavaScript for form validation and alert monitoring
    gr.HTML(global_js)

    # Debug: Print HF storage status on startup
    print("=" * 60)
    print("HF Storage Status on Startup")
    print("=" * 60)
    print(f"HF_STORAGE_AVAILABLE: {HF_STORAGE_AVAILABLE}")
    if HF_STORAGE_AVAILABLE:
        try:
            from hf_storage import get_hf_storage, HF_AVAILABLE
            print(f"HF_AVAILABLE (in hf_storage): {HF_AVAILABLE}")
            storage = get_hf_storage()
            if storage:
                print(f"Storage repo_id: {storage.repo_id}")
            print(f"HF_TOKEN set: {'Yes' if os.getenv('HF_TOKEN') else 'No'}")
            print(f"HF_EVAL_REPO_ID set: {os.getenv('HF_EVAL_REPO_ID') or 'No'}")
            print(f"SPACE_ID: {os.getenv('SPACE_ID') or 'Not set'}")
        except Exception as e:
            print(f"Error checking storage: {e}")
    print("=" * 60)

    # In Spaces, start backend inside the same process
    start_backend_server()

    chatbot_type_state = gr.State('enhanced')
    api_key_state = gr.State()
    selected_title = gr.State()
    is_running = gr.State()
    target_timestamp = gr.State()

    # Load patient data globally
    def load_jsonl(filepath):
        data = []

        def _read(path):
            with open(path, "r", encoding="utf-8") as f:
                for line in f:
                    line = line.strip()
                    if line:
                        data.append(json.loads(line))

        try:
            _read(filepath)
            return data
        except FileNotFoundError:
            repo_dir = os.path.dirname(os.path.abspath(__file__))
            local_prefix = "/Users/liuzijie/Desktop/chatbot-mimic-notes/"
            if isinstance(filepath, str) and filepath.startswith(local_prefix):
                relative_part = filepath[len(local_prefix):]
                alt_path = os.path.join(repo_dir, relative_part)
                if os.path.exists(alt_path):
                    _read(alt_path)
                    return data
            alt_path2 = os.path.join(repo_dir, os.path.basename(filepath))
            if os.path.exists(alt_path2):
                _read(alt_path2)
                return data
            return []

    # Safe JSON file loader
    def load_json(path):
        try:
            with open(path, 'r', encoding='utf-8') as f:
                return json.load(f)
        except Exception:
            return None

    # Load per admission data (optional; skip if missing in Space)
    # Prefer env var ADMISSION_JSONL; otherwise resolve relative to this file
    script_dir_for_adm = os.path.dirname(os.path.abspath(__file__))
    default_adm_path = os.path.join(script_dir_for_adm, "per_admission_summaries", "llama-3.2-3b_per_admission.jsonl")
    jsonl_path = os.environ.get("ADMISSION_JSONL", default_adm_path)
    admission_data = load_jsonl(jsonl_path) if os.path.exists(jsonl_path) else []

    # Group admissions by patient_id
    patient_admissions = {}
    for admission in admission_data:
        patient_id = admission['patient_id']
        if patient_id not in patient_admissions:
            patient_admissions[patient_id] = []
        patient_admissions[patient_id].append(admission)

    # Create options with admission tabs for each patient
    options = []
    for patient_id, admissions in patient_admissions.items():
        for i, admission in enumerate(admissions):
            admission_id = admission['hadm_id']
            admit_time = admission['admittime']
            summary = admission.get('summary', '')
            input_text = admission.get('input_text', '')
            # Create a unique title for each admission
            title = f"Patient {patient_id} - Admission {i+1} (ID: {admission_id})"
            options.append({
                'title': title,
                'text': input_text,  # Use the actual input text that was given to LLM
                'note': summary,  # AI generated summary
                'patient_id': patient_id,
                'admission_id': admission_id,
                'admission_index': i + 1,
                'admit_time': admit_time,
                'raw_data': admission.get('raw', {})
            })

    # ----------------------
    # Load new summaries data (grouped by patient -> admissions)
    # Directory: /Users/liuzijie/Desktop/chatbot-mimic-notes/summaries
    # Filenames: subject_{patient}_hadm_{hadm}_model_input.txt and subject_{patient}_hadm_{hadm}_summary.txt
    # ----------------------
    # Prefer env var, then path relative to this file (repo root)
    script_dir = os.path.dirname(os.path.abspath(__file__))
    default_summaries_dir = os.path.join(script_dir, "summaries")
    summaries_dir = os.environ.get("SUMMARIES_DIR", default_summaries_dir)

    def load_text_file(path):
        try:
            with open(path, 'r', encoding='utf-8') as f:
                return f.read()
        except Exception:
            return ""

    date_regex = re.compile(r"(\d{4}-\d{2}-\d{2}(?:[ T]\d{2}:\d{2}(?::\d{2})?)?)")

    def extract_timestamp_from_text(text):
        if not text:
            return ""
        m = date_regex.search(text)
        return m.group(1) if m else ""

    # Clean summary output by removing JSON/object appendix and code fences
    def clean_summary_output(text):
        """Remove trailing JSON block or code fences from model output, and extract only markdown summary."""
        if not text:
            return text
        # Remove reasoning tags (assumed to be <think>...</think> blocks)
        text = re.sub(r'<think>.*?</think>', '', text, flags=re.DOTALL | re.IGNORECASE)
        # Extract only the markdown summary part (A) PREOP SUMMARY section)
        # Look for "A) PREOP" or "A) PREOP SUMMARY" and extract until "---" or "B) JSON"
        markdown_pattern = r'(A\)\s*PREOP.*?)(?:---|B\)\s*JSON)'
        match = re.search(markdown_pattern, text, re.DOTALL | re.IGNORECASE)
        if match:
            text = match.group(1).strip()
            # Remove the "A) PREOP SUMMARY" or "A) PREOP" heading line
            text = re.sub(r'^A\)\s*PREOP\s+SUMMARY\s*:?\s*', '', text, flags=re.IGNORECASE | re.MULTILINE)
            text = re.sub(r'^A\)\s*PREOP\s*:?\s*', '', text, flags=re.IGNORECASE | re.MULTILINE)
            text = re.sub(r'^A\)\s*PREOP\s+SUMMARY\s+with\s+headings\s*:?\s*', '', text, flags=re.IGNORECASE | re.MULTILINE)
        else:
            # Fallback: remove JSON markers if markdown pattern not found
            cut_markers = ['**JSON Object:**', '```json', '```', '\n{', 'B) JSON', '---']
            for marker in cut_markers:
                if marker in text:
                    text = text.split(marker)[0].strip()
        return text.strip()

    # Load from new JSONL format first (single admission per record)
    # IMPORTANT: Only load from Qwen JSONL, skip all other data sources
    jsonl_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'Qwen__Qwen2.5-7B-Instruct_io.jsonl')
    jsonl_loaded = False
    eval_data_by_patient = {}  # Initialize empty to ensure clean start

    # FORCE: Only load from Qwen JSONL, completely skip all other data sources
    if os.path.exists(jsonl_path):
        print(f"[INFO] ========== LOADING FROM QWEN JSONL ONLY ==========")
        print(f"Loading data from JSONL: {jsonl_path}")
        jsonl_data = load_jsonl(jsonl_path)
        print(f"Found {len(jsonl_data)} entries in JSONL file")
        entry_count = 0
        for entry in jsonl_data:
            entry_count += 1
            full_id = entry.get('patient_id', '') or ''
            # Extract patient_id from format like "11318742_admission_29646478_input"
            # Only keep the patient_id part (e.g., "11318742")
            if '_admission_' in full_id:
                patient_id = full_id.split('_admission_')[0]
                # Extract admission_id from the remaining part (e.g., "29646478_input" -> "29646478")
                admission_part = full_id.split('_admission_', 1)[1]
                admission_id = admission_part.split('_')[0] if '_' in admission_part else admission_part
            else:
                # Fallback: if format doesn't match, try to extract just the numeric patient ID
                patient_id = full_id.split('_')[0] if '_' in full_id else full_id
                admission_id = '1'  # Default admission ID
            # Get input and output fields (support multiple schema variants)
            # New Qwen JSONL uses 'patient_input' and 'output_text';
            # older versions used 'input' and 'output'.
            input_text = (
                entry.get('patient_input')
                or entry.get('input')
                or entry.get('patient_input_text')
                or ''
            )
            raw_summary = (
                entry.get('output')
                or entry.get('output_text')
                or entry.get('summary')
                or ''
            )
            # Clean the output to extract only markdown summary
            summary = clean_summary_output(raw_summary)
            # Extract admission time from input text
            admittime = None
            if input_text:
                adm_match = re.search(r'Admission Time:\s*(\d{4}-\d{2}-\d{2}[\sT]\d{2}:\d{2}:\d{2})', input_text)
                if adm_match:
                    admittime = adm_match.group(1)
            # Store data: input goes to patient raw data, cleaned output goes to AI summary
            patient_dict = eval_data_by_patient.setdefault(patient_id, {})
            patient_dict[admission_id] = {
                'patient_id': patient_id,
                'admission_id': admission_id,
                'admission_index': 1,
                'input_text': input_text,  # This will be shown in "Patient raw data from EHR"
                'summary': summary,  # This will be shown in "AI Generated Summary" (cleaned)
                'admittime': admittime,
                'timestamp': admittime,
                'highlights': []
            }
        jsonl_loaded = True
        print(f"Processed {entry_count} entries from JSONL")
        print(f"Loaded {len(eval_data_by_patient)} unique patients from JSONL")
        print(f"Patient IDs: {sorted(list(eval_data_by_patient.keys()))}")
        print(f"[INFO] JSONL loading complete. jsonl_loaded = {jsonl_loaded}")
        print(f"[INFO] ================================================")
    else:
        print(f"[WARNING] JSONL file not found: {jsonl_path}")
        print(f"[WARNING] Will attempt to load from fallback sources")

    # Fallback to old format if JSONL not found
    # IMPORTANT: Only load from fallback if JSONL was NOT loaded
    if (not jsonl_loaded) and os.path.exists(summaries_dir):
        print(f"[WARNING] ========== FALLBACK: Loading from summaries_dir ==========")
        print(f"[INFO] Loading from fallback summaries_dir: {summaries_dir}")
        print(f"[WARNING] This should NOT happen if Qwen JSONL exists!")
        files = os.listdir(summaries_dir)
        pattern = re.compile(r"^subject_(\d+)_hadm_(\d+)_(model_input|summary)\.txt$")
        for fname in files:
            match = pattern.match(fname)
            if not match:
                continue
            patient_id, hadm_id, kind = match.groups()
            patient_dict = eval_data_by_patient.setdefault(patient_id, {})
            record = patient_dict.setdefault(hadm_id, {
                'patient_id': patient_id,
                'admission_id': hadm_id,
                'input_text': "",
                'summary': "",
                'timestamp': "",
                'admittime': None,
                'highlights': []
            })
            full_path = os.path.join(summaries_dir, fname)
            if kind == 'model_input':
                record['input_text'] = load_text_file(full_path)
                # prefer embedded timestamp if present
                if not record.get('timestamp'):
                    embedded = extract_timestamp_from_text(record['input_text'])
                    if embedded:
                        record['timestamp'] = embedded
                record['_input_mtime'] = os.path.getmtime(full_path)
            elif kind == 'summary':
                record['summary'] = load_text_file(full_path)
                if not record.get('timestamp'):
                    embedded = extract_timestamp_from_text(record['summary'])
                    if embedded:
                        record['timestamp'] = embedded
                record['_summary_mtime'] = os.path.getmtime(full_path)
        # finalize timestamps: do NOT fallback to mtime; leave as None if not found in content
        for p_id, hadm_map in eval_data_by_patient.items():
            for h_id, rec in hadm_map.items():
                if not rec.get('timestamp'):
                    rec['timestamp'] = None
                # cleanup internal fields
                if '_input_mtime' in rec:
                    del rec['_input_mtime']
                if '_summary_mtime' in rec:
                    del rec['_summary_mtime']

    # Merge per-admission structured JSONs (input_text, summary, admittime)
    # IMPORTANT: Only load if JSONL was NOT loaded (to avoid duplicate patients)
    per_adm_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'per_admission_results_30patient_2ad')
    if (not jsonl_loaded) and os.path.exists(per_adm_dir):
        print(f"[WARNING] ========== FALLBACK: Loading from per_admission_results_30patient_2ad ==========")
        print(f"[WARNING] JSONL not loaded, falling back to per_admission_results_30patient_2ad directory")
        print(f"[WARNING] This should NOT happen if Qwen JSONL exists!")
        print(f"[WARNING] jsonl_loaded flag = {jsonl_loaded}")
        for fname in os.listdir(per_adm_dir):
            if not fname.startswith('patient_') or not fname.endswith('.json') or fname == 'all_patients_per_admission.json':
                continue
            pjson = load_json(os.path.join(per_adm_dir, fname))
            if not pjson:
                continue
            p_id = str(pjson.get('patient_id') or '')
            admissions = pjson.get('admissions', [])
            if not p_id:
                continue
            for idx, adm in enumerate(admissions, start=1):
                base_hadm_id = str(adm.get('hadm_id'))
                if not base_hadm_id:
                    continue
                # Use composite key to disambiguate duplicates: {hadm_id}#{index}
                hadm_key = f"{base_hadm_id}#{idx}"
                patient_dict = eval_data_by_patient.setdefault(p_id, {})
                rec = patient_dict.setdefault(hadm_key, {
                    'patient_id': p_id,
                    'admission_id': base_hadm_id,
                    'admission_index': idx,
                    'input_text': '',
                    'summary': '',
                    'timestamp': None,
                    'admittime': None,
                    'highlights': []
                })
                # Merge/override from structured JSON
                if not rec.get('input_text'):
                    rec['input_text'] = adm.get('input_text', '')
                if not rec.get('summary'):
                    rec['summary'] = adm.get('summary', '')
                if not rec.get('admittime'):
                    rec['admittime'] = adm.get('admittime') or adm.get('raw_admission_data', {}).get('admittime')

    # Load trace-back results and attach matches (only if JSONL was loaded)
    # This only adds highlights, doesn't add new patients
    trace_back_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'trace_back_results_30patient_2ad')
    if jsonl_loaded and os.path.exists(trace_back_dir):
        print(f"[INFO] Loading trace-back results for existing patients only")
        for p_id, hadm_map in list(eval_data_by_patient.items()):
            tb_path = os.path.join(trace_back_dir, f"patient_{p_id}_trace_back.json")
            tb = load_json(tb_path)
            if not tb:
                continue
            tb_adms = tb.get('admissions', [])
            # Build mapping idx -> hadm_key for this patient (sorted by admission_index)
            idx_to_key = {}
            for hadm_key, rec in hadm_map.items():
                idx = rec.get('admission_index')
                if idx:
                    idx_to_key[int(idx)] = hadm_key
            for adm_entry in tb_adms:
                idx = int(adm_entry.get('admission_idx', 0)) + 1
                hadm_key = idx_to_key.get(idx)
                if not hadm_key:
                    continue
                rec = hadm_map.get(hadm_key)
                if not rec:
                    continue
                cos_list = adm_entry.get('cosine_results', [])
                att_list = adm_entry.get('attention_results', [])
                # Split summary into lines to enumerate sentences (simple split by newline)
                summary_lines = [ln for ln in (rec.get('summary', '').split('\n')) if ln.strip()]
                sentence_map = {}
                for sidx, sline in enumerate(summary_lines):
                    sentence_map[sidx] = {
                        'summary_sentence_idx': sidx,
                        'summary_sentence': sline,
                        'cosine_matches': [],
                        'attention_matches': []
                    }
                for item in cos_list:
                    sidx = item.get('summary_sentence_idx')
                    matches = item.get('top_10_matches', [])
                    if sidx in sentence_map:
                        sentence_map[sidx]['cosine_matches'] = matches
                for item in att_list:
                    sidx = item.get('summary_sentence_idx')
                    matches = item.get('top_10_matches', [])
                    if sidx in sentence_map:
                        sentence_map[sidx]['attention_matches'] = matches
                rec['highlights'] = [sentence_map[k] for k in sorted(sentence_map.keys())]

    # Precompute patient id list for UI (single admission per patient)
    patient_ids_list = sorted(list(eval_data_by_patient.keys()))
    print(f"[DEBUG] ========== FINAL DATA LOAD SUMMARY ==========")
    print(f"[DEBUG] JSONL loaded: {jsonl_loaded}")
    print(f"[DEBUG] Total patients in UI list: {len(patient_ids_list)}")
    print(f"[DEBUG] Patient IDs: {patient_ids_list}")
    # NOTE: Patient count can change (e.g., dataset expanded from 24 to 30).
    # Avoid hard-coding an expected number; rely on jsonl_loaded/fallback logs above.
    print(f"[DEBUG] Patient count check skipped (dynamic dataset size)")
    print(f"[DEBUG] ============================================")

    with gr.Tabs():
        with gr.TabItem("Expert Evaluation"):
            # Load evaluation groups; always load if the file exists so the
            # Clinician dropdown and per-clinician filtering are active by default.
            eval_groups = {}
            eval_groups_file = os.path.join(os.path.dirname(os.path.abspath(__file__)),
                                            'eval_groups_5groups_pairwise.json')
            if os.path.exists(eval_groups_file):
                try:
                    with open(eval_groups_file, 'r') as f:
                        eval_groups_data = json.load(f)
                    eval_groups = eval_groups_data.get('groups', {})
                    print(f"[INFO] Loaded eval groups from {eval_groups_file} ({len(eval_groups)} clinicians)")
                except Exception as e:
                    print(f"[WARNING] Failed to load eval groups file {eval_groups_file}: {e}")
                    eval_groups = {}

            with gr.Row():
                with gr.Column(scale=1):
                    # Patient Sample Selection for Evaluation
                    with gr.Box():
                        gr.Markdown("## Select Patient Sample")
                        # Clinician ID selection
                        if eval_groups:
                            gr.Markdown("### Evaluation Clinician ID")
                            # eval_groups keys are already "Clinician_1", "Clinician_2", etc.
                            clinician_choices = sorted(eval_groups.keys())
                            eval_group_dropdown = gr.Dropdown(
                                choices=clinician_choices,
                                label="Select your ID to see assigned patient cases",
                                value=None,
                                interactive=True
                            )
                            group_info_display = gr.Markdown(value="", visible=True)

                        option_titles = [option["title"] for option in options]

                        # Single patient selector (one admission per patient).
                        # Default to an empty list: patients are shown only after
                        # the clinician selects their ID.
                        patient_ids = [] if eval_groups else patient_ids_list
                        eval_patient_radio = gr.Radio(
                            choices=patient_ids,
                            label="Choose Patient",
                            interactive=True,
                            value=None
                        )

                with gr.Column(scale=4):
                    # Patient Sample Evaluation Module
                    with gr.Box():
                        gr.Markdown("## 🏥 Patient Sample Evaluation Module")
                        gr.Markdown("Evaluate AI-generated patient summaries based on patient data")
                        with gr.Row():
                            expert_name_input = gr.Textbox(
                                label="Clinician ID",
                                placeholder="Enter your ID",
                                scale=2
                            )
                            evaluation_status = gr.Textbox(
                                label="Status",
                                interactive=False,
                                scale=1,
                                elem_id="evaluation_status_box"
                            )
                        # HTML component for alert popup (visible when there's content)
                        alert_popup = gr.HTML(value="", visible=True, elem_id="alert_popup_container")

                        # Patient Data Display (based on selected sample)
                        gr.Markdown("### Patient Raw Data from EHR")
                        with gr.Accordion("Patient raw data from EHR", open=True):
                            patient_input_display = gr.HTML(
                                value="Select a patient sample to view data",
                                label="Original Patient Data",
                                elem_id="patient_input_display"
                            )

                        # AI Summary Display (scrollable)
                        gr.Markdown("### AI Generated Summary")
                        with gr.Accordion("🤖 AI Generated Summary", open=True):
                            ai_summary_display = gr.Markdown(
                                value="Select a patient to view AI summary...",
                                label="AI Generated Summary",
                                elem_id="ai_summary_display"
                            )

                        # Overall Quality Rating
                        gr.Markdown("⚠️ Please rate all dimensions below before submitting your evaluation.")
                        gr.Markdown("### Overall evaluation (1 = Poor, 10 = Excellent)")
                        gr.Markdown("*A global assessment integrating all dimensions (accuracy, completeness, relevance...)*")
                        overall_rating = gr.Slider(minimum=1, maximum=10, step=1, label="Overall Quality", value=0)

                        # Core Evaluation Dimensions
                        gr.Markdown("### Core Evaluation Dimensions (1 = Poor, 10 = Excellent)")
                        with gr.Row():
                            with gr.Column():
                                gr.Markdown("**Clinical Accuracy**: the extent to which the content is factually correct and consistent with the source EHR.")
                                clinical_accuracy = gr.Slider(minimum=1, maximum=10, step=1, label="", value=0)
                            with gr.Column():
                                gr.Markdown("**Completeness / Coverage**: whether all information required for a comprehensive preoperative evaluation is included.")
                                completeness_coverage = gr.Slider(minimum=1, maximum=10, step=1, label="", value=0)
                        with gr.Row():
                            with gr.Column():
                                gr.Markdown("**Clinical Relevance**: the degree to which the summary prioritizes information most important for preoperative decision-making.")
                                clinical_relevance = gr.Slider(minimum=1, maximum=10, step=1, label="", value=0)
                            with gr.Column():
                                gr.Markdown("**Clarity and Structure**: how easily clinicians can understand, navigate, and apply the summary in practice.")
                                clarity_structure = gr.Slider(minimum=1, maximum=10, step=1, label="", value=0)
                        with gr.Row():
                            with gr.Column():
                                gr.Markdown("**Reasoning / Risk Stratification**: accurate identification of perioperative risk factors, appropriate interpretation, and required follow-up actions.")
                                reasoning_risk = gr.Slider(minimum=1, maximum=10, step=1, label="", value=0)
                            with gr.Column():
                                gr.Markdown("**Actionability**: whether the summary supports clear clinical decisions and next steps.")
                                actionability = gr.Slider(minimum=1, maximum=10, step=1, label="", value=0)

                        # Critical Error Assessment
                        gr.Markdown("### Critical Error Assessment")
                        gr.Markdown("*Number of hallucinations and omission of critical information that could directly lead to adverse events*")
                        with gr.Row():
                            hallucination = gr.Slider(minimum=1, maximum=10, step=1, label="Hallucination", value=0)
                            critical_omission = gr.Slider(minimum=1, maximum=10, step=1, label="Critical Omission", value=0)

                        # Hallucination and Critical Omission Comments
                        gr.Markdown("*If critical errors were identified, please provide detailed information below*")
                        with gr.Row():
                            hallucination_comments = gr.Textbox(
                                label="Hallucination",
                                placeholder="Copy the relevant portion of the summary and provide your feedback directly underneath",
                                lines=5,
                                value=""
                            )
                            critical_omission_comments = gr.Textbox(
                                label="Critical Omission",
                                placeholder="Copy the relevant portion of the summary and provide your feedback directly underneath",
                                lines=5,
                                value=""
                            )

                        # Detailed feedback
                        gr.Markdown("### Other Feedback and Comment")
                        feedback_text = gr.Textbox(
                            label="Other Feedback and Comment",
                            placeholder="Please provide any additional feedback or comments...",
                            lines=3
                        )

                        # Submit evaluation button
                        submit_eval_button = gr.Button(
                            "Submit Patient Evaluation",
                            variant="primary",
                            size="large"
                        )

            # Connect patient evaluation functionality
            def update_highlight_by_sentence(patient_id, admission_label, sentence_text, mode):
                """Highlight the matching input sentences for the selected summary sentence."""
                print(f"\n{'='*60}")
                print(f"[Highlight] Button clicked!")
                print(f"[Highlight] Patient ID: {patient_id}")
                print(f"[Highlight] Admission: {admission_label}")
                print(f"[Highlight] Mode: {mode}")
                print(f"[Highlight] Selected sentence: {sentence_text[:100] if sentence_text else 'None'}...")
                if not patient_id or not sentence_text:
                    print(f"[Highlight] Missing required parameters")
                    return gr.update()  # No update
                # Single admission per patient: use first record
                pid = str(patient_id)
                recs = eval_data_by_patient.get(pid, {})
                if not recs:
                    print(f"[Highlight] No records for patient {pid}")
                    return gr.update()
                hadm_key = list(recs.keys())[0]
                print(f"[Highlight] Using hadm_key: {hadm_key}")
                rec = recs.get(hadm_key)
                if not rec:
                    print(f"[Highlight] Record not found for patient {patient_id}, hadm {hadm_key}")
                    return gr.update()
                # Find the matching input sentence
                highlights = rec.get('highlights', [])
                print(f"[Highlight] Total highlights available: {len(highlights)}")
                target_sentence = ""
                # Try to find exact match
                target_matches = []
                found_match = False
                for idx, h in enumerate(highlights):
                    stored_sentence = h.get('summary_sentence', '').strip()
                    if stored_sentence == sentence_text:
                        found_match = True
                        print(f"[Highlight] Found matching sentence at index {idx}")
                        if mode == "Cosine Only":
                            matches = h.get('cosine_matches', [])
                        else:
                            matches = h.get('attention_matches', [])
                        print(f"[Highlight] Total matches: {len(matches)}")
                        # Get all matches with similarity > 0 (top 10)
                        if matches and len(matches) > 0:
                            # Filter matches with similarity > 0
                            valid_matches = [m for m in matches if m.get('similarity', 0) > 0]
                            print(f"[Highlight] Valid matches (similarity > 0): {len(valid_matches)}")
                            if valid_matches:
                                for i, m in enumerate(valid_matches[:5]):
                                    print(f"[Highlight] Match #{i+1}: similarity={m.get('similarity', 0):.4f}, sentence={m.get('input_sentence', '')[:60]}...")
                                target_matches = valid_matches[:10]  # Keep top 10
                        else:
                            print(f"[Highlight] No matches found")
                        break
                if not found_match:
                    print(f"[Highlight] WARNING: Could not find exact match for selected sentence")
                print(f"[Highlight] Final target_matches count: {len(target_matches)}")

                # Get base text
                base = rec.get('input_text', '')
                if not base:
                    # NOTE: the original markup was lost in extraction; a plain <div> is assumed.
                    return gr.update(value="<div>No input data</div>")
                print(f"[Highlight] Input text length: {len(base)} chars")
                print(f"[Highlight] Input text first 100 chars: {base[:100]}")

                # Escape HTML entities so raw EHR text renders literally
                html_text = base.replace('&', '&amp;').replace('<', '&lt;').replace('>', '&gt;')

                # Apply highlights for all top 10 matches
                if target_matches:
                    print(f"[Highlight] Applying highlights to {len(target_matches)} matches")
                    # Sort by similarity (highest first) to highlight in order
                    sorted_matches = sorted(target_matches, key=lambda x: x.get('similarity', 0), reverse=True)
                    # Define color intensity based on rank
                    highlighted_count = 0
                    for idx, match in enumerate(sorted_matches):
                        sentence = match.get('input_sentence', '')
                        similarity = match.get('similarity', 0)
                        if not sentence:
                            print(f"[Highlight] Match #{idx+1}: Empty sentence, skipping")
                            continue
                        # Check if sentence exists in original text BEFORE escaping
                        if sentence not in base:
                            print(f"[Highlight] Match #{idx+1}: Sentence not in original text")
                            print(f"[Highlight] Looking for: {sentence[:80]}")
                            # Try to find partial match
                            sentence_parts = sentence.split('\n')[0]  # Try first line
                            if sentence_parts in base:
                                print(f"[Highlight] Found partial match: {sentence_parts[:60]}")
                            continue
                        # Escape HTML in sentence (must match the escaping applied to html_text)
                        escaped_sentence = sentence.replace('&', '&amp;').replace('<', '&lt;').replace('>', '&gt;')
                        # Determine color based on rank and similarity
                        if idx == 0:
                            # Top match: strong highlight
                            bg_color = '#ffeb3b' if mode == "Cosine Only" else '#4caf50'
                            opacity = '1.0'
                        elif idx < 3:
                            # Top 3: medium highlight
                            bg_color = '#ffeb3b' if mode == "Cosine Only" else '#4caf50'
                            opacity = '0.7'
                        elif idx < 5:
                            # Top 5: lighter highlight
                            bg_color = '#ffeb3b' if mode == "Cosine Only" else '#4caf50'
                            opacity = '0.5'
                        else:
                            # Top 10: very light highlight
                            bg_color = '#ffeb3b' if mode == "Cosine Only" else '#4caf50'
                            opacity = '0.3'
                        # Create highlight with rank number.
                        # NOTE: the original <span> markup was lost in extraction;
                        # this styling is a reconstruction.
                        esc = re.escape(escaped_sentence)
                        rank_label = f'#{idx+1}'
                        highlighted = (
                            f'<span style="background-color: {bg_color}; opacity: {opacity};" '
                            f'title="similarity={similarity:.4f}"><b>{rank_label}</b> {escaped_sentence}</span>'
                        )
                        # Replace first occurrence only. A successful substitution
                        # changes the text; comparing occurrence counts is unreliable
                        # because the replacement still contains the sentence.
                        new_text = re.sub(esc, highlighted, html_text, count=1, flags=re.IGNORECASE)
                        if new_text != html_text:
                            html_text = new_text
                            highlighted_count += 1
                            print(f"[Highlight] Match #{idx+1}: Applied highlight (similarity={similarity:.4f})")
                        else:
                            print(f"[Highlight] Match #{idx+1}: NOT FOUND in text (similarity={similarity:.4f})")
                            print(f"[Highlight] Looking for: {escaped_sentence[:80]}...")
                    print(f"[Highlight] Total highlights applied: {highlighted_count}/{len(target_matches)}")
                else:
                    print(f"[Highlight] No target matches to highlight")

                # Wrap in scrollable div (original container markup lost; styling assumed)
                result = (
                    '<div style="max-height: 600px; overflow-y: auto; white-space: pre-wrap;">'
                    f'{html_text}'
                    '</div>'
                )
                print(f"{'='*60}\n")
                return gr.update(value=result)

            def render_clickable_summary(summary_text, highlights_data=None):
                """Return summary text and list of sentences for Radio."""
                if not summary_text or summary_text == "Select a patient to view AI summary...":
                    return summary_text, []
                # Extract non-empty lines as sentence choices
                sentences = [ln.strip() for ln in summary_text.split('\n') if ln.strip()]
                return summary_text, sentences

            def replace_icd_codes_and_lab_ids(text):
                """Replace ICD codes and Lab Item IDs with human-readable descriptions."""
                # Replace ICD-10 codes
                for code, description in ICD10_DESCRIPTIONS.items():
                    # Match patterns like "ICD-10: CODE" or "ICD-9: CODE"
                    text = re.sub(
                        rf'\b(ICD-10|ICD-9):\s*{re.escape(code)}\b',
                        rf'\1: {code} ({description})',
                        text,
                        flags=re.IGNORECASE
                    )
                # Replace Lab Item IDs
                for item_id, test_name in LAB_TEST_NAMES.items():
                    # Match patterns like "Item ID 50934"
                    text = re.sub(
                        rf'\bItem ID\s+{item_id}\b',
                        f'{test_name} (ID {item_id})',
                        text
                    )
                return text

            def update_patient_eval_display(patient_id, admission_label=None):
                """
                Return patient input HTML and summary markdown for the selected patient.
                Single-admission mode: always uses the first (only) record for that patient.
                """
                try:
                    if not patient_id:
                        return "No patient selected", "Select a patient to view AI summary..."
                    pid = str(patient_id)
                    if pid not in eval_data_by_patient or not eval_data_by_patient[pid]:
                        return "Patient data not found", "AI summary not found"
                    # Single admission per patient
                    first_key = list(eval_data_by_patient[pid].keys())[0]
                    rec = eval_data_by_patient[pid][first_key]
                    summary_text = clean_summary_output(rec.get('summary', '') or 'No summary available')
                    input_text = rec.get('input_text', '') or 'No input data available'
                    # Replace ICD codes and Lab IDs with descriptions
                    input_text_with_descriptions = replace_icd_codes_and_lab_ids(input_text)
                    # Escape HTML entities so the raw text renders literally
                    sanitized_input = (
                        input_text_with_descriptions.replace('&', '&amp;')
                        .replace('<', '&lt;')
                        .replace('>', '&gt;')
                    )
                    # Scrollable container (original markup lost in extraction; styling assumed)
                    input_html = (
                        '<div style="max-height: 600px; overflow-y: auto; white-space: pre-wrap;">'
                        f'{sanitized_input}'
                        '</div>'
                    )
                    # Return raw markdown text; scrolling controlled by CSS on elem_id
                    return input_html, summary_text
                except Exception as e:
                    import traceback
                    error_msg = f"Error loading patient data: {str(e)}\n{traceback.format_exc()}"
                    print(f"[ERROR] {error_msg}")
                    return "Error loading data", f"Error: {str(e)}"

            # Handle clinician ID selection to filter patient list
            if eval_groups:
                def update_patient_list_by_group(selected_clinician):
                    """Filter the patient list for the selected clinician and auto-fill the Clinician ID field."""
                    if not selected_clinician:
                        filtered_patients = []
                        info_text = ""
                        clinician_id_update = gr.update()
                    else:
                        # Directly use selected_clinician as key (no mapping needed)
                        group_data = eval_groups.get(selected_clinician, {})
                        group_patients = group_data.get('all_patients', [])
                        # Filter: only show patients that are in both the group AND the loaded patient list.
                        # Convert to strings for comparison.
                        group_patients_str = [str(p) for p in group_patients]
                        patient_ids_list_str = [str(p) for p in patient_ids_list]
                        filtered_patients = [p for p in patient_ids_list_str if p in group_patients_str]
                        print(f"[DEBUG] Clinician {selected_clinician}: group has {len(group_patients)} patients, {len(filtered_patients)} match loaded data")
                        print(f"[DEBUG] Group patients (as strings): {group_patients_str}")
                        print(f"[DEBUG] Loaded patients (as strings): {patient_ids_list_str}")
                        print(f"[DEBUG] Filtered patients: {filtered_patients}")
                        if len(filtered_patients) == 0:
                            print(f"[ERROR] No patients matched! Check if patient IDs match between groups and loaded data.")
                        info_text = f"**{selected_clinician}**: {len(filtered_patients)} patients assigned"
                        clinician_id_update = gr.update(value=selected_clinician)
                    return gr.update(choices=filtered_patients, value=None), info_text, clinician_id_update

                eval_group_dropdown.change(
                    fn=update_patient_list_by_group,
                    inputs=[eval_group_dropdown],
                    outputs=[eval_patient_radio, group_info_display, expert_name_input]
                )

            # When patient changes, update display directly (single admission per patient)
            def on_patient_change(patient_id):
                return update_patient_eval_display(patient_id)

            eval_patient_radio.change(
                fn=on_patient_change,
                inputs=[eval_patient_radio],
                outputs=[patient_input_display, ai_summary_display]
            )

            # Helper function to create alert popup HTML
            def create_alert_popup(message):
                """Create HTML for a modal popup alert."""
                # Escape HTML special characters
                escaped_msg = (
                    message.replace('&', '&amp;')
                    .replace('<', '&lt;')
                    .replace('>', '&gt;')
                    .replace('"', '&quot;')
                    .replace("'", '&#39;')
                )
                # Convert newlines to <br>
                formatted_msg = escaped_msg.replace('\n', '<br>')
                # NOTE: the original popup markup was lost in extraction; this
                # overlay/card structure is a reconstruction that preserves the
                # "⚠️ Warning" heading and the formatted message body.
                return f"""
<div style="position: fixed; top: 0; left: 0; width: 100%; height: 100%;
            background: rgba(0, 0, 0, 0.5); z-index: 1000;
            display: flex; align-items: center; justify-content: center;"
     onclick="this.style.display='none';">
  <div style="background: #fff; border-radius: 8px; padding: 24px; max-width: 480px;">
    <h3 style="margin-top: 0;">⚠️ Warning</h3>
    <p>{formatted_msg}</p>
  </div>
</div>
"""

            # Handle patient evaluation submission
            def submit_patient_eval_wrapper(selected_patient_id, overall_rating, clinical_accuracy,
                                            completeness_coverage, clinical_relevance, clarity_structure,
                                            reasoning_risk, actionability, hallucination, critical_omission,
                                            feedback_text, expert_name_input, hallucination_comments,
                                            critical_omission_comments):
                # Debug: check if data is loaded
                if not eval_data_by_patient:
                    error_message = "⚠️ ERROR: No patient data loaded. Check if data files exist."
                    alert_html = create_alert_popup(error_message.replace("__ALERT__", "").replace("⚠️ ERROR: ", ""))
                    return (f"__ALERT__{error_message}", "", 0, 0, 0, 0, 0, 0, 0, 0, 0, "", "", "", alert_html)
                if not selected_patient_id:
                    error_message = "⚠️ ERROR: Please select a patient first."
                    alert_html = create_alert_popup(error_message.replace("__ALERT__", "").replace("⚠️ ERROR: ", ""))
                    return (f"__ALERT__{error_message}", "", 0, 0, 0, 0, 0, 0, 0, 0, 0, "", "", "", alert_html)
                recs = eval_data_by_patient.get(selected_patient_id, {})
                if not recs:
                    error_message = "⚠️ ERROR: Patient data not found."
                    alert_html = create_alert_popup(error_message.replace("__ALERT__", "").replace("⚠️ ERROR: ", ""))
                    # NOTE: this branch originally returned 16 values (a stray
                    # "AI summary not found" element); trimmed to the 15 outputs
                    # wired to submit_eval_button.click below.
                    return (f"__ALERT__{error_message}", "", 0, 0, 0, 0, 0, 0, 0, 0, 0, "", "", "", alert_html)

                hadm_key = list(recs.keys())[0]
                rec = recs.get(hadm_key)
                patient_id = rec["patient_id"]
                admission_id = rec["admission_id"]
                patient_input = rec["input_text"]  # Input text to LLM
                ai_summary = rec["summary"]        # AI generated summary

                # Submit the evaluation with admission info
                print(f"[Submit] Starting evaluation submission for patient {patient_id}, admission {admission_id}")
                print(f"[Submit] Expert: {expert_name_input}, Overall Rating: {overall_rating}")
                try:
                    result = submit_patient_evaluation(
                        f"{patient_id}_adm_{admission_id}",
                        patient_input,
                        ai_summary,
                        overall_rating,
                        clinical_accuracy,
                        completeness_coverage,
                        clinical_relevance,
                        clarity_structure,
                        reasoning_risk,
                        actionability,
                        hallucination,
                        critical_omission,
                        feedback_text,
                        expert_name_input,
                        hallucination_comments,
                        critical_omission_comments
                    )
                    status_msg = result[0] if isinstance(result, (list, tuple)) and len(result) > 0 else ""
                    # If this is an alert message, treat it as a failed submission in logs
                    if isinstance(status_msg, str) and status_msg.startswith("__ALERT__"):
                        print(f"[Submit] Evaluation NOT submitted, alert returned: {status_msg}")
                        # Extract alert message and create popup
                        alert_message = status_msg.replace("__ALERT__", "").strip()
                        alert_html = create_alert_popup(alert_message)
                        # Return with alert popup
                        return (*result[:14], alert_html)
                    else:
                        print(f"[Submit] Evaluation submitted successfully: {status_msg}")
                        # Return without alert popup (empty HTML)
                        return (*result[:14], "")
                except Exception as e:
                    import traceback
                    error_msg = f"⚠️ ERROR: Error submitting evaluation: {str(e)}"
                    error_trace = traceback.format_exc()
                    print(f"[Submit] Exception occurred: {error_msg}")
                    print(f"[Submit] Traceback: {error_trace}")
                    alert_html = create_alert_popup(error_msg.replace("__ALERT__", "").replace("⚠️ ERROR: ", ""))
                    return (f"__ALERT__{error_msg}", feedback_text, overall_rating, clinical_accuracy,
                            completeness_coverage, clinical_relevance, clarity_structure, reasoning_risk,
                            actionability, hallucination, critical_omission, hallucination_comments,
                            critical_omission_comments, expert_name_input, alert_html)

            submit_eval_button.click(
                fn=submit_patient_eval_wrapper,
                inputs=[
                    eval_patient_radio, overall_rating, clinical_accuracy, completeness_coverage,
                    clinical_relevance, clarity_structure, reasoning_risk, actionability,
                    hallucination, critical_omission, feedback_text, expert_name_input,
                    hallucination_comments, critical_omission_comments
                ],
                outputs=[
                    evaluation_status, feedback_text, overall_rating, clinical_accuracy,
                    completeness_coverage, clinical_relevance, clarity_structure, reasoning_risk,
                    actionability, hallucination, critical_omission, hallucination_comments,
                    critical_omission_comments, expert_name_input, alert_popup
                ]
            )

# Google Drive functionality removed - using local storage only; no auth check needed

app.queue()

# Check if we want to deploy to Hugging Face Spaces for permanent hosting
import sys
if "--deploy" in sys.argv:
    print("To deploy permanently to Hugging Face Spaces:")
    print("1. Install: pip install huggingface_hub[cli]")
    print("2. Login: huggingface-cli login")
    print("3. Deploy: gradio deploy")
    print("4. This will give you a permanent https://yourname-appname.hf.space URL")

# Check if running on Hugging Face Spaces
hf_space = os.getenv("SPACE_ID") is not None
if hf_space:
    # Running on Hugging Face Spaces
    app.launch(
        server_name="0.0.0.0",
        server_port=7860,
        show_error=True
    )
else:
    # Running locally
    app.launch(
        share=True,
        max_threads=10,
        server_name="0.0.0.0",  # Allow external connections
        server_port=7860,
        show_error=True
    )
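
# ---------------------------------------------------------------------------
# Appendix: illustrative sketches. These helpers are hypothetical and are NOT
# wired into the app above; they distill three techniques the UI relies on so
# each can be read and unit-tested in isolation: (1) keying admissions as
# "{hadm_id}#{index}" so duplicate hadm_ids stay distinct, (2) expanding
# ICD-10 / lab-item codes via regex substitution, and (3) HTML-escaping raw
# EHR text before handing it to a gr.HTML component. The function names and
# the two-entry demo mappings are assumptions, not part of the application.
import html as _html
import re as _re

_ICD_DEMO = {"I10": "Essential hypertension"}  # stand-in for ICD10_DESCRIPTIONS
_LAB_DEMO = {"50912": "Creatinine"}            # stand-in for LAB_TEST_NAMES

def merge_admissions_sketch(patient_id, admissions):
    """Merge one patient's admissions into a dict keyed by '{hadm_id}#{index}'."""
    merged = {}
    for idx, adm in enumerate(admissions, start=1):
        hadm_id = str(adm.get('hadm_id') or '')
        if not hadm_id:
            continue  # skip malformed entries, as the fallback loader does
        merged[f"{hadm_id}#{idx}"] = {
            'patient_id': patient_id,
            'admission_id': hadm_id,
            'admission_index': idx,
            'summary': adm.get('summary', ''),
        }
    return merged

def annotate_codes_sketch(text):
    """Expand 'ICD-10: CODE' and 'Item ID NNNNN' mentions with descriptions."""
    for code, desc in _ICD_DEMO.items():
        text = _re.sub(rf'\b(ICD-10|ICD-9):\s*{_re.escape(code)}\b',
                       rf'\1: {code} ({desc})', text, flags=_re.IGNORECASE)
    for item_id, name in _LAB_DEMO.items():
        text = _re.sub(rf'\bItem ID\s+{item_id}\b', f'{name} (ID {item_id})', text)
    return text

def wrap_scrollable_sketch(raw_text, max_height_px=600):
    """Escape raw text and wrap it in a scrollable <div> suitable for gr.HTML."""
    escaped = _html.escape(raw_text)  # escapes &, <, > (and quotes)
    return (f'<div style="max-height: {max_height_px}px; overflow-y: auto; '
            f'white-space: pre-wrap;">{escaped}</div>')
# ---------------------------------------------------------------------------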