import os
import re
import json
import shutil
import traceback
from typing import List

from models.learning_objectives import LearningObjective
from .content_processor import ContentProcessor
from quiz_generator import QuizGenerator
from .state import get_processed_contents, set_processed_contents, set_learning_objectives
from .run_manager import get_run_manager
from .question_handlers import generate_questions


# Directory that is wiped and recreated before incorrect-answer improvement runs.
_DEBUG_DIR = os.path.join("incorrect_suggestion_debug")

# Optional leading number/letter label followed by ".", ")" or ":" and whitespace.
_OBJECTIVE_LABEL_RE = re.compile(r'^(\d+|[a-zA-Z])[\.\)\:]\s+')


def _create_quiz_generator(model_name, temperature):
    """Build a QuizGenerator from the OPENAI_API_KEY environment variable."""
    return QuizGenerator(
        api_key=os.getenv("OPENAI_API_KEY"),
        model=model_name,
        temperature=float(temperature)
    )


def _reset_debug_dir():
    """Delete the incorrect-answer debug directory (if present) and recreate it empty."""
    if os.path.exists(_DEBUG_DIR):
        shutil.rmtree(_DEBUG_DIR)
    os.makedirs(_DEBUG_DIR, exist_ok=True)


def _objective_to_dict(obj, include_grouping=True):
    """Convert an objective into a JSON-serialisable dict.

    Args:
        obj: Object exposing ``id``, ``learning_objective``, ``source_reference``
            and ``correct_answer`` attributes.
        include_grouping: When true, also emit ``incorrect_answer_options``,
            ``in_group``, ``group_members`` and ``best_in_group`` (``None`` when
            the attribute is missing).

    Returns:
        A dict whose key order matches the display format used by this module.
    """
    data = {
        "id": obj.id,
        "learning_objective": obj.learning_objective,
        "source_reference": obj.source_reference,
        "correct_answer": obj.correct_answer
    }
    if include_grouping:
        data["incorrect_answer_options"] = getattr(obj, 'incorrect_answer_options', None)
        data["in_group"] = getattr(obj, 'in_group', None)
        data["group_members"] = getattr(obj, 'group_members', None)
        data["best_in_group"] = getattr(obj, 'best_in_group', None)
    return data


def process_files(files, num_objectives, num_runs, model_name, incorrect_answer_model_name, temperature):
    """Process uploaded files and generate learning objectives.

    Args:
        files: Uploaded file(s); see ``_extract_file_paths`` for accepted shapes.
        num_objectives: Number of objectives requested per run; also the cap on
            the best-in-group list that is returned.
        num_runs: Number of generation runs (converted with ``int``).
        model_name: Model used for objective generation.
        incorrect_answer_model_name: Model used for incorrect answer options.
        temperature: Sampling temperature (converted with ``float``).

    Returns:
        A 4-tuple ``(status, best_in_group_json, all_grouped_json, raw_json)``.
        On a validation or extraction failure the last three items are ``None``
        and ``status`` holds the error message.
    """
    run_manager = get_run_manager()

    # Input validation
    if not files:
        return "Please upload at least one file.", None, None, None

    if not os.getenv("OPENAI_API_KEY"):
        return "OpenAI API key not found. Please set the OPENAI_API_KEY environment variable.", None, None, None

    # Extract file paths
    file_paths = _extract_file_paths(files)
    if not file_paths:
        return "No valid files found. Please upload valid .ipynb, .vtt, .srt, or .md files.", None, None, None

    # Start run and logging
    run_manager.start_objective_run(
        files=file_paths,
        num_objectives=num_objectives,
        num_runs=num_runs,
        model=model_name,
        incorrect_answer_model=incorrect_answer_model_name,
        temperature=temperature
    )
    run_manager.log(f"Processing {len(file_paths)} files: {[os.path.basename(f) for f in file_paths]}", level="DEBUG")

    # Process files
    processor = ContentProcessor()
    file_contents = processor.process_files(file_paths)
    if not file_contents:
        run_manager.log("No content extracted from the uploaded files", level="ERROR")
        return "No content extracted from the uploaded files.", None, None, None

    run_manager.log(f"Successfully extracted content from {len(file_contents)} files", level="INFO")

    # Store file contents for later use (regenerate_objectives reads them back)
    set_processed_contents(file_contents)

    # Generate learning objectives
    run_manager.log(f"Creating QuizGenerator with model={model_name}, temperature={temperature}", level="INFO")
    quiz_generator = _create_quiz_generator(model_name, temperature)

    all_learning_objectives = _generate_multiple_runs(
        quiz_generator, file_contents, num_objectives, num_runs, incorrect_answer_model_name, run_manager
    )

    # Group and rank objectives
    grouped_result = _group_base_objectives_add_incorrect_answers(
        quiz_generator, all_learning_objectives, file_contents, incorrect_answer_model_name, run_manager
    )

    # Format results for display
    formatted_results = _format_objective_results(grouped_result, all_learning_objectives, num_objectives, run_manager)

    # Store results
    set_learning_objectives(grouped_result["all_grouped"])

    # Save outputs to files
    params = {
        "files": [os.path.basename(f) for f in file_paths],
        "num_objectives": num_objectives,
        "num_runs": num_runs,
        "model": model_name,
        "incorrect_answer_model": incorrect_answer_model_name,
        "temperature": temperature
    }
    run_manager.save_objectives_outputs(
        best_in_group=formatted_results[1],
        all_grouped=formatted_results[2],
        raw_ungrouped=formatted_results[3],
        params=params
    )

    # End run
    run_manager.end_run(run_type="Learning Objectives")

    return formatted_results


def regenerate_objectives(objectives_json, feedback, num_objectives, num_runs, model_name, temperature):
    """Regenerate learning objectives based on feedback.

    The previously processed file contents are copied and the feedback is
    appended as an extra content entry before generation is repeated.

    Args:
        objectives_json: Current objectives JSON; returned unchanged on failure.
        feedback: Free-text feedback on the previous objectives.
        num_objectives: Number of objectives requested per run.
        num_runs: Number of generation runs.
        model_name: Model used for both generation and incorrect answers.
        temperature: Sampling temperature (converted with ``float``).

    Returns:
        A 3-tuple ``(status, grouped_objectives_json, best_in_group_json)``.
        On failure both JSON slots carry ``objectives_json``.
    """
    processed_contents = get_processed_contents()
    if not processed_contents:
        return "No processed content available. Please upload files first.", objectives_json, objectives_json

    if not os.getenv("OPENAI_API_KEY"):
        return "OpenAI API key not found.", objectives_json, objectives_json

    if not feedback:
        return "Please provide feedback to regenerate learning objectives.", objectives_json, objectives_json

    # Add feedback to a copy so the stored contents are not mutated
    file_contents_with_feedback = processed_contents.copy()
    file_contents_with_feedback.append(f"FEEDBACK ON PREVIOUS OBJECTIVES: {feedback}")

    # Generate with feedback
    quiz_generator = _create_quiz_generator(model_name, temperature)

    # The generation helpers log through the run manager, so it must be supplied.
    # NOTE(review): no run is started in this function; assumes run_manager.log
    # is usable outside an active run - confirm against run_manager.
    run_manager = get_run_manager()

    try:
        # Generate multiple runs of learning objectives with feedback
        all_learning_objectives = _generate_multiple_runs(
            quiz_generator,
            file_contents_with_feedback,
            num_objectives,
            num_runs,
            model_name,  # Use the same model for incorrect answer suggestions
            run_manager
        )

        # Group and rank the objectives
        grouping_result = _group_base_objectives_add_incorrect_answers(
            quiz_generator,
            all_learning_objectives,
            file_contents_with_feedback,
            model_name,
            run_manager
        )

        # Get the results
        grouped_objectives = grouping_result["all_grouped"]
        best_in_group_objectives = grouping_result["best_in_group"]

        # Convert to JSON
        grouped_objectives_json = json.dumps([obj.dict() for obj in grouped_objectives])
        best_in_group_json = json.dumps([obj.dict() for obj in best_in_group_objectives])

        return (
            f"Generated {len(all_learning_objectives)} learning objectives, {len(best_in_group_objectives)} unique after grouping.",
            grouped_objectives_json,
            best_in_group_json
        )
    except Exception as e:
        # Top-level UI boundary: report the failure instead of crashing the handler.
        print(f"Error regenerating learning objectives: {e}")
        traceback.print_exc()
        return f"Error regenerating learning objectives: {str(e)}", objectives_json, objectives_json


def _resolve_existing_path(entry):
    """Return the existing filesystem path represented by *entry*, or ``None``.

    *entry* may be a path (``str`` / ``os.PathLike``) or any object exposing a
    path in its ``name`` attribute.
    """
    if not entry:
        return None
    if isinstance(entry, (str, os.PathLike)):
        candidate = entry
    else:
        candidate = getattr(entry, "name", None)
    if isinstance(candidate, (str, os.PathLike)) and os.path.exists(candidate):
        return candidate
    return None


def _extract_file_paths(files):
    """Extract file paths from different input formats.

    Args:
        files: A single path, a single object with a ``name`` attribute, or a
            list/tuple mixing either of those.

    Returns:
        A list of the entries that resolve to existing paths, in input order.
        Entries that are falsy, unrecognised or missing on disk are dropped.
    """
    entries = files if isinstance(files, (list, tuple)) else [files]
    resolved = (_resolve_existing_path(entry) for entry in entries)
    return [path for path in resolved if path is not None]


def _generate_multiple_runs(quiz_generator, file_contents, num_objectives, num_runs, incorrect_answer_model_name, run_manager):
    """Generate learning objectives across multiple runs.

    Each objective receives a temporary ID of ``1000 * run_number + position``
    (both 1-based), so an ID ending in 001 is the first objective of its run.

    Args:
        quiz_generator: Generator exposing ``generate_base_learning_objectives``.
        file_contents: Extracted course content passed to the generator.
        num_objectives: Number of objectives requested per run.
        num_runs: Number of runs (converted with ``int``).
        incorrect_answer_model_name: Forwarded to the generator unchanged.
        run_manager: Object exposing ``log(message, level=...)``.

    Returns:
        A flat list with the objectives of every run, in generation order.
    """
    all_learning_objectives = []
    num_runs_int = int(num_runs)

    for run in range(num_runs_int):
        run_manager.log(f"Starting generation run {run+1}/{num_runs_int}", level="INFO")

        # Generate base learning objectives without grouping or incorrect answers
        learning_objectives = quiz_generator.generate_base_learning_objectives(
            file_contents, num_objectives, incorrect_answer_model_name
        )

        run_manager.log(f"Generated {len(learning_objectives)} learning objectives in run {run+1}", level="INFO")

        # Assign temporary IDs
        for i, obj in enumerate(learning_objectives):
            obj.id = 1000 * (run + 1) + (i + 1)

        all_learning_objectives.extend(learning_objectives)

    run_manager.log(f"Total learning objectives from all runs: {len(all_learning_objectives)}", level="INFO")
    return all_learning_objectives


def _group_base_objectives_add_incorrect_answers(quiz_generator, all_base_learning_objectives, file_contents, incorrect_answer_model_name=None, run_manager=None):
    """Group base learning objectives and add incorrect answers to best-in-group objectives.

    Args:
        quiz_generator: Generator used for grouping and incorrect answers.
        all_base_learning_objectives: Objectives from all runs, with temporary IDs.
        file_contents: Extracted course content.
        incorrect_answer_model_name: Model for incorrect answer options.
        run_manager: Logger; defaults to ``get_run_manager()`` when omitted.

    Returns:
        A dict with ``"all_grouped"`` (every grouped objective; only
        best-in-group ones carry incorrect answers) and ``"best_in_group"``
        (the improved best-in-group objectives).
    """
    if run_manager is None:
        # The default of None would otherwise fail on the first log call.
        run_manager = get_run_manager()

    run_manager.log("Grouping base learning objectives...", level="INFO")
    grouping_result = quiz_generator.group_base_learning_objectives(all_base_learning_objectives, file_contents)

    grouped_objectives = grouping_result["all_grouped"]
    best_in_group_objectives = grouping_result["best_in_group"]

    run_manager.log(f"Grouped into {len(best_in_group_objectives)} best-in-group objectives", level="INFO")

    # Find and reassign the best first objective to ID=1
    _reassign_objective_ids(grouped_objectives, run_manager)

    # Step 1: Generate incorrect answer suggestions only for best-in-group objectives
    run_manager.log("Generating incorrect answer options only for best-in-group objectives...", level="INFO")
    enhanced_best_in_group = quiz_generator.generate_lo_incorrect_answer_options(
        file_contents, best_in_group_objectives, incorrect_answer_model_name
    )
    run_manager.log("Generated incorrect answer options", level="INFO")

    # Clear debug directory for incorrect answer regeneration logs
    _reset_debug_dir()

    # Step 2: Run the improvement workflow on the generated incorrect answers
    run_manager.log("Improving incorrect answer options for best-in-group objectives...", level="INFO")
    improved_best_in_group = quiz_generator.learning_objective_generator.regenerate_incorrect_answers(
        enhanced_best_in_group, file_contents
    )
    run_manager.log("Completed improvement of incorrect answer options", level="INFO")

    # Create a map of best-in-group objectives by ID for easy lookup.
    # NOTE(review): presumably the best-in-group objects carry the reassigned
    # IDs from all_grouped - confirm against group_base_learning_objectives.
    best_in_group_map = {obj.id: obj for obj in improved_best_in_group}

    # Process all grouped objectives
    final_grouped_objectives = []
    for grouped_obj in grouped_objectives:
        if getattr(grouped_obj, "best_in_group", False):
            # Use the enhanced version with incorrect answers; fall back to the
            # grouped object itself if no enhanced version has a matching ID.
            final_grouped_objectives.append(best_in_group_map.get(grouped_obj.id, grouped_obj))
        else:
            # Non-best-in-group objectives get an empty incorrect answer list
            final_grouped_objectives.append(LearningObjective(
                id=grouped_obj.id,
                learning_objective=grouped_obj.learning_objective,
                source_reference=grouped_obj.source_reference,
                correct_answer=grouped_obj.correct_answer,
                incorrect_answer_options=[],
                in_group=getattr(grouped_obj, 'in_group', None),
                group_members=getattr(grouped_obj, 'group_members', None),
                best_in_group=getattr(grouped_obj, 'best_in_group', None)
            ))

    return {
        "all_grouped": final_grouped_objectives,
        "best_in_group": improved_best_in_group
    }


def _reassign_objective_ids(grouped_objectives, run_manager):
    """Reassign IDs in place so the best first objective gets ID=1.

    The "best first" objective is the first best-in-group objective whose group
    contains an objective with an ID ending in 001. If none qualifies, the first
    objective with an ID ending in 001 is used. Every other objective is
    renumbered 2, 3, ... in list order; numbering still starts at 2 when no
    primary objective is found.

    NOTE(review): ``group_members`` lists are not rewritten, so they keep the
    pre-reassignment IDs - confirm downstream consumers expect that.

    Args:
        grouped_objectives: Objectives to renumber (mutated in place).
        run_manager: Object exposing ``log(message, level=...)``.
    """
    def members_of(obj):
        # A missing, None or empty group_members means "a group of one".
        return getattr(obj, "group_members", None) or [obj.id]

    # Collect the member IDs of every group that contains an ID ending in 001
    ids_in_001_groups = set()
    for obj in grouped_objectives:
        if obj.id % 1000 == 1:
            ids_in_001_groups.update(members_of(obj))

    # Prefer the best_in_group objective from one of those groups
    best_first_objective = None
    for obj in grouped_objectives:
        is_in_001_group = any(member_id in ids_in_001_groups for member_id in members_of(obj))
        if is_in_001_group and getattr(obj, "best_in_group", False):
            best_first_objective = obj
            run_manager.log(f"Found best_in_group objective in a 001 group with ID={obj.id}", level="DEBUG")
            break

    # Otherwise fall back to the first 001 objective
    if best_first_objective is None:
        for obj in grouped_objectives:
            if obj.id % 1000 == 1:
                best_first_objective = obj
                run_manager.log(f"No best_in_group from 001 groups found, using first 001 with ID={obj.id}", level="DEBUG")
                break

    # Reassign IDs
    id_counter = 2
    if best_first_objective is not None:
        best_first_objective.id = 1
        run_manager.log("Reassigned primary objective to ID=1", level="INFO")

    for obj in grouped_objectives:
        if obj is best_first_objective:
            continue
        obj.id = id_counter
        id_counter += 1


def _format_objective_results(grouped_result, all_learning_objectives, num_objectives, run_manager):
    """Format objective results for display.

    Args:
        grouped_result: Dict with ``"best_in_group"`` and ``"all_grouped"`` lists.
        all_learning_objectives: Raw objectives from every run.
        num_objectives: Cap on the best-in-group list. Converted with ``int``
            because a float cannot be used as a slice bound; ``None`` means no cap.
        run_manager: Object exposing ``log`` and ``get_current_run_id``.

    Returns:
        A 4-tuple ``(status, best_in_group_json, grouped_json, raw_json)`` where
        the JSON strings are indented by two spaces.
    """
    sorted_best_in_group = sorted(grouped_result["best_in_group"], key=lambda obj: obj.id)
    sorted_all_grouped = sorted(grouped_result["all_grouped"], key=lambda obj: obj.id)

    # Limit best-in-group to the requested number of objectives
    limit = None if num_objectives is None else int(num_objectives)
    sorted_best_in_group = sorted_best_in_group[:limit]

    run_manager.log("Formatting objective results for display", level="INFO")
    run_manager.log(f"Best-in-group objectives limited to top {len(sorted_best_in_group)} (requested: {num_objectives})", level="INFO")

    formatted_best_in_group = [_objective_to_dict(obj) for obj in sorted_best_in_group]
    formatted_grouped = [_objective_to_dict(obj) for obj in sorted_all_grouped]
    # Raw view keeps generation order and omits grouping/incorrect-answer fields
    formatted_unranked = [_objective_to_dict(obj, include_grouping=False) for obj in all_learning_objectives]

    run_manager.log(f"Formatted {len(formatted_best_in_group)} best-in-group, {len(formatted_grouped)} grouped, {len(formatted_unranked)} raw objectives", level="INFO")

    return (
        f"Generated and grouped {len(formatted_best_in_group)} unique learning objectives successfully. Saved to run: {run_manager.get_current_run_id()}",
        json.dumps(formatted_best_in_group, indent=2),
        json.dumps(formatted_grouped, indent=2),
        json.dumps(formatted_unranked, indent=2)
    )


def parse_user_learning_objectives(text: str) -> List[str]:
    """
    Parse user-entered learning objectives text into a list of clean objective strings.

    Handles common label formats:
    - Numbered: "1. Objective"  "2) Objective"  "3: Objective"
    - Lettered: "a. Objective"  "b) Objective"  "c: Objective"
    - Plain: "Objective" (no label)

    Trailing punctuation is preserved as it may be part of the sentence.
    Blank lines are skipped; ``None`` or empty input yields an empty list.
    """
    if not text:
        return []

    objectives = []
    for line in text.strip().split('\n'):
        line = line.strip()
        if not line:
            continue
        # Strip optional leading number/letter label followed by ., ), or :
        cleaned = _OBJECTIVE_LABEL_RE.sub('', line)
        if cleaned:
            objectives.append(cleaned)
    return objectives


def process_user_objectives(files, user_objectives_text, model_name, incorrect_answer_model_name, temperature):
    """
    Process user-provided learning objectives using uploaded course materials.

    Pipeline:
    1. Parse objective texts from the user's input
    2. Find source references in course materials for each objective
    3. Generate a correct answer for each objective (same function as auto-generate flow)
    4. Generate incorrect answer options (all objectives are treated as best-in-group)
    5. Improve incorrect answer options iteratively
    6. Return output in the same format as the auto-generate flow

    Returns:
        A 4-tuple ``(status, best_in_group_json, all_grouped_json, raw_json)``.
        On a validation or extraction failure the last three items are ``None``.
    """
    run_manager = get_run_manager()

    # --- Input validation ---
    if not files:
        return "Please upload at least one file.", None, None, None

    if not user_objectives_text or not user_objectives_text.strip():
        return "Please enter at least one learning objective.", None, None, None

    if not os.getenv("OPENAI_API_KEY"):
        return "OpenAI API key not found. Please set the OPENAI_API_KEY environment variable.", None, None, None

    file_paths = _extract_file_paths(files)
    if not file_paths:
        return "No valid files found. Please upload valid .ipynb, .vtt, .srt, or .md files.", None, None, None

    objective_texts = parse_user_learning_objectives(user_objectives_text)
    if not objective_texts:
        return "No valid learning objectives found. Please enter at least one objective.", None, None, None

    # --- Start run ---
    run_manager.start_objective_run(
        files=file_paths,
        num_objectives=len(objective_texts),
        num_runs=1,
        model=model_name,
        incorrect_answer_model=incorrect_answer_model_name,
        temperature=temperature
    )
    run_manager.log(f"Processing {len(objective_texts)} user-provided learning objectives", level="INFO")

    # --- Process course material files ---
    processor = ContentProcessor()
    file_contents = processor.process_files(file_paths)
    if not file_contents:
        run_manager.log("No content extracted from the uploaded files", level="ERROR")
        return "No content extracted from the uploaded files.", None, None, None

    run_manager.log(f"Successfully extracted content from {len(file_contents)} files", level="INFO")
    set_processed_contents(file_contents)

    quiz_generator = _create_quiz_generator(model_name, temperature)

    # --- Step 1: Find source references in course materials ---
    run_manager.log("Finding source references for user-provided objectives...", level="INFO")
    # Imported here, as in the original code, rather than at module import time.
    from learning_objective_generator.base_generation import (
        find_sources_for_user_objectives,
        generate_correct_answers_for_objectives
    )

    objectives_without_answers = find_sources_for_user_objectives(
        quiz_generator.client, model_name, float(temperature), file_contents, objective_texts
    )
    run_manager.log(f"Found sources for {len(objectives_without_answers)} objectives", level="INFO")

    # --- Step 2: Generate correct answers ---
    run_manager.log("Generating correct answers for user-provided objectives...", level="INFO")
    base_objectives = generate_correct_answers_for_objectives(
        quiz_generator.client, model_name, float(temperature), file_contents, objectives_without_answers
    )
    run_manager.log(f"Generated correct answers for {len(base_objectives)} objectives", level="INFO")

    # --- Step 3: Generate incorrect answer options ---
    run_manager.log("Generating incorrect answer options...", level="INFO")
    _reset_debug_dir()

    enhanced_objectives = quiz_generator.generate_lo_incorrect_answer_options(
        file_contents, base_objectives, incorrect_answer_model_name
    )
    run_manager.log("Generated incorrect answer options", level="INFO")

    # --- Step 4: Improve incorrect answers iteratively ---
    run_manager.log("Improving incorrect answer options...", level="INFO")
    improved_objectives = quiz_generator.learning_objective_generator.regenerate_incorrect_answers(
        enhanced_objectives, file_contents
    )
    run_manager.log("Completed improvement of incorrect answer options", level="INFO")

    # All user-provided objectives are their own group and all are best-in-group
    for obj in improved_objectives:
        obj.in_group = False
        obj.group_members = [obj.id]
        obj.best_in_group = True

    set_learning_objectives(improved_objectives)

    # --- Format and return results ---
    formatted_results = _format_user_objective_results(improved_objectives, run_manager)

    params = {
        "files": [os.path.basename(f) for f in file_paths],
        "num_objectives": len(objective_texts),
        "num_runs": 1,
        "model": model_name,
        "incorrect_answer_model": incorrect_answer_model_name,
        "temperature": temperature,
        "source": "user-provided"
    }
    run_manager.save_objectives_outputs(
        best_in_group=formatted_results[1],
        all_grouped=formatted_results[2],
        raw_ungrouped=formatted_results[3],
        params=params
    )
    run_manager.end_run(run_type="Learning Objectives (User-provided)")

    return formatted_results


def _format_user_objective_results(objectives, run_manager):
    """Format user-provided objective results for display (same structure as auto-generated).

    Args:
        objectives: Processed objectives; sorted by ``id`` before formatting.
        run_manager: Object exposing ``log`` and ``get_current_run_id``.

    Returns:
        A 4-tuple ``(status, best_in_group_json, grouped_json, raw_json)``; the
        grouped JSON equals the best-in-group JSON because no grouping is done.
    """
    sorted_objectives = sorted(objectives, key=lambda obj: obj.id)

    run_manager.log(f"Formatting {len(sorted_objectives)} user-provided objectives for display", level="INFO")

    formatted_best_in_group = [_objective_to_dict(obj) for obj in sorted_objectives]

    # Grouped view is identical to best-in-group (no grouping was performed)
    formatted_grouped = formatted_best_in_group

    # Raw view: base fields only (no incorrect answers), for the debug panel
    formatted_unranked = [_objective_to_dict(obj, include_grouping=False) for obj in sorted_objectives]

    return (
        f"Processed {len(formatted_best_in_group)} user-provided learning objectives successfully. Saved to run: {run_manager.get_current_run_id()}",
        json.dumps(formatted_best_in_group, indent=2),
        json.dumps(formatted_grouped, indent=2),
        json.dumps(formatted_unranked, indent=2)
    )


def _chain_question_generation(obj_results, failure_message, model_name_q, temperature_q, num_questions, num_runs_q):
    """Feed objective results into question generation and merge the outputs.

    Args:
        obj_results: 4-tuple ``(status, objectives, grouped, raw)`` from an
            objective-processing function.
        failure_message: Question status to report when ``objectives`` is empty.
        model_name_q, temperature_q, num_questions, num_runs_q: Forwarded to
            ``generate_questions`` unchanged.

    Returns:
        An 8-tuple: combined status, the three objective outputs, the question
        status, and the three question outputs (``None`` when objectives failed).
    """
    status_obj, objectives_output, grouped_output, raw_ungrouped_output = obj_results

    # Objective step failed: pass its status through and skip question generation
    if not objectives_output:
        return (
            status_obj,
            objectives_output,
            grouped_output,
            raw_ungrouped_output,
            failure_message,
            None,
            None,
            None
        )

    question_results = generate_questions(objectives_output, model_name_q, temperature_q, num_questions, num_runs_q)
    status_q, best_questions_output, all_questions_output, formatted_quiz_output = question_results

    return (
        f"{status_obj}\n\nThen:\n{status_q}",
        objectives_output,
        grouped_output,
        raw_ungrouped_output,
        status_q,
        best_questions_output,
        all_questions_output,
        formatted_quiz_output
    )


def process_user_objectives_and_generate_questions(files, user_objectives_text, model_name, incorrect_answer_model_name,
                                                   temperature, model_name_q, temperature_q, num_questions, num_runs_q):
    """Process user-provided objectives and then generate questions in one flow.

    Returns:
        The 8-tuple described in ``_chain_question_generation``.
    """
    obj_results = process_user_objectives(files, user_objectives_text, model_name, incorrect_answer_model_name, temperature)
    return _chain_question_generation(
        obj_results,
        "Learning objectives processing failed. Cannot proceed with questions.",
        model_name_q, temperature_q, num_questions, num_runs_q
    )


def process_files_and_generate_questions(files, num_objectives, num_runs, model_name, incorrect_answer_model_name,
                                         temperature, model_name_q, temperature_q, num_questions, num_runs_q):
    """Process files, generate learning objectives, and then generate questions in one flow.

    Returns:
        The 8-tuple described in ``_chain_question_generation``.
    """
    obj_results = process_files(files, num_objectives, num_runs, model_name, incorrect_answer_model_name, temperature)
    return _chain_question_generation(
        obj_results,
        "Learning objectives generation failed. Cannot proceed with questions.",
        model_name_q, temperature_q, num_questions, num_runs_q
    )