Spaces:
Sleeping
Sleeping
| # backend/src/predictor.py | |
| """ | |
| predictor.py — per-slot vacancy forecasting with Facebook Prophet | |
| Training (run once after detector has populated occupancy_log): | |
| python -m src.predictor train | |
| Runtime (called by api/main.py): | |
| from src.predictor import Predictor | |
| p = Predictor() | |
| forecasts = p.predict(horizon_minutes=30) | |
| # forecasts = [{"slot_id": "slot_001", "vacancy_prob": 0.72}, ...] | |
| """ | |
| import json | |
| import pickle | |
| import sys | |
| from pathlib import Path | |
| from datetime import datetime, timezone, timedelta | |
| import numpy as np | |
| import pandas as pd | |
| from prophet import Prophet | |
| from src.database import get_connection, save_predictions | |
| # --------------------------------------------------------------------------- | |
| # Paths | |
| # --------------------------------------------------------------------------- | |
# Two directory levels above this file (backend/src/predictor.py -> backend/).
BASE_DIR = Path(__file__).resolve().parent.parent
# One pickled Prophet model per slot is stored here. The directory is created
# eagerly at import time so training and the runtime Predictor can assume it exists.
MODELS_DIR = BASE_DIR.joinpath("models", "prophet_models")
MODELS_DIR.mkdir(exist_ok=True, parents=True)
| # --------------------------------------------------------------------------- | |
| # Helpers | |
| # --------------------------------------------------------------------------- | |
def _model_path(slot_id: str) -> Path:
    """Return the on-disk location of the pickled Prophet model for *slot_id*.

    The file name is the slot id followed by ``.pkl``, directly inside
    MODELS_DIR. Predictor recovers the slot id from ``path.stem``, so the
    two must stay in sync.
    """
    filename = f"{slot_id}.pkl"
    return MODELS_DIR.joinpath(filename)
def _load_occupancy_series(slot_id: str) -> pd.DataFrame:
    """
    Pull occupancy_log rows for one slot and return a Prophet-ready DataFrame.

    The frame has columns [ds, y]:
    - y = 1 when the logged status is "empty" (vacant).
    - y = 0 for any other status (e.g. occupied).

    Prophet predicts this vacancy signal, so higher values mean more likely
    to be free, which is what the recommender needs.

    Timestamps are parsed as UTC and then made tz-naive, because Prophet
    expects tz-naive ``ds`` values. Rows are returned in ``logged_at`` order.
    A slot with no rows yields an empty frame that still has the [ds, y]
    columns.

    The database connection is closed even if the query raises.
    """
    conn = get_connection()
    try:
        rows = conn.execute(
            """
            SELECT logged_at, status
            FROM occupancy_log
            WHERE slot_id = ?
            ORDER BY logged_at
            """,
            (slot_id,),
        ).fetchall()
    finally:
        # Previously a failing query skipped close() and leaked the connection.
        conn.close()

    if not rows:
        return pd.DataFrame(columns=["ds", "y"])

    records = [
        {
            # Each timestamp is parsed on its own (not vectorised), so rows
            # whose logged_at strings differ in format are handled
            # independently. Normalise to UTC, then drop the tz info.
            "ds": pd.to_datetime(row["logged_at"], utc=True).tz_localize(None),
            "y": 1 if row["status"] == "empty" else 0,
        }
        for row in rows
    ]
    return pd.DataFrame(records)
def _get_all_slot_ids() -> list[str]:
    """
    Return every distinct slot_id present in occupancy_log, in ascending order.

    The database connection is closed even if the query raises.
    """
    conn = get_connection()
    try:
        rows = conn.execute(
            "SELECT DISTINCT slot_id FROM occupancy_log ORDER BY slot_id"
        ).fetchall()
    finally:
        # Previously a failing query skipped close() and leaked the connection.
        conn.close()
    return [r["slot_id"] for r in rows]
| # --------------------------------------------------------------------------- | |
| # Training | |
| # --------------------------------------------------------------------------- | |
def train_all(min_rows: int = 10) -> None:
    """
    Fit one Prophet model per slot and save it to models/prophet_models/.

    Parameters
    ----------
    min_rows : int
        Slots with fewer than this many observations are skipped, because
        Prophet cannot fit a meaningful trend on very short series. Values
        below 2 are raised to 2, since Prophet refuses to fit fewer than two
        rows and would otherwise abort the whole training run.

    Each fitted model is pickled to ``<MODELS_DIR>/<slot_id>.pkl``,
    overwriting any earlier file for that slot. Progress and a summary are
    printed to stdout.
    """
    import logging

    # Prophet.fit raises on a series with fewer than 2 rows. Clamping makes
    # such slots count as skipped instead of crashing the loop.
    min_rows = max(min_rows, 2)

    slot_ids = _get_all_slot_ids()
    if not slot_ids:
        print("No data in occupancy_log. Run the detector first.")
        return

    # Suppress Prophet's verbose Stan output. Logger levels are process-global,
    # so setting them once before the loop is enough (the original did this
    # on every iteration).
    logging.getLogger("prophet").setLevel(logging.WARNING)
    logging.getLogger("cmdstanpy").setLevel(logging.WARNING)

    total = len(slot_ids)
    print(f"Training Prophet models for {total} slots ...")
    skipped = 0
    trained = 0
    for i, slot_id in enumerate(slot_ids, 1):
        df = _load_occupancy_series(slot_id)
        if len(df) < min_rows:
            skipped += 1
            continue
        # Prophet configuration:
        # - daily_seasonality captures morning/evening patterns
        # - weekly_seasonality captures weekday vs weekend
        # - yearly_seasonality off — PKLot data spans only a few months
        # - interval_width sets the uncertainty interval (not used for point
        #   estimates, but kept at default 0.80)
        model = Prophet(
            daily_seasonality=True,
            weekly_seasonality=True,
            yearly_seasonality=False,
            changepoint_prior_scale=0.05,  # conservative — avoids overfitting
            interval_width=0.80,
        )
        model.fit(df)
        with open(_model_path(slot_id), "wb") as f:
            pickle.dump(model, f)
        trained += 1
        print(f" [{i}/{total}] {slot_id} — {len(df)} observations OK")
    print(f"\nDone. Trained: {trained} | Skipped (too few rows): {skipped}")
    print(f"Models saved to: {MODELS_DIR}")
| # --------------------------------------------------------------------------- | |
| # Predictor class (used by the API at runtime) | |
| # --------------------------------------------------------------------------- | |
class Predictor:
    """
    Runtime forecaster over the per-slot Prophet models written by train_all.

    Every ``*.pkl`` file in MODELS_DIR is unpickled once, at construction
    time. The models are kept in ``self.models``, keyed by slot id (the
    file stem). Call ``.predict(horizon_minutes)`` to forecast all slots.
    """

    def __init__(self):
        model_files = sorted(MODELS_DIR.glob("*.pkl"))
        if not model_files:
            raise RuntimeError(
                f"No Prophet models found in {MODELS_DIR}. "
                "Run: python -m src.predictor train"
            )
        # slot_id -> fitted Prophet model, inserted in sorted file-name order.
        self.models: dict[str, Prophet] = {}
        for model_file in model_files:
            # NOTE: pickle.load can execute arbitrary code. Only files produced
            # by train_all should ever be placed in MODELS_DIR.
            with open(model_file, "rb") as handle:
                loaded = pickle.load(handle)
            self.models[model_file.stem] = loaded
        print(f"[Predictor] {len(self.models)} Prophet models loaded.")

    def predict(self, horizon_minutes: int = 30) -> list[dict]:
        """
        Forecast the vacancy signal for every slot at now + horizon_minutes.

        Returns
        -------
        list of dicts, sorted by slot_id:
            [{"slot_id": "slot_001", "vacancy_prob": 0.72}, ...]

        ``vacancy_prob`` is Prophet's point forecast ``yhat`` of the 0/1
        vacancy signal, clipped to [0, 1]. Higher means more likely free.
        It is a regression output, not a calibrated probability.

        Side effect: before returning, each forecast (tagged with
        ``horizon_minutes``) is persisted via ``save_predictions``.
        """
        # Training timestamps are UTC with the tz stripped, so the target is
        # built the same way: a UTC wall-clock time with tzinfo removed.
        target = (
            datetime.now(timezone.utc) + timedelta(minutes=horizon_minutes)
        ).replace(tzinfo=None)

        forecasts = []
        for slot_id, model in self.models.items():
            frame = model.predict(pd.DataFrame({"ds": [target]}))
            # Trend and seasonality can push yhat outside the trained 0–1
            # range, so clamp it.
            raw = float(frame["yhat"].iloc[0])
            forecasts.append(
                {"slot_id": slot_id, "vacancy_prob": float(np.clip(raw, 0.0, 1.0))}
            )

        # Cache in the DB so the /predict endpoint can also serve stored results.
        save_predictions(
            [dict(entry, horizon_minutes=horizon_minutes) for entry in forecasts]
        )
        return sorted(forecasts, key=lambda entry: entry["slot_id"])
| # --------------------------------------------------------------------------- | |
| # CLI entry point | |
| # --------------------------------------------------------------------------- | |
if __name__ == "__main__":
    # CLI: the only supported sub-command is "train". The slice is [] when no
    # argument was given, so this also rejects a bare invocation.
    if sys.argv[1:2] != ["train"]:
        print("Usage: python -m src.predictor train")
        sys.exit(1)
    train_all()