import logging
from typing import Dict, List, Optional, Tuple

import numpy as np
import pandas as pd
from prophet import Prophet

from config import Config

logger = logging.getLogger(__name__)


class ProphetPredictor:
    """Enrollment forecaster built on Prophet.

    One Prophet model is fitted on the student-population history
    (``train_student_population_model``); a separate, short-lived Prophet
    model is fitted per course whenever a course prediction is requested.
    All tunables (growth mode, capacity limits, fallbacks, the semester to
    month mapping) are read from the injected ``Config`` object.
    """

    # Column layout of the frame returned by ``generate_batch_predictions``.
    # Used to build a well-formed empty frame when no course produced a row.
    _BATCH_COLUMNS = [
        "kode_mk",
        "nama_mk",
        "sks",
        "predicted_enrollment",
        "class_capacity",
        "classes_needed",
        "total_quota",
        "utilization_pct",
        "recommendation",
        "capacity_status",
        "strategy",
        "confidence",
    ]

    # Column layout of the frame returned by ``generate_multi_year_forecast``.
    _MULTI_YEAR_COLUMNS = [
        "kode_mk",
        "nama_mk",
        "year",
        "semester",
        "predicted_enrollment",
        "classes_needed",
        "total_capacity",
        "student_population",
        "strategy",
        "confidence",
    ]

    def __init__(self, config: Config):
        """Store the configuration; the population model starts untrained."""
        self.config = config
        self.student_model: Optional[Prophet] = None

    # ------------------------------------------------------------------
    # Date helpers
    # ------------------------------------------------------------------
    def _semester_timestamp(self, year: int, semester: int) -> pd.Timestamp:
        """Return the timestamp for ``year`` and ``semester``.

        The month part comes from ``config.prediction.SEMESTER_TO_MONTH``.
        """
        return pd.to_datetime(
            f"{year}-{self.config.prediction.SEMESTER_TO_MONTH[semester]}"
        )

    def _semester_dates(self, df: pd.DataFrame) -> pd.Series:
        """Build the Prophet ``ds`` column from the ``thn`` and ``smt`` columns.

        Both columns are cast to ``int`` first so that float-typed values
        (e.g. ``2020.0``) still produce a parseable date string and still hit
        the integer keys of ``SEMESTER_TO_MONTH``.
        """
        years = df["thn"].astype(int).astype(str)
        months = df["smt"].astype(int).map(
            self.config.prediction.SEMESTER_TO_MONTH
        )
        return pd.to_datetime(years + "-" + months)

    # ------------------------------------------------------------------
    # Student population model
    # ------------------------------------------------------------------
    def train_student_population_model(self, population_data: pd.DataFrame):
        """Fit the student-population Prophet model.

        Args:
            population_data: Frame with ``thn``, ``smt`` and ``jumlah_aktif``
                columns; ``jumlah_aktif`` is the value being modelled. The
                input frame is not modified.
        """
        df = population_data.copy()
        df["ds"] = self._semester_dates(df)
        df["y"] = df["jumlah_aktif"]

        self.student_model = Prophet(
            growth="linear",
            daily_seasonality=False,  # type: ignore[arg-type]
            weekly_seasonality=False,  # type: ignore[arg-type]
            yearly_seasonality=True,  # type: ignore[arg-type]
        )
        self.student_model.fit(df[["ds", "y"]])
        logger.info("Student population model trained.")

    def get_student_forecast(self, year: int, semester: int) -> float:
        """Forecast the student population for one year/semester.

        The forecast is floored at 100.

        Raises:
            AssertionError: If the population model has not been trained.
        """
        assert self.student_model is not None, "Student model must be trained first"
        future = pd.DataFrame({"ds": [self._semester_timestamp(year, semester)]})
        forecast = self.student_model.predict(future)
        # float() keeps the declared return type: without it the result is
        # either a numpy scalar or the plain int 100.
        return float(max(forecast["yhat"].values[0], 100))

    def get_multi_year_student_forecast(
        self, start_year: int, semester: int, years_ahead: int
    ) -> List[Tuple[int, float]]:
        """Forecast the population for ``start_year`` .. ``start_year + years_ahead``.

        Returns:
            ``(year, population)`` tuples, ``years_ahead + 1`` of them.

        Raises:
            AssertionError: If the population model has not been trained.
        """
        assert self.student_model is not None, "Student model must be trained first"
        return [
            (year, self.get_student_forecast(year, semester))
            for year in range(start_year, start_year + years_ahead + 1)
        ]

    # ------------------------------------------------------------------
    # Single-course prediction
    # ------------------------------------------------------------------
    def predict_course(
        self,
        course_code: str,
        df_history: pd.DataFrame,
        target_year: int,
        target_smt: int,
        student_pop: float,
    ) -> dict:
        """Predict enrollment for one course in one target semester.

        Only history rows matching both ``course_code`` and ``target_smt``
        are used. With no such rows the configured ``FALLBACK_DEFAULT`` is
        returned with strategy ``"cold_start"``.

        Returns:
            Dict with keys ``val``, ``strategy``, ``confidence``,
            ``classes_needed`` and ``capacity_status``.
        """
        selector = (df_history["kode_mk"] == course_code) & (
            df_history["smt"] == target_smt
        )
        hist = df_history[selector].sort_values(["thn", "smt"])

        if len(hist) == 0:
            default_val = self.config.model.FALLBACK_DEFAULT
            return {
                "val": default_val,
                "strategy": "cold_start",
                "confidence": "low",
                "classes_needed": self.config.calculate_classes_needed(
                    default_val,
                    course_code,
                    has_historical_data=False,
                ),
                "capacity_status": self.config.get_capacity_status(
                    default_val, course_code
                ),
            }

        result = self._predict_prophet_with_capacity(
            hist, target_year, target_smt, student_pop, course_code
        )
        result["classes_needed"] = self.config.calculate_classes_needed(
            result["val"], course_code, has_historical_data=True
        )
        result["capacity_status"] = self.config.get_capacity_status(
            result["val"], course_code
        )
        return result

    def _predict_prophet_with_capacity(
        self, hist: pd.DataFrame, year: int, smt: int, pop: float, course_code: str
    ) -> dict:
        """Fit a per-course Prophet model and return a capped prediction.

        Fallback ladder, in order:

        1. Fewer than two non-NaN enrollments: mean enrollment
           (``fallback_mean``), or ``FALLBACK_DEFAULT`` (``fallback_default``)
           when that mean is not finite.
        2. ``jumlah_aktif`` column missing: mean enrollment, low confidence.
        3. Prophet output negative, non-finite, above 5x the historical max
           or above 2x the cap: recent mean scaled by a clamped population
           growth factor (``trend_fallback``).
        4. Any exception while fitting or predicting: mean enrollment.

        Returns:
            Dict with keys ``val``, ``strategy`` and ``confidence``.
        """
        df = hist.copy()
        df["ds"] = self._semester_dates(df)
        df["y"] = df["enrollment"]

        if df["y"].notna().sum() < 2:
            mean_enrollment = hist["enrollment"].mean()
            if not np.isfinite(mean_enrollment):
                # No usable enrollment at all: a NaN here would flow into the
                # class-count and capacity computations of the caller.
                return {
                    "val": self.config.model.FALLBACK_DEFAULT,
                    "strategy": "fallback_default",
                    "confidence": "low",
                }
            return {
                "val": mean_enrollment,
                "strategy": "fallback_mean",
                "confidence": "medium",
            }

        if "jumlah_aktif" not in df.columns:
            logger.warning(
                "jumlah_aktif column missing from historical data - cannot use regressor"
            )
            return {
                "val": hist["enrollment"].mean(),
                "strategy": "fallback_mean",
                "confidence": "low",
            }

        hist_max = float(df["y"].max())
        hist_mean = float(df["y"].mean())
        class_capacity = self.config.get_class_capacity(course_code)

        # Upper bound for the forecast: a multiple of the historical peak,
        # never above the configured absolute maximum.
        cap_value = min(
            hist_max * self.config.prediction.MAX_CAPACITY_MULTIPLIER,
            self.config.prediction.ABSOLUTE_MAX_STUDENTS,
        )
        if self.config.class_capacity.ENABLE_CAPACITY_CONSTRAINTS:
            # Additionally limit the cap to four classes' worth of seats.
            cap_value = min(cap_value, class_capacity * 4)

        df["cap"] = cap_value
        df["floor"] = 0

        try:
            use_regressor = self.config.model.USE_POPULATION_REGRESSOR
            model = Prophet(
                growth=self.config.model.GROWTH_MODE,
                changepoint_prior_scale=self.config.model.CHANGEPOINT_SCALE,
                seasonality_mode=self.config.model.SEASONALITY_MODE,
                daily_seasonality=False,  # type: ignore[arg-type]
                weekly_seasonality=False,  # type: ignore[arg-type]
            )

            fit_columns = ["ds", "y", "cap", "floor"]
            if use_regressor:
                model.add_regressor("jumlah_aktif", mode="multiplicative")
                fit_columns.append("jumlah_aktif")
            model.fit(df[fit_columns])

            future = pd.DataFrame(
                {
                    "ds": [self._semester_timestamp(year, smt)],
                    "cap": [cap_value],
                    "floor": [0],
                }
            )
            if use_regressor:
                future["jumlah_aktif"] = pop

            forecast = model.predict(future)
            raw_pred = forecast["yhat"].values[0]

            if (
                raw_pred < 0
                or not np.isfinite(raw_pred)
                or raw_pred > hist_max * 5
                or raw_pred > cap_value * 2
            ):
                logger.warning(
                    f"Prophet prediction ({raw_pred:.1f}) unrealistic for {course_code}. "
                    f"Using trend-based fallback. (hist_max={hist_max}, cap={cap_value})"
                )

                pop_mean = float(df["jumlah_aktif"].mean())
                pop_growth_factor = pop / pop_mean if pop_mean > 0 else 1.0
                # Clamp the population-driven scaling to [0.8, 1.3].
                growth_factor = min(max(pop_growth_factor, 0.8), 1.3)
                # With at least three rows use the mean of the last three;
                # otherwise the overall mean.
                if len(df) >= 3:
                    baseline = float(df["y"].tail(3).mean())
                else:
                    baseline = hist_mean
                pred = min(max(baseline * growth_factor, 0), cap_value)

                return {
                    "val": pred,
                    "strategy": "trend_fallback",
                    "confidence": "medium",
                }

            pred = min(max(0, raw_pred), cap_value)

            # NOTE: the strategy label is fixed; it is reported regardless of
            # the configured GROWTH_MODE.
            return {
                "val": pred,
                "strategy": "prophet_logistic",
                "confidence": "high",
            }

        except Exception as e:
            # Deliberate best-effort: any Prophet failure degrades to the
            # historical mean instead of aborting the whole batch.
            logger.warning(
                f"Prophet failed for course {course_code}. Error: {e}. Using fallback."
            )
            return {
                "val": hist["enrollment"].mean(),
                "strategy": "fallback_mean",
                "confidence": "medium",
            }

    def predict_multi_year(
        self,
        course_code: str,
        df_history: pd.DataFrame,
        start_year: int,
        target_smt: int,
        years_ahead: int = 3,
    ) -> List[Dict]:
        """Predict one course for ``start_year`` .. ``start_year + years_ahead``.

        Each year's prediction uses the population forecast for that year.

        Returns:
            One ``predict_course`` dict per year, extended with ``year``,
            ``semester`` and ``student_population``.
        """
        predictions = []
        for year in range(start_year, start_year + years_ahead + 1):
            pop = self.get_student_forecast(year, target_smt)
            pred = self.predict_course(course_code, df_history, year, target_smt, pop)
            pred["year"] = year
            pred["semester"] = target_smt
            pred["student_population"] = pop
            predictions.append(pred)
        return predictions

    # ------------------------------------------------------------------
    # Batch reports
    # ------------------------------------------------------------------
    def generate_batch_predictions(
        self,
        full_data: pd.DataFrame,
        course_metadata: pd.DataFrame,
        electives: set,
        year: int,
        smt: int,
    ) -> pd.DataFrame:
        """Predict every course in ``electives`` for one year/semester.

        Courses without a row in ``course_metadata`` are skipped with a
        warning.

        Returns:
            Frame sorted by ``predicted_enrollment`` descending. When no
            course produced a row, an empty frame with the same columns.
        """
        student_pop = self.get_student_forecast(year, smt)
        results = []

        logger.info(
            f"Predicting for {len(electives)} courses (Pop: {int(student_pop)})..."
        )

        min_threshold = self.config.class_capacity.MIN_STUDENTS_TO_OPEN_CLASS

        for code in electives:
            meta_rows = course_metadata[course_metadata["kode_mk"] == code]
            if len(meta_rows) == 0:
                logger.warning(f"No metadata found for course {code}, skipping")
                continue
            meta = meta_rows.iloc[0]

            pred_result = self.predict_course(code, full_data, year, smt, student_pop)
            pred_val = pred_result["val"]

            # History here means any row for the course, in any semester.
            has_history = len(full_data[full_data["kode_mk"] == code]) > 0

            if "classes_needed" in pred_result:
                classes_needed = pred_result["classes_needed"]
            else:
                classes_needed = self.config.calculate_classes_needed(
                    pred_val, code, has_historical_data=has_history
                )

            course_capacity = self.config.get_class_capacity(code)
            total_quota = classes_needed * course_capacity if classes_needed > 0 else 0
            # Guard the division: a zero class capacity would otherwise raise
            # or produce an infinite utilization.
            utilization = (pred_val / total_quota) * 100 if total_quota > 0 else 0

            should_open = pred_val >= min_threshold or (
                has_history and self.config.class_capacity.OPEN_CLASS_IF_HAS_HISTORY
            )
            status = "BUKA" if should_open else "TUTUP"

            results.append(
                {
                    "kode_mk": code,
                    "nama_mk": meta["nama_mk"],
                    "sks": meta.get("sks_mk", 0),
                    "predicted_enrollment": round(pred_val, 1),
                    "class_capacity": course_capacity,
                    "classes_needed": classes_needed,
                    "total_quota": total_quota,
                    "utilization_pct": round(utilization, 1),
                    "recommendation": status,
                    "capacity_status": pred_result.get("capacity_status", "NORMAL"),
                    "strategy": pred_result["strategy"],
                    "confidence": pred_result["confidence"],
                }
            )

        if not results:
            # DataFrame([]) has no columns, so sort_values would raise KeyError.
            return pd.DataFrame(columns=self._BATCH_COLUMNS)

        return pd.DataFrame(results).sort_values(
            "predicted_enrollment", ascending=False
        )

    def generate_multi_year_forecast(
        self,
        full_data: pd.DataFrame,
        course_metadata: pd.DataFrame,
        electives: set,
        start_year: int,
        smt: int,
        years_ahead: int = 3,
    ) -> pd.DataFrame:
        """Predict every course in ``electives`` over several years.

        Courses without a row in ``course_metadata`` are skipped silently.

        Returns:
            Frame sorted by ``kode_mk`` then ``year``. When no course
            produced a row, an empty frame with the same columns.
        """
        all_results = []

        for code in electives:
            meta_rows = course_metadata[course_metadata["kode_mk"] == code]
            if len(meta_rows) == 0:
                continue
            meta = meta_rows.iloc[0]

            year_predictions = self.predict_multi_year(
                code, full_data, start_year, smt, years_ahead
            )
            if not year_predictions:
                continue

            # Looked up once per course instead of once per year.
            course_capacity = self.config.get_class_capacity(code)

            for pred in year_predictions:
                classes_needed = pred.get("classes_needed", 0)
                all_results.append(
                    {
                        "kode_mk": code,
                        "nama_mk": meta["nama_mk"],
                        "year": pred["year"],
                        "semester": pred["semester"],
                        "predicted_enrollment": round(pred["val"], 1),
                        "classes_needed": classes_needed,
                        "total_capacity": classes_needed * course_capacity,
                        "student_population": round(pred["student_population"], 0),
                        "strategy": pred["strategy"],
                        "confidence": pred["confidence"],
                    }
                )

        if not all_results:
            # DataFrame([]) has no columns, so sort_values would raise KeyError.
            return pd.DataFrame(columns=self._MULTI_YEAR_COLUMNS)

        return pd.DataFrame(all_results).sort_values(["kode_mk", "year"])

    # ------------------------------------------------------------------
    # Analysis / evaluation helpers
    # ------------------------------------------------------------------
    def get_course_trend_analysis(
        self,
        course_code: str,
        df_history: pd.DataFrame,
        target_smt: int,
    ) -> Dict:
        """Summarise the enrollment history of one course in one semester.

        Rows with a missing ``enrollment`` are excluded from the statistics.

        Returns:
            ``{"has_sufficient_data": False, "data_points": n}`` when fewer
            than two usable rows exist; otherwise min/max/average/latest
            enrollment, the average period-over-period growth rate (percent),
            the least-squares slope of enrollment over ``thn``, a trend
            direction label and the covered year range.
        """
        selector = (df_history["kode_mk"] == course_code) & (
            df_history["smt"] == target_smt
        )
        hist = df_history[selector].sort_values("thn")

        if len(hist) < 2:
            return {
                "has_sufficient_data": False,
                "data_points": len(hist),
            }

        # A NaN enrollment would make the int() conversions below raise.
        hist = hist[hist["enrollment"].notna()]
        if len(hist) < 2:
            return {
                "has_sufficient_data": False,
                "data_points": len(hist),
            }

        enrollments = np.array(hist["enrollment"].values, dtype=float)
        years = np.array(hist["thn"].values, dtype=float)

        # Relative change between consecutive rows; a non-positive previous
        # value is skipped because the ratio is undefined or meaningless.
        growth_rates = [
            (current - previous) / previous
            for previous, current in zip(enrollments[:-1], enrollments[1:])
            if previous > 0
        ]
        avg_growth_rate = float(np.mean(growth_rates)) if growth_rates else 0.0

        if years.max() > years.min():
            trend_slope = float(np.polyfit(years, enrollments, 1)[0])
        else:
            # All rows share one year: a line fit is rank-deficient there.
            trend_slope = 0.0

        if trend_slope > 0:
            trend_direction = "increasing"
        elif trend_slope < 0:
            trend_direction = "decreasing"
        else:
            trend_direction = "stable"

        return {
            "has_sufficient_data": True,
            "data_points": len(hist),
            "min_enrollment": int(enrollments.min()),
            "max_enrollment": int(enrollments.max()),
            "avg_enrollment": round(float(enrollments.mean()), 1),
            "latest_enrollment": int(enrollments[-1]),
            "avg_growth_rate": round(avg_growth_rate * 100, 1),  # as percentage
            "trend_slope": round(trend_slope, 2),
            "trend_direction": trend_direction,
            "year_range": f"{int(years.min())}-{int(years.max())}",
        }

    def predict_course_enrollment(
        self,
        course_code: str,
        train_data: pd.DataFrame,
        test_year: int,
        test_semester: int,
        test_student_count: float,
    ) -> tuple:
        """Return ``(value, strategy)`` for one course prediction.

        Thin wrapper over ``predict_course``: a non-finite prediction is
        replaced by ``FALLBACK_DEFAULT`` (strategy ``"fallback_default"``);
        a finite one is clamped to be non-negative.
        """
        result = self.predict_course(
            course_code=course_code,
            df_history=train_data,
            target_year=test_year,
            target_smt=test_semester,
            student_pop=test_student_count,
        )

        val = float(result["val"])
        if not np.isfinite(val):
            return self.config.model.FALLBACK_DEFAULT, "fallback_default"
        return max(0.0, val), result["strategy"]