import re
import json
import tempfile
from typing import List

import gradio as gr
import yaml
from openai import OpenAI
from pydantic import BaseModel

from .run_manager import get_run_manager


# ---------------------------------------------------------------------------
# Pydantic model for LLM validation response
# ---------------------------------------------------------------------------

class _QuestionValidation(BaseModel):
    """Structured-output schema returned by the LLM semantic-validation call."""
    is_valid: bool
    issues: List[str]


# ---------------------------------------------------------------------------
# Module-level constants (built once instead of on every call)
# ---------------------------------------------------------------------------

# Unicode typography -> ASCII equivalents, applied before stripping non-ASCII.
# str.maketrans gives a single C-level pass instead of chained .replace calls.
_UNICODE_TRANSLATION = str.maketrans({
    '\u2018': "'", '\u2019': "'",    # smart single quotes
    '\u201c': '"', '\u201d': '"',    # smart double quotes
    '\u2013': '-', '\u2014': '-',    # en/em dashes
    '\u2026': '...',                 # ellipsis
    '\u00a0': ' ',                   # non-breaking space
    '\u00b2': '2', '\u00b3': '3',    # superscript digits
})

# Bullet chars (• ◦) used as structural markers in the .md editing format.
_BULLET_CHARS = '\u2022\u25e6'

# Pre-compiled parsing patterns (hoisted out of the per-line loops).
_QUESTION_SPLIT_RE = re.compile(r'(?=\*\*Question \d+)')
_QUESTION_HEADER_RE = re.compile(r'\*\*Question \d+.*?\*\*:?\s*(.*)')
_OPTION_RE = re.compile(r'•\s*([A-D])\s*(\[Correct\])?\s*:\s*(.+)')
_FEEDBACK_RE = re.compile(r'◦\s*Feedback:\s*(.+)')


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

def _next_button_label(index: int, total: int):
    """Return 'Accept & Finish' for the last question, 'Accept & Next' otherwise."""
    if total > 0 and index >= total - 1:
        return gr.update(value="Accept & Finish")
    return gr.update(value="Accept & Next")


def _sanitize_text(text, keep_bullets: bool = False) -> str:
    """Normalize Unicode typography then strip any remaining non-ASCII characters.

    Only standard printable ASCII (32-126), newlines, and tabs are kept.
    Set keep_bullets=True to additionally preserve the bullet chars (• ◦)
    used as structural markers in the .md editing format.
    """
    if not text:
        return text
    text = text.translate(_UNICODE_TRANSLATION)

    def _allowed(c: str) -> bool:
        # Chars always allowed: printable ASCII + newline + tab.
        if 32 <= ord(c) <= 126 or c in '\n\t':
            return True
        return keep_bullets and c in _BULLET_CHARS

    return ''.join(c for c in text if _allowed(c))


# ---------------------------------------------------------------------------
# Markdown parsing
# ---------------------------------------------------------------------------

def _parse_questions(md_content: str) -> List[str]:
    """Split formatted_quiz.md content into individual question blocks."""
    parts = _QUESTION_SPLIT_RE.split(md_content.strip())
    return [p.strip() for p in parts if p.strip()]


def _parse_question_block(block_text: str) -> dict:
    """Parse a single markdown question block into structured data.

    Returns a dict: {'prompt': str, 'options': [{'answer', 'isCorrect',
    'feedback'}, ...]}. Supports multi-line prompts: non-empty lines between
    the question header and the first option are accumulated as additional
    prompt text.
    """
    prompt_lines: List[str] = []
    options: List[dict] = []
    current_option = None
    in_prompt = False

    for line in block_text.split('\n'):
        stripped = line.strip()

        # Question header (colon may be inside or outside bold markers)
        q_match = _QUESTION_HEADER_RE.match(stripped)
        if q_match:
            first_line = q_match.group(1).strip()
            if first_line:
                prompt_lines.append(first_line)
            in_prompt = True
            continue

        # Skip ranking reasoning line and stop prompt accumulation
        if stripped.startswith('Ranking Reasoning:'):
            in_prompt = False
            continue

        # Option line: • A [Correct]: text  or  • A: text
        opt_match = _OPTION_RE.match(stripped)
        if opt_match:
            in_prompt = False
            if current_option:
                options.append(current_option)
            current_option = {
                'answer': opt_match.group(3).strip(),
                'isCorrect': opt_match.group(2) is not None,
                'feedback': ''
            }
            continue

        # Feedback line
        fb_match = _FEEDBACK_RE.match(stripped)
        if fb_match and current_option:
            current_option['feedback'] = fb_match.group(1).strip()
            continue

        # Accumulate additional prompt lines
        if in_prompt and stripped:
            prompt_lines.append(stripped)

    if current_option:
        options.append(current_option)

    return {'prompt': '\n'.join(prompt_lines), 'options': options}


# ---------------------------------------------------------------------------
# YAML generation
# ---------------------------------------------------------------------------

def _generate_yml(questions_data: List[dict]) -> str:
    """Generate YAML quiz from parsed question data using the standard format.

    All text fields (prompt, answer, feedback) use the '|-' block scalar and
    are sanitized to contain only standard printable ASCII characters.
    """
    lines = [
        "name: Quiz 1",
        "passingThreshold: 5",
        "estimatedTimeSec: 600",
        "maxTrialsPer24Hrs: 3",
        "courseSlug: course_Slug",
        "insertAfterConclusion: true",
        "RandomQuestionPosition: true",
        "questions:",
    ]
    for q in questions_data:
        lines.append("  - typeName: multipleChoice")
        lines.append("    points: 1")
        lines.append("    shuffle: true")
        lines.append("    prompt: |-")
        for prompt_line in _sanitize_text(q['prompt']).split('\n'):
            lines.append(f"      {prompt_line}")
        lines.append("    options:")
        for opt in q['options']:
            answer_clean = _sanitize_text(opt['answer'])
            feedback_clean = _sanitize_text(opt['feedback'])
            is_correct = 'true' if opt['isCorrect'] else 'false'
            lines.append("      - answer: |-")
            for answer_line in answer_clean.split('\n'):
                lines.append(f"          {answer_line}")
            lines.append(f"        isCorrect: {is_correct}")
            lines.append("        feedback: |-")
            for fb_line in feedback_clean.split('\n'):
                lines.append(f"          {fb_line}")
    return '\n'.join(lines) + '\n'


# ---------------------------------------------------------------------------
# YAML loading (converts any valid YAML quiz to md blocks)
# ---------------------------------------------------------------------------

def _parse_yml_to_md_blocks(yml_content: str):
    """Parse a YAML quiz file into Markdown question blocks.

    Handles both '|-' block scalars and quoted-string answer formats since
    PyYAML normalizes both to plain Python strings.

    Returns (blocks, error_message). On success error_message is None.
    """
    try:
        data = yaml.safe_load(yml_content)
    except yaml.YAMLError as e:
        return None, f"Failed to parse YAML: {e}"

    if not isinstance(data, dict):
        return None, "Invalid YAML structure: expected a mapping at the top level."

    questions = data.get('questions', [])
    if not questions:
        return None, "No questions found in the YAML file."

    option_letters = ['A', 'B', 'C', 'D']
    blocks = []
    for i, q in enumerate(questions, start=1):
        # A malformed entry (e.g. a bare string) would otherwise raise
        # AttributeError on .get and crash the UI handler.
        if not isinstance(q, dict):
            return None, f"Invalid question entry at position {i}: expected a mapping."
        prompt = str(q.get('prompt', '')).strip()
        options = q.get('options', [])

        prompt_lines = prompt.split('\n')
        first_line = prompt_lines[0] if prompt_lines else ''
        extra_lines = [l.strip() for l in prompt_lines[1:] if l.strip()]

        block_lines = [f"**Question {i}:** {first_line}"]
        for extra in extra_lines:
            block_lines.append(extra)
        block_lines.append("")

        for j, opt in enumerate(options):
            # Only four lettered options are supported; extras are dropped.
            if j >= len(option_letters):
                break
            if not isinstance(opt, dict):
                return None, f"Invalid option entry in question {i}: expected a mapping."
            letter = option_letters[j]
            answer = str(opt.get('answer', '')).strip()
            is_correct = opt.get('isCorrect', False)
            feedback = str(opt.get('feedback', '')).strip()

            correct_marker = " [Correct]" if is_correct else ""
            block_lines.append(f"\t• {letter}{correct_marker}: {answer}")
            if feedback:
                block_lines.append(f"\t ◦ Feedback: {feedback}")

        block_lines.append("")
        blocks.append('\n'.join(block_lines).strip())

    return blocks, None


# ---------------------------------------------------------------------------
# LLM validation
# ---------------------------------------------------------------------------

def _validate_question_block(block_text: str) -> List[str]:
    """Validate a question block structurally, then with LLM semantic check.

    Returns a list of issue strings. An empty list means the question is
    valid. Structural issues block advancement; LLM issues produce warnings
    but still surface as returned issues so the caller can decide how to
    handle them.
    """
    parsed = _parse_question_block(block_text)
    issues: List[str] = []

    # --- Structural validation (fast, no API call) ---
    if not parsed['prompt'].strip():
        issues.append("Missing question prompt.")

    n_opts = len(parsed['options'])
    if n_opts != 4:
        issues.append(f"Expected 4 answer options, found {n_opts}.")
    else:
        correct_count = sum(1 for o in parsed['options'] if o['isCorrect'])
        if correct_count == 0:
            issues.append("No option is marked as correct. Add [Correct] to one option.")
        elif correct_count > 1:
            issues.append(f"{correct_count} options are marked correct; exactly 1 is required.")
        for i, opt in enumerate(parsed['options']):
            letter = chr(65 + i)
            if not opt['answer'].strip():
                issues.append(f"Option {letter} has no answer text.")
            if not opt['feedback'].strip():
                issues.append(f"Option {letter} is missing feedback.")

    # Don't call the LLM if the question is structurally broken
    if issues:
        return issues

    # --- LLM semantic validation ---
    try:
        client = OpenAI()
        options_text = "\n".join(
            f"{'[CORRECT] ' if o['isCorrect'] else ''}Answer: {o['answer']}\n"
            f"Feedback: {o['feedback']}"
            for o in parsed['options']
        )
        prompt = (
            "You are an educational quality reviewer. Evaluate this multiple-choice question.\n\n"
            f"Question: {parsed['prompt']}\n\n"
            f"{options_text}\n\n"
            "Check for: (1) clarity and unambiguity of the question, "
            "(2) factual correctness of the marked answer, "
            "(3) plausibility but clear incorrectness of the distractors, "
            "(4) accuracy and helpfulness of the feedback for each option.\n"
            'Return JSON with schema: {"is_valid": bool, "issues": ["issue1", ...]}'
        )
        result = client.beta.chat.completions.parse(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
            response_format=_QuestionValidation,
        )
        validation = result.choices[0].message.parsed
        if not validation.is_valid and validation.issues:
            issues.extend(validation.issues)
    except Exception:
        # Deliberate best-effort: never block saving if the LLM is unavailable.
        pass

    return issues


# ---------------------------------------------------------------------------
# Public handlers (called by ui/app.py)
# ---------------------------------------------------------------------------

def load_quiz_for_editing(formatted_quiz_text: str = ""):
    """Load the generated quiz for editing. Tries disk first, falls back to UI text.

    Returns the 6-tuple expected by the Gradio wiring:
    (status, current_question_text, questions, index, edited, button_update).
    """
    run_manager = get_run_manager()
    content = None

    quiz_path = run_manager.get_latest_formatted_quiz_path()
    if quiz_path is not None:
        try:
            with open(quiz_path, "r", encoding="utf-8") as f:
                content = f.read()
        except OSError:
            # The file may have been removed/unreadable between lookup and
            # read; fall back to the text already shown in the UI.
            content = None

    if not content and formatted_quiz_text:
        content = formatted_quiz_text

    if not content:
        return (
            "No formatted quiz found. Generate questions in the 'Generate Questions' tab first.",
            "", [], 0, [], gr.update(),
        )

    questions = _parse_questions(content)
    if not questions:
        return "The quiz file is empty.", "", [], 0, [], gr.update()

    edited = list(questions)
    return (
        f"Question 1 of {len(questions)}",
        questions[0],
        questions,
        0,
        edited,
        _next_button_label(0, len(questions)),
    )


def load_file_for_editing(file_path):
    """Load a user-uploaded .md or .yml quiz file and initialise the editing flow."""
    if file_path is None:
        return "No file uploaded.", "", [], 0, [], gr.update()

    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()
    except Exception as e:
        return f"Error reading file: {e}", "", [], 0, [], gr.update()

    file_lower = str(file_path).lower()
    if file_lower.endswith(('.yml', '.yaml')):
        questions, error = _parse_yml_to_md_blocks(content)
        if error:
            return error, "", [], 0, [], gr.update()
    elif file_lower.endswith('.md'):
        questions = _parse_questions(content)
        if not questions:
            return "No questions found in the Markdown file.", "", [], 0, [], gr.update()
    else:
        return "Unsupported file format. Please upload a .md or .yml file.", "", [], 0, [], gr.update()

    if not questions:
        return "No questions found in the file.", "", [], 0, [], gr.update()

    n = len(questions)
    edited = list(questions)
    return (
        f"Loaded {n} question(s) from file. Showing Question 1 of {n}.",
        questions[0],
        questions,
        0,
        edited,
        _next_button_label(0, n),
    )


def accept_and_next(current_text: str, questions: list, index: int, edited: list):
    """Validate current question, then save and advance to the next one.

    Structural errors block advancement. LLM semantic issues are surfaced
    as warnings but still allow the user to proceed.
    """
    if not questions:
        return "No quiz loaded.", "", questions, index, edited, gr.update()

    # --- Validate before saving ---
    issues = _validate_question_block(current_text)

    # Separate structural issues (must be fixed) from LLM warnings by
    # matching the fixed message fragments emitted by _validate_question_block.
    structural_keywords = [
        "Missing question", "Expected 4", "No option is marked",
        "options are marked correct", "has no answer text", "is missing feedback"
    ]
    structural_issues = [i for i in issues if any(k in i for k in structural_keywords)]
    llm_warnings = [i for i in issues if i not in structural_issues]

    if structural_issues:
        error_msg = "Cannot advance — please fix: " + "; ".join(structural_issues)
        return (
            error_msg,
            current_text,
            questions,
            index,
            edited,
            _next_button_label(index, len(questions)),
        )

    # Save the (valid) edit
    edited[index] = current_text

    if index + 1 < len(questions):
        new_index = index + 1
        base_status = f"Question {new_index + 1} of {len(questions)}"
        if llm_warnings:
            base_status += f" | WARNING (previous Q): {'; '.join(llm_warnings)}"
        return (
            base_status,
            edited[new_index],
            questions,
            new_index,
            edited,
            _next_button_label(new_index, len(questions)),
        )
    else:
        base_status = f"All {len(questions)} questions reviewed. Click 'Download edited quiz' to save."
        if llm_warnings:
            base_status += f" | WARNING: {'; '.join(llm_warnings)}"
        return (
            base_status,
            current_text,
            questions,
            index,
            edited,
            gr.update(value="Accept & Finish"),
        )


def go_previous(current_text: str, questions: list, index: int, edited: list):
    """Save current edit and go back to the previous question."""
    if not questions:
        return "No quiz loaded.", "", questions, index, edited, gr.update()

    edited[index] = current_text

    if index > 0:
        new_index = index - 1
        return (
            f"Question {new_index + 1} of {len(questions)}",
            edited[new_index],
            questions,
            new_index,
            edited,
            _next_button_label(new_index, len(questions)),
        )
    return (
        f"Question 1 of {len(questions)} (already at first question)",
        current_text,
        questions,
        index,
        edited,
        _next_button_label(index, len(questions)),
    )


def save_and_download(current_text: str, questions: list, index: int, edited: list):
    """Validate all questions structurally, then join, sanitize, and export.

    Returns (status_message, download_files_or_None).
    """
    if not edited:
        return "No edited questions to save.", None

    # Save the current edit in case user did not click Accept.
    # Bounds-checked: a stale index from the UI must not raise IndexError.
    if 0 <= index < len(edited):
        edited[index] = current_text

    # --- Structural validation of every question before export ---
    all_errors: List[str] = []
    for i, block in enumerate(edited, start=1):
        parsed = _parse_question_block(block)
        q_errors: List[str] = []
        if not parsed['prompt'].strip():
            q_errors.append("missing prompt")
        if len(parsed['options']) != 4:
            q_errors.append(f"expected 4 options, found {len(parsed['options'])}")
        else:
            correct_count = sum(1 for o in parsed['options'] if o['isCorrect'])
            if correct_count != 1:
                q_errors.append(f"expected 1 correct option, found {correct_count}")
            for j, opt in enumerate(parsed['options']):
                if not opt['feedback'].strip():
                    q_errors.append(f"option {chr(65+j)} missing feedback")
        if q_errors:
            all_errors.append(f"Question {i}: {'; '.join(q_errors)}")

    if all_errors:
        return "Export blocked — fix these issues first:\n" + "\n".join(all_errors), None

    # --- Build outputs ---
    # .md: sanitize text content but keep bullet markers (• ◦) for readability
    combined_md = _sanitize_text("\n\n".join(edited) + "\n", keep_bullets=True)

    # .yml: fully sanitized via _generate_yml
    questions_data = [_parse_question_block(q) for q in edited]
    yml_content = _generate_yml(questions_data)

    # Save to output folder
    run_manager = get_run_manager()
    saved_path = run_manager.save_edited_quiz(combined_md, "formatted_quiz_edited.md")
    run_manager.save_edited_quiz(yml_content, "formatted_quiz_edited.yml")

    # Temp files for Gradio download (delete=False so Gradio can serve them)
    tmp_md = tempfile.NamedTemporaryFile(delete=False, suffix=".md", mode="w", encoding="utf-8")
    tmp_md.write(combined_md)
    tmp_md.close()
    tmp_yml = tempfile.NamedTemporaryFile(delete=False, suffix=".yml", mode="w", encoding="utf-8")
    tmp_yml.write(yml_content)
    tmp_yml.close()

    status = f"Saved to {saved_path}" if saved_path else "Download ready."
    return status, [tmp_md.name, tmp_yml.name]