| """ |
| Learning Agent service for Phase 8. |
| Implements Multi-Armed Bandit (Thompson Sampling) for fairness weight tuning |
| and per-driver XGBoost models for personalized effort prediction. |
| """ |
|
|
| import hashlib |
| import itertools |
| import pickle |
| import uuid |
| from datetime import datetime, date, timedelta |
| from typing import Dict, List, Optional, Tuple, Any |
|
|
| import numpy as np |
|
|
| from sqlalchemy import select, func, and_ |
| from sqlalchemy.ext.asyncio import AsyncSession |
|
|
| from app.models.learning_episode import LearningEpisode |
| from app.models.driver_effort_model import DriverEffortModel |
| from app.models.driver import DriverStatsDaily, DriverFeedback |
| from app.models.assignment import Assignment |
| from app.models.allocation_run import AllocationRun |
| from app.models.fairness_config import FairnessConfig |
|
|
|
|
def hash_config(config: dict) -> str:
    """Return the SHA-256 hex digest that identifies a FairnessConfig dict.

    The digest is computed over ``str()`` of the key-sorted item list, so it
    does not depend on insertion order, but it does depend on how each value
    prints (for example ``25`` and ``25.0`` produce different hashes).
    """
    canonical = str(sorted(config.items()))
    digest = hashlib.sha256(canonical.encode())
    # A SHA-256 hex digest is already 64 characters long; the slice is kept
    # only to mirror the stored-hash contract and never truncates anything.
    return digest.hexdigest()[:64]
|
|
|
|
class FairnessBandit:
    """
    Multi-Armed Bandit using Thompson Sampling for fairness weight tuning.

    Each arm is one discretized FairnessConfig dict. Every arm keeps a
    Beta(alpha, beta) posterior over its reward. Rewards are fractional
    values in [0, 1]: an episode adds ``reward`` to alpha and ``1 - reward``
    to beta.
    """

    # Candidate values for the four tuned parameters. The arm space is their
    # Cartesian product (3 * 3 * 3 * 3 = 81 arms).
    GINI_OPTIONS = [0.28, 0.33, 0.38]
    STDDEV_OPTIONS = [20.0, 25.0, 30.0]
    RECOVERY_OPTIONS = [0.6, 0.7, 0.8]
    EV_PENALTY_OPTIONS = [0.2, 0.3, 0.4]

    def __init__(self, db: AsyncSession):
        """Initialize the bandit with a database session and uniform priors."""
        self.db = db
        self.arms = self._generate_arm_space()
        self.arm_to_idx = {hash_config(arm): idx for idx, arm in enumerate(self.arms)}
        self.num_arms = len(self.arms)
        self._reset_posteriors()

    def _reset_posteriors(self) -> None:
        """Reset every arm to the uniform Beta(1, 1) prior with zero samples."""
        self.alphas = np.ones(self.num_arms)
        self.betas = np.ones(self.num_arms)
        self.samples = np.zeros(self.num_arms, dtype=int)

    def _generate_arm_space(self) -> List[dict]:
        """Generate all possible arm configurations (discretized FairnessConfigs)."""
        # Parameters that are identical for every arm (not tuned by the bandit).
        fixed = {
            "max_gap_threshold": 25.0,
            "workload_weight_packages": 1.0,
            "workload_weight_weight_kg": 0.5,
            "workload_weight_difficulty": 10.0,
            "workload_weight_time": 0.2,
            "recovery_mode_enabled": True,
            "complexity_debt_hard_threshold": 2.0,
            "recovery_penalty_weight": 3.0,
            "ev_safety_margin_pct": 10.0,
        }
        return [
            {
                "gini_threshold": gini,
                "stddev_threshold": stddev,
                "recovery_lightening_factor": recovery,
                "ev_charging_penalty_weight": ev_penalty,
                **fixed,
            }
            for gini, stddev, recovery, ev_penalty in itertools.product(
                self.GINI_OPTIONS,
                self.STDDEV_OPTIONS,
                self.RECOVERY_OPTIONS,
                self.EV_PENALTY_OPTIONS,
            )
        ]

    async def load_priors(self) -> None:
        """
        Rebuild alpha/beta posteriors from the last 30 days of rewarded episodes.

        The posteriors are reset to Beta(1, 1) before the history is replayed,
        so calling this method repeatedly on the same instance is idempotent.
        Without the reset every call would add the same history again and
        inflate alpha, beta and the sample counts. In-memory changes made by
        ``update`` are therefore replaced by whatever the database holds.
        """
        cutoff = datetime.utcnow() - timedelta(days=30)
        result = await self.db.execute(
            select(LearningEpisode)
            .where(LearningEpisode.created_at >= cutoff)
            .where(LearningEpisode.episode_reward.isnot(None))
        )
        episodes = result.scalars().all()

        # Reset only after the query succeeded so a failed load keeps old state.
        self._reset_posteriors()

        for episode in episodes:
            arm_idx = self.arm_to_idx.get(episode.config_hash)
            if arm_idx is None:
                # Episode used a config outside the discretized arm space.
                continue

            # Clamp like update() does: a stored reward outside [0, 1] would
            # push alpha or beta towards non-positive values, which
            # np.random.beta rejects.
            reward = max(0.0, min(1.0, float(episode.episode_reward)))
            self.alphas[arm_idx] += reward
            self.betas[arm_idx] += (1 - reward)
            self.samples[arm_idx] += 1

    async def select_arm(self, experimental: bool = False) -> Tuple[dict, int, float, float]:
        """
        Select an arm using Thompson Sampling.

        Posteriors are reloaded from the database before sampling.

        Args:
            experimental: If True, may select a less-explored arm for A/B testing.

        Returns:
            Tuple of (config dict, arm_idx, alpha, beta)
        """
        await self.load_priors()

        if experimental:
            # Bonus shrinks with an arm's own sample count and grows with the
            # log of the total; normalised so its maximum is just below 1.
            exploration_bonus = np.log(np.sum(self.samples) + 1) / (self.samples + 1)
            exploration_bonus = exploration_bonus / np.max(exploration_bonus + 0.001)
        else:
            exploration_bonus = np.zeros(self.num_arms)

        # One Beta draw per arm, plus at most 0.1 of exploration bonus.
        scores = np.random.beta(self.alphas, self.betas) + exploration_bonus * 0.1

        best_arm_idx = int(np.argmax(scores))
        return (
            self.arms[best_arm_idx],
            best_arm_idx,
            float(self.alphas[best_arm_idx]),
            float(self.betas[best_arm_idx]),
        )

    async def update(self, config_hash: str, reward: float) -> bool:
        """
        Update the in-memory posterior of one arm with a new episode reward.

        Args:
            config_hash: Hash of the config used
            reward: Normalized reward; clamped to [0, 1] before use

        Returns:
            True if update successful, False if config not found
        """
        arm_idx = self.arm_to_idx.get(config_hash)
        if arm_idx is None:
            return False

        reward = max(0.0, min(1.0, reward))

        self.alphas[arm_idx] += reward
        self.betas[arm_idx] += (1 - reward)
        self.samples[arm_idx] += 1

        return True

    def get_arm_statistics(self) -> List[dict]:
        """Get statistics for all arms, sorted by posterior mean (best first)."""
        stats = []
        for arm_idx, arm_config in enumerate(self.arms):
            alpha = float(self.alphas[arm_idx])
            beta = float(self.betas[arm_idx])
            stats.append({
                "arm_idx": arm_idx,
                "config_hash": hash_config(arm_config),
                "config": arm_config,
                "alpha": alpha,
                "beta": beta,
                "samples": int(self.samples[arm_idx]),
                # Mean of Beta(alpha, beta).
                "mean_reward": alpha / (alpha + beta),
            })
        return sorted(stats, key=lambda x: x["mean_reward"], reverse=True)

    def get_top_configs(self, n: int = 5) -> List[dict]:
        """Get top N performing configurations."""
        return self.get_arm_statistics()[:n]
|
|
|
|
class RewardComputer:
    """
    Computes episode rewards from driver feedback.

    Reward formula (every component is normalized to roughly [0, 1]):
        reward = 0.4 * (avg_fairness_rating - 1) / 4
               + 0.3 * (1 - avg_stress_level / 10)
               + 0.2 * completion_rate
               + 0.1 * retention_score

    ``completion_rate`` is the mean of ``would_take_similar_route_again`` and
    ``retention_score`` is ``1 - avg_tiredness_level / 5``. A component with
    no usable feedback falls back to the neutral value 0.5.
    """

    FAIRNESS_WEIGHT = 0.4
    STRESS_WEIGHT = 0.3
    COMPLETION_WEIGHT = 0.2
    RETENTION_WEIGHT = 0.1

    def __init__(self, db: AsyncSession):
        """Store the database session used for all queries."""
        self.db = db

    async def compute_episode_reward(
        self,
        allocation_run_id: uuid.UUID,
    ) -> Tuple[float, dict]:
        """
        Compute reward for an allocation run based on driver feedback.

        Args:
            allocation_run_id: ID of the allocation run

        Returns:
            Tuple of (reward, feedback_stats). The reward is a plain float in
            [0, 1]; it is 0.5 when the run has no assignments or no feedback.
        """
        # Only the ids are needed, so avoid loading full Assignment rows.
        assignments_result = await self.db.execute(
            select(Assignment.id)
            .where(Assignment.allocation_run_id == allocation_run_id)
        )
        assignment_ids = assignments_result.scalars().all()

        if not assignment_ids:
            return 0.5, {"error": "no_assignments"}

        feedback_result = await self.db.execute(
            select(DriverFeedback)
            .where(DriverFeedback.assignment_id.in_(assignment_ids))
        )
        feedbacks = feedback_result.scalars().all()

        if not feedbacks:
            # Neutral reward when nobody has answered yet.
            return 0.5, {
                "feedback_count": 0,
                "avg_fairness_rating": None,
                "avg_stress_level": None,
                "completion_rate": None,
            }

        # The (mean - 1) / 4 mapping presumes ratings start at 1, so a falsy
        # rating (None or 0) is treated as "not answered".
        # NOTE(review): confirm the rating scale is 1-5.
        fairness_ratings = [f.fairness_rating for f in feedbacks if f.fairness_rating]
        avg_fairness = (np.mean(fairness_ratings) - 1) / 4 if fairness_ratings else 0.5

        # Filter on None only: a stress level of 0 maps to the best possible
        # stress component and must not be discarded as if it were missing.
        stress_levels = [f.stress_level for f in feedbacks if f.stress_level is not None]
        avg_stress = np.mean(stress_levels) / 10 if stress_levels else 0.5
        stress_component = 1 - avg_stress

        would_take = [
            f.would_take_similar_route_again
            for f in feedbacks
            if f.would_take_similar_route_again is not None
        ]
        completion_rate = np.mean(would_take) if would_take else 0.5

        # Same reasoning as for stress: keep a reported tiredness of 0.
        tiredness_levels = [f.tiredness_level for f in feedbacks if f.tiredness_level is not None]
        avg_tiredness = np.mean(tiredness_levels) / 5 if tiredness_levels else 0.5
        retention_score = 1 - avg_tiredness

        reward = (
            self.FAIRNESS_WEIGHT * avg_fairness +
            self.STRESS_WEIGHT * stress_component +
            self.COMPLETION_WEIGHT * completion_rate +
            self.RETENTION_WEIGHT * retention_score
        )

        # Clamp and convert to a builtin float so callers never receive a
        # numpy scalar.
        reward = float(max(0.0, min(1.0, reward)))

        feedback_stats = {
            "feedback_count": len(feedbacks),
            "avg_fairness_rating": float(np.mean(fairness_ratings)) if fairness_ratings else None,
            "avg_stress_level": float(np.mean(stress_levels)) if stress_levels else None,
            "completion_rate": float(completion_rate),
            "retention_score": float(retention_score),
        }

        return reward, feedback_stats
|
|
|
|
class DriverEffortLearner:
    """
    Per-driver XGBoost models for personalized effort prediction.

    Each driver gets their own model trained on their historical
    assignment data. Models are retrained periodically (daily cron).
    """

    MIN_TRAINING_SAMPLES = 10
    MAX_TRAINING_SAMPLES = 100
    # Column order used for both training and prediction frames.
    FEATURE_NAMES = [
        "num_packages",
        "total_weight_kg",
        "num_stops",
        "route_difficulty_score",
        "estimated_time_minutes",
        "experience_days",
        "recent_avg_workload",
        "recent_hard_days",
    ]
    # Number of most recent MSE values kept in ``mse_history``.
    _MSE_HISTORY_LENGTH = 10

    def __init__(self, db: AsyncSession):
        """Store the database session and probe once for XGBoost."""
        self.db = db
        self._xgb_available = self._check_xgboost()

    def _check_xgboost(self) -> bool:
        """Check if XGBoost is available."""
        try:
            import xgboost  # noqa: F401  (imported only to test availability)
        except ImportError:
            return False
        return True

    async def load_model(self, driver_id: uuid.UUID) -> Optional[Any]:
        """
        Load a driver's active effort model from the database.

        Returns:
            The unpickled model, or None when there is no active record, the
            record has no pickle, or unpickling fails.
        """
        result = await self.db.execute(
            select(DriverEffortModel)
            .where(DriverEffortModel.driver_id == driver_id)
            .where(DriverEffortModel.active == True)
        )
        model_record = result.scalar_one_or_none()

        if not model_record or not model_record.model_pickle:
            return None

        # SECURITY NOTE: pickle.loads executes arbitrary code embedded in the
        # payload. This is only safe while the model table is written
        # exclusively by trusted code (update_model below).
        try:
            return pickle.loads(model_record.model_pickle)
        except Exception:
            # Best effort: an unreadable model is treated as "no model".
            return None

    async def get_model_version(self, driver_id: uuid.UUID) -> Optional[int]:
        """Get current (active) model version for a driver, or None."""
        result = await self.db.execute(
            select(DriverEffortModel.model_version)
            .where(DriverEffortModel.driver_id == driver_id)
            .where(DriverEffortModel.active == True)
        )
        return result.scalar_one_or_none()

    async def predict_effort(
        self,
        driver_id: uuid.UUID,
        route_features: dict,
    ) -> Tuple[Optional[float], Optional[int]]:
        """
        Predict effort for a driver-route pair.

        Args:
            driver_id: Driver's UUID
            route_features: Dict with route features; any name from
                FEATURE_NAMES that is missing defaults to 0.0

        Returns:
            Tuple of (predicted_effort, model_version). (None, None) when
            XGBoost or the model is unavailable; (None, version) when the
            prediction itself fails.
        """
        if not self._xgb_available:
            return None, None

        model = await self.load_model(driver_id)
        if model is None:
            return None, None

        version = await self.get_model_version(driver_id)

        try:
            import pandas as pd

            features = {name: route_features.get(name, 0.0) for name in self.FEATURE_NAMES}
            X = pd.DataFrame([features])

            prediction = float(model.predict(X)[0])
            return prediction, version
        except Exception:
            # Best effort: callers fall back to a non-personalized estimate.
            return None, version

    async def update_model(self, driver_id: uuid.UUID) -> dict:
        """
        Retrain XGBoost model for a driver using their history.

        Uses up to MAX_TRAINING_SAMPLES of the driver's most recent daily
        stats that have an ``actual_effort`` and a matching assignment route.
        The reported MSE and R2 are computed on the training data itself.

        Returns:
            Dict with training status and metrics
        """
        if not self._xgb_available:
            return {"status": "skipped", "reason": "xgboost_not_available"}

        import xgboost as xgb
        import pandas as pd
        from sklearn.metrics import mean_squared_error, r2_score

        result = await self.db.execute(
            select(DriverStatsDaily)
            .where(DriverStatsDaily.driver_id == driver_id)
            .where(DriverStatsDaily.actual_effort.isnot(None))
            .order_by(DriverStatsDaily.date.desc())
            .limit(self.MAX_TRAINING_SAMPLES)
        )
        stats = result.scalars().all()

        if len(stats) < self.MIN_TRAINING_SAMPLES:
            return {
                "status": "skipped",
                "reason": "insufficient_data",
                "samples": len(stats),
                "required": self.MIN_TRAINING_SAMPLES,
            }

        X_data = []
        y_data = []

        # NOTE(review): this issues one Assignment query per stat row (at most
        # MAX_TRAINING_SAMPLES), and reads the ``route`` / ``driver``
        # relationships. Under AsyncSession those must be eagerly loaded by
        # the model configuration - confirm.
        for stat in stats:
            assignment_result = await self.db.execute(
                select(Assignment)
                .where(Assignment.driver_id == driver_id)
                .where(Assignment.date == stat.date)
                .limit(1)
            )
            assignment = assignment_result.scalar_one_or_none()

            if not assignment or not assignment.route:
                continue

            route = assignment.route
            driver = stat.driver
            # Guard against a missing driver or creation timestamp.
            if driver and driver.created_at:
                experience_days = (stat.date - driver.created_at.date()).days
            else:
                experience_days = 0

            X_data.append({
                "num_packages": route.num_packages,
                "total_weight_kg": route.total_weight_kg,
                "num_stops": route.num_stops,
                "route_difficulty_score": route.route_difficulty_score,
                "estimated_time_minutes": route.estimated_time_minutes,
                "experience_days": experience_days,
                "recent_avg_workload": stat.avg_workload_score,
                "recent_hard_days": 1 if stat.is_hard_day else 0,
            })
            y_data.append(stat.actual_effort)

        if len(X_data) < self.MIN_TRAINING_SAMPLES:
            return {
                "status": "skipped",
                "reason": "insufficient_route_data",
                "samples": len(X_data),
            }

        X = pd.DataFrame(X_data)
        y = np.array(y_data)

        model = xgb.XGBRegressor(
            n_estimators=50,
            max_depth=4,
            learning_rate=0.1,
            random_state=42,
        )
        model.fit(X, y)

        # In-sample metrics: the model is scored on the rows it was fit on.
        y_pred = model.predict(X)
        mse = float(mean_squared_error(y, y_pred))
        r2 = float(r2_score(y, y_pred))

        model_pickle = pickle.dumps(model)

        existing_result = await self.db.execute(
            select(DriverEffortModel)
            .where(DriverEffortModel.driver_id == driver_id)
        )
        existing = existing_result.scalar_one_or_none()

        if existing:
            existing.model_version = (existing.model_version or 0) + 1
            existing.model_pickle = model_pickle
            existing.training_samples = len(X_data)
            existing.feature_names = {"names": self.FEATURE_NAMES}
            existing.current_mse = mse
            existing.r2_score = r2
            existing.last_trained_at = datetime.utcnow()

            previous = existing.mse_history or []
            if isinstance(previous, dict):
                previous = previous.get("values") or []
            # Build a NEW list instead of appending to the loaded one. An
            # in-place append also mutates the previously loaded JSON value,
            # so the old and new values can compare equal and the ORM's change
            # detection may then skip writing the column.
            history = [*previous, mse][-self._MSE_HISTORY_LENGTH:]
            existing.mse_history = {"values": history}

            version = existing.model_version
        else:
            new_model = DriverEffortModel(
                driver_id=driver_id,
                model_version=1,
                model_pickle=model_pickle,
                training_samples=len(X_data),
                feature_names={"names": self.FEATURE_NAMES},
                mse_history={"values": [mse]},
                current_mse=mse,
                r2_score=r2,
                active=True,
                last_trained_at=datetime.utcnow(),
            )
            self.db.add(new_model)
            version = 1

        return {
            "status": "success",
            "driver_id": str(driver_id),
            "model_version": version,
            "training_samples": len(X_data),
            "mse": mse,
            "r2_score": r2,
        }

    async def get_model_status(self, driver_id: uuid.UUID) -> Optional[dict]:
        """Get status of a driver's effort model, or None if none exists."""
        result = await self.db.execute(
            select(DriverEffortModel)
            .where(DriverEffortModel.driver_id == driver_id)
        )
        model = result.scalar_one_or_none()

        if not model:
            return None

        return {
            "driver_id": str(driver_id),
            "model_version": model.model_version,
            "training_samples": model.training_samples,
            "current_mse": model.current_mse,
            "r2_score": model.r2_score,
            "mse_history": model.mse_history,
            "active": model.active,
            "last_trained_at": model.last_trained_at.isoformat() if model.last_trained_at else None,
        }
|
|
|
|
class LearningAgent:
    """
    Main Learning Agent that orchestrates bandit and per-driver models.
    """

    def __init__(self, db: AsyncSession):
        """Create the bandit, reward computer and effort learner on one session."""
        self.db = db
        self.bandit = FairnessBandit(db)
        self.reward_computer = RewardComputer(db)
        self.effort_learner = DriverEffortLearner(db)

    async def create_episode(
        self,
        allocation_run_id: uuid.UUID,
        fairness_config: dict,
        num_drivers: int,
        num_routes: int,
        is_experimental: bool = False,
    ) -> LearningEpisode:
        """
        Create a learning episode for an allocation run.

        The episode records the arm's posterior (alpha, beta, sample count)
        at creation time. A config that is not part of the bandit's arm space
        is stored with ``arm_idx=-1`` and the uniform prior (1.0, 1.0, 0).
        The episode is added to the session but not committed here.
        """
        config_hash = hash_config(fairness_config)
        arm_idx = self.bandit.arm_to_idx.get(config_hash, -1)

        # Refresh posteriors from the database before snapshotting them.
        await self.bandit.load_priors()
        if arm_idx >= 0:
            alpha = float(self.bandit.alphas[arm_idx])
            beta = float(self.bandit.betas[arm_idx])
            samples = int(self.bandit.samples[arm_idx])
        else:
            alpha, beta, samples = 1.0, 1.0, 0

        episode = LearningEpisode(
            allocation_run_id=allocation_run_id,
            config_hash=config_hash,
            fairness_config=fairness_config,
            arm_idx=arm_idx,
            num_drivers=num_drivers,
            num_routes=num_routes,
            alpha_prior=alpha,
            beta_prior=beta,
            samples_count=samples,
            is_experimental=is_experimental,
        )
        self.db.add(episode)

        return episode

    async def process_episode_reward(self, episode_id: uuid.UUID) -> dict:
        """
        Compute and store the reward for an episode, then update the bandit.

        Returns:
            Dict with ``status`` of "success", "skipped" (reward already
            computed) or "error" (episode not found). On success it also
            carries the reward, the feedback stats and ``bandit_updated``,
            which is False when the episode's config is not a bandit arm.
        """
        result = await self.db.execute(
            select(LearningEpisode)
            .where(LearningEpisode.id == episode_id)
        )
        episode = result.scalar_one_or_none()

        if not episode:
            return {"status": "error", "reason": "episode_not_found"}

        if episode.episode_reward is not None:
            return {"status": "skipped", "reason": "already_computed"}

        reward, stats = await self.reward_computer.compute_episode_reward(
            episode.allocation_run_id
        )
        # Persist and return a builtin float even if a numpy scalar came back.
        reward = float(reward)

        # NOTE(review): when no feedback exists yet the reward computer
        # returns a neutral 0.5, which is stored here and makes later calls
        # return "already_computed" - confirm this is intended.
        episode.episode_reward = reward
        episode.reward_computed_at = datetime.utcnow()
        episode.avg_fairness_rating = stats.get("avg_fairness_rating")
        episode.avg_stress_level = stats.get("avg_stress_level")
        episode.completion_rate = stats.get("completion_rate")
        episode.feedback_count = stats.get("feedback_count", 0)

        # Surface the outcome instead of silently dropping it: update()
        # returns False when the config hash is outside the arm space.
        bandit_updated = await self.bandit.update(episode.config_hash, reward)

        return {
            "status": "success",
            "episode_id": str(episode_id),
            "reward": reward,
            "feedback_stats": stats,
            "bandit_updated": bandit_updated,
        }

    async def select_config(self, experimental: bool = False) -> dict:
        """Select a FairnessConfig using the bandit."""
        config, arm_idx, alpha, beta = await self.bandit.select_arm(experimental)
        return {
            "config": config,
            "arm_idx": arm_idx,
            "alpha": alpha,
            "beta": beta,
            "config_hash": hash_config(config),
        }

    async def get_learning_status(self) -> dict:
        """Get overall learning status for admin API."""
        await self.bandit.load_priors()

        top_configs = self.bandit.get_top_configs(5)

        # Number of active per-driver models.
        result = await self.db.execute(
            select(func.count(DriverEffortModel.driver_id))
            .where(DriverEffortModel.active == True)
        )
        active_models = result.scalar() or 0

        # Average MSE over active models that have one; 0.0 when there are none.
        result = await self.db.execute(
            select(func.avg(DriverEffortModel.current_mse))
            .where(DriverEffortModel.active == True)
            .where(DriverEffortModel.current_mse.isnot(None))
        )
        avg_mse = result.scalar() or 0.0

        # Episodes created in the last 7 days.
        cutoff = datetime.utcnow() - timedelta(days=7)
        result = await self.db.execute(
            select(func.count(LearningEpisode.id))
            .where(LearningEpisode.created_at >= cutoff)
        )
        recent_episodes = result.scalar() or 0

        # Currently active FairnessConfig, reduced to the four tuned fields.
        result = await self.db.execute(
            select(FairnessConfig)
            .where(FairnessConfig.is_active == True)
            .limit(1)
        )
        current_config_model = result.scalar_one_or_none()

        current_config = None
        if current_config_model:
            current_config = {
                "gini_threshold": current_config_model.gini_threshold,
                "stddev_threshold": current_config_model.stddev_threshold,
                "recovery_lightening_factor": current_config_model.recovery_lightening_factor,
                "ev_charging_penalty_weight": current_config_model.ev_charging_penalty_weight,
            }

        return {
            "current_config": current_config,
            "top_performing_configs": top_configs,
            "driver_models_active": active_models,
            "avg_prediction_mse": float(avg_mse),
            "recent_episodes_7d": recent_episodes,
            "total_arms": self.bandit.num_arms,
            "bandit_statistics": {
                "total_samples": int(np.sum(self.bandit.samples)),
                "explored_arms": int(np.sum(self.bandit.samples > 0)),
            }
        }
|
|