# chatbot-mimic-notes / chatgpt.py
# Author: Jesse Liu
# Last change: remove openai-whisper from requirements, make whisper optional in code (commit f8c1982)
import gradio as gr
import asyncio
import httpx
import tempfile
import os
import requests
import time
import threading
import json
import re
import csv
import html
import markdown
import base64
from datetime import datetime, timedelta
# Hugging Face Hub storage for evaluations
try:
from hf_storage import save_to_huggingface
HF_STORAGE_AVAILABLE = True
except ImportError:
HF_STORAGE_AVAILABLE = False
import shutil
import zipfile
# Optional Whisper ASR support (for local use); not required on Hugging Face Space
try:
import whisper # type: ignore
WHISPER_AVAILABLE = True
except ImportError:
whisper = None # type: ignore
WHISPER_AVAILABLE = False
# ICD-10 code to description mapping (common codes)
# Used to show human-readable diagnosis names next to raw ICD-10 codes from
# the MIMIC-IV admission records. Keys are codes without the dot separator.
ICD10_DESCRIPTIONS = {
    "I130": "Hypertensive heart and chronic kidney disease",
    "I5033": "Acute on chronic diastolic heart failure",
    "E872": "Hypokalemia",
    "N184": "Chronic kidney disease, stage 4",
    "E1122": "Type 2 diabetes with chronic kidney disease",
    "N2581": "Secondary hyperparathyroidism of renal origin",
    "I2510": "Atherosclerotic heart disease without angina",
    "E11319": "Type 2 diabetes with unspecified diabetic retinopathy",
    "D6489": "Other specified anemias",
    "E785": "Hyperlipidemia, unspecified",
    "Z955": "Presence of coronary angioplasty implant and graft",
    "Z86718": "Personal history of other venous thrombosis",
    "I252": "Old myocardial infarction",
    "Z2239": "Encounter for screening for other suspected endocrine disorder",
    "G4700": "Insomnia, unspecified",
    "M1A9XX0": "Chronic gout, unspecified, without tophus",
    "R0902": "Hypoxemia",
    "E1151": "Type 2 diabetes with diabetic peripheral angiopathy without gangrene",
    "Z794": "Long term use of insulin",
    "E669": "Obesity, unspecified",
    "Z6831": "Body mass index 31.0-31.9, adult",
    "V4571": "Renal dialysis status",
    "I10": "Essential hypertension",
    "E119": "Type 2 diabetes without complications",
    "J449": "Chronic obstructive pulmonary disease, unspecified",
    "N18": "Chronic kidney disease",
}
# MIMIC-IV Lab Test Item ID to name mapping (common tests)
# Keys are MIMIC-IV `itemid` values (as strings) from the labevents table.
LAB_TEST_NAMES = {
    "50934": "H-Hematocrit",
    "50947": "I-Ionized Calcium",
    "51678": "L-Lymphocytes",
    "50868": "Anion Gap",
    "50882": "Bicarbonate",
    "50912": "Creatinine",
    "50971": "Potassium",
    "50983": "Sodium",
    "51006": "Urea Nitrogen",
    "50902": "Chloride",
    "50931": "Glucose",
    "51221": "Hematocrit",
    "51222": "Hemoglobin",
    "51265": "Platelet Count",
    "51301": "White Blood Cells",
    "50820": "pH",
    "50821": "pO2",
    "50818": "pCO2",
    "51237": "INR",
    "51274": "PT",
    "51275": "PTT",
    "50813": "Lactate",
    "50960": "Magnesium",
    "50970": "Phosphate",
    "50893": "Calcium Total",
}
# Shared HTTP session reused across synchronous requests.
session = requests.Session()

# Load the Whisper ASR model once at startup when the optional dependency is
# installed; otherwise leave it unset so transcription can fail gracefully.
model = whisper.load_model("base") if WHISPER_AVAILABLE else None

# Backend service connection settings.
base_url = "http://localhost:8080"
timeout = 60
concurrency_count = 10
def start_backend_server():
    """Start the Flask backend (src.server) in a background thread for Spaces.

    Any failure (missing module, port in use, ...) is caught and printed so the
    UI process keeps running without the backend.
    """
    try:
        from src.server import create_app, configure_routes

        class Args:
            # Mirrors the CLI arguments the server normally receives.
            counselor_config_path = './src/configs/counselor_config.yaml'
            store_dir = './user_data'

        flask_app = create_app()
        configure_routes(flask_app, Args())

        def _serve():
            flask_app.run(port=8080, host='0.0.0.0', debug=False)

        threading.Thread(target=_serve, daemon=True).start()
        print("Backend server started on http://localhost:8080")
    except Exception as e:
        print(f"Failed to start backend server: {e}")
async def initialization(api_key, username):
    """Register `username` (with the given API key) with the backend.

    Posts to /api/initialization and returns a human-readable status string;
    never raises to the caller.
    """
    url = f"{base_url}/api/initialization"
    headers = {'Content-Type': 'application/json'}
    data = {
        'api_key': api_key,
        'username': username,
    }
    async with httpx.AsyncClient(timeout=timeout) as client:
        try:
            response = await client.post(url, json=data, headers=headers)
            if response.status_code == 200:
                return "Initialization successful."
            return f"Initialization failed: {response.text}"
        # BUG FIX: httpx raises httpx.TimeoutException, not asyncio.TimeoutError,
        # so the previous timeout handler was unreachable and timeouts fell
        # through to the generic handler below.
        except httpx.TimeoutException:
            print("The request timed out")
            return "Request timed out during initialization."
        except Exception as e:
            return f"Error in initialization: {str(e)}"
# def fetch_default_prompts(chatbot_type):
# url = f"{base_url}?chatbot_type={chatbot_type}"
# try:
# response = httpx.get(url, timeout=timeout)
# if response.status_code == 200:
# prompts = response.json()
# print(prompts)
# return prompts
# else:
# print(f"Failed to fetch prompts: {response.status_code} - {response.text}")
# return {}
# except Exception as e:
# print(f"Error fetching prompts: {str(e)}")
# return {}
async def get_backend_response(api_key, patient_prompt, username, chatbot_type):
    """Forward the patient prompt to the backend doctor endpoint.

    Returns the decoded JSON payload on success; on any failure returns an
    error *string* instead (callers distinguish via isinstance checks).
    """
    payload = {
        'username': username,
        'patient_prompt': patient_prompt,
        'chatbot_type': chatbot_type
    }
    async with httpx.AsyncClient(timeout=timeout) as client:
        try:
            resp = await client.post(f"{base_url}/responses/doctor",
                                     json=payload,
                                     headers={'Content-Type': 'application/json'})
            if resp.status_code != 200:
                return f"Failed to fetch response from backend: {resp.text}"
            return resp.json()
        except Exception as e:
            return f"Error contacting backend service: {str(e)}"
async def save_conversation_and_memory(username, chatbot_type):
    """Ask the backend to persist the current conversation and memory graph.

    Returns the backend's status message, or an error string on failure.
    """
    payload = {
        'username': username,
        'chatbot_type': chatbot_type
    }
    async with httpx.AsyncClient(timeout=timeout) as client:
        try:
            resp = await client.post(f"{base_url}/save/end_and_save",
                                     json=payload,
                                     headers={'Content-Type': 'application/json'})
            if resp.status_code != 200:
                return f"Failed to save conversations and memory graph: {resp.text}"
            return resp.json().get('message', 'Saving Error!')
        except Exception as e:
            return f"Error contacting backend service: {str(e)}"
async def get_conversation_histories(username, chatbot_type):
    """Fetch all saved conversations for the user; returns [] on any failure."""
    payload = {
        'username': username,
        'chatbot_type': chatbot_type
    }
    async with httpx.AsyncClient(timeout=timeout) as client:
        try:
            resp = await client.post(f"{base_url}/save/download_conversations",
                                     json=payload,
                                     headers={'Content-Type': 'application/json'})
            if resp.status_code == 200:
                return resp.json()
            return []
        except Exception:
            # Best-effort: the download UI treats an empty list as "nothing saved".
            return []
def download_conversations(username, chatbot_type):
    """Write each saved conversation to a temp .txt file and return the paths.

    Fetches histories from the backend, renders each (speaker, message) pair
    as "Speaker: message", and saves one file per conversation inside a fresh
    temporary directory.
    """
    conversation_histories = asyncio.run(get_conversation_histories(username, chatbot_type))
    temp_dir = tempfile.mkdtemp()
    files = []
    for conversation_entry in conversation_histories:
        file_name = conversation_entry.get('file_name', f"Conversation_{len(files)+1}.txt")
        lines = []
        for message_pair in conversation_entry.get('conversation', []):
            if isinstance(message_pair, list) and len(message_pair) == 2:
                speaker, message = message_pair
                lines.append(f"{speaker.capitalize()}: {message}\n\n")
            else:
                # Keep malformed entries visible rather than dropping them.
                lines.append(f"Unknown format: {message_pair}\n\n")
        temp_file_path = os.path.join(temp_dir, file_name)
        # BUG FIX: write as UTF-8 explicitly; the platform default encoding can
        # raise on non-ASCII conversation text (e.g. cp1252 on Windows), and
        # every other text write in this module already uses utf-8.
        with open(temp_file_path, 'w', encoding='utf-8') as temp_file:
            temp_file.write("".join(lines))
        files.append(temp_file_path)
    return files
# async def get_biography(username, chatbot_type):
# url = f"{base_url}/save/generate_autobiography"
# headers = {'Content-Type': 'application/json'}
# data = {
# 'username': username,
# 'chatbot_type': chatbot_type
# }
# async with httpx.AsyncClient(timeout=timeout) as client:
# try:
# response = await client.post(url, json=data, headers=headers)
# if response.status_code == 200:
# biography_data = response.json()
# biography_text = biography_data.get('biography', '')
# return biography_text
# else:
# return "Failed to generate biography."
# except Exception as e:
# return f"Error contacting backend service: {str(e)}"
# def download_biography(username, chatbot_type):
# biography_text = asyncio.run(get_biography(username, chatbot_type))
# if not biography_text or "Failed" in biography_text or "Error" in biography_text:
# return gr.update(value=None, visible=False), gr.update(value=biography_text, visible=True)
# temp_dir = tempfile.mkdtemp()
# temp_file_path = os.path.join(temp_dir, "biography.txt")
# with open(temp_file_path, 'w') as temp_file:
# temp_file.write(biography_text)
# return temp_file_path, gr.update(value=biography_text, visible=True)
def transcribe_audio(audio_file):
    """Run Whisper ASR on `audio_file` and return the transcribed text.

    Raises RuntimeError when the optional whisper model was not loaded
    (e.g. on the Hugging Face Space deployment).
    """
    if model is None or not WHISPER_AVAILABLE:
        raise RuntimeError("Audio transcription model is not available in this deployment.")
    return model.transcribe(audio_file)["text"]
def submit_text_and_respond(edited_text, api_key, username, selected_title, history, chatbot_type):
    """Send the user's message to the backend, prefixing the selected patient
    context (if any), and append the exchange to the chat history."""
    # Last matching option wins, mirroring how titles are looked up elsewhere.
    matching = [opt['text'] for opt in options if opt['title'] == selected_title]
    content = matching[-1] if matching else ''
    if content == '':
        query_with_context = edited_text
    else:
        query_with_context = f'Given the following context information:\n {content}\n\n Answer the following question: {edited_text}'
    print(query_with_context)
    response = asyncio.run(get_backend_response(api_key, query_with_context, username, chatbot_type))
    print('------')
    print(response)
    # String responses are error messages; dicts carry the doctor's reply.
    reply = response if isinstance(response, str) else response['doctor_response']
    history.append((edited_text, reply))
    return history, ""
def set_initialize_button(api_key_input, username_input):
    """Run backend initialization synchronously and echo the API key back."""
    status = asyncio.run(initialization(api_key_input, username_input))
    print(status)
    return status, api_key_input
def save_conversation(username, chatbot_type):
    """Synchronous wrapper around save_conversation_and_memory for Gradio."""
    return asyncio.run(save_conversation_and_memory(username, chatbot_type))
def start_recording(audio_file):
    """Transcribe a recorded audio file; returns '' when no file was provided
    and an error string when transcription fails."""
    if not audio_file:
        return ""
    try:
        text = transcribe_audio(audio_file)
    except Exception as e:
        return f"Failed to transcribe: {str(e)}"
    return text
# Patient Sample Evaluation functions
def save_patient_evaluation(patient_id, patient_input, ai_summary, rating, feedback, expert_name, categories, comments=None):
    """Persist one patient-sample evaluation to disk (JSON + master CSV).

    Writes a standalone JSON record and appends a row to the append-only
    master CSV under ./patient_evaluations, then best-effort uploads the same
    data to the Hugging Face Hub when hf_storage is importable.

    Args:
        patient_id: Identifier of the evaluated patient sample.
        patient_input: The input text that was given to the model.
        ai_summary: The AI-generated summary being evaluated.
        rating: Overall quality rating.
        feedback: Free-text feedback from the evaluator.
        expert_name: Clinician ID of the evaluator.
        categories: Dict of per-dimension ratings (clinical_accuracy,
            completeness_coverage, ..., critical_omission).
        comments: Optional dict with 'hallucination_comments' and
            'critical_omission_comments'; any non-dict value yields empty
            strings (old call format).

    Returns:
        A human-readable status message, including the HF upload outcome.
    """
    timestamp = datetime.now().isoformat()
    # Handle comments - can be dict with hallucination_comments and critical_omission_comments, or old format
    if isinstance(comments, dict):
        hallucination_comments = comments.get('hallucination_comments', '')
        critical_omission_comments = comments.get('critical_omission_comments', '')
    else:
        hallucination_comments = ''
        critical_omission_comments = ''
    evaluation = {
        "timestamp": timestamp,
        "patient_id": patient_id,
        "expert_name": expert_name,
        "patient_input": patient_input,
        "ai_summary": ai_summary,
        "overall_rating": rating,
        "feedback": feedback,
        "categories": categories,
        "hallucination_comments": hallucination_comments,
        "critical_omission_comments": critical_omission_comments
    }
    # Create evaluations directory if it doesn't exist
    eval_dir = "patient_evaluations"
    if not os.path.exists(eval_dir):
        os.makedirs(eval_dir)
    # Save to JSON file (colons in the ISO timestamp are not filesystem-safe)
    eval_file = os.path.join(eval_dir, f"patient_eval_{patient_id}_{timestamp.replace(':', '-')}.json")
    with open(eval_file, 'w', encoding='utf-8') as f:
        json.dump(evaluation, f, ensure_ascii=False, indent=2)
    # Also append to a master CSV file for easier analysis
    csv_file = os.path.join(eval_dir, "patient_evaluations_master.csv")
    file_exists = os.path.isfile(csv_file)
    with open(csv_file, 'a', newline='', encoding='utf-8') as f:
        writer = csv.writer(f)
        if not file_exists:
            # Header is written once, on first creation of the CSV.
            writer.writerow(['timestamp', 'patient_id', 'expert_name', 'overall_rating',
                'clinical_accuracy', 'completeness_coverage', 'clinical_relevance', 'clarity_structure',
                'reasoning_risk', 'actionability', 'hallucination', 'critical_omission',
                'feedback', 'hallucination_comments', 'critical_omission_comments'])
        writer.writerow([
            timestamp, patient_id, expert_name, rating,
            categories.get('clinical_accuracy', ''),
            categories.get('completeness_coverage', ''),
            categories.get('clinical_relevance', ''),
            categories.get('clarity_structure', ''),
            categories.get('reasoning_risk', ''),
            categories.get('actionability', ''),
            categories.get('hallucination', ''),
            categories.get('critical_omission', ''),
            feedback,
            hallucination_comments,
            critical_omission_comments
        ])
        # Force the row to disk immediately so evaluations survive restarts.
        f.flush()
        os.fsync(f.fileno())
    # Save to local files (for immediate access)
    # Also try to save to Hugging Face Hub if available
    save_messages = []
    if HF_STORAGE_AVAILABLE:
        # Same column order as the CSV row written above.
        csv_row = [
            timestamp, patient_id, expert_name, rating,
            categories.get('clinical_accuracy', ''),
            categories.get('completeness_coverage', ''),
            categories.get('clinical_relevance', ''),
            categories.get('clarity_structure', ''),
            categories.get('reasoning_risk', ''),
            categories.get('actionability', ''),
            categories.get('hallucination', ''),
            categories.get('critical_omission', ''),
            feedback,
            hallucination_comments,
            critical_omission_comments
        ]
        print(f"[HF Upload] Starting upload for patient {patient_id}...")
        try:
            success, msg = save_to_huggingface(evaluation, csv_row)
            print(f"[HF Upload] Result: success={success}, msg={msg}")
            if success:
                save_messages.append(f"Uploaded to Hugging Face Hub")
            else:
                save_messages.append(f"HF upload failed: {msg}")
        except Exception as e:
            import traceback
            error_detail = f"{str(e)}\n{traceback.format_exc()}"
            print(f"[HF Upload] Exception occurred: {error_detail}")
            save_messages.append(f"HF upload error: {str(e)}")
    else:
        print(f"[HF Upload] HF_STORAGE_AVAILABLE is False, skipping upload")
    # Combine messages
    base_msg = f"Patient evaluation saved successfully at {timestamp}"
    if save_messages:
        return f"{base_msg}\n" + "\n".join(save_messages)
    return base_msg
def submit_patient_evaluation(patient_id, patient_input, ai_summary, overall_rating,
                              clinical_accuracy, completeness_coverage, clinical_relevance, clarity_structure,
                              reasoning_risk, actionability, hallucination, critical_omission,
                              feedback, expert_name, hallucination_comments, critical_omission_comments):
    """Validate and save a patient evaluation submitted from the Gradio form.

    Validation: a non-blank Clinician ID and a non-zero rating for every
    dimension are required. On failure the status string is prefixed with
    "__ALERT__" — a marker the injected front-end JS converts into a browser
    pop-up — and all form values are returned unchanged.

    On success the evaluation is persisted via save_patient_evaluation and
    the form fields are reset (ratings to 0, text cleared), keeping only the
    expert name.

    Returns:
        A 14-tuple matching the Gradio output components: (status, feedback,
        overall_rating, the 8 category ratings, hallucination_comments,
        critical_omission_comments, expert_name).
    """
    if not expert_name.strip():
        error_message = "⚠️ ERROR: Please enter your Clinician ID before submitting evaluation."
        # Trigger JavaScript alert by including a special marker
        return f"__ALERT__{error_message}", feedback, overall_rating, clinical_accuracy, completeness_coverage, clinical_relevance, clarity_structure, reasoning_risk, actionability, hallucination, critical_omission, hallucination_comments, critical_omission_comments, expert_name
    # Check all ratings and collect missing ones (0 means "not rated yet")
    rating_checks = {
        'Overall Quality': overall_rating,
        'Clinical Accuracy': clinical_accuracy,
        'Completeness / Coverage': completeness_coverage,
        'Clinical Relevance': clinical_relevance,
        'Clarity and Structure': clarity_structure,
        'Reasoning / Risk Stratification': reasoning_risk,
        'Actionability': actionability,
        'Hallucination': hallucination,
        'Critical Omission': critical_omission
    }
    missing_ratings = [name for name, value in rating_checks.items() if value == 0]
    if missing_ratings:
        missing_list = "\n".join([f" • {name}" for name in missing_ratings])
        error_message = f"⚠️ WARNING: Please rate all evaluation dimensions before submitting.\n\nMissing ratings:\n{missing_list}\n\nPlease provide ratings for all items above."
        # Trigger JavaScript alert by including a special marker
        return f"__ALERT__{error_message}", feedback, overall_rating, clinical_accuracy, completeness_coverage, clinical_relevance, clarity_structure, reasoning_risk, actionability, hallucination, critical_omission, hallucination_comments, critical_omission_comments, expert_name
    categories = {
        'clinical_accuracy': clinical_accuracy,
        'completeness_coverage': completeness_coverage,
        'clinical_relevance': clinical_relevance,
        'clarity_structure': clarity_structure,
        'reasoning_risk': reasoning_risk,
        'actionability': actionability,
        'hallucination': hallucination,
        'critical_omission': critical_omission
    }
    # Store hallucination and critical omission comments
    comments = {
        'hallucination_comments': hallucination_comments.strip() if hallucination_comments else '',
        'critical_omission_comments': critical_omission_comments.strip() if critical_omission_comments else ''
    }
    result = save_patient_evaluation(patient_id, patient_input, ai_summary,
                                     overall_rating, feedback, expert_name, categories, comments)
    # Reset form after successful submission (keep expert_name)
    return result, "", 0, 0, 0, 0, 0, 0, 0, 0, 0, "", "", expert_name
def get_conversation_for_evaluation(history):
    """Return (user_input, bot_response) of the most recent turn.

    Missing history, or a turn with fewer than two elements, yields empty
    strings for the absent parts.
    """
    if not history:
        return "", ""
    last_turn = history[-1]
    user_input = last_turn[0] if len(last_turn) > 0 else ""
    bot_response = last_turn[1] if len(last_turn) > 1 else ""
    return user_input, bot_response
def export_patient_evaluations():
    """Return (csv_path, message) for the master CSV, or (None, message) when
    no evaluation data has been recorded yet."""
    csv_file = os.path.join("patient_evaluations", "patient_evaluations_master.csv")
    if os.path.exists(csv_file):
        return csv_file, f"Patient evaluation data exported."
    return None, "No patient evaluation data found."
def get_patient_evaluation_stats():
    """Summarize the master evaluation CSV as a markdown stats block.

    Reads ./patient_evaluations/patient_evaluations_master.csv. Falls back to
    a plain line count when pandas is unavailable; never raises.
    """
    eval_dir = "patient_evaluations"
    csv_file = os.path.join(eval_dir, "patient_evaluations_master.csv")
    if not os.path.exists(csv_file):
        return "No patient evaluation data available."
    try:
        import pandas as pd
        # Force reload from disk
        df = pd.read_csv(csv_file)
        if df.empty:
            return "No patient evaluation data available."

        def _mean(column):
            # 0 when the column is absent (e.g. older CSV layouts).
            return df[column].mean() if column in df.columns else 0

        total_evaluations = len(df)
        avg_overall_rating = _mean('overall_rating')
        # BUG FIX: the master CSV written by save_patient_evaluation has
        # columns 'clinical_accuracy' and 'completeness_coverage'; the old
        # names 'medical_accuracy'/'completeness' never exist, so these
        # averages were always reported as 0.
        avg_clinical_accuracy = _mean('clinical_accuracy')
        avg_completeness = _mean('completeness_coverage')
        expert_count = df['expert_name'].nunique() if 'expert_name' in df.columns else 0
        patient_count = df['patient_id'].nunique() if 'patient_id' in df.columns else 0
        latest = df['timestamp'].iloc[-1] if 'timestamp' in df.columns else 'N/A'
        return f"""
**Patient Evaluation Statistics**
- **Total Evaluations**: {total_evaluations}
- **Patients Evaluated**: {patient_count}
- **Average Overall Rating**: {avg_overall_rating:.2f}/5
- **Average Clinical Accuracy**: {avg_clinical_accuracy:.2f}/5
- **Average Completeness**: {avg_completeness:.2f}/5
- **Number of Experts**: {expert_count}
- **Latest Evaluation**: {latest}
"""
    except ImportError:
        # Fallback if pandas is not available
        with open(csv_file, 'r', encoding='utf-8') as f:
            lines = f.readlines()
        total_evaluations = len(lines) - 1  # Subtract header
        return f"Total patient evaluations: {total_evaluations} (Install pandas for detailed stats)"
    except Exception as e:
        return f"Error reading patient evaluation data: {str(e)}"
def create_backup_zip():
    """Create a timestamped zip backup of all evaluation data.

    Archives the master CSV (renamed with the backup timestamp) and every
    JSON evaluation file under json_files/ inside the zip.

    Returns:
        (zip_filename, success message) or (None, error message).
    """
    try:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_filename = f"patient_evaluations_backup_{timestamp}.zip"
        eval_dir = "patient_evaluations"
        with zipfile.ZipFile(backup_filename, 'w') as backup_zip:
            # Add CSV file if it exists
            csv_file = os.path.join(eval_dir, "patient_evaluations_master.csv")
            if os.path.exists(csv_file):
                backup_zip.write(csv_file, f"patient_evaluations_master_{timestamp}.csv")
            # Add JSON files
            if os.path.exists(eval_dir):
                for filename in os.listdir(eval_dir):
                    if filename.endswith('.json'):
                        file_path = os.path.join(eval_dir, filename)
                        # BUG FIX: archive each file under its own name; the
                        # previous constant arcname made every JSON entry
                        # overwrite the last one inside the zip.
                        backup_zip.write(file_path, f"json_files/{filename}")
        return backup_filename, f"Backup created successfully: {backup_filename}"
    except Exception as e:
        return None, f"Error creating backup: {str(e)}"
def update_methods(chapter):
    """Gradio callback: repopulate the methods dropdown for `chapter`.

    NOTE(review): depends on a module-level `interview_protocols` mapping
    (chapter -> list of methods) that is not defined in this portion of the
    file — confirm it is created before this callback can fire, otherwise
    this raises NameError/KeyError.
    """
    return gr.update(choices=interview_protocols[chapter], value=interview_protocols[chapter][0])
# def update_memory_graph(memory_data):
# table_data = []
# for node in memory_data:
# table_data.append([
# node.get('date', ''),
# node.get('topic', ''),
# node.get('event_description', ''),
# node.get('people_involved', '')
# ])
# return table_data
# def update_prompts(chatbot_display_name):
# chatbot_type = display_to_value.get(chatbot_display_name, 'enhanced')
# prompts = fetch_default_prompts(chatbot_type)
# return (
# gr.update(value=prompts.get('system_prompt', '')),
# gr.update(value=prompts.get('conv_instruction_prompt', '')),
# gr.update(value=prompts.get('therapy_prompt', '')),
# gr.update(value=prompts.get('autobio_generation_prompt', '')),
# )
# def update_chatbot_type(chatbot_display_name):
# chatbot_type = display_to_value.get(chatbot_display_name, 'enhanced')
# return chatbot_type
# CSS to keep the buttons small
css = """
#start_button, #reset_button {
padding: 4px 10px !important;
font-size: 12px !important;
width: auto !important;
}
/* Force scrollable areas for long text */
#patient_input_display textarea {
max-height: 420px !important;
overflow-y: auto !important;
}
#ai_summary_display {
max-height: 420px !important;
overflow-y: auto !important;
}
#ai_summary_markdown {
max-height: 480px !important;
overflow-y: auto !important;
}
"""
# Add CSS for clickable summary sentences
css = css + """
.sum-sent {
cursor: pointer;
padding: 2px 4px;
border-radius: 3px;
transition: background-color 0.2s;
user-select: none; /* Prevent text selection */
-webkit-user-select: none;
-moz-user-select: none;
-ms-user-select: none;
}
.sum-sent:hover {
background-color: #e0e0e0;
text-decoration: underline;
}
.sum-sent:active {
background-color: #b0b0b0;
}
"""
# Add global JavaScript to fix form validation issues and ensure click handlers work
global_js = """
<script>
(function() {
// Fix form field attributes to resolve validation warnings
function fixFormFields() {
// Find all form fields without name or id
const inputs = document.querySelectorAll('input, textarea, select');
inputs.forEach((input, index) => {
if (!input.id && !input.name) {
// Generate a unique name based on label or index
const label = input.closest('.form')?.querySelector('label')?.textContent || '';
const name = label.toLowerCase().replace(/[^a-z0-9]/g, '_') || `field_${index}`;
input.name = name;
}
if (!input.id) {
input.id = input.name || `input_${index}`;
}
});
}
// Monitor evaluation status for alert markers and show pop-up
function setupEvaluationAlertMonitor() {
let lastCheckedValues = new Map();
let alertShown = new Set();
function checkForAlerts() {
// Prefer the dedicated Status box if present
const statusBox = document.querySelector('#evaluation_status_box textarea, #evaluation_status_box input');
const allInputs = statusBox ? [statusBox] : document.querySelectorAll('textarea, input[type="text"]');
allInputs.forEach(function(input, index) {
const value = input.value || '';
const key = index + '_' + (input.id || input.name || input.className || '');
// Check if value has changed and contains alert marker
const lastValue = lastCheckedValues.get(key);
if (value !== lastValue && value.includes('__ALERT__')) {
// Extract the alert message
const alertMessage = value.replace('__ALERT__', '').trim();
// Only show alert once per unique message to avoid duplicates
const alertKey = alertMessage.substring(0, 100);
if (!alertShown.has(alertKey) && alertMessage.length > 0) {
alertShown.add(alertKey);
// Use setTimeout to ensure alert shows after DOM update
setTimeout(function() {
// Show browser alert
alert(alertMessage);
// Remove the marker from the displayed text
if (input.value.includes('__ALERT__')) {
input.value = alertMessage;
// Trigger events to update Gradio
input.dispatchEvent(new Event('input', { bubbles: true }));
input.dispatchEvent(new Event('change', { bubbles: true }));
}
}, 100);
// Clear the alert key after 3 seconds to allow same alert again if needed
setTimeout(function() {
alertShown.delete(alertKey);
}, 3000);
}
}
// Update last checked value
lastCheckedValues.set(key, value);
});
}
// Use MutationObserver to watch for DOM changes
const observer = new MutationObserver(function(mutations) {
// Check immediately when DOM changes
setTimeout(checkForAlerts, 50);
});
// Observe the document body for changes
observer.observe(document.body, {
childList: true,
subtree: true,
characterData: true,
attributes: true,
attributeOldValue: true
});
// Check periodically (very frequent for better responsiveness)
setInterval(checkForAlerts, 150);
// Also check immediately and after delays
checkForAlerts();
setTimeout(checkForAlerts, 500);
setTimeout(checkForAlerts, 1000);
setTimeout(checkForAlerts, 2000);
// Listen for Gradio-specific events if available
document.addEventListener('DOMContentLoaded', checkForAlerts);
window.addEventListener('load', checkForAlerts);
}
// Run on page load and after dynamic updates
if (document.readyState === 'loading') {
document.addEventListener('DOMContentLoaded', function() {
fixFormFields();
setupEvaluationAlertMonitor();
});
} else {
fixFormFields();
setupEvaluationAlertMonitor();
}
// Also run after a delay to catch dynamically loaded content
setTimeout(function() {
fixFormFields();
setupEvaluationAlertMonitor();
}, 1000);
setInterval(fixFormFields, 2000); // Periodically check for new fields
})();
</script>
"""
with gr.Blocks() as app:
# Add CSS via HTML component (Gradio 3.50.2+ compatibility)
gr.HTML(f"<style>{css}</style>")
# Add JavaScript for form validation and alert monitoring
gr.HTML(global_js)
# Debug: Print HF storage status on startup
print("="*60)
print("HF Storage Status on Startup")
print("="*60)
print(f"HF_STORAGE_AVAILABLE: {HF_STORAGE_AVAILABLE}")
if HF_STORAGE_AVAILABLE:
try:
from hf_storage import get_hf_storage, HF_AVAILABLE
print(f"HF_AVAILABLE (in hf_storage): {HF_AVAILABLE}")
storage = get_hf_storage()
if storage:
print(f"Storage repo_id: {storage.repo_id}")
print(f"HF_TOKEN set: {'Yes' if os.getenv('HF_TOKEN') else 'No'}")
print(f"HF_EVAL_REPO_ID set: {os.getenv('HF_EVAL_REPO_ID') or 'No'}")
print(f"SPACE_ID: {os.getenv('SPACE_ID') or 'Not set'}")
except Exception as e:
print(f"Error checking storage: {e}")
print("="*60)
# In Spaces, start backend inside the same process
start_backend_server()
chatbot_type_state = gr.State('enhanced')
api_key_state = gr.State()
selected_title = gr.State()
is_running = gr.State()
target_timestamp = gr.State()
# Load patient data globally
def load_jsonl(filepath):
    """Read a JSONL file into a list of parsed objects, skipping blank lines.

    When `filepath` does not exist (e.g. a developer-machine absolute path
    baked into the config), falls back to the same relative path — and then
    to the bare filename — resolved against this file's directory. Returns
    [] when nothing can be found.
    """
    records = []

    def _read(path):
        with open(path, "r", encoding="utf-8") as fh:
            for raw_line in fh:
                stripped = raw_line.strip()
                if stripped:
                    records.append(json.loads(stripped))

    try:
        _read(filepath)
        return records
    except FileNotFoundError:
        repo_dir = os.path.dirname(os.path.abspath(__file__))
        local_prefix = "/Users/liuzijie/Desktop/chatbot-mimic-notes/"
        if isinstance(filepath, str) and filepath.startswith(local_prefix):
            alt_path = os.path.join(repo_dir, filepath[len(local_prefix):])
            if os.path.exists(alt_path):
                _read(alt_path)
                return records
        alt_path2 = os.path.join(repo_dir, os.path.basename(filepath))
        if os.path.exists(alt_path2):
            _read(alt_path2)
            return records
        return []
# Safe JSON file loader
def load_json(path):
    """Parse a JSON file, returning None on any read/parse failure."""
    try:
        with open(path, 'r', encoding='utf-8') as fh:
            return json.load(fh)
    except Exception:
        return None
# Load per admission data (optional; skip if missing in Space)
# Prefer env var ADMISSION_JSONL; otherwise resolve relative to this file
script_dir_for_adm = os.path.dirname(os.path.abspath(__file__))
default_adm_path = os.path.join(script_dir_for_adm, "per_admission_summaries", "llama-3.2-3b_per_admission.jsonl")
jsonl_path = os.environ.get("ADMISSION_JSONL", default_adm_path)
admission_data = load_jsonl(jsonl_path) if os.path.exists(jsonl_path) else []
# Group admissions by patient_id
patient_admissions = {}
for admission in admission_data:
patient_id = admission['patient_id']
if patient_id not in patient_admissions:
patient_admissions[patient_id] = []
patient_admissions[patient_id].append(admission)
# Create options with admission tabs for each patient
options = []
for patient_id, admissions in patient_admissions.items():
for i, admission in enumerate(admissions):
admission_id = admission['hadm_id']
admit_time = admission['admittime']
summary = admission.get('summary', '')
input_text = admission.get('input_text', '')
# Create a unique title for each admission
title = f"Patient {patient_id} - Admission {i+1} (ID: {admission_id})"
options.append({
'title': title,
'text': input_text, # Use the actual input text that was given to LLM
'note': summary, # AI generated summary
'patient_id': patient_id,
'admission_id': admission_id,
'admission_index': i+1,
'admit_time': admit_time,
'raw_data': admission.get('raw', {})
})
# ----------------------
# Load new summaries data (grouped by patient -> admissions)
# Directory: /Users/liuzijie/Desktop/chatbot-mimic-notes/summaries
# Filenames: subject_{patient}_hadm_{hadm}_model_input.txt and subject_{patient}_hadm_{hadm}_summary.txt
# ----------------------
# Prefer env var, then path relative to this file (repo root)
script_dir = os.path.dirname(os.path.abspath(__file__))
default_summaries_dir = os.path.join(script_dir, "summaries")
summaries_dir = os.environ.get("SUMMARIES_DIR", default_summaries_dir)
def load_text_file(path):
    """Return the file's full text content, or "" if it cannot be read."""
    try:
        with open(path, 'r', encoding='utf-8') as fh:
            return fh.read()
    except Exception:
        return ""
# Matches "YYYY-MM-DD", optionally followed by " HH:MM" / "THH:MM" / ":SS".
date_regex = re.compile(r"(\d{4}-\d{2}-\d{2}(?:[ T]\d{2}:\d{2}(?::\d{2})?)?)")

def extract_timestamp_from_text(text):
    """Return the first date/datetime substring found in `text`, or ""."""
    if not text:
        return ""
    found = date_regex.search(text)
    return found.group(1) if found else ""
# Clean summary output by removing JSON/object appendix and code fences
def clean_summary_output(text):
    """Strip reasoning tags and trailing JSON/code-fence appendices from a
    model output, keeping only the markdown PREOP summary section."""
    if not text:
        return text
    # Drop <think>...</think> reasoning blocks entirely.
    text = re.sub(r'<think>.*?</think>', '', text, flags=re.DOTALL | re.IGNORECASE)
    # Keep only the "A) PREOP ..." section, ending at "---" or "B) JSON".
    section = re.search(r'(A\)\s*PREOP.*?)(?:---|B\)\s*JSON)', text, re.DOTALL | re.IGNORECASE)
    if section:
        text = section.group(1).strip()
        # Strip the section heading itself, in its several observed spellings.
        for heading in (r'^A\)\s*PREOP\s+SUMMARY\s*:?\s*',
                        r'^A\)\s*PREOP\s*:?\s*',
                        r'^A\)\s*PREOP\s+SUMMARY\s+with\s+headings\s*:?\s*'):
            text = re.sub(heading, '', text, flags=re.IGNORECASE | re.MULTILINE)
    else:
        # Fallback: truncate at the first JSON/appendix marker that appears.
        for marker in ('**JSON Object:**', '```json', '```', '\n{', 'B) JSON', '---'):
            if marker in text:
                text = text.split(marker)[0].strip()
    return text.strip()
# Load from new JSONL format first (single admission per record)
# IMPORTANT: Only load from Qwen JSONL, skip all other data sources
jsonl_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'Qwen__Qwen2.5-7B-Instruct_io.jsonl')
jsonl_loaded = False
eval_data_by_patient = {}  # Initialize empty to ensure clean start
# FORCE: Only load from Qwen JSONL, completely skip all other data sources
if os.path.exists(jsonl_path):
    print(f"[INFO] ========== LOADING FROM QWEN JSONL ONLY ==========")
    print(f"Loading data from JSONL: {jsonl_path}")
    jsonl_data = load_jsonl(jsonl_path)
    print(f"Found {len(jsonl_data)} entries in JSONL file")
    entry_count = 0
    for entry in jsonl_data:
        entry_count += 1
        full_id = entry.get('patient_id', '') or ''
        # Extract patient_id from format like "11318742_admission_29646478_input"
        # Only keep the patient_id part (e.g., "11318742")
        if '_admission_' in full_id:
            patient_id = full_id.split('_admission_')[0]
            # Extract admission_id from the remaining part (e.g., "29646478_input" -> "29646478")
            admission_part = full_id.split('_admission_', 1)[1]
            admission_id = admission_part.split('_')[0] if '_' in admission_part else admission_part
        else:
            # Fallback: if format doesn't match, try to extract just the numeric patient ID
            patient_id = full_id.split('_')[0] if '_' in full_id else full_id
            admission_id = '1'  # Default admission ID
        # Get input and output fields (support multiple schema variants)
        # New Qwen JSONL uses 'patient_input' and 'output_text';
        # older versions used 'input' and 'output'.
        input_text = (
            entry.get('patient_input')
            or entry.get('input')
            or entry.get('patient_input_text')
            or ''
        )
        raw_summary = (
            entry.get('output')
            or entry.get('output_text')
            or entry.get('summary')
            or ''
        )
        # Clean the output to extract only markdown summary
        summary = clean_summary_output(raw_summary)
        # Extract admission time from input text
        admittime = None
        if input_text:
            adm_match = re.search(r'Admission Time:\s*(\d{4}-\d{2}-\d{2}[\sT]\d{2}:\d{2}:\d{2})', input_text)
            if adm_match:
                admittime = adm_match.group(1)
        # Store data: input goes to patient raw data, cleaned output goes to AI summary
        patient_dict = eval_data_by_patient.setdefault(patient_id, {})
        patient_dict[admission_id] = {
            'patient_id': patient_id,
            'admission_id': admission_id,
            'admission_index': 1,  # single admission per patient in this format
            'input_text': input_text,  # This will be shown in "Patient raw data from EHR"
            'summary': summary,  # This will be shown in "AI Generated Summary" (cleaned)
            'admittime': admittime,
            'timestamp': admittime,
            'highlights': []  # filled later from trace-back results, if present
        }
    jsonl_loaded = True
    print(f"Processed {entry_count} entries from JSONL")
    print(f"Loaded {len(eval_data_by_patient)} unique patients from JSONL")
    print(f"Patient IDs: {sorted(list(eval_data_by_patient.keys()))}")
    print(f"[INFO] JSONL loading complete. jsonl_loaded = {jsonl_loaded}")
    print(f"[INFO] ================================================")
else:
    print(f"[WARNING] JSONL file not found: {jsonl_path}")
    print(f"[WARNING] Will attempt to load from fallback sources")
# Fallback to old format if JSONL not found
# IMPORTANT: Only load from fallback if JSONL was NOT loaded
if (not jsonl_loaded) and os.path.exists(summaries_dir):
    print(f"[WARNING] ========== FALLBACK: Loading from summaries_dir ==========")
    print(f"[INFO] Loading from fallback summaries_dir: {summaries_dir}")
    print(f"[WARNING] This should NOT happen if Qwen JSONL exists!")
    files = os.listdir(summaries_dir)
    # Expected filenames: subject_{pid}_hadm_{hadm}_model_input.txt / _summary.txt
    pattern = re.compile(r"^subject_(\d+)_hadm_(\d+)_(model_input|summary)\.txt$")
    for fname in files:
        match = pattern.match(fname)
        if not match:
            continue
        patient_id, hadm_id, kind = match.groups()
        patient_dict = eval_data_by_patient.setdefault(patient_id, {})
        record = patient_dict.setdefault(hadm_id, {
            'patient_id': patient_id,
            'admission_id': hadm_id,
            'input_text': "",
            'summary': "",
            'timestamp': "",
            'admittime': None,
            'highlights': []
        })
        full_path = os.path.join(summaries_dir, fname)
        if kind == 'model_input':
            record['input_text'] = load_text_file(full_path)
            # prefer embedded timestamp if present
            if not record.get('timestamp'):
                embedded = extract_timestamp_from_text(record['input_text'])
                if embedded:
                    record['timestamp'] = embedded
            record['_input_mtime'] = os.path.getmtime(full_path)
        elif kind == 'summary':
            record['summary'] = load_text_file(full_path)
            if not record.get('timestamp'):
                embedded = extract_timestamp_from_text(record['summary'])
                if embedded:
                    record['timestamp'] = embedded
            record['_summary_mtime'] = os.path.getmtime(full_path)
# finalize timestamps: do NOT fallback to mtime; leave as None if not found in content
for p_id, hadm_map in eval_data_by_patient.items():
    for h_id, rec in hadm_map.items():
        if not rec.get('timestamp'):
            rec['timestamp'] = None
        # cleanup internal fields
        if '_input_mtime' in rec:
            del rec['_input_mtime']
        if '_summary_mtime' in rec:
            del rec['_summary_mtime']
# Merge per-admission structured JSONs (input_text, summary, admittime)
# IMPORTANT: Only load if JSONL was NOT loaded (to avoid duplicate patients)
per_adm_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'per_admission_results_30patient_2ad')
if (not jsonl_loaded) and os.path.exists(per_adm_dir):
    print(f"[WARNING] ========== FALLBACK: Loading from per_admission_results_30patient_2ad ==========")
    print(f"[WARNING] JSONL not loaded, falling back to per_admission_results_30patient_2ad directory")
    print(f"[WARNING] This should NOT happen if Qwen JSONL exists!")
    print(f"[WARNING] jsonl_loaded flag = {jsonl_loaded}")
    for fname in os.listdir(per_adm_dir):
        # Only per-patient files; skip the aggregate file.
        if not fname.startswith('patient_') or not fname.endswith('.json') or fname == 'all_patients_per_admission.json':
            continue
        pjson = load_json(os.path.join(per_adm_dir, fname))
        if not pjson:
            continue
        p_id = str(pjson.get('patient_id') or '')
        admissions = pjson.get('admissions', [])
        if not p_id:
            continue
        for idx, adm in enumerate(admissions, start=1):
            base_hadm_id = str(adm.get('hadm_id'))
            if not base_hadm_id:
                continue
            # Use composite key to disambiguate duplicates: {hadm_id}#{index}
            hadm_key = f"{base_hadm_id}#{idx}"
            patient_dict = eval_data_by_patient.setdefault(p_id, {})
            rec = patient_dict.setdefault(hadm_key, {
                'patient_id': p_id,
                'admission_id': base_hadm_id,
                'admission_index': idx,
                'input_text': '',
                'summary': '',
                'timestamp': None,
                'admittime': None,
                'highlights': []
            })
            # Merge/override from structured JSON; never clobber data that was
            # already populated from the text-file fallback.
            if not rec.get('input_text'):
                rec['input_text'] = adm.get('input_text', '')
            if not rec.get('summary'):
                rec['summary'] = adm.get('summary', '')
            if not rec.get('admittime'):
                rec['admittime'] = adm.get('admittime') or adm.get('raw_admission_data', {}).get('admittime')
# Load trace-back results and attach matches (only if JSONL was loaded)
# This only adds highlights, doesn't add new patients
trace_back_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'trace_back_results_30patient_2ad')
if jsonl_loaded and os.path.exists(trace_back_dir):
    print(f"[INFO] Loading trace-back results for existing patients only")
    for p_id, hadm_map in list(eval_data_by_patient.items()):
        tb_path = os.path.join(trace_back_dir, f"patient_{p_id}_trace_back.json")
        tb = load_json(tb_path)
        if not tb:
            continue
        tb_adms = tb.get('admissions', [])
        # Build mapping idx -> hadm_key for this patient (sorted by admission_index)
        idx_to_key = {}
        for hadm_key, rec in hadm_map.items():
            idx = rec.get('admission_index')
            if idx:
                idx_to_key[int(idx)] = hadm_key
        for adm_entry in tb_adms:
            # trace-back files use 0-based admission_idx; records use 1-based.
            idx = int(adm_entry.get('admission_idx', 0)) + 1
            hadm_key = idx_to_key.get(idx)
            if not hadm_key:
                continue
            rec = hadm_map.get(hadm_key)
            if not rec:
                continue
            cos_list = adm_entry.get('cosine_results', [])
            att_list = adm_entry.get('attention_results', [])
            # Split summary into lines to enumerate sentences (simple split by newline)
            summary_lines = [ln for ln in (rec.get('summary', '').split('\n')) if ln.strip()]
            sentence_map = {}
            for sidx, sline in enumerate(summary_lines):
                sentence_map[sidx] = {
                    'summary_sentence_idx': sidx,
                    'summary_sentence': sline,
                    'cosine_matches': [],
                    'attention_matches': []
                }
            # Attach cosine / attention matches to their summary sentence by index.
            for item in cos_list:
                sidx = item.get('summary_sentence_idx')
                matches = item.get('top_10_matches', [])
                if sidx in sentence_map:
                    sentence_map[sidx]['cosine_matches'] = matches
            for item in att_list:
                sidx = item.get('summary_sentence_idx')
                matches = item.get('top_10_matches', [])
                if sidx in sentence_map:
                    sentence_map[sidx]['attention_matches'] = matches
            rec['highlights'] = [sentence_map[k] for k in sorted(sentence_map.keys())]
# Precompute patient id list for UI (single admission per patient)
patient_ids_list = sorted(list(eval_data_by_patient.keys()))
print(f"[DEBUG] ========== FINAL DATA LOAD SUMMARY ==========")
print(f"[DEBUG] JSONL loaded: {jsonl_loaded}")
print(f"[DEBUG] Total patients in UI list: {len(patient_ids_list)}")
print(f"[DEBUG] Patient IDs: {patient_ids_list}")
# NOTE: Patient count can change (e.g., dataset expanded from 24 → 30).
# Avoid hard-coding an expected number; rely on jsonl_loaded/fallback logs above.
print(f"[DEBUG] Patient count check skipped (dynamic dataset size)")
print(f"[DEBUG] ============================================")
with gr.Tabs():
    with gr.TabItem("Expert Evaluation"):
        # Load evaluation groups — always load if the file exists so the
        # Clinician dropdown and per-clinician filtering are active by default.
        eval_groups = {}
        eval_groups_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'eval_groups_5groups_pairwise.json')
        if os.path.exists(eval_groups_file):
            try:
                with open(eval_groups_file, 'r') as f:
                    eval_groups_data = json.load(f)
                # Expected schema: {"groups": {"Clinician_1": {...}, ...}}
                eval_groups = eval_groups_data.get('groups', {})
                print(f"[INFO] Loaded eval groups from {eval_groups_file} ({len(eval_groups)} clinicians)")
            except Exception as e:
                # A malformed groups file disables per-clinician filtering but
                # must not crash app startup.
                print(f"[WARNING] Failed to load eval groups file {eval_groups_file}: {e}")
                eval_groups = {}
with gr.Row():
    with gr.Column(scale=1):
        # Patient Sample Selection for Evaluation
        with gr.Box():
            gr.Markdown("## Select Patient Sample")
            # Clinician ID selection — only shown when a groups file was loaded.
            if eval_groups:
                gr.Markdown("### Evaluation Clinician ID")
                # eval_groups keys are already "Clinician_1", "Clinician_2", etc.
                clinician_choices = sorted(eval_groups.keys())
                eval_group_dropdown = gr.Dropdown(
                    choices=clinician_choices,
                    label="Select your ID to see assigned patient cases",
                    value=None,
                    interactive=True
                )
                group_info_display = gr.Markdown(
                    value="",
                    visible=True
                )
            option_titles = [option["title"] for option in options]
            # Single patient selector (one admission per patient)
            # Default to empty list - patients will be shown only after clinician selects their ID
            patient_ids = [] if eval_groups else patient_ids_list
            eval_patient_radio = gr.Radio(
                choices=patient_ids,
                label="Choose Patient",
                interactive=True,
                value=None
            )
with gr.Column(scale=4):
    # Patient Sample Evaluation Module
    with gr.Box():
        gr.Markdown("## 🏥 Patient Sample Evaluation Module")
        gr.Markdown("Evaluate AI-generated patient summaries based on patient data")
        with gr.Row():
            expert_name_input = gr.Textbox(
                label="Clinician ID",
                placeholder="Enter your ID",
                scale=2
            )
            evaluation_status = gr.Textbox(
                label="Status",
                interactive=False,
                scale=1,
                elem_id="evaluation_status_box"
            )
        # HTML component for alert popup (visible when there's content)
        alert_popup = gr.HTML(
            value="",
            visible=True,
            elem_id="alert_popup_container"
        )
        # Patient Data Display (based on selected sample)
        gr.Markdown("### Patient Raw Data from EHR")
        with gr.Accordion("Patient raw data from EHR", open=True):
            patient_input_display = gr.HTML(
                value="Select a patient sample to view data",
                label="Original Patient Data",
                elem_id="patient_input_display"
            )
        # AI Summary Display (scrollable)
        gr.Markdown("### AI Generated Summary")
        with gr.Accordion("🤖 AI Generated Summary", open=True):
            ai_summary_display = gr.Markdown(
                value="Select a patient to view AI summary...",
                label="AI Generated Summary",
                elem_id="ai_summary_display"
            )
# Overall Quality Rating
gr.Markdown("<span style='color: red; font-weight: bold;'>⚠️ Please rate all dimensions below before submitting your evaluation.</span>")
gr.Markdown("### Overall evaluation (1 = Poor, 10 = Excellent)")
gr.Markdown("*A global assessment integrating all dimensions (accuracy, completeness, relevance...)*")
# NOTE(review): all sliders start at value=0 while minimum=1 — presumably an
# intentional "unrated" sentinel checked at submit time; confirm against
# submit_patient_evaluation's validation.
overall_rating = gr.Slider(
    minimum=1,
    maximum=10,
    step=1,
    label="Overall Quality",
    value=0
)
# Core Evaluation Dimensions
gr.Markdown("### Core Evaluation Dimensions (1 = Poor, 10 = Excellent)")
with gr.Row():
    with gr.Column():
        gr.Markdown("**Clinical Accuracy**: the extent to which the content is factually correct and consistent with the source EHR.")
        clinical_accuracy = gr.Slider(
            minimum=1,
            maximum=10,
            step=1,
            label="",
            value=0
        )
    with gr.Column():
        gr.Markdown("**Completeness / Coverage**: whether all information required for a comprehensive preoperative evaluation is included.")
        completeness_coverage = gr.Slider(
            minimum=1,
            maximum=10,
            step=1,
            label="",
            value=0
        )
with gr.Row():
    with gr.Column():
        gr.Markdown("**Clinical Relevance**: the degree to which the summary prioritizes information most important for preoperative decision-making.")
        clinical_relevance = gr.Slider(
            minimum=1,
            maximum=10,
            step=1,
            label="",
            value=0
        )
    with gr.Column():
        gr.Markdown("**Clarity and Structure**: how easily clinicians can understand, navigate, and apply the summary in practice.")
        clarity_structure = gr.Slider(
            minimum=1,
            maximum=10,
            step=1,
            label="",
            value=0
        )
with gr.Row():
    with gr.Column():
        gr.Markdown("**Reasoning / Risk Stratification**: accurate identification of perioperative risk factors, appropriate interpretation, and required follow-up actions.")
        reasoning_risk = gr.Slider(
            minimum=1,
            maximum=10,
            step=1,
            label="",
            value=0
        )
    with gr.Column():
        gr.Markdown("**Actionability**: whether the summary supports clear clinical decisions and next steps.")
        actionability = gr.Slider(
            minimum=1,
            maximum=10,
            step=1,
            label="",
            value=0
        )
# Critical Error Assessment
gr.Markdown("### Critical Error Assessment")
gr.Markdown("*Number of hallucinations and omission of critical information that could directly lead to adverse events*")
with gr.Row():
    hallucination = gr.Slider(
        minimum=1,
        maximum=10,
        step=1,
        label="Hallucination",
        value=0
    )
    critical_omission = gr.Slider(
        minimum=1,
        maximum=10,
        step=1,
        label="Critical Omission",
        value=0
    )
# Hallucination and Critical Omission Comments
gr.Markdown("*If critical errors were identified, please provide detail information below*")
with gr.Row():
    hallucination_comments = gr.Textbox(
        label="Hallucination",
        placeholder="Copy the relevant portion of the summary and provide your feedback directly underneath",
        lines=5,
        value=""
    )
    critical_omission_comments = gr.Textbox(
        label="Critical Omission",
        placeholder="Copy the relevant portion of the summary and provide your feedback directly underneath",
        lines=5,
        value=""
    )
# Detailed feedback
gr.Markdown("### Other Feedback and Comment")
feedback_text = gr.Textbox(
    label="Other Feedback and Comment",
    placeholder="Please provide any additional feedback or comments...",
    lines=3
)
# Submit evaluation button
submit_eval_button = gr.Button(
    "Submit Patient Evaluation",
    variant="primary",
    size="large"
)
# Connect patient evaluation functionality
def update_highlight_by_sentence(patient_id, admission_label, sentence_text, mode):
    """Highlight the input sentences that best match a selected summary sentence.

    Looks up the trace-back matches stored for the patient's (single)
    admission record and returns a gr.update() carrying the raw EHR input as
    HTML, with up to the top-10 matching input sentences wrapped in <mark>
    tags (stronger opacity for higher-ranked matches).

    Args:
        patient_id: selected patient identifier.
        admission_label: unused in single-admission mode; kept for the
            existing event wiring.
        sentence_text: exact text of the summary sentence the user selected.
        mode: "Cosine Only" to use cosine matches; any other value uses
            attention matches.
    """
    print(f"\n{'='*60}")
    print(f"[Highlight] Button clicked!")
    print(f"[Highlight] Patient ID: {patient_id}")
    print(f"[Highlight] Admission: {admission_label}")
    print(f"[Highlight] Mode: {mode}")
    print(f"[Highlight] Selected sentence: {sentence_text[:100] if sentence_text else 'None'}...")
    if not patient_id or not sentence_text:
        print(f"[Highlight] Missing required parameters")
        return gr.update()  # No update
    # Single admission per patient: use first record
    pid = str(patient_id)
    recs = eval_data_by_patient.get(pid, {})
    if not recs:
        print(f"[Highlight] No records for patient {pid}")
        return gr.update()
    hadm_key = list(recs.keys())[0]
    print(f"[Highlight] Using hadm_key: {hadm_key}")
    rec = recs.get(hadm_key)
    if not rec:
        print(f"[Highlight] Record not found for patient {patient_id}, hadm {hadm_key}")
        return gr.update()
    # Find the highlight entry whose summary sentence matches exactly.
    highlights = rec.get('highlights', [])
    print(f"[Highlight] Total highlights available: {len(highlights)}")
    target_matches = []
    found_match = False
    for idx, h in enumerate(highlights):
        stored_sentence = h.get('summary_sentence', '').strip()
        if stored_sentence == sentence_text:
            found_match = True
            print(f"[Highlight] Found matching sentence at index {idx}")
            if mode == "Cosine Only":
                matches = h.get('cosine_matches', [])
            else:
                matches = h.get('attention_matches', [])
            print(f"[Highlight] Total matches: {len(matches)}")
            if matches:
                # Keep only matches with positive similarity, capped at 10.
                valid_matches = [m for m in matches if m.get('similarity', 0) > 0]
                print(f"[Highlight] Valid matches (similarity > 0): {len(valid_matches)}")
                if valid_matches:
                    for i, m in enumerate(valid_matches[:5]):
                        print(f"[Highlight] Match #{i+1}: similarity={m.get('similarity', 0):.4f}, sentence={m.get('input_sentence', '')[:60]}...")
                    target_matches = valid_matches[:10]  # Keep top 10
            else:
                print(f"[Highlight] No matches found")
            break
    if not found_match:
        print(f"[Highlight] WARNING: Could not find exact match for selected sentence")
    print(f"[Highlight] Final target_matches count: {len(target_matches)}")
    # Get base text
    base = rec.get('input_text', '')
    if not base:
        return gr.update(value="<pre>No input data</pre>")
    print(f"[Highlight] Input text length: {len(base)} chars")
    print(f"[Highlight] Input text first 100 chars: {base[:100]}")
    # Escape HTML before injecting our own <mark> tags.
    html_text = base.replace('&', '&amp;').replace('<', '&lt;').replace('>', '&gt;')
    if target_matches:
        print(f"[Highlight] Applying highlights to {len(target_matches)} matches")
        # Sort by similarity (highest first) to highlight in rank order
        sorted_matches = sorted(target_matches, key=lambda x: x.get('similarity', 0), reverse=True)
        highlighted_count = 0
        for idx, match in enumerate(sorted_matches):
            sentence = match.get('input_sentence', '')
            similarity = match.get('similarity', 0)
            if not sentence:
                print(f"[Highlight] Match #{idx+1}: Empty sentence, skipping")
                continue
            # Check if sentence exists in original text BEFORE escaping
            if sentence not in base:
                print(f"[Highlight] Match #{idx+1}: Sentence not in original text")
                print(f"[Highlight] Looking for: {sentence[:80]}")
                # Try to find partial match (first line) for diagnostics only
                sentence_parts = sentence.split('\n')[0]
                if sentence_parts in base:
                    print(f"[Highlight] Found partial match: {sentence_parts[:60]}")
                continue
            # Escape HTML in sentence the same way as the base text
            escaped_sentence = sentence.replace('&', '&amp;').replace('<', '&lt;').replace('>', '&gt;')
            # Color by mode; opacity decreases with rank (top match strongest).
            bg_color = '#ffeb3b' if mode == "Cosine Only" else '#4caf50'
            if idx == 0:
                opacity = '1.0'
            elif idx < 3:
                opacity = '0.7'
            elif idx < 5:
                opacity = '0.5'
            else:
                opacity = '0.3'
            rank_label = f'<sup style="font-size:0.7em;font-weight:bold;">#{idx+1}</sup>'
            highlighted = f'<mark style="background-color:{bg_color};opacity:{opacity};padding:2px 4px;border-radius:3px;" title="Rank {idx+1}, Similarity: {similarity:.4f}">{rank_label}{escaped_sentence}</mark>'
            # BUGFIX: use a callable replacement so backslashes in the matched
            # sentence are not treated as regex group references, and detect
            # success by comparing the text before/after the substitution —
            # counting occurrences never worked because the replacement still
            # contains the sentence itself.
            new_html = re.sub(re.escape(escaped_sentence), lambda _m: highlighted, html_text, count=1, flags=re.IGNORECASE)
            if new_html != html_text:
                html_text = new_html
                highlighted_count += 1
                print(f"[Highlight] Match #{idx+1}: Applied highlight (similarity={similarity:.4f})")
            else:
                print(f"[Highlight] Match #{idx+1}: NOT FOUND in text (similarity={similarity:.4f})")
                print(f"[Highlight] Looking for: {escaped_sentence[:80]}...")
        print(f"[Highlight] Total highlights applied: {highlighted_count}/{len(target_matches)}")
    else:
        print(f"[Highlight] No target matches to highlight")
    # Wrap in scrollable div
    result = f'<div id="patient_input_text" style="max-height: 400px; overflow-y: auto; white-space: pre-wrap; font-family: monospace;">{html_text}</div>'
    print(f"{'='*60}\n")
    return gr.update(value=result)
def render_clickable_summary(summary_text, highlights_data=None):
    """Return the summary text plus its non-empty lines (Radio choices)."""
    if not summary_text or summary_text == "Select a patient to view AI summary...":
        return summary_text, []
    # Each non-blank line of the summary becomes one selectable sentence.
    choices = []
    for line in summary_text.split('\n'):
        stripped = line.strip()
        if stripped:
            choices.append(stripped)
    return summary_text, choices
def replace_icd_codes_and_lab_ids(text):
    """Annotate ICD codes and lab item IDs in *text* with readable names.

    "ICD-10: CODE" / "ICD-9: CODE" becomes "ICD-10: CODE (description)";
    "Item ID NNNNN" becomes "<test name> (ID NNNNN)".
    """
    # Expand known ICD codes (case-insensitive match on the prefix).
    for icd_code, icd_desc in ICD10_DESCRIPTIONS.items():
        pattern = rf'\b(ICD-10|ICD-9):\s*{re.escape(icd_code)}\b'
        text = re.sub(pattern, rf'\1: {icd_code} ({icd_desc})', text, flags=re.IGNORECASE)
    # Expand known lab item IDs.
    for lab_id, lab_name in LAB_TEST_NAMES.items():
        text = re.sub(rf'\bItem ID\s+{lab_id}\b', f'{lab_name} (ID {lab_id})', text)
    return text
def update_patient_eval_display(patient_id, admission_label=None):
    """
    Return patient input HTML and summary markdown for the selected patient.
    Single-admission mode: always uses the first (only) record for that patient.
    """
    try:
        if not patient_id:
            return "No patient selected", "Select a patient to view AI summary..."
        pid = str(patient_id)
        records = eval_data_by_patient.get(pid) or {}
        if not records:
            return "Patient data not found", "AI summary not found"
        # Single admission per patient: take the first record.
        rec = records[next(iter(records))]
        summary_text = clean_summary_output(rec.get('summary', '') or 'No summary available')
        raw_input = rec.get('input_text', '') or 'No input data available'
        # Replace ICD codes and Lab IDs with descriptions, then escape for HTML.
        annotated = replace_icd_codes_and_lab_ids(raw_input)
        escaped = (
            annotated.replace('&', '&amp;')
            .replace('<', '&lt;')
            .replace('>', '&gt;')
        )
        input_html = (
            '<pre id="patient_input_text" '
            'style="max-height: 400px; overflow-y: auto; white-space: pre-wrap; '
            'font-family: monospace; margin: 0;">'
            f'{escaped}'
            '</pre>'
        )
        # Return raw markdown text; scrolling controlled by CSS on elem_id
        return input_html, summary_text
    except Exception as e:
        import traceback
        error_msg = f"Error loading patient data: {str(e)}\n{traceback.format_exc()}"
        print(f"[ERROR] {error_msg}")
        return "Error loading data", f"Error: {str(e)}"
# Handle clinician ID selection to filter patient list
if eval_groups:
    def update_patient_list_by_group(selected_clinician):
        """Filter patient list based on selected clinician ID and auto-fill clinician ID field"""
        if not selected_clinician:
            filtered_patients = []
            info_text = ""
            clinician_id_update = gr.update()  # leave the Clinician ID box untouched
        else:
            # Directly use selected_clinician as key (no mapping needed)
            group_data = eval_groups.get(selected_clinician, {})
            group_patients = group_data.get('all_patients', [])
            # Filter: only show patients that are in both the group AND the loaded patient list
            # Convert to strings for comparison
            group_patients_str = [str(p) for p in group_patients]
            patient_ids_list_str = [str(p) for p in patient_ids_list]
            filtered_patients = [p for p in patient_ids_list_str if p in group_patients_str]
            print(f"[DEBUG] Clinician {selected_clinician}: group has {len(group_patients)} patients, {len(filtered_patients)} match loaded data")
            print(f"[DEBUG] Group patients (as strings): {group_patients_str}")
            print(f"[DEBUG] Loaded patients (as strings): {patient_ids_list_str}")
            print(f"[DEBUG] Filtered patients: {filtered_patients}")
            if len(filtered_patients) == 0:
                print(f"[ERROR] No patients matched! Check if patient IDs match between groups and loaded data.")
            info_text = f"**{selected_clinician}**: {len(filtered_patients)} patients assigned"
            clinician_id_update = gr.update(value=selected_clinician)
        # Returns: (Radio choices update, group info markdown, Clinician ID textbox update)
        return gr.update(choices=filtered_patients, value=None), info_text, clinician_id_update
    eval_group_dropdown.change(
        fn=update_patient_list_by_group,
        inputs=[eval_group_dropdown],
        outputs=[eval_patient_radio, group_info_display, expert_name_input]
    )
# When patient changes, update display directly (single admission per patient)
def on_patient_change(patient_id):
    # Thin wrapper so the Radio event passes only the patient id.
    return update_patient_eval_display(patient_id)
eval_patient_radio.change(
    fn=on_patient_change,
    inputs=[eval_patient_radio],
    outputs=[patient_input_display, ai_summary_display]
)
# Helper function to create alert popup HTML
def create_alert_popup(message):
    """Build the HTML for a dismissible modal alert showing *message*.

    The modal overlays the page, can be closed with its OK button, and
    auto-dismisses after 10 seconds.
    """
    # Escape HTML metacharacters in a single pass (equivalent to the usual
    # &-first replace chain), then render newlines as <br>.
    escape_table = str.maketrans({
        '&': '&amp;',
        '<': '&lt;',
        '>': '&gt;',
        '"': '&quot;',
        "'": '&#39;',
    })
    formatted_msg = message.translate(escape_table).replace('\n', '<br>')
    return f"""
    <div id="gradio_alert_modal" style="
        position: fixed;
        top: 0;
        left: 0;
        width: 100%;
        height: 100%;
        background-color: rgba(0, 0, 0, 0.5);
        z-index: 10000;
        display: flex;
        justify-content: center;
        align-items: center;
    ">
        <div style="
            background-color: white;
            padding: 30px;
            border-radius: 10px;
            max-width: 600px;
            max-height: 80%;
            overflow-y: auto;
            box-shadow: 0 4px 6px rgba(0, 0, 0, 0.3);
        ">
            <h2 style="color: #d32f2f; margin-top: 0; font-size: 24px; font-weight: bold;">⚠️ Warning</h2>
            <div style="margin: 20px 0; line-height: 1.8; white-space: pre-wrap; color: #8B0000; font-size: 17px; font-weight: 500; background-color: #ffffff;">{formatted_msg}</div>
            <button onclick="document.getElementById('gradio_alert_modal').remove();" style="
                background-color: #d32f2f;
                color: white;
                border: none;
                padding: 10px 20px;
                border-radius: 5px;
                cursor: pointer;
                font-size: 16px;
            ">OK</button>
        </div>
    </div>
    <script>
    // Auto-remove modal after 10 seconds if user doesn't click
    setTimeout(function() {{
        const modal = document.getElementById('gradio_alert_modal');
        if (modal) modal.remove();
    }}, 10000);
    </script>
    """
# Handle patient evaluation submission
def submit_patient_eval_wrapper(selected_patient_id, overall_rating, clinical_accuracy, completeness_coverage,
                                clinical_relevance, clarity_structure, reasoning_risk, actionability,
                                hallucination, critical_omission, feedback_text, expert_name_input,
                                hallucination_comments, critical_omission_comments):
    """Validate the selection, delegate to submit_patient_evaluation, and map
    its result (or an error) onto the 15 output components wired to the
    submit button (status, 13 form fields, alert popup HTML).

    Every return path MUST yield exactly 15 values to match the .click()
    outputs list.
    """
    # Debug: check if data is loaded
    if not eval_data_by_patient:
        error_message = "⚠️ ERROR: No patient data loaded. Check if data files exist."
        alert_html = create_alert_popup(error_message.replace("__ALERT__", "").replace("⚠️ ERROR: ", ""))
        return f"__ALERT__{error_message}", "", 0, 0, 0, 0, 0, 0, 0, 0, 0, "", "", "", alert_html
    if not selected_patient_id:
        error_message = "⚠️ ERROR: Please select a patient first."
        alert_html = create_alert_popup(error_message.replace("__ALERT__", "").replace("⚠️ ERROR: ", ""))
        return f"__ALERT__{error_message}", "", 0, 0, 0, 0, 0, 0, 0, 0, 0, "", "", "", alert_html
    recs = eval_data_by_patient.get(selected_patient_id, {})
    if not recs:
        error_message = "⚠️ ERROR: Patient data not found."
        alert_html = create_alert_popup(error_message.replace("__ALERT__", "").replace("⚠️ ERROR: ", ""))
        # BUGFIX: this path previously returned 16 values (an extra
        # "AI summary not found" element) against 15 wired outputs.
        return f"__ALERT__{error_message}", "", 0, 0, 0, 0, 0, 0, 0, 0, 0, "", "", "", alert_html
    # Single admission per patient: use the first record.
    hadm_key = list(recs.keys())[0]
    rec = recs.get(hadm_key)
    patient_id = rec["patient_id"]
    admission_id = rec["admission_id"]
    patient_input = rec["input_text"]  # Input text to LLM
    ai_summary = rec["summary"]  # AI generated summary
    # Submit the evaluation with admission info
    print(f"[Submit] Starting evaluation submission for patient {patient_id}, admission {admission_id}")
    print(f"[Submit] Expert: {expert_name_input}, Overall Rating: {overall_rating}")
    try:
        result = submit_patient_evaluation(
            f"{patient_id}_adm_{admission_id}", patient_input, ai_summary, overall_rating,
            clinical_accuracy, completeness_coverage, clinical_relevance, clarity_structure,
            reasoning_risk, actionability, hallucination, critical_omission,
            feedback_text, expert_name_input, hallucination_comments, critical_omission_comments
        )
        status_msg = result[0] if isinstance(result, (list, tuple)) and len(result) > 0 else ""
        # If this is an alert message, treat it as a failed submission in logs
        if isinstance(status_msg, str) and status_msg.startswith("__ALERT__"):
            print(f"[Submit] Evaluation NOT submitted, alert returned: {status_msg}")
            # Extract alert message and create popup
            alert_message = status_msg.replace("__ALERT__", "").strip()
            alert_html = create_alert_popup(alert_message)
            # Return the 14 values from submit_patient_evaluation plus the popup
            return result[0], result[1], result[2], result[3], result[4], result[5], result[6], result[7], result[8], result[9], result[10], result[11], result[12], result[13], alert_html
        else:
            print(f"[Submit] Evaluation submitted successfully: {status_msg}")
            # Return without alert popup (empty HTML)
            return result[0], result[1], result[2], result[3], result[4], result[5], result[6], result[7], result[8], result[9], result[10], result[11], result[12], result[13], ""
    except Exception as e:
        import traceback
        error_msg = f"⚠️ ERROR: Error submitting evaluation: {str(e)}"
        error_trace = traceback.format_exc()
        print(f"[Submit] Exception occurred: {error_msg}")
        print(f"[Submit] Traceback: {error_trace}")
        alert_html = create_alert_popup(error_msg.replace("__ALERT__", "").replace("⚠️ ERROR: ", ""))
        # Preserve the user's current form state on unexpected failure.
        return f"__ALERT__{error_msg}", feedback_text, overall_rating, clinical_accuracy, completeness_coverage, clinical_relevance, clarity_structure, reasoning_risk, actionability, hallucination, critical_omission, hallucination_comments, critical_omission_comments, expert_name_input, alert_html
# Wire the submit button: 14 inputs (selection + ratings + comments),
# 15 outputs (status, 13 resettable form fields, alert popup HTML).
submit_eval_button.click(
    fn=submit_patient_eval_wrapper,
    inputs=[
        eval_patient_radio,
        overall_rating,
        clinical_accuracy,
        completeness_coverage,
        clinical_relevance,
        clarity_structure,
        reasoning_risk,
        actionability,
        hallucination,
        critical_omission,
        feedback_text,
        expert_name_input,
        hallucination_comments,
        critical_omission_comments
    ],
    outputs=[
        evaluation_status,
        feedback_text,
        overall_rating,
        clinical_accuracy,
        completeness_coverage,
        clinical_relevance,
        clarity_structure,
        reasoning_risk,
        actionability,
        hallucination,
        critical_omission,
        hallucination_comments,
        critical_omission_comments,
        expert_name_input,
        alert_popup
    ]
)
# Google Drive functionality removed - using local storage only
# Google Drive functionality removed - no auth check needed
# Enable request queueing so long-running callbacks don't time out.
app.queue()
# Check if we want to deploy to Hugging Face Spaces for permanent hosting
import sys
if "--deploy" in sys.argv:
    # Informational only: prints deployment instructions, does not deploy.
    print("To deploy permanently to Hugging Face Spaces:")
    print("1. Install: pip install huggingface_hub[cli]")
    print("2. Login: huggingface-cli login")
    print("3. Deploy: gradio deploy")
    print("4. This will give you a permanent https://yourname-appname.hf.space URL")
# Check if running on Hugging Face Spaces (Spaces set the SPACE_ID env var)
hf_space = os.getenv("SPACE_ID") is not None
if hf_space:
    # Running on Hugging Face Spaces
    app.launch(
        server_name="0.0.0.0",
        server_port=7860,
        show_error=True
    )
else:
    # Running locally
    app.launch(
        share=True,
        max_threads=10,
        server_name="0.0.0.0",  # Allow external connections
        server_port=7860,
        show_error=True
    )