# classquota/evaluator.py
# Backtesting and reporting utilities for class-quota enrollment predictions.
import logging
from pathlib import Path
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from sklearn.metrics import mean_absolute_error, mean_squared_error
from config import Config
logger = logging.getLogger(__name__)
class Evaluator:
    """Backtests enrollment predictions and reports class-capacity metrics.

    Walks forward through historical (year, semester) periods, asks the
    supplied predictor for per-course enrollment forecasts, and compares them
    against actual enrollment and the number of classes that enrollment
    would require.
    """

    def __init__(self, config: "Config"):
        # Config provides .backtest, .class_capacity and .output sections.
        self.config = config

    def run_backtest(self, full_data: pd.DataFrame, predictor) -> pd.DataFrame:
        """Run a walk-forward backtest over the configured year range.

        Parameters
        ----------
        full_data : pd.DataFrame
            History with at least columns ``thn`` (year), ``smt`` (semester),
            ``kode_mk`` (course code), ``enrollment`` and ``jumlah_aktif``
            (active-student count).
        predictor : object
            Must expose ``get_student_forecast(year, smt)`` and
            ``predict_course(code, train_df, year, smt, pop_est)``; the
            latter returns a dict with at least ``val`` and ``strategy``.

        Returns
        -------
        pd.DataFrame
            One row per (course, year, semester) with actual/predicted
            enrollment, class counts, and absolute errors.
        """
        results = []
        start_year: int = self.config.backtest.START_YEAR
        end_year: int = self.config.backtest.END_YEAR
        class_capacity = self.config.class_capacity.DEFAULT_CLASS_CAPACITY
        for year in range(start_year, end_year + 1):
            for smt in [1, 2]:
                target_mask = (full_data["thn"] == year) & (full_data["smt"] == smt)
                test_set = full_data[target_mask]
                if test_set.empty:
                    continue
                # Train only on strictly earlier periods (no look-ahead).
                train_set = full_data[
                    (full_data["thn"] < year)
                    | ((full_data["thn"] == year) & (full_data["smt"] < smt))
                ]
                try:
                    pop_est = predictor.get_student_forecast(year, smt)
                except Exception:
                    # Best-effort fallback: mean active-student count of the
                    # period. Log it so the fallback is not silent.
                    pop_est = test_set["jumlah_aktif"].mean()
                    logger.warning(
                        "Student forecast failed for %s-%s; falling back to "
                        "mean active count %.1f",
                        year,
                        smt,
                        pop_est,
                    )
                for _, row in test_set.iterrows():
                    pred = predictor.predict_course(
                        row["kode_mk"], train_set, year, smt, pop_est
                    )
                    actual_enrollment = row["enrollment"]
                    predicted_enrollment = pred["val"]
                    actual_classes = self._calculate_classes(
                        actual_enrollment, class_capacity
                    )
                    # Prefer the predictor's own class estimate when provided.
                    predicted_classes = pred.get(
                        "classes_needed",
                        self._calculate_classes(predicted_enrollment, class_capacity),
                    )
                    results.append(
                        {
                            "year": year,
                            "semester": smt,
                            "kode_mk": row["kode_mk"],
                            "actual": actual_enrollment,
                            "predicted": predicted_enrollment,
                            "actual_classes": actual_classes,
                            "predicted_classes": predicted_classes,
                            "strategy": pred["strategy"],
                            "error": abs(actual_enrollment - predicted_enrollment),
                            "class_error": abs(actual_classes - predicted_classes),
                        }
                    )
        return pd.DataFrame(results)

    def _calculate_classes(self, enrollment: float, capacity: int) -> int:
        """Number of classes needed for *enrollment* students.

        Returns 0 when enrollment is below the minimum required to open a
        class; otherwise the ceiling of enrollment / capacity.
        """
        if enrollment < self.config.class_capacity.MIN_STUDENTS_TO_OPEN_CLASS:
            return 0
        return int(np.ceil(enrollment / capacity))

    def generate_metrics(self, results: pd.DataFrame):
        """Compute, log, and plot backtest metrics.

        Returns a dict with ``mae``, ``rmse``, ``class_mae``,
        ``class_accuracy`` (exact class-count match, %), and
        ``class_accuracy_within_1`` (within +/- 1 class, %).
        """
        if results.empty:
            logger.warning("No results to generate metrics from")
            # Same keys as the non-empty path so callers can index the
            # dict unconditionally (the original omitted the last key).
            return {
                "mae": 0,
                "rmse": 0,
                "class_mae": 0,
                "class_accuracy": 0,
                "class_accuracy_within_1": 0,
            }
        # Work on a copy so the caller's DataFrame is not mutated.
        results = results.copy()
        results["error"] = (results["predicted"] - results["actual"]).abs()
        results["class_error"] = (
            results["predicted_classes"] - results["actual_classes"]
        ).abs()
        # Enrollment metrics
        mae = mean_absolute_error(results["actual"], results["predicted"])
        rmse = np.sqrt(mean_squared_error(results["actual"], results["predicted"]))
        # Class count metrics (results is non-empty here, so len > 0).
        n = len(results)
        class_mae = results["class_error"].mean()
        # Percentage of predictions with the exact class count.
        class_accuracy = (results["class_error"] == 0).sum() / n * 100
        # Percentage of predictions within +/- 1 class.
        class_accuracy_within_1 = (results["class_error"] <= 1).sum() / n * 100
        logger.info("BACKTEST METRICS")
        logger.info("\nEnrollment Prediction Metrics:")
        logger.info("  Overall MAE: %.2f students", mae)
        logger.info("  Overall RMSE: %.2f students", rmse)
        logger.info("\nClass Count Prediction Metrics:")
        logger.info("  Class MAE: %.2f classes", class_mae)
        logger.info("  Exact Class Match: %.1f%%", class_accuracy)
        logger.info("  Within ±1 Class: %.1f%%", class_accuracy_within_1)
        logger.info("\nPerformance by Strategy:")
        strat_perf = (
            results.groupby("strategy")
            .agg({"error": "mean", "class_error": "mean"})
            .round(2)
        )
        strat_perf.columns = ["Avg Enrollment Error", "Avg Class Error"]
        logger.info(strat_perf.to_string())
        logger.info("=" * 50)
        self._plot_results(results)
        self._plot_class_results(results)
        return {
            "mae": mae,
            "rmse": rmse,
            "class_mae": class_mae,
            "class_accuracy": class_accuracy,
            "class_accuracy_within_1": class_accuracy_within_1,
        }

    def _plot_results(self, df: pd.DataFrame) -> None:
        """Save a scatter plot of actual vs predicted enrollment."""
        out_dir = Path(self.config.output.OUTPUT_DIR)
        out_dir.mkdir(parents=True, exist_ok=True)
        plt.figure(figsize=(10, 6))
        sns.scatterplot(
            data=df,
            x="actual",
            y="predicted",
            hue="strategy",
            style="strategy",
            alpha=0.7,
        )
        # Diagonal reference line: a perfect prediction lands on it.
        limit = max(df["actual"].max(), df["predicted"].max())
        plt.plot([0, limit], [0, limit], "r--", alpha=0.5, label="Perfect Prediction")
        plt.title("Actual vs Predicted Enrollment")
        plt.xlabel("Actual Enrollment")
        plt.ylabel("Predicted Enrollment")
        plt.legend(bbox_to_anchor=(1.05, 1), loc="upper left")
        plt.tight_layout()
        plt.savefig(out_dir / "backtest_enrollment_scatter.png", dpi=150)
        plt.close()

    def _plot_class_results(self, df: pd.DataFrame) -> None:
        """Save a scatter plot of actual vs predicted class counts.

        Class counts are small integers, so points are jittered slightly to
        keep overlapping observations visible.
        """
        out_dir = Path(self.config.output.OUTPUT_DIR)
        out_dir.mkdir(parents=True, exist_ok=True)
        plt.figure(figsize=(10, 6))
        jitter_strength = 0.1
        df_plot = df.copy()
        df_plot["actual_jitter"] = df_plot["actual_classes"] + np.random.uniform(
            -jitter_strength, jitter_strength, len(df_plot)
        )
        df_plot["predicted_jitter"] = df_plot["predicted_classes"] + np.random.uniform(
            -jitter_strength, jitter_strength, len(df_plot)
        )
        sns.scatterplot(
            data=df_plot,
            x="actual_jitter",
            y="predicted_jitter",
            hue="strategy",
            style="strategy",
            alpha=0.7,
        )
        limit = max(df["actual_classes"].max(), df["predicted_classes"].max()) + 1
        plt.plot([0, limit], [0, limit], "r--", alpha=0.5, label="Perfect Prediction")
        plt.title("Actual vs Predicted Number of Classes")
        plt.xlabel("Actual Classes Needed")
        plt.ylabel("Predicted Classes Needed")
        plt.legend(bbox_to_anchor=(1.05, 1), loc="upper left")
        plt.tight_layout()
        plt.savefig(out_dir / "backtest_classes_scatter.png", dpi=150)
        plt.close()

    def generate_class_capacity_report(self, results: pd.DataFrame) -> pd.DataFrame:
        """Per-course summary of backtest results, worst class-error first.

        Returns an empty DataFrame when *results* is empty.
        """
        if results.empty:
            return pd.DataFrame()
        course_summary = (
            results.groupby("kode_mk")
            .agg(
                {
                    "actual": ["mean", "sum", "count"],
                    "predicted": ["mean", "sum"],
                    "actual_classes": ["mean", "sum"],
                    "predicted_classes": ["mean", "sum"],
                    "class_error": ["mean", "sum"],
                }
            )
            .round(2)
        )
        # Flatten the (column, agg) MultiIndex into descriptive names;
        # order must match the agg spec above.
        course_summary.columns = [
            "avg_actual_enrollment",
            "total_actual_enrollment",
            "n_semesters",
            "avg_predicted_enrollment",
            "total_predicted_enrollment",
            "avg_actual_classes",
            "total_actual_classes",
            "avg_predicted_classes",
            "total_predicted_classes",
            "avg_class_error",
            "total_class_error",
        ]
        course_summary = course_summary.reset_index()
        course_summary = course_summary.sort_values(
            "total_class_error", ascending=False
        )
        return course_summary

    def analyze_capacity_trends(self, full_data: pd.DataFrame) -> pd.DataFrame:
        """Summarize per-course enrollment/class growth over the history.

        Courses with fewer than two observations are skipped. The annual
        growth rate is a compound rate between the first and the last
        observation (0 when the year span or first enrollment is 0).
        """
        class_capacity = self.config.class_capacity.DEFAULT_CLASS_CAPACITY
        trend_data = full_data.copy()
        trend_data["classes_needed"] = trend_data["enrollment"].apply(
            lambda x: self._calculate_classes(x, class_capacity)
        )
        course_trends = []
        for course in trend_data["kode_mk"].unique():
            course_data = trend_data[trend_data["kode_mk"] == course].sort_values(
                ["thn", "smt"]
            )
            if len(course_data) < 2:
                continue
            # First/last chronological observation rows for this course.
            first_obs = course_data.iloc[0]
            last_obs = course_data.iloc[-1]
            enrollment_growth = last_obs["enrollment"] - first_obs["enrollment"]
            class_growth = last_obs["classes_needed"] - first_obs["classes_needed"]
            years_diff = last_obs["thn"] - first_obs["thn"]
            if years_diff > 0 and first_obs["enrollment"] > 0:
                # Compound annual growth rate, in percent.
                annual_growth_rate = (
                    (last_obs["enrollment"] / first_obs["enrollment"])
                    ** (1 / years_diff)
                    - 1
                ) * 100
            else:
                annual_growth_rate = 0
            course_trends.append(
                {
                    "kode_mk": course,
                    "first_enrollment": first_obs["enrollment"],
                    "last_enrollment": last_obs["enrollment"],
                    "enrollment_growth": enrollment_growth,
                    "first_classes": first_obs["classes_needed"],
                    "last_classes": last_obs["classes_needed"],
                    "class_growth": class_growth,
                    "annual_growth_rate": round(annual_growth_rate, 1),
                    "data_points": len(course_data),
                    "year_range": f"{int(first_obs['thn'])}-{int(last_obs['thn'])}",
                }
            )
        trends_df = pd.DataFrame(course_trends)
        if not trends_df.empty:
            trends_df = trends_df.sort_values("annual_growth_rate", ascending=False)
        return trends_df