classquota / prophet_predictor.py
muhalwan's picture
Revised version
6a0a429
raw
history blame
15.2 kB
import logging
from typing import Dict, List, Optional, Tuple
import numpy as np
import pandas as pd
from prophet import Prophet
from config import Config
logger = logging.getLogger(__name__)
class ProphetPredictor:
    """Prophet-based forecasts of student population and course enrollment."""

    def __init__(self, config: Config):
        """Keep the configuration and start without a population model.

        Args:
            config: Configuration object read by the prediction methods.
        """
        # Stays None until train_student_population_model() fits a model.
        self.student_model: Optional[Prophet] = None
        self.config = config
def train_student_population_model(self, population_data: pd.DataFrame):
    """Fit the Prophet model used to forecast the student population.

    A timestamp is built for every row from the ``thn`` (year) and ``smt``
    (semester) columns via ``config.prediction.SEMESTER_TO_MONTH``, and the
    ``jumlah_aktif`` column is used as the value to forecast. The fitted
    model is stored on ``self.student_model``.

    Args:
        population_data: Frame with ``thn``, ``smt`` and ``jumlah_aktif``
            columns. It is copied, not modified.
    """
    df = population_data.copy()

    # Normalise year and semester to int before building the date string,
    # exactly as _predict_prophet_with_capacity does: a float year would
    # otherwise render as "2020.0", and a non-int semester would miss the
    # month mapping, which is indexed with int semesters elsewhere.
    year_text = df["thn"].astype(int).astype(str)
    month_text = df["smt"].astype(int).map(
        self.config.prediction.SEMESTER_TO_MONTH
    )
    df["ds"] = pd.to_datetime(year_text + "-" + month_text)
    df["y"] = df["jumlah_aktif"]

    self.student_model = Prophet(
        growth="linear",
        daily_seasonality=False,  # type: ignore[arg-type]
        weekly_seasonality=False,  # type: ignore[arg-type]
        yearly_seasonality=True,  # type: ignore[arg-type]
    )
    self.student_model.fit(df[["ds", "y"]])
    logger.info("Student population model trained.")
def get_student_forecast(self, year: int, semester: int) -> float:
    """Forecast the student population for one year/semester.

    Args:
        year: Target year.
        semester: Target semester; must be a key of
            ``config.prediction.SEMESTER_TO_MONTH``.

    Returns:
        The model's ``yhat`` for the target date as a plain ``float``,
        never lower than 100.

    Raises:
        AssertionError: If the population model has not been trained.
    """
    if self.student_model is None:
        # Explicit check rather than `assert` so it still fires under
        # `python -O`; AssertionError is kept so callers see the same type.
        raise AssertionError("Student model must be trained first")

    target_date = pd.to_datetime(
        f"{year}-{self.config.prediction.SEMESTER_TO_MONTH[semester]}"
    )
    future = pd.DataFrame({"ds": [target_date]})
    forecast = self.student_model.predict(future)

    # Convert before applying the floor so the result is always a built-in
    # float (previously an int 100 or a numpy scalar, depending on the branch).
    return max(float(forecast["yhat"].values[0]), 100.0)
def get_multi_year_student_forecast(
    self, start_year: int, semester: int, years_ahead: int
) -> List[Tuple[int, float]]:
    """Forecast the student population for a run of consecutive years.

    Args:
        start_year: First year to forecast.
        semester: Semester used for every year.
        years_ahead: Number of additional years after ``start_year``.

    Returns:
        ``(year, population)`` pairs for ``start_year`` through
        ``start_year + years_ahead`` inclusive, in ascending year order.
    """
    assert self.student_model is not None, "Student model must be trained first"
    horizon = range(start_year, start_year + years_ahead + 1)
    return [
        (target_year, self.get_student_forecast(target_year, semester))
        for target_year in horizon
    ]
def predict_course(
    self,
    course_code: str,
    df_history: pd.DataFrame,
    target_year: int,
    target_smt: int,
    student_pop: float,
) -> dict:
    """Predict enrollment for one course in one year/semester.

    Only history rows whose ``kode_mk`` equals ``course_code`` and whose
    ``smt`` equals ``target_smt`` are used.

    Args:
        course_code: Course identifier matched against ``kode_mk``.
        df_history: Enrollment history with ``kode_mk``, ``thn`` and ``smt``
            columns (plus whatever the Prophet helper reads).
        target_year: Year to predict.
        target_smt: Semester to predict.
        student_pop: Student population forwarded to the Prophet helper.

    Returns:
        Dict with ``val``, ``strategy``, ``confidence``, ``classes_needed``
        and ``capacity_status``. ``strategy`` is ``"cold_start"`` when no
        matching history exists and ``"fallback_default"`` when the helper
        produced a non-finite value.
    """
    hist = df_history[
        (df_history["kode_mk"] == course_code) & (df_history["smt"] == target_smt)
    ].sort_values(["thn", "smt"])
    has_historical_data = len(hist) > 0
    default_value = self.config.model.FALLBACK_DEFAULT

    if has_historical_data:
        result = self._predict_prophet_with_capacity(
            hist, target_year, target_smt, student_pop, course_code
        )
        if not np.isfinite(float(result["val"])):
            # The helper's mean fallback is NaN when every enrollment value
            # is missing. Do not feed that into the capacity calculations;
            # use the same default/label as predict_course_enrollment.
            result = {
                "val": default_value,
                "strategy": "fallback_default",
                "confidence": "low",
            }
    else:
        result = {
            "val": default_value,
            "strategy": "cold_start",
            "confidence": "low",
        }

    # Capacity figures are derived the same way for every strategy.
    result["classes_needed"] = self.config.calculate_classes_needed(
        result["val"], course_code, has_historical_data=has_historical_data
    )
    result["capacity_status"] = self.config.get_capacity_status(
        result["val"], course_code
    )
    return result
def _predict_prophet_with_capacity(
    self, hist: pd.DataFrame, year: int, smt: int, pop: float, course_code: str
) -> dict:
    """Forecast one course's enrollment with a capacity-bounded Prophet model.

    Falls back to the mean of ``hist["enrollment"]`` when fewer than two
    enrollment values are present, when the ``jumlah_aktif`` column is
    missing, or when anything in the Prophet path raises. A Prophet value
    judged unrealistic is replaced by a recent/historical average scaled by
    population growth (clamped to the range 0.8 - 1.3).

    Args:
        hist: History rows of the course; reads ``thn``, ``smt``,
            ``enrollment`` and ``jumlah_aktif``.
        year: Target year.
        smt: Target semester, a key of ``SEMESTER_TO_MONTH``.
        pop: Student population for the target period.
        course_code: Course identifier, used for capacity lookup and logs.

    Returns:
        Dict with ``val``, ``strategy`` and ``confidence``.
    """

    def mean_fallback(confidence: str) -> dict:
        # Plain average of the recorded enrollment of this course.
        return {
            "val": hist["enrollment"].mean(),
            "strategy": "fallback_mean",
            "confidence": confidence,
        }

    frame = hist.copy()
    year_text = frame["thn"].astype(int).astype(str)
    month_text = frame["smt"].astype(int).map(
        self.config.prediction.SEMESTER_TO_MONTH
    )
    frame["ds"] = pd.to_datetime(year_text + "-" + month_text)
    frame["y"] = frame["enrollment"]

    # Need at least two observed values before attempting a model fit.
    if frame["y"].notna().sum() < 2:
        return mean_fallback("medium")

    if "jumlah_aktif" not in frame.columns:
        logger.warning(
            "jumlah_aktif column missing from historical data - cannot use regressor"
        )
        return mean_fallback("low")

    hist_peak = float(frame["y"].max())
    hist_avg = float(frame["y"].mean())
    seats_per_class = self.config.get_class_capacity(course_code)

    # Upper bound: scaled historical peak, absolute limit, and (optionally)
    # four classes' worth of seats, whichever is smallest.
    ceiling = min(
        hist_peak * self.config.prediction.MAX_CAPACITY_MULTIPLIER,
        self.config.prediction.ABSOLUTE_MAX_STUDENTS,
    )
    if self.config.class_capacity.ENABLE_CAPACITY_CONSTRAINTS:
        ceiling = min(ceiling, seats_per_class * 4)

    frame["cap"] = ceiling
    frame["floor"] = 0

    try:
        model = Prophet(
            growth=self.config.model.GROWTH_MODE,
            changepoint_prior_scale=self.config.model.CHANGEPOINT_SCALE,
            seasonality_mode=self.config.model.SEASONALITY_MODE,
            daily_seasonality=False,  # type: ignore[arg-type]
            weekly_seasonality=False,  # type: ignore[arg-type]
        )

        fit_columns = ["ds", "y", "cap", "floor"]
        if self.config.model.USE_POPULATION_REGRESSOR:
            model.add_regressor("jumlah_aktif", mode="multiplicative")
            fit_columns.append("jumlah_aktif")
        model.fit(frame[fit_columns])

        target_date = pd.to_datetime(
            f"{year}-{self.config.prediction.SEMESTER_TO_MONTH[smt]}"
        )
        target_row = pd.DataFrame(
            {"ds": [target_date], "cap": [ceiling], "floor": [0]}
        )
        if self.config.model.USE_POPULATION_REGRESSOR:
            target_row["jumlah_aktif"] = pop

        raw_pred = model.predict(target_row)["yhat"].values[0]

        unrealistic = (
            raw_pred < 0
            or not np.isfinite(raw_pred)
            or raw_pred > hist_peak * 5
            or raw_pred > ceiling * 2
        )
        if not unrealistic:
            return {
                "val": min(max(0, raw_pred), ceiling),
                "strategy": "prophet_logistic",
                "confidence": "high",
            }

        logger.warning(
            f"Prophet prediction ({raw_pred:.1f}) unrealistic for {course_code}. "
            f"Using trend-based fallback. (hist_max={hist_peak}, cap={ceiling})"
        )

        pop_mean = float(frame["jumlah_aktif"].mean())
        # With three or more rows use the mean of the last three values,
        # otherwise the overall historical mean.
        if len(frame) >= 3:
            baseline = float(frame["y"].tail(3).mean())
        else:
            baseline = hist_avg
        pop_ratio = pop / pop_mean if pop_mean > 0 else 1.0
        scaled = baseline * min(max(pop_ratio, 0.8), 1.3)

        return {
            "val": min(max(scaled, 0), ceiling),
            "strategy": "trend_fallback",
            "confidence": "medium",
        }

    except Exception as e:
        # Deliberate best-effort: any failure degrades to the mean.
        logger.warning(
            f"Prophet failed for course {course_code}. Error: {e}. Using fallback."
        )
        return mean_fallback("medium")
def predict_multi_year(
    self,
    course_code: str,
    df_history: pd.DataFrame,
    start_year: int,
    target_smt: int,
    years_ahead: int = 3,
) -> List[Dict]:
    """Predict one course's enrollment for several consecutive years.

    For each year the student population is forecast first and then passed
    to ``predict_course``.

    Args:
        course_code: Course identifier.
        df_history: Enrollment history handed to ``predict_course``.
        start_year: First year to predict.
        target_smt: Semester used for every year.
        years_ahead: Number of additional years after ``start_year``.

    Returns:
        One ``predict_course`` result per year, in ascending year order,
        each extended with ``year``, ``semester`` and ``student_population``.
    """

    def forecast_for(target_year: int) -> Dict:
        population = self.get_student_forecast(target_year, target_smt)
        entry = self.predict_course(
            course_code, df_history, target_year, target_smt, population
        )
        entry["year"] = target_year
        entry["semester"] = target_smt
        entry["student_population"] = population
        return entry

    return [
        forecast_for(start_year + offset) for offset in range(years_ahead + 1)
    ]
def generate_batch_predictions(
    self,
    full_data: pd.DataFrame,
    course_metadata: pd.DataFrame,
    electives: set,
    year: int,
    smt: int,
) -> pd.DataFrame:
    """Predict enrollment and class quotas for a set of courses.

    Courses without a row in ``course_metadata`` are skipped with a warning.

    Args:
        full_data: Enrollment history; must contain ``kode_mk`` plus the
            columns ``predict_course`` reads.
        course_metadata: One row per course with ``kode_mk``, ``nama_mk``
            and optionally ``sks_mk``.
        electives: Course codes to predict.
        year: Target year.
        smt: Target semester.

    Returns:
        One row per predicted course, sorted by ``predicted_enrollment``
        descending. When nothing could be predicted the frame is empty but
        still carries all result columns.
    """
    columns = [
        "kode_mk",
        "nama_mk",
        "sks",
        "predicted_enrollment",
        "class_capacity",
        "classes_needed",
        "total_quota",
        "utilization_pct",
        "recommendation",
        "capacity_status",
        "strategy",
        "confidence",
    ]

    student_pop = self.get_student_forecast(year, smt)
    logger.info(
        f"Predicting for {len(electives)} courses (Pop: {int(student_pop)})..."
    )

    capacity_cfg = self.config.class_capacity
    results = []
    for code in electives:
        meta_rows = course_metadata[course_metadata["kode_mk"] == code]
        if meta_rows.empty:
            logger.warning(f"No metadata found for course {code}, skipping")
            continue
        meta = meta_rows.iloc[0]

        pred_result = self.predict_course(code, full_data, year, smt, student_pop)
        pred_val = pred_result["val"]
        # History in any semester counts here, not only the target one.
        has_history = bool((full_data["kode_mk"] == code).any())

        # Only compute the class count ourselves if the prediction lacks it.
        if "classes_needed" in pred_result:
            classes_needed = pred_result["classes_needed"]
        else:
            classes_needed = self.config.calculate_classes_needed(
                pred_val, code, has_historical_data=has_history
            )

        course_capacity = self.config.get_class_capacity(code)
        total_quota = classes_needed * course_capacity if classes_needed > 0 else 0
        # Guard on the quota itself so a zero class capacity cannot divide by 0.
        utilization = (pred_val / total_quota) * 100 if total_quota > 0 else 0

        should_open = pred_val >= capacity_cfg.MIN_STUDENTS_TO_OPEN_CLASS or (
            has_history and capacity_cfg.OPEN_CLASS_IF_HAS_HISTORY
        )

        results.append(
            {
                "kode_mk": code,
                "nama_mk": meta["nama_mk"],
                "sks": meta.get("sks_mk", 0),
                "predicted_enrollment": round(pred_val, 1),
                "class_capacity": course_capacity,
                "classes_needed": classes_needed,
                "total_quota": total_quota,
                "utilization_pct": round(utilization, 1),
                "recommendation": "BUKA" if should_open else "TUTUP",
                "capacity_status": pred_result.get("capacity_status", "NORMAL"),
                "strategy": pred_result["strategy"],
                "confidence": pred_result["confidence"],
            }
        )

    # Explicit columns keep the sort key present even with zero rows;
    # sorting a column-less empty frame would raise KeyError.
    return pd.DataFrame(results, columns=columns).sort_values(
        "predicted_enrollment", ascending=False
    )
def generate_multi_year_forecast(
    self,
    full_data: pd.DataFrame,
    course_metadata: pd.DataFrame,
    electives: set,
    start_year: int,
    smt: int,
    years_ahead: int = 3,
) -> pd.DataFrame:
    """Build a multi-year enrollment forecast table for a set of courses.

    Courses without a row in ``course_metadata`` are silently skipped.

    Args:
        full_data: Enrollment history handed to ``predict_multi_year``.
        course_metadata: One row per course with ``kode_mk`` and ``nama_mk``.
        electives: Course codes to forecast.
        start_year: First forecast year.
        smt: Semester used for every year.
        years_ahead: Number of additional years after ``start_year``.

    Returns:
        One row per course and year, sorted by ``kode_mk`` then ``year``.
        When nothing could be forecast the frame is empty but still carries
        all result columns.
    """
    columns = [
        "kode_mk",
        "nama_mk",
        "year",
        "semester",
        "predicted_enrollment",
        "classes_needed",
        "total_capacity",
        "student_population",
        "strategy",
        "confidence",
    ]

    all_results = []
    for code in electives:
        meta_rows = course_metadata[course_metadata["kode_mk"] == code]
        if meta_rows.empty:
            continue
        course_name = meta_rows.iloc[0]["nama_mk"]
        # Capacity depends only on the course, so look it up once per course.
        course_capacity = self.config.get_class_capacity(code)

        for pred in self.predict_multi_year(
            code, full_data, start_year, smt, years_ahead
        ):
            classes_needed = pred.get("classes_needed", 0)
            all_results.append(
                {
                    "kode_mk": code,
                    "nama_mk": course_name,
                    "year": pred["year"],
                    "semester": pred["semester"],
                    "predicted_enrollment": round(pred["val"], 1),
                    "classes_needed": classes_needed,
                    "total_capacity": classes_needed * course_capacity,
                    "student_population": round(pred["student_population"], 0),
                    "strategy": pred["strategy"],
                    "confidence": pred["confidence"],
                }
            )

    # Explicit columns keep the sort keys present even with zero rows;
    # sorting a column-less empty frame would raise KeyError.
    return pd.DataFrame(all_results, columns=columns).sort_values(
        ["kode_mk", "year"]
    )
def get_course_trend_analysis(
    self,
    course_code: str,
    df_history: pd.DataFrame,
    target_smt: int,
) -> Dict:
    """Summarise the historical enrollment trend of one course.

    Only rows whose ``kode_mk`` equals ``course_code`` and whose ``smt``
    equals ``target_smt`` are analysed, ordered by ``thn``. Rows with a
    missing enrollment or year are ignored.

    Args:
        course_code: Course identifier matched against ``kode_mk``.
        df_history: Frame with ``kode_mk``, ``smt``, ``thn`` and
            ``enrollment`` columns.
        target_smt: Semester to analyse.

    Returns:
        ``{"has_sufficient_data": False, "data_points": n}`` when fewer than
        two usable rows exist. Otherwise a dict with enrollment min/max/
        average/latest, the average period-over-period growth rate in
        percent, the linear trend slope with its direction, and the covered
        year range.
    """
    hist = df_history[
        (df_history["kode_mk"] == course_code) & (df_history["smt"] == target_smt)
    ].sort_values("thn")
    if len(hist) < 2:
        return {
            "has_sufficient_data": False,
            "data_points": len(hist),
        }

    enrollments = np.array(hist["enrollment"].values, dtype=float)
    years = np.array(hist["thn"].values, dtype=float)

    # NaN would propagate into min()/max() and crash the int() conversions
    # below, so keep only rows where both values are present.
    usable = np.isfinite(enrollments) & np.isfinite(years)
    enrollments = enrollments[usable]
    years = years[usable]
    if len(enrollments) < 2:
        return {
            "has_sufficient_data": False,
            "data_points": int(len(enrollments)),
        }

    # Relative change between consecutive points; transitions that start at
    # zero (or below) are skipped because the ratio is undefined.
    previous = enrollments[:-1]
    current = enrollments[1:]
    valid = previous > 0
    growth_rates = (current[valid] - previous[valid]) / previous[valid]
    avg_growth_rate = float(growth_rates.mean()) if growth_rates.size else 0.0

    # A straight-line fit needs at least two distinct years; with a single
    # distinct year the fit is degenerate, so report a flat trend.
    if years.min() == years.max():
        trend_slope = 0.0
    else:
        trend_slope = float(np.polyfit(years, enrollments, 1)[0])

    if trend_slope > 0:
        trend_direction = "increasing"
    elif trend_slope < 0:
        trend_direction = "decreasing"
    else:
        trend_direction = "stable"

    return {
        "has_sufficient_data": True,
        "data_points": int(len(enrollments)),
        "min_enrollment": int(enrollments.min()),
        "max_enrollment": int(enrollments.max()),
        "avg_enrollment": round(float(enrollments.mean()), 1),
        "latest_enrollment": int(enrollments[-1]),
        "avg_growth_rate": round(avg_growth_rate * 100, 1),  # as percentage
        "trend_slope": round(trend_slope, 2),
        "trend_direction": trend_direction,
        "year_range": f"{int(years.min())}-{int(years.max())}",
    }
def predict_course_enrollment(
    self,
    course_code: str,
    train_data: pd.DataFrame,
    test_year: int,
    test_semester: int,
    test_student_count: float,
) -> tuple:
    """Return a sanitised ``(value, strategy)`` pair for one course.

    Thin wrapper around ``predict_course`` that maps the evaluation-style
    argument names onto it and cleans up the predicted value.

    Args:
        course_code: Course identifier.
        train_data: History passed as ``df_history``.
        test_year: Year passed as ``target_year``.
        test_semester: Semester passed as ``target_smt``.
        test_student_count: Population passed as ``student_pop``.

    Returns:
        ``(val, strategy)``. A non-finite prediction yields
        ``(config.model.FALLBACK_DEFAULT, "fallback_default")``; otherwise
        the prediction clamped to be non-negative with its own strategy.
    """
    outcome = self.predict_course(
        course_code=course_code,
        df_history=train_data,
        target_year=test_year,
        target_smt=test_semester,
        student_pop=test_student_count,
    )
    estimate = float(outcome["val"])

    if np.isfinite(estimate):
        return max(0.0, estimate), outcome["strategy"]

    # NaN / infinity: substitute the configured default.
    return self.config.model.FALLBACK_DEFAULT, "fallback_default"