PLRS / plrs /model /evaluator.py
Clementina Tom (via Gemini)
Upgrade to v0.2.0: Modular architecture, skill_encoder_v2 support, and model fallback
a30026f
"""
plrs.model.evaluator
====================
Evaluation suite for PLRS.
Metrics:
- Knowledge Tracing: AUC-ROC, Accuracy, Binary Cross-Entropy
- Recommendation: Prerequisite Violation Rate, Coverage, Diversity
- Baselines: Random, Popularity, BKT (Bayesian Knowledge Tracing)
Usage:
from plrs.model.evaluator import PLRSEvaluator
evaluator = PLRSEvaluator(pipeline)
report = evaluator.evaluate(test_sequences, skill_to_topic)
"""
from __future__ import annotations
import time
from dataclasses import dataclass, field
from typing import Any
import numpy as np
try:
from sklearn.metrics import roc_auc_score, accuracy_score, log_loss
HAS_SKLEARN = True
except ImportError:
HAS_SKLEARN = False
# ── Baseline models ───────────────────────────────────────────────────────────
class RandomBaseline:
    """Chance-level baseline: constant 0.5 predictions, random recommendations."""

    def predict(self, skill_seq, correct_seq):
        """Return a probability of 0.5 for every position of ``skill_seq``.

        Parameters
        ----------
        skill_seq : sequence
            Only its length is used.
        correct_seq : sequence
            Not used.

        Returns
        -------
        dict[int, float]
            Maps each position ``0 .. len(skill_seq) - 1`` to ``0.5``.
        """
        return dict.fromkeys(range(len(skill_seq)), 0.5)

    def recommend(self, curriculum, n=5):
        """Return up to ``n`` distinct entries of ``curriculum.nodes`` at random.

        Parameters
        ----------
        curriculum : object
            Must expose an iterable ``nodes`` attribute.
        n : int
            Maximum number of nodes to return; capped at the number of nodes.

        Returns
        -------
        list
            Randomly sampled nodes, without replacement.
        """
        import random

        # random.sample() requires a sequence (sets raise TypeError since
        # Python 3.11), so materialise the node collection first.  For a list
        # input this yields the same sample as before for a given seed.
        nodes = list(curriculum.nodes)
        return random.sample(nodes, min(n, len(nodes)))
class PopularityBaseline:
    """Frequency-based baseline.

    Predicts correctness with each skill's mean observed outcome and
    recommends the topics that were seen most often during ``fit``.
    """

    def __init__(self):
        # skill id -> every outcome recorded for that skill (cast to float)
        self.skill_correct: dict[int, list[float]] = {}
        # topic id -> number of interactions that mapped onto the topic
        self.topic_count: dict[str, int] = {}

    def fit(self, sequences, skill_to_topic=None):
        """Accumulate per-skill outcomes and per-topic interaction counts.

        Statistics are added to whatever has already been recorded, so
        repeated calls are cumulative.

        Parameters
        ----------
        sequences : iterable of (skill_seq, correct_seq)
            The two sequences are paired element-wise; extra trailing
            entries of the longer one are ignored.
        skill_to_topic : dict, optional
            Maps a skill id to a topic id.  When falsy, no topic counts
            are collected.
        """
        for skills, outcomes in sequences:
            for skill_id, outcome in zip(skills, outcomes):
                if skill_id not in self.skill_correct:
                    self.skill_correct[skill_id] = []
                self.skill_correct[skill_id].append(float(outcome))

                if not skill_to_topic:
                    continue
                topic_id = skill_to_topic.get(skill_id)
                if not topic_id:
                    # Unmapped (or falsy) topics are not counted.
                    continue
                self.topic_count[topic_id] = self.topic_count.get(topic_id, 0) + 1

    def predict_prob(self, skill_id: int) -> float:
        """Return the mean recorded outcome for ``skill_id`` (0.5 if unseen)."""
        outcomes = self.skill_correct.get(skill_id, [])
        if not outcomes:
            return 0.5
        return float(np.mean(outcomes))

    def recommend(self, curriculum, n=5):
        """Return up to ``n`` curriculum topics, most frequently seen first.

        Falls back to the first ``n`` entries of ``curriculum.nodes`` when no
        topic counts have been collected.  Topics absent from
        ``curriculum.nodes`` are dropped; ties keep first-seen order.
        """
        counts = self.topic_count
        if not counts:
            return curriculum.nodes[:n]

        # Stable sort on the count alone, so equal counts keep insertion order.
        by_frequency = sorted(counts.items(), key=lambda item: item[1], reverse=True)
        in_curriculum = [topic for topic, _ in by_frequency if topic in curriculum.nodes]
        return in_curriculum[:n]
class BKTBaseline:
    """
    Bayesian Knowledge Tracing baseline with one mastery estimate per skill.

    All skills share the same four parameters:

    - ``p_init``    : mastery assumed for a skill before any observation
    - ``p_transit`` : chance of learning the skill after each observation
    - ``p_slip``    : chance of answering incorrectly despite mastery
    - ``p_guess``   : chance of answering correctly without mastery
    """

    def __init__(self, p_init=0.3, p_transit=0.1, p_slip=0.1, p_guess=0.2):
        self.p_init = p_init
        self.p_transit = p_transit
        self.p_slip = p_slip
        self.p_guess = p_guess
        # skill id -> current mastery estimate
        self._mastery: dict[int, float] = {}

    def _update(self, skill: int, correct: int) -> float:
        """Fold one observation into the skill's mastery and return the new value."""
        prior = self._mastery.get(skill, self.p_init)

        # Weight of the "mastered" and "not mastered" hypotheses given the
        # observed outcome.
        if correct:
            mastered = prior * (1 - self.p_slip)
            unmastered = (1 - prior) * self.p_guess
        else:
            mastered = prior * self.p_slip
            unmastered = (1 - prior) * (1 - self.p_guess)

        # The floor on the denominator guards against division by zero.
        posterior = mastered / max(mastered + unmastered, 1e-9)

        # Learning transition applied after every observation.
        updated = posterior + (1 - posterior) * self.p_transit
        self._mastery[skill] = updated
        return updated

    def predict_sequence(self, skill_seq: list[int], correct_seq: list[int]) -> list[float]:
        """Replay a sequence and return the estimate for each next interaction.

        Mastery state is reset at the start of every call.  After observing
        interaction ``t`` the current mastery of the skill at position
        ``t + 1`` is recorded (``p_init`` if that skill is still unseen).

        NOTE(review): the recorded values are mastery estimates, not the
        slip/guess-adjusted probability of a correct answer -- confirm this
        is the intended quantity for scoring.

        Returns
        -------
        list[float]
            One value per observed interaction except the last; empty for
            sequences shorter than two.
        """
        self._mastery = {}
        observed = zip(skill_seq[:-1], correct_seq[:-1])

        estimates = []
        for upcoming, (skill, correct) in enumerate(observed, start=1):
            self._update(skill, correct)
            estimates.append(self._mastery.get(skill_seq[upcoming], self.p_init))
        return estimates

    def get_mastery(self) -> dict[int, float]:
        """Return a copy of the current per-skill mastery estimates."""
        return dict(self._mastery)
# ── Result dataclasses ────────────────────────────────────────────────────────
@dataclass
class KTMetrics:
    """Scores of a single knowledge-tracing model on one evaluation set.

    Attributes
    ----------
    model_name : str
        Label used for the model in reports.
    auc : float
        Area under the ROC curve.
    accuracy : float
        Classification accuracy.
    log_loss : float
        Binary cross-entropy.
    n_samples : int
        Number of scored predictions.
    elapsed_s : float
        Time spent producing the predictions, in seconds.
    """

    model_name: str
    auc: float
    accuracy: float
    log_loss: float
    n_samples: int
    elapsed_s: float
@dataclass
class RecommendMetrics:
    """Recommendation quality metrics.

    Attributes
    ----------
    violation_rate : float
        Fraction of recommendations that violate prerequisites.
    coverage : float
        Fraction of the curriculum covered by recommendations.
    avg_downstream : float
        Average number of topics unlocked by recommendations.
    mastery_rate : float
        Average student mastery in the test set.
    """

    violation_rate: float
    coverage: float
    avg_downstream: float
    mastery_rate: float
@dataclass
class EvaluationReport:
    """Combined result of one evaluation run.

    Attributes
    ----------
    kt_metrics : list[KTMetrics]
        One entry per evaluated knowledge-tracing model.
    rec_metrics : RecommendMetrics or None
        Recommendation metrics, if any were computed.
    config : dict
        Settings recorded alongside the results.
    timestamp : str
        When the report was produced.
    """

    kt_metrics: list[KTMetrics]
    rec_metrics: RecommendMetrics | None
    config: dict[str, Any]
    timestamp: str

    def print(self) -> None:
        """Write a fixed-width, human-readable summary to standard output."""
        rule = "=" * 62

        print("\n" + rule)
        print(" PLRS EVALUATION REPORT")
        print(rule)

        # Knowledge-tracing table: header, separator, one row per model.
        print(f"\n{'Model':<22} {'AUC':>8} {'Accuracy':>10} {'Log Loss':>10} {'Samples':>8}")
        print("-" * 62)
        for row in self.kt_metrics:
            print(f"{row.model_name:<22} {row.auc:>8.4f} {row.accuracy:>10.4f} {row.log_loss:>10.4f} {row.n_samples:>8,}")

        # Recommendation section is omitted when no metrics are attached.
        rec = self.rec_metrics
        if rec:
            print(f"\n{'Recommendation Metrics':}")
            print(f" Prerequisite violation rate : {rec.violation_rate:.1%}")
            print(f" Curriculum coverage : {rec.coverage:.1%}")
            print(f" Avg downstream unlocked : {rec.avg_downstream:.1f}")
            print(f" Avg student mastery rate : {rec.mastery_rate:.1%}")

        print(rule + "\n")

    def to_dict(self) -> dict:
        """Return the report as a plain dict with rounded metric values.

        Metrics are rounded to 6 decimals, except ``elapsed_s`` and
        ``avg_downstream`` which use 3.  ``rec_metrics`` is ``None`` when no
        recommendation metrics are attached.
        """
        kt_rows = [
            {
                "model": row.model_name,
                "auc": round(row.auc, 6),
                "accuracy": round(row.accuracy, 6),
                "log_loss": round(row.log_loss, 6),
                "n_samples": row.n_samples,
                "elapsed_s": round(row.elapsed_s, 3),
            }
            for row in self.kt_metrics
        ]

        if self.rec_metrics:
            rec = self.rec_metrics
            rec_section = {
                "violation_rate": round(rec.violation_rate, 6),
                "coverage": round(rec.coverage, 6),
                "avg_downstream": round(rec.avg_downstream, 3),
                "mastery_rate": round(rec.mastery_rate, 6),
            }
        else:
            rec_section = None

        return {
            "kt_metrics": kt_rows,
            "rec_metrics": rec_section,
            "config": self.config,
            "timestamp": self.timestamp,
        }
# ── Main evaluator ────────────────────────────────────────────────────────────
class PLRSEvaluator:
    """
    Evaluate PLRS against baselines on held-out student sequences.

    Parameters
    ----------
    pipeline : PLRSPipeline
        A loaded pipeline (with or without SAKT model).  The evaluator reads
        ``pipeline.curriculum``, ``pipeline._model``, ``pipeline.threshold``,
        ``pipeline.soft_threshold`` and ``pipeline.top_n``, and calls
        ``pipeline.recommend_from_mastery``.
    """

    def __init__(self, pipeline) -> None:
        self.pipeline = pipeline
        self.curriculum = pipeline.curriculum

    def evaluate(
        self,
        test_sequences: list[tuple[list[int], list[int]]],
        skill_to_topic: dict[int, str] | None = None,
        train_sequences: list[tuple[list[int], list[int]]] | None = None,
        include_baselines: bool = True,
    ) -> EvaluationReport:
        """
        Run full evaluation.

        Parameters
        ----------
        test_sequences : list of (skill_seq, correct_seq)
        skill_to_topic : dict mapping skill_id -> curriculum topic_id
        train_sequences : used to fit popularity baseline.  When omitted or
            empty the baseline is fitted on ``test_sequences`` itself, so its
            scores are then computed on data it has already seen.
        include_baselines : whether to evaluate the random, BKT and
            popularity baselines

        Returns
        -------
        EvaluationReport
            Knowledge-tracing metrics (SAKT first when the pipeline has a
            model, then the baselines), recommendation metrics, the pipeline
            settings used, and an ISO-format local timestamp.
        """
        import datetime

        kt_metrics: list[KTMetrics] = []

        # ── SAKT evaluation ──────────────────────────────────────────
        if self.pipeline._model is not None:
            kt_metrics.append(self._eval_sakt(test_sequences))

        # ── Baselines ────────────────────────────────────────────────
        if include_baselines:
            kt_metrics.append(self._eval_random(test_sequences))
            kt_metrics.append(self._eval_bkt(test_sequences))
            pop = PopularityBaseline()
            pop.fit(train_sequences or test_sequences, skill_to_topic)
            kt_metrics.append(self._eval_popularity(test_sequences, pop))

        # ── Recommendation metrics ───────────────────────────────────
        rec_metrics = self._eval_recommendations(test_sequences, skill_to_topic)

        return EvaluationReport(
            kt_metrics=kt_metrics,
            rec_metrics=rec_metrics,
            config={
                "threshold": self.pipeline.threshold,
                "soft_threshold": self.pipeline.soft_threshold,
                "top_n": self.pipeline.top_n,
                "n_test_students": len(test_sequences),
            },
            timestamp=datetime.datetime.now().isoformat(),
        )

    # ── KT evaluation helpers ─────────────────────────────────────────────────

    def _eval_sakt(self, sequences) -> KTMetrics:
        """Score the pipeline's model; sequences shorter than two are skipped."""
        t0 = time.time()
        all_probs, all_labels = [], []
        for skill_seq, correct_seq in sequences:
            if len(skill_seq) < 2:
                continue
            probs = self.pipeline._model.predict_mastery(skill_seq, correct_seq)
            n_labels = len(correct_seq)
            for key, prob in probs.items():
                # NOTE(review): each key is used as a *position* in
                # correct_seq.  If predict_mastery keys its result by skill
                # id instead, these labels are wrong -- confirm against the
                # model.
                # The lower bound stops negative keys from wrapping around
                # and silently reading labels from the end of the sequence.
                if 0 <= key < n_labels:
                    all_probs.append(prob)
                    all_labels.append(float(correct_seq[key]))
        return self._compute_kt_metrics("SAKT", all_probs, all_labels, time.time() - t0)

    def _eval_random(self, sequences) -> KTMetrics:
        """Score a constant 0.5 prediction on every interaction after the first."""
        t0 = time.time()
        all_probs, all_labels = [], []
        for _skill_seq, correct_seq in sequences:
            targets = [float(correct) for correct in correct_seq[1:]]
            all_labels.extend(targets)
            all_probs.extend([0.5] * len(targets))
        return self._compute_kt_metrics("Random (baseline)", all_probs, all_labels, time.time() - t0)

    def _eval_bkt(self, sequences) -> KTMetrics:
        """Score a default-parameter BKT model, replayed per sequence."""
        t0 = time.time()
        all_probs, all_labels = [], []
        bkt = BKTBaseline()
        for skill_seq, correct_seq in sequences:
            if len(skill_seq) < 2:
                continue
            probs = bkt.predict_sequence(skill_seq, correct_seq)
            # Prediction i targets the outcome at position i + 1.
            labels = [float(c) for c in correct_seq[1:len(probs) + 1]]
            all_probs.extend(probs)
            all_labels.extend(labels)
        return self._compute_kt_metrics("BKT (baseline)", all_probs, all_labels, time.time() - t0)

    def _eval_popularity(self, sequences, pop: PopularityBaseline) -> KTMetrics:
        """Score a fitted popularity baseline on every interaction after the first."""
        t0 = time.time()
        all_probs, all_labels = [], []
        for skill_seq, correct_seq in sequences:
            for skill, correct in zip(skill_seq[1:], correct_seq[1:]):
                all_probs.append(pop.predict_prob(skill))
                all_labels.append(float(correct))
        return self._compute_kt_metrics("Popularity (baseline)", all_probs, all_labels, time.time() - t0)

    @staticmethod
    def _compute_kt_metrics(name, probs, labels, elapsed) -> KTMetrics:
        """Turn predictions and labels into a ``KTMetrics`` record.

        NaN predictions are replaced by 0.5 and NaN labels by 0.0.  With no
        samples the result is AUC 0.5, accuracy 0.0 and log loss 0.0.

        scikit-learn is used when it is installed and both label classes are
        present.  Otherwise AUC is reported as 0.5 and accuracy / log loss
        are computed with NumPy.  Both paths clip probabilities to
        ``[1e-7, 1 - 1e-7]`` before the log loss so they agree numerically.
        """
        probs_arr = np.nan_to_num(np.array(probs), nan=0.5)
        labels_arr = np.nan_to_num(np.array(labels), nan=0.0)
        n = len(probs_arr)

        if n == 0:
            return KTMetrics(
                model_name=name, auc=0.5, accuracy=0.0,
                log_loss=0.0, n_samples=0, elapsed_s=elapsed,
            )

        # Keeps log() finite at exactly 0 or 1.
        clipped = np.clip(probs_arr, 1e-7, 1 - 1e-7)

        if HAS_SKLEARN and len(np.unique(labels_arr)) > 1:
            auc = float(roc_auc_score(labels_arr, probs_arr))
            acc = float(accuracy_score(labels_arr, (probs_arr >= 0.5).astype(int)))
            loss = float(log_loss(labels_arr, clipped))
        else:
            # AUC is undefined for a single class and is not computed without
            # scikit-learn; 0.5 is a placeholder, not a measurement.
            auc = 0.5
            acc = float(((probs_arr >= 0.5) == labels_arr).mean())
            loss = float(-np.mean(
                labels_arr * np.log(clipped) +
                (1 - labels_arr) * np.log(1 - clipped)
            ))

        return KTMetrics(
            model_name=name, auc=auc, accuracy=acc,
            log_loss=loss, n_samples=n, elapsed_s=elapsed,
        )

    # ── Recommendation evaluation ─────────────────────────────────────────────

    def _eval_recommendations(
        self,
        sequences,
        skill_to_topic,
    ) -> RecommendMetrics:
        """Average recommendation metrics over all sequences.

        For each sequence a mastery score per curriculum node is built: a
        topic gets the best outcome observed for any skill mapped to it, and
        every other node gets 0.0 (all nodes get 0.0 when ``skill_to_topic``
        is falsy).  The scores are passed to
        ``pipeline.recommend_from_mastery`` and its result is aggregated.

        ``avg_downstream`` only averages over sequences that produced at
        least one approved recommendation.
        """
        nodes = self.curriculum.nodes
        # Built once: set membership is O(1) and the blank mastery dict does
        # not need to be regenerated for every student.
        node_set = set(nodes)
        blank_mastery = dict.fromkeys(nodes, 0.0)
        n_nodes = max(self.curriculum.num_nodes, 1)  # avoid division by zero

        violation_rates, coverages, downstreams, mastery_rates = [], [], [], []

        for skill_seq, correct_seq in sequences:
            # Build mastery from sequence
            mastery_scores = dict(blank_mastery)
            if skill_to_topic:
                for skill, correct in zip(skill_seq, correct_seq):
                    topic = skill_to_topic.get(skill)
                    if topic and topic in node_set:
                        mastery_scores[topic] = max(mastery_scores[topic], float(correct))

            results = self.pipeline.recommend_from_mastery(mastery_scores)
            stats = results["stats"]
            summary = results["mastery_summary"]

            violation_rates.append(stats["prerequisite_violation_rate"])
            mastery_rates.append(summary["mastery_rate"])

            # Coverage: fraction of curriculum represented in approved+challenging
            rec_topics = {
                rec["topic_id"]
                for bucket in ("approved", "challenging")
                for rec in results[bucket]
            }
            coverages.append(len(rec_topics) / n_nodes)

            # Avg downstream unlock value
            approved = results["approved"]
            if approved:
                downstreams.append(
                    np.mean([rec["downstream_count"] for rec in approved])
                )

        return RecommendMetrics(
            violation_rate=float(np.mean(violation_rates)) if violation_rates else 0.0,
            coverage=float(np.mean(coverages)) if coverages else 0.0,
            avg_downstream=float(np.mean(downstreams)) if downstreams else 0.0,
            mastery_rate=float(np.mean(mastery_rates)) if mastery_rates else 0.0,
        )