# classquota / prophet_predictor.py
# Page header captured together with this file: "muhalwan's picture",
# "Revised version", revision 6a0a429.
import logging
from typing import Dict, List, Optional, Tuple
import numpy as np
import pandas as pd
from prophet import Prophet
from config import Config
logger = logging.getLogger(__name__)


class ProphetPredictor:
    """Forecast the student population and per-course enrollment with Prophet.

    One Prophet model is fitted on the population history by
    :meth:`train_student_population_model`. Every course prediction fits a
    fresh Prophet model on that course's history for the target semester and
    falls back to simpler estimates when there is too little data, when
    Prophet raises, or when its output fails the sanity checks.

    All tunables (semester-to-month mapping, growth mode, caps, class
    capacities, opening thresholds) are read from the ``config`` object.
    """

    # Lowest student-population forecast that get_student_forecast returns.
    _MIN_POPULATION_FORECAST = 100.0
    # With capacity constraints enabled, demand is capped at this many classes
    # of the course's class capacity.
    _CAP_CLASS_MULTIPLE = 4
    # A raw Prophet forecast above hist_max * 5 or cap * 2 is treated as
    # unrealistic and replaced by the trend fallback.
    _OUTLIER_HIST_MULTIPLE = 5
    _OUTLIER_CAP_MULTIPLE = 2
    # Trend fallback: number of latest observations averaged, and the clamp
    # applied to the population growth ratio.
    _TREND_WINDOW = 3
    _GROWTH_CLAMP = (0.8, 1.3)

    # Column layouts of the two report frames; also used to build a
    # well-formed empty frame when no course produced a row.
    _BATCH_COLUMNS = (
        "kode_mk",
        "nama_mk",
        "sks",
        "predicted_enrollment",
        "class_capacity",
        "classes_needed",
        "total_quota",
        "utilization_pct",
        "recommendation",
        "capacity_status",
        "strategy",
        "confidence",
    )
    _MULTI_YEAR_COLUMNS = (
        "kode_mk",
        "nama_mk",
        "year",
        "semester",
        "predicted_enrollment",
        "classes_needed",
        "total_capacity",
        "student_population",
        "strategy",
        "confidence",
    )

    def __init__(self, config: "Config"):
        """Store the configuration; the population model starts untrained."""
        self.config = config
        self.student_model: Optional[Prophet] = None

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _semester_dates(self, df: pd.DataFrame) -> pd.Series:
        """Build the Prophet ``ds`` timestamps from the ``thn``/``smt`` columns.

        Both columns are cast to ``int`` first so float-typed years or
        semesters (e.g. ``2020.0``) still yield ``"2020-<month>"`` and still
        hit the integer keys of ``SEMESTER_TO_MONTH``.
        """
        months = df["smt"].astype(int).map(self.config.prediction.SEMESTER_TO_MONTH)
        return pd.to_datetime(df["thn"].astype(int).astype(str) + "-" + months)

    def _require_student_model(self) -> "Prophet":
        """Return the trained population model.

        Raises:
            AssertionError: if no model has been trained yet. Raised
                explicitly rather than via ``assert`` so the check is not
                stripped when Python runs with ``-O``.
        """
        if self.student_model is None:
            raise AssertionError("Student model must be trained first")
        return self.student_model

    @staticmethod
    def _mean_fallback(hist: pd.DataFrame, confidence: str) -> dict:
        """Return the historical mean enrollment as a ``fallback_mean`` result."""
        return {
            "val": hist["enrollment"].mean(),
            "strategy": "fallback_mean",
            "confidence": confidence,
        }

    def _trend_fallback(
        self, df: pd.DataFrame, pop: float, hist_mean: float, cap_value: float
    ) -> float:
        """Scale recent enrollment by the clamped population growth ratio.

        The base is the mean of the last ``_TREND_WINDOW`` observations when
        at least that many rows exist, otherwise the overall historical mean.
        The result is clipped to ``[0, cap_value]``.
        """
        pop_mean = float(df["jumlah_aktif"].mean())
        growth = pop / pop_mean if pop_mean > 0 else 1.0
        low, high = self._GROWTH_CLAMP
        growth = min(max(growth, low), high)

        if len(df) >= self._TREND_WINDOW:
            base = float(df["y"].tail(self._TREND_WINDOW).mean())
        else:
            base = hist_mean
        return min(max(base * growth, 0), cap_value)

    # ------------------------------------------------------------------
    # Student population
    # ------------------------------------------------------------------
    def train_student_population_model(self, population_data: pd.DataFrame) -> None:
        """Fit the population model on ``population_data``.

        Args:
            population_data: frame with ``thn`` (year), ``smt`` (semester key
                of ``SEMESTER_TO_MONTH``) and ``jumlah_aktif`` (the value
                being forecast). The input frame is not modified.
        """
        df = population_data.copy()
        df["ds"] = self._semester_dates(df)
        df["y"] = df["jumlah_aktif"]

        model = Prophet(
            growth="linear",
            daily_seasonality=False,  # type: ignore[arg-type]
            weekly_seasonality=False,  # type: ignore[arg-type]
            yearly_seasonality=True,  # type: ignore[arg-type]
        )
        model.fit(df[["ds", "y"]])
        # Publish the model only after a successful fit so a failed training
        # run never leaves an unfitted model behind.
        self.student_model = model
        logger.info("Student population model trained.")

    def get_student_forecast(self, year: int, semester: int) -> float:
        """Forecast the student population for one year/semester.

        Returns:
            The model's ``yhat``, floored at ``_MIN_POPULATION_FORECAST``.

        Raises:
            AssertionError: if the population model has not been trained.
        """
        model = self._require_student_model()
        target_date = pd.to_datetime(
            f"{year}-{self.config.prediction.SEMESTER_TO_MONTH[semester]}"
        )
        forecast = model.predict(pd.DataFrame({"ds": [target_date]}))
        return max(float(forecast["yhat"].values[0]), self._MIN_POPULATION_FORECAST)

    def get_multi_year_student_forecast(
        self, start_year: int, semester: int, years_ahead: int
    ) -> List[Tuple[int, float]]:
        """Forecast the population for ``start_year`` .. ``start_year + years_ahead``.

        Returns:
            ``(year, forecast)`` pairs in ascending year order (both ends
            inclusive).

        Raises:
            AssertionError: if the population model has not been trained.
        """
        self._require_student_model()
        return [
            (year, self.get_student_forecast(year, semester))
            for year in range(start_year, start_year + years_ahead + 1)
        ]

    # ------------------------------------------------------------------
    # Single-course prediction
    # ------------------------------------------------------------------
    def predict_course(
        self,
        course_code: str,
        df_history: pd.DataFrame,
        target_year: int,
        target_smt: int,
        student_pop: float,
    ) -> dict:
        """Predict enrollment for one course in one semester.

        Only history rows of the same course *and* the same semester are
        used. Without any such rows the result is the configured
        ``FALLBACK_DEFAULT`` with strategy ``"cold_start"``.

        Returns:
            dict with ``val``, ``strategy``, ``confidence``,
            ``classes_needed`` and ``capacity_status`` (the last two come
            from the config object).
        """
        hist = df_history[
            (df_history["kode_mk"] == course_code) & (df_history["smt"] == target_smt)
        ].sort_values(["thn", "smt"])
        has_historical_data = len(hist) > 0

        if has_historical_data:
            result = self._predict_prophet_with_capacity(
                hist, target_year, target_smt, student_pop, course_code
            )
        else:
            result = {
                "val": self.config.model.FALLBACK_DEFAULT,
                "strategy": "cold_start",
                "confidence": "low",
            }

        result["classes_needed"] = self.config.calculate_classes_needed(
            result["val"], course_code, has_historical_data=has_historical_data
        )
        result["capacity_status"] = self.config.get_capacity_status(
            result["val"], course_code
        )
        return result

    def _predict_prophet_with_capacity(
        self, hist: pd.DataFrame, year: int, smt: int, pop: float, course_code: str
    ) -> dict:
        """Fit a per-course Prophet model and return a capped prediction.

        Decision ladder:
          1. fewer than two non-null enrollments -> historical mean (medium)
          2. no ``jumlah_aktif`` column          -> historical mean (low)
          3. Prophet forecast fails sanity check -> trend fallback (medium)
          4. Prophet raises                      -> historical mean (medium)
          5. otherwise                           -> Prophet forecast clipped
             to ``[0, cap]`` (high)

        Returns:
            dict with ``val``, ``strategy`` and ``confidence``.
        """
        df = hist.copy()
        df["ds"] = self._semester_dates(df)
        df["y"] = df["enrollment"]

        if df["y"].notna().sum() < 2:
            return self._mean_fallback(hist, confidence="medium")

        if "jumlah_aktif" not in df.columns:
            logger.warning(
                "jumlah_aktif column missing from historical data - cannot use regressor"
            )
            return self._mean_fallback(hist, confidence="low")

        hist_max = float(df["y"].max())
        hist_mean = float(df["y"].mean())
        class_capacity = self.config.get_class_capacity(course_code)

        # Upper bound on demand: a multiple of the historical peak, an
        # absolute ceiling, and (optionally) a multiple of the class size.
        cap_value = min(
            hist_max * self.config.prediction.MAX_CAPACITY_MULTIPLIER,
            self.config.prediction.ABSOLUTE_MAX_STUDENTS,
        )
        if self.config.class_capacity.ENABLE_CAPACITY_CONSTRAINTS:
            cap_value = min(cap_value, class_capacity * self._CAP_CLASS_MULTIPLE)

        # Saturation bounds passed to Prophet alongside the observations.
        df["cap"] = cap_value
        df["floor"] = 0

        # Everything involving Prophet is best-effort: any failure degrades
        # to the historical mean instead of aborting the caller's batch.
        try:
            use_regressor = self.config.model.USE_POPULATION_REGRESSOR
            m = Prophet(
                growth=self.config.model.GROWTH_MODE,
                changepoint_prior_scale=self.config.model.CHANGEPOINT_SCALE,
                seasonality_mode=self.config.model.SEASONALITY_MODE,
                daily_seasonality=False,  # type: ignore[arg-type]
                weekly_seasonality=False,  # type: ignore[arg-type]
            )
            fit_columns = ["ds", "y", "cap", "floor"]
            if use_regressor:
                m.add_regressor("jumlah_aktif", mode="multiplicative")
                fit_columns.append("jumlah_aktif")
            m.fit(df[fit_columns])

            future_date = pd.to_datetime(
                f"{year}-{self.config.prediction.SEMESTER_TO_MONTH[smt]}"
            )
            future = pd.DataFrame(
                {
                    "ds": [future_date],
                    "cap": [cap_value],
                    "floor": [0],
                }
            )
            if use_regressor:
                future["jumlah_aktif"] = pop

            raw_pred = m.predict(future)["yhat"].values[0]

            unrealistic = (
                raw_pred < 0
                or not np.isfinite(raw_pred)
                or raw_pred > hist_max * self._OUTLIER_HIST_MULTIPLE
                or raw_pred > cap_value * self._OUTLIER_CAP_MULTIPLE
            )
            if unrealistic:
                logger.warning(
                    f"Prophet prediction ({raw_pred:.1f}) unrealistic for {course_code}. "
                    f"Using trend-based fallback. (hist_max={hist_max}, cap={cap_value})"
                )
                return {
                    "val": self._trend_fallback(df, pop, hist_mean, cap_value),
                    "strategy": "trend_fallback",
                    "confidence": "medium",
                }

            return {
                "val": min(max(0, raw_pred), cap_value),
                "strategy": "prophet_logistic",
                "confidence": "high",
            }
        except Exception as e:
            logger.warning(
                f"Prophet failed for course {course_code}. Error: {e}. Using fallback."
            )
            return self._mean_fallback(hist, confidence="medium")

    def predict_multi_year(
        self,
        course_code: str,
        df_history: pd.DataFrame,
        start_year: int,
        target_smt: int,
        years_ahead: int = 3,
    ) -> List[Dict]:
        """Predict one course for ``start_year`` .. ``start_year + years_ahead``.

        Each entry is the :meth:`predict_course` result extended with
        ``year``, ``semester`` and the ``student_population`` forecast that
        was fed into the prediction.
        """
        predictions = []
        for year in range(start_year, start_year + years_ahead + 1):
            pop = self.get_student_forecast(year, target_smt)
            pred = self.predict_course(course_code, df_history, year, target_smt, pop)
            pred.update(year=year, semester=target_smt, student_population=pop)
            predictions.append(pred)
        return predictions

    # ------------------------------------------------------------------
    # Reports
    # ------------------------------------------------------------------
    def generate_batch_predictions(
        self,
        full_data: pd.DataFrame,
        course_metadata: pd.DataFrame,
        electives: set,
        year: int,
        smt: int,
    ) -> pd.DataFrame:
        """Build the per-course recommendation table for one semester.

        Courses without a row in ``course_metadata`` are skipped with a
        warning. A course is recommended ``"BUKA"`` when its prediction
        reaches ``MIN_STUDENTS_TO_OPEN_CLASS`` or when it has any history and
        ``OPEN_CLASS_IF_HAS_HISTORY`` is set; otherwise ``"TUTUP"``.

        Returns:
            One row per predicted course, sorted by ``predicted_enrollment``
            descending. When no course could be predicted, an empty frame
            with the same columns.
        """
        student_pop = self.get_student_forecast(year, smt)
        logger.info(
            f"Predicting for {len(electives)} courses (Pop: {int(student_pop)})..."
        )

        capacity_cfg = self.config.class_capacity
        results = []
        for code in electives:
            meta_rows = course_metadata[course_metadata["kode_mk"] == code]
            if len(meta_rows) == 0:
                logger.warning(f"No metadata found for course {code}, skipping")
                continue
            meta = meta_rows.iloc[0]

            pred_result = self.predict_course(code, full_data, year, smt, student_pop)
            pred_val = pred_result["val"]
            # "History" here means any row for the course, regardless of semester.
            has_history = bool((full_data["kode_mk"] == code).any())

            if "classes_needed" in pred_result:
                classes_needed = pred_result["classes_needed"]
            else:
                # Only computed when predict_course did not already supply it.
                classes_needed = self.config.calculate_classes_needed(
                    pred_val, code, has_historical_data=has_history
                )

            course_capacity = self.config.get_class_capacity(code)
            total_quota = classes_needed * course_capacity if classes_needed > 0 else 0
            # Guarded so a zero class capacity cannot divide by zero.
            utilization = (pred_val / total_quota) * 100 if total_quota > 0 else 0

            should_open = pred_val >= capacity_cfg.MIN_STUDENTS_TO_OPEN_CLASS or (
                has_history and capacity_cfg.OPEN_CLASS_IF_HAS_HISTORY
            )

            results.append(
                {
                    "kode_mk": code,
                    "nama_mk": meta["nama_mk"],
                    "sks": meta.get("sks_mk", 0),
                    "predicted_enrollment": round(pred_val, 1),
                    "class_capacity": course_capacity,
                    "classes_needed": classes_needed,
                    "total_quota": total_quota,
                    "utilization_pct": round(utilization, 1),
                    "recommendation": "BUKA" if should_open else "TUTUP",
                    "capacity_status": pred_result.get("capacity_status", "NORMAL"),
                    "strategy": pred_result["strategy"],
                    "confidence": pred_result["confidence"],
                }
            )

        if not results:
            # DataFrame([]) has no columns, so sort_values would raise KeyError.
            return pd.DataFrame(columns=list(self._BATCH_COLUMNS))
        return pd.DataFrame(results).sort_values(
            "predicted_enrollment", ascending=False
        )

    def generate_multi_year_forecast(
        self,
        full_data: pd.DataFrame,
        course_metadata: pd.DataFrame,
        electives: set,
        start_year: int,
        smt: int,
        years_ahead: int = 3,
    ) -> pd.DataFrame:
        """Build the multi-year forecast table for a set of courses.

        Courses without a row in ``course_metadata`` are skipped with a
        warning.

        Returns:
            One row per course and year, sorted by ``kode_mk`` then ``year``.
            When no course could be predicted, an empty frame with the same
            columns.
        """
        all_results = []
        for code in electives:
            meta_rows = course_metadata[course_metadata["kode_mk"] == code]
            if len(meta_rows) == 0:
                logger.warning(f"No metadata found for course {code}, skipping")
                continue
            meta = meta_rows.iloc[0]

            year_predictions = self.predict_multi_year(
                code, full_data, start_year, smt, years_ahead
            )
            course_capacity = self.config.get_class_capacity(code)
            for pred in year_predictions:
                classes_needed = pred.get("classes_needed", 0)
                all_results.append(
                    {
                        "kode_mk": code,
                        "nama_mk": meta["nama_mk"],
                        "year": pred["year"],
                        "semester": pred["semester"],
                        "predicted_enrollment": round(pred["val"], 1),
                        "classes_needed": classes_needed,
                        "total_capacity": classes_needed * course_capacity,
                        "student_population": round(pred["student_population"], 0),
                        "strategy": pred["strategy"],
                        "confidence": pred["confidence"],
                    }
                )

        if not all_results:
            # DataFrame([]) has no columns, so sort_values would raise KeyError.
            return pd.DataFrame(columns=list(self._MULTI_YEAR_COLUMNS))
        return pd.DataFrame(all_results).sort_values(["kode_mk", "year"])

    def get_course_trend_analysis(
        self,
        course_code: str,
        df_history: pd.DataFrame,
        target_smt: int,
    ) -> Dict:
        """Summarise the enrollment history of one course in one semester.

        Returns:
            With fewer than two history rows:
            ``{"has_sufficient_data": False, "data_points": n}``.
            Otherwise min/max/mean/latest enrollment, the mean
            period-over-period growth rate in percent (periods whose previous
            value is not positive are skipped), the slope of a degree-1
            least-squares fit over the years, a direction label and the
            covered year range.
        """
        hist = df_history[
            (df_history["kode_mk"] == course_code) & (df_history["smt"] == target_smt)
        ].sort_values("thn")

        if len(hist) < 2:
            return {
                "has_sufficient_data": False,
                "data_points": len(hist),
            }

        enrollments = np.array(hist["enrollment"].values, dtype=float)
        years = np.array(hist["thn"].values, dtype=float)

        previous, current = enrollments[:-1], enrollments[1:]
        usable = previous > 0  # growth is undefined from a non-positive base
        growth_rates = (current[usable] - previous[usable]) / previous[usable]
        avg_growth_rate = float(growth_rates.mean()) if growth_rates.size else 0.0

        # A line needs two distinct x values; with a single distinct year the
        # fit is rank-deficient, so report a flat trend instead.
        if np.unique(years).size >= 2:
            trend_slope = float(np.polyfit(years, enrollments, 1)[0])
        else:
            trend_slope = 0.0

        if trend_slope > 0:
            trend_direction = "increasing"
        elif trend_slope < 0:
            trend_direction = "decreasing"
        else:
            trend_direction = "stable"

        return {
            "has_sufficient_data": True,
            "data_points": len(hist),
            "min_enrollment": int(enrollments.min()),
            "max_enrollment": int(enrollments.max()),
            "avg_enrollment": round(float(enrollments.mean()), 1),
            "latest_enrollment": int(enrollments[-1]),
            "avg_growth_rate": round(avg_growth_rate * 100, 1),  # as percentage
            "trend_slope": round(trend_slope, 2),
            "trend_direction": trend_direction,
            "year_range": f"{int(years.min())}-{int(years.max())}",
        }

    def predict_course_enrollment(
        self,
        course_code: str,
        train_data: pd.DataFrame,
        test_year: int,
        test_semester: int,
        test_student_count: float,
    ) -> tuple:
        """Return ``(value, strategy)`` for one course prediction.

        Thin wrapper around :meth:`predict_course`. A non-finite prediction
        is replaced by ``FALLBACK_DEFAULT`` with strategy
        ``"fallback_default"``; finite predictions are floored at 0.
        """
        result = self.predict_course(
            course_code=course_code,
            df_history=train_data,
            target_year=test_year,
            target_smt=test_semester,
            student_pop=test_student_count,
        )
        val = float(result["val"])
        if not np.isfinite(val):
            return self.config.model.FALLBACK_DEFAULT, "fallback_default"
        return max(0.0, val), result["strategy"]