# classquota / backend.py
# muhalwan's picture
# Revised version
# 6a0a429
import logging
from dataclasses import dataclass
from typing import Dict, Optional, Tuple
import pandas as pd
from config import Config
from data_processor import DataProcessor
from evaluator import Evaluator
from prophet_predictor import ProphetPredictor
from utils import setup_logging
# Configure logging at import time, before the module logger is created.
setup_logging("INFO")
# Module-level logger used by PredictionBackend below.
logger = logging.getLogger("Backend")
@dataclass
class PredictionResult:
    """Outcome of a single-semester prediction request.

    When ``error`` is set, the other fields carry empty placeholder values.
    """

    # Aggregate figures for the requested year/semester.
    summary_data: Dict
    # Display-ready table of predicted courses.
    predictions_df: pd.DataFrame
    # Prediction-vs-actual table; None when no comparison could be built.
    comparison_df: Optional[pd.DataFrame]
    # True when enrollment records exist for the requested year/semester.
    has_actual_data: bool
    # Failure message; None on success.
    error: Optional[str] = None
@dataclass
class ForecastResult:
    """Outcome of a multi-year forecast request.

    When ``error`` is set, the other fields carry empty placeholder values.
    """

    # Headline figures for the forecast horizon.
    summary_data: Dict
    # Display-ready per-course, per-year forecast table.
    forecast_df: pd.DataFrame
    # One row per forecast year with aggregated totals.
    yearly_summary: pd.DataFrame
    # Failure message; None on success.
    error: Optional[str] = None
class PredictionBackend:
    """Facade that wires configuration, data loading and the predictor together.

    Call ``initialize()`` once before requesting predictions. The public
    ``generate_*`` methods catch exceptions raised while computing a result,
    log them, and report them through the ``error`` field of the returned
    result object.
    """

    # Inclusive range of years accepted by ``generate_predictions``.
    MIN_YEAR = 2020
    MAX_YEAR = 2030
    VALID_SEMESTERS = (1, 2)
    _SEMESTER_ERROR = "Semester harus 1 (Ganjil) atau 2 (Genap)"

    # Internal column -> display label, in display order.
    _PREDICTION_COLUMNS = {
        "kode_mk": "Kode MK",
        "nama_mk": "Nama MK",
        "predicted_enrollment": "Prediksi",
        "classes_needed": "Jumlah Kelas",
        "class_capacity": "Kapasitas/Kelas",
        "total_quota": "Total Kuota",
        "utilization_pct": "Utilization %",
        "recommendation": "Status",
        "confidence": "Confidence",
        "strategy": "Strategy",
    }
    _COMPARISON_COLUMNS = {
        "kode_mk": "Kode MK",
        "nama_mk": "Nama MK",
        "actual_enrollment": "Aktual",
        "predicted_enrollment": "Prediksi",
        "actual_classes": "Kelas Aktual",
        "classes_needed": "Kelas Prediksi",
        "class_diff": "Selisih Kelas",
        "error": "Error",
        "accuracy_%": "Akurasi %",
        "strategy": "Strategy",
    }
    _FORECAST_COLUMNS = {
        "year": "Tahun",
        "kode_mk": "Kode MK",
        "nama_mk": "Nama MK",
        "predicted_enrollment": "Prediksi",
        "classes_needed": "Kelas",
        "total_capacity": "Kapasitas",
    }
    _YEARLY_COLUMNS = {
        "year": "Tahun",
        "predicted_enrollment": "Total Prediksi",
        "classes_needed": "Total Kelas",
        "total_capacity": "Total Kapasitas",
        "kode_mk": "Jumlah MK",
    }

    def __init__(self):
        """Create an empty backend; nothing is loaded until ``initialize()``."""
        self._processor: Optional[DataProcessor] = None
        self._predictor: Optional[ProphetPredictor] = None
        self._config: Optional[Config] = None
        self._df_enrollment: Optional[pd.DataFrame] = None
        self._elective_codes: Optional[set] = None
        self._backtest_metrics: Optional[dict] = None
        self._initialized: bool = False

    @property
    def is_initialized(self) -> bool:
        """Whether the last ``initialize()`` call completed successfully."""
        return self._initialized

    @property
    def config(self) -> "Optional[Config]":
        """The active configuration, or None before ``initialize()``."""
        return self._config

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _default_metrics() -> Dict:
        """Return a fresh copy of the placeholder backtest metrics."""
        return {"mae": 0, "rmse": 0}

    @staticmethod
    def _failed_prediction(message: str) -> "PredictionResult":
        """Build an empty ``PredictionResult`` carrying ``message`` as error."""
        return PredictionResult(
            summary_data={},
            predictions_df=pd.DataFrame(),
            comparison_df=None,
            has_actual_data=False,
            error=message,
        )

    @staticmethod
    def _failed_forecast(message: str) -> "ForecastResult":
        """Build an empty ``ForecastResult`` carrying ``message`` as error."""
        return ForecastResult(
            summary_data={},
            forecast_df=pd.DataFrame(),
            yearly_summary=pd.DataFrame(),
            error=message,
        )

    @staticmethod
    def _total_accuracy_pct(total_predicted, total_actual) -> float:
        """Return aggregate accuracy in percent.

        Computed as ``(1 - |predicted - actual| / actual) * 100``. Returns 0.0
        when ``total_actual`` is zero, where the ratio is undefined.
        """
        if not total_actual:
            return 0.0
        return float((1 - abs(total_predicted - total_actual) / total_actual) * 100)

    @staticmethod
    def _attach_actual_enrollment(
        predictions: pd.DataFrame, actual_data: pd.DataFrame
    ) -> pd.DataFrame:
        """Left-join actual enrollment onto predictions as ``actual_enrollment``."""
        merged = predictions.merge(
            actual_data[["kode_mk", "enrollment"]], on="kode_mk", how="left"
        )
        return merged.rename(columns={"enrollment": "actual_enrollment"})

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------
    def initialize(self) -> bool:
        """Load and process the data, then train the student population model.

        Returns:
            True on success; False if any step raised (the error is logged).
        """
        # A re-initialization may load different data, so cached backtest
        # metrics from a previous run must not be reused.
        self._backtest_metrics = None
        try:
            logger.info("Initializing prediction system...")
            self._config = Config()
            self._processor = DataProcessor(self._config)
            self._df_enrollment, self._elective_codes = (
                self._processor.load_and_process()
            )
            self._predictor = ProphetPredictor(self._config)
            self._predictor.train_student_population_model(
                self._processor.raw_data["students_yearly"]
            )
        except Exception as e:
            logger.exception("Failed to initialize system: %s", e)
            self._initialized = False
            return False
        self._initialized = True
        logger.info("System initialized successfully")
        return True

    def get_data_info(self) -> Dict:
        """Return basic statistics about the loaded data.

        Returns:
            A dict with course counts, class capacity and the min/max year of
            the yearly student table, or ``{"error": ...}`` when the system is
            not initialized, data is missing, or an exception occurred.
        """
        if not self._initialized or self._processor is None or self._config is None:
            return {"error": "System not initialized"}
        try:
            raw_data = self._processor.raw_data
            courses = raw_data.get("courses")
            students = raw_data.get("students_yearly")
            if courses is None or students is None:
                return {"error": "Data not loaded"}
            # Courses whose category code is "P" are counted as electives.
            elective_count = int((courses["kategori_mk"] == "P").sum())
            return {
                "total_courses": len(courses),
                "elective_courses": elective_count,
                "class_capacity": self._config.class_capacity.DEFAULT_CLASS_CAPACITY,
                "year_min": int(students["thn"].min()),
                "year_max": int(students["thn"].max()),
            }
        except Exception as e:
            logger.exception("Failed to collect data info: %s", e)
            return {"error": str(e)}

    # ------------------------------------------------------------------
    # Internal computation steps
    # ------------------------------------------------------------------
    def _run_backtest_if_needed(self) -> Dict:
        """Return backtest metrics, running the backtest on first use.

        The result of a real backtest run (or the defaults, if that run
        produced nothing) is cached on the instance.
        """
        if self._backtest_metrics is not None:
            return self._backtest_metrics
        if (
            self._config is None
            or self._df_enrollment is None
            or self._predictor is None
        ):
            logger.warning("System not initialized, using default metrics")
            # Not cached on purpose: a call made after initialize() must still
            # be able to run the real backtest.
            return self._default_metrics()

        logger.info("Running backtest for the first time...")
        evaluator = Evaluator(self._config)
        backtest_results = evaluator.run_backtest(self._df_enrollment, self._predictor)
        metrics = None
        if backtest_results is None or len(backtest_results) == 0:
            logger.warning("Backtest returned no results, using defaults")
        else:
            metrics = evaluator.generate_metrics(backtest_results)
            if metrics is None:
                logger.warning("Metrics calculation failed, using defaults")
        self._backtest_metrics = (
            metrics if metrics is not None else self._default_metrics()
        )
        return self._backtest_metrics

    def _get_actual_data(self, year: int, semester: int) -> Tuple[pd.DataFrame, bool]:
        """Select enrollment rows for ``year``/``semester``.

        Returns:
            ``(rows, found)`` where ``found`` is True when at least one row
            matched. An empty frame and False when no enrollment is loaded.
        """
        enrollment = self._df_enrollment
        if enrollment is None:
            return pd.DataFrame(), False
        in_period = (enrollment["thn"] == year) & (enrollment["smt"] == semester)
        actual_data = enrollment[in_period]
        return actual_data, len(actual_data) > 0

    def _calculate_class_metrics(
        self,
        courses_with_actual: pd.DataFrame,
        year: int,
        semester: int,
    ) -> Dict:
        """Compare predicted class counts with actual class counts.

        Recorded class counts from the processor are used when at least one
        course has one; only those courses are then scored. Otherwise the
        actual class count is derived from actual enrollment through
        ``Config.calculate_classes_needed`` and every course is scored.

        Args:
            courses_with_actual: Rows with ``kode_mk``, ``classes_needed`` and
                ``actual_enrollment``. Not modified.
            year: Year passed to the processor lookup.
            semester: Semester passed to the processor lookup.

        Returns:
            Match counts, percentages, and the source of the reference counts
            (``"tabel2"`` for recorded data, ``"kalkulasi"`` for derived).
        """
        if self._processor is None or self._config is None:
            return {
                "class_matches": 0,
                "class_within_one": 0,
                "total_for_class_accuracy": 0,
                "class_accuracy_pct": 0,
                "class_within_one_pct": 0,
                "has_actual_class_data": False,
                "data_source": "kalkulasi",
            }

        config = self._config
        frame = courses_with_actual
        scored: Optional[pd.DataFrame] = None
        reference: Optional[pd.Series] = None

        actual_classes_df = self._processor.get_class_count_for_validation(
            year, semester
        )
        if len(actual_classes_df) > 0:
            frame = frame.merge(actual_classes_df, on="kode_mk", how="left")
            recorded = frame[frame["actual_classes"].notna()].copy()
            if len(recorded) > 0:
                recorded["actual_classes"] = recorded["actual_classes"].astype(int)
                scored = recorded
                reference = recorded["actual_classes"]

        has_actual_class_data = scored is not None
        if scored is None:
            scored = frame
            if len(scored) > 0:
                reference = scored.apply(
                    lambda row: config.calculate_classes_needed(
                        row["actual_enrollment"],
                        row["kode_mk"],
                        has_historical_data=True,
                    ),
                    axis=1,
                )
            else:
                # apply(axis=1) on an empty frame does not yield a Series.
                reference = pd.Series(index=scored.index, dtype="float64")

        predicted = scored["classes_needed"]
        total = len(scored)
        class_matches = int((predicted == reference).sum())
        class_within_one = int(((predicted - reference).abs() <= 1).sum())

        return {
            "class_matches": class_matches,
            "class_within_one": class_within_one,
            "total_for_class_accuracy": total,
            "class_accuracy_pct": (class_matches / total) * 100 if total > 0 else 0,
            "class_within_one_pct": (
                (class_within_one / total) * 100 if total > 0 else 0
            ),
            "has_actual_class_data": has_actual_class_data,
            "data_source": "tabel2" if has_actual_class_data else "kalkulasi",
        }

    def _prepare_comparison_table(
        self,
        predictions: pd.DataFrame,
        actual_data: pd.DataFrame,
        year: int,
        semester: int,
    ) -> Optional[pd.DataFrame]:
        """Build the prediction-vs-actual display table.

        Only courses with an actual enrollment value are kept. The actual
        class count comes from the processor's recorded counts where
        available, and is otherwise derived from actual enrollment.

        Returns:
            The table sorted by actual enrollment (descending), or None when
            the backend is not set up or no course has actual enrollment.
        """
        if self._processor is None or self._config is None:
            return None
        config = self._config

        comparison = self._attach_actual_enrollment(predictions, actual_data)
        actual_classes_df = self._processor.get_class_count_for_validation(
            year, semester
        )
        if len(actual_classes_df) > 0:
            comparison = comparison.merge(actual_classes_df, on="kode_mk", how="left")

        matched = comparison[comparison["actual_enrollment"].notna()].copy()
        if len(matched) == 0:
            return None

        matched["error"] = (
            matched["predicted_enrollment"] - matched["actual_enrollment"]
        )
        matched["abs_error"] = matched["error"].abs()
        # A zero actual enrollment is replaced by 1 to avoid dividing by zero.
        matched["accuracy_%"] = 100 * (
            1 - matched["abs_error"] / matched["actual_enrollment"].replace(0, 1)
        )

        def resolve_actual_classes(row):
            # Prefer the recorded class count; fall back to the derived one
            # when the column is absent or the value is missing for this row.
            recorded = row.get("actual_classes")
            if pd.notna(recorded):
                return int(recorded)
            return config.calculate_classes_needed(
                row["actual_enrollment"],
                row["kode_mk"],
                has_historical_data=True,
            )

        matched["actual_classes"] = matched.apply(resolve_actual_classes, axis=1)
        matched["class_diff"] = matched["classes_needed"] - matched["actual_classes"]

        display = matched[list(self._COMPARISON_COLUMNS)].rename(
            columns=self._COMPARISON_COLUMNS
        )
        for label in ("Prediksi", "Error", "Akurasi %"):
            display[label] = display[label].round(1)
        for label in ("Aktual", "Kelas Aktual", "Kelas Prediksi", "Selisih Kelas"):
            display[label] = display[label].astype(int)
        return display.sort_values("Aktual", ascending=False)

    def _prepare_predictions_display(self, predictions: pd.DataFrame) -> pd.DataFrame:
        """Prepare predictions dataframe for display.

        Keeps only rows with confidence ``"high"`` and recommendation
        ``"BUKA"``, renames columns to their display labels, and sorts by
        predicted enrollment (descending). The confidence and status columns
        are dropped from the result.
        """
        display = predictions[list(self._PREDICTION_COLUMNS)].rename(
            columns=self._PREDICTION_COLUMNS
        )
        keep = (display["Confidence"] == "high") & (display["Status"] == "BUKA")
        # Filter before casting so missing values in rows that are dropped
        # anyway cannot make the integer conversion fail.
        display = display.loc[keep].drop(columns=["Confidence", "Status"]).copy()
        display["Prediksi"] = display["Prediksi"].round(1)
        display["Jumlah Kelas"] = display["Jumlah Kelas"].astype(int)
        display["Total Kuota"] = display["Total Kuota"].astype(int)
        return display.sort_values("Prediksi", ascending=False)

    def _comparison_summary(
        self,
        predictions: pd.DataFrame,
        actual_data: pd.DataFrame,
        year: int,
        semester: int,
    ) -> Dict:
        """Return error and class-accuracy figures for courses with actual data.

        Returns an empty dict when no predicted course has actual enrollment.
        """
        comparison = self._attach_actual_enrollment(predictions, actual_data)
        matched = comparison[comparison["actual_enrollment"].notna()].copy()
        if len(matched) == 0:
            return {}

        errors = matched["predicted_enrollment"] - matched["actual_enrollment"]
        total_actual = matched["actual_enrollment"].sum()
        total_predicted = matched["predicted_enrollment"].sum()
        class_metrics = self._calculate_class_metrics(matched.copy(), year, semester)
        return {
            "comparison_mae": errors.abs().mean(),
            "comparison_rmse": (errors ** 2).mean() ** 0.5,
            "total_actual": total_actual,
            "total_predicted": total_predicted,
            "accuracy_pct": self._total_accuracy_pct(total_predicted, total_actual),
            **class_metrics,
        }

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def generate_predictions(self, year: int, semester: int) -> "PredictionResult":
        """Predict enrollment and class needs for one year/semester.

        Args:
            year: Target year; must lie within ``MIN_YEAR``..``MAX_YEAR``.
            semester: 1 or 2.

        Returns:
            A ``PredictionResult``. Invalid input, an uninitialized backend,
            or an exception during computation yields a result whose
            ``error`` field is set.
        """
        if semester not in self.VALID_SEMESTERS:
            return self._failed_prediction(self._SEMESTER_ERROR)
        if not self.MIN_YEAR <= year <= self.MAX_YEAR:
            return self._failed_prediction(
                f"Year must be between {self.MIN_YEAR} and {self.MAX_YEAR}"
            )

        config = self._config
        predictor = self._predictor
        processor = self._processor
        df_enrollment = self._df_enrollment
        elective_codes = self._elective_codes
        # Explicit checks instead of assert: asserts are removed under -O and
        # an AssertionError would surface as an empty error message.
        if (
            not self._initialized
            or config is None
            or predictor is None
            or processor is None
            or df_enrollment is None
            or elective_codes is None
        ):
            return self._failed_prediction(
                "System not initialized. Please restart the app."
            )

        try:
            logger.info("Generating predictions for %s Semester %s...", year, semester)
            # The target period is written into the shared config object.
            config.prediction.PREDICT_YEAR = year
            config.prediction.PREDICT_SEMESTER = semester

            actual_data, has_actual_data = self._get_actual_data(year, semester)
            if has_actual_data:
                logger.info(
                    "Found actual enrollment data for %s Semester %s", year, semester
                )
            else:
                logger.info("No actual data for %s Semester %s", year, semester)

            metrics = self._run_backtest_if_needed()
            predictions = predictor.generate_batch_predictions(
                df_enrollment,
                processor.raw_data["courses"],
                elective_codes,
                year,
                semester,
            )

            # Summary totals cover only courses recommended to open.
            open_courses = predictions[predictions["recommendation"] == "BUKA"]
            summary_data = {
                "year": year,
                "semester": semester,
                "semester_name": "1 (Ganjil)" if semester == 1 else "2 (Genap)",
                "total_to_open": len(open_courses),
                "total_classes": int(open_courses["classes_needed"].sum()),
                "total_predicted_students": int(
                    open_courses["predicted_enrollment"].sum()
                ),
                "total_capacity": int(open_courses["total_quota"].sum()),
                "class_capacity": config.class_capacity.DEFAULT_CLASS_CAPACITY,
                "metrics": metrics,
                "has_actual_data": has_actual_data,
            }

            comparison_df = None
            if has_actual_data:
                summary_data.update(
                    self._comparison_summary(predictions, actual_data, year, semester)
                )
                comparison_df = self._prepare_comparison_table(
                    predictions, actual_data, year, semester
                )

            return PredictionResult(
                summary_data=summary_data,
                predictions_df=self._prepare_predictions_display(predictions),
                comparison_df=comparison_df,
                has_actual_data=has_actual_data,
            )
        except Exception as e:
            logger.exception("Error generating predictions: %s", e)
            return self._failed_prediction(str(e))

    def generate_multi_year_forecast(
        self, year: int, semester: int, years_ahead: int = 3
    ) -> "ForecastResult":
        """Forecast enrollment and class needs over several years.

        Args:
            year: Starting year handed to the predictor.
            semester: 1 or 2.
            years_ahead: Forecast horizon handed to the predictor.

        Returns:
            A ``ForecastResult``. An uninitialized backend, an invalid
            semester, an empty forecast, or an exception during computation
            yields a result whose ``error`` field is set.
        """
        config = self._config
        predictor = self._predictor
        processor = self._processor
        df_enrollment = self._df_enrollment
        elective_codes = self._elective_codes
        if (
            not self._initialized
            or config is None
            or predictor is None
            or processor is None
            or df_enrollment is None
            or elective_codes is None
        ):
            return self._failed_forecast("System not initialized.")
        # Same rule as generate_predictions; otherwise any value other than 1
        # would silently be labelled "Genap".
        if semester not in self.VALID_SEMESTERS:
            return self._failed_forecast(self._SEMESTER_ERROR)

        try:
            logger.info("Generating %s-year forecast from %s...", years_ahead, year)
            forecast_df = predictor.generate_multi_year_forecast(
                df_enrollment,
                processor.raw_data["courses"],
                elective_codes,
                year,
                semester,
                years_ahead,
            )
            if forecast_df.empty:
                return self._failed_forecast("Tidak ada data untuk forecast.")

            yearly_summary = (
                forecast_df.groupby("year")
                .agg(
                    {
                        "predicted_enrollment": "sum",
                        "classes_needed": "sum",
                        "total_capacity": "sum",
                        "kode_mk": "count",
                    }
                )
                .reset_index()
                .rename(columns=self._YEARLY_COLUMNS)
            )

            # groupby sorts by year, so the first/last rows bound the horizon.
            first_year = yearly_summary.iloc[0]
            last_year = yearly_summary.iloc[-1]
            summary_data = {
                "year": year,
                "semester": semester,
                "semester_name": "Ganjil" if semester == 1 else "Genap",
                "years_ahead": years_ahead,
                "class_capacity": config.class_capacity.DEFAULT_CLASS_CAPACITY,
                "first_year_classes": int(first_year["Total Kelas"]),
                "last_year_classes": int(last_year["Total Kelas"]),
                "growth_classes": int(
                    last_year["Total Kelas"] - first_year["Total Kelas"]
                ),
                "growth_students": int(
                    last_year["Total Prediksi"] - first_year["Total Prediksi"]
                ),
            }

            display_df = forecast_df[list(self._FORECAST_COLUMNS)].rename(
                columns=self._FORECAST_COLUMNS
            )
            display_df["Prediksi"] = display_df["Prediksi"].round(0).astype(int)
            display_df = display_df.sort_values(["Kode MK", "Tahun"])

            return ForecastResult(
                summary_data=summary_data,
                forecast_df=display_df,
                yearly_summary=yearly_summary,
            )
        except Exception as e:
            logger.exception("Error generating forecast: %s", e)
            return self._failed_forecast(str(e))
# Lazily created, process-wide backend shared by all callers of get_backend().
_backend_instance: Optional[PredictionBackend] = None


def get_backend() -> PredictionBackend:
    """Get the singleton backend instance.

    The instance is created on first call and is NOT initialized here;
    callers still need to invoke ``initialize()`` on it.
    """
    global _backend_instance
    if _backend_instance is None:
        _backend_instance = PredictionBackend()
    return _backend_instance