import flask from flask import Flask, render_template, request, redirect, url_for, session, flash, send_file import pandas as pd import numpy as np import random import csv from datetime import datetime import os import io import shutil import traceback import chardet # Configure data directory for HF Spaces compatibility if 'SPACE_ID' in os.environ: # Running on HF Spaces - use persistent directory DATA_DIR = os.path.join(os.getcwd(), 'data') else: # Local development DATA_DIR = os.environ.get('DATA_DIR', '/tmp/human_notes_evaluator') # Configure the Flask app app = Flask(__name__) app.secret_key = os.environ.get('SECRET_KEY', 'your-secret-key-here') # Configure session app.config['SESSION_PERMANENT'] = False app.config['SESSION_TYPE'] = 'filesystem' # Constants CRITERIA = [ "Up-to-date", "Accurate", "Thorough", "Relevant", "Well-organized", "Clear", "Concise", "Thoughtful", "Internally consistent" ] CRITERIA_DESCRIPTIONS = [ "The note contains the most recent test results and recommendations.", "The note is true. It is free of incorrect information.", "The note is complete and documents all of the issues of importance to the patient.", "The note is extremely relevant, providing valuable information and/or analysis.", "The note is well-formed and structured in a way that helps the reader understand the patient's clinical course.", "The note is clear, without ambiguity or sections that are difficult to understand.", "The note is brief, to the point, and without redundancy.", "The note reflects the author's understanding of the patient's status and ability to develop a plan of care.", "No part of the note ignores or contradicts any other part." ] # Note origin options NOTE_ORIGINS = [ "Generative AI note", "Human written note", "I am not sure" ] ERROR_LOG = [] # Store recent errors for debugging def log_error(error_msg): """Log an error message for debugging.""" ERROR_LOG.append(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}: {error_msg}") # Keep only the most recent 10 errors while len(ERROR_LOG) > 10: ERROR_LOG.pop(0) print(f"[LOG] {error_msg}") # Also print to console def ensure_data_directory(): """Ensure data directory exists in a persistent location""" global DATA_DIR # For HF Spaces, use the current working directory which persists if 'SPACE_ID' in os.environ: DATA_DIR = os.path.join(os.getcwd(), 'data') log_error(f"Running on HF Spaces, using data directory: {DATA_DIR}") try: os.makedirs(DATA_DIR, exist_ok=True) os.makedirs(os.path.join(DATA_DIR, 'sessions'), exist_ok=True) log_error(f"Created/verified data directory at {DATA_DIR}") # Create template files if they don't exist create_template_files() except Exception as e: log_error(f"Error creating data directory: {str(e)}") raise def create_template_files(): """Create template CSV and instructions files if they don't exist""" # Create sample documents template template_path = os.path.join(DATA_DIR, 'sample_documents_template.csv') if not os.path.exists(template_path): template_data = [ ['filename', 'description', 'mrn', 'note'], ['sample1.txt', 'Example Clinical Note', 'MRN12345', 'This is a sample clinical note for evaluation. Patient presents with...'], ['sample2.txt', 'Example Progress Note', 'MRN67890', 'Patient returns for follow-up visit. Current medications include...'] ] with open(template_path, 'w', newline='', encoding='utf-8') as f: writer = csv.writer(f) writer.writerows(template_data) log_error(f"Created template file at {template_path}") # Create instructions.md if it doesn't exist instructions_path = os.path.join(DATA_DIR, 'instructions.md') if not os.path.exists(instructions_path): with open(instructions_path, 'w', encoding='utf-8') as f: f.write("# Instructions for Human Notes Evaluator\n\n") f.write("## How to Use This Application\n\n") f.write("1. Upload a CSV file with your documents\n") f.write("2. Enter your name as the evaluator\n") f.write("3. Rate each document on the 9 criteria\n") f.write("4. Export results when complete\n") log_error(f"Created instructions at {instructions_path}") def detect_encoding(file_content): """Detect the encoding of file content.""" if isinstance(file_content, str): file_content = file_content.encode() result = chardet.detect(file_content) return result['encoding'] or 'utf-8' def load_documents(): """Load all documents from CSV file.""" try: file_path = os.path.join(DATA_DIR, 'documents.csv') if not os.path.exists(file_path): log_error(f"Documents file not found at {file_path}") return [] # Read file and detect encoding with open(file_path, 'rb') as f: content = f.read() encoding = detect_encoding(content) log_error(f"Detected encoding: {encoding}") # Parse CSV df = pd.read_csv(io.BytesIO(content), encoding=encoding) log_error("Successfully parsed CSV") # Convert columns to string to ensure compatibility for col in df.columns: df[col] = df[col].astype(str).replace('nan', '') # Log stats log_error(f"DataFrame columns: {list(df.columns)}") log_error(f"DataFrame shape: {df.shape}") # Convert to list of dictionaries documents = df.to_dict('records') log_error(f"Loaded {len(documents)} documents for evaluation") return documents except Exception as e: log_error(f"Error in load_documents: {str(e)}") return [] def save_evaluation(data): """Save evaluation data to CSV file.""" try: ensure_data_directory() log_error(f"Saving evaluation for {data.get('document_title')} by {data.get('investigator_name')}") eval_path = os.path.join(DATA_DIR, 'evaluations.csv') # Add timestamp data['timestamp'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S') # Check if file exists file_exists = os.path.exists(eval_path) # Define column order columns = ['timestamp', 'document_title', 'description', 'mrn', 'investigator_name', 'session_id'] + CRITERIA + ['note_origin'] # Ensure all columns exist in data for col in columns: if col not in data: data[col] = '' # Write to CSV with open(eval_path, 'a', newline='', encoding='utf-8') as f: writer = csv.DictWriter(f, fieldnames=columns, extrasaction='ignore') if not file_exists: writer.writeheader() log_error("Created new evaluations.csv with header") writer.writerow(data) log_error(f"Successfully saved evaluation to {eval_path}") return True except Exception as e: log_error(f"Error saving evaluation: {str(e)}") return False def get_results(): """Get all evaluation results.""" try: eval_path = os.path.join(DATA_DIR, 'evaluations.csv') if not os.path.exists(eval_path): return pd.DataFrame(), {}, {} # Read evaluations eval_df = pd.read_csv(eval_path) # Load documents for descriptions and MRNs try: docs_df = pd.read_csv(os.path.join(DATA_DIR, 'documents.csv')) filename_to_desc = dict(zip(docs_df['filename'], docs_df['description'])) filename_to_mrn = dict(zip(docs_df['filename'], docs_df['mrn'])) except: filename_to_desc = {} filename_to_mrn = {} return eval_df, filename_to_desc, filename_to_mrn except Exception as e: log_error(f"Error in get_results: {str(e)}") return pd.DataFrame(), {}, {} # Progress tracking functions def get_progress_file(evaluator_name): """Get path to progress file for an evaluator.""" safe_name = "".join(c for c in evaluator_name if c.isalnum() or c in (' ', '-', '_')).rstrip() return os.path.join(DATA_DIR, 'sessions', f'{safe_name}_progress.txt') def save_current_index(evaluator_name, index): """Save current document index to file.""" try: os.makedirs(os.path.join(DATA_DIR, 'sessions'), exist_ok=True) with open(get_progress_file(evaluator_name), 'w') as f: f.write(str(index)) return True except Exception as e: log_error(f"Error saving progress: {str(e)}") return False def load_current_index(evaluator_name): """Load current document index from file.""" try: progress_file = get_progress_file(evaluator_name) if os.path.exists(progress_file): with open(progress_file, 'r') as f: return int(f.read().strip()) return 1 except Exception as e: log_error(f"Error loading progress: {str(e)}") return 1 def store_evaluator_name(name): """Store evaluator name in a file for persistence.""" try: ensure_data_directory() with open(os.path.join(DATA_DIR, 'current_evaluator.txt'), 'w') as f: f.write(name) log_error(f"Stored evaluator name: {name}") return True except Exception as e: log_error(f"Error storing evaluator name: {str(e)}") return False def get_stored_evaluator_name(): """Get stored evaluator name from file.""" try: file_path = os.path.join(DATA_DIR, 'current_evaluator.txt') if os.path.exists(file_path): with open(file_path, 'r') as f: return f.read().strip() return None except Exception as e: log_error(f"Error retrieving evaluator name: {str(e)}") return None @app.route('/', methods=['GET', 'POST']) def index(): """Home page with file upload and evaluator name.""" if request.method == 'POST': ensure_data_directory() # Get evaluator name evaluator_name = request.form.get('evaluator_name', '').strip() if not evaluator_name: flash("Please enter your name as the evaluator.") return render_template('index.html') # Process file upload if 'file' in request.files: file = request.files['file'] if file.filename == '': flash("No file selected.") return render_template('index.html') if file and file.filename.endswith('.csv'): try: # Read file content file_content = file.read() # Detect encoding and parse CSV encoding = detect_encoding(file_content) csv_text = file_content.decode(encoding) df = pd.read_csv(io.StringIO(csv_text)) # Validate columns required_columns = ['filename', 'description', 'mrn', 'note'] missing_columns = [col for col in required_columns if col not in df.columns] if missing_columns: flash(f"Missing required columns: {', '.join(missing_columns)}") return render_template('index.html') # Save documents documents_path = os.path.join(DATA_DIR, 'documents.csv') df.to_csv(documents_path, index=False) # Set session session['evaluator_name'] = evaluator_name session['session_id'] = f"{evaluator_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}" # Store evaluator name store_evaluator_name(evaluator_name) flash(f"File uploaded successfully! Found {len(df)} documents.") return redirect(url_for('evaluate')) except Exception as e: flash(f"Error processing file: {str(e)}") log_error(f"File upload error: {str(e)}") else: flash("Please upload a CSV file.") return render_template('index.html') # GET request evaluator_name = session.get('evaluator_name', '') or get_stored_evaluator_name() or '' return render_template('index.html', evaluator_name=evaluator_name) @app.route('/evaluate', methods=['GET', 'POST']) def evaluate(): """Document evaluation page.""" # Get evaluator name from multiple sources evaluator_name = ( session.get('evaluator_name') or request.args.get('evaluator') or get_stored_evaluator_name() ) if not evaluator_name: flash("Please enter your name before evaluating documents.") return redirect(url_for('index')) # Update session session['evaluator_name'] = evaluator_name if 'session_id' not in session: session['session_id'] = f"{evaluator_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}" # Ensure directories exist ensure_data_directory() # Load documents documents = load_documents() if not documents: return render_template('no_documents.html') # Get current index current_index = load_current_index(evaluator_name) # Handle jump requests jump_to = request.args.get('jump_to', type=int) if jump_to and 1 <= jump_to <= len(documents): current_index = jump_to save_current_index(evaluator_name, current_index) # Handle POST requests if request.method == 'POST': action = request.form.get('action', 'submit') # Handle skip and stop_save BEFORE any validation if action == 'skip': current_index = min(current_index + 1, len(documents) + 1) save_current_index(evaluator_name, current_index) flash("Document skipped.") return redirect(url_for('evaluate')) elif action == 'stop_save': flash("Progress saved. You can resume later.") return redirect(url_for('results', session_saved=True)) # Only validate fields for submit action elif action == 'submit': if current_index <= len(documents): current_doc = documents[current_index - 1] # Prepare evaluation data eval_data = { 'document_title': current_doc.get('filename', ''), 'description': current_doc.get('description', ''), 'mrn': current_doc.get('mrn', ''), 'investigator_name': evaluator_name, 'session_id': session.get('session_id', ''), 'note_origin': request.form.get('note_origin', '') } # Add criteria scores all_scores_present = True for i, criterion in enumerate(CRITERIA): score = request.form.get(f'criteria_{i}') if score: eval_data[criterion] = score else: all_scores_present = False flash(f"Please rate: {criterion}") # Check note origin if not eval_data['note_origin']: all_scores_present = False flash("Please select a note origin assessment.") # Save if all data present if all_scores_present: if save_evaluation(eval_data): current_index = min(current_index + 1, len(documents) + 1) save_current_index(evaluator_name, current_index) flash("Evaluation saved successfully!") else: flash("Error saving evaluation. Please try again.") return redirect(url_for('evaluate')) # Check if all documents evaluated if current_index > len(documents): flash("All documents have been evaluated. Thank you!") return redirect(url_for('results')) # Get current document document = documents[current_index - 1] # Calculate progress evaluated_docs = current_index - 1 progress = int((evaluated_docs / len(documents)) * 100) if documents else 0 return render_template('evaluate.html', current_note_number=current_index, evaluator_name=evaluator_name, note=document.get('note', ''), # Removed description from here mrn=document.get('mrn', ''), criteria=CRITERIA, descriptions=CRITERIA_DESCRIPTIONS, score_range=range(1, 6), note_origins=NOTE_ORIGINS, total_docs=len(documents), evaluated_docs=evaluated_docs, progress=progress ) @app.route('/jump', methods=['POST']) def jump_to_document(): """Jump to a specific document number.""" try: document_number = int(request.form.get('document_number', 1)) documents = load_documents() if document_number < 1: flash("Document number must be 1 or greater.") elif document_number > len(documents): flash(f"Document number cannot be greater than {len(documents)}.") else: return redirect(url_for('evaluate', jump_to=document_number)) except ValueError: flash("Please enter a valid document number.") return redirect(url_for('evaluate')) @app.route('/results') def results(): """Results page showing all evaluations.""" try: eval_df, filename_to_desc, filename_to_mrn = get_results() # Convert to list of dicts and enhance with descriptions/MRNs evaluations = [] if not eval_df.empty: for _, row in eval_df.iterrows(): eval_dict = row.to_dict() doc_title = eval_dict.get('document_title', '') # Add description and MRN if not already present if 'description' not in eval_dict or pd.isna(eval_dict['description']): eval_dict['description'] = filename_to_desc.get(doc_title, '') if 'mrn' not in eval_dict or pd.isna(eval_dict['mrn']): eval_dict['mrn'] = filename_to_mrn.get(doc_title, '') evaluations.append(eval_dict) session_saved = request.args.get('session_saved', False) return render_template('results.html', evaluations=evaluations, criteria=CRITERIA, descriptions=CRITERIA_DESCRIPTIONS, session_saved=session_saved ) except Exception as e: log_error(f"Error in results route: {str(e)}") flash(f"Error loading results: {str(e)}") return redirect(url_for('index')) @app.route('/export-csv') def export_csv(): """Export evaluations to CSV.""" try: eval_df, _, _ = get_results() if eval_df.empty: flash('No evaluations available to export.') return redirect(url_for('results')) # Create CSV in memory output = io.StringIO() eval_df.to_csv(output, index=False, quoting=csv.QUOTE_ALL) output.seek(0) # Convert to bytes mem = io.BytesIO() mem.write(output.getvalue().encode('utf-8')) mem.seek(0) return send_file( mem, mimetype='text/csv', as_attachment=True, download_name=f'evaluations_{datetime.now().strftime("%Y%m%d_%H%M%S")}.csv' ) except Exception as e: flash(f'Error exporting CSV: {str(e)}') log_error(f"Export error: {str(e)}") return redirect(url_for('results')) @app.route('/upload-documents', methods=['GET', 'POST']) def upload_documents(): """Alternative document upload page.""" if request.method == 'POST': try: if 'file' not in request.files: flash('No file selected') return redirect(request.url) file = request.files['file'] if file.filename == '': flash('No file selected') return redirect(request.url) if file and file.filename.endswith('.csv'): ensure_data_directory() # Save file file_path = os.path.join(DATA_DIR, 'documents.csv') file.save(file_path) # Verify file try: df = pd.read_csv(file_path) flash(f'Documents uploaded successfully! Found {len(df)} documents.') return redirect(url_for('index')) except Exception as e: flash(f'File uploaded but could not be parsed: {str(e)}') else: flash('Please upload a CSV file') except Exception as e: flash(f'Error uploading file: {str(e)}') log_error(f"Upload error: {str(e)}") return render_template('upload_documents.html') @app.route('/debug') def debug(): """Debug page showing application state.""" ensure_data_directory() documents = load_documents() eval_df, _, _ = get_results() evaluations = [] if eval_df.empty else eval_df.to_dict('records') debug_info = { 'data_dir': DATA_DIR, 'data_dir_exists': os.path.exists(DATA_DIR), 'data_dir_writable': os.access(DATA_DIR, os.W_OK) if os.path.exists(DATA_DIR) else False, 'current_working_dir': os.getcwd(), 'session_id': session.get('session_id', 'None'), 'evaluator_name': session.get('evaluator_name', 'None'), 'documents_count': len(documents), 'evaluations_count': len(evaluations), 'environment': 'HF Spaces' if 'SPACE_ID' in os.environ else 'Local' } return render_template('debug.html', documents=documents, evaluations=evaluations, documents_exists=os.path.exists(os.path.join(DATA_DIR, 'documents.csv')), evaluations_exists=os.path.exists(os.path.join(DATA_DIR, 'evaluations.csv')), errors=ERROR_LOG, debug_info=debug_info ) @app.route('/instructions') def view_instructions(): """Display instructions page.""" return render_template('instructions.html', criteria=CRITERIA, descriptions=CRITERIA_DESCRIPTIONS ) @app.route('/download/instructions') def download_instructions(): """Download instructions as markdown.""" try: instructions_path = os.path.join(DATA_DIR, 'instructions.md') return send_file(instructions_path, mimetype='text/markdown', download_name='instructions.md', as_attachment=True ) except FileNotFoundError: flash('Instructions file not found.') return redirect(url_for('index')) @app.route('/download/template') def download_template(): """Download sample template CSV.""" try: template_path = os.path.join(DATA_DIR, 'sample_documents_template.csv') return send_file(template_path, mimetype='text/csv', download_name='sample_documents_template.csv', as_attachment=True ) except FileNotFoundError: flash('Template file not found.') return redirect(url_for('index')) @app.route('/reset', methods=['POST']) def reset(): """Reset session and clear evaluations.""" session.clear() # Backup and remove evaluations evaluations_path = os.path.join(DATA_DIR, 'evaluations.csv') if os.path.exists(evaluations_path): backup_path = f"{evaluations_path}.backup.{datetime.now().strftime('%Y%m%d_%H%M%S')}" try: shutil.copy(evaluations_path, backup_path) log_error(f"Created backup at {backup_path}") except Exception as e: log_error(f"Could not create backup: {str(e)}") os.remove(evaluations_path) log_error("Removed evaluations.csv") flash('Session reset. All evaluation data cleared.') return redirect(url_for('index')) @app.route('/clear-corrupted-data', methods=['POST']) def clear_corrupted_data(): """Clear corrupted evaluations file.""" evaluations_path = os.path.join(DATA_DIR, 'evaluations.csv') if os.path.exists(evaluations_path): backup_path = f"{evaluations_path}.corrupted.{datetime.now().strftime('%Y%m%d_%H%M%S')}" try: shutil.copy(evaluations_path, backup_path) log_error(f"Backed up corrupted file to {backup_path}") except Exception as e: log_error(f"Could not backup: {str(e)}") os.remove(evaluations_path) flash('Corrupted evaluation data cleared.') else: flash('No evaluation data file found.') return redirect(url_for('results')) @app.route('/error') def error_page(): """Display error information.""" error_message = request.args.get('message', 'An unknown error occurred') error_details = request.args.get('details', '') return render_template('error.html', error_message=error_message, error_details=error_details ) if __name__ == '__main__': print("\n" + "="*60) print(f"Application Starting at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") print("="*60 + "\n") # Initialize ensure_data_directory() # Log startup info print(f"Data directory: {DATA_DIR}") print(f"Data directory exists: {os.path.exists(DATA_DIR)}") print(f"Environment: {'HF Spaces' if 'SPACE_ID' in os.environ else 'Local'}") if 'SPACE_ID' in os.environ: print(f"Space ID: {os.environ.get('SPACE_ID')}") print(f"Space Author: {os.environ.get('SPACE_AUTHOR_NAME')}") print(f"Data directory contents: {os.listdir(DATA_DIR) if os.path.exists(DATA_DIR) else 'N/A'}") print("\n" + "="*60 + "\n") # Run the app app.run(host='0.0.0.0', port=int(os.environ.get('PORT', 7860)), debug=True)