Spaces:
Sleeping
Sleeping
| """Intelligent retraining trigger with a dual-gate decision. | |
| Retraining fires only when ALL conditions hold: | |
| 1. Feature drift detected (PSI >= threshold or KS p-value < threshold) | |
| 2. Performance degradation detected (RMSE increased by >= N%) | |
| 3. Minimum samples collected since last retrain | |
| 4. Cooldown period has elapsed | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import time | |
| from pathlib import Path | |
| from typing import Optional | |
| from src.utils.config import settings, resolve | |
| from src.utils.logging_config import get_logger | |
| log = get_logger(__name__) | |
class RetrainingTrigger:
    """Evaluate whether all retraining conditions are met.

    A retrain is recommended only when feature drift AND performance drift
    are both reported, enough new samples have been collected, and the
    cooldown since the last recorded retrain has elapsed. Every decision is
    appended as one JSON line to the configured retrain log.

    The time of the last retrain is held in memory only: it starts as
    ``None`` and is set solely by :meth:`record_retrain_completed`.
    """

    def __init__(self) -> None:
        """Read the log path and cooldown length from ``settings.retraining``."""
        # None means "no retrain recorded yet", so no cooldown applies.
        self._last_retrain_time: Optional[float] = None
        self._retrain_log_path = resolve(settings.retraining.retrain_log_path)
        self._cooldown_seconds = settings.retraining.cooldown_hours * 3600

    def should_retrain(
        self,
        feature_drift_report: dict,
        performance_report: dict,
        samples_since_last_retrain: int,
    ) -> dict:
        """Return a decision dict with should_retrain, reasons, and blocking_reasons.

        Args:
            feature_drift_report: Mapping read for ``"drift_detected"`` and,
                when drift is flagged, ``"drifted_features"``.
            performance_report: Mapping read for ``"drift_detected"`` and,
                when drift is flagged, ``"pct_change"``.
            samples_since_last_retrain: Number of new samples, compared with
                ``settings.retraining.min_samples_since_last_retrain``.

        Returns:
            A dict with the keys ``should_retrain``, ``reasons``,
            ``blocking_reasons``, ``feature_drift``, ``performance_drift``,
            ``samples_since_last_retrain`` and ``timestamp`` (UTC).

        Raises:
            OSError: If the decision cannot be appended to the retrain log.
        """
        reasons: list[str] = []
        blocking: list[str] = []

        # Coerce to plain bools: a truthy non-bool flag (e.g. a numpy boolean)
        # would otherwise leak into the decision dict and break json.dumps.
        feature_drifted = bool(feature_drift_report.get("drift_detected", False))
        perf_drifted = bool(performance_report.get("drift_detected", False))

        if feature_drifted:
            drifted_features = feature_drift_report.get("drifted_features", [])
            reasons.append(f"Feature drift detected in: {drifted_features}")
        else:
            blocking.append("No significant feature drift.")

        if perf_drifted:
            # A missing or explicit-None pct_change is reported as 0.0 rather
            # than crashing the float format below.
            pct = performance_report.get("pct_change") or 0.0
            reasons.append(f"Performance degraded by {pct:.1f}% vs baseline.")
        else:
            blocking.append("Performance within acceptable range.")

        min_samples = settings.retraining.min_samples_since_last_retrain
        if samples_since_last_retrain < min_samples:
            blocking.append(
                f"Only {samples_since_last_retrain} new samples (need {min_samples})."
            )

        # Single clock snapshot: the cooldown message, the final verdict and
        # the timestamp must all describe the same instant.
        now = time.time()
        if self._last_retrain_time is not None:
            elapsed = now - self._last_retrain_time
            if elapsed < self._cooldown_seconds:
                remaining = int(self._cooldown_seconds - elapsed)
                blocking.append(f"Cooldown active: {remaining}s remaining.")

        # Each failed condition above adds exactly one blocker, so an empty
        # list means every gate passed.
        should_retrain = not blocking

        decision = {
            "should_retrain": should_retrain,
            "reasons": reasons,
            "blocking_reasons": blocking,
            "feature_drift": feature_drifted,
            "performance_drift": perf_drifted,
            "samples_since_last_retrain": samples_since_last_retrain,
            "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(now)),
        }

        if should_retrain:
            log.warning("RETRAINING TRIGGERED! Reasons: %s", reasons)
        else:
            log.info("Retraining NOT triggered. Blocking: %s", blocking or ["none"])

        self._log_decision(decision)
        return decision

    def record_retrain_completed(self) -> None:
        """Reset cooldown after a successful retraining run."""
        # One snapshot so the stored time and the logged time agree.
        now = time.time()
        self._last_retrain_time = now
        log.info(
            "Retraining cooldown reset at %s",
            time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(now)),
        )

    def _log_decision(self, decision: dict) -> None:
        """Append ``decision`` as one JSON line to the retrain log.

        The log's parent directory is created if it does not exist, so the
        first decision on a fresh deployment does not fail.
        """
        log_path = Path(self._retrain_log_path)
        log_path.parent.mkdir(parents=True, exist_ok=True)
        with log_path.open("a", encoding="utf-8") as fh:
            fh.write(json.dumps(decision) + "\n")