Spaces:
Sleeping
Sleeping
| import logging | |
| from typing import Dict, List, Optional, Tuple | |
| import numpy as np | |
| import pandas as pd | |
| from prophet import Prophet | |
| from config import Config | |
| logger = logging.getLogger(__name__) | |
class ProphetPredictor:
    """Forecast student population and per-course enrollment with Prophet.

    The predictor holds one population model (fitted on the ``jumlah_aktif``
    column) and fits a fresh, capacity-bounded Prophet model per course from
    its ``enrollment`` history. Whenever a course has too little data, or the
    Prophet output is implausible, it degrades to simpler estimates and
    reports which one was used through the ``strategy`` / ``confidence``
    keys of the result dict.

    Input frames use the column names ``thn`` (year), ``smt`` (semester),
    ``kode_mk`` (course code), ``enrollment`` and ``jumlah_aktif``.
    """

    # Lower bound applied to every student-population forecast.
    MIN_STUDENT_POPULATION = 100.0
    # Clamp for the population ratio used by the trend fallback.
    MIN_GROWTH_FACTOR = 0.8
    MAX_GROWTH_FACTOR = 1.3
    # With capacity constraints on, the cap never exceeds this many full classes.
    MAX_REALISTIC_CLASSES = 4
    # A Prophet forecast above these multiples of the historical maximum /
    # the cap is treated as unrealistic and replaced by the trend fallback.
    UNREALISTIC_HIST_MULTIPLIER = 5
    UNREALISTIC_CAP_MULTIPLIER = 2

    # Column layouts of the two report frames; used so an empty result still
    # has the columns that callers (and the final sort) expect.
    _BATCH_COLUMNS = (
        "kode_mk",
        "nama_mk",
        "sks",
        "predicted_enrollment",
        "class_capacity",
        "classes_needed",
        "total_quota",
        "utilization_pct",
        "recommendation",
        "capacity_status",
        "strategy",
        "confidence",
    )
    _MULTI_YEAR_COLUMNS = (
        "kode_mk",
        "nama_mk",
        "year",
        "semester",
        "predicted_enrollment",
        "classes_needed",
        "total_capacity",
        "student_population",
        "strategy",
        "confidence",
    )

    def __init__(self, config: "Config"):
        """Store the configuration; the population model starts untrained."""
        self.config = config
        self.student_model: Optional[Prophet] = None

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _require_student_model(self) -> "Prophet":
        """Return the trained population model.

        Raises:
            RuntimeError: if ``train_student_population_model`` has not been
                run successfully. An explicit raise is used instead of
                ``assert`` so the check survives ``python -O``.
        """
        if self.student_model is None:
            raise RuntimeError("Student model must be trained first")
        return self.student_model

    def _semester_date(self, year: int, semester: int) -> pd.Timestamp:
        """Build the timestamp for one (year, semester) pair.

        Raises ``KeyError`` when the semester is not in ``SEMESTER_TO_MONTH``.
        """
        month = self.config.prediction.SEMESTER_TO_MONTH[semester]
        return pd.to_datetime(f"{year}-{month}")

    def _history_dates(self, df: pd.DataFrame) -> pd.Series:
        """Build the Prophet ``ds`` column from the ``thn`` / ``smt`` columns.

        Both columns are cast to ``int`` first so float-typed years such as
        ``2020.0`` do not end up as ``"2020.0-.."`` date strings.
        """
        years = df["thn"].astype(int).astype(str)
        months = df["smt"].astype(int).map(self.config.prediction.SEMESTER_TO_MONTH)
        return pd.to_datetime(years + "-" + months)

    def _mean_fallback(self, hist: pd.DataFrame, confidence: str) -> dict:
        """Return the historical mean enrollment as the prediction.

        When the mean is not finite (every enrollment missing) the configured
        ``FALLBACK_DEFAULT`` is returned instead, so NaN never leaks into the
        class-count and capacity calculations downstream.
        """
        mean_enrollment = float(hist["enrollment"].mean())
        if np.isfinite(mean_enrollment):
            return {
                "val": mean_enrollment,
                "strategy": "fallback_mean",
                "confidence": confidence,
            }
        return {
            "val": self.config.model.FALLBACK_DEFAULT,
            "strategy": "fallback_default",
            "confidence": "low",
        }

    def _trend_fallback(
        self, df: pd.DataFrame, pop: float, hist_mean: float, cap_value: float
    ) -> dict:
        """Estimate enrollment from recent history scaled by population change.

        The baseline is the mean of the last three observations (or the
        overall mean with fewer than three rows). It is multiplied by the
        ratio of the forecast population to the historical mean population,
        clamped to ``[MIN_GROWTH_FACTOR, MAX_GROWTH_FACTOR]``, and the result
        is clamped to ``[0, cap_value]``.
        """
        pop_mean = float(df["jumlah_aktif"].mean())
        growth = pop / pop_mean if pop_mean > 0 else 1.0
        if np.isnan(growth):
            # A NaN population would otherwise turn the prediction into NaN.
            growth = 1.0
        growth = min(max(growth, self.MIN_GROWTH_FACTOR), self.MAX_GROWTH_FACTOR)

        baseline = float(df["y"].tail(3).mean()) if len(df) >= 3 else hist_mean
        if not np.isfinite(baseline):
            # The most recent rows can all be missing; use the overall mean.
            baseline = hist_mean

        return {
            "val": min(max(baseline * growth, 0), cap_value),
            "strategy": "trend_fallback",
            "confidence": "medium",
        }

    # ------------------------------------------------------------------
    # Student population model
    # ------------------------------------------------------------------

    def train_student_population_model(self, population_data: pd.DataFrame):
        """Fit the population model on ``thn`` / ``smt`` / ``jumlah_aktif``.

        ``self.student_model`` is only replaced after a successful fit, so a
        failed training run never leaves an unfitted model behind.
        """
        df = population_data.copy()
        df["ds"] = self._history_dates(df)
        df["y"] = df["jumlah_aktif"]

        model = Prophet(
            growth="linear",
            daily_seasonality=False,  # type: ignore[arg-type]
            weekly_seasonality=False,  # type: ignore[arg-type]
            yearly_seasonality=True,  # type: ignore[arg-type]
        )
        model.fit(df[["ds", "y"]])
        self.student_model = model
        logger.info("Student population model trained.")

    def get_student_forecast(self, year: int, semester: int) -> float:
        """Forecast the student population for one (year, semester).

        Returns:
            The model's ``yhat`` as a built-in ``float``, never lower than
            ``MIN_STUDENT_POPULATION``.

        Raises:
            RuntimeError: if the population model has not been trained.
        """
        model = self._require_student_model()
        future = pd.DataFrame({"ds": [self._semester_date(year, semester)]})
        yhat = float(model.predict(future)["yhat"].iloc[0])
        return max(yhat, self.MIN_STUDENT_POPULATION)

    def get_multi_year_student_forecast(
        self, start_year: int, semester: int, years_ahead: int
    ) -> List[Tuple[int, float]]:
        """Forecast the population for ``start_year`` .. ``start_year + years_ahead``.

        Returns:
            ``(year, population)`` tuples in ascending year order; the range
            is inclusive, so ``years_ahead + 1`` entries are produced.

        Raises:
            RuntimeError: if the population model has not been trained.
        """
        self._require_student_model()
        return [
            (year, self.get_student_forecast(year, semester))
            for year in range(start_year, start_year + years_ahead + 1)
        ]

    # ------------------------------------------------------------------
    # Per-course prediction
    # ------------------------------------------------------------------

    def predict_course(
        self,
        course_code: str,
        df_history: pd.DataFrame,
        target_year: int,
        target_smt: int,
        student_pop: float,
    ) -> dict:
        """Predict enrollment for one course in the target semester.

        Only history rows of the same course and the same semester are used.
        Without any such row the configured ``FALLBACK_DEFAULT`` is returned
        with strategy ``cold_start``.

        Returns:
            Dict with ``val``, ``strategy``, ``confidence``,
            ``classes_needed`` and ``capacity_status``.
        """
        same_course_and_semester = (df_history["kode_mk"] == course_code) & (
            df_history["smt"] == target_smt
        )
        hist = df_history[same_course_and_semester].sort_values(["thn", "smt"])
        has_historical_data = not hist.empty

        if has_historical_data:
            result = self._predict_prophet_with_capacity(
                hist, target_year, target_smt, student_pop, course_code
            )
        else:
            result = {
                "val": self.config.model.FALLBACK_DEFAULT,
                "strategy": "cold_start",
                "confidence": "low",
            }

        result["classes_needed"] = self.config.calculate_classes_needed(
            result["val"], course_code, has_historical_data=has_historical_data
        )
        result["capacity_status"] = self.config.get_capacity_status(
            result["val"], course_code
        )
        return result

    def _predict_prophet_with_capacity(
        self, hist: pd.DataFrame, year: int, smt: int, pop: float, course_code: str
    ) -> dict:
        """Fit a capacity-bounded Prophet model on ``hist`` and predict one point.

        Fallback ladder:

        1. fewer than two non-null enrollments, or no ``jumlah_aktif``
           column -> historical mean;
        2. Prophet output negative, non-finite or far above history / cap
           -> trend fallback;
        3. any exception while fitting or predicting -> historical mean.

        Returns:
            Dict with ``val``, ``strategy`` and ``confidence``.
        """
        if hist["enrollment"].notna().sum() < 2:
            return self._mean_fallback(hist, confidence="medium")

        if "jumlah_aktif" not in hist.columns:
            logger.warning(
                "jumlah_aktif column missing from historical data - cannot use regressor"
            )
            return self._mean_fallback(hist, confidence="low")

        df = hist.copy()
        df["ds"] = self._history_dates(df)
        df["y"] = df["enrollment"]

        hist_max = float(df["y"].max())
        hist_mean = float(df["y"].mean())

        prediction_cfg = self.config.prediction
        cap_value = min(
            hist_max * prediction_cfg.MAX_CAPACITY_MULTIPLIER,
            prediction_cfg.ABSOLUTE_MAX_STUDENTS,
        )
        if self.config.class_capacity.ENABLE_CAPACITY_CONSTRAINTS:
            class_capacity = self.config.get_class_capacity(course_code)
            cap_value = min(cap_value, class_capacity * self.MAX_REALISTIC_CLASSES)

        df["cap"] = cap_value
        df["floor"] = 0

        model_cfg = self.config.model
        use_regressor = model_cfg.USE_POPULATION_REGRESSOR

        # Deliberately broad: one course failing must not abort a whole batch,
        # so every error in this section degrades to the historical mean.
        try:
            model = Prophet(
                growth=model_cfg.GROWTH_MODE,
                changepoint_prior_scale=model_cfg.CHANGEPOINT_SCALE,
                seasonality_mode=model_cfg.SEASONALITY_MODE,
                daily_seasonality=False,  # type: ignore[arg-type]
                weekly_seasonality=False,  # type: ignore[arg-type]
            )
            fit_columns = ["ds", "y", "cap", "floor"]
            if use_regressor:
                model.add_regressor("jumlah_aktif", mode="multiplicative")
                fit_columns.append("jumlah_aktif")
            model.fit(df[fit_columns])

            future = pd.DataFrame(
                {
                    "ds": [self._semester_date(year, smt)],
                    "cap": [cap_value],
                    "floor": [0],
                }
            )
            if use_regressor:
                future["jumlah_aktif"] = pop

            raw_pred = float(model.predict(future)["yhat"].iloc[0])

            unrealistic = (
                not np.isfinite(raw_pred)
                or raw_pred < 0
                or raw_pred > hist_max * self.UNREALISTIC_HIST_MULTIPLIER
                or raw_pred > cap_value * self.UNREALISTIC_CAP_MULTIPLIER
            )
            if unrealistic:
                logger.warning(
                    "Prophet prediction (%.1f) unrealistic for %s. "
                    "Using trend-based fallback. (hist_max=%s, cap=%s)",
                    raw_pred,
                    course_code,
                    hist_max,
                    cap_value,
                )
                return self._trend_fallback(df, pop, hist_mean, cap_value)

            return {
                "val": min(max(0, raw_pred), cap_value),
                "strategy": "prophet_logistic",
                "confidence": "high",
            }
        except Exception as e:
            logger.warning(
                "Prophet failed for course %s. Error: %s. Using fallback.",
                course_code,
                e,
            )
            return self._mean_fallback(hist, confidence="medium")

    def predict_multi_year(
        self,
        course_code: str,
        df_history: pd.DataFrame,
        start_year: int,
        target_smt: int,
        years_ahead: int = 3,
    ) -> List[Dict]:
        """Predict one course for ``start_year`` .. ``start_year + years_ahead``.

        Each entry is the ``predict_course`` result extended with ``year``,
        ``semester`` and the ``student_population`` forecast used for it.
        """
        predictions = []
        for year in range(start_year, start_year + years_ahead + 1):
            population = self.get_student_forecast(year, target_smt)
            prediction = self.predict_course(
                course_code, df_history, year, target_smt, population
            )
            prediction["year"] = year
            prediction["semester"] = target_smt
            prediction["student_population"] = population
            predictions.append(prediction)
        return predictions

    # ------------------------------------------------------------------
    # Reports
    # ------------------------------------------------------------------

    def generate_batch_predictions(
        self,
        full_data: pd.DataFrame,
        course_metadata: pd.DataFrame,
        electives: set,
        year: int,
        smt: int,
    ) -> pd.DataFrame:
        """Build the per-course recommendation table for one semester.

        Courses without a metadata row are skipped with a warning. A course
        is recommended ``"BUKA"`` when the prediction reaches
        ``MIN_STUDENTS_TO_OPEN_CLASS``, or when it has any history and
        ``OPEN_CLASS_IF_HAS_HISTORY`` is set; otherwise ``"TUTUP"``.

        Returns:
            DataFrame sorted by ``predicted_enrollment`` descending. When no
            course produces a row the frame is empty but still carries the
            report columns.
        """
        student_pop = self.get_student_forecast(year, smt)
        logger.info(
            "Predicting for %d courses (Pop: %d)...", len(electives), int(student_pop)
        )

        capacity_cfg = self.config.class_capacity
        rows = []
        for code in electives:
            meta_rows = course_metadata[course_metadata["kode_mk"] == code]
            if meta_rows.empty:
                logger.warning("No metadata found for course %s, skipping", code)
                continue
            meta = meta_rows.iloc[0]

            pred_result = self.predict_course(code, full_data, year, smt, student_pop)
            pred_val = pred_result["val"]
            # History in any semester counts here, unlike in predict_course.
            has_history = bool((full_data["kode_mk"] == code).any())

            if "classes_needed" in pred_result:
                classes_needed = pred_result["classes_needed"]
            else:
                classes_needed = self.config.calculate_classes_needed(
                    pred_val, code, has_historical_data=has_history
                )

            course_capacity = self.config.get_class_capacity(code)
            total_quota = classes_needed * course_capacity if classes_needed > 0 else 0
            # Guarding on the quota (not the class count) avoids dividing by
            # zero when a course is configured with zero capacity.
            utilization = (pred_val / total_quota) * 100 if total_quota > 0 else 0

            should_open = pred_val >= capacity_cfg.MIN_STUDENTS_TO_OPEN_CLASS or (
                has_history and capacity_cfg.OPEN_CLASS_IF_HAS_HISTORY
            )

            rows.append(
                {
                    "kode_mk": code,
                    "nama_mk": meta["nama_mk"],
                    "sks": meta.get("sks_mk", 0),
                    "predicted_enrollment": round(pred_val, 1),
                    "class_capacity": course_capacity,
                    "classes_needed": classes_needed,
                    "total_quota": total_quota,
                    "utilization_pct": round(utilization, 1),
                    "recommendation": "BUKA" if should_open else "TUTUP",
                    "capacity_status": pred_result.get("capacity_status", "NORMAL"),
                    "strategy": pred_result["strategy"],
                    "confidence": pred_result["confidence"],
                }
            )

        # Passing the columns explicitly keeps sort_values from raising
        # KeyError when ``rows`` is empty.
        report = pd.DataFrame(rows, columns=list(self._BATCH_COLUMNS))
        return report.sort_values("predicted_enrollment", ascending=False)

    def generate_multi_year_forecast(
        self,
        full_data: pd.DataFrame,
        course_metadata: pd.DataFrame,
        electives: set,
        start_year: int,
        smt: int,
        years_ahead: int = 3,
    ) -> pd.DataFrame:
        """Build a course x year forecast table.

        Courses without a metadata row are skipped with a warning.

        Returns:
            DataFrame sorted by ``kode_mk`` then ``year``. When no course
            produces a row the frame is empty but still carries the report
            columns.
        """
        rows = []
        for code in electives:
            meta_rows = course_metadata[course_metadata["kode_mk"] == code]
            if meta_rows.empty:
                logger.warning("No metadata found for course %s, skipping", code)
                continue
            meta = meta_rows.iloc[0]
            course_capacity = self.config.get_class_capacity(code)

            for pred in self.predict_multi_year(
                code, full_data, start_year, smt, years_ahead
            ):
                classes_needed = pred.get("classes_needed", 0)
                rows.append(
                    {
                        "kode_mk": code,
                        "nama_mk": meta["nama_mk"],
                        "year": pred["year"],
                        "semester": pred["semester"],
                        "predicted_enrollment": round(pred["val"], 1),
                        "classes_needed": classes_needed,
                        "total_capacity": classes_needed * course_capacity,
                        "student_population": round(pred["student_population"], 0),
                        "strategy": pred["strategy"],
                        "confidence": pred["confidence"],
                    }
                )

        # Passing the columns explicitly keeps sort_values from raising
        # KeyError when ``rows`` is empty.
        report = pd.DataFrame(rows, columns=list(self._MULTI_YEAR_COLUMNS))
        return report.sort_values(["kode_mk", "year"])

    def get_course_trend_analysis(
        self,
        course_code: str,
        df_history: pd.DataFrame,
        target_smt: int,
    ) -> Dict:
        """Summarise the enrollment trend of one course in one semester.

        Rows with a missing year or enrollment are ignored, so ``data_points``
        counts usable observations only.

        Returns:
            ``{"has_sufficient_data": False, "data_points": n}`` with fewer
            than two usable rows. Otherwise a dict with min / max / average /
            latest enrollment, ``avg_growth_rate`` (percent, averaged over
            consecutive observations whose predecessor is positive),
            ``trend_slope`` (least-squares slope of enrollment over year),
            ``trend_direction`` and ``year_range``.
        """
        same_course_and_semester = (df_history["kode_mk"] == course_code) & (
            df_history["smt"] == target_smt
        )
        hist = (
            df_history[same_course_and_semester]
            .dropna(subset=["thn", "enrollment"])
            .sort_values("thn")
        )
        if len(hist) < 2:
            return {
                "has_sufficient_data": False,
                "data_points": len(hist),
            }

        enrollments = hist["enrollment"].to_numpy(dtype=float)
        years = hist["thn"].to_numpy(dtype=float)

        previous, current = enrollments[:-1], enrollments[1:]
        comparable = previous > 0  # growth is undefined from a zero base
        growth_rates = (current[comparable] - previous[comparable]) / previous[comparable]
        avg_growth_rate = float(growth_rates.mean()) if growth_rates.size else 0.0

        if years.max() > years.min():
            trend_slope = float(np.polyfit(years, enrollments, 1)[0])
        else:
            # All observations share one year: a line fit is ill-conditioned.
            trend_slope = 0.0

        if trend_slope > 0:
            trend_direction = "increasing"
        elif trend_slope < 0:
            trend_direction = "decreasing"
        else:
            trend_direction = "stable"

        return {
            "has_sufficient_data": True,
            "data_points": len(hist),
            "min_enrollment": int(enrollments.min()),
            "max_enrollment": int(enrollments.max()),
            "avg_enrollment": round(float(enrollments.mean()), 1),
            "latest_enrollment": int(enrollments[-1]),
            "avg_growth_rate": round(avg_growth_rate * 100, 1),  # as percentage
            "trend_slope": round(trend_slope, 2),
            "trend_direction": trend_direction,
            "year_range": f"{int(years.min())}-{int(years.max())}",
        }

    def predict_course_enrollment(
        self,
        course_code: str,
        train_data: pd.DataFrame,
        test_year: int,
        test_semester: int,
        test_student_count: float,
    ) -> Tuple[float, str]:
        """Return ``(prediction, strategy)`` for one course.

        Thin wrapper over ``predict_course``. The prediction is always a
        finite, non-negative ``float``; a non-finite value is replaced by
        ``FALLBACK_DEFAULT`` and labelled ``"fallback_default"``.
        """
        result = self.predict_course(
            course_code=course_code,
            df_history=train_data,
            target_year=test_year,
            target_smt=test_semester,
            student_pop=test_student_count,
        )
        val = float(result["val"])
        if not np.isfinite(val):
            return float(self.config.model.FALLBACK_DEFAULT), "fallback_default"
        return max(0.0, val), result["strategy"]