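# Spaces: Sleeping
# NOTE(review): the line above looks like hosting-page status text captured
# together with the listing; presumably it is not part of the module itself.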
import json
import logging
import re
import tempfile
from typing import List

import gradio as gr
import yaml
from openai import OpenAI
from pydantic import BaseModel

from .run_manager import get_run_manager
# ---------------------------------------------------------------------------
# Pydantic model for LLM validation response
# ---------------------------------------------------------------------------
class _QuestionValidation(BaseModel):
    # Schema passed as ``response_format`` to the LLM review call, so the
    # reply is parsed straight into this model.

    # Verdict flag; reported issues are only used when this is False.
    is_valid: bool
    # Free-text problem descriptions reported by the reviewer.
    issues: List[str]
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _next_button_label(index, total):
    """Build the Gradio update that captions the advance button.

    The caption is 'Accept & Finish' when ``index`` points at (or past) the
    last question of a non-empty quiz, and 'Accept & Next' in every other
    case, including ``total == 0``.
    """
    on_last_question = total > 0 and index >= total - 1
    caption = "Accept & Finish" if on_last_question else "Accept & Next"
    return gr.update(value=caption)
| def _sanitize_text(text, keep_bullets: bool = False) -> str: | |
| """Normalize Unicode typography then strip any remaining non-ASCII characters. | |
| Only standard printable ASCII (32-126), newlines, and tabs are kept. | |
| Set keep_bullets=True to additionally preserve the bullet chars (• ◦) used | |
| as structural markers in the .md editing format. | |
| """ | |
| if not text: | |
| return text | |
| # Normalize common Unicode typography to ASCII equivalents | |
| _REPLACEMENTS = { | |
| '\u2018': "'", '\u2019': "'", # ' ' (smart single quotes) | |
| '\u201c': '"', '\u201d': '"', # " " (smart double quotes) | |
| '\u2013': '-', '\u2014': '-', # – — (en/em dashes) | |
| '\u2026': '...', # … (ellipsis) | |
| '\u00a0': ' ', # non-breaking space | |
| '\u00b2': '2', '\u00b3': '3', # superscript digits | |
| } | |
| for uc, rep in _REPLACEMENTS.items(): | |
| text = text.replace(uc, rep) | |
| # Chars always allowed: printable ASCII + newline + tab | |
| def _allowed(c: str) -> bool: | |
| if 32 <= ord(c) <= 126 or c in '\n\t': | |
| return True | |
| if keep_bullets and c in '\u2022\u25e6': # • ◦ | |
| return True | |
| return False | |
| return ''.join(c for c in text if _allowed(c)) | |
# ---------------------------------------------------------------------------
# Markdown parsing
# ---------------------------------------------------------------------------
| def _parse_questions(md_content: str) -> List[str]: | |
| """Split formatted_quiz.md content into individual question blocks.""" | |
| parts = re.split(r'(?=\*\*Question \d+)', md_content.strip()) | |
| return [p.strip() for p in parts if p.strip()] | |
| def _parse_question_block(block_text: str) -> dict: | |
| """Parse a single markdown question block into structured data. | |
| Supports multi-line prompts: non-empty lines between the question header | |
| and the first option are accumulated as additional prompt text. | |
| """ | |
| prompt_lines: List[str] = [] | |
| options: List[dict] = [] | |
| current_option = None | |
| in_prompt = False | |
| for line in block_text.split('\n'): | |
| stripped = line.strip() | |
| # Question header (colon may be inside or outside bold markers) | |
| q_match = re.match(r'\*\*Question \d+.*?\*\*:?\s*(.*)', stripped) | |
| if q_match: | |
| first_line = q_match.group(1).strip() | |
| if first_line: | |
| prompt_lines.append(first_line) | |
| in_prompt = True | |
| continue | |
| # Skip ranking reasoning line and stop prompt accumulation | |
| if stripped.startswith('Ranking Reasoning:'): | |
| in_prompt = False | |
| continue | |
| # Option line: • A [Correct]: text or • A: text | |
| opt_match = re.match(r'•\s*([A-D])\s*(\[Correct\])?\s*:\s*(.+)', stripped) | |
| if opt_match: | |
| in_prompt = False | |
| if current_option: | |
| options.append(current_option) | |
| current_option = { | |
| 'answer': opt_match.group(3).strip(), | |
| 'isCorrect': opt_match.group(2) is not None, | |
| 'feedback': '' | |
| } | |
| continue | |
| # Feedback line | |
| fb_match = re.match(r'◦\s*Feedback:\s*(.+)', stripped) | |
| if fb_match and current_option: | |
| current_option['feedback'] = fb_match.group(1).strip() | |
| continue | |
| # Accumulate additional prompt lines | |
| if in_prompt and stripped: | |
| prompt_lines.append(stripped) | |
| if current_option: | |
| options.append(current_option) | |
| return {'prompt': '\n'.join(prompt_lines), 'options': options} | |
# ---------------------------------------------------------------------------
# YAML generation
# ---------------------------------------------------------------------------
def _generate_yml(questions_data: List[dict]) -> str:
    """Render parsed questions as a YAML quiz document.

    Args:
        questions_data: dicts shaped like the output of
            ``_parse_question_block``: a ``prompt`` string and an ``options``
            list whose items carry ``answer``, ``isCorrect`` and ``feedback``.

    Returns:
        The YAML text, ending with a newline. Every text field (prompt,
        answer, feedback) is written as a ``|-`` block scalar after being
        passed through ``_sanitize_text``, so it contains only printable
        ASCII, newlines and tabs.
    """
    lines = [
        "name: Quiz 1",
        "passingThreshold: 5",
        "estimatedTimeSec: 600",
        "maxTrialsPer24Hrs: 3",
        "courseSlug: course_Slug",
        "insertAfterConclusion: true",
        "RandomQuestionPosition: true",
        "questions:",
    ]

    def emit_block_scalar(key_line: str, text, content_indent: int) -> None:
        # Block-scalar content must be indented deeper than its key, so the
        # key line and the content indent are always supplied together.
        lines.append(key_line)
        pad = " " * content_indent
        for row in (_sanitize_text(text) or "").split('\n'):
            lines.append(f"{pad}{row}")

    # Indentation encodes the YAML nesting: question items sit under
    # 'questions:', option items under each question's 'options:'.
    for q in questions_data:
        lines.append("  - typeName: multipleChoice")
        lines.append("    points: 1")
        lines.append("    shuffle: true")
        emit_block_scalar("    prompt: |-", q['prompt'], 6)
        lines.append("    options:")
        for opt in q['options']:
            is_correct = 'true' if opt['isCorrect'] else 'false'
            emit_block_scalar("      - answer: |-", opt['answer'], 10)
            lines.append(f"        isCorrect: {is_correct}")
            emit_block_scalar("        feedback: |-", opt['feedback'], 10)

    return '\n'.join(lines) + '\n'
# ---------------------------------------------------------------------------
# YAML loading (converts any valid YAML quiz to md blocks)
# ---------------------------------------------------------------------------
def _parse_yml_to_md_blocks(yml_content: str):
    """Convert a YAML quiz document into Markdown question blocks.

    Both ``|-`` block scalars and quoted strings are accepted for text
    fields, since ``yaml.safe_load`` yields plain Python strings for either.

    Args:
        yml_content: raw YAML text.

    Returns:
        ``(blocks, None)`` on success, where ``blocks`` is a list of Markdown
        strings in the editing format, or ``(None, error_message)`` when the
        YAML cannot be parsed or does not have the expected structure.
    """
    try:
        data = yaml.safe_load(yml_content)
    except yaml.YAMLError as e:
        return None, f"Failed to parse YAML: {e}"

    if not isinstance(data, dict):
        return None, "Invalid YAML structure: expected a mapping at the top level."

    questions = data.get('questions', [])
    if not questions:
        return None, "No questions found in the YAML file."
    if not isinstance(questions, list):
        return None, "Invalid YAML structure: 'questions' must be a list."

    def as_text(value) -> str:
        # A key present with a null value must not become the text "None".
        return '' if value is None else str(value).strip()

    def as_single_line(value) -> str:
        # The Markdown option/feedback format is one line per field; folding
        # keeps multi-line YAML text from being dropped when parsed back.
        return ' '.join(part.strip() for part in as_text(value).split('\n') if part.strip())

    option_letters = ['A', 'B', 'C', 'D']
    blocks = []

    for i, q in enumerate(questions, start=1):
        if not isinstance(q, dict):
            return None, f"Invalid YAML structure: question {i} is not a mapping."

        options = q.get('options') or []
        if not isinstance(options, list):
            return None, f"Invalid YAML structure: options of question {i} must be a list."

        prompt_lines = as_text(q.get('prompt')).split('\n')
        first_line = prompt_lines[0] if prompt_lines else ''
        extra_lines = [l.strip() for l in prompt_lines[1:] if l.strip()]

        block_lines = [f"**Question {i}:** {first_line}"]
        block_lines.extend(extra_lines)
        block_lines.append("")

        # zip stops after the last available letter, so options beyond the
        # fourth are not emitted (same limit as before).
        for letter, opt in zip(option_letters, options):
            if not isinstance(opt, dict):
                return None, f"Invalid YAML structure: option {letter} of question {i} is not a mapping."

            answer = as_single_line(opt.get('answer'))
            feedback = as_single_line(opt.get('feedback'))
            correct_marker = " [Correct]" if opt.get('isCorrect', False) else ""

            block_lines.append(f"\t• {letter}{correct_marker}: {answer}")
            if feedback:
                block_lines.append(f"\t ◦ Feedback: {feedback}")
            block_lines.append("")

        blocks.append('\n'.join(block_lines).strip())

    return blocks, None
# ---------------------------------------------------------------------------
# LLM validation
# ---------------------------------------------------------------------------
def _validate_question_block(block_text: str) -> List[str]:
    """Check one question block structurally, then semantically via the LLM.

    Args:
        block_text: the Markdown text of a single question.

    Returns:
        A list of issue strings; an empty list means no problems were found.
        When any structural problem exists the list contains only structural
        issues and the LLM is not called. Otherwise the list contains the
        issues reported by the LLM, if it judged the question invalid.

    The LLM step is best-effort: any failure while calling it is logged and
    treated as "no issues", so validation never fails because of it.
    """
    parsed = _parse_question_block(block_text)
    issues: List[str] = []

    # --- Structural validation (fast, no API call) ---
    if not parsed['prompt'].strip():
        issues.append("Missing question prompt.")

    n_opts = len(parsed['options'])
    if n_opts != 4:
        issues.append(f"Expected 4 answer options, found {n_opts}.")
    else:
        correct_count = sum(1 for o in parsed['options'] if o['isCorrect'])
        if correct_count == 0:
            issues.append("No option is marked as correct. Add [Correct] to one option.")
        elif correct_count > 1:
            issues.append(f"{correct_count} options are marked correct; exactly 1 is required.")

    for i, opt in enumerate(parsed['options']):
        letter = chr(65 + i)
        if not opt['answer'].strip():
            issues.append(f"Option {letter} has no answer text.")
        if not opt['feedback'].strip():
            issues.append(f"Option {letter} is missing feedback.")

    # Don't call the LLM if the question is structurally broken
    if issues:
        return issues

    # --- LLM semantic validation ---
    options_text = "\n".join(
        f"{'[CORRECT] ' if o['isCorrect'] else ''}Answer: {o['answer']}\n"
        f"Feedback: {o['feedback']}"
        for o in parsed['options']
    )
    prompt = (
        "You are an educational quality reviewer. Evaluate this multiple-choice question.\n\n"
        f"Question: {parsed['prompt']}\n\n"
        f"{options_text}\n\n"
        "Check for: (1) clarity and unambiguity of the question, "
        "(2) factual correctness of the marked answer, "
        "(3) plausibility but clear incorrectness of the distractors, "
        "(4) accuracy and helpfulness of the feedback for each option.\n"
        'Return JSON with schema: {"is_valid": bool, "issues": ["issue1", ...]}'
    )

    try:
        client = OpenAI()
        result = client.beta.chat.completions.parse(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
            response_format=_QuestionValidation,
        )
        validation = result.choices[0].message.parsed
    except Exception:
        # Never block saving if the LLM is unavailable, but leave a trace so
        # a misconfigured key or an API outage is visible in the logs.
        logging.getLogger(__name__).warning(
            "LLM validation skipped due to an error", exc_info=True
        )
        return issues

    # The parsed payload can be absent; treat that as "nothing reported".
    if validation is not None and not validation.is_valid and validation.issues:
        issues.extend(validation.issues)

    return issues
# ---------------------------------------------------------------------------
# Public handlers (called by ui/app.py)
# ---------------------------------------------------------------------------
def load_quiz_for_editing(formatted_quiz_text: str = ""):
    """Load the generated quiz into the editing flow.

    The latest formatted quiz file reported by the run manager is tried
    first. If there is no such file, it cannot be read, or it is empty,
    ``formatted_quiz_text`` (the text currently held by the UI) is used.

    Returns:
        A 6-tuple ``(status, current_question_text, questions, index,
        edited, next_button_update)``. On failure the status carries the
        message, the text is empty, both lists are empty and the index is 0.
    """
    run_manager = get_run_manager()
    content = None

    quiz_path = run_manager.get_latest_formatted_quiz_path()
    if quiz_path is not None:
        try:
            with open(quiz_path, "r", encoding="utf-8") as f:
                content = f.read()
        except (OSError, UnicodeDecodeError):
            # An unreadable file is treated like a missing one so the
            # UI-text fallback below still gets its chance.
            content = None

    if not content and formatted_quiz_text:
        content = formatted_quiz_text

    if not content:
        return (
            "No formatted quiz found. Generate questions in the 'Generate Questions' tab first.",
            "", [], 0, [], gr.update(),
        )

    questions = _parse_questions(content)
    if not questions:
        return "The quiz file is empty.", "", [], 0, [], gr.update()

    # Separate list so edits do not alter the originally loaded blocks.
    edited = list(questions)
    return (
        f"Question 1 of {len(questions)}",
        questions[0], questions, 0, edited,
        _next_button_label(0, len(questions)),
    )
def load_file_for_editing(file_path):
    """Start the editing flow from an uploaded .md, .yml or .yaml quiz file.

    The file is read as UTF-8 and dispatched on its (case-insensitive)
    extension: YAML goes through ``_parse_yml_to_md_blocks``, Markdown through
    ``_parse_questions``. Any other extension is rejected.

    Returns the same 6-tuple as the other handlers: ``(status, current
    question text, questions, index, edited, next-button update)``. Every
    failure path returns its message as status with empty state.
    """
    def _failure(message):
        # Uniform "nothing loaded" result; fresh lists on every call.
        return message, "", [], 0, [], gr.update()

    if file_path is None:
        return _failure("No file uploaded.")

    try:
        with open(file_path, 'r', encoding='utf-8') as handle:
            raw = handle.read()
    except Exception as e:
        return _failure(f"Error reading file: {e}")

    lowered = str(file_path).lower()
    if lowered.endswith(('.yml', '.yaml')):
        blocks, problem = _parse_yml_to_md_blocks(raw)
        if problem:
            return _failure(problem)
    elif lowered.endswith('.md'):
        blocks = _parse_questions(raw)
        if not blocks:
            return _failure("No questions found in the Markdown file.")
    else:
        return _failure("Unsupported file format. Please upload a .md or .yml file.")

    if not blocks:
        return _failure("No questions found in the file.")

    count = len(blocks)
    working_copy = list(blocks)
    status = f"Loaded {count} question(s) from file. Showing Question 1 of {count}."
    return (
        status,
        blocks[0], blocks, 0, working_copy,
        _next_button_label(0, count),
    )
def accept_and_next(current_text: str, questions: list, index: int, edited: list):
    """Validate the current question, store it, and move to the next one.

    Structural problems keep the user on the current question and are shown
    in the status. Issues reported by the LLM review do not block: the edit
    is stored and the issues are appended to the status as a warning.

    Args:
        current_text: text of the question as currently edited.
        questions: the originally loaded question blocks.
        index: position of the current question.
        edited: working copies of the blocks; ``edited[index]`` is
            overwritten in place when the edit is accepted.

    Returns:
        ``(status, text_to_show, questions, index, edited, button_update)``.
    """
    if not questions:
        return "No quiz loaded.", "", questions, index, edited, gr.update()

    # --- Validate before saving ---
    issues = _validate_question_block(current_text)

    # Structural messages are recognised by matching the WHOLE message, so a
    # free-text LLM warning that merely contains a phrase such as
    # "is missing feedback" is not mistaken for a blocking error.
    structural_patterns = [
        re.compile(pattern) for pattern in (
            r"Missing question prompt\.",
            r"Expected 4 answer options, found \d+\.",
            r"No option is marked as correct\. Add \[Correct\] to one option\.",
            r"\d+ options are marked correct; exactly 1 is required\.",
            r"Option . has no answer text\.",
            r"Option . is missing feedback\.",
        )
    ]
    structural_issues = [
        msg for msg in issues
        if any(p.fullmatch(msg) for p in structural_patterns)
    ]
    llm_warnings = [msg for msg in issues if msg not in structural_issues]

    if structural_issues:
        error_msg = "Cannot advance — please fix: " + "; ".join(structural_issues)
        return (
            error_msg, current_text, questions, index, edited,
            _next_button_label(index, len(questions)),
        )

    # Save the (valid) edit
    edited[index] = current_text

    if index + 1 < len(questions):
        new_index = index + 1
        base_status = f"Question {new_index + 1} of {len(questions)}"
        if llm_warnings:
            base_status += f" | WARNING (previous Q): {'; '.join(llm_warnings)}"
        return (
            base_status, edited[new_index], questions, new_index, edited,
            _next_button_label(new_index, len(questions)),
        )

    # Last question: stay in place and point the user to the download.
    base_status = f"All {len(questions)} questions reviewed. Click 'Download edited quiz' to save."
    if llm_warnings:
        base_status += f" | WARNING: {'; '.join(llm_warnings)}"
    return (
        base_status, current_text, questions, index, edited,
        gr.update(value="Accept & Finish"),
    )
def go_previous(current_text: str, questions: list, index: int, edited: list):
    """Store the current edit and step back one question.

    No validation is performed. ``edited[index]`` is overwritten in place
    with ``current_text``. At the first question the position is kept and
    the status says so.

    Returns ``(status, text_to_show, questions, index, edited,
    button_update)``.
    """
    if not questions:
        return "No quiz loaded.", "", questions, index, edited, gr.update()

    edited[index] = current_text
    total = len(questions)

    # Already at the start: nothing to go back to.
    if index <= 0:
        return (
            f"Question 1 of {total} (already at first question)",
            current_text, questions, index, edited,
            _next_button_label(index, total),
        )

    target = index - 1
    return (
        f"Question {target + 1} of {total}",
        edited[target], questions, target, edited,
        _next_button_label(target, total),
    )
def save_and_download(current_text: str, questions: list, index: int, edited: list):
    """Validate every question structurally, then export .md and .yml files.

    The current editor text is stored into ``edited[index]`` first, so an
    edit that was never accepted is still included. If any question fails
    the structural checks nothing is written and the problems are returned
    in the status.

    Returns:
        ``(status, paths)`` where ``paths`` is a list with the temporary .md
        and .yml files for download, or ``None`` when export was blocked or
        there was nothing to save.
    """
    if not edited:
        return "No edited questions to save.", None

    # Save the current edit in case user did not click Accept
    edited[index] = current_text

    # Parse each block once; the result feeds both validation and YAML output.
    parsed_blocks = [_parse_question_block(block) for block in edited]

    # --- Structural validation of every question before export ---
    all_errors: List[str] = []
    for i, parsed in enumerate(parsed_blocks, start=1):
        q_errors: List[str] = []
        if not parsed['prompt'].strip():
            q_errors.append("missing prompt")
        if len(parsed['options']) != 4:
            q_errors.append(f"expected 4 options, found {len(parsed['options'])}")
        else:
            correct_count = sum(1 for o in parsed['options'] if o['isCorrect'])
            if correct_count != 1:
                q_errors.append(f"expected 1 correct option, found {correct_count}")
        for j, opt in enumerate(parsed['options']):
            if not opt['feedback'].strip():
                q_errors.append(f"option {chr(65+j)} missing feedback")
        if q_errors:
            all_errors.append(f"Question {i}: {'; '.join(q_errors)}")

    if all_errors:
        return "Export blocked — fix these issues first:\n" + "\n".join(all_errors), None

    # --- Build outputs ---
    # .md: sanitize text content but keep bullet markers (• ◦) for readability
    combined_md = _sanitize_text("\n\n".join(edited) + "\n", keep_bullets=True)
    # .yml: fully sanitized via _generate_yml
    yml_content = _generate_yml(parsed_blocks)

    # Save to output folder
    run_manager = get_run_manager()
    saved_path = run_manager.save_edited_quiz(combined_md, "formatted_quiz_edited.md")
    run_manager.save_edited_quiz(yml_content, "formatted_quiz_edited.yml")

    # Temp files for Gradio download. delete=False keeps them on disk after
    # closing; the with-block guarantees the handle is closed even if the
    # write fails.
    download_paths = []
    for suffix, payload in ((".md", combined_md), (".yml", yml_content)):
        with tempfile.NamedTemporaryFile(
            delete=False, suffix=suffix, mode="w", encoding="utf-8"
        ) as tmp:
            tmp.write(payload)
        download_paths.append(tmp.name)

    status = f"Saved to {saved_path}" if saved_path else "Download ready."
    return status, download_paths