import math
import re
from collections import defaultdict
from typing import Any, Dict, List, Optional

import language_tool_python
import spacy
from sklearn.metrics.pairwise import cosine_similarity

from loaders.elastic import Elastic
from services.AI.false_ans_generator import FalseAnswerGenerator
from src.enums import QuestionTypeEnum
from src.interfaces.evaluation import GeneratedQuestion

# Load the spaCy model once at import time so all evaluator instances share it.
nlp = spacy.load("en_core_web_md")


class QuestionQualityEvaluator:
    INDEX = "vocabulary"

    def __init__(self, config: dict):
        self.config = config
        self._grammar_tool = language_tool_python.LanguageTool('en-US')
        self.nlp = nlp
        # Cache config sections for readability.
        self.weights = config["evaluation"]["weights"]
        self.penalties = config["evaluation"]["penalty_for_error"]["structure"]
        self.distractor_cfg = config["evaluation"]["distractor"]
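
    # Config keys read by this class (values are supplied by the caller):
    #   evaluation.weights: structure, popularity, distractor, ai_adjust_factor
    #   evaluation.penalty_for_error.structure: missing_question_text,
    #     grammar_error_per_count, missing_choices, empty_choice_ratio,
    #     duplicated_choices, no_correct_answer
    #   evaluation.distractor: embedding_similarity_thresholds, paragraph,
    #     lexical_family, empty_choice_deduction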

    def evaluate(self, q: GeneratedQuestion, check_by_ai: bool = False) -> Dict[str, Any]:
        all_issues: List[Dict[str, Any]] = []
        all_suggestions: List[str] = []

        # 1. Structure
        s_score, s_issues, s_suggestions = self._check_structure(q)
        all_issues.append({"field": "structure", "score": s_score, "issues": s_issues})
        all_suggestions.extend(s_suggestions)

        # 2. Popularity
        p_score = self._check_popularity(q)
        all_issues.append({"field": "popularity", "score": p_score, "issues": []})

        # 3. Distractor
        d_score, d_issues = self._check_distractors(q)
        all_issues.append({"field": "distractor", "score": d_score, "issues": d_issues})

        # Normalising weight; the AI adjustment factor only contributes when
        # check_by_ai is set, so the conditional must cover that term alone.
        w_score = (
            self.weights["structure"] +
            self.weights["popularity"] +
            self.weights["distractor"] +
            (self.weights["ai_adjust_factor"] if check_by_ai else 0.0)
        )
        final_score = (
            s_score * self.weights["structure"] +
            p_score * self.weights["popularity"] +
            d_score * self.weights["distractor"]
        ) / w_score
        rounded_score = math.ceil(final_score * 10) / 10
        return {
            "score": min(round(rounded_score, 1), 10.0),
            "issues": all_issues,
            "suggestions": list(set(all_suggestions))
        }
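
    # Sketch of the returned payload (field names come from the return statement
    # above; the values shown are made-up examples):
    # {
    #     "score": 0.8,
    #     "issues": [
    #         {"field": "structure", "score": 1.0, "issues": []},
    #         {"field": "popularity", "score": 0.6, "issues": []},
    #         {"field": "distractor", "score": 0.7, "issues": [...]},
    #     ],
    #     "suggestions": [],
    # }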

    def _check_structure(self, q: GeneratedQuestion):
        issues: List[Any] = []
        suggestions: List[str] = []
        score = 1.0

        # Question text
        if not q.content or not q.content.strip():
            issues.append("missing_question_text")
            score -= self.penalties["missing_question_text"]
        else:
            grammar_count, grammar_msgs = self._check_grammar(q.content)
            if grammar_count > 0:
                issues.append({
                    "type": "question_grammar_error",
                    "count": grammar_count,
                    "details": grammar_msgs
                })
                score -= grammar_count * self.penalties["grammar_error_per_count"]

        # Choices
        if not q.choices or len(q.choices) == 0:
            issues.append("missing_choices")
            score -= self.penalties["missing_choices"]
        else:
            empty_count = 0
            unique_contents = []
            has_correct = False
            for choice in q.choices:
                content = (choice.content or "").strip()
                if not content:
                    empty_count += 1
                    continue
                unique_contents.append(content)
                if choice.is_correct:
                    has_correct = True
            if empty_count > 0:
                issues.append(f"{empty_count}_empty_choices")
                score -= self.penalties["empty_choice_ratio"] * (empty_count / len(q.choices))
            if len(set(unique_contents)) < len(unique_contents):
                issues.append("duplicated_choices")
                score -= self.penalties["duplicated_choices"]
            if not has_correct:
                issues.append("no_correct_answer")
                score -= self.penalties["no_correct_answer"]
            for content in unique_contents:
                grammar_count, grammar_msgs = self._check_grammar(content)
                if grammar_count > 0:
                    issues.append({
                        "type": "choice_grammar_error",
                        "choice": content,
                        "count": grammar_count,
                        "details": grammar_msgs
                    })
                    score -= grammar_count * self.penalties["grammar_error_per_count"]

        return max(score, 0.0), issues, suggestions

    def _check_popularity(self, q: GeneratedQuestion) -> float:
        unique_words = set(q.content.lower().split())
        for choice in q.choices or []:
            unique_words.update((choice.content or "").lower().split())
        if not unique_words:
            return 0.0

        es = Elastic()
        resp = es.search(
            index=self.INDEX,
            size=0,
            query={"terms": {"word.keyword": list(unique_words)}},
            aggs={
                "by_word": {
                    "terms": {"field": "word.keyword", "size": len(unique_words)},
                    "aggs": {"cefr_level": {"avg": {"field": "cefr"}}}
                }
            }
        )
        word_cefr_map = {
            bucket["key"].lower(): bucket["cefr_level"]["value"] or 4.0
            for bucket in resp["aggregations"]["by_word"]["buckets"]
        }
        total = sum(word_cefr_map.get(word, 4.0) for word in unique_words)
        avg_cefr = total / len(unique_words)
        # Harder words (higher CEFR levels) yield a higher score.
        popularity_score = max(0.0, (avg_cefr - 1) / 5.0)
        return round(popularity_score, 3)
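
    # Worked example (assuming CEFR is indexed on a 1-6 scale, A1=1 .. C2=6):
    # words averaging CEFR 4.0 score max(0.0, (4.0 - 1) / 5.0) = 0.6, and words
    # missing from the index fall back to a CEFR of 4.0.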

    def _check_distractors(self, q: GeneratedQuestion):
        issues: List[Dict[str, Any]] = []
        scores: List[float] = []

        # 1. POS & lexical family
        pos_score = self._check_pos_and_meaning_of_choice(q)
        if pos_score is not None:
            scores.append(pos_score)
            issues.append({"type": "pos_lexical_family", "score": round(pos_score, 3)})

        # 2. Embedding similarity
        emb_score = self._cal_score_embedding_similarity(q)
        if emb_score is not None:
            scores.append(emb_score)
            t = self.distractor_cfg["embedding_similarity_thresholds"]
            level = (
                "too_different" if emb_score <= t["too_different"] else
                "moderate" if emb_score <= t["moderate"] else
                "good" if emb_score <= t["good"] else
                "strong" if emb_score <= t["strong"] else
                "excellent"
            )
            issues.append({
                "type": "embedding_similarity",
                "score": round(emb_score, 3),
                "level": level
            })

        # 3. Paragraph difficulty
        para_score = self._cal_score_for_paragraph(q)
        if para_score is not None:
            scores.append(para_score)
            # Roughly recover the 1-5 difficulty value from the combined score
            # (exact only when the length score is 1.0).
            diff_part = (para_score - self.distractor_cfg["paragraph"]["length_weight"]) / self.distractor_cfg["paragraph"]["difficulty_weight"] * 5
            level = "direct_match" if diff_part < 2 else "paraphrase" if diff_part < 4 else "inference"
            issues.append({
                "type": "paragraph_difficulty",
                "score": round(para_score, 3),
                "level": level
            })

        final_score = sum(scores) / len(scores) if scores else 0.0
        if scores:
            issues.append({
                "type": "distractor_summary",
                "score": round(final_score, 3),
                "components": len(scores)
            })
        return round(final_score, 3), issues

    def _check_grammar(self, text: str, max_errors: int = 2):
        if not text or len(text.strip()) < 5:
            return 0, []
        matches = self._grammar_tool.check(text)
        serious_matches = [
            m for m in matches
            if m.ruleIssueType in {"grammar", "misspelling"}
            and not m.ruleId.startswith("UPPERCASE_SENTENCE_START")
        ]
        error_messages = [
            {
                "message": m.message,
                "rule": m.ruleId,
                "error_text": text[m.offset:m.offset + m.errorLength],
                "suggestions": m.replacements[:3]
            }
            for m in serious_matches[:max_errors]
        ]
        return len(error_messages), error_messages
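
    # Illustrative only (LanguageTool output varies by version): checking
    # "She go to school." would typically report one agreement error, returned as
    # (1, [{"message": ..., "rule": ..., "error_text": ..., "suggestions": [...]}]).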

    def _check_pos_and_meaning_of_choice(self, q: GeneratedQuestion) -> Optional[float]:
        if q.type in {QuestionTypeEnum.PRONUNCIATION, QuestionTypeEnum.STRESS}:
            return 1.0
        to_be_regex = re.compile(
            r'\b(has been|have been|had been|will be|am|is|are|was|were|be|being|been|\'s|\'re|\'m)\b',
            flags=re.IGNORECASE
        )
        cleaned_choices: List[str] = []
        score = 1.0
        for c in q.choices or []:
            content = (c.content or "").strip()
            if not content:
                score -= self.distractor_cfg["empty_choice_deduction"]
                continue
            # Strip auxiliary "to be" forms so only content words are compared.
            cleaned = to_be_regex.sub("", content)
            cleaned = " ".join(cleaned.split()).lower()
            cleaned_choices.append(cleaned)
        # Multi-word choices are phrases; skip the single-token lexical analysis.
        if any(len(t.split()) > 1 for t in cleaned_choices):
            return score
        docs = [self.nlp(text) for text in cleaned_choices]
        tokens = [token for doc in docs for token in doc]
        return score * self.lexical_family_difficulty(tokens, q.num_ans_per_question or 4)

    def _cal_score_embedding_similarity(self, q: GeneratedQuestion) -> Optional[float]:
        if q.type not in {QuestionTypeEnum.SYNONYM, QuestionTypeEnum.ANTONYM, QuestionTypeEnum.VOCAB}:
            return None
        correct = [c.content for c in q.choices if c.is_correct]
        distractors = [c.content for c in q.choices if not c.is_correct]
        if not correct or not distractors:
            return 0.0
        ai = FalseAnswerGenerator()
        emb_correct = ai.get_embedding_list_word(correct)
        emb_dist = ai.get_embedding_list_word(distractors)
        similarities = [
            cosine_similarity(c.reshape(1, -1), d.reshape(1, -1))[0][0]
            for c in emb_correct for d in emb_dist
        ]
        if not similarities:
            return 0.0
        avg_sim = sum(similarities) / len(similarities)
        t = self.distractor_cfg["embedding_similarity_thresholds"]
        if avg_sim <= t["too_different"]:
            return 0.2
        elif avg_sim <= t["moderate"]:
            return 0.4
        elif avg_sim <= t["good"]:
            return 0.6
        elif avg_sim <= t["strong"]:
            return 0.8
        else:
            return 1.0
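
    # Illustrative banding (thresholds are config-driven; these numbers are
    # assumptions): with too_different=0.3, moderate=0.5, good=0.7, strong=0.85,
    # an average similarity of 0.62 falls in the "good" band and returns 0.6.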

    def _cal_score_for_paragraph(self, q: GeneratedQuestion) -> Optional[float]:
        if q.type not in {
            QuestionTypeEnum.VOCAB, QuestionTypeEnum.FACT,
            QuestionTypeEnum.MAIN_IDEA, QuestionTypeEnum.INFERENCE,
            QuestionTypeEnum.PURPOSE
        }:
            return None
        correct_answer = next((c.content for c in q.choices if c.is_correct), None)
        if not correct_answer or not q.paragraph:
            return 0.0
        words = q.paragraph.lower().split()
        word_count = len(words)
        p_cfg = self.distractor_cfg["paragraph"]

        # Length score
        if q.type == QuestionTypeEnum.VOCAB:
            thresholds = p_cfg["vocab_length_thresholds"]
            scores = [0.2, 0.3, 0.4, 0.5]
        else:
            thresholds = p_cfg["other_length_thresholds"]
            scores = [0.3, 0.5, 0.7, 0.9, 1.0]
        length_score = scores[-1]
        for thresh, sc in zip(thresholds, scores):
            if word_count <= thresh:
                length_score = sc
                break

        # Difficulty score
        doc = self.nlp(q.paragraph)
        sentences = [sent.text.strip() for sent in doc.sents if sent.text.strip()]
        if not sentences:
            return length_score * p_cfg["length_weight"]
        ai = FalseAnswerGenerator()
        sent_embs = ai.get_embedding_list_word(sentences)
        ans_emb = ai.get_embedding_list_word([correct_answer])
        cos_scores = cosine_similarity(ans_emb, sent_embs)[0]
        max_sim = float(max(cos_scores)) if cos_scores.size else 0.0
        levels = p_cfg["difficulty_levels"]
        if max_sim >= p_cfg["direct_match_sim"]:
            diff_val = levels[0]
        elif max_sim >= p_cfg["paraphrase_sim"]:
            diff_val = levels[1]
        else:
            diff_val = levels[2]
        diff_score = diff_val / 5.0
        return p_cfg["length_weight"] * length_score + p_cfg["difficulty_weight"] * diff_score
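
    # Worked example with assumed config values length_weight=0.4,
    # difficulty_weight=0.6 and difficulty_levels=[1, 3, 5]: a length_score of 0.7
    # at the "paraphrase" level gives 0.4 * 0.7 + 0.6 * (3 / 5.0) = 0.64.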

    def group_by_lemma(self, tokens):
        groups = defaultdict(list)
        for t in tokens:
            groups[t.lemma_.lower()].append(t)
        return groups

    def group_by_pos(self, tokens):
        groups = defaultdict(list)
        for t in tokens:
            groups[t.pos_].append(t)
        return groups
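
    # For example, spaCy lemmatisation puts "run", "runs" and "running" in one
    # lemma group ("run"), while group_by_pos buckets tokens by coarse part of
    # speech (VERB, NOUN, ADJ, ...).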

    def lexical_family_difficulty(self, tokens, num_ans_per_question: int = 4) -> float:
        if not tokens:
            return self.distractor_cfg["lexical_family"]["scores"]["low"]
        lemma_groups = self.group_by_lemma(tokens)
        pos_groups = self.group_by_pos(tokens)
        n = len(tokens)
        lemma_score = sum(len(v) for v in lemma_groups.values() if len(v) >= 3)
        lemma_ratio = lemma_score / n
        pos_score = sum(len(v) for v in pos_groups.values() if len(v) >= min(num_ans_per_question, 3))
        pos_ratio = pos_score / n
        t = self.distractor_cfg["lexical_family"]["thresholds"]
        s = self.distractor_cfg["lexical_family"]["scores"]
        if lemma_ratio >= t["high_lemma"]:
            return s["high_lemma"]
        if pos_ratio >= t["high_pos"]:
            return s["high_pos"]
        if pos_ratio >= t["medium_high_pos"]:
            return s["medium_high_pos"]
        if lemma_ratio >= t["medium_lemma"]:
            return s["medium_lemma"]
        if pos_ratio >= t["medium_both"] and lemma_ratio >= t["medium_both"]:
            return s["medium_both"]
        return s["low"]
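

if __name__ == "__main__":
    # Minimal usage sketch. The config below only names the keys this class reads;
    # every number is a placeholder, not a project default, and building a real
    # GeneratedQuestion (content, choices, type, paragraph, num_ans_per_question)
    # depends on the project's own interfaces.
    evaluator = QuestionQualityEvaluator(config={
        "evaluation": {
            "weights": {"structure": 0.4, "popularity": 0.2, "distractor": 0.4, "ai_adjust_factor": 0.1},
            "penalty_for_error": {"structure": {
                "missing_question_text": 1.0, "grammar_error_per_count": 0.1,
                "missing_choices": 1.0, "empty_choice_ratio": 0.5,
                "duplicated_choices": 0.3, "no_correct_answer": 0.5,
            }},
            "distractor": {
                "embedding_similarity_thresholds": {"too_different": 0.3, "moderate": 0.5, "good": 0.7, "strong": 0.85},
                "paragraph": {
                    "length_weight": 0.4, "difficulty_weight": 0.6,
                    "vocab_length_thresholds": [30, 60, 100],
                    "other_length_thresholds": [50, 100, 150, 200],
                    "direct_match_sim": 0.85, "paraphrase_sim": 0.6,
                    "difficulty_levels": [1, 3, 5],
                },
                "lexical_family": {
                    "thresholds": {"high_lemma": 0.75, "high_pos": 0.75, "medium_high_pos": 0.5,
                                   "medium_lemma": 0.5, "medium_both": 0.25},
                    "scores": {"high_lemma": 1.0, "high_pos": 0.9, "medium_high_pos": 0.7,
                               "medium_lemma": 0.6, "medium_both": 0.5, "low": 0.3},
                },
                "empty_choice_deduction": 0.2,
            },
        }
    })
    # result = evaluator.evaluate(some_generated_question)
    # print(result["score"], result["suggestions"])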