Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| import shutil | |
| from typing import List | |
| from quiz_generator import QuizGenerator | |
| from models import LearningObjective | |
| from .state import get_processed_contents | |
| from .formatting import format_quiz_for_ui | |
| from .run_manager import get_run_manager | |
def generate_questions(objectives_json, model_name, temperature, num_questions, num_runs):
    """Generate quiz questions from approved learning objectives.

    Pipeline: validate inputs -> parse objectives -> generate questions over
    multiple runs -> group and rank -> format -> persist outputs -> end run.

    Args:
        objectives_json: JSON string of approved learning objectives.
        model_name: Name of the OpenAI model to use.
        temperature: Sampling temperature (coerced to float for the generator).
        num_questions: Total number of questions requested (coerced to int).
        num_runs: Number of generation passes (coerced to int).

    Returns:
        A 4-tuple (status message, best-ranked JSON, all-grouped JSON,
        formatted quiz text). On validation failure the last three are None.
    """
    run_manager = get_run_manager()

    # Input validation — fail fast, before a run is started.
    if not objectives_json:
        return "No learning objectives provided.", None, None, None
    api_key = os.getenv("OPENAI_API_KEY")  # read once; reused for QuizGenerator below
    if not api_key:
        return "OpenAI API key not found.", None, None, None
    if not get_processed_contents():
        return "No processed content available. Please go back to the first tab and upload files.", None, None, None

    # Parse and create learning objectives.
    learning_objectives = _parse_learning_objectives(objectives_json)
    if not learning_objectives:
        run_manager.log("Invalid learning objectives JSON", level="ERROR")
        return "Invalid learning objectives JSON.", None, None, None

    # Coerce UI-supplied counts once rather than at every use site.
    num_questions_int = int(num_questions)
    num_runs_int = int(num_runs)

    # Start question run.
    run_manager.start_question_run(
        objectives_count=len(learning_objectives),
        model=model_name,
        temperature=temperature,
        num_questions=num_questions_int,
        num_runs=num_runs_int,
    )
    run_manager.log(f"Parsed {len(learning_objectives)} learning objectives", level="INFO")
    run_manager.log(f"Target total questions: {num_questions}", level="INFO")

    # Generate questions.
    run_manager.log(f"Creating QuizGenerator with model={model_name}, temperature={temperature}", level="INFO")
    quiz_generator = QuizGenerator(
        api_key=api_key,
        model=model_name,
        temperature=float(temperature),
    )
    all_questions = _generate_questions_multiple_runs(
        quiz_generator, learning_objectives, num_questions_int, num_runs_int, run_manager
    )

    # Group and rank questions.
    results = _group_and_rank_questions(quiz_generator, all_questions, run_manager)

    # Format results.
    formatted_results = _format_question_results(results, num_questions_int, run_manager)

    # Save outputs to files.
    params = {
        "objectives_count": len(learning_objectives),
        "model": model_name,
        "temperature": temperature,
        "num_questions": num_questions_int,
        "num_runs": num_runs_int,
    }
    run_manager.save_questions_outputs(
        best_ranked=formatted_results[1],
        all_grouped=formatted_results[2],
        formatted_quiz=formatted_results[3],
        params=params,
    )

    # End run.
    run_manager.end_run(run_type="Questions")
    return formatted_results
| def _parse_learning_objectives(objectives_json): | |
| """Parse learning objectives from JSON.""" | |
| try: | |
| objectives_data = json.loads(objectives_json) | |
| learning_objectives = [] | |
| for obj_data in objectives_data: | |
| obj = LearningObjective( | |
| id=obj_data["id"], | |
| learning_objective=obj_data["learning_objective"], | |
| source_reference=obj_data["source_reference"], | |
| correct_answer=obj_data["correct_answer"], | |
| incorrect_answer_options=obj_data["incorrect_answer_options"] | |
| ) | |
| learning_objectives.append(obj) | |
| return learning_objectives | |
| except json.JSONDecodeError: | |
| return None | |
def _generate_questions_multiple_runs(quiz_generator, learning_objectives, num_questions, num_runs, run_manager):
    """Generate questions over several runs, spreading the target count across objectives."""
    collected = []
    total_runs = int(num_runs)
    objective_count = len(learning_objectives)

    # Work out how many questions each objective should contribute.
    per_objective = _calculate_proportional_distribution(num_questions, objective_count)
    run_manager.log(f"Question distribution across {objective_count} objectives: {per_objective}", level="INFO")

    # Pair each objective with its non-zero share of the question budget.
    active = [
        (learning_objectives[idx], share)
        for idx, share in enumerate(per_objective)
        if share > 0 and idx < len(learning_objectives)
    ]
    run_manager.log(f"Using {len(active)} learning objectives for question generation", level="INFO")

    for run_index in range(total_runs):
        run_manager.log(f"Starting question generation run {run_index+1}/{total_runs}", level="INFO")
        # Each active objective generates its assigned number of questions.
        for objective, share in active:
            run_manager.log(f"Generating {share} question(s) for objective {objective.id}: {objective.learning_objective[:80]}...", level="INFO")
            for attempt in range(share):
                batch = quiz_generator.generate_questions_in_parallel(
                    [objective], get_processed_contents()
                )
                if batch:
                    run_manager.log(f"Generated question {attempt+1}/{share} for objective {objective.id}", level="DEBUG")
                    collected.extend(batch)
        run_manager.log(f"Generated {len(collected)} questions so far in run {run_index+1}", level="INFO")

    # Re-number every question with a sequential unique ID.
    for position, question in enumerate(collected, start=1):
        question.id = position
    run_manager.log(f"Total questions from all runs: {len(collected)}", level="INFO")
    return collected
| def _calculate_proportional_distribution(num_questions, num_objectives): | |
| """Calculate how to distribute N questions across M objectives proportionally.""" | |
| if num_questions <= 0 or num_objectives <= 0: | |
| return [] | |
| # If we have more objectives than questions, only use as many objectives as we have questions | |
| if num_questions < num_objectives: | |
| distribution = [1] * num_questions + [0] * (num_objectives - num_questions) | |
| return distribution | |
| # Calculate base questions per objective and remainder | |
| base_per_objective = num_questions // num_objectives | |
| remainder = num_questions % num_objectives | |
| # Distribute evenly, giving extra questions to the first 'remainder' objectives | |
| distribution = [base_per_objective + (1 if i < remainder else 0) for i in range(num_objectives)] | |
| return distribution | |
def _group_and_rank_questions(quiz_generator, all_questions, run_manager):
    """Group similar questions, then rank the full grouped set.

    All grouped questions are ranked (not only best-in-group) so that
    downstream selection has enough candidates to choose from.
    """
    run_manager.log(f"Grouping {len(all_questions)} questions by similarity...", level="INFO")
    grouping = quiz_generator.group_questions(all_questions, get_processed_contents())
    run_manager.log(f"Grouped into {len(grouping['best_in_group'])} best-in-group questions", level="INFO")

    run_manager.log(f"Ranking all {len(grouping['grouped'])} grouped questions...", level="INFO")
    ranking = quiz_generator.rank_questions(grouping["grouped"], get_processed_contents())
    run_manager.log("Completed ranking of questions", level="INFO")

    return {
        "grouped": grouping["grouped"],
        "all_ranked": ranking["ranked"],
    }
def _improve_incorrect_answers(quiz_generator, questions):
    """Regenerate the incorrect answer options for the given questions.

    Clears the wrong-answer debug directory first so every invocation
    starts with a fresh set of debug artifacts.
    """
    # os.path.join with a single argument was a no-op; a plain string suffices.
    debug_dir = "wrong_answer_debug"
    # ignore_errors=True removes the exists-check/rmtree race and tolerates
    # the directory being absent or disappearing between calls.
    shutil.rmtree(debug_dir, ignore_errors=True)
    os.makedirs(debug_dir, exist_ok=True)
    quiz_generator.regenerate_incorrect_answers(questions, get_processed_contents())
def _serialize_question(question, include_ranking):
    """Convert a question object into a JSON-serializable dict.

    Args:
        question: Question object carrying text, options, and objective metadata.
        include_ranking: When True, also emit rank / ranking_reasoning fields.

    Key insertion order matters: json.dumps preserves it, so the output must
    match the previous inline-dict layout byte for byte.
    """
    data = {
        "id": question.id,
        "question_text": question.question_text,
        "options": [
            {"text": opt.option_text, "is_correct": opt.is_correct, "feedback": opt.feedback}
            for opt in question.options
        ],
        "learning_objective_id": question.learning_objective_id,
        "learning_objective": question.learning_objective,
        "correct_answer": question.correct_answer,
        "source_reference": question.source_reference,
    }
    if include_ranking:
        data["rank"] = getattr(question, "rank", None)
        data["ranking_reasoning"] = getattr(question, "ranking_reasoning", None)
    # Grouping metadata is optional on the question objects; default to None.
    data["in_group"] = getattr(question, "in_group", None)
    data["group_members"] = getattr(question, "group_members", None)
    data["best_in_group"] = getattr(question, "best_in_group", None)
    return data


def _format_question_results(results, num_questions, run_manager):
    """Format ranked and grouped question results for display and persistence.

    Args:
        results: Dict with "all_ranked" and "grouped" question lists.
        num_questions: Requested question count; caps the best-ranked output.
        run_manager: Run manager used for logging and run-ID lookup.

    Returns:
        4-tuple: (status message, best-ranked questions JSON, all grouped
        questions JSON, UI-formatted quiz text).
    """
    run_manager.log("Formatting question results for display", level="INFO")

    # Top-ranked candidates (drawn from all grouped questions, with rank info).
    formatted_best_questions = [_serialize_question(q, include_ranking=True) for q in results["all_ranked"]]
    # Every grouped question, for full-output inspection (no rank fields).
    formatted_all_questions = [_serialize_question(q, include_ranking=False) for q in results["grouped"]]

    # Limit the quiz and the best-ranked list to the requested question count.
    formatted_best_questions_limited = formatted_best_questions[:num_questions]
    formatted_quiz = format_quiz_for_ui(json.dumps(formatted_best_questions_limited, indent=2))

    run_manager.log(f"Formatted {len(formatted_best_questions)} best-ranked, {len(formatted_all_questions)} grouped questions", level="INFO")
    run_manager.log(f"Best-ranked and formatted quiz limited to top {len(formatted_best_questions_limited)} questions (requested: {num_questions})", level="INFO")

    return (
        f"Generated and ranked {len(formatted_best_questions_limited)} unique questions successfully. Saved to run: {run_manager.get_current_run_id()}/{run_manager.get_current_question_run_id()}",
        json.dumps(formatted_best_questions_limited, indent=2),
        json.dumps(formatted_all_questions, indent=2),
        formatted_quiz,
    )