# quiz-generator-v3 — ui/objective_handlers.py
# (Hugging Face page residue removed: uploader "ecuartasm", commit "new lo", hash 0b17ac5)
import os
import re
import json
import shutil
from typing import List
from models.learning_objectives import LearningObjective
from .content_processor import ContentProcessor
from quiz_generator import QuizGenerator
from .state import get_processed_contents, set_processed_contents, set_learning_objectives
from .run_manager import get_run_manager
from .question_handlers import generate_questions
def process_files(files, num_objectives, num_runs, model_name, incorrect_answer_model_name, temperature):
    """Process uploaded files and generate learning objectives.

    Pipeline:
        1. Validate inputs (files present, API key set, paths exist).
        2. Extract text content from the uploaded course-material files.
        3. Generate base objectives across ``num_runs`` independent runs.
        4. Group the pooled objectives and add incorrect answers to the
           best-in-group ones.
        5. Format, persist, and return the results.

    Args:
        files: Uploaded files (list of paths, single path string, or a
            file-like object with a ``name`` attribute).
        num_objectives: Number of objectives requested per run.
        num_runs: Number of independent generation runs to pool.
        model_name: Model used for objective generation.
        incorrect_answer_model_name: Model used for incorrect-answer generation.
        temperature: Sampling temperature (coerced to float).

    Returns:
        4-tuple ``(status, best_in_group_json, grouped_json, raw_json)``;
        the three JSON slots are None when validation fails.
    """
    run_manager = get_run_manager()
    # --- Input validation ---
    if not files:
        return "Please upload at least one file.", None, None, None
    if not os.getenv("OPENAI_API_KEY"):
        return "OpenAI API key not found. Please set the OPENAI_API_KEY environment variable.", None, None, None
    # Normalize the various input shapes into a list of real filesystem paths
    file_paths = _extract_file_paths(files)
    if not file_paths:
        return "No valid files found. Please upload valid .ipynb, .vtt, .srt, or .md files.", None, None, None
    # Start run and logging (creates a run ID used to save output artifacts)
    # NOTE(review): run_id is currently unused below; kept for parity with the
    # start_objective_run return value — confirm whether it can be dropped.
    run_id = run_manager.start_objective_run(
        files=file_paths,
        num_objectives=num_objectives,
        num_runs=num_runs,
        model=model_name,
        incorrect_answer_model=incorrect_answer_model_name,
        temperature=temperature
    )
    run_manager.log(f"Processing {len(file_paths)} files: {[os.path.basename(f) for f in file_paths]}", level="DEBUG")
    # Extract text from the uploaded files
    processor = ContentProcessor()
    file_contents = processor.process_files(file_paths)
    if not file_contents:
        run_manager.log("No content extracted from the uploaded files", level="ERROR")
        return "No content extracted from the uploaded files.", None, None, None
    run_manager.log(f"Successfully extracted content from {len(file_contents)} files", level="INFO")
    # Store file contents in module state for the regenerate/question flows
    set_processed_contents(file_contents)
    # Generate learning objectives
    run_manager.log(f"Creating QuizGenerator with model={model_name}, temperature={temperature}", level="INFO")
    quiz_generator = QuizGenerator(
        api_key=os.getenv("OPENAI_API_KEY"),
        model=model_name,
        temperature=float(temperature)
    )
    all_learning_objectives = _generate_multiple_runs(
        quiz_generator, file_contents, num_objectives, num_runs, incorrect_answer_model_name, run_manager
    )
    # Group duplicates across runs and add incorrect answers to best-in-group
    grouped_result = _group_base_objectives_add_incorrect_answers(
        quiz_generator, all_learning_objectives, file_contents, incorrect_answer_model_name, run_manager
    )
    # Format results for display (status string + three JSON payloads)
    formatted_results = _format_objective_results(grouped_result, all_learning_objectives, num_objectives, run_manager)
    # Store grouped objectives in module state for question generation
    set_learning_objectives(grouped_result["all_grouped"])
    # Save outputs (plus the parameters used) into the run directory
    params = {
        "files": [os.path.basename(f) for f in file_paths],
        "num_objectives": num_objectives,
        "num_runs": num_runs,
        "model": model_name,
        "incorrect_answer_model": incorrect_answer_model_name,
        "temperature": temperature
    }
    run_manager.save_objectives_outputs(
        best_in_group=formatted_results[1],
        all_grouped=formatted_results[2],
        raw_ungrouped=formatted_results[3],
        params=params
    )
    # End run
    run_manager.end_run(run_type="Learning Objectives")
    return formatted_results
def regenerate_objectives(objectives_json, feedback, num_objectives, num_runs, model_name, temperature):
    """Regenerate learning objectives based on user feedback.

    Appends the feedback to the previously processed file contents, reruns the
    multi-run objective generation, then groups the results and adds incorrect
    answers to the best-in-group objectives.

    Args:
        objectives_json: Current objectives JSON (returned unchanged on error).
        feedback: Free-text feedback about the previous objectives.
        num_objectives: Objectives requested per run.
        num_runs: Number of generation runs to pool.
        model_name: Model used for generation (also used for incorrect answers).
        temperature: Sampling temperature (coerced to float).

    Returns:
        Tuple of (status message, grouped objectives JSON, best-in-group JSON).
        On validation failure or exception, ``objectives_json`` is echoed back
        for both JSON slots so the UI keeps showing the previous results.
    """
    if not get_processed_contents():
        return "No processed content available. Please upload files first.", objectives_json, objectives_json
    if not os.getenv("OPENAI_API_KEY"):
        return "OpenAI API key not found.", objectives_json, objectives_json
    if not feedback:
        return "Please provide feedback to regenerate learning objectives.", objectives_json, objectives_json
    run_manager = get_run_manager()
    # Append the feedback as an extra "document" so the generator sees it
    # alongside the original course content.
    file_contents_with_feedback = get_processed_contents().copy()
    file_contents_with_feedback.append(f"FEEDBACK ON PREVIOUS OBJECTIVES: {feedback}")
    quiz_generator = QuizGenerator(
        api_key=os.getenv("OPENAI_API_KEY"),
        model=model_name,
        temperature=float(temperature)
    )
    try:
        # Generate multiple runs of learning objectives with feedback.
        # BUG FIX: the required run_manager argument was previously omitted,
        # which raised a TypeError on every call.
        all_learning_objectives = _generate_multiple_runs(
            quiz_generator,
            file_contents_with_feedback,
            num_objectives,
            num_runs,
            model_name,  # Use the same model for incorrect answer suggestions
            run_manager
        )
        # Group and rank the objectives.
        # BUG FIX: this previously referenced the undefined name
        # `all_base_learning_objectives` (NameError) and omitted run_manager,
        # which the helper dereferences unconditionally.
        grouping_result = _group_base_objectives_add_incorrect_answers(
            quiz_generator, all_learning_objectives, file_contents_with_feedback, model_name, run_manager
        )
        grouped_objectives = grouping_result["all_grouped"]
        best_in_group_objectives = grouping_result["best_in_group"]
        # Serialize for the UI's JSON panels
        grouped_objectives_json = json.dumps([obj.dict() for obj in grouped_objectives])
        best_in_group_json = json.dumps([obj.dict() for obj in best_in_group_objectives])
        return f"Generated {len(all_learning_objectives)} learning objectives, {len(best_in_group_objectives)} unique after grouping.", grouped_objectives_json, best_in_group_json
    except Exception as e:
        print(f"Error regenerating learning objectives: {e}")
        import traceback
        traceback.print_exc()
        return f"Error regenerating learning objectives: {str(e)}", objectives_json, objectives_json
def _extract_file_paths(files):
"""Extract file paths from different input formats."""
file_paths = []
if isinstance(files, list):
for file in files:
if file and os.path.exists(file):
file_paths.append(file)
elif isinstance(files, str) and os.path.exists(files):
file_paths.append(files)
elif hasattr(files, 'name') and os.path.exists(files.name):
file_paths.append(files.name)
return file_paths
def _generate_multiple_runs(quiz_generator, file_contents, num_objectives, num_runs, incorrect_answer_model_name, run_manager):
"""Generate learning objectives across multiple runs."""
all_learning_objectives = []
num_runs_int = int(num_runs)
for run in range(num_runs_int):
run_manager.log(f"Starting generation run {run+1}/{num_runs_int}", level="INFO")
# Generate base learning objectives without grouping or incorrect answers
learning_objectives = quiz_generator.generate_base_learning_objectives(
file_contents, num_objectives, incorrect_answer_model_name
)
run_manager.log(f"Generated {len(learning_objectives)} learning objectives in run {run+1}", level="INFO")
# Assign temporary IDs
for i, obj in enumerate(learning_objectives):
obj.id = 1000 * (run + 1) + (i + 1)
all_learning_objectives.extend(learning_objectives)
run_manager.log(f"Total learning objectives from all runs: {len(all_learning_objectives)}", level="INFO")
return all_learning_objectives
def _group_base_objectives_add_incorrect_answers(quiz_generator, all_base_learning_objectives, file_contents, incorrect_answer_model_name=None, run_manager=None):
    """Group base learning objectives and add incorrect answers to best-in-group objectives.

    Workflow:
        1. Group near-duplicate objectives; pick a best-in-group per group.
        2. Reassign IDs so the primary objective becomes ID=1.
        3. Generate incorrect answer options for best-in-group objectives only.
        4. Iteratively improve those incorrect answers.
        5. Merge: best-in-group objectives keep their enhanced form; all
           others are rebuilt with an empty incorrect-answer list.

    Args:
        quiz_generator: Configured QuizGenerator instance.
        all_base_learning_objectives: Pooled objectives from all runs.
        file_contents: Extracted course-material texts.
        incorrect_answer_model_name: Optional model override for incorrect answers.
        run_manager: Run manager used for logging; defaults to the global one.

    Returns:
        Dict with keys "all_grouped" (every objective, enhanced or stripped)
        and "best_in_group" (enhanced best-in-group objectives only).
    """
    # BUG FIX: run_manager defaulted to None but was used unconditionally,
    # crashing any caller that omitted it; fall back to the global run manager.
    if run_manager is None:
        run_manager = get_run_manager()
    run_manager.log("Grouping base learning objectives...", level="INFO")
    grouping_result = quiz_generator.group_base_learning_objectives(all_base_learning_objectives, file_contents)
    grouped_objectives = grouping_result["all_grouped"]
    best_in_group_objectives = grouping_result["best_in_group"]
    run_manager.log(f"Grouped into {len(best_in_group_objectives)} best-in-group objectives", level="INFO")
    # Find and reassign the best first objective to ID=1 (mutates in place)
    _reassign_objective_ids(grouped_objectives, run_manager)
    # Step 1: Generate incorrect answer suggestions only for best-in-group objectives
    run_manager.log("Generating incorrect answer options only for best-in-group objectives...", level="INFO")
    enhanced_best_in_group = quiz_generator.generate_lo_incorrect_answer_options(
        file_contents, best_in_group_objectives, incorrect_answer_model_name
    )
    run_manager.log("Generated incorrect answer options", level="INFO")
    # Reset the debug directory used by incorrect-answer regeneration logs
    debug_dir = os.path.join("incorrect_suggestion_debug")
    if os.path.exists(debug_dir):
        shutil.rmtree(debug_dir)
    os.makedirs(debug_dir, exist_ok=True)
    # Step 2: Run the improvement workflow on the generated incorrect answers
    run_manager.log("Improving incorrect answer options for best-in-group objectives...", level="INFO")
    improved_best_in_group = quiz_generator.learning_objective_generator.regenerate_incorrect_answers(
        enhanced_best_in_group, file_contents
    )
    run_manager.log("Completed improvement of incorrect answer options", level="INFO")
    # Map enhanced best-in-group objectives by ID for O(1) lookup during merge
    best_in_group_map = {obj.id: obj for obj in improved_best_in_group}
    final_grouped_objectives = []
    for grouped_obj in grouped_objectives:
        if getattr(grouped_obj, "best_in_group", False):
            # Use the enhanced version (with incorrect answers) when available
            if grouped_obj.id in best_in_group_map:
                final_grouped_objectives.append(best_in_group_map[grouped_obj.id])
            else:
                # This shouldn't happen, but just in case
                final_grouped_objectives.append(grouped_obj)
        else:
            # Non-best-in-group objectives carry no incorrect answers
            final_grouped_objectives.append(LearningObjective(
                id=grouped_obj.id,
                learning_objective=grouped_obj.learning_objective,
                source_reference=grouped_obj.source_reference,
                correct_answer=grouped_obj.correct_answer,
                incorrect_answer_options=[],  # Empty list for non-best-in-group
                in_group=getattr(grouped_obj, 'in_group', None),
                group_members=getattr(grouped_obj, 'group_members', None),
                best_in_group=getattr(grouped_obj, 'best_in_group', None)
            ))
    return {
        "all_grouped": final_grouped_objectives,
        "best_in_group": improved_best_in_group
    }
def _reassign_objective_ids(grouped_objectives, run_manager):
"""Reassign IDs to ensure best first objective gets ID=1."""
# Find best first objective
best_first_objective = None
# First identify all groups containing objectives with IDs ending in 001
groups_with_001 = {}
for obj in grouped_objectives:
if obj.id % 1000 == 1: # ID ends in 001
group_members = getattr(obj, "group_members", [obj.id])
for member_id in group_members:
if member_id not in groups_with_001:
groups_with_001[member_id] = True
# Now find the best_in_group objective from these groups
for obj in grouped_objectives:
obj_id = getattr(obj, "id", 0)
group_members = getattr(obj, "group_members", [obj_id])
# Check if this objective is in a group with 001 objectives
is_in_001_group = any(member_id in groups_with_001 for member_id in group_members)
if is_in_001_group and getattr(obj, "best_in_group", False):
best_first_objective = obj
run_manager.log(f"Found best_in_group objective in a 001 group with ID={obj.id}", level="DEBUG")
break
# If no best_in_group from 001 groups found, fall back to the first 001 objective
if not best_first_objective:
for obj in grouped_objectives:
if obj.id % 1000 == 1: # First objective from a run
best_first_objective = obj
run_manager.log(f"No best_in_group from 001 groups found, using first 001 with ID={obj.id}", level="DEBUG")
break
# Reassign IDs
id_counter = 2
if best_first_objective:
best_first_objective.id = 1
run_manager.log(f"Reassigned primary objective to ID=1", level="INFO")
for obj in grouped_objectives:
if obj is best_first_objective:
continue
obj.id = id_counter
id_counter += 1
def _format_objective_results(grouped_result, all_learning_objectives, num_objectives, run_manager):
"""Format objective results for display."""
sorted_best_in_group = sorted(grouped_result["best_in_group"], key=lambda obj: obj.id)
sorted_all_grouped = sorted(grouped_result["all_grouped"], key=lambda obj: obj.id)
# Limit best-in-group to the requested number of objectives
sorted_best_in_group = sorted_best_in_group[:num_objectives]
run_manager.log("Formatting objective results for display", level="INFO")
run_manager.log(f"Best-in-group objectives limited to top {len(sorted_best_in_group)} (requested: {num_objectives})", level="INFO")
# Format best-in-group
formatted_best_in_group = []
for obj in sorted_best_in_group:
formatted_best_in_group.append({
"id": obj.id,
"learning_objective": obj.learning_objective,
"source_reference": obj.source_reference,
"correct_answer": obj.correct_answer,
"incorrect_answer_options": getattr(obj, 'incorrect_answer_options', None),
"in_group": getattr(obj, 'in_group', None),
"group_members": getattr(obj, 'group_members', None),
"best_in_group": getattr(obj, 'best_in_group', None)
})
# Format grouped
formatted_grouped = []
for obj in sorted_all_grouped:
formatted_grouped.append({
"id": obj.id,
"learning_objective": obj.learning_objective,
"source_reference": obj.source_reference,
"correct_answer": obj.correct_answer,
"incorrect_answer_options": getattr(obj, 'incorrect_answer_options', None),
"in_group": getattr(obj, 'in_group', None),
"group_members": getattr(obj, 'group_members', None),
"best_in_group": getattr(obj, 'best_in_group', None)
})
# Format unranked
formatted_unranked = []
for obj in all_learning_objectives:
formatted_unranked.append({
"id": obj.id,
"learning_objective": obj.learning_objective,
"source_reference": obj.source_reference,
"correct_answer": obj.correct_answer
})
run_manager.log(f"Formatted {len(formatted_best_in_group)} best-in-group, {len(formatted_grouped)} grouped, {len(formatted_unranked)} raw objectives", level="INFO")
return (
f"Generated and grouped {len(formatted_best_in_group)} unique learning objectives successfully. Saved to run: {run_manager.get_current_run_id()}",
json.dumps(formatted_best_in_group, indent=2),
json.dumps(formatted_grouped, indent=2),
json.dumps(formatted_unranked, indent=2)
)
def parse_user_learning_objectives(text: str) -> List[str]:
    """
    Parse user-entered learning objectives text into a list of clean objective strings.

    Handles common label formats:
    - Numbered: "1. Objective"  "2) Objective"  "3: Objective"
    - Lettered: "a. Objective"  "b) Objective"  "c: Objective"
    - Plain:    "Objective" (no label)

    Trailing punctuation is preserved as it may be part of the sentence.
    """
    # A label is a number or single letter followed by '.', ')' or ':' and whitespace.
    label = re.compile(r'^(\d+|[a-zA-Z])[\.\)\:]\s+')
    results: List[str] = []
    for raw_line in text.strip().split('\n'):
        candidate = raw_line.strip()
        if not candidate:
            continue
        without_label = label.sub('', candidate)
        if without_label:
            results.append(without_label)
    return results
def process_user_objectives(files, user_objectives_text, model_name, incorrect_answer_model_name, temperature):
    """
    Process user-provided learning objectives using uploaded course materials.

    Pipeline:
    1. Parse objective texts from the user's input
    2. Find source references in course materials for each objective
    3. Generate a correct answer for each objective (same function as auto-generate flow)
    4. Generate incorrect answer options (all objectives are treated as best-in-group)
    5. Improve incorrect answer options iteratively
    6. Return output in the same format as the auto-generate flow

    Args:
        files: Uploaded course-material files (list, path string, or file object).
        user_objectives_text: Raw multi-line objectives text typed by the user.
        model_name: Model used for source-finding and correct answers.
        incorrect_answer_model_name: Model used for incorrect-answer generation.
        temperature: Sampling temperature (coerced to float).

    Returns:
        4-tuple ``(status, best_in_group_json, grouped_json, raw_json)``;
        JSON slots are None when validation fails.
    """
    run_manager = get_run_manager()
    # --- Input validation ---
    if not files:
        return "Please upload at least one file.", None, None, None
    if not user_objectives_text or not user_objectives_text.strip():
        return "Please enter at least one learning objective.", None, None, None
    if not os.getenv("OPENAI_API_KEY"):
        return "OpenAI API key not found. Please set the OPENAI_API_KEY environment variable.", None, None, None
    file_paths = _extract_file_paths(files)
    if not file_paths:
        return "No valid files found. Please upload valid .ipynb, .vtt, .srt, or .md files.", None, None, None
    objective_texts = parse_user_learning_objectives(user_objectives_text)
    if not objective_texts:
        return "No valid learning objectives found. Please enter at least one objective.", None, None, None
    # --- Start run (one logical generation run for user-provided objectives) ---
    run_manager.start_objective_run(
        files=file_paths,
        num_objectives=len(objective_texts),
        num_runs=1,
        model=model_name,
        incorrect_answer_model=incorrect_answer_model_name,
        temperature=temperature
    )
    run_manager.log(f"Processing {len(objective_texts)} user-provided learning objectives", level="INFO")
    # --- Process course material files ---
    processor = ContentProcessor()
    file_contents = processor.process_files(file_paths)
    if not file_contents:
        run_manager.log("No content extracted from the uploaded files", level="ERROR")
        return "No content extracted from the uploaded files.", None, None, None
    run_manager.log(f"Successfully extracted content from {len(file_contents)} files", level="INFO")
    # Stash extracted contents in module state for downstream flows
    set_processed_contents(file_contents)
    quiz_generator = QuizGenerator(
        api_key=os.getenv("OPENAI_API_KEY"),
        model=model_name,
        temperature=float(temperature)
    )
    # --- Step 1: Find source references in course materials ---
    run_manager.log("Finding source references for user-provided objectives...", level="INFO")
    # NOTE(review): imported locally — presumably to avoid a circular import at
    # module load time; confirm before hoisting to the top of the file.
    from learning_objective_generator.base_generation import (
        find_sources_for_user_objectives,
        generate_correct_answers_for_objectives
    )
    objectives_without_answers = find_sources_for_user_objectives(
        quiz_generator.client, model_name, float(temperature), file_contents, objective_texts
    )
    run_manager.log(f"Found sources for {len(objectives_without_answers)} objectives", level="INFO")
    # --- Step 2: Generate correct answers ---
    run_manager.log("Generating correct answers for user-provided objectives...", level="INFO")
    base_objectives = generate_correct_answers_for_objectives(
        quiz_generator.client, model_name, float(temperature), file_contents, objectives_without_answers
    )
    run_manager.log(f"Generated correct answers for {len(base_objectives)} objectives", level="INFO")
    # --- Step 3: Generate incorrect answer options ---
    run_manager.log("Generating incorrect answer options...", level="INFO")
    # Reset the debug directory used by incorrect-answer regeneration logs
    debug_dir = os.path.join("incorrect_suggestion_debug")
    if os.path.exists(debug_dir):
        shutil.rmtree(debug_dir)
    os.makedirs(debug_dir, exist_ok=True)
    enhanced_objectives = quiz_generator.generate_lo_incorrect_answer_options(
        file_contents, base_objectives, incorrect_answer_model_name
    )
    run_manager.log("Generated incorrect answer options", level="INFO")
    # --- Step 4: Improve incorrect answers iteratively ---
    run_manager.log("Improving incorrect answer options...", level="INFO")
    improved_objectives = quiz_generator.learning_objective_generator.regenerate_incorrect_answers(
        enhanced_objectives, file_contents
    )
    run_manager.log("Completed improvement of incorrect answer options", level="INFO")
    # All user-provided objectives are their own group and all are best-in-group
    for obj in improved_objectives:
        obj.in_group = False
        obj.group_members = [obj.id]
        obj.best_in_group = True
    # Store in module state for the question-generation flow
    set_learning_objectives(improved_objectives)
    # --- Format and return results ---
    formatted_results = _format_user_objective_results(improved_objectives, run_manager)
    params = {
        "files": [os.path.basename(f) for f in file_paths],
        "num_objectives": len(objective_texts),
        "num_runs": 1,
        "model": model_name,
        "incorrect_answer_model": incorrect_answer_model_name,
        "temperature": temperature,
        "source": "user-provided"
    }
    run_manager.save_objectives_outputs(
        best_in_group=formatted_results[1],
        all_grouped=formatted_results[2],
        raw_ungrouped=formatted_results[3],
        params=params
    )
    run_manager.end_run(run_type="Learning Objectives (User-provided)")
    return formatted_results
def _format_user_objective_results(objectives, run_manager):
"""Format user-provided objective results for display (same structure as auto-generated)."""
sorted_objectives = sorted(objectives, key=lambda obj: obj.id)
run_manager.log(f"Formatting {len(sorted_objectives)} user-provided objectives for display", level="INFO")
formatted_best_in_group = []
for obj in sorted_objectives:
formatted_best_in_group.append({
"id": obj.id,
"learning_objective": obj.learning_objective,
"source_reference": obj.source_reference,
"correct_answer": obj.correct_answer,
"incorrect_answer_options": getattr(obj, 'incorrect_answer_options', None),
"in_group": getattr(obj, 'in_group', None),
"group_members": getattr(obj, 'group_members', None),
"best_in_group": getattr(obj, 'best_in_group', None)
})
# Grouped view is identical to best-in-group (no grouping was performed)
formatted_grouped = formatted_best_in_group
# Raw view: base fields only (no incorrect answers), for the debug panel
formatted_unranked = [
{
"id": obj.id,
"learning_objective": obj.learning_objective,
"source_reference": obj.source_reference,
"correct_answer": obj.correct_answer
}
for obj in sorted_objectives
]
return (
f"Processed {len(formatted_best_in_group)} user-provided learning objectives successfully. Saved to run: {run_manager.get_current_run_id()}",
json.dumps(formatted_best_in_group, indent=2),
json.dumps(formatted_grouped, indent=2),
json.dumps(formatted_unranked, indent=2)
)
def process_user_objectives_and_generate_questions(files, user_objectives_text, model_name, incorrect_answer_model_name,
                                                   temperature, model_name_q, temperature_q, num_questions, num_runs_q):
    """Process user-provided objectives and then generate questions in one flow.

    Returns an 8-tuple consumed by the UI:
        (combined_status, objectives_output, grouped_output, raw_ungrouped_output,
         status_q, best_questions_output, all_questions_output, formatted_quiz_output)

    When objective processing fails (falsy objectives payload), the question
    slots carry an error message and Nones instead of results.
    """
    obj_results = process_user_objectives(files, user_objectives_text, model_name, incorrect_answer_model_name, temperature)
    status_obj, objectives_output, grouped_output, raw_ungrouped_output = obj_results
    # A falsy objectives payload means processing failed; skip question generation.
    # (Redundant `or objectives_output is None` removed: `not x` covers None.)
    if not objectives_output:
        return (
            status_obj, objectives_output, grouped_output, raw_ungrouped_output,
            "Learning objectives processing failed. Cannot proceed with questions.",
            None, None, None
        )
    status_q, best_questions_output, all_questions_output, formatted_quiz_output = generate_questions(
        objectives_output, model_name_q, temperature_q, num_questions, num_runs_q
    )
    return (
        f"{status_obj}\n\nThen:\n{status_q}",
        objectives_output, grouped_output, raw_ungrouped_output,
        status_q, best_questions_output, all_questions_output, formatted_quiz_output
    )
def process_files_and_generate_questions(files, num_objectives, num_runs, model_name, incorrect_answer_model_name,
                                         temperature, model_name_q, temperature_q, num_questions, num_runs_q):
    """Process files, generate learning objectives, and then generate questions in one flow.

    Returns an 8-tuple consumed by the UI:
        (combined_status, objectives_output, grouped_output, raw_ungrouped_output,
         status_q, best_questions_output, all_questions_output, formatted_quiz_output)

    When objective generation fails (falsy objectives payload), the question
    slots carry an error message and Nones instead of results.
    """
    # First, generate learning objectives:
    # (status, objectives_output, grouped_output, raw_ungrouped_output)
    status_obj, objectives_output, grouped_output, raw_ungrouped_output = process_files(
        files, num_objectives, num_runs, model_name, incorrect_answer_model_name, temperature
    )
    # A falsy objectives payload means generation failed; skip questions.
    # (Redundant `or objectives_output is None` removed: `not x` covers None.)
    if not objectives_output:
        return (
            status_obj,                 # status_output
            objectives_output,          # objectives_output
            grouped_output,             # grouped_output
            raw_ungrouped_output,       # raw_ungrouped_output
            "Learning objectives generation failed. Cannot proceed with questions.",  # status_q_output
            None,                       # best_questions_output
            None,                       # all_questions_output
            None                        # formatted_quiz_output
        )
    # Now generate questions using the objectives:
    # (status_q, best_questions_output, all_questions_output, formatted_quiz_output)
    status_q, best_questions_output, all_questions_output, formatted_quiz_output = generate_questions(
        objectives_output, model_name_q, temperature_q, num_questions, num_runs_q
    )
    # Combine both stage statuses into one message for the UI
    combined_status = f"{status_obj}\n\nThen:\n{status_q}"
    return (
        combined_status,                # status_output
        objectives_output,              # objectives_output
        grouped_output,                 # grouped_output
        raw_ungrouped_output,           # raw_ungrouped_output
        status_q,                       # status_q_output
        best_questions_output,          # best_questions_output
        all_questions_output,           # all_questions_output
        formatted_quiz_output           # formatted_quiz_output
    )