# quiz-generator-v3 / ui / question_handlers.py
# ecuartasm — Initial commit: AI Course Assessment Generator (217abc3)
import os
import json
import shutil
from typing import List
from quiz_generator import QuizGenerator
from models import LearningObjective
from .state import get_processed_contents
from .formatting import format_quiz_for_ui
from .run_manager import get_run_manager
def generate_questions(objectives_json, model_name, temperature, num_questions, num_runs):
    """Generate, group, rank, and persist quiz questions for approved learning objectives.

    Returns a 4-tuple (status message, best-ranked JSON, all-grouped JSON,
    formatted quiz), or an error message followed by three Nones when
    validation fails.
    """
    manager = get_run_manager()

    # Guard clauses: bail out early with a user-facing message on missing input.
    if not objectives_json:
        return "No learning objectives provided.", None, None, None
    if not os.getenv("OPENAI_API_KEY"):
        return "OpenAI API key not found.", None, None, None
    if not get_processed_contents():
        return "No processed content available. Please go back to the first tab and upload files.", None, None, None

    objectives = _parse_learning_objectives(objectives_json)
    if not objectives:
        manager.log("Invalid learning objectives JSON", level="ERROR")
        return "Invalid learning objectives JSON.", None, None, None

    # Open a tracked question run before any generation work happens.
    manager.start_question_run(
        objectives_count=len(objectives),
        model=model_name,
        temperature=temperature,
        num_questions=int(num_questions),
        num_runs=int(num_runs)
    )
    manager.log(f"Parsed {len(objectives)} learning objectives", level="INFO")
    manager.log(f"Target total questions: {num_questions}", level="INFO")

    manager.log(f"Creating QuizGenerator with model={model_name}, temperature={temperature}", level="INFO")
    generator = QuizGenerator(
        api_key=os.getenv("OPENAI_API_KEY"),
        model=model_name,
        temperature=float(temperature)
    )

    questions = _generate_questions_multiple_runs(
        generator, objectives, int(num_questions), num_runs, manager
    )
    ranked = _group_and_rank_questions(generator, questions, manager)

    # Improve incorrect answers (disabled)
    #_improve_incorrect_answers(generator, ranked["best_in_group_ranked"])

    outputs = _format_question_results(ranked, int(num_questions), manager)

    # Persist artifacts alongside the parameters that produced them.
    manager.save_questions_outputs(
        best_ranked=outputs[1],
        all_grouped=outputs[2],
        formatted_quiz=outputs[3],
        params={
            "objectives_count": len(objectives),
            "model": model_name,
            "temperature": temperature,
            "num_questions": int(num_questions),
            "num_runs": int(num_runs)
        }
    )

    manager.end_run(run_type="Questions")
    return outputs
def _parse_learning_objectives(objectives_json):
"""Parse learning objectives from JSON."""
try:
objectives_data = json.loads(objectives_json)
learning_objectives = []
for obj_data in objectives_data:
obj = LearningObjective(
id=obj_data["id"],
learning_objective=obj_data["learning_objective"],
source_reference=obj_data["source_reference"],
correct_answer=obj_data["correct_answer"],
incorrect_answer_options=obj_data["incorrect_answer_options"]
)
learning_objectives.append(obj)
return learning_objectives
except json.JSONDecodeError:
return None
def _generate_questions_multiple_runs(quiz_generator, learning_objectives, num_questions, num_runs, run_manager):
    """Generate questions across multiple runs with proportional distribution.

    Each of the num_runs passes spreads num_questions across the objectives
    proportionally, so the total produced is roughly num_runs * num_questions
    before grouping/deduplication.

    Returns the flat list of all generated questions, re-numbered with
    unique 1-based ids.
    """
    all_questions = []
    num_runs_int = int(num_runs)
    num_objectives = len(learning_objectives)

    # Decide how many questions each objective contributes per run.
    distribution = _calculate_proportional_distribution(num_questions, num_objectives)
    run_manager.log(f"Question distribution across {num_objectives} objectives: {distribution}", level="INFO")

    # Pair each objective with its assigned count, skipping zero-count ones.
    objectives_to_use = [
        (objective, count)
        for objective, count in zip(learning_objectives, distribution)
        if count > 0
    ]
    run_manager.log(f"Using {len(objectives_to_use)} learning objectives for question generation", level="INFO")

    # Fetch the processed content once — it is invariant across all runs
    # (previously re-fetched for every single generated question).
    contents = get_processed_contents()

    for run in range(num_runs_int):
        run_manager.log(f"Starting question generation run {run+1}/{num_runs_int}", level="INFO")
        # Generate questions for each selected objective with its assigned count
        for obj, question_count in objectives_to_use:
            run_manager.log(f"Generating {question_count} question(s) for objective {obj.id}: {obj.learning_objective[:80]}...", level="INFO")
            for q_num in range(question_count):
                run_questions = quiz_generator.generate_questions_in_parallel([obj], contents)
                if run_questions:
                    run_manager.log(f"Generated question {q_num+1}/{question_count} for objective {obj.id}", level="DEBUG")
                    all_questions.extend(run_questions)
        run_manager.log(f"Generated {len(all_questions)} questions so far in run {run+1}", level="INFO")

    # Re-number so every question has a unique, stable 1-based id.
    for i, q in enumerate(all_questions):
        q.id = i + 1
    run_manager.log(f"Total questions from all runs: {len(all_questions)}", level="INFO")
    return all_questions
def _calculate_proportional_distribution(num_questions, num_objectives):
"""Calculate how to distribute N questions across M objectives proportionally."""
if num_questions <= 0 or num_objectives <= 0:
return []
# If we have more objectives than questions, only use as many objectives as we have questions
if num_questions < num_objectives:
distribution = [1] * num_questions + [0] * (num_objectives - num_questions)
return distribution
# Calculate base questions per objective and remainder
base_per_objective = num_questions // num_objectives
remainder = num_questions % num_objectives
# Distribute evenly, giving extra questions to the first 'remainder' objectives
distribution = [base_per_objective + (1 if i < remainder else 0) for i in range(num_objectives)]
return distribution
def _group_and_rank_questions(quiz_generator, all_questions, run_manager):
    """Group similar questions together, then rank the full grouped set.

    Returns a dict with:
      - "grouped": every question annotated with group membership
      - "all_ranked": the same questions ordered by rank
    """
    run_manager.log(f"Grouping {len(all_questions)} questions by similarity...", level="INFO")
    grouping = quiz_generator.group_questions(all_questions, get_processed_contents())
    run_manager.log(f"Grouped into {len(grouping['best_in_group'])} best-in-group questions", level="INFO")

    # Rank every grouped question — not only the best-in-group ones — so the
    # later top-N selection never runs short of candidates.
    run_manager.log(f"Ranking all {len(grouping['grouped'])} grouped questions...", level="INFO")
    ranking = quiz_generator.rank_questions(grouping['grouped'], get_processed_contents())
    run_manager.log("Completed ranking of questions", level="INFO")

    return {"grouped": grouping["grouped"], "all_ranked": ranking["ranked"]}
def _improve_incorrect_answers(quiz_generator, questions):
    """Regenerate weak incorrect-answer options for the given questions.

    Side effect: resets the local "wrong_answer_debug" directory so each
    invocation starts from a clean set of debug artifacts.

    NOTE(review): currently unused — its only call site in
    generate_questions() is commented out.
    """
    # Single-argument os.path.join was a no-op; a plain string is equivalent.
    debug_dir = "wrong_answer_debug"
    # ignore_errors avoids the exists()/rmtree() race and missing-dir errors.
    shutil.rmtree(debug_dir, ignore_errors=True)
    os.makedirs(debug_dir, exist_ok=True)

    quiz_generator.regenerate_incorrect_answers(questions, get_processed_contents())
def _serialize_question(q, include_ranking):
    """Convert one question object into a JSON-serializable dict.

    Grouping fields are always emitted (None when the attribute is absent);
    ranking fields ("rank", "ranking_reasoning") are emitted only when
    include_ranking is True. Key order matches the original hand-built dicts.
    """
    data = {
        "id": q.id,
        "question_text": q.question_text,
        "options": [
            {"text": opt.option_text, "is_correct": opt.is_correct, "feedback": opt.feedback}
            for opt in q.options
        ],
        "learning_objective_id": q.learning_objective_id,
        "learning_objective": q.learning_objective,
        "correct_answer": q.correct_answer,
        "source_reference": q.source_reference,
    }
    if include_ranking:
        data["rank"] = getattr(q, "rank", None)
        data["ranking_reasoning"] = getattr(q, "ranking_reasoning", None)
    data["in_group"] = getattr(q, "in_group", None)
    data["group_members"] = getattr(q, "group_members", None)
    data["best_in_group"] = getattr(q, "best_in_group", None)
    return data
def _format_question_results(results, num_questions, run_manager):
    """Format ranked/grouped question results for display.

    Returns a 4-tuple: (status message, best-ranked JSON limited to the top
    num_questions, all-grouped-questions JSON, formatted quiz text).
    """
    run_manager.log("Formatting question results for display", level="INFO")

    # All ranked questions (with ranking fields) and all grouped questions
    # (without) share one serializer — previously two duplicated blocks.
    formatted_best_questions = [_serialize_question(q, include_ranking=True) for q in results["all_ranked"]]
    formatted_all_questions = [_serialize_question(q, include_ranking=False) for q in results["grouped"]]

    # Only the top N requested questions feed the final quiz output.
    formatted_best_questions_limited = formatted_best_questions[:num_questions]
    formatted_quiz = format_quiz_for_ui(json.dumps(formatted_best_questions_limited, indent=2))

    run_manager.log(f"Formatted {len(formatted_best_questions)} best-ranked, {len(formatted_all_questions)} grouped questions", level="INFO")
    run_manager.log(f"Best-ranked and formatted quiz limited to top {len(formatted_best_questions_limited)} questions (requested: {num_questions})", level="INFO")

    return (
        f"Generated and ranked {len(formatted_best_questions_limited)} unique questions successfully. Saved to run: {run_manager.get_current_run_id()}/{run_manager.get_current_question_run_id()}",
        json.dumps(formatted_best_questions_limited, indent=2),
        json.dumps(formatted_all_questions, indent=2),
        formatted_quiz
    )