# cortexa-ai / mcq / generator.py
"""
MCQ Generator using LLM
"""
import json
import re
import unicodedata
from typing import List, Dict, Optional
from models.llm import get_llm_model
from vectordb.mongodb_store import get_mongodb_store
def sanitize_context(text: str) -> str:
    """Strip corrupt PDF characters that cause LLM failures.

    Characters like U+8186 (膆), U+8E30 (踰) come from garbled PDF encoding.
    They cause the LLM to produce garbage or fail entirely, which then triggers
    the raw-prompt-leaking absolute fallback question.

    Args:
        text: Raw extracted text; may be None or empty.

    Returns:
        Cleaned text with corrupt code points removed, ligatures expanded,
        and whitespace collapsed (empty string for falsy input).
    """
    if not text:
        return ""
    # Remove CJK ideographs and symbols common in corrupt PDF extractions
    text = re.sub(r'[\u3040-\u30ff\u3400-\u4dbf\u4e00-\u9fff\uf900-\ufaff\uac00-\ud7af\u4000-\u9fff]', ' ', text)
    # Remove box-drawing, private-use area, and specials
    text = re.sub(r'[\u2500-\u257f\ue000-\uf8ff\ufff0-\uffff]', ' ', text)
    # Remove control characters (keep newline/tab)
    text = re.sub(r'[\x00-\x08\x0b-\x1f\x7f-\x9f]', ' ', text)
    # Expand Unicode presentation-form ligatures (U+FB00-U+FB04) that PDF
    # extractors emit, plus the garbled Ư/ư that bad encodings substitute
    # for 'ff' (e.g. DiƯerence -> Difference).
    for bad, good in (
        ('\ufb00', 'ff'), ('\ufb01', 'fi'), ('\ufb02', 'fl'),
        ('\ufb03', 'ffi'), ('\ufb04', 'ffl'),
        ('\u01af', 'ff'), ('\u01b0', 'ff'),
    ):
        text = text.replace(bad, good)
    # Collapse whitespace
    text = re.sub(r'[ \t]{2,}', ' ', text)
    text = re.sub(r'\n{3,}', '\n\n', text)
    return text.strip()
class MCQGenerator:
    """Generates multiple-choice questions from raw text, stored documents,
    or topics using an LLM, with layered fallbacks (per-question loop ->
    batch call -> synthetic questions) so a requested count is returned.
    """

    def __init__(self):
        # LLM backend used for question generation (project-provided factory).
        self.llm = get_llm_model()
        # MongoDB-backed vector store used for document/topic retrieval.
        self.vector_store = get_mongodb_store()
def generate_from_text(
self,
text: str,
num_questions: int = 5,
difficulty: str = "medium",
topic: Optional[str] = None
) -> List[Dict]:
"""Generate MCQs from given text — uses a per-question loop to guarantee
the requested count and avoid token-budget truncation that caused CJK garbage."""
# Sanitize: remove corrupt PDF chars before they reach the LLM
text = sanitize_context(text)
# Validate and normalize input
num_questions = max(1, min(20, num_questions))
difficulty = difficulty.lower() if difficulty else "medium"
if difficulty not in ["easy", "medium", "hard"]:
difficulty = "medium"
mcqs: List[Dict] = []
# Try generating each question individually — more reliable than one big call.
for q_idx in range(1, num_questions + 1):
if len(mcqs) >= num_questions:
break
try:
prompt = self._create_single_mcq_prompt(text, q_idx, num_questions, difficulty, topic)
# 300 tokens is enough for one Q+4 options+CORRECT+EXPLAIN
response = self.llm.generate(
prompt=prompt,
max_new_tokens=350,
temperature=0.6
)
# Use improved parser — handles the full Q1: ... response correctly
parsed_list = self._parse_mcqs_improved(response, text, 1)
if parsed_list:
mcq = parsed_list[0]
# Preserve the requested difficulty
mcq['difficulty'] = difficulty
mcqs.append(mcq)
except Exception as e:
print(f"⚠️ Per-question generation failed for Q{q_idx}: {e}")
pass # will be filled by fallback below
# If per-question loop didn't fill all slots, try a batch call as backup.
if len(mcqs) < num_questions:
remaining = num_questions - len(mcqs)
try:
batch_prompt = self._create_mcq_prompt(text, remaining, difficulty, topic)
# Give ~300 tokens per remaining question
batch_tokens = min(remaining * 310 + 60, 2048)
batch_response = self.llm.generate(
prompt=batch_prompt,
max_new_tokens=batch_tokens,
temperature=0.6
)
batch_mcqs = self._parse_mcqs_improved(batch_response, text, remaining)
for mcq in batch_mcqs:
mcq['difficulty'] = difficulty
mcqs = self._merge_unique_mcqs(mcqs, batch_mcqs)
except Exception as e:
print(f"⚠️ Batch fallback generation failed: {e}")
pass
# Last-resort synthetic top-up so API always returns requested count.
if len(mcqs) < num_questions:
synthetic = self._generate_synthetic_mcqs(text, num_questions - len(mcqs))
mcqs = self._merge_unique_mcqs(mcqs, synthetic)
# Hard-enforce word limits regardless of LLM compliance
mcqs = [self._enforce_word_limits(mcq) for mcq in mcqs]
return mcqs[:num_questions]
def generate_from_document(
self,
document_name: str,
num_questions: int = 5,
difficulty: str = "medium",
topic: Optional[str] = None
) -> List[Dict]:
"""Generate MCQs from a document in the vector store"""
try:
chunks = self._get_document_chunks(document_name, num_chunks=15)
if not chunks:
raise ValueError(f"Document '{document_name}' not found in vector store")
raw_text = "\n\n".join([chunk['text'] for chunk in chunks])
# Sanitize before passing on
text = sanitize_context(raw_text)
except Exception as e:
raise ValueError(f"Failed to retrieve document '{document_name}': {str(e)}")
return self.generate_from_text(text, num_questions, difficulty, topic)
def generate_from_topic(
self,
topic: str,
num_questions: int = 5,
difficulty: str = "medium"
) -> List[Dict]:
"""Generate MCQs from a specific topic using vector search or knowledge"""
try:
# Try vector search first
documents, metadatas, distances = self.vector_store.search(
query=topic,
top_k=5
)
if documents and len(documents) > 0:
# Use top 3 documents
text = "\n\n".join(documents[:3])
else:
# Fallback to topic name only
print(f"⚠️ No vector search results for '{topic}', using topic name only")
text = f"Topic: {topic}"
except Exception as e:
print(f"⚠️ Vector search failed: {str(e)}, using topic name only")
text = f"Topic: {topic}"
return self.generate_from_text(text, num_questions, difficulty, topic)
def _enforce_word_limits(self, mcq: dict) -> dict:
"""Post-processing: hard-truncate question to 10 words and each option to 2 words."""
def trim_words(text: str, max_words: int) -> str:
if not text:
return text
words = str(text).split()
if len(words) <= max_words:
return text
return ' '.join(words[:max_words])
question = str(mcq.get('question', ''))
# Truncate to 10 words; add '?' if it doesn't end with one
question = trim_words(question, 10)
if question and not question.endswith('?'):
question += '?'
mcq = dict(mcq) # copy
mcq['question'] = question
options = mcq.get('options', {})
if isinstance(options, dict):
mcq['options'] = {k: trim_words(str(v), 2) for k, v in options.items()}
elif isinstance(options, list):
mcq['options'] = [trim_words(str(o), 2) for o in options]
return mcq
def _create_mcq_prompt(
self,
text: str,
num_questions: int,
difficulty: str,
topic: Optional[str]
) -> str:
"""Create a structured prompt for MCQ generation (batch mode)"""
topic_str = f" about '{topic}'" if topic else ""
diff_hint = {
"easy": "basic concepts and definitions",
"medium": "moderate understanding of concepts and their applications",
"hard": "deep understanding, critical thinking, and complex scenarios"
}.get(difficulty, "moderate understanding")
# Limit context: 500 chars per question to leave room for output tokens
max_text_length = min(500 * num_questions, 3000)
context_text = text[:max_text_length]
prompt = f"""Generate exactly {num_questions} multiple-choice questions{topic_str}.
DIFFICULTY: {difficulty.upper()} ({diff_hint})
CONTEXT: {context_text}
STRICT RULES:
- Question: maximum 10 words, end with ?
- Each option (A/B/C/D): maximum 2 words only
- No sentences in options — single terms or short phrases only
OUTPUT (repeat block for each question):
Q1: [max 10-word question]?
A. [1-2 words]
B. [1-2 words]
C. [1-2 words]
D. [1-2 words]
CORRECT: [A/B/C/D]
EXPLAIN: [one sentence]
Generate {num_questions} questions now:
"""
return prompt
def _create_single_mcq_prompt(
self,
text: str,
q_num: int,
total: int,
difficulty: str,
topic: Optional[str]
) -> str:
"""Prompt for generating exactly ONE MCQ (used in per-question loop)."""
topic_str = f" about '{topic}'" if topic else ""
diff_hint = {
"easy": "basic concepts and definitions",
"medium": "understanding of concepts and applications",
"hard": "deep understanding and critical thinking"
}.get(difficulty, "understanding of concepts")
# Keep context short — leave tokens for the answer
context_text = text[:600]
prompt = f"""Generate question {q_num} of {total}{topic_str}.
DIFFICULTY: {difficulty.upper()} ({diff_hint})
CONTEXT: {context_text}
STRICT RULES:
- Question: maximum 10 words, end with ?
- Each option (A B C D): maximum 2 words
- No full sentences in options
Output ONLY this exact format:
Q{q_num}: [max 10-word question]?
A. [1-2 words]
B. [1-2 words]
C. [1-2 words]
D. [1-2 words]
CORRECT: [A/B/C/D]
EXPLAIN: [one sentence why]
Q{q_num}:"""
return prompt
def _parse_mcqs_improved(self, response: str, context: str, num_requested: int) -> List[Dict]:
"""
Improved MCQ parsing with new format: Q#: ... A. ... B. ... C. ... D. ... CORRECT: ... EXPLAIN: ...
"""
mcqs = []
# Split by Q# pattern — handles both leading and mid-string occurrences
question_blocks = re.split(r'(?:^|\n)Q\d+:', response)
# First block is usually empty or garbage before first Q1:
if question_blocks and not re.search(r'[A-D][.):]+', question_blocks[0]):
question_blocks = question_blocks[1:]
for block in question_blocks:
if not block.strip():
continue
mcq = self._parse_single_mcq_block(block)
if mcq:
mcqs.append(mcq)
# If we got enough MCQs, return them.
if len(mcqs) >= num_requested:
return mcqs[:num_requested]
# If parsing produced very few, try fallback parser and merge.
fallback_mcqs = self._parse_mcqs_fallback(response)
mcqs = self._merge_unique_mcqs(mcqs, fallback_mcqs)
return mcqs
def _parse_single_mcq_block(self, block: str) -> Optional[Dict]:
"""Parse a single question block in new format"""
lines = [l.strip() for l in block.split('\n') if l.strip()]
if not lines:
return None
question = None
options = {}
correct_answer = None
explanation = None
# First line is the question — preserve the '?' but strip the Q prefix and trailing ':'
first_line = lines[0]
# Remove any leading Q#: prefix that may have leaked through
first_line = re.sub(r'^\s*(Q|Question)\s*\d+\s*[:.)-]\s*', '', first_line, flags=re.IGNORECASE).strip()
# Remove trailing colon that some models add
if first_line.endswith(':') and not first_line.endswith('?:'):
first_line = first_line[:-1].strip()
question = first_line
if not question or len(question) < 5:
return None
# Look for options A, B, C, D
for line in lines[1:]:
# Skip lines that look like the end-of-prompt echo (e.g. "Q2:")
if re.match(r'^Q\d+:\s*$', line):
continue
# Match "A. text", "A) text", "A: text"
opt_match = re.match(r'^([A-D])[\s.):]+(.+)$', line)
if opt_match:
letter = opt_match.group(1).upper()
text = opt_match.group(2).strip()
if text:
options[letter] = text
continue
# Match CORRECT: X
if 'CORRECT' in line.upper():
correct_match = re.search(r'\b([A-D])\b', line)
if correct_match:
correct_answer = correct_match.group(1).upper()
continue
# Match EXPLAIN: or EXPLANATION:
if 'EXPLAIN' in line.upper():
explanation = re.sub(r'^EXPLAIN(ATION)?[\s:]+', '', line, flags=re.IGNORECASE).strip()
continue
# Validate - need question, at least 2 options, and correct answer
if question and len(options) >= 2 and correct_answer:
# Fill any missing options with placeholders
for letter in 'ABCD':
if letter not in options:
options[letter] = f"Option {letter}"
# If correct_answer not in parsed options, default to A
if correct_answer not in options:
correct_answer = 'A'
ordered_options = [options.get(letter, f"Option {letter}") for letter in 'ABCD']
return {
'question': question,
'options': {
'A': ordered_options[0],
'B': ordered_options[1],
'C': ordered_options[2],
'D': ordered_options[3],
},
'correct_answer': correct_answer,
'explanation': explanation or "Based on the provided context.",
'difficulty': 'medium' # will be overridden by caller
}
return None
def _merge_unique_mcqs(self, base: List[Dict], extra: List[Dict]) -> List[Dict]:
"""Merge MCQ lists and keep unique questions by normalized text."""
merged = []
seen = set()
for item in (base + extra):
question = str(item.get('question', '')).strip()
key = re.sub(r'^\s*(Q|Question)\s*\d+\s*[:.)-]\s*', '', question, flags=re.IGNORECASE).lower()
if not key or key in seen:
continue
seen.add(key)
item['question'] = re.sub(r'^\s*(Q|Question)\s*\d+\s*[:.)-]\s*', '', question, flags=re.IGNORECASE).strip()
merged.append(item)
return merged
def _parse_mcqs_fallback(self, response: str) -> List[Dict]:
"""Fallback parsing for various formats"""
mcqs = []
# Try finding by Q1:, Q2: pattern
question_pattern = r'Q\d+[:.]\s*(.+?)(?=Q\d+[:.]|$)'
blocks = re.findall(question_pattern, response, re.DOTALL | re.IGNORECASE)
for block in blocks:
mcq = self._parse_legacy_format(block)
if mcq:
mcqs.append(mcq)
return mcqs
def _parse_legacy_format(self, text: str) -> Optional[Dict]:
"""Parse legacy question format"""
lines = [l.strip() for l in text.split('\n') if l.strip()]
if not lines:
return None
question = lines[0]
options = {}
correct_answer = None
explanation = None
for line in lines[1:]:
# Try to parse options
opt_match = re.match(r'^([A-D])[\s.):]+(.+)$', line)
if opt_match:
options[opt_match.group(1).upper()] = opt_match.group(2).strip()
continue
# Parse answer — support both ANSWER: X and CORRECT: X
if 'ANSWER' in line.upper() or 'CORRECT' in line.upper():
ans_match = re.search(r'\b([A-D])\b', line)
if ans_match:
correct_answer = ans_match.group(1).upper()
# Parse explanation
if 'EXPL' in line.upper():
explanation = re.sub(r'.*EXPL[A-Z]*\s*[:\s]*', '', line, flags=re.IGNORECASE)
# Need at least 2 options and a correct answer
if question and len(options) >= 2 and correct_answer:
# Fill missing options
for letter in 'ABCD':
if letter not in options:
options[letter] = f"Option {letter}"
if correct_answer not in options:
correct_answer = list(options.keys())[0]
ordered_options = {letter: options.get(letter, f"Option {letter}") for letter in 'ABCD'}
return {
'question': question,
'options': ordered_options,
'correct_answer': correct_answer,
'explanation': explanation or "Based on the context.",
'difficulty': 'medium'
}
return None
def _generate_synthetic_mcqs(self, text: str, num: int) -> List[Dict]:
"""Generate fallback MCQs from document facts when LLM parsing fails.
Produces 'Which is true?' MCQs instead of fill-in-the-blank so that
prompt text never leaks into the question.
"""
# Keywords that indicate prompt preamble / non-document text
PROMPT_JUNK = re.compile(
r'generate\s+mcq|use the following|reference context|fill in the blank'
r'|topic:\s*generate|selected documents|\[source:|chapter name',
re.IGNORECASE
)
# Strip prompt preamble
clean = re.sub(
r'(?si)^.*?(?:reference context[^\n]*|use the following[^\n]*)\n+',
'', text
)
clean = re.sub(r'\[Source:[^\]]*\]\s*', ' ', clean)
clean = re.sub(r'^Topic:\s*[^\n]*\n*', '', clean, flags=re.IGNORECASE | re.MULTILINE)
clean = re.sub(r'^Generate\s+MCQ[^\n]*\n*', '', clean, flags=re.IGNORECASE | re.MULTILINE)
if len(clean.strip()) < 80:
clean = text
# Split on sentence boundaries and collect clean facts
facts = []
for s in re.split(r'(?<=[.!?])\s+|\n', clean):
s = s.strip()
if len(s) < 30 or len(s) > 200:
continue
if PROMPT_JUNK.search(s):
continue
# Must be mostly readable ASCII
alnum = len(re.findall(r'[A-Za-z0-9]', s))
if alnum / max(len(s), 1) < 0.45:
continue
facts.append(s)
if len(facts) >= num * 3:
break
mcqs = []
for i, fact in enumerate(facts[:num]):
words = [w for w in fact.split() if re.match(r'[A-Za-z]', w)]
if len(words) < 4:
continue
# Build a short question from the first few meaningful words
q_words = words[:8]
q_text = ' '.join(q_words)
if not q_text.endswith('?'):
q_text += '?'
correct_word = words[min(3, len(words) - 1)]
mcq = {
'question': q_text,
'options': {
'A': correct_word,
'B': 'False',
'C': 'Unrelated',
'D': 'Unknown',
},
'correct_answer': 'A',
'explanation': 'Based on the document context.',
'difficulty': 'easy'
}
mcqs.append(mcq)
return mcqs
def _get_document_chunks(self, document_name: str, num_chunks: int = 10) -> List[Dict]:
"""Get chunks from a specific document"""
matching_chunks = []
for doc in self.vector_store.data['documents']:
if document_name.lower() in doc['metadata'].get('source', '').lower():
matching_chunks.append({
'text': doc['text'],
'metadata': doc['metadata']
})
return matching_chunks[:num_chunks]
# Singleton
# Module-level cache: holds the shared MCQGenerator once created.
_mcq_generator = None


def get_mcq_generator() -> MCQGenerator:
    """Return the process-wide MCQGenerator, creating it lazily on first call."""
    global _mcq_generator
    if _mcq_generator is None:
        _mcq_generator = MCQGenerator()
    return _mcq_generator