import math
import re
from collections import defaultdict
from typing import Any, Dict, List, Optional, Tuple

import language_tool_python
import spacy
from sklearn.metrics.pairwise import cosine_similarity

from env import config
from loaders.elastic import Elastic
from services.AI.false_ans_generator import FalseAnswerGenerator
from src.enums import QuestionTypeEnum
from src.interfaces.evaluation import GeneratedQuestion

# Loaded once at import time and shared by every evaluator instance.
nlp = spacy.load("en_core_web_md")


class QuestionQualityEvaluator:
    """Score a generated question on structure, popularity and distractors.

    The evaluator reads its weights, penalties and thresholds from the
    ``config["evaluation"]`` section passed to the constructor.
    """

    INDEX = "vocabulary"

    # Forms of "to be" (and common contractions) stripped from a choice before
    # the POS / lexical-family analysis. Compiled once instead of per call.
    _TO_BE_REGEX = re.compile(
        r'\b(has been|have been|had been|will be|am|is|are|was|were|be|being|been|\'s|\'re|\'m)\b',
        flags=re.IGNORECASE,
    )

    def __init__(self, config: dict):
        """Create the evaluator.

        Args:
            config: Mapping that must contain ``evaluation.weights``,
                ``evaluation.penalty_for_error.structure`` and
                ``evaluation.distractor``.
        """
        self.config = config
        self._grammar_tool = language_tool_python.LanguageTool('en-US')
        self.nlp = nlp

        # Cache the config sections used below to keep the code readable.
        self.weights = config["evaluation"]["weights"]
        self.penalties = config["evaluation"]["penalty_for_error"]["structure"]
        self.distractor_cfg = config["evaluation"]["distractor"]

    def evaluate(self, q: GeneratedQuestion, check_by_ai: bool = False) -> Dict[str, Any]:
        """Run every check on ``q`` and combine them into one weighted score.

        Args:
            q: The question to evaluate.
            check_by_ai: When true, the ``ai_adjust_factor`` weight is added
                to the normalising denominator.

        Returns:
            A dict with ``score`` (rounded up to one decimal, capped at 10.0),
            ``issues`` (one entry per checked field) and ``suggestions``
            (de-duplicated, in first-seen order).
        """
        all_issues: List[Dict[str, Any]] = []
        all_suggestions: List[str] = []

        # 1. Structure
        s_score, s_issues, s_suggestions = self._check_structure(q)
        all_issues.append({"field": "structure", "score": s_score, "issues": s_issues})
        all_suggestions.extend(s_suggestions)

        # 2. Popularity
        p_score = self._check_popularity(q)
        all_issues.append({"field": "popularity", "score": p_score, "issues": []})

        # 3. Distractor
        d_score, d_issues = self._check_distractors(q)
        all_issues.append({"field": "distractor", "score": d_score, "issues": d_issues})

        weights = self.weights
        # Only the AI term is conditional. The previous one-line expression
        # applied ``if check_by_ai else 0.0`` to the whole sum, which made the
        # divisor 0.0 (ZeroDivisionError) whenever check_by_ai was False.
        total_weight = weights["structure"] + weights["popularity"] + weights["distractor"]
        if check_by_ai:
            # NOTE(review): the AI weight enters the denominator only; no AI
            # score is added to the numerator here - presumably applied by the
            # caller. Confirm.
            total_weight += weights["ai_adjust_factor"]

        final_score = (
            s_score * weights["structure"]
            + p_score * weights["popularity"]
            + d_score * weights["distractor"]
        ) / total_weight

        # Round up to one decimal place.
        rounded_score = math.ceil(final_score * 10) / 10

        return {
            "score": min(round(rounded_score, 1), 10.0),
            "issues": all_issues,
            # dict.fromkeys de-duplicates while keeping a deterministic order.
            "suggestions": list(dict.fromkeys(all_suggestions)),
        }

    def _check_structure(self, q: GeneratedQuestion) -> Tuple[float, List[Any], List[str]]:
        """Check that the question text and its choices are well formed.

        Starts from 1.0 and subtracts the configured penalty for each problem
        found (missing text, grammar errors, missing/empty/duplicated choices,
        no correct answer).

        Returns:
            ``(score, issues, suggestions)`` where ``score`` is floored at 0.0.
            ``suggestions`` is currently always empty.
        """
        issues: List[Any] = []
        suggestions: List[str] = []
        score = 1.0

        # Question text
        if not q.content or not q.content.strip():
            issues.append("missing_question_text")
            score -= self.penalties["missing_question_text"]
        else:
            grammar_count, grammar_msgs = self._check_grammar(q.content)
            if grammar_count > 0:
                issues.append({
                    "type": "question_grammar_error",
                    "count": grammar_count,
                    "details": grammar_msgs,
                })
                score -= grammar_count * self.penalties["grammar_error_per_count"]

        # Choices
        if not q.choices:
            issues.append("missing_choices")
            score -= self.penalties["missing_choices"]
        else:
            empty_count = 0
            non_empty_contents: List[str] = []
            has_correct = False

            for choice in q.choices:
                content = (choice.content or "").strip()
                if not content:
                    # Empty choices are counted but never qualify as the
                    # correct answer, matching the original control flow.
                    empty_count += 1
                    continue
                non_empty_contents.append(content)
                if choice.is_correct:
                    has_correct = True

            if empty_count > 0:
                issues.append(f"{empty_count}_empty_choices")
                score -= self.penalties["empty_choice_ratio"] * (empty_count / len(q.choices))

            if len(set(non_empty_contents)) < len(non_empty_contents):
                issues.append("duplicated_choices")
                score -= self.penalties["duplicated_choices"]

            if not has_correct:
                issues.append("no_correct_answer")
                score -= self.penalties["no_correct_answer"]

            for content in non_empty_contents:
                grammar_count, grammar_msgs = self._check_grammar(content)
                if grammar_count > 0:
                    issues.append({
                        "type": "choice_grammar_error",
                        "choice": content,
                        "count": grammar_count,
                        "details": grammar_msgs,
                    })
                    score -= grammar_count * self.penalties["grammar_error_per_count"]

        return max(score, 0.0), issues, suggestions

    def _check_popularity(self, q: GeneratedQuestion) -> float:
        """Score the vocabulary level of the question and its choices.

        Looks up every distinct lower-cased word in the ``INDEX`` index and
        averages the ``cefr`` value per word; words that are not found (or
        have no value) count as 4.0.

        Returns:
            ``max(0, (avg_cefr - 1) / 5)`` rounded to three decimals, or 0.0
            when there are no words at all.
        """
        # Missing question text is reported by _check_structure; here it must
        # simply not crash.
        unique_words = set((q.content or "").lower().split())
        for choice in q.choices or []:
            unique_words.update((choice.content or "").lower().split())

        if not unique_words:
            return 0.0

        es = Elastic()
        resp = es.search(
            index=self.INDEX,
            size=0,
            query={"terms": {"word.keyword": list(unique_words)}},
            aggs={
                "by_word": {
                    "terms": {"field": "word.keyword", "size": len(unique_words)},
                    "aggs": {"cefr_level": {"avg": {"field": "cefr"}}},
                }
            },
        )

        word_cefr_map = {
            bucket["key"].lower(): bucket["cefr_level"]["value"] or 4.0
            for bucket in resp["aggregations"]["by_word"]["buckets"]
        }

        total = sum(word_cefr_map.get(word, 4.0) for word in unique_words)
        avg_cefr = total / len(unique_words)

        # The score is higher when the words are harder (higher CEFR).
        popularity_score = max(0.0, (avg_cefr - 1) / 5.0)
        return round(popularity_score, 3)

    def _check_distractors(self, q: GeneratedQuestion) -> Tuple[float, List[Dict[str, Any]]]:
        """Average the distractor sub-scores that apply to this question.

        Sub-checks that return ``None`` are skipped.

        Returns:
            ``(score, issues)`` where ``score`` is the mean of the applicable
            sub-scores rounded to three decimals (0.0 when none apply) and
            ``issues`` holds one descriptive entry per sub-check plus a
            summary entry.
        """
        issues: List[Dict[str, Any]] = []
        scores: List[float] = []

        # 1. POS & lexical family
        pos_score = self._check_pos_and_meaning_of_choice(q)
        if pos_score is not None:
            scores.append(pos_score)
            issues.append({"type": "pos_lexical_family", "score": round(pos_score, 3)})

        # 2. Embedding similarity
        emb_score = self._cal_score_embedding_similarity(q)
        if emb_score is not None:
            scores.append(emb_score)
            t = self.distractor_cfg["embedding_similarity_thresholds"]
            # NOTE(review): emb_score is the bucketed score (0.0-1.0 steps),
            # yet it is compared against the raw-similarity thresholds here.
            # Kept as-is; confirm this is intended.
            level = (
                "too_different" if emb_score <= t["too_different"] else
                "moderate" if emb_score <= t["moderate"] else
                "good" if emb_score <= t["good"] else
                "strong" if emb_score <= t["strong"] else
                "excellent"
            )
            issues.append({
                "type": "embedding_similarity",
                "score": round(emb_score, 3),
                "level": level,
            })

        # 3. Paragraph difficulty
        para_score = self._cal_score_for_paragraph(q)
        if para_score is not None:
            scores.append(para_score)
            p_cfg = self.distractor_cfg["paragraph"]
            # NOTE(review): this inversion of the paragraph score treats the
            # length component as 1.0; kept unchanged - confirm.
            diff_part = (para_score - p_cfg["length_weight"]) / p_cfg["difficulty_weight"] * 5
            level = "direct_match" if diff_part < 2 else "paraphrase" if diff_part < 4 else "inference"
            issues.append({
                "type": "paragraph_difficulty",
                "score": round(para_score, 3),
                "level": level,
            })

        final_score = sum(scores) / len(scores) if scores else 0.0

        if scores:
            issues.append({
                "type": "distractor_summary",
                "score": round(final_score, 3),
                "components": len(scores),
            })

        return round(final_score, 3), issues

    def _check_grammar(self, text: str, max_errors: int = 2) -> Tuple[int, List[Dict[str, Any]]]:
        """Report grammar / spelling problems found in ``text``.

        Texts shorter than five characters (after stripping) are not checked.
        Only matches whose issue type is ``grammar`` or ``misspelling`` are
        kept, and rules starting with ``UPPERCASE_SENTENCE_START`` are ignored.

        Args:
            text: The text to check.
            max_errors: Maximum number of errors reported (and counted).

        Returns:
            ``(count, details)`` where ``count == len(details) <= max_errors``.
        """
        if not text or len(text.strip()) < 5:
            return 0, []

        matches = self._grammar_tool.check(text)
        serious_matches = [
            m for m in matches
            if m.ruleIssueType in {"grammar", "misspelling"}
            and not m.ruleId.startswith("UPPERCASE_SENTENCE_START")
        ]

        error_messages = [
            {
                "message": m.message,
                "rule": m.ruleId,
                "error_text": text[m.offset:m.offset + m.errorLength],
                "suggestions": m.replacements[:3],
            }
            for m in serious_matches[:max_errors]
        ]
        return len(error_messages), error_messages

    def _check_pos_and_meaning_of_choice(self, q: GeneratedQuestion) -> Optional[float]:
        """Score how closely the choices match in part of speech and lemma.

        Pronunciation and stress questions always score 1.0. Each empty
        choice costs ``empty_choice_deduction``. If any choice still has more
        than one word after the "to be" forms are removed, the lexical
        analysis is skipped and the score so far is returned.
        """
        if q.type in {QuestionTypeEnum.PRONUNCIATION, QuestionTypeEnum.STRESS}:
            return 1.0

        cleaned_choices: List[str] = []
        score = 1.0

        for c in q.choices or []:
            content = (c.content or "").strip()
            if not content:
                score -= self.distractor_cfg["empty_choice_deduction"]
                continue
            cleaned = self._TO_BE_REGEX.sub("", content)
            # Collapse the whitespace left behind by the substitution.
            cleaned = " ".join(cleaned.split()).lower()
            cleaned_choices.append(cleaned)

        if any(len(t.split()) > 1 for t in cleaned_choices):
            return score

        docs = [self.nlp(text) for text in cleaned_choices]
        tokens = [token for doc in docs for token in doc]

        return score * self.lexical_family_difficulty(tokens, q.num_ans_per_question or 4)

    def _cal_score_embedding_similarity(self, q: GeneratedQuestion) -> Optional[float]:
        """Score distractors by embedding similarity to the correct answer.

        Applies to synonym, antonym and vocab questions only; other types
        return ``None``. The mean cosine similarity over every
        (correct, distractor) pair is bucketed into 0.2 / 0.4 / 0.6 / 0.8 / 1.0
        using ``embedding_similarity_thresholds``. Returns 0.0 when there is
        no usable correct answer or no usable distractor.
        """
        if q.type not in {QuestionTypeEnum.SYNONYM, QuestionTypeEnum.ANTONYM, QuestionTypeEnum.VOCAB}:
            return None

        # Empty choices are skipped, as in the structural and POS checks, so
        # that None / blank text is never sent to the embedding model.
        usable = [c for c in (q.choices or []) if (c.content or "").strip()]
        correct = [c.content for c in usable if c.is_correct]
        distractors = [c.content for c in usable if not c.is_correct]

        if not correct or not distractors:
            return 0.0

        ai = FalseAnswerGenerator()
        emb_correct = ai.get_embedding_list_word(correct)
        emb_dist = ai.get_embedding_list_word(distractors)

        similarities = [
            cosine_similarity(c.reshape(1, -1), d.reshape(1, -1))[0][0]
            for c in emb_correct
            for d in emb_dist
        ]

        if not similarities:
            return 0.0

        avg_sim = sum(similarities) / len(similarities)
        t = self.distractor_cfg["embedding_similarity_thresholds"]

        if avg_sim <= t["too_different"]:
            return 0.2
        if avg_sim <= t["moderate"]:
            return 0.4
        if avg_sim <= t["good"]:
            return 0.6
        if avg_sim <= t["strong"]:
            return 0.8
        return 1.0

    def _cal_score_for_paragraph(self, q: GeneratedQuestion) -> Optional[float]:
        """Score a paragraph-based question by length and answer difficulty.

        Applies to vocab, fact, main-idea, inference and purpose questions;
        other types return ``None``. Returns 0.0 when there is no correct
        answer text or no paragraph.

        The result is ``length_weight * length_score +
        difficulty_weight * diff_score``, where ``diff_score`` depends on the
        highest cosine similarity between the correct answer and any
        sentence of the paragraph.
        """
        if q.type not in {
            QuestionTypeEnum.VOCAB,
            QuestionTypeEnum.FACT,
            QuestionTypeEnum.MAIN_IDEA,
            QuestionTypeEnum.INFERENCE,
            QuestionTypeEnum.PURPOSE,
        }:
            return None

        correct_answer = next((c.content for c in (q.choices or []) if c.is_correct), None)
        if not correct_answer or not q.paragraph:
            return 0.0

        word_count = len(q.paragraph.split())
        p_cfg = self.distractor_cfg["paragraph"]

        # Length score: first threshold the word count fits under wins;
        # longer paragraphs get the last (highest) score.
        if q.type == QuestionTypeEnum.VOCAB:
            thresholds = p_cfg["vocab_length_thresholds"]
            scores = [0.2, 0.3, 0.4, 0.5]
        else:
            thresholds = p_cfg["other_length_thresholds"]
            scores = [0.3, 0.5, 0.7, 0.9, 1.0]

        length_score = scores[-1]
        for thresh, sc in zip(thresholds, scores):
            if word_count <= thresh:
                length_score = sc
                break

        # Difficulty score
        doc = self.nlp(q.paragraph)
        sentences = [sent.text.strip() for sent in doc.sents if sent.text.strip()]
        if not sentences:
            return length_score * p_cfg["length_weight"]

        ai = FalseAnswerGenerator()
        sent_embs = ai.get_embedding_list_word(sentences)
        ans_emb = ai.get_embedding_list_word([correct_answer])

        cos_scores = cosine_similarity(ans_emb, sent_embs)[0]
        max_sim = float(max(cos_scores)) if cos_scores.size else 0.0

        levels = p_cfg["difficulty_levels"]
        if max_sim >= p_cfg["direct_match_sim"]:
            diff_val = levels[0]
        elif max_sim >= p_cfg["paraphrase_sim"]:
            diff_val = levels[1]
        else:
            diff_val = levels[2]

        diff_score = diff_val / 5.0

        return p_cfg["length_weight"] * length_score + p_cfg["difficulty_weight"] * diff_score

    def group_by_lemma(self, tokens):
        """Group ``tokens`` by their lower-cased ``lemma_``."""
        groups = defaultdict(list)
        for t in tokens:
            groups[t.lemma_.lower()].append(t)
        return groups

    def group_by_pos(self, tokens):
        """Group ``tokens`` by their ``pos_`` tag."""
        groups = defaultdict(list)
        for t in tokens:
            groups[t.pos_].append(t)
        return groups

    def lexical_family_difficulty(self, tokens, num_ans_per_question: int = 4) -> float:
        """Map lemma / POS agreement among ``tokens`` to a configured score.

        ``lemma_ratio`` is the share of tokens in a lemma group of at least
        three; ``pos_ratio`` is the share of tokens in a POS group of at least
        ``min(num_ans_per_question, 3)``. The ratios are compared, in order,
        against ``lexical_family.thresholds`` and the first match returns the
        corresponding ``lexical_family.scores`` entry; ``low`` is the fallback
        (also used when ``tokens`` is empty).
        """
        if not tokens:
            return self.distractor_cfg["lexical_family"]["scores"]["low"]

        lemma_groups = self.group_by_lemma(tokens)
        pos_groups = self.group_by_pos(tokens)
        n = len(tokens)

        lemma_score = sum(len(v) for v in lemma_groups.values() if len(v) >= 3)
        lemma_ratio = lemma_score / n

        min_pos_group = min(num_ans_per_question, 3)
        pos_score = sum(len(v) for v in pos_groups.values() if len(v) >= min_pos_group)
        pos_ratio = pos_score / n

        t = self.distractor_cfg["lexical_family"]["thresholds"]
        s = self.distractor_cfg["lexical_family"]["scores"]

        if lemma_ratio >= t["high_lemma"]:
            return s["high_lemma"]
        if pos_ratio >= t["high_pos"]:
            return s["high_pos"]
        if pos_ratio >= t["medium_high_pos"]:
            return s["medium_high_pos"]
        if lemma_ratio >= t["medium_lemma"]:
            return s["medium_lemma"]
        if pos_ratio >= t["medium_both"] and lemma_ratio >= t["medium_both"]:
            return s["medium_both"]
        return s["low"]