import logging from pathlib import Path import matplotlib.pyplot as plt import numpy as np import pandas as pd import seaborn as sns from sklearn.metrics import mean_absolute_error, mean_squared_error from config import Config logger = logging.getLogger(__name__) class Evaluator: def __init__(self, config: Config): self.config = config def run_backtest(self, full_data: pd.DataFrame, predictor): results = [] start_year: int = self.config.backtest.START_YEAR end_year: int = self.config.backtest.END_YEAR class_capacity = self.config.class_capacity.DEFAULT_CLASS_CAPACITY for year in range(start_year, end_year + 1): for smt in [1, 2]: target_mask = (full_data["thn"] == year) & (full_data["smt"] == smt) test_set = full_data[target_mask] if test_set.empty: continue train_set = full_data[ (full_data["thn"] < year) | ((full_data["thn"] == year) & (full_data["smt"] < smt)) ] try: pop_est = predictor.get_student_forecast(year, smt) except Exception: pop_est = test_set["jumlah_aktif"].mean() for _, row in test_set.iterrows(): pred = predictor.predict_course( row["kode_mk"], train_set, year, smt, pop_est ) actual_enrollment = row["enrollment"] predicted_enrollment = pred["val"] actual_classes = self._calculate_classes( actual_enrollment, class_capacity ) predicted_classes = pred.get( "classes_needed", self._calculate_classes(predicted_enrollment, class_capacity), ) results.append( { "year": year, "semester": smt, "kode_mk": row["kode_mk"], "actual": actual_enrollment, "predicted": predicted_enrollment, "actual_classes": actual_classes, "predicted_classes": predicted_classes, "strategy": pred["strategy"], "error": abs(actual_enrollment - predicted_enrollment), "class_error": abs(actual_classes - predicted_classes), } ) return pd.DataFrame(results) def _calculate_classes(self, enrollment: float, capacity: int) -> int: if enrollment < self.config.class_capacity.MIN_STUDENTS_TO_OPEN_CLASS: return 0 return int(np.ceil(enrollment / capacity)) def generate_metrics(self, results: pd.DataFrame): if results.empty: logger.warning("No results to generate metrics from") return {"mae": 0, "rmse": 0, "class_mae": 0, "class_accuracy": 0} results["error"] = abs(results["predicted"] - results["actual"]) results["class_error"] = abs( results["predicted_classes"] - results["actual_classes"] ) # Enrollment metrics mae = mean_absolute_error(results["actual"], results["predicted"]) rmse = np.sqrt(mean_squared_error(results["actual"], results["predicted"])) # Class count metrics class_mae = results["class_error"].mean() # Class accuracy: percentage of predictions with correct class count class_correct = (results["class_error"] == 0).sum() class_accuracy = (class_correct / len(results)) * 100 if len(results) > 0 else 0 # Class accuracy within 1: predictions within ±1 class class_within_1 = (results["class_error"] <= 1).sum() class_accuracy_within_1 = ( (class_within_1 / len(results)) * 100 if len(results) > 0 else 0 ) logger.info("BACKTEST METRICS") logger.info("\nEnrollment Prediction Metrics:") logger.info(f" Overall MAE: {mae:.2f} students") logger.info(f" Overall RMSE: {rmse:.2f} students") logger.info("\nClass Count Prediction Metrics:") logger.info(f" Class MAE: {class_mae:.2f} classes") logger.info(f" Exact Class Match: {class_accuracy:.1f}%") logger.info(f" Within ±1 Class: {class_accuracy_within_1:.1f}%") logger.info("\nPerformance by Strategy:") strat_perf = ( results.groupby("strategy") .agg({"error": "mean", "class_error": "mean"}) .round(2) ) strat_perf.columns = ["Avg Enrollment Error", "Avg Class Error"] logger.info(strat_perf.to_string()) logger.info("=" * 50) self._plot_results(results) self._plot_class_results(results) return { "mae": mae, "rmse": rmse, "class_mae": class_mae, "class_accuracy": class_accuracy, "class_accuracy_within_1": class_accuracy_within_1, } def _plot_results(self, df): Path(self.config.output.OUTPUT_DIR).mkdir(parents=True, exist_ok=True) plt.figure(figsize=(10, 6)) sns.scatterplot( data=df, x="actual", y="predicted", hue="strategy", style="strategy", alpha=0.7, ) limit = max(df["actual"].max(), df["predicted"].max()) plt.plot([0, limit], [0, limit], "r--", alpha=0.5, label="Perfect Prediction") plt.title("Actual vs Predicted Enrollment") plt.xlabel("Actual Enrollment") plt.ylabel("Predicted Enrollment") plt.legend(bbox_to_anchor=(1.05, 1), loc="upper left") plt.tight_layout() plt.savefig( f"{self.config.output.OUTPUT_DIR}/backtest_enrollment_scatter.png", dpi=150 ) plt.close() def _plot_class_results(self, df): Path(self.config.output.OUTPUT_DIR).mkdir(parents=True, exist_ok=True) plt.figure(figsize=(10, 6)) jitter_strength = 0.1 df_plot = df.copy() df_plot["actual_jitter"] = df_plot["actual_classes"] + np.random.uniform( -jitter_strength, jitter_strength, len(df_plot) ) df_plot["predicted_jitter"] = df_plot["predicted_classes"] + np.random.uniform( -jitter_strength, jitter_strength, len(df_plot) ) sns.scatterplot( data=df_plot, x="actual_jitter", y="predicted_jitter", hue="strategy", style="strategy", alpha=0.7, ) limit = max(df["actual_classes"].max(), df["predicted_classes"].max()) + 1 plt.plot([0, limit], [0, limit], "r--", alpha=0.5, label="Perfect Prediction") plt.title("Actual vs Predicted Number of Classes") plt.xlabel("Actual Classes Needed") plt.ylabel("Predicted Classes Needed") plt.legend(bbox_to_anchor=(1.05, 1), loc="upper left") plt.tight_layout() plt.savefig( f"{self.config.output.OUTPUT_DIR}/backtest_classes_scatter.png", dpi=150 ) plt.close() def generate_class_capacity_report(self, results: pd.DataFrame) -> pd.DataFrame: if results.empty: return pd.DataFrame() course_summary = ( results.groupby("kode_mk") .agg( { "actual": ["mean", "sum", "count"], "predicted": ["mean", "sum"], "actual_classes": ["mean", "sum"], "predicted_classes": ["mean", "sum"], "class_error": ["mean", "sum"], } ) .round(2) ) course_summary.columns = [ "avg_actual_enrollment", "total_actual_enrollment", "n_semesters", "avg_predicted_enrollment", "total_predicted_enrollment", "avg_actual_classes", "total_actual_classes", "avg_predicted_classes", "total_predicted_classes", "avg_class_error", "total_class_error", ] course_summary = course_summary.reset_index() course_summary = course_summary.sort_values( "total_class_error", ascending=False ) return course_summary def analyze_capacity_trends(self, full_data: pd.DataFrame) -> pd.DataFrame: class_capacity = self.config.class_capacity.DEFAULT_CLASS_CAPACITY trend_data = full_data.copy() trend_data["classes_needed"] = trend_data["enrollment"].apply( lambda x: self._calculate_classes(x, class_capacity) ) course_trends = [] for course in trend_data["kode_mk"].unique(): course_data = trend_data[trend_data["kode_mk"] == course].sort_values( ["thn", "smt"] ) if len(course_data) < 2: continue first_year = course_data.iloc[0] last_year = course_data.iloc[-1] enrollment_growth = last_year["enrollment"] - first_year["enrollment"] class_growth = last_year["classes_needed"] - first_year["classes_needed"] years_diff = last_year["thn"] - first_year["thn"] if years_diff > 0 and first_year["enrollment"] > 0: annual_growth_rate = ( (last_year["enrollment"] / first_year["enrollment"]) ** (1 / years_diff) - 1 ) * 100 else: annual_growth_rate = 0 course_trends.append( { "kode_mk": course, "first_enrollment": first_year["enrollment"], "last_enrollment": last_year["enrollment"], "enrollment_growth": enrollment_growth, "first_classes": first_year["classes_needed"], "last_classes": last_year["classes_needed"], "class_growth": class_growth, "annual_growth_rate": round(annual_growth_rate, 1), "data_points": len(course_data), "year_range": f"{int(first_year['thn'])}-{int(last_year['thn'])}", } ) trends_df = pd.DataFrame(course_trends) if not trends_df.empty: trends_df = trends_df.sort_values("annual_growth_rate", ascending=False) return trends_df