import gradio as gr
import asyncio
import httpx
import tempfile
import os
import requests
import time
import threading
import json
import re
import csv
import html
import markdown
import base64
from datetime import datetime, timedelta
# Hugging Face Hub storage for evaluations
# Hugging Face Hub storage is optional; fall back gracefully when the
# hf_storage helper module is not present in this deployment.
try:
    from hf_storage import save_to_huggingface
except ImportError:
    HF_STORAGE_AVAILABLE = False
else:
    HF_STORAGE_AVAILABLE = True
import shutil
import zipfile
# Optional Whisper ASR support (for local use); not required on Hugging Face Space
# Whisper ASR is optional (local use only); the Hugging Face Space works
# without it, so a failed import just disables transcription.
try:
    import whisper  # type: ignore
except ImportError:
    whisper = None  # type: ignore
    WHISPER_AVAILABLE = False
else:
    WHISPER_AVAILABLE = True
# ICD-10 code to description mapping (common codes)
# Maps ICD-10 code (dotless form, as found in the MIMIC data) to a short
# human-readable description, used to render diagnoses in the UI.
ICD10_DESCRIPTIONS = {
    "I130": "Hypertensive heart and chronic kidney disease",
    "I5033": "Acute on chronic diastolic heart failure",
    "E872": "Hypokalemia",
    "N184": "Chronic kidney disease, stage 4",
    "E1122": "Type 2 diabetes with chronic kidney disease",
    "N2581": "Secondary hyperparathyroidism of renal origin",
    "I2510": "Atherosclerotic heart disease without angina",
    "E11319": "Type 2 diabetes with unspecified diabetic retinopathy",
    "D6489": "Other specified anemias",
    "E785": "Hyperlipidemia, unspecified",
    "Z955": "Presence of coronary angioplasty implant and graft",
    "Z86718": "Personal history of other venous thrombosis",
    "I252": "Old myocardial infarction",
    "Z2239": "Encounter for screening for other suspected endocrine disorder",
    "G4700": "Insomnia, unspecified",
    "M1A9XX0": "Chronic gout, unspecified, without tophus",
    "R0902": "Hypoxemia",
    "E1151": "Type 2 diabetes with diabetic peripheral angiopathy without gangrene",
    "Z794": "Long term use of insulin",
    "E669": "Obesity, unspecified",
    "Z6831": "Body mass index 31.0-31.9, adult",
    "V4571": "Renal dialysis status",
    "I10": "Essential hypertension",
    "E119": "Type 2 diabetes without complications",
    "J449": "Chronic obstructive pulmonary disease, unspecified",
    "N18": "Chronic kidney disease",
}
# MIMIC-IV Lab Test Item ID to name mapping (common tests)
# Maps MIMIC-IV lab `itemid` (as a string) to the display name of the test,
# covering the common chemistry / hematology / blood-gas panels.
LAB_TEST_NAMES = {
    "50934": "H-Hematocrit",
    "50947": "I-Ionized Calcium",
    "51678": "L-Lymphocytes",
    "50868": "Anion Gap",
    "50882": "Bicarbonate",
    "50912": "Creatinine",
    "50971": "Potassium",
    "50983": "Sodium",
    "51006": "Urea Nitrogen",
    "50902": "Chloride",
    "50931": "Glucose",
    "51221": "Hematocrit",
    "51222": "Hemoglobin",
    "51265": "Platelet Count",
    "51301": "White Blood Cells",
    "50820": "pH",
    "50821": "pO2",
    "50818": "pCO2",
    "51237": "INR",
    "51274": "PT",
    "51275": "PTT",
    "50813": "Lactate",
    "50960": "Magnesium",
    "50970": "Phosphate",
    "50893": "Calcium Total",
}
# Shared HTTP session for synchronous requests.
session = requests.Session()
# The Whisper model is only loaded when the optional dependency imported.
model = whisper.load_model("base") if WHISPER_AVAILABLE else None
# Flask backend location and request defaults used by the async helpers.
base_url = "http://localhost:8080"
timeout = 60
concurrency_count = 10
def start_backend_server():
    """Start the Flask backend (src.server) in a background thread for Spaces.

    Failures are logged but never raised, so the Gradio UI still comes up
    even when the backend cannot start.
    """
    try:
        from src.server import create_app, configure_routes

        class Args:
            # Minimal stand-in for the CLI argument namespace the routes expect.
            counselor_config_path = './src/configs/counselor_config.yaml'
            store_dir = './user_data'

        flask_app = create_app()
        configure_routes(flask_app, Args())

        def _serve():
            flask_app.run(port=8080, host='0.0.0.0', debug=False)

        # Daemon thread so the server dies with the main process.
        threading.Thread(target=_serve, daemon=True).start()
        print("Backend server started on http://localhost:8080")
    except Exception as e:
        print(f"Failed to start backend server: {e}")
async def initialization(api_key, username):
    """POST the user's API key and username to the backend init endpoint.

    Returns a human-readable status string for display in the UI.
    """
    url = f"{base_url}/api/initialization"
    headers = {'Content-Type': 'application/json'}
    data = {
        'api_key': api_key,
        'username': username,
    }
    async with httpx.AsyncClient(timeout=timeout) as client:
        try:
            response = await client.post(url, json=data, headers=headers)
            if response.status_code == 200:
                return "Initialization successful."
            else:
                return f"Initialization failed: {response.text}"
        # BUG FIX: httpx signals timeouts with httpx.TimeoutException, not
        # asyncio.TimeoutError, so the previous handler never fired and
        # timeouts fell through to the generic error message.  Keep
        # asyncio.TimeoutError too for safety.
        except (httpx.TimeoutException, asyncio.TimeoutError):
            print("The request timed out")
            return "Request timed out during initialization."
        except Exception as e:
            return f"Error in initialization: {str(e)}"
# def fetch_default_prompts(chatbot_type):
# url = f"{base_url}?chatbot_type={chatbot_type}"
# try:
# response = httpx.get(url, timeout=timeout)
# if response.status_code == 200:
# prompts = response.json()
# print(prompts)
# return prompts
# else:
# print(f"Failed to fetch prompts: {response.status_code} - {response.text}")
# return {}
# except Exception as e:
# print(f"Error fetching prompts: {str(e)}")
# return {}
async def get_backend_response(api_key, patient_prompt, username, chatbot_type):
    """Request a doctor response from the backend for *patient_prompt*.

    Returns the parsed JSON payload on success, or an error string on
    failure.  (api_key is accepted for interface compatibility; this
    function does not send it — presumably the backend uses the session
    established during initialization.)
    """
    url = f"{base_url}/responses/doctor"
    headers = {'Content-Type': 'application/json'}
    payload = {
        'username': username,
        'patient_prompt': patient_prompt,
        'chatbot_type': chatbot_type
    }
    async with httpx.AsyncClient(timeout=timeout) as client:
        try:
            reply = await client.post(url, json=payload, headers=headers)
            if reply.status_code == 200:
                return reply.json()
            return f"Failed to fetch response from backend: {reply.text}"
        except Exception as e:
            return f"Error contacting backend service: {str(e)}"
async def save_conversation_and_memory(username, chatbot_type):
    """Ask the backend to persist the conversation and memory graph.

    Returns the backend's status message string (or an error string).
    """
    url = f"{base_url}/save/end_and_save"
    headers = {'Content-Type': 'application/json'}
    payload = {
        'username': username,
        'chatbot_type': chatbot_type
    }
    async with httpx.AsyncClient(timeout=timeout) as client:
        try:
            reply = await client.post(url, json=payload, headers=headers)
            if reply.status_code != 200:
                return f"Failed to save conversations and memory graph: {reply.text}"
            return reply.json().get('message', 'Saving Error!')
        except Exception as e:
            return f"Error contacting backend service: {str(e)}"
async def get_conversation_histories(username, chatbot_type):
    """Fetch saved conversation histories for a user from the backend.

    Returns the parsed JSON payload on success, or [] on any failure;
    the caller treats an empty list as "nothing to download".
    """
    url = f"{base_url}/save/download_conversations"
    headers = {'Content-Type': 'application/json'}
    data = {
        'username': username,
        'chatbot_type': chatbot_type
    }
    async with httpx.AsyncClient(timeout=timeout) as client:
        try:
            response = await client.post(url, json=data, headers=headers)
            if response.status_code == 200:
                return response.json()
            return []
        except Exception as e:
            # Keep the best-effort contract (return []), but log the error
            # instead of silently swallowing it as before.
            print(f"Error fetching conversation histories: {e}")
            return []
def download_conversations(username, chatbot_type):
    """Write each saved conversation to a .txt file in a temp dir.

    Returns the list of file paths for Gradio's file download component.
    """
    conversation_histories = asyncio.run(get_conversation_histories(username, chatbot_type))
    files = []
    temp_dir = tempfile.mkdtemp()
    for conversation_entry in conversation_histories:
        file_name = conversation_entry.get('file_name', f"Conversation_{len(files)+1}.txt")
        conversation = conversation_entry.get('conversation', [])
        conversation_text = ""
        for message_pair in conversation:
            # Each turn is expected to be a [speaker, message] pair; anything
            # else is dumped verbatim so no data is lost.
            if isinstance(message_pair, list) and len(message_pair) == 2:
                speaker, message = message_pair
                conversation_text += f"{speaker.capitalize()}: {message}\n\n"
            else:
                conversation_text += f"Unknown format: {message_pair}\n\n"
        temp_file_path = os.path.join(temp_dir, file_name)
        # BUG FIX: write UTF-8 explicitly; the platform default encoding can
        # fail on non-ASCII conversation text (e.g. on Windows), and the
        # rest of this module consistently uses utf-8.
        with open(temp_file_path, 'w', encoding='utf-8') as temp_file:
            temp_file.write(conversation_text)
        files.append(temp_file_path)
    return files
# async def get_biography(username, chatbot_type):
# url = f"{base_url}/save/generate_autobiography"
# headers = {'Content-Type': 'application/json'}
# data = {
# 'username': username,
# 'chatbot_type': chatbot_type
# }
# async with httpx.AsyncClient(timeout=timeout) as client:
# try:
# response = await client.post(url, json=data, headers=headers)
# if response.status_code == 200:
# biography_data = response.json()
# biography_text = biography_data.get('biography', '')
# return biography_text
# else:
# return "Failed to generate biography."
# except Exception as e:
# return f"Error contacting backend service: {str(e)}"
# def download_biography(username, chatbot_type):
# biography_text = asyncio.run(get_biography(username, chatbot_type))
# if not biography_text or "Failed" in biography_text or "Error" in biography_text:
# return gr.update(value=None, visible=False), gr.update(value=biography_text, visible=True)
# temp_dir = tempfile.mkdtemp()
# temp_file_path = os.path.join(temp_dir, "biography.txt")
# with open(temp_file_path, 'w') as temp_file:
# temp_file.write(biography_text)
# return temp_file_path, gr.update(value=biography_text, visible=True)
def transcribe_audio(audio_file):
    """Run the local Whisper model on *audio_file* and return the text.

    Raises RuntimeError when Whisper was not loaded in this deployment.
    """
    if model is None or not WHISPER_AVAILABLE:
        raise RuntimeError("Audio transcription model is not available in this deployment.")
    result = model.transcribe(audio_file)
    return result["text"]
def submit_text_and_respond(edited_text, api_key, username, selected_title, history, chatbot_type):
    """Send the user's text (plus any selected patient context) to the
    backend and append the doctor response to the chat history.

    Returns (updated_history, "") so the input box is cleared.
    """
    # Use the text of the selected option; if several options share the
    # title, the last one wins (matches the original scan order).
    matching = [opt['text'] for opt in options if opt['title'] == selected_title]
    content = matching[-1] if matching else ''
    if content == '':
        query_with_context = edited_text
    else:
        query_with_context = f'Given the following context information:\n {content}\n\n Answer the following question: {edited_text}'
    print(query_with_context)
    response = asyncio.run(get_backend_response(api_key, query_with_context, username, chatbot_type))
    print('------')
    print(response)
    if isinstance(response, str):
        # Backend returned an error string; surface it verbatim in the chat.
        history.append((edited_text, response))
    else:
        history.append((edited_text, response['doctor_response']))
    return history, ""
def set_initialize_button(api_key_input, username_input):
    """Run backend initialization; return (status message, api_key) so the
    key can be stored in Gradio state."""
    status = asyncio.run(initialization(api_key_input, username_input))
    print(status)
    return status, api_key_input
def save_conversation(username, chatbot_type):
    """Synchronous wrapper around save_conversation_and_memory for Gradio."""
    return asyncio.run(save_conversation_and_memory(username, chatbot_type))
def start_recording(audio_file):
    """Transcribe a recorded audio file.

    Returns '' for empty input, the transcription on success, or a
    "Failed to transcribe: ..." message on any error.
    """
    if not audio_file:
        return ""
    try:
        return transcribe_audio(audio_file)
    except Exception as e:
        return f"Failed to transcribe: {str(e)}"
# Patient Sample Evaluation functions
def save_patient_evaluation(patient_id, patient_input, ai_summary, rating, feedback, expert_name, categories, comments=None):
    """Save patient sample evaluation data to a JSON file.

    Also appends a row to a master CSV for easier analysis and, when
    configured, mirrors the record to the Hugging Face Hub.

    Returns a status message string for display in the UI.
    """
    timestamp = datetime.now().isoformat()
    # Handle comments - can be a dict with hallucination_comments and
    # critical_omission_comments keys; any other form is treated as empty.
    if isinstance(comments, dict):
        hallucination_comments = comments.get('hallucination_comments', '')
        critical_omission_comments = comments.get('critical_omission_comments', '')
    else:
        hallucination_comments = ''
        critical_omission_comments = ''
    evaluation = {
        "timestamp": timestamp,
        "patient_id": patient_id,
        "expert_name": expert_name,
        "patient_input": patient_input,
        "ai_summary": ai_summary,
        "overall_rating": rating,
        "feedback": feedback,
        "categories": categories,
        "hallucination_comments": hallucination_comments,
        "critical_omission_comments": critical_omission_comments
    }
    # Build the CSV row once; it is shared by the local CSV append and the
    # Hugging Face upload.  (Previously the same 15-field list was built
    # twice, risking the two copies drifting apart.)
    csv_row = [
        timestamp, patient_id, expert_name, rating,
        categories.get('clinical_accuracy', ''),
        categories.get('completeness_coverage', ''),
        categories.get('clinical_relevance', ''),
        categories.get('clarity_structure', ''),
        categories.get('reasoning_risk', ''),
        categories.get('actionability', ''),
        categories.get('hallucination', ''),
        categories.get('critical_omission', ''),
        feedback,
        hallucination_comments,
        critical_omission_comments
    ]
    # Create evaluations directory if it doesn't exist
    eval_dir = "patient_evaluations"
    os.makedirs(eval_dir, exist_ok=True)
    # Save to JSON file; ':' is replaced because it is invalid in filenames
    # on some filesystems.
    eval_file = os.path.join(eval_dir, f"patient_eval_{patient_id}_{timestamp.replace(':', '-')}.json")
    with open(eval_file, 'w', encoding='utf-8') as f:
        json.dump(evaluation, f, ensure_ascii=False, indent=2)
    # Also append to a master CSV file for easier analysis
    csv_file = os.path.join(eval_dir, "patient_evaluations_master.csv")
    file_exists = os.path.isfile(csv_file)
    with open(csv_file, 'a', newline='', encoding='utf-8') as f:
        writer = csv.writer(f)
        if not file_exists:
            writer.writerow(['timestamp', 'patient_id', 'expert_name', 'overall_rating',
                             'clinical_accuracy', 'completeness_coverage', 'clinical_relevance', 'clarity_structure',
                             'reasoning_risk', 'actionability', 'hallucination', 'critical_omission',
                             'feedback', 'hallucination_comments', 'critical_omission_comments'])
        writer.writerow(csv_row)
        # Flush + fsync so the row survives an abrupt Space restart.
        f.flush()
        os.fsync(f.fileno())
    # Also try to save to Hugging Face Hub if available
    save_messages = []
    if HF_STORAGE_AVAILABLE:
        print(f"[HF Upload] Starting upload for patient {patient_id}...")
        try:
            success, msg = save_to_huggingface(evaluation, csv_row)
            print(f"[HF Upload] Result: success={success}, msg={msg}")
            if success:
                save_messages.append(f"Uploaded to Hugging Face Hub")
            else:
                save_messages.append(f"HF upload failed: {msg}")
        except Exception as e:
            import traceback
            error_detail = f"{str(e)}\n{traceback.format_exc()}"
            print(f"[HF Upload] Exception occurred: {error_detail}")
            save_messages.append(f"HF upload error: {str(e)}")
    else:
        print(f"[HF Upload] HF_STORAGE_AVAILABLE is False, skipping upload")
    # Combine messages
    base_msg = f"Patient evaluation saved successfully at {timestamp}"
    if save_messages:
        return f"{base_msg}\n" + "\n".join(save_messages)
    return base_msg
def submit_patient_evaluation(patient_id, patient_input, ai_summary, overall_rating,
                              clinical_accuracy, completeness_coverage, clinical_relevance, clarity_structure,
                              reasoning_risk, actionability, hallucination, critical_omission,
                              feedback, expert_name, hallucination_comments, critical_omission_comments):
    """Process and save the patient evaluation"""
    # Returns a 14-tuple matching the Gradio outputs: (status_message,
    # feedback, nine rating values, hallucination_comments,
    # critical_omission_comments, expert_name).  On validation failure the
    # current form values are echoed back unchanged so nothing is lost; on
    # success everything is reset except expert_name.
    if not expert_name.strip():
        error_message = "⚠️ ERROR: Please enter your Clinician ID before submitting evaluation."
        # Trigger JavaScript alert by including a special marker
        # (the frontend watches the status text for the "__ALERT__" prefix).
        return f"__ALERT__{error_message}", feedback, overall_rating, clinical_accuracy, completeness_coverage, clinical_relevance, clarity_structure, reasoning_risk, actionability, hallucination, critical_omission, hallucination_comments, critical_omission_comments, expert_name
    # Check all ratings and collect missing ones.
    # A value of 0 is treated as "not rated yet".
    rating_checks = {
        'Overall Quality': overall_rating,
        'Clinical Accuracy': clinical_accuracy,
        'Completeness / Coverage': completeness_coverage,
        'Clinical Relevance': clinical_relevance,
        'Clarity and Structure': clarity_structure,
        'Reasoning / Risk Stratification': reasoning_risk,
        'Actionability': actionability,
        'Hallucination': hallucination,
        'Critical Omission': critical_omission
    }
    missing_ratings = [name for name, value in rating_checks.items() if value == 0]
    if missing_ratings:
        missing_list = "\n".join([f"  • {name}" for name in missing_ratings])
        error_message = f"⚠️ WARNING: Please rate all evaluation dimensions before submitting.\n\nMissing ratings:\n{missing_list}\n\nPlease provide ratings for all items above."
        # Trigger JavaScript alert by including a special marker
        return f"__ALERT__{error_message}", feedback, overall_rating, clinical_accuracy, completeness_coverage, clinical_relevance, clarity_structure, reasoning_risk, actionability, hallucination, critical_omission, hallucination_comments, critical_omission_comments, expert_name
    # Per-dimension ratings, persisted as a single dict.
    categories = {
        'clinical_accuracy': clinical_accuracy,
        'completeness_coverage': completeness_coverage,
        'clinical_relevance': clinical_relevance,
        'clarity_structure': clarity_structure,
        'reasoning_risk': reasoning_risk,
        'actionability': actionability,
        'hallucination': hallucination,
        'critical_omission': critical_omission
    }
    # Store hallucination and critical omission comments
    comments = {
        'hallucination_comments': hallucination_comments.strip() if hallucination_comments else '',
        'critical_omission_comments': critical_omission_comments.strip() if critical_omission_comments else ''
    }
    result = save_patient_evaluation(patient_id, patient_input, ai_summary,
                                     overall_rating, feedback, expert_name, categories, comments)
    # Reset form after successful submission (keep expert_name)
    return result, "", 0, 0, 0, 0, 0, 0, 0, 0, 0, "", "", expert_name
def get_conversation_for_evaluation(history):
    """Return the (user_input, bot_response) pair of the most recent turn,
    padding missing halves with empty strings."""
    if not history:
        return "", ""
    padded = list(history[-1][:2]) + ["", ""]
    return padded[0], padded[1]
def export_patient_evaluations():
    """Return (csv_path, message) for download, or (None, message) when no
    evaluation data has been recorded yet."""
    csv_file = os.path.join("patient_evaluations", "patient_evaluations_master.csv")
    if os.path.exists(csv_file):
        return csv_file, f"Patient evaluation data exported."
    return None, "No patient evaluation data found."
def get_patient_evaluation_stats():
    """Get basic statistics about patient evaluations.

    Reads the master CSV and returns a markdown summary string (or a
    plain message when no data / no pandas is available).
    """
    eval_dir = "patient_evaluations"
    csv_file = os.path.join(eval_dir, "patient_evaluations_master.csv")
    if not os.path.exists(csv_file):
        return "No patient evaluation data available."
    try:
        import pandas as pd
        # Force reload from disk
        df = pd.read_csv(csv_file)
        if df.empty:
            return "No patient evaluation data available."
        total_evaluations = len(df)
        avg_overall_rating = df['overall_rating'].mean() if 'overall_rating' in df.columns else 0
        # BUG FIX: the master CSV (see save_patient_evaluation) writes
        # 'clinical_accuracy' and 'completeness_coverage' columns; the old
        # names 'medical_accuracy'/'completeness' never existed, so these
        # averages always displayed 0.
        avg_clinical_accuracy = df['clinical_accuracy'].mean() if 'clinical_accuracy' in df.columns else 0
        avg_completeness = df['completeness_coverage'].mean() if 'completeness_coverage' in df.columns else 0
        expert_count = df['expert_name'].nunique() if 'expert_name' in df.columns else 0
        patient_count = df['patient_id'].nunique() if 'patient_id' in df.columns else 0
        # NOTE(review): no code in this module writes a
        # 'sentence_comments_count' column, so this currently stays 0 —
        # confirm whether another component appends it.
        total_sentence_comments = df['sentence_comments_count'].sum() if 'sentence_comments_count' in df.columns else 0
        stats = f"""
**Patient Evaluation Statistics**
- **Total Evaluations**: {total_evaluations}
- **Patients Evaluated**: {patient_count}
- **Average Overall Rating**: {avg_overall_rating:.2f}/5
- **Average Clinical Accuracy**: {avg_clinical_accuracy:.2f}/5
- **Average Completeness**: {avg_completeness:.2f}/5
- **Number of Experts**: {expert_count}
- **Total Sentence Comments**: {total_sentence_comments}
- **Latest Evaluation**: {df['timestamp'].iloc[-1] if not df.empty else 'N/A'}
"""
        return stats
    except ImportError:
        # Fallback if pandas is not available
        with open(csv_file, 'r', encoding='utf-8') as f:
            lines = f.readlines()
        total_evaluations = len(lines) - 1  # Subtract header
        return f"Total patient evaluations: {total_evaluations} (Install pandas for detailed stats)"
    except Exception as e:
        return f"Error reading patient evaluation data: {str(e)}"
def create_backup_zip():
    """Create a backup zip file of all evaluation data.

    Returns (zip_filename, success_message) or (None, error_message).
    """
    try:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_filename = f"patient_evaluations_backup_{timestamp}.zip"
        with zipfile.ZipFile(backup_filename, 'w') as backup_zip:
            # Add CSV file if it exists
            csv_file = "patient_evaluations/patient_evaluations_master.csv"
            if os.path.exists(csv_file):
                backup_zip.write(csv_file, f"patient_evaluations_master_{timestamp}.csv")
            # Add JSON files under a json_files/ folder inside the archive
            eval_dir = "patient_evaluations"
            if os.path.exists(eval_dir):
                for filename in os.listdir(eval_dir):
                    if filename.endswith('.json'):
                        file_path = os.path.join(eval_dir, filename)
                        # BUG FIX: every JSON file was previously archived
                        # under one identical literal arcname, so entries
                        # clobbered each other; archive under the real name.
                        backup_zip.write(file_path, f"json_files/{filename}")
        return backup_filename, f"Backup created successfully: {backup_filename}"
    except Exception as e:
        return None, f"Error creating backup: {str(e)}"
def update_methods(chapter):
    """Refresh the methods dropdown for the chosen protocol chapter,
    selecting the chapter's first method by default."""
    methods = interview_protocols[chapter]
    return gr.update(choices=methods, value=methods[0])
# def update_memory_graph(memory_data):
# table_data = []
# for node in memory_data:
# table_data.append([
# node.get('date', ''),
# node.get('topic', ''),
# node.get('event_description', ''),
# node.get('people_involved', '')
# ])
# return table_data
# def update_prompts(chatbot_display_name):
# chatbot_type = display_to_value.get(chatbot_display_name, 'enhanced')
# prompts = fetch_default_prompts(chatbot_type)
# return (
# gr.update(value=prompts.get('system_prompt', '')),
# gr.update(value=prompts.get('conv_instruction_prompt', '')),
# gr.update(value=prompts.get('therapy_prompt', '')),
# gr.update(value=prompts.get('autobio_generation_prompt', '')),
# )
# def update_chatbot_type(chatbot_display_name):
# chatbot_type = display_to_value.get(chatbot_display_name, 'enhanced')
# return chatbot_type
# CSS to keep the buttons small
# Page-level CSS: compact buttons plus scrollable panes for long clinical text.
css = """
#start_button, #reset_button {
padding: 4px 10px !important;
font-size: 12px !important;
width: auto !important;
}
/* Force scrollable areas for long text */
#patient_input_display textarea {
max-height: 420px !important;
overflow-y: auto !important;
}
#ai_summary_display {
max-height: 420px !important;
overflow-y: auto !important;
}
#ai_summary_markdown {
max-height: 480px !important;
overflow-y: auto !important;
}
"""
# Add CSS for clickable summary sentences
css = css + """
.sum-sent {
cursor: pointer;
padding: 2px 4px;
border-radius: 3px;
transition: background-color 0.2s;
user-select: none; /* Prevent text selection */
-webkit-user-select: none;
-moz-user-select: none;
-ms-user-select: none;
}
.sum-sent:hover {
background-color: #e0e0e0;
text-decoration: underline;
}
.sum-sent:active {
background-color: #b0b0b0;
}
"""
# Add global JavaScript to fix form validation issues and ensure click handlers work
# NOTE(review): global_js is currently an empty string — the validation/alert
# JavaScript it describes is not present here; confirm whether it was removed
# intentionally or lost.
global_js = """
"""
with gr.Blocks() as app:
    # Add CSS via HTML component (Gradio 3.50.2+ compatibility)
    # NOTE(review): this f-string is empty — it presumably should inject the
    # `css` string (e.g. a <style> tag); confirm against version history.
    gr.HTML(f"")
    # Add JavaScript for form validation and alert monitoring
    gr.HTML(global_js)
    # Debug: Print HF storage status on startup
    print("="*60)
    print("HF Storage Status on Startup")
    print("="*60)
    print(f"HF_STORAGE_AVAILABLE: {HF_STORAGE_AVAILABLE}")
    if HF_STORAGE_AVAILABLE:
        try:
            from hf_storage import get_hf_storage, HF_AVAILABLE
            print(f"HF_AVAILABLE (in hf_storage): {HF_AVAILABLE}")
            storage = get_hf_storage()
            # NOTE(review): original indentation was lost in extraction; it
            # is assumed only the repo_id print depends on `storage`.
            if storage:
                print(f"Storage repo_id: {storage.repo_id}")
            print(f"HF_TOKEN set: {'Yes' if os.getenv('HF_TOKEN') else 'No'}")
            print(f"HF_EVAL_REPO_ID set: {os.getenv('HF_EVAL_REPO_ID') or 'No'}")
            print(f"SPACE_ID: {os.getenv('SPACE_ID') or 'Not set'}")
        except Exception as e:
            print(f"Error checking storage: {e}")
    print("="*60)
    # In Spaces, start backend inside the same process
    start_backend_server()
    # Gradio per-session state holders used by the event handlers below.
    chatbot_type_state = gr.State('enhanced')
    api_key_state = gr.State()
    selected_title = gr.State()
    is_running = gr.State()
    target_timestamp = gr.State()
# Load patient data globally
def load_jsonl(filepath):
    """Read a JSON-Lines file into a list of dicts.

    If *filepath* does not exist, two fallbacks are tried: the original
    developer's absolute path remapped into this repo, then a file with
    the same basename next to this script.  Returns [] when nothing works.
    """
    records = []

    def _read(path):
        with open(path, "r", encoding="utf-8") as fh:
            for raw_line in fh:
                raw_line = raw_line.strip()
                if raw_line:
                    records.append(json.loads(raw_line))

    try:
        _read(filepath)
        return records
    except FileNotFoundError:
        pass
    repo_dir = os.path.dirname(os.path.abspath(__file__))
    # Remap paths recorded on the original development machine.
    local_prefix = "/Users/liuzijie/Desktop/chatbot-mimic-notes/"
    if isinstance(filepath, str) and filepath.startswith(local_prefix):
        candidate = os.path.join(repo_dir, filepath[len(local_prefix):])
        if os.path.exists(candidate):
            _read(candidate)
            return records
    fallback = os.path.join(repo_dir, os.path.basename(filepath))
    if os.path.exists(fallback):
        _read(fallback)
        return records
    return []
# Safe JSON file loader
def load_json(path):
    """Load a JSON file; return None on any failure (missing/corrupt file)."""
    try:
        with open(path, encoding='utf-8') as fh:
            raw = fh.read()
        return json.loads(raw)
    except Exception:
        return None
# Load per admission data (optional; skip if missing in Space)
# Prefer env var ADMISSION_JSONL; otherwise resolve relative to this file
script_dir_for_adm = os.path.dirname(os.path.abspath(__file__))
default_adm_path = os.path.join(script_dir_for_adm, "per_admission_summaries", "llama-3.2-3b_per_admission.jsonl")
jsonl_path = os.environ.get("ADMISSION_JSONL", default_adm_path)
admission_data = load_jsonl(jsonl_path) if os.path.exists(jsonl_path) else []
# Group admissions by patient_id
# (assumes every record has 'patient_id', 'hadm_id' and 'admittime' keys —
# records missing them would raise KeyError; confirm the JSONL schema.)
patient_admissions = {}
for admission in admission_data:
    patient_id = admission['patient_id']
    if patient_id not in patient_admissions:
        patient_admissions[patient_id] = []
    patient_admissions[patient_id].append(admission)
# Create options with admission tabs for each patient; `options` is read
# later by submit_text_and_respond to look up context by title.
options = []
for patient_id, admissions in patient_admissions.items():
    for i, admission in enumerate(admissions):
        admission_id = admission['hadm_id']
        admit_time = admission['admittime']
        summary = admission.get('summary', '')
        input_text = admission.get('input_text', '')
        # Create a unique title for each admission
        title = f"Patient {patient_id} - Admission {i+1} (ID: {admission_id})"
        options.append({
            'title': title,
            'text': input_text,  # Use the actual input text that was given to LLM
            'note': summary,  # AI generated summary
            'patient_id': patient_id,
            'admission_id': admission_id,
            'admission_index': i+1,
            'admit_time': admit_time,
            'raw_data': admission.get('raw', {})
        })
# ----------------------
# Load new summaries data (grouped by patient -> admissions)
# Directory: /Users/liuzijie/Desktop/chatbot-mimic-notes/summaries
# Filenames: subject_{patient}_hadm_{hadm}_model_input.txt and subject_{patient}_hadm_{hadm}_summary.txt
# ----------------------
# Prefer env var, then path relative to this file (repo root)
script_dir = os.path.dirname(os.path.abspath(__file__))
default_summaries_dir = os.path.join(script_dir, "summaries")
# SUMMARIES_DIR environment variable overrides the default (useful on Spaces).
summaries_dir = os.environ.get("SUMMARIES_DIR", default_summaries_dir)
def load_text_file(path):
    """Return the UTF-8 text content of *path*, or '' if it cannot be read."""
    try:
        with open(path, encoding='utf-8') as fh:
            return fh.read()
    except Exception:
        return ""
# Matches an ISO-like date with an optional time part: YYYY-MM-DD[ |T]HH:MM[:SS]
date_regex = re.compile(r"(\d{4}-\d{2}-\d{2}(?:[ T]\d{2}:\d{2}(?::\d{2})?)?)")

def extract_timestamp_from_text(text):
    """Return the first date/datetime found in *text*, or '' if none."""
    found = date_regex.search(text) if text else None
    return found.group(1) if found else ""
# Clean summary output by removing JSON/object appendix and code fences
def clean_summary_output(text):
"""Remove trailing JSON block or code fences from model output, and extract only markdown summary."""
if not text:
return text
# Remove reasoning tags
text = re.sub(r'
No input data") print(f"[Highlight] Input text length: {len(base)} chars") print(f"[Highlight] Input text first 100 chars: {base[:100]}") # Escape HTML html_text = base.replace('&', '&').replace('<', '<').replace('>', '>') # Apply highlights for all top 10 matches if target_matches: import re print(f"[Highlight] Applying highlights to {len(target_matches)} matches") # Sort by similarity (highest first) to highlight in order sorted_matches = sorted(target_matches, key=lambda x: x.get('similarity', 0), reverse=True) # Define color intensity based on rank highlighted_count = 0 for idx, match in enumerate(sorted_matches): sentence = match.get('input_sentence', '') similarity = match.get('similarity', 0) if not sentence: print(f"[Highlight] Match #{idx+1}: Empty sentence, skipping") continue # Check if sentence exists in original text BEFORE escaping if sentence not in base: print(f"[Highlight] Match #{idx+1}: Sentence not in original text") print(f"[Highlight] Looking for: {sentence[:80]}") # Try to find partial match sentence_parts = sentence.split('\n')[0] # Try first line if sentence_parts in base: print(f"[Highlight] Found partial match: {sentence_parts[:60]}") continue # Escape HTML in sentence escaped_sentence = sentence.replace('&', '&').replace('<', '<').replace('>', '>') # Determine color based on rank and similarity if idx == 0: # Top match: strong highlight bg_color = '#ffeb3b' if mode == "Cosine Only" else '#4caf50' opacity = '1.0' elif idx < 3: # Top 3: medium highlight bg_color = '#ffeb3b' if mode == "Cosine Only" else '#4caf50' opacity = '0.7' elif idx < 5: # Top 5: lighter highlight bg_color = '#ffeb3b' if mode == "Cosine Only" else '#4caf50' opacity = '0.5' else: # Top 10: very light highlight bg_color = '#ffeb3b' if mode == "Cosine Only" else '#4caf50' opacity = '0.3' # Create highlight with rank number esc = re.escape(escaped_sentence) rank_label = f'#{idx+1}' highlighted = f'{rank_label}{escaped_sentence}' # Replace first occurrence only 
before_count = html_text.count(escaped_sentence) html_text = re.sub(esc, highlighted, html_text, count=1, flags=re.IGNORECASE) after_count = html_text.count(escaped_sentence) if before_count > after_count: highlighted_count += 1 print(f"[Highlight] Match #{idx+1}: Applied highlight (similarity={similarity:.4f})") else: print(f"[Highlight] Match #{idx+1}: NOT FOUND in text (similarity={similarity:.4f})") print(f"[Highlight] Looking for: {escaped_sentence[:80]}...") print(f"[Highlight] Total highlights applied: {highlighted_count}/{len(target_matches)}") else: print(f"[Highlight] No target matches to highlight") # Wrap in scrollable div result = f'
'
f'{sanitized_input}'
''
)
# Return raw markdown text; scrolling controlled by CSS on elem_id
return input_html, summary_text
except Exception as e:
import traceback
error_msg = f"Error loading patient data: {str(e)}\n{traceback.format_exc()}"
print(f"[ERROR] {error_msg}")
return "Error loading data", f"Error: {str(e)}"
# Handle clinician ID selection to filter patient list
# NOTE(review): original indentation was lost in extraction; the nesting
# below (debug prints inside the else-branch) is a reconstruction — verify
# against version history.
if eval_groups:
    def update_patient_list_by_group(selected_clinician):
        """Filter patient list based on selected clinician ID and auto-fill clinician ID field"""
        # Returns updates for: the patient radio choices, the group info
        # markdown, and the clinician-ID textbox.
        if not selected_clinician:
            filtered_patients = []
            info_text = ""
            clinician_id_update = gr.update()
        else:
            # Directly use selected_clinician as key (no mapping needed)
            group_data = eval_groups.get(selected_clinician, {})
            group_patients = group_data.get('all_patients', [])
            # Filter: only show patients that are in both the group AND the loaded patient list
            # Convert to strings for comparison
            group_patients_str = [str(p) for p in group_patients]
            patient_ids_list_str = [str(p) for p in patient_ids_list]
            filtered_patients = [p for p in patient_ids_list_str if p in group_patients_str]
            print(f"[DEBUG] Clinician {selected_clinician}: group has {len(group_patients)} patients, {len(filtered_patients)} match loaded data")
            print(f"[DEBUG] Group patients (as strings): {group_patients_str}")
            print(f"[DEBUG] Loaded patients (as strings): {patient_ids_list_str}")
            print(f"[DEBUG] Filtered patients: {filtered_patients}")
            if len(filtered_patients) == 0:
                print(f"[ERROR] No patients matched! Check if patient IDs match between groups and loaded data.")
            info_text = f"**{selected_clinician}**: {len(filtered_patients)} patients assigned"
            clinician_id_update = gr.update(value=selected_clinician)
        return gr.update(choices=filtered_patients, value=None), info_text, clinician_id_update
    eval_group_dropdown.change(
        fn=update_patient_list_by_group,
        inputs=[eval_group_dropdown],
        outputs=[eval_patient_radio, group_info_display, expert_name_input]
    )
# When patient changes, update display directly (single admission per patient)
def on_patient_change(patient_id):
    # Thin delegate; update_patient_eval_display returns the pair of values
    # for the two output components wired below.
    return update_patient_eval_display(patient_id)
eval_patient_radio.change(
    fn=on_patient_change,
    inputs=[eval_patient_radio],
    outputs=[patient_input_display, ai_summary_display]
)
# Helper function to create alert popup HTML
def create_alert_popup(message):
"""Create HTML for a modal popup alert"""
# Escape HTML special characters
escaped_msg = message.replace('&', '&').replace('<', '<').replace('>', '>').replace('"', '"').replace("'", ''')
# Convert newlines to