# human_evaluator / app.py
# iyadsultan's picture
# Refactor evaluation process and enhance navigation controls
# d016624
import flask
from flask import Flask, render_template, request, redirect, url_for, session, flash, send_file
import pandas as pd
import numpy as np
import random
import csv
from datetime import datetime
import os
import io
import shutil
import traceback
import chardet
# Resolve the data directory. On HF Spaces (SPACE_ID present in the
# environment) it is ./data under the current working directory; otherwise
# the DATA_DIR environment variable is used, defaulting to a /tmp path for
# local development.
DATA_DIR = (
    os.path.join(os.getcwd(), 'data')
    if 'SPACE_ID' in os.environ
    else os.environ.get('DATA_DIR', '/tmp/human_notes_evaluator')
)
# Build the Flask application object.
app = Flask(__name__)
# NOTE(review): when SECRET_KEY is not set the signing key falls back to a
# fixed placeholder string; presumably deployments are expected to provide
# SECRET_KEY -- confirm.
app.secret_key = os.environ.get('SECRET_KEY', 'your-secret-key-here')
# Session settings.
# NOTE(review): SESSION_TYPE looks like a Flask-Session option; no such
# extension is initialised in this file -- confirm whether it has any effect.
app.config.update(
    SESSION_PERMANENT=False,
    SESSION_TYPE='filesystem',
)
# Constants
# Each rating criterion is kept next to its description so the two public
# lists derived below cannot drift out of step.
_CRITERIA_TABLE = (
    ("Up-to-date",
     "The note contains the most recent test results and recommendations."),
    ("Accurate",
     "The note is true. It is free of incorrect information."),
    ("Thorough",
     "The note is complete and documents all of the issues of importance to the patient."),
    ("Relevant",
     "The note is extremely relevant, providing valuable information and/or analysis."),
    ("Well-organized",
     "The note is well-formed and structured in a way that helps the reader understand the patient's clinical course."),
    ("Clear",
     "The note is clear, without ambiguity or sections that are difficult to understand."),
    ("Concise",
     "The note is brief, to the point, and without redundancy."),
    ("Thoughtful",
     "The note reflects the author's understanding of the patient's status and ability to develop a plan of care."),
    ("Internally consistent",
     "No part of the note ignores or contradicts any other part."),
)
# Criterion names, in display order.
CRITERIA = [label for label, _ in _CRITERIA_TABLE]
# Description for each criterion, index-aligned with CRITERIA.
CRITERIA_DESCRIPTIONS = [blurb for _, blurb in _CRITERIA_TABLE]
# Note origin options
NOTE_ORIGINS = ["Generative AI note", "Human written note", "I am not sure"]
ERROR_LOG = []  # Store recent errors for debugging


def log_error(error_msg):
    """Record a timestamped message in ERROR_LOG and echo it to stdout.

    Only the 10 most recent entries are retained; older ones are dropped
    from the front of the list.
    """
    stamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    ERROR_LOG.append(f"{stamp}: {error_msg}")
    # Trim in place: everything except the last 10 entries is removed.
    del ERROR_LOG[:-10]
    print(f"[LOG] {error_msg}")  # Also print to console
def ensure_data_directory():
    """Create the data directory tree and seed it with the template files.

    When SPACE_ID is present in the environment, the module-level DATA_DIR
    is first re-pointed at ./data under the current working directory.
    DATA_DIR and its ``sessions`` sub-directory are then created if missing
    and create_template_files() is invoked.

    Raises:
        Exception: any error from directory or template creation is logged
            and re-raised unchanged.
    """
    global DATA_DIR
    on_spaces = 'SPACE_ID' in os.environ
    if on_spaces:
        DATA_DIR = os.path.join(os.getcwd(), 'data')
        log_error(f"Running on HF Spaces, using data directory: {DATA_DIR}")
    try:
        for folder in (DATA_DIR, os.path.join(DATA_DIR, 'sessions')):
            os.makedirs(folder, exist_ok=True)
        log_error(f"Created/verified data directory at {DATA_DIR}")
        # Create template files if they don't exist
        create_template_files()
    except Exception as e:
        log_error(f"Error creating data directory: {str(e)}")
        raise
def create_template_files():
    """Write the sample CSV template and instructions.md into DATA_DIR.

    Each file is created only when it is not already present; an existing
    file is left untouched.
    """
    # Sample documents template
    sample_csv = os.path.join(DATA_DIR, 'sample_documents_template.csv')
    if not os.path.exists(sample_csv):
        rows = [
            ['filename', 'description', 'mrn', 'note'],
            ['sample1.txt', 'Example Clinical Note', 'MRN12345', 'This is a sample clinical note for evaluation. Patient presents with...'],
            ['sample2.txt', 'Example Progress Note', 'MRN67890', 'Patient returns for follow-up visit. Current medications include...'],
        ]
        with open(sample_csv, 'w', newline='', encoding='utf-8') as handle:
            csv.writer(handle).writerows(rows)
        log_error(f"Created template file at {sample_csv}")

    # instructions.md
    readme = os.path.join(DATA_DIR, 'instructions.md')
    if not os.path.exists(readme):
        text = (
            "# Instructions for Human Notes Evaluator\n\n"
            "## How to Use This Application\n\n"
            "1. Upload a CSV file with your documents\n"
            "2. Enter your name as the evaluator\n"
            "3. Rate each document on the 9 criteria\n"
            "4. Export results when complete\n"
        )
        with open(readme, 'w', encoding='utf-8') as handle:
            handle.write(text)
        log_error(f"Created instructions at {readme}")
def detect_encoding(file_content):
    """Guess the text encoding of *file_content* with chardet.

    A ``str`` argument is first converted to bytes with ``str.encode()``.
    Returns the encoding name chardet reports, or ``'utf-8'`` when chardet
    reports none.
    """
    raw = file_content.encode() if isinstance(file_content, str) else file_content
    guess = chardet.detect(raw)
    return guess['encoding'] or 'utf-8'
def load_documents():
    """Load all documents from DATA_DIR/documents.csv.

    Returns:
        A list of dicts, one per CSV row, keyed by column name. Every value
        is a string and missing cells are ''. Returns [] when the file does
        not exist or cannot be read/parsed (the failure is logged).
    """
    try:
        file_path = os.path.join(DATA_DIR, 'documents.csv')
        if not os.path.exists(file_path):
            log_error(f"Documents file not found at {file_path}")
            return []
        # Read the raw bytes once so the encoding can be detected before parsing.
        with open(file_path, 'rb') as f:
            content = f.read()
        encoding = detect_encoding(content)
        log_error(f"Detected encoding: {encoding}")
        # dtype=str stops pandas from inferring numeric types. Inference
        # followed by a string conversion rewrites identifiers: an MRN such
        # as "00123" becomes "123", and an integer column containing blanks
        # becomes "123.0". Reading as text keeps each cell as written.
        df = pd.read_csv(io.BytesIO(content), encoding=encoding, dtype=str)
        log_error("Successfully parsed CSV")
        # Cells pandas treats as missing are exposed as empty strings.
        df = df.fillna('')
        # Log stats
        log_error(f"DataFrame columns: {list(df.columns)}")
        log_error(f"DataFrame shape: {df.shape}")
        # Convert to list of dictionaries
        documents = df.to_dict('records')
        log_error(f"Loaded {len(documents)} documents for evaluation")
        return documents
    except Exception as e:
        log_error(f"Error in load_documents: {str(e)}")
        return []
def save_evaluation(data):
    """Append one evaluation record to DATA_DIR/evaluations.csv.

    Args:
        data: Mapping of column name to value. Keys outside the fixed
            column set are ignored and absent columns are written as ''.
            The mapping itself is not modified.

    Returns:
        True when the row was written, False when anything failed (the
        error is logged).
    """
    try:
        ensure_data_directory()
        log_error(f"Saving evaluation for {data.get('document_title')} by {data.get('investigator_name')}")
        eval_path = os.path.join(DATA_DIR, 'evaluations.csv')
        # Fixed column order of the output file.
        columns = ['timestamp', 'document_title', 'description', 'mrn', 'investigator_name',
                   'session_id'] + CRITERIA + ['note_origin']
        # Build the row on a fresh dict so the caller's mapping is left
        # untouched; the timestamp is always set here.
        row = {col: data.get(col, '') for col in columns}
        row['timestamp'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        # A header is required when the file is new, and also when it exists
        # but is empty, which would otherwise leave a header-less CSV.
        needs_header = (not os.path.exists(eval_path)
                        or os.path.getsize(eval_path) == 0)
        with open(eval_path, 'a', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=columns, extrasaction='ignore')
            if needs_header:
                writer.writeheader()
                log_error("Created new evaluations.csv with header")
            writer.writerow(row)
        log_error(f"Successfully saved evaluation to {eval_path}")
        return True
    except Exception as e:
        log_error(f"Error saving evaluation: {str(e)}")
        return False
def get_results():
    """Load saved evaluations plus filename lookups for description and MRN.

    Returns:
        A 3-tuple ``(eval_df, filename_to_desc, filename_to_mrn)``:
        the evaluations as a DataFrame, and two dicts mapping each
        ``filename`` in documents.csv to its ``description`` and ``mrn``.
        When evaluations.csv is missing or unreadable the result is
        ``(empty DataFrame, {}, {})``. When only the documents lookup
        fails, the evaluations are still returned with empty dicts.
    """
    try:
        eval_path = os.path.join(DATA_DIR, 'evaluations.csv')
        if not os.path.exists(eval_path):
            return pd.DataFrame(), {}, {}
        # Read evaluations
        eval_df = pd.read_csv(eval_path)
        # The documents lookup is best effort: results must still load when
        # documents.csv is absent or malformed.
        try:
            docs_df = pd.read_csv(os.path.join(DATA_DIR, 'documents.csv'))
            filename_to_desc = dict(zip(docs_df['filename'], docs_df['description']))
            filename_to_mrn = dict(zip(docs_df['filename'], docs_df['mrn']))
        except Exception as e:
            # Catch Exception rather than using a bare except, so
            # KeyboardInterrupt/SystemExit still propagate; the reason is
            # logged instead of being discarded.
            log_error(f"Could not load document lookups: {str(e)}")
            filename_to_desc = {}
            filename_to_mrn = {}
        return eval_df, filename_to_desc, filename_to_mrn
    except Exception as e:
        log_error(f"Error in get_results: {str(e)}")
        return pd.DataFrame(), {}, {}
# Progress tracking functions
def get_progress_file(evaluator_name):
    """Return the progress-file path for *evaluator_name*.

    Only alphanumeric characters, spaces, hyphens and underscores of the
    name are kept, trailing whitespace is stripped, and the result is used
    as ``<name>_progress.txt`` inside DATA_DIR/sessions.
    """
    extra_ok = (' ', '-', '_')
    kept = [ch for ch in evaluator_name if ch.isalnum() or ch in extra_ok]
    safe_name = "".join(kept).rstrip()
    return os.path.join(DATA_DIR, 'sessions', f'{safe_name}_progress.txt')
def save_current_index(evaluator_name, index):
    """Write *index* to the evaluator's progress file.

    The sessions directory is created if needed. Returns True on success,
    or False after logging any failure.
    """
    try:
        sessions_dir = os.path.join(DATA_DIR, 'sessions')
        os.makedirs(sessions_dir, exist_ok=True)
        target = get_progress_file(evaluator_name)
        with open(target, 'w') as out:
            out.write(str(index))
    except Exception as e:
        log_error(f"Error saving progress: {str(e)}")
        return False
    else:
        return True
def load_current_index(evaluator_name):
    """Read the evaluator's saved document index.

    Returns the integer stored in the progress file, or 1 when the file
    does not exist or cannot be read/parsed (the failure is logged).
    """
    try:
        progress_file = get_progress_file(evaluator_name)
        if not os.path.exists(progress_file):
            return 1
        with open(progress_file, 'r') as src:
            stored = src.read()
        return int(stored.strip())
    except Exception as e:
        log_error(f"Error loading progress: {str(e)}")
        return 1
def store_evaluator_name(name):
    """Persist *name* to DATA_DIR/current_evaluator.txt.

    Returns True on success, or False after logging any failure.
    """
    try:
        ensure_data_directory()
        target = os.path.join(DATA_DIR, 'current_evaluator.txt')
        with open(target, 'w') as out:
            out.write(name)
        log_error(f"Stored evaluator name: {name}")
    except Exception as e:
        log_error(f"Error storing evaluator name: {str(e)}")
        return False
    else:
        return True
def get_stored_evaluator_name():
    """Read the evaluator name from DATA_DIR/current_evaluator.txt.

    Returns the stripped file contents, or None when the file does not
    exist or reading it fails (the failure is logged).
    """
    try:
        source = os.path.join(DATA_DIR, 'current_evaluator.txt')
        if not os.path.exists(source):
            return None
        with open(source, 'r') as src:
            contents = src.read()
        return contents.strip()
    except Exception as e:
        log_error(f"Error retrieving evaluator name: {str(e)}")
        return None
@app.route('/', methods=['GET', 'POST'])
def index():
    """Home page: collect the evaluator name and an uploaded documents CSV.

    GET renders the form, pre-filled with the evaluator name from the
    session or, failing that, the name stored on disk.

    POST requires a non-empty ``evaluator_name``. When a ``file`` part is
    present it must be a ``.csv`` upload (extension matched without regard
    to case) containing the columns ``filename``, ``description``, ``mrn``
    and ``note``. A valid upload is written to DATA_DIR/documents.csv, the
    session is initialised, the evaluator name is stored, and the user is
    redirected to the evaluation page. Every failure flashes a message and
    re-renders the form.
    """
    if request.method != 'POST':
        evaluator_name = session.get('evaluator_name', '') or get_stored_evaluator_name() or ''
        return render_template('index.html', evaluator_name=evaluator_name)

    ensure_data_directory()
    # Get evaluator name
    evaluator_name = request.form.get('evaluator_name', '').strip()
    if not evaluator_name:
        flash("Please enter your name as the evaluator.")
        return render_template('index.html')

    # Process file upload
    if 'file' in request.files:
        file = request.files['file']
        if file.filename == '':
            flash("No file selected.")
            return render_template('index.html')
        # Compare the extension case-insensitively so "NOTES.CSV" is accepted.
        if file and file.filename.lower().endswith('.csv'):
            try:
                file_content = file.read()
                # Detect encoding and parse CSV
                encoding = detect_encoding(file_content)
                csv_text = file_content.decode(encoding)
                # The frame is written straight back to disk, so it is read
                # as plain text. Type inference would otherwise turn an MRN
                # like "00123" into 123, or into 123.0 next to blank cells.
                # keep_default_na=False likewise keeps cells such as "NA"
                # as the literal text that was uploaded.
                df = pd.read_csv(io.StringIO(csv_text), dtype=str,
                                 keep_default_na=False)
                # Validate columns
                required_columns = ['filename', 'description', 'mrn', 'note']
                missing_columns = [col for col in required_columns if col not in df.columns]
                if missing_columns:
                    flash(f"Missing required columns: {', '.join(missing_columns)}")
                    return render_template('index.html')
                # Save documents
                documents_path = os.path.join(DATA_DIR, 'documents.csv')
                df.to_csv(documents_path, index=False)
                # Set session
                session['evaluator_name'] = evaluator_name
                session['session_id'] = f"{evaluator_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
                # Store evaluator name
                store_evaluator_name(evaluator_name)
                flash(f"File uploaded successfully! Found {len(df)} documents.")
                return redirect(url_for('evaluate'))
            except Exception as e:
                flash(f"Error processing file: {str(e)}")
                log_error(f"File upload error: {str(e)}")
        else:
            flash("Please upload a CSV file.")
    return render_template('index.html')
@app.route('/evaluate', methods=['GET', 'POST'])
def evaluate():
    """Show the current document and process evaluation form actions.

    The evaluator name is taken from the session, then the ``evaluator``
    query parameter, then the name stored on disk; without one the user is
    sent back to the index page. The current position is read from the
    evaluator's progress file and may be overridden by a valid ``jump_to``
    query parameter.

    POST actions (form field ``action``, default ``submit``):
        skip      -- advance to the next document without saving.
        stop_save -- go to the results page.
        submit    -- require a score for every criterion and a note origin;
                     when complete, save the evaluation and advance.
    """
    who = (
        session.get('evaluator_name')
        or request.args.get('evaluator')
        or get_stored_evaluator_name()
    )
    if not who:
        flash("Please enter your name before evaluating documents.")
        return redirect(url_for('index'))

    # Keep the session in step with whichever source supplied the name.
    session['evaluator_name'] = who
    if 'session_id' not in session:
        session['session_id'] = f"{who}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

    ensure_data_directory()
    documents = load_documents()
    if not documents:
        return render_template('no_documents.html')
    total = len(documents)

    # Positions are 1-based; total + 1 means "past the last document".
    position = load_current_index(who)
    jump_to = request.args.get('jump_to', type=int)
    if jump_to and 1 <= jump_to <= total:
        position = jump_to
        save_current_index(who, position)

    if request.method == 'POST':
        action = request.form.get('action', 'submit')

        # skip and stop_save are handled before any field validation.
        if action == 'skip':
            position = min(position + 1, total + 1)
            save_current_index(who, position)
            flash("Document skipped.")
            return redirect(url_for('evaluate'))

        if action == 'stop_save':
            flash("Progress saved. You can resume later.")
            return redirect(url_for('results', session_saved=True))

        if action == 'submit':
            if position <= total:
                current_doc = documents[position - 1]
                record = {
                    'document_title': current_doc.get('filename', ''),
                    'description': current_doc.get('description', ''),
                    'mrn': current_doc.get('mrn', ''),
                    'investigator_name': who,
                    'session_id': session.get('session_id', ''),
                    'note_origin': request.form.get('note_origin', ''),
                }
                # Collect one score per criterion, flashing each gap.
                complete = True
                for idx, criterion in enumerate(CRITERIA):
                    score = request.form.get(f'criteria_{idx}')
                    if not score:
                        complete = False
                        flash(f"Please rate: {criterion}")
                        continue
                    record[criterion] = score
                if not record['note_origin']:
                    complete = False
                    flash("Please select a note origin assessment.")
                # Persist only a complete submission.
                if complete:
                    if save_evaluation(record):
                        position = min(position + 1, total + 1)
                        save_current_index(who, position)
                        flash("Evaluation saved successfully!")
                    else:
                        flash("Error saving evaluation. Please try again.")
            return redirect(url_for('evaluate'))

    # Nothing left to show once the position is past the last document.
    if position > total:
        flash("All documents have been evaluated. Thank you!")
        return redirect(url_for('results'))

    document = documents[position - 1]
    evaluated_docs = position - 1
    # documents is non-empty here, so the division is safe.
    progress = int((evaluated_docs / total) * 100)
    return render_template(
        'evaluate.html',
        current_note_number=position,
        evaluator_name=who,
        note=document.get('note', ''),
        mrn=document.get('mrn', ''),
        criteria=CRITERIA,
        descriptions=CRITERIA_DESCRIPTIONS,
        score_range=range(1, 6),
        note_origins=NOTE_ORIGINS,
        total_docs=total,
        evaluated_docs=evaluated_docs,
        progress=progress,
    )
@app.route('/jump', methods=['POST'])
def jump_to_document():
    """Validate the requested document number and redirect to it.

    A number within 1..len(documents) redirects to the evaluation page
    with ``jump_to`` set. Anything else flashes an explanation and
    redirects to the evaluation page unchanged.
    """
    try:
        requested = int(request.form.get('document_number', 1))
    except ValueError:
        flash("Please enter a valid document number.")
        return redirect(url_for('evaluate'))

    total = len(load_documents())
    if requested < 1:
        flash("Document number must be 1 or greater.")
    elif requested > total:
        flash(f"Document number cannot be greater than {total}.")
    else:
        return redirect(url_for('evaluate', jump_to=requested))
    return redirect(url_for('evaluate'))
@app.route('/results')
def results():
    """Render the results page listing every saved evaluation.

    Each row of the evaluations frame becomes a dict. A missing or NaN
    ``description`` / ``mrn`` is filled from the documents lookup keyed by
    ``document_title``. Any error is logged, flashed, and the user is
    redirected to the index page.
    """
    try:
        eval_df, filename_to_desc, filename_to_mrn = get_results()
        # (field, lookup) pairs used to back-fill missing values.
        fallbacks = (
            ('description', filename_to_desc),
            ('mrn', filename_to_mrn),
        )
        evaluations = []
        if not eval_df.empty:
            for _, row in eval_df.iterrows():
                record = row.to_dict()
                title = record.get('document_title', '')
                for field, lookup in fallbacks:
                    if field not in record or pd.isna(record[field]):
                        record[field] = lookup.get(title, '')
                evaluations.append(record)
        session_saved = request.args.get('session_saved', False)
        return render_template(
            'results.html',
            evaluations=evaluations,
            criteria=CRITERIA,
            descriptions=CRITERIA_DESCRIPTIONS,
            session_saved=session_saved,
        )
    except Exception as e:
        log_error(f"Error in results route: {str(e)}")
        flash(f"Error loading results: {str(e)}")
        return redirect(url_for('index'))
@app.route('/export-csv')
def export_csv():
    """Send all evaluations as a timestamped CSV download.

    With no evaluations, or on any error, a message is flashed and the
    user is redirected to the results page.
    """
    try:
        eval_df, _, _ = get_results()
        if eval_df.empty:
            flash('No evaluations available to export.')
            return redirect(url_for('results'))
        # Serialise to text with every field quoted, then wrap the UTF-8
        # bytes in an in-memory file for send_file.
        csv_text = eval_df.to_csv(index=False, quoting=csv.QUOTE_ALL)
        payload = io.BytesIO(csv_text.encode('utf-8'))
        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        return send_file(
            payload,
            mimetype='text/csv',
            as_attachment=True,
            download_name=f'evaluations_{stamp}.csv',
        )
    except Exception as e:
        flash(f'Error exporting CSV: {str(e)}')
        log_error(f"Export error: {str(e)}")
        return redirect(url_for('results'))
@app.route('/upload-documents', methods=['GET', 'POST'])
def upload_documents():
    """Alternative document upload page.

    GET renders the upload form. POST expects a ``file`` part with a
    ``.csv`` extension (matched without regard to case). The upload is
    parsed and checked for the columns ``filename``, ``description``,
    ``mrn`` and ``note`` *before* anything is written, so a bad file never
    replaces an existing documents.csv. A valid upload is stored
    byte-for-byte as DATA_DIR/documents.csv and the user is redirected to
    the index page; any problem flashes a message and re-renders the form.
    """
    if request.method == 'POST':
        try:
            if 'file' not in request.files:
                flash('No file selected')
                return redirect(request.url)
            file = request.files['file']
            if file.filename == '':
                flash('No file selected')
                return redirect(request.url)
            if file and file.filename.lower().endswith('.csv'):
                content = file.read()
                # Parse with the detected encoding, the same way
                # load_documents() reads the stored file, so a file that
                # passes here is one the evaluation page can load.
                try:
                    df = pd.read_csv(io.BytesIO(content),
                                     encoding=detect_encoding(content))
                except Exception as e:
                    flash(f'File could not be parsed and was not saved: {str(e)}')
                    return render_template('upload_documents.html')
                # Same required columns as the upload on the home page.
                required_columns = ['filename', 'description', 'mrn', 'note']
                missing_columns = [col for col in required_columns if col not in df.columns]
                if missing_columns:
                    flash(f"Missing required columns: {', '.join(missing_columns)}")
                    return render_template('upload_documents.html')
                # Only a validated upload reaches the disk.
                ensure_data_directory()
                file_path = os.path.join(DATA_DIR, 'documents.csv')
                with open(file_path, 'wb') as out:
                    out.write(content)
                flash(f'Documents uploaded successfully! Found {len(df)} documents.')
                return redirect(url_for('index'))
            else:
                flash('Please upload a CSV file')
        except Exception as e:
            flash(f'Error uploading file: {str(e)}')
            log_error(f"Upload error: {str(e)}")
    return render_template('upload_documents.html')
@app.route('/debug')
def debug():
    """Render a diagnostics page describing the application's state.

    NOTE(review): the loaded documents, evaluations and recent log lines
    are all passed to the template, and no access check is visible on this
    route -- confirm that exposure is intended.
    """
    ensure_data_directory()
    documents = load_documents()
    eval_df, _, _ = get_results()
    evaluations = eval_df.to_dict('records') if not eval_df.empty else []

    dir_present = os.path.exists(DATA_DIR)
    on_spaces = 'SPACE_ID' in os.environ
    debug_info = {
        'data_dir': DATA_DIR,
        'data_dir_exists': dir_present,
        # Writability is only probed when the directory exists.
        'data_dir_writable': os.access(DATA_DIR, os.W_OK) if dir_present else False,
        'current_working_dir': os.getcwd(),
        'session_id': session.get('session_id', 'None'),
        'evaluator_name': session.get('evaluator_name', 'None'),
        'documents_count': len(documents),
        'evaluations_count': len(evaluations),
        'environment': 'HF Spaces' if on_spaces else 'Local',
    }

    def data_file_present(filename):
        """Tell whether *filename* exists inside DATA_DIR."""
        return os.path.exists(os.path.join(DATA_DIR, filename))

    return render_template(
        'debug.html',
        documents=documents,
        evaluations=evaluations,
        documents_exists=data_file_present('documents.csv'),
        evaluations_exists=data_file_present('evaluations.csv'),
        errors=ERROR_LOG,
        debug_info=debug_info,
    )
@app.route('/instructions')
def view_instructions():
    """Render the instructions page with the criteria and their descriptions."""
    context = {
        'criteria': CRITERIA,
        'descriptions': CRITERIA_DESCRIPTIONS,
    }
    return render_template('instructions.html', **context)
@app.route('/download/instructions')
def download_instructions():
    """Send DATA_DIR/instructions.md as a markdown attachment.

    When the file is missing, a message is flashed and the user is
    redirected to the index page.
    """
    instructions_path = os.path.join(DATA_DIR, 'instructions.md')
    try:
        response = send_file(
            instructions_path,
            mimetype='text/markdown',
            download_name='instructions.md',
            as_attachment=True,
        )
    except FileNotFoundError:
        flash('Instructions file not found.')
        return redirect(url_for('index'))
    return response
@app.route('/download/template')
def download_template():
    """Send DATA_DIR/sample_documents_template.csv as a CSV attachment.

    When the file is missing, a message is flashed and the user is
    redirected to the index page.
    """
    template_path = os.path.join(DATA_DIR, 'sample_documents_template.csv')
    try:
        response = send_file(
            template_path,
            mimetype='text/csv',
            download_name='sample_documents_template.csv',
            as_attachment=True,
        )
    except FileNotFoundError:
        flash('Template file not found.')
        return redirect(url_for('index'))
    return response
@app.route('/reset', methods=['POST'])
def reset():
    """Clear the session and remove evaluations.csv after a backup attempt.

    The backup is a timestamped copy next to the original. A failed backup
    is only logged; the evaluations file is removed either way.
    """
    session.clear()
    target = os.path.join(DATA_DIR, 'evaluations.csv')
    if os.path.exists(target):
        stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        backup_path = f"{target}.backup.{stamp}"
        try:
            shutil.copy(target, backup_path)
        except Exception as e:
            log_error(f"Could not create backup: {str(e)}")
        else:
            log_error(f"Created backup at {backup_path}")
        os.remove(target)
        log_error("Removed evaluations.csv")
    flash('Session reset. All evaluation data cleared.')
    return redirect(url_for('index'))
@app.route('/clear-corrupted-data', methods=['POST'])
def clear_corrupted_data():
    """Remove evaluations.csv after attempting a ``.corrupted`` backup copy.

    A failed backup is only logged; the file is removed either way. When
    there is no evaluations file a message says so. The user is always
    redirected to the results page.
    """
    target = os.path.join(DATA_DIR, 'evaluations.csv')
    if not os.path.exists(target):
        flash('No evaluation data file found.')
        return redirect(url_for('results'))

    stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    backup_path = f"{target}.corrupted.{stamp}"
    try:
        shutil.copy(target, backup_path)
    except Exception as e:
        log_error(f"Could not backup: {str(e)}")
    else:
        log_error(f"Backed up corrupted file to {backup_path}")
    os.remove(target)
    flash('Corrupted evaluation data cleared.')
    return redirect(url_for('results'))
@app.route('/error')
def error_page():
    """Render the error page from the ``message`` and ``details`` query args.

    ``message`` defaults to 'An unknown error occurred' and ``details`` to
    an empty string.
    """
    params = request.args
    return render_template(
        'error.html',
        error_message=params.get('message', 'An unknown error occurred'),
        error_details=params.get('details', ''),
    )
if __name__ == '__main__':
    # Script entry point: prepare the data directory, print a startup
    # summary, then start Flask's built-in server on all interfaces.
    banner = "=" * 60
    print("\n" + banner)
    print(f"Application Starting at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print(banner + "\n")
    # Initialize
    ensure_data_directory()
    # Log startup info
    print(f"Data directory: {DATA_DIR}")
    print(f"Data directory exists: {os.path.exists(DATA_DIR)}")
    print(f"Environment: {'HF Spaces' if 'SPACE_ID' in os.environ else 'Local'}")
    if 'SPACE_ID' in os.environ:
        print(f"Space ID: {os.environ.get('SPACE_ID')}")
        print(f"Space Author: {os.environ.get('SPACE_AUTHOR_NAME')}")
    print(f"Data directory contents: {os.listdir(DATA_DIR) if os.path.exists(DATA_DIR) else 'N/A'}")
    print("\n" + banner + "\n")
    # The server listens on 0.0.0.0, so debug mode must not be on by
    # default: the interactive debugger lets anyone who can reach the port
    # execute code. It is opt-in through FLASK_DEBUG (1/true/yes/on).
    debug_mode = os.environ.get('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes', 'on')
    # Run the app
    app.run(host='0.0.0.0', port=int(os.environ.get('PORT', 7860)), debug=debug_mode)