dvd_evaluator / combined.py
iyadsultan's picture
Enhance project configuration: Updated .gitignore to include more file types, modified Dockerfile for improved structure and environment variable handling, and revised README.md to provide comprehensive project details and usage instructions. The application now consistently listens on port 7860 across all components.
d7e8f11
# Combined Python files
# Generated from directory: C:\Users\USER\Documents\DVD_hf
# Total files found: 7
# Contents from: .\templates\index.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Document vs. Document Evaluator</title>
<link href="https://cdn.jsdelivr.net/npm/tailwindcss@2.2.19/dist/tailwind.min.css" rel="stylesheet">
</head>
<body class="bg-gray-100 min-h-screen">
<div class="container mx-auto px-4 py-8 max-w-7xl">
<!-- Header -->
<header class="mb-8">
<h1 class="text-4xl font-bold text-gray-800">Document vs. Document Evaluator</h1>
<p class="mt-2 text-gray-600">Compare and analyze two documents for content similarity</p>
</header>
<!-- Main Content -->
<main>
<!-- Upload Form -->
<section class="bg-white rounded-lg shadow-md p-6 mb-8">
<!-- API Key Input -->
<div class="mb-6">
<label class="block text-sm font-medium text-gray-700 mb-2">
OpenAI API Key <span class="text-red-500">*</span>
</label>
<input type="password"
id="apiKey"
class="w-full border rounded-md px-3 py-2"
placeholder="Enter your OpenAI API key"
required>
</div>
<form id="uploadForm" class="space-y-6">
<div class="grid md:grid-cols-2 gap-6">
<!-- Document 1 Upload -->
<div>
<label class="block text-sm font-medium text-gray-700 mb-2">
Document 1 <span class="text-red-500">*</span>
</label>
<input type="file"
name="doc1"
id="doc1"
accept=".txt"
required
class="w-full border rounded-md px-3 py-2">
<p id="doc1Name" class="mt-2 text-sm text-gray-500"></p>
</div>
<!-- Document 2 Upload -->
<div>
<label class="block text-sm font-medium text-gray-700 mb-2">
Document 2 <span class="text-red-500">*</span>
</label>
<input type="file"
name="doc2"
id="doc2"
accept=".txt"
required
class="w-full border rounded-md px-3 py-2">
<p id="doc2Name" class="mt-2 text-sm text-gray-500"></p>
</div>
</div>
<!-- Model Selection -->
<div>
<label class="block text-sm font-medium text-gray-700 mb-2">Model</label>
<select name="model" id="model" class="w-full border rounded-md px-3 py-2">
<option value="gpt-4o-mini">GPT-4o-mini</option>
<option value="gpt-4o">GPT-4o</option>
</select>
</div>
<!-- Document Type Selection -->
<div>
<label class="block text-sm font-medium text-gray-700 mb-2">Document Type</label>
<select name="document_type" id="documentType" class="w-full border rounded-md px-3 py-2">
<option value="discharge_note">Discharge Note</option>
<option value="admission_note">Admission Note</option>
</select>
</div>
<!-- Submit Button -->
<button type="submit"
class="w-full bg-blue-600 text-white px-4 py-2 rounded-md hover:bg-blue-700 transition-colors">
Compare Documents
</button>
</form>
</section>
<!-- Loading Overlay -->
<div id="loading" class="hidden fixed inset-0 bg-black bg-opacity-50 flex items-center justify-center z-50">
<div class="bg-white p-8 rounded-lg shadow-xl text-center max-w-md mx-4">
<div class="animate-spin rounded-full h-16 w-16 border-b-4 border-blue-600 mx-auto"></div>
<p class="mt-4 text-lg">Processing documents...<br>This may take a few minutes</p>
</div>
</div>
<!-- Results Section -->
<div id="results" class="hidden space-y-8">
<!-- Summary Stats -->
<section class="bg-white rounded-lg shadow-md p-6">
<h2 class="text-2xl font-bold mb-4">Summary Statistics</h2>
<div class="grid grid-cols-2 md:grid-cols-3 gap-4">
<div class="p-4 bg-gray-50 rounded-md">
<div class="text-sm text-gray-500">Total Tokens Used</div>
<div id="totalTokens" class="text-xl font-semibold">-</div>
</div>
<div class="p-4 bg-gray-50 rounded-md">
<div class="text-sm text-gray-500">DVD Ratio</div>
<div id="dvdRatio" class="text-xl font-semibold">-</div>
</div>
</div>
</section>
<!-- Document Results -->
<div class="grid md:grid-cols-2 gap-8">
<!-- Document 1 Results -->
<section class="bg-white rounded-lg shadow-md p-6">
<h2 class="text-2xl font-bold mb-4">Document 1 Analysis</h2>
<div id="doc1Results">
<div class="mb-4">
<h3 class="font-semibold">Score:</h3>
<p id="doc1Score" class="text-lg">-</p>
</div>
<div id="doc1Questions" class="space-y-4"></div>
</div>
</section>
<!-- Document 2 Results -->
<section class="bg-white rounded-lg shadow-md p-6">
<h2 class="text-2xl font-bold mb-4">Document 2 Analysis</h2>
<div id="doc2Results">
<div class="mb-4">
<h3 class="font-semibold">Score:</h3>
<p id="doc2Score" class="text-lg">-</p>
</div>
<div id="doc2Questions" class="space-y-4"></div>
</div>
</section>
</div>
<!-- Original Documents -->
<section class="grid md:grid-cols-2 gap-8">
<div class="bg-white rounded-lg shadow-md p-6">
<h2 class="text-2xl font-bold mb-4">Document 1 Text</h2>
<pre id="doc1Text" class="whitespace-pre-wrap text-sm bg-gray-50 p-4 rounded-md overflow-auto max-h-96"></pre>
</div>
<div class="bg-white rounded-lg shadow-md p-6">
<h2 class="text-2xl font-bold mb-4">Document 2 Text</h2>
<pre id="doc2Text" class="whitespace-pre-wrap text-sm bg-gray-50 p-4 rounded-md overflow-auto max-h-96"></pre>
</div>
</section>
</div>
</main>
</div>
<script>
// Utility functions
const utils = {
  /** Return the element with the given id, or null when it is not in the document. */
  safeGetElement: (id) => document.getElementById(id),

  /** Set the text of the element with the given id; does nothing if it is missing. */
  safeUpdateElement: (id, value) => {
    const element = document.getElementById(id);
    if (element) element.textContent = value;
  },

  /**
   * Convert an analysis object's "correct/total" score string into a percentage.
   * @param {{score?: string}} [analysis] - Analysis object returned by /compare.
   * @returns {{score: (string|number), percentage: number}} The raw score and its
   *   percentage; percentage is 0 when the score is missing or malformed.
   */
  calculateScore: (analysis) => {
    if (!analysis?.score) return { score: 0, percentage: 0 };
    const [correct, total] = String(analysis.score).split('/').map(Number);
    // Guard against malformed strings such as "x/4", which would otherwise yield NaN.
    const isValid = Number.isFinite(correct) && Number.isFinite(total) && total > 0;
    return {
      score: analysis.score,
      percentage: isValid ? (correct / total) * 100 : 0
    };
  },

  /**
   * Append a card for one question (text, pass/fail marker and options) to container.
   * All question and option text is model-generated, so it is inserted with
   * textContent only and never parsed as HTML.
   * @param {{question: string, options: string[], ideal_answer: string, model_answer: string}} question
   * @param {Element} container - Parent element that receives the card.
   */
  renderQuestion: (question, container) => {
    const questionDiv = document.createElement('div');
    questionDiv.className = 'p-4 bg-gray-50 rounded-lg';

    // Question text and status
    const isCorrect = question.model_answer === question.ideal_answer;
    const questionText = document.createElement('div');
    questionText.className = 'mb-3';

    const questionLabel = document.createElement('span');
    questionLabel.className = 'font-medium';
    questionLabel.textContent = question.question;
    questionText.appendChild(questionLabel);

    const statusLabel = document.createElement('span');
    statusLabel.className = `ml-2 ${isCorrect ? 'text-green-600' : 'text-red-600'}`;
    statusLabel.textContent = isCorrect ? '✅' : '❌';
    questionText.appendChild(statusLabel);

    questionDiv.appendChild(questionText);

    // Options
    const optionsDiv = document.createElement('div');
    optionsDiv.className = 'space-y-2 ml-4';
    (question.options ?? []).forEach((option, idx) => {
      const isCorrectAnswer = option === question.ideal_answer;
      const isSelectedAnswer = option === question.model_answer;
      const optionElement = document.createElement('div');
      optionElement.className = [
        isCorrectAnswer ? 'font-bold text-green-700' : '',
        isSelectedAnswer && !isCorrectAnswer ? 'text-red-600' : ''
      ].join(' ').trim();
      const letter = String.fromCharCode(65 + idx); // A, B, C, D, E
      optionElement.textContent = `${letter}. ${option}`;
      if (isCorrectAnswer) {
        const correctLabel = document.createElement('span');
        correctLabel.className = 'ml-2 text-sm';
        correctLabel.textContent = '(Correct Answer)';
        optionElement.appendChild(correctLabel);
      }
      if (isSelectedAnswer && !isCorrectAnswer) {
        const selectedLabel = document.createElement('span');
        selectedLabel.className = 'ml-2 text-sm';
        selectedLabel.textContent = '(Selected Answer)';
        optionElement.appendChild(selectedLabel);
      }
      optionsDiv.appendChild(optionElement);
    });
    questionDiv.appendChild(optionsDiv);
    container.appendChild(questionDiv);
  },

  /**
   * Show the score and the full question list for one document.
   * @param {string} docId - Element id prefix, e.g. "doc1" for doc1Score / doc1Questions.
   * @param {object} [analysis] - Analysis with score, attempted_answers and unknown_answers.
   */
  displayResults: (docId, analysis) => {
    // Update score
    const score = utils.calculateScore(analysis);
    utils.safeUpdateElement(`${docId}Score`,
      `${score.score} (${score.percentage.toFixed(1)}%)`);

    // Clear and update questions
    const questionsContainer = utils.safeGetElement(`${docId}Questions`);
    if (!questionsContainer) return;
    questionsContainer.innerHTML = '';

    // Unknown answers carry no model_answer, so give them the "I don't know" option text.
    const allQuestions = [
      ...(analysis?.attempted_answers || []),
      ...(analysis?.unknown_answers || []).map(q => ({
        ...q,
        model_answer: "I don't know"
      }))
    ];
    allQuestions.forEach(question => {
      utils.renderQuestion(question, questionsContainer);
    });
  }
};
// Form manager
const formManager = {
  /** Show the chosen file name under each document input whenever the selection changes. */
  initializeFileInputs: () => {
    ['doc1', 'doc2'].forEach(id => {
      const input = utils.safeGetElement(id);
      const nameDisplay = utils.safeGetElement(`${id}Name`);
      if (input && nameDisplay) {
        input.addEventListener('change', (e) => {
          const fileName = e.target.files[0]?.name || 'No file selected';
          nameDisplay.textContent = `Selected: ${fileName}`;
        });
      }
    });
  },

  /**
   * Check that an API key and two .txt documents were provided.
   * @throws {Error} With a user-facing message for the first problem found.
   */
  validateForm: () => {
    // Trim so that a key consisting only of whitespace is rejected as well.
    const apiKey = utils.safeGetElement('apiKey')?.value?.trim();
    if (!apiKey) {
      throw new Error('Please enter your OpenAI API key');
    }
    const doc1 = utils.safeGetElement('doc1')?.files?.[0];
    const doc2 = utils.safeGetElement('doc2')?.files?.[0];
    if (!doc1 || !doc2) {
      throw new Error('Please select both documents');
    }
    if (!doc1.name.toLowerCase().endsWith('.txt') || !doc2.name.toLowerCase().endsWith('.txt')) {
      throw new Error('Only .txt files are allowed');
    }
  },

  /**
   * Submit handler: validates the form, posts it to /compare and renders the result.
   * Any failure is logged and reported to the user with alert().
   * @param {Event} e - The form's submit event.
   */
  handleSubmit: async (e) => {
    e.preventDefault();
    // Resolved before the try block: the finally clause needs `loading`, and a
    // const declared inside try would not be in scope there.
    const loading = utils.safeGetElement('loading');
    const results = utils.safeGetElement('results');
    try {
      formManager.validateForm();
      loading?.classList.remove('hidden');
      results?.classList.add('hidden');

      const formData = new FormData();
      formData.append('api_key', utils.safeGetElement('apiKey').value.trim());
      formData.append('doc1', utils.safeGetElement('doc1').files[0]);
      formData.append('doc2', utils.safeGetElement('doc2').files[0]);
      formData.append('model', utils.safeGetElement('model').value);
      formData.append('document_type', utils.safeGetElement('documentType').value);

      const response = await fetch('/compare', {
        method: 'POST',
        body: formData
      });

      let data;
      try {
        data = await response.json();
      } catch (parseError) {
        // The server can answer with a non-JSON body (for example an HTML error
        // page); report the HTTP status instead of a JSON parse error.
        console.error('Error:', parseError);
        throw new Error(`Server returned an unexpected response (HTTP ${response.status})`);
      }
      if (!response.ok) {
        throw new Error(data.error || 'An error occurred');
      }

      // Update summary statistics
      utils.safeUpdateElement('totalTokens', data.total_tokens);
      const doc1Score = utils.calculateScore(data.doc1_analysis);
      const doc2Score = utils.calculateScore(data.doc2_analysis);
      const dvdRatio = doc1Score.percentage > 0 ?
        (doc2Score.percentage / doc1Score.percentage).toFixed(2) : 'N/A';
      utils.safeUpdateElement('dvdRatio', dvdRatio);

      // Update document texts
      utils.safeUpdateElement('doc1Text', data.doc1_content || '');
      utils.safeUpdateElement('doc2Text', data.doc2_content || '');

      // Display results for both documents
      utils.displayResults('doc1', data.doc1_analysis);
      utils.displayResults('doc2', data.doc2_analysis);
      results?.classList.remove('hidden');
    } catch (error) {
      console.error('Error:', error);
      alert(error.message || 'An error occurred while processing the documents');
    } finally {
      loading?.classList.add('hidden');
    }
  }
};
// Initialize application
// Once the DOM is parsed, hook up the file-name displays and the form's submit handler.
const startApplication = () => {
  formManager.initializeFileInputs();
  utils.safeGetElement('uploadForm')?.addEventListener('submit', formManager.handleSubmit);
};
document.addEventListener('DOMContentLoaded', startApplication);
</script>
</body>
</html>
# Contents from: .\note_criteria.json
{
"note_types": {
"discharge_note": {
"name": "Discharge Note",
"relevancy_criteria": [
"Hospital Admission and Discharge Details",
"Reason for Hospitalization",
"Hospital Course Summary",
"Discharge Diagnosis",
"Procedures Performed",
"Imaging studies",
"Medications at Discharge",
"Discharge Instructions",
"Follow-Up Care",
"Patient's Condition at Discharge",
"Patient Education and Counseling",
"Pending Results",
"Advance Directives and Legal Considerations",
"Important abnormal (not normal) lab results, e.g. bacterial cultures, urine cultures, electrolyte disturbances, etc.",
"Important abnormal vital signs, e.g. fever, tachycardia, hypotension, etc.",
"Admission to ICU",
"comorbidities, e.g. diabetes, hypertension, etc.",
"Equipment needed at discharge, e.g. wheelchair, crutches, etc.",
"Prosthetics and tubes, e.g. Foley catheter, etc.",
"Allergies",
"Consultations (e.g., specialty or ancillary services)",
"Functional Capacity (ADLs and mobility status)",
"Lifestyle Modifications (diet, exercise, smoking cessation, etc.)",
"Wound Care or Other Specific Care Instructions"
]
},
"admission_note": {
"name": "Admission Note",
"relevancy_criteria": [
"Patient Demographics and Identification",
"Chief Complaint",
"History of Present Illness",
"Past Medical History",
"Past Surgical History",
"Current Medications",
"Allergies",
"Social History (including smoking, alcohol, drugs)",
"Family History",
"Review of Systems",
"Physical Examination Findings",
"Vital Signs on Admission",
"Initial Laboratory Results",
"Initial Imaging Results",
"Initial Assessment/Impression",
"Differential Diagnosis",
"Initial Treatment Plan",
"Admission Orders",
"Code Status and Advance Directives",
"Consultations Requested",
"Anticipated Course of Stay",
"Functional Status on Admission",
"Mental Status Assessment",
"Pain Assessment",
"Admission Precautions (isolation, fall risk, etc.)"
]
}
}
}
# Contents from: .\DOCKER_README.md
# Document vs Document Evaluator
## Deployment Instructions
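1. The application uses the following settings:
- PORT: Environment variable set automatically by Hugging Face Spaces; the app falls back to 7860 when it is not set
- OPENAI_API_KEY: Not configured at deployment time; each user enters their own key through the web interface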
2. Required files:
- app.py: Main application file
- dvd_evaluator.py: Core evaluation logic
- note_criteria.json: Evaluation criteria
- requirements.txt: Dependencies
- templates/index.html: Web interface
3. The application uses Flask for the web interface and requires the following ports:
- Default port: 7860 (Hugging Face Spaces default)
# Contents from: .\README.md
---
title: Document vs Document Evaluator
emoji: 📄
colorFrom: blue
colorTo: indigo
sdk: static
sdk_version: "3.1.0"
app_file: app.py
pinned: false
---
Check out the configuration reference at https://huggingface.co/docs/hub/spaces-config-reference
# Contents from: .\app.py
from flask import Flask, render_template, request, jsonify
import os
import tempfile
import pandas as pd
from werkzeug.utils import secure_filename
import csv
from datetime import datetime
from typing import List, Dict, Any, Optional, Union
from pydantic import BaseModel, Field
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
import tiktoken
import json
from dotenv import load_dotenv
from dvd_evaluator import (
generate_mcqs_for_note,
present_mcqs_to_content,
MCQ,
Document
)
# Load environment variables
load_dotenv()
# Define data models
class MCQ(BaseModel):
    """One multiple-choice question produced from a source document."""
    question: str  # question text
    options: List[str]  # answer option texts, in the order they are presented
    correct_answer: str  # text of the correct option (not its letter)
    source_name: str = Field(default="Unknown")  # label of the document the question was generated from
class Document(BaseModel):
    """A named document body together with the MCQs attached to it."""
    name: str = ''  # optional display name
    content: str  # full document text
    mcqs: List[MCQ] = Field(default_factory=list)  # defaults to a fresh empty list per instance
# Flask application object and request limits.
app = Flask(__name__)
# A new temporary directory is created on every process start.
# NOTE(review): none of the code visible in this file writes uploads to it — confirm it is still needed.
app.config['UPLOAD_FOLDER'] = tempfile.mkdtemp()
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 # 16MB max file size
ALLOWED_EXTENSIONS = {'txt'}
MODELS = ['gpt-4o', 'gpt-4o-mini', 'gpt-3.5-turbo'] # Update with supported models
# Evaluation criteria are read once at import time; the path is relative to the
# current working directory, so the process must be started next to note_criteria.json.
with open('note_criteria.json', 'r') as f:
    NOTE_CRITERIA = json.load(f)['note_types'] # Note the ['note_types'] key
def allowed_file(filename):
    """Return True when the text after the last dot of filename is an allowed extension."""
    _stem, dot, extension = filename.rpartition('.')
    if not dot:
        # No dot at all, so there is no extension to check.
        return False
    return extension.lower() in ALLOWED_EXTENSIONS
def num_tokens_from_messages(messages, model="gpt-4o"):
    """
    Estimate token usage for chat messages using tiktoken.

    Args:
        messages: Iterable of message dicts (e.g. {"role": ..., "content": ...});
            every value of every dict is encoded and counted.
        model (str): Model name used to pick the tokenizer. Defaults to 'gpt-4o'.

    Returns:
        int: Estimated token count, including 4 tokens of overhead per message
        and 2 tokens for the primed assistant reply.
    """
    try:
        encoding = tiktoken.encoding_for_model(model)
    except KeyError:
        # tiktoken raises KeyError for model names it cannot map to a tokenizer.
        # The count is only an estimate, so fall back to a general-purpose
        # encoding instead of failing the whole request.
        encoding = tiktoken.get_encoding("cl100k_base")
    num_tokens = 0
    for message in messages:
        num_tokens += 4  # every message follows <im_start>{role/name}\n{content}<im_end>\n
        for value in message.values():
            if value is None:
                continue
            num_tokens += len(encoding.encode(str(value)))
    num_tokens += 2  # every reply is primed with <im_start>assistant
    return num_tokens
def generate_mcqs_for_note(note_content: str, total_tokens: List[int], source_name: str = '', document_type: str = 'discharge_note', model: str = 'gpt-4o-mini') -> List[MCQ]:
    """
    Generate Multiple Choice Questions (MCQs) from medical notes.

    Args:
        note_content: Text of the note the questions are generated from.
        total_tokens: Single-element list used as a mutable running token
            counter; element 0 is incremented by the estimated usage.
        source_name: Label stored on every generated MCQ.
        document_type: Key into NOTE_CRITERIA selecting the relevancy criteria.
            Unknown keys fall back to 'discharge_note'.
        model: Chat model used for generation and for the token estimate.

    Returns:
        The MCQs that could be parsed from the model reply. An empty list is
        returned when the LLM call fails.
    """
    import re  # local import: only this function needs it

    # Get relevancy criteria for selected document type
    note_type = NOTE_CRITERIA.get(document_type, NOTE_CRITERIA['discharge_note'])
    criteria = note_type['relevancy_criteria']
    criteria_list = "\n".join(f"{i+1}. {criterion}" for i, criterion in enumerate(criteria))
    system_prompt = f"""
You are an expert in creating MCQs based on medical notes. Generate 20 MCQs that ONLY focus on these key areas:
{criteria_list}
Rules and Format:
1. Each question must relate to specific content from these areas
2. Skip areas not mentioned in the note
3. Each question must have exactly 5 options (A-D plus E="I don't know")
4. Provide only questions and answers, no explanations
5. Use this exact format:
Question: [text]
A. [option]
B. [option]
C. [option]
D. [option]
E. I don't know
Correct Answer: [letter]
"""

    def parse_mcq(mcq_text: str) -> Optional[MCQ]:
        """Parse a single MCQ from text format into an MCQ object, or None if malformed."""
        try:
            lines = [line.strip() for line in mcq_text.split('\n') if line.strip()]
            if len(lines) < 7:  # Question + 5 options + correct answer
                return None
            # Extract question
            if not lines[0].startswith('Question:'):
                return None
            question = lines[0].replace('Question:', '', 1).strip()
            # Extract options
            options = []
            for letter, line in zip('ABCDE', lines[1:6]):
                if not line.startswith(letter + '.'):
                    return None
                options.append(line.split('.', 1)[1].strip())
            # Extract correct answer
            correct_line = lines[6]
            if not correct_line.lower().startswith('correct answer:'):
                return None
            # Accept "B", "B." or "B) text": only a stand-alone leading letter counts.
            # A plain substring test against 'ABCDE' would also accept '' and 'AB'.
            letter_match = re.match(r'([A-E])\b', correct_line.split(':', 1)[1].strip().upper())
            if letter_match is None:
                return None
            correct_index = ord(letter_match.group(1)) - ord('A')
            return MCQ(
                question=question,
                options=options,
                correct_answer=options[correct_index],
                source_name=source_name
            )
        except Exception as e:
            print(f"Error parsing MCQ: {str(e)}")
            return None

    # Generate MCQs using LLM
    try:
        messages = [
            SystemMessage(content=system_prompt),
            HumanMessage(content=f"Create MCQs from this note:\n\n{note_content}")
        ]
        llm = ChatOpenAI(model=model, temperature=0)
        response = llm.invoke(messages)
    except Exception as e:
        print(f"Error in MCQ generation: {str(e)}")
        return []

    # Update token count. Counting is best-effort bookkeeping and must not
    # discard an otherwise valid model reply.
    try:
        total_tokens[0] += num_tokens_from_messages([
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": note_content},
            {"role": "assistant", "content": response.content}
        ], model=model)
    except Exception as e:
        print(f"Error in MCQ generation: {str(e)}")

    # Parse MCQs from response; question blocks are separated by blank lines.
    mcqs = []
    for mcq_text in re.split(r'\n\s*\n', response.content.strip()):
        if mcq := parse_mcq(mcq_text):
            mcqs.append(mcq)
    return mcqs
def present_mcqs_to_content(mcqs: List[MCQ], content: str, total_tokens: List[int], model: str = "gpt-4") -> List[Dict]:
    """
    Present MCQs to content and collect responses.

    The questions are sent to the chat model in batches together with the
    document text. The reply is expected to contain one "<number>: <letter>"
    line per question.

    Args:
        mcqs: Questions to ask; each needs question, options and correct_answer.
        content: Document text the model must answer from.
        total_tokens: Single-element list used as a mutable running token
            counter; element 0 is incremented by the estimated usage.
        model: Chat model used for answering and for the token estimate.

    Returns:
        Exactly one dict per MCQ, in input order, with the keys "question",
        "user_answer" (option text or "I don't know") and "correct_answer".
    """
    import re  # local import: only this function needs it

    # Matches "3: B", "3. b", "3) B" or "Question 3: B". Answers are matched by
    # their question number, not by line position, so a preamble or blank line
    # in the reply cannot shift every answer by one.
    answer_pattern = re.compile(r'^\s*(?:question\s*)?(\d+)\s*[:.)\-]\s*\(?([A-E])\b', re.IGNORECASE)
    unknown_answer = "I don't know"
    user_responses = []
    batch_size = 20
    llm = ChatOpenAI(model=model, temperature=0)
    for i in range(0, len(mcqs), batch_size):
        batch_mcqs = mcqs[i:i + batch_size]
        questions_text = "\n\n".join([
            f"Question {j+1}: {mcq.question}\n"
            + "".join(f"{letter}. {option}\n" for letter, option in zip("ABCD", mcq.options))
            + "E. I don't know"
            for j, mcq in enumerate(batch_mcqs)
        ])
        batch_prompt = f"""
You are an expert medical knowledge evaluator. Given a medical note and multiple questions:
1. For each question, verify if it can be answered from the given content
2. If a question cannot be answered from the content, choose 'E' (I don't know)
3. If a question can be answered, choose the most accurate option based ONLY on the given content
Document Content: {content}
{questions_text}
Respond with ONLY the question numbers and corresponding letters, one per line, like this:
1: A
2: B
etc.
"""
        messages = [HumanMessage(content=batch_prompt)]
        response = llm.invoke(messages)
        tokens_used = num_tokens_from_messages([
            {"role": "user", "content": batch_prompt},
            {"role": "assistant", "content": response.content}
        ], model=model)
        total_tokens[0] += tokens_used

        # Every question starts out unanswered, so the result always has one
        # entry per MCQ even when the reply is short or malformed.
        batch_answers = [unknown_answer] * len(batch_mcqs)
        for line in response.content.splitlines():
            match = answer_pattern.match(line)
            if match is None:
                continue
            position = int(match.group(1)) - 1
            if not 0 <= position < len(batch_mcqs):
                continue
            # Get the index (0-3) from the letter (A-D); E always means "I don't know".
            letter = match.group(2).upper()
            option_index = ord(letter) - ord('A')
            options = batch_mcqs[position].options
            if letter != 'E' and option_index < len(options):
                batch_answers[position] = options[option_index]
            else:
                batch_answers[position] = unknown_answer

        for mcq, answer in zip(batch_mcqs, batch_answers):
            user_responses.append({
                "question": mcq.question,
                "user_answer": answer,
                "correct_answer": mcq.correct_answer
            })
    return user_responses
def run_evaluation(ai_content: str, ai_mcqs: List[MCQ], note_content: str, note_mcqs: List[MCQ],
                   note_name: str, original_note_number: int, total_tokens: List[int]) -> List[Dict]:
    """
    Answer one document's questions from note_content and report per-question results.

    Args:
        ai_content: Not used by this function; kept for interface compatibility.
        ai_mcqs: Questions used when note_name is 'Doc2'.
        note_content: Text the questions are answered from.
        note_mcqs: Questions used for any other note_name.
        note_name: Name of the document being evaluated ('Doc1' or 'Doc2').
        original_note_number: Copied into every result row.
        total_tokens: Single-element running token counter passed on to
            present_mcqs_to_content.

    Returns:
        One result dict per question that was asked.
    """
    # For Doc1: use questions from Doc2 (note_mcqs)
    # For Doc2: use questions from Doc1 (ai_mcqs)
    mcqs_to_use = ai_mcqs if note_name == 'Doc2' else note_mcqs
    responses = present_mcqs_to_content(mcqs_to_use, note_content, total_tokens)
    source_document = 'Doc2' if note_name == 'Doc1' else 'Doc1'
    results = []
    for i, mcq in enumerate(mcqs_to_use):
        # A question without a matching response counts as unanswered instead of
        # raising IndexError.
        model_answer = responses[i]["user_answer"] if i < len(responses) else "I don't know"
        results.append({
            "original_note_number": original_note_number,
            "new_note_name": note_name,
            "question": mcq.question,
            "options": mcq.options,
            "source_document": source_document,
            "ideal_answer": mcq.correct_answer,
            "model_answer": model_answer,
            "is_correct": model_answer == mcq.correct_answer
        })
    return results
import concurrent.futures
import concurrent.futures
import csv
import os
from flask import jsonify, request
@app.route('/compare', methods=['POST'])
def compare_documents():
    """
    Compare two documents by generating and answering MCQs for each document.
    Returns analysis of how well each document contains information from the other.

    Expects a multipart form with the fields api_key, model, document_type and
    the files doc1 and doc2 (.txt, UTF-8).

    Returns:
        A JSON response and status code: 200 with doc1_analysis, doc2_analysis,
        total_tokens, doc1_content and doc2_content; 400 with an "error" message
        for invalid input; 500 with an "error" message for any other failure.
    """
    print("\n=== Starting document comparison ===")

    def process_mcq_results(responses, mcqs):
        """Process MCQ responses and organize into categories."""
        attempted = []
        unknown = []
        correct_count = 0
        for i, mcq in enumerate(mcqs):
            # Score against every question asked; one without a response is
            # treated as "I don't know" so the denominator stays correct.
            response = responses[i] if i < len(responses) else {}
            answer = response.get("user_answer", "I don't know")
            result = {
                "question": mcq.question,
                "options": mcq.options,
                "ideal_answer": mcq.correct_answer,
                "model_answer": answer,
            }
            if answer == "I don't know":
                unknown.append(result)
            else:
                is_correct = answer == mcq.correct_answer
                if is_correct:
                    correct_count += 1
                result["is_correct"] = is_correct
                attempted.append(result)
        return {
            "score": f"{correct_count}/{len(mcqs)}",
            "attempted_answers": attempted,
            "unknown_answers": unknown
        }

    try:
        # Validate API key
        api_key = (request.form.get('api_key') or '').strip()
        if not api_key:
            return jsonify({"error": "OpenAI API key is required"}), 400
        # NOTE(review): this stores a per-request key in process-wide state, so
        # concurrent requests from different users can overwrite each other's
        # key. Passing the key explicitly to the LLM client would avoid that.
        os.environ['OPENAI_API_KEY'] = api_key

        # Get model and document type selection; both are user-controlled.
        model = request.form.get('model', 'gpt-4o-mini')
        document_type = request.form.get('document_type', 'discharge_note')
        if model not in MODELS:
            return jsonify({"error": f"Unsupported model: {model}"}), 400
        if document_type not in NOTE_CRITERIA:
            return jsonify({"error": f"Unsupported document type: {document_type}"}), 400
        # NOTE(review): the selected model is validated here but is not forwarded
        # to the helper calls below.

        # Validate file uploads
        if 'doc1' not in request.files or 'doc2' not in request.files:
            print("Error: Missing files in request")
            return jsonify({"error": "Both doc1 and doc2 are required"}), 400
        doc1_file = request.files['doc1']
        doc2_file = request.files['doc2']
        print(f"Received files: {doc1_file.filename} and {doc2_file.filename}")

        # Validate filenames
        if not all([doc1_file.filename, doc2_file.filename]):
            print("Error: Empty filename(s)")
            return jsonify({"error": "Both documents need valid filenames"}), 400

        # Validate file types
        if not all(allowed_file(f.filename) for f in [doc1_file, doc2_file]):
            print("Error: Invalid file type(s)")
            return jsonify({"error": "Only .txt files are allowed"}), 400

        # Read document contents
        try:
            doc1_text = doc1_file.read().decode('utf-8')
            doc2_text = doc2_file.read().decode('utf-8')
            print(f"Doc1 length: {len(doc1_text)} chars")
            print(f"Doc2 length: {len(doc2_text)} chars")
        except UnicodeDecodeError as e:
            print(f"Decode error: {str(e)}")
            return jsonify({"error": "Error decoding one of the documents"}), 400

        # Initialize token counter (single-element list so helpers can update it)
        total_tokens = [0]

        # Generate MCQs for both documents
        print("\nGenerating MCQs for Doc1...")
        doc1_mcqs = generate_mcqs_for_note(
            note_content=doc1_text,
            total_tokens=total_tokens,
            source_name='Doc1',
            document_type=document_type
        )
        print(f"Generated {len(doc1_mcqs)} MCQs for Doc1")
        print("\nGenerating MCQs for Doc2...")
        doc2_mcqs = generate_mcqs_for_note(
            note_content=doc2_text,
            total_tokens=total_tokens,
            source_name='Doc2',
            document_type=document_type
        )
        print(f"Generated {len(doc2_mcqs)} MCQs for Doc2")

        # Present each doc's MCQs to the other doc
        print("\nGetting answers for Doc1...")
        doc1_responses = present_mcqs_to_content(doc2_mcqs, doc1_text, total_tokens)
        print(f"Received {len(doc1_responses)} answers for Doc1")
        print("\nGetting answers for Doc2...")
        doc2_responses = present_mcqs_to_content(doc1_mcqs, doc2_text, total_tokens)
        print(f"Received {len(doc2_responses)} answers for Doc2")

        # Process results for both documents
        doc1_analysis = process_mcq_results(doc1_responses, doc2_mcqs)
        doc2_analysis = process_mcq_results(doc2_responses, doc1_mcqs)

        # Prepare response
        response = {
            "doc1_analysis": doc1_analysis,
            "doc2_analysis": doc2_analysis,
            "total_tokens": total_tokens[0],
            "doc1_content": doc1_text,
            "doc2_content": doc2_text
        }
        print("\nSending response...")
        print(f"Total tokens used: {total_tokens[0]}")
        return jsonify(response), 200
    except Exception as e:
        import traceback
        print(f"\nERROR in compare_documents:")
        print(traceback.format_exc())
        return jsonify({"error": str(e)}), 500
    finally:
        print("=== Comparison complete ===\n")
def process_responses(responses, mcqs, doc_name):
    """
    Split responses into attempted and unknown answers and count the correct ones.

    responses[i] is paired with mcqs[i]; doc_name is accepted but not used.
    Returns a dict with 'total_score' ("correct/total"), 'attempted_answers'
    and 'unknown_answers'.
    """
    attempted, unknown = [], []
    correct_count = 0
    for position, reply in enumerate(responses):
        item = mcqs[position]
        chosen = reply['user_answer']
        entry = {
            'question': item.question,
            'options': item.options,
            'ideal_answer': item.correct_answer
        }
        if chosen == "I don't know":
            # Unknown entries carry no model answer or correctness flag.
            unknown.append(entry)
            continue
        matched = chosen == reply['correct_answer']
        if matched:
            correct_count += 1
        entry['model_answer'] = chosen
        entry['is_correct'] = matched
        attempted.append(entry)
    return {
        'total_score': f"{correct_count}/{len(responses)}",
        'attempted_answers': attempted,
        'unknown_answers': unknown
    }
@app.route('/')
def index():
    """Render index.html, handing the list of supported models to the template."""
    page = render_template('index.html', models=MODELS)
    return page
# Start Flask's built-in server only when this file is executed directly.
if __name__ == "__main__":
    # Get port from environment variable for Hugging Face Spaces; 7860 is used when PORT is unset
    port = int(os.environ.get("PORT", 7860))
    # Bind to all interfaces
    app.run(host="0.0.0.0", port=port)
# Contents from: .\combine.py
import os
def get_files_recursively(directory, extensions):
    """
    Recursively get all files with specified extensions from directory and subdirectories,
    excluding combined.py and virtual-environment folders.

    Args:
        directory: Root directory to walk.
        extensions: Iterable of filename suffixes to keep (e.g. '.py').

    Returns:
        List of paths (root joined with the file name) of every matching file.
    """
    excluded_dirs = {'venv', '.venv'}
    suffixes = tuple(extensions)
    file_list = []
    for root, dirs, files in os.walk(directory):
        # Prune in place so os.walk never descends into an excluded folder.
        # Matching whole directory names (instead of searching the path for the
        # substring 'venv') keeps unrelated folders whose name merely contains it.
        dirs[:] = sorted(d for d in dirs if d not in excluded_dirs)
        for file in sorted(files):
            # Exclude the 'combined.py' file from the results
            if file == 'combined.py':
                continue
            if file.endswith(suffixes):
                file_list.append(os.path.join(root, file))
    return file_list
def combine_files(output_file, file_list):
    """
    Append every file in file_list to output_file, each preceded by a header line.

    A file that cannot be read does not stop the run; an error comment is
    written in its place.
    """
    with open(output_file, 'a', encoding='utf-8') as sink:
        for path in file_list:
            # Header comment naming the file whose contents follow
            sink.write(f"\n\n# Contents from: {path}\n")
            try:
                with open(path, 'r', encoding='utf-8') as source:
                    sink.writelines(source)
            except Exception as exc:
                sink.write(f"# Error reading file {path}: {str(exc)}\n")
def main():
    """
    Rebuild combined.py in the current directory from all matching project files.

    Any existing combined.py is removed, a header is written, and the contents of
    every .py, .html, .md and .json file found below the current directory are
    appended, ordered by extension and then by path.
    """
    # Define the base directory (current directory in this case)
    base_directory = "."
    output_file = 'combined.py'
    # Every suffix carries its leading dot so that only real file extensions
    # match (a bare 'md' would also match a file simply ending in those letters).
    extensions = ('.py', '.html', '.md', '.json')
    # Remove output file if it exists
    if os.path.exists(output_file):
        try:
            os.remove(output_file)
        except Exception as e:
            print(f"Error removing existing {output_file}: {str(e)}")
            return
    # Get all files recursively
    all_files = get_files_recursively(base_directory, extensions)
    # Sort files by extension and then by name
    all_files.sort(key=lambda x: (os.path.splitext(x)[1], x))
    # Add a header to the output file
    with open(output_file, 'w', encoding='utf-8') as outfile:
        outfile.write("# Combined Python files\n")
        outfile.write(f"# Generated from directory: {os.path.abspath(base_directory)}\n")
        outfile.write(f"# Total files found: {len(all_files)}\n\n")
    # Combine all files
    combine_files(output_file, all_files)
    print(f"Successfully combined {len(all_files)} files into {output_file}")
    print("Files processed:")
    for file in all_files:
        print(f" - {file}")
# Run the combiner only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
# Contents from: .\dvd_evaluator.py
import os
import csv
import argparse
import pandas as pd
from typing import List, Dict, Any
from datetime import datetime
from pydantic import BaseModel, Field
from tqdm import tqdm
import tiktoken
from typing import List, Dict, Any, Optional
import json
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
from dotenv import load_dotenv
load_dotenv()
# Define data models
class MCQ(BaseModel):
    """One multiple-choice question produced from a source document."""
    question: str  # question text
    options: List[str]  # answer option texts, in the order they are presented
    correct_answer: str  # text of the correct option (not its letter)
    source_name: str = Field(default="Unknown") # label of the document the question came from; "Unknown" when not given
class Document(BaseModel):
    """A named document body together with the MCQs attached to it."""
    name: str = ''  # optional display name
    content: str  # full document text
    mcqs: List[MCQ] = Field(default_factory=list)  # defaults to a fresh empty list per instance
# Load note criteria at module level
# Read once at import time. The path is relative to the current working
# directory, so importing this module fails if note_criteria.json is not there.
with open('note_criteria.json', 'r') as f:
    NOTE_CRITERIA = json.load(f)['note_types']  # maps a note-type key to its criteria entry
def num_tokens_from_messages(messages, model="gpt-4"):
    """
    Estimate token usage for messages using tiktoken.

    Args:
        messages: List of message dictionaries; every value of every dict is
            encoded and counted.
        model (str): Model name for token counting. Defaults to 'gpt-4'

    Returns:
        int: Estimated token count (4 tokens of overhead per message plus 2 for
        the primed reply), or 0 if counting fails for any reason.
    """
    try:
        try:
            encoding = tiktoken.encoding_for_model(model)
        except KeyError:
            # tiktoken raises KeyError for model names it cannot map to a
            # tokenizer. An approximate count from a general-purpose encoding
            # is more useful than reporting zero tokens.
            encoding = tiktoken.get_encoding("cl100k_base")
        num_tokens = 0
        for message in messages:
            num_tokens += 4
            for value in message.values():
                num_tokens += len(encoding.encode(value))
        num_tokens += 2
        return num_tokens
    except Exception as e:
        # Token counting is best-effort bookkeeping and must never abort the caller.
        print(f"Warning: Error counting tokens: {str(e)}")
        return 0
def generate_mcqs_for_note(note_content: str, total_tokens: List[int], source_name: str = '', document_type: str = 'discharge_note') -> List[MCQ]:
    """
    Generate Multiple Choice Questions (MCQs) from medical notes.

    Args:
        note_content: Text of the note the questions are generated from.
        total_tokens: Single-element list used as a mutable running token
            counter; element 0 is incremented by the estimated usage.
        source_name: Label stored on every generated MCQ.
        document_type: Key into NOTE_CRITERIA; unknown keys fall back to
            'discharge_note'.

    Returns:
        The MCQs that could be parsed from the model reply, or an empty list
        when generation fails.
    """
    import re  # local import: only this function needs it

    # Get criteria based on document type
    criteria = NOTE_CRITERIA.get(document_type, NOTE_CRITERIA['discharge_note'])
    criteria_points = criteria['relevancy_criteria']
    criteria_list = "\n".join(f"{i+1}. {point}" for i, point in enumerate(criteria_points))
    # Create dynamic system prompt based on document type
    system_prompt = f"""
You are an expert in creating MCQs based on {criteria['name']}s. Generate 20 MCQs that ONLY focus on these key areas:
{criteria_list}
Rules and Format:
1. Each question must relate to specific content from these areas
2. Skip areas not mentioned in the note
3. Each question must have exactly 5 options (A-D plus E="I don't know")
4. Provide only questions and answers, no explanations
5. Use this exact format:
Question: [text]
A. [option]
B. [option]
C. [option]
D. [option]
E. I don't know
Correct Answer: [letter]
"""

    def parse_mcq(mcq_text: str) -> Optional[MCQ]:
        """Parse a single MCQ from text format into an MCQ object, or None if malformed."""
        try:
            lines = [line.strip() for line in mcq_text.split('\n') if line.strip()]
            if len(lines) < 7:  # Question + 5 options + correct answer
                return None
            # Extract question
            if not lines[0].startswith('Question:'):
                return None
            question = lines[0].replace('Question:', '', 1).strip()
            # Extract options
            options = []
            for letter, line in zip('ABCDE', lines[1:6]):
                if not line.startswith(letter + '.'):
                    return None
                options.append(line.split('.', 1)[1].strip())
            # Extract correct answer
            correct_line = lines[6]
            if not correct_line.lower().startswith('correct answer:'):
                return None
            # Accept "B", "B." or "B) text": only a stand-alone leading letter counts.
            # A plain substring test against 'ABCDE' would also accept '' and 'AB'.
            letter_match = re.match(r'([A-E])\b', correct_line.split(':', 1)[1].strip().upper())
            if letter_match is None:
                return None
            correct_index = ord(letter_match.group(1)) - ord('A')
            return MCQ(
                question=question,
                options=options,
                correct_answer=options[correct_index],
                source_name=source_name
            )
        except Exception as e:
            print(f"Error parsing MCQ: {str(e)}")
            return None

    try:
        messages = [
            SystemMessage(content=system_prompt),
            HumanMessage(content=f"Create MCQs from this {criteria['name'].lower()}:\n\n{note_content}")
        ]
        # No model is passed, so the library's default chat model is used.
        llm = ChatOpenAI(temperature=0)
        response = llm.invoke(messages)
        # Update token count with default model
        tokens_used = num_tokens_from_messages([
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": note_content},
            {"role": "assistant", "content": response.content}
        ])
        total_tokens[0] += tokens_used
        # Parse MCQs from response; question blocks are separated by blank lines.
        mcqs = []
        for mcq_text in re.split(r'\n\s*\n', response.content.strip()):
            if mcq := parse_mcq(mcq_text):
                mcqs.append(mcq)
        return mcqs
    except Exception as e:
        print(f"Error in MCQ generation: {str(e)}")
        return []
def present_mcqs_to_content(mcqs: List[MCQ], content: str, total_tokens: List[int], document_type: str = 'discharge_note') -> List[Dict]:
    """
    Present MCQs to content and collect responses.

    The questions are sent to the model in batches of 20 together with the
    document text. Exactly one response entry is produced per MCQ, in the same
    order as `mcqs`; a question the model did not answer in a recognisable way
    is recorded as "E" (I don't know).

    Args:
        mcqs: Questions to ask. Each needs at least four options, and its
            correct_answer must be one of its options.
        content: Document text the model must answer from.
        total_tokens: Single-element list used as a mutable counter; the
            estimated token usage of each batch is added to total_tokens[0].
        document_type: Key into NOTE_CRITERIA; unknown keys fall back to
            the 'discharge_note' entry.

    Returns:
        A list of dicts with keys "question", "user_answer" (letter A-E) and
        "correct_answer" (letter of the option equal to mcq.correct_answer).

    Raises:
        ValueError: If an MCQ's correct_answer is not among its options.
    """
    # Get criteria based on document type
    criteria = NOTE_CRITERIA.get(document_type, NOTE_CRITERIA['discharge_note'])
    batch_size = 20
    llm = ChatOpenAI(temperature=0)
    user_responses = []
    for i in range(0, len(mcqs), batch_size):
        batch_mcqs = mcqs[i:i + batch_size]
        # Questions are numbered from 1 within each batch.
        questions_text = "\n\n".join([
            f"Question {j+1}: {mcq.question}\n"
            f"A. {mcq.options[0]}\n"
            f"B. {mcq.options[1]}\n"
            f"C. {mcq.options[2]}\n"
            f"D. {mcq.options[3]}\n"
            f"E. I don't know"
            for j, mcq in enumerate(batch_mcqs)
        ])
        batch_prompt = f"""
You are an expert {criteria['name'].lower()} evaluator. Given a medical note and multiple questions:
1. For each question, verify if it can be answered from the given content
2. If a question cannot be answered from the content, choose 'E' (I don't know)
3. If a question can be answered, choose the most accurate option based ONLY on the given content
Document Content: {content}
{questions_text}
Respond with ONLY the question numbers and corresponding letters, one per line, like this:
1: A
2: B
etc.
"""
        messages = [HumanMessage(content=batch_prompt)]
        response = llm(messages)
        tokens_used = num_tokens_from_messages([
            {"role": "user", "content": batch_prompt},
            {"role": "assistant", "content": response.content}
        ])
        total_tokens[0] += tokens_used
        # Match answers by question number so blank lines, preambles or a short
        # reply cannot shift answers onto the wrong question or drop questions.
        answers = _parse_batch_answers(response.content, len(batch_mcqs))
        for mcq, answer in zip(batch_mcqs, answers):
            user_responses.append({
                "question": mcq.question,
                "user_answer": answer,
                "correct_answer": chr(ord('A') + mcq.options.index(mcq.correct_answer))
            })
    return user_responses


def _parse_batch_answers(response_text, expected: int) -> List[str]:
    """Return `expected` answer letters parsed from 'N: X' lines; unanswered slots are 'E'."""
    import re

    answers = ["E"] * expected
    if not isinstance(response_text, str):
        return answers
    # Accepts "1: A", "Question 1: A", "1. A", "1) A" and "1 - A"; the lookahead
    # rejects words that merely start with A-E (e.g. "1: Either").
    line_pattern = re.compile(
        r"^\s*(?:question\s*)?(\d+)\s*[:.)\-]\s*\(?([A-E])(?![A-Za-z])",
        re.IGNORECASE,
    )
    seen = set()
    for line in response_text.splitlines():
        match = line_pattern.match(line)
        if match is None:
            continue
        number = int(match.group(1))
        # Ignore out-of-range numbers and keep the first answer for a number.
        if 1 <= number <= expected and number not in seen:
            seen.add(number)
            answers[number - 1] = match.group(2).upper()
    return answers
def evaluate_responses(user_responses) -> int:
    """
    Score a list of response dicts.

    A response earns one point when its "user_answer" equals its
    "correct_answer". Responses answered "E" ("I don't know") never score.

    Args:
        user_responses: Iterable of dicts with "user_answer" and
            "correct_answer" letter entries.

    Returns:
        Number of correctly answered questions.
    """
    return sum(
        1
        for entry in user_responses
        if entry["user_answer"] != "E"
        and entry["user_answer"] == entry["correct_answer"]
    )
def run_evaluation(ai_content: str, ai_mcqs: List[MCQ], note_content: str, note_mcqs: List[MCQ],
                   note_name: str, original_note_number: int, total_tokens: List[int],
                   document_type: str = 'discharge_note') -> List[Dict]:
    """
    Run evaluation with specified document type.

    The selected MCQs are answered against `note_content` and one result row
    is produced per MCQ.

    Args:
        ai_content: Accepted for interface compatibility; not read here.
        ai_mcqs: Questions used when note_name is 'Doc2'.
        note_content: Text the questions are answered from.
        note_mcqs: Questions used for every other note_name.
        note_name: Name of the note being evaluated; copied into each row.
        original_note_number: Copied into each row.
        total_tokens: Single-element mutable token counter, passed through to
            present_mcqs_to_content.
        document_type: Passed through to present_mcqs_to_content.

    Returns:
        One dict per MCQ with the question, its options, the correct answer
        (text and letter), the given answer and a timestamp.
    """
    # For Doc1: use questions from Doc2 (note_mcqs)
    # For Doc2: use questions from Doc1 (ai_mcqs)
    mcqs_to_use = ai_mcqs if note_name == 'Doc2' else note_mcqs
    content_to_evaluate = note_content
    responses = present_mcqs_to_content(mcqs_to_use, content_to_evaluate, total_tokens, document_type=document_type)
    results = []
    # Responses arrive in MCQ order but may be fewer than the MCQs. Walk them
    # with a cursor and pair by question text, so a missing response becomes an
    # "E" row instead of an IndexError or a shifted answer.
    cursor = 0
    for mcq in mcqs_to_use:
        if cursor < len(responses) and responses[cursor]["question"] == mcq.question:
            correct_letter = responses[cursor]["correct_answer"]
            given_letter = responses[cursor]["user_answer"]
            cursor += 1
        else:
            correct_letter = chr(ord('A') + mcq.options.index(mcq.correct_answer))
            given_letter = "E"
        result = {
            "original_note_number": original_note_number,
            "new_note_name": note_name,
            "question": mcq.question,
            "source_document": mcq.source_name,
            "options": mcq.options,
            "ideal_answer": mcq.options[ord(correct_letter) - ord('A')],
            "correct_answer": correct_letter,
            # Both columns carry the same answer letter.
            "ai_answer": given_letter,
            "note_answer": given_letter,
            "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }
        results.append(result)
    return results
def main():
    """
    Command-line entry point: evaluate notes from a CSV against MCQs from the AI note.

    For every original_note_number in [--start, --end), MCQs are generated
    from the row named "AI" and then answered against every other row of the
    same group. All result rows are appended to --result_csv (a header is
    written only when the file does not exist yet), each tagged with the
    cumulative token estimate of the whole run.

    NOTE(review): the --model value is only used for the module-global `llm`
    set here; the MCQ helpers visible in this section build their own
    ChatOpenAI instances, so they do not appear to honour it — confirm.
    """
    global llm

    parser = argparse.ArgumentParser(description="Process CSV containing AI and modified notes.")
    parser.add_argument("--modified_csv", required=True, help="Path to CSV with AI & modified notes")
    parser.add_argument("--result_csv", default="results.csv", help="Output CSV file")
    parser.add_argument("--start", type=int, default=0, help="Start original_note_number (inclusive)")
    parser.add_argument("--end", type=int, default=10, help="End original_note_number (exclusive)")
    parser.add_argument("--model", default="gpt-4o-mini", help="OpenAI model to use")
    args = parser.parse_args()

    print(f"\n=== MCQ EVALUATOR ===")
    print(f"Reading from: {args.modified_csv}")
    print(f"Writing results to: {args.result_csv}")
    print(f"Processing original_note_number in [{args.start}, {args.end})")
    print(f"Using model: {args.model}\n")

    llm = ChatOpenAI(model=args.model, temperature=0)

    # --- Load and validate the input CSV -------------------------------
    if not os.path.exists(args.modified_csv):
        print(f"ERROR: {args.modified_csv} not found.")
        return

    try:
        print("Loading CSV file...")
        df = pd.read_csv(args.modified_csv)
        print(f"Loaded {len(df)} rows")
    except Exception as e:
        print(f"ERROR reading {args.modified_csv}: {e}")
        return

    needed_cols = {"original_note_number", "new_note_name", "modified_text"}
    if not needed_cols.issubset(df.columns):
        print(f"ERROR: Missing columns in {args.modified_csv}. We need {needed_cols}.")
        return

    # --- Restrict to the requested half-open range of note numbers -----
    note_numbers = df["original_note_number"]
    in_range_mask = (note_numbers >= args.start) & (note_numbers < args.end)
    df_in_range = df[in_range_mask]
    if df_in_range.empty:
        print("No rows found in the specified range.")
        return
    print(f"Found {len(df_in_range)} rows in specified range")

    # --- Evaluate each group of notes ----------------------------------
    results = []
    total_tokens = [0]  # single-element list so callees can update the count in place
    for onum, group in tqdm(df_in_range.groupby("original_note_number"), desc="Processing notes"):
        print(f"\n\nProcessing original_note_number {onum}")

        # Get AI note and generate MCQs once per group
        is_ai_row = group["new_note_name"] == "AI"
        ai_row = group[is_ai_row]
        if ai_row.empty:
            print(f"Warning: No AI note found for original_note_number={onum}, skipping.")
            continue
        ai_text = ai_row.iloc[0]["modified_text"]

        print("Generating MCQs for AI note...")
        mcqs_ai = generate_mcqs_for_note(
            note_content=ai_text,
            total_tokens=total_tokens,
            source_name='AI',
            document_type='discharge_note',
        )
        print(f"Generated {len(mcqs_ai)} MCQs from AI note")

        # Process ALL other notes (including original)
        print("\nProcessing comparisons...")
        for _, row in group[group["new_note_name"] != "AI"].iterrows():
            note_name = row["new_note_name"]
            print(f"\nProcessing comparison with {note_name}")
            comparison_rows = run_evaluation(
                ai_content=ai_text,
                ai_mcqs=mcqs_ai,
                note_content=row["modified_text"],
                note_mcqs=mcqs_ai,
                note_name=note_name,
                original_note_number=onum,
                total_tokens=total_tokens,
                document_type='discharge_note',
            )
            results.extend(comparison_rows)

    # --- Append results to the output CSV ------------------------------
    fieldnames = ["original_note_number", "new_note_name", "question", "source_document",
                  "options", "ideal_answer", "correct_answer", "ai_answer", "note_answer",
                  "timestamp", "total_tokens"]
    file_exists = os.path.exists(args.result_csv)
    with open(args.result_csv, 'a' if file_exists else 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        if not file_exists:
            writer.writeheader()
        # Every row carries the same final token total for the run.
        writer.writerows(
            {**result, "total_tokens": total_tokens[0]} for result in results
        )

    print(f"\nResults written to {args.result_csv}")
    print(f"Total tokens used: {total_tokens[0]}")
    print("=== Done ===")
if __name__ == "__main__":
    # NOTE(review): main() defined above is never invoked from this guard, although
    # the usage line below describes the CLI it implements — confirm which entry
    # point is intended. `app` is not defined in the visible part of this file;
    # presumably a Flask application object — TODO confirm.
    # NOTE(review): the project description states the application listens on
    # port 7860; this guard still uses 5000 — confirm.
    #
    # Debug mode is opt-in (FLASK_DEBUG=1 or FLASK_DEBUG=true): combined with
    # host='0.0.0.0' an always-on debug=True would expose the interactive
    # debugger to every host that can reach the port.
    app.run(
        host='0.0.0.0',
        port=5000,
        debug=os.environ.get('FLASK_DEBUG', '').strip().lower() in ('1', 'true'),
    )
#python dvd_evaluator.py --modified_csv "examples/example.csv" --result_csv "results.csv" --start 1 --end 2 --model "gpt-4o-mini"