import os
import json
import shutil
from typing import List

from quiz_generator import QuizGenerator
from models import LearningObjective
from .state import get_processed_contents
from .formatting import format_quiz_for_ui
from .run_manager import get_run_manager


def generate_questions(objectives_json, model_name, temperature, num_questions, num_runs):
    """Generate, group, rank, format and persist quiz questions.

    Args:
        objectives_json: JSON string holding the approved learning objectives.
        model_name: OpenAI model identifier used for generation.
        temperature: Sampling temperature (coerced to float).
        num_questions: Total number of questions requested (coerced to int).
        num_runs: Number of independent generation passes (coerced to int).

    Returns:
        A 4-tuple ``(status_message, best_ranked_json, all_grouped_json,
        formatted_quiz)``.  On validation failure the last three elements
        are ``None``.
    """
    run_manager = get_run_manager()

    # --- Input validation -------------------------------------------------
    if not objectives_json:
        return "No learning objectives provided.", None, None, None
    if not os.getenv("OPENAI_API_KEY"):
        return "OpenAI API key not found.", None, None, None
    if not get_processed_contents():
        return "No processed content available. Please go back to the first tab and upload files.", None, None, None

    # --- Parse learning objectives ---------------------------------------
    learning_objectives = _parse_learning_objectives(objectives_json)
    if not learning_objectives:
        run_manager.log("Invalid learning objectives JSON", level="ERROR")
        return "Invalid learning objectives JSON.", None, None, None

    # --- Start the question run (return value was previously bound to an
    # unused local; the side effect on run_manager is what matters) -------
    run_manager.start_question_run(
        objectives_count=len(learning_objectives),
        model=model_name,
        temperature=temperature,
        num_questions=int(num_questions),
        num_runs=int(num_runs),
    )
    run_manager.log(f"Parsed {len(learning_objectives)} learning objectives", level="INFO")
    run_manager.log(f"Target total questions: {num_questions}", level="INFO")

    # --- Generate questions ----------------------------------------------
    run_manager.log(f"Creating QuizGenerator with model={model_name}, temperature={temperature}", level="INFO")
    quiz_generator = QuizGenerator(
        api_key=os.getenv("OPENAI_API_KEY"),
        model=model_name,
        temperature=float(temperature),
    )
    all_questions = _generate_questions_multiple_runs(
        quiz_generator, learning_objectives, int(num_questions), num_runs, run_manager
    )

    # --- Group and rank questions ----------------------------------------
    results = _group_and_rank_questions(quiz_generator, all_questions, run_manager)

    # NOTE(review): incorrect-answer regeneration is currently disabled.
    # Re-enable by calling _improve_incorrect_answers(quiz_generator, <ranked
    # questions>) here once the desired question subset is decided.

    # --- Format results and persist outputs -------------------------------
    formatted_results = _format_question_results(results, int(num_questions), run_manager)
    params = {
        "objectives_count": len(learning_objectives),
        "model": model_name,
        "temperature": temperature,
        "num_questions": int(num_questions),
        "num_runs": int(num_runs),
    }
    run_manager.save_questions_outputs(
        best_ranked=formatted_results[1],
        all_grouped=formatted_results[2],
        formatted_quiz=formatted_results[3],
        params=params,
    )

    run_manager.end_run(run_type="Questions")
    return formatted_results


def _parse_learning_objectives(objectives_json):
    """Parse a JSON string into a list of LearningObjective instances.

    Returns ``None`` when the payload is not valid JSON *or* when any entry
    is missing a required field / has the wrong shape, so the caller can
    surface a single "invalid objectives" error.  (Previously only
    ``JSONDecodeError`` was caught, so a well-formed JSON document with a
    missing key crashed the request with a ``KeyError``.)
    """
    try:
        objectives_data = json.loads(objectives_json)
        return [
            LearningObjective(
                id=obj_data["id"],
                learning_objective=obj_data["learning_objective"],
                source_reference=obj_data["source_reference"],
                correct_answer=obj_data["correct_answer"],
                incorrect_answer_options=obj_data["incorrect_answer_options"],
            )
            for obj_data in objectives_data
        ]
    except (json.JSONDecodeError, KeyError, TypeError):
        return None


def _generate_questions_multiple_runs(quiz_generator, learning_objectives, num_questions, num_runs, run_manager):
    """Generate questions across multiple runs with proportional distribution.

    Each run walks every selected objective and issues one generation call
    per assigned question, accumulating all results.  Question IDs are
    reassigned sequentially at the end so they are unique across runs.
    """
    all_questions = []
    num_runs_int = int(num_runs)
    num_objectives = len(learning_objectives)

    # How many questions each objective should contribute.
    distribution = _calculate_proportional_distribution(num_questions, num_objectives)
    run_manager.log(f"Question distribution across {num_objectives} objectives: {distribution}", level="INFO")

    # Pair objectives with their assigned counts, dropping zero-count ones.
    # (zip replaces the previous index-bounds bookkeeping: distribution is
    # built from num_objectives, so lengths always match.)
    objectives_to_use = [
        (obj, count) for obj, count in zip(learning_objectives, distribution) if count > 0
    ]
    run_manager.log(f"Using {len(objectives_to_use)} learning objectives for question generation", level="INFO")

    for run in range(num_runs_int):
        run_manager.log(f"Starting question generation run {run+1}/{num_runs_int}", level="INFO")
        for obj, question_count in objectives_to_use:
            run_manager.log(f"Generating {question_count} question(s) for objective {obj.id}: {obj.learning_objective[:80]}...", level="INFO")
            for q_num in range(question_count):
                run_questions = quiz_generator.generate_questions_in_parallel(
                    [obj], get_processed_contents()
                )
                if run_questions:
                    run_manager.log(f"Generated question {q_num+1}/{question_count} for objective {obj.id}", level="DEBUG")
                    all_questions.extend(run_questions)
        run_manager.log(f"Generated {len(all_questions)} questions so far in run {run+1}", level="INFO")

    # Assign unique sequential IDs across all runs.
    for new_id, q in enumerate(all_questions, start=1):
        q.id = new_id

    run_manager.log(f"Total questions from all runs: {len(all_questions)}", level="INFO")
    return all_questions


def _calculate_proportional_distribution(num_questions, num_objectives):
    """Distribute ``num_questions`` across ``num_objectives`` proportionally.

    Returns a list of per-objective counts.  When there are more objectives
    than questions, the first ``num_questions`` objectives get one question
    each and the rest get zero.  Otherwise questions are split evenly, with
    the remainder going to the earliest objectives.  Non-positive inputs
    yield an empty list.
    """
    if num_questions <= 0 or num_objectives <= 0:
        return []

    if num_questions < num_objectives:
        return [1] * num_questions + [0] * (num_objectives - num_questions)

    base_per_objective = num_questions // num_objectives
    remainder = num_questions % num_objectives
    return [
        base_per_objective + (1 if i < remainder else 0)
        for i in range(num_objectives)
    ]


def _group_and_rank_questions(quiz_generator, all_questions, run_manager):
    """Group questions by similarity, then rank every grouped question.

    Returns a dict with ``"grouped"`` (all questions annotated with group
    metadata) and ``"all_ranked"`` (the same questions with ranking data).
    """
    run_manager.log(f"Grouping {len(all_questions)} questions by similarity...", level="INFO")
    grouping_result = quiz_generator.group_questions(all_questions, get_processed_contents())
    run_manager.log(f"Grouped into {len(grouping_result['best_in_group'])} best-in-group questions", level="INFO")

    # Rank ALL grouped questions (not just best-in-group) so the caller has
    # enough candidates when selecting the top N.
    run_manager.log(f"Ranking all {len(grouping_result['grouped'])} grouped questions...", level="INFO")
    ranking_result = quiz_generator.rank_questions(grouping_result['grouped'], get_processed_contents())
    run_manager.log("Completed ranking of questions", level="INFO")

    return {
        "grouped": grouping_result["grouped"],
        "all_ranked": ranking_result["ranked"],
    }


def _improve_incorrect_answers(quiz_generator, questions):
    """Regenerate incorrect answer options for the given questions.

    Clears the on-disk debug directory first so each invocation starts with
    a clean slate of regeneration artifacts.
    """
    debug_dir = "wrong_answer_debug"  # single component; os.path.join was a no-op
    if os.path.exists(debug_dir):
        shutil.rmtree(debug_dir)
    os.makedirs(debug_dir, exist_ok=True)
    quiz_generator.regenerate_incorrect_answers(questions, get_processed_contents())


def _serialize_question(q, include_ranking):
    """Convert a question object into a JSON-serializable dict.

    Ranking metadata (``rank``/``ranking_reasoning``) is only included for
    ranked questions; group metadata is always included.  Key insertion
    order matches the previous inline dicts so the emitted JSON is stable.
    """
    data = {
        "id": q.id,
        "question_text": q.question_text,
        "options": [
            {"text": opt.option_text, "is_correct": opt.is_correct, "feedback": opt.feedback}
            for opt in q.options
        ],
        "learning_objective_id": q.learning_objective_id,
        "learning_objective": q.learning_objective,
        "correct_answer": q.correct_answer,
        "source_reference": q.source_reference,
    }
    if include_ranking:
        data["rank"] = getattr(q, "rank", None)
        data["ranking_reasoning"] = getattr(q, "ranking_reasoning", None)
    data["in_group"] = getattr(q, "in_group", None)
    data["group_members"] = getattr(q, "group_members", None)
    data["best_in_group"] = getattr(q, "best_in_group", None)
    return data


def _format_question_results(results, num_questions, run_manager):
    """Format ranked and grouped questions for display and persistence.

    Args:
        results: Dict with ``"all_ranked"`` and ``"grouped"`` question lists.
        num_questions: Cap applied to the best-ranked list and the quiz.
        run_manager: Run manager used for logging and run-id lookup.

    Returns:
        4-tuple of (status message, best-ranked JSON string limited to
        ``num_questions``, all-grouped JSON string, UI-formatted quiz).
    """
    run_manager.log("Formatting question results for display", level="INFO")

    # All ranked questions — the top N of these become the quiz.
    formatted_best_questions = [
        _serialize_question(q, include_ranking=True) for q in results["all_ranked"]
    ]
    # All grouped questions (no ranking metadata).
    formatted_all_questions = [
        _serialize_question(q, include_ranking=False) for q in results["grouped"]
    ]

    # Limit the best-ranked list and the formatted quiz to the requested size.
    formatted_best_questions_limited = formatted_best_questions[:num_questions]
    formatted_quiz = format_quiz_for_ui(json.dumps(formatted_best_questions_limited, indent=2))

    run_manager.log(f"Formatted {len(formatted_best_questions)} best-ranked, {len(formatted_all_questions)} grouped questions", level="INFO")
    run_manager.log(f"Best-ranked and formatted quiz limited to top {len(formatted_best_questions_limited)} questions (requested: {num_questions})", level="INFO")

    return (
        f"Generated and ranked {len(formatted_best_questions_limited)} unique questions successfully. Saved to run: {run_manager.get_current_run_id()}/{run_manager.get_current_question_run_id()}",
        json.dumps(formatted_best_questions_limited, indent=2),
        json.dumps(formatted_all_questions, indent=2),
        formatted_quiz,
    )