""" Learning Agent service for Phase 8. Implements Multi-Armed Bandit (Thompson Sampling) for fairness weight tuning and per-driver XGBoost models for personalized effort prediction. """ import hashlib import itertools import pickle import uuid from datetime import datetime, date, timedelta from typing import Dict, List, Optional, Tuple, Any import numpy as np from sqlalchemy import select, func, and_ from sqlalchemy.ext.asyncio import AsyncSession from app.models.learning_episode import LearningEpisode from app.models.driver_effort_model import DriverEffortModel from app.models.driver import DriverStatsDaily, DriverFeedback from app.models.assignment import Assignment from app.models.allocation_run import AllocationRun from app.models.fairness_config import FairnessConfig def hash_config(config: dict) -> str: """Generate SHA256 hash of a FairnessConfig dict.""" # Sort keys for consistent hashing config_str = str(sorted(config.items())) return hashlib.sha256(config_str.encode()).hexdigest()[:64] class FairnessBandit: """ Multi-Armed Bandit using Thompson Sampling for fairness weight tuning. Each arm represents a different FairnessConfig. The bandit learns which configurations lead to higher driver satisfaction. """ # Define the arm space (discretized config values) GINI_OPTIONS = [0.28, 0.33, 0.38] STDDEV_OPTIONS = [20.0, 25.0, 30.0] RECOVERY_OPTIONS = [0.6, 0.7, 0.8] EV_PENALTY_OPTIONS = [0.2, 0.3, 0.4] def __init__(self, db: AsyncSession): """Initialize bandit with database session.""" self.db = db self.arms = self._generate_arm_space() self.arm_to_idx = {hash_config(arm): idx for idx, arm in enumerate(self.arms)} self.num_arms = len(self.arms) # Initialize priors (will be loaded from DB) self.alphas = np.ones(self.num_arms) self.betas = np.ones(self.num_arms) self.samples = np.zeros(self.num_arms, dtype=int) def _generate_arm_space(self) -> List[dict]: """Generate all possible arm configurations (discretized FairnessConfigs).""" arms = [] for gini, stddev, recovery, ev_penalty in itertools.product( self.GINI_OPTIONS, self.STDDEV_OPTIONS, self.RECOVERY_OPTIONS, self.EV_PENALTY_OPTIONS, ): arms.append({ "gini_threshold": gini, "stddev_threshold": stddev, "recovery_lightening_factor": recovery, "ev_charging_penalty_weight": ev_penalty, # Fixed defaults for other params "max_gap_threshold": 25.0, "workload_weight_packages": 1.0, "workload_weight_weight_kg": 0.5, "workload_weight_difficulty": 10.0, "workload_weight_time": 0.2, "recovery_mode_enabled": True, "complexity_debt_hard_threshold": 2.0, "recovery_penalty_weight": 3.0, "ev_safety_margin_pct": 10.0, }) return arms async def load_priors(self) -> None: """Load alpha/beta priors from database based on historical episodes.""" # Get recent episodes (last 30 days) cutoff = datetime.utcnow() - timedelta(days=30) result = await self.db.execute( select(LearningEpisode) .where(LearningEpisode.created_at >= cutoff) .where(LearningEpisode.episode_reward.isnot(None)) ) episodes = result.scalars().all() # Aggregate rewards per arm for episode in episodes: config_hash = episode.config_hash if config_hash in self.arm_to_idx: arm_idx = self.arm_to_idx[config_hash] reward = episode.episode_reward # Update priors: alpha += reward, beta += (1 - reward) self.alphas[arm_idx] += reward self.betas[arm_idx] += (1 - reward) self.samples[arm_idx] += 1 async def select_arm(self, experimental: bool = False) -> Tuple[dict, int, float, float]: """ Select an arm using Thompson Sampling. Args: experimental: If True, may select a less-explored arm for A/B testing. Returns: Tuple of (config dict, arm_idx, alpha, beta) """ await self.load_priors() if experimental: # For experimental cohort, boost exploration of under-sampled arms exploration_bonus = np.log(np.sum(self.samples) + 1) / (self.samples + 1) exploration_bonus = exploration_bonus / np.max(exploration_bonus + 0.001) else: exploration_bonus = np.zeros(self.num_arms) # Thompson Sampling: sample from Beta(alpha, beta) for each arm scores = [] for arm_idx in range(self.num_arms): theta = np.random.beta(self.alphas[arm_idx], self.betas[arm_idx]) theta += exploration_bonus[arm_idx] * 0.1 # Small exploration boost scores.append(theta) best_arm_idx = int(np.argmax(scores)) return ( self.arms[best_arm_idx], best_arm_idx, float(self.alphas[best_arm_idx]), float(self.betas[best_arm_idx]), ) async def update(self, config_hash: str, reward: float) -> bool: """ Update posteriors with new episode reward. Args: config_hash: Hash of the config used reward: Normalized reward in [0, 1] Returns: True if update successful, False if config not found """ if config_hash not in self.arm_to_idx: return False arm_idx = self.arm_to_idx[config_hash] # Clamp reward to [0, 1] reward = max(0.0, min(1.0, reward)) # Update priors self.alphas[arm_idx] += reward self.betas[arm_idx] += (1 - reward) self.samples[arm_idx] += 1 return True def get_arm_statistics(self) -> List[dict]: """Get statistics for all arms.""" stats = [] for arm_idx, arm_config in enumerate(self.arms): config_hash = hash_config(arm_config) mean = self.alphas[arm_idx] / (self.alphas[arm_idx] + self.betas[arm_idx]) stats.append({ "arm_idx": arm_idx, "config_hash": config_hash, "config": arm_config, "alpha": float(self.alphas[arm_idx]), "beta": float(self.betas[arm_idx]), "samples": int(self.samples[arm_idx]), "mean_reward": float(mean), }) return sorted(stats, key=lambda x: x["mean_reward"], reverse=True) def get_top_configs(self, n: int = 5) -> List[dict]: """Get top N performing configurations.""" stats = self.get_arm_statistics() return stats[:n] class RewardComputer: """ Computes episode rewards from driver feedback. Reward formula: reward = 0.4 * avg_fairness_rating + 0.3 * (1 - avg_stress_level/10) + 0.2 * completion_rate + 0.1 * retention_score """ FAIRNESS_WEIGHT = 0.4 STRESS_WEIGHT = 0.3 COMPLETION_WEIGHT = 0.2 RETENTION_WEIGHT = 0.1 def __init__(self, db: AsyncSession): self.db = db async def compute_episode_reward( self, allocation_run_id: uuid.UUID, ) -> Tuple[float, dict]: """ Compute reward for an allocation run based on driver feedback. Args: allocation_run_id: ID of the allocation run Returns: Tuple of (reward, feedback_stats) """ # Get all assignments for this run assignments_result = await self.db.execute( select(Assignment) .where(Assignment.allocation_run_id == allocation_run_id) ) assignments = assignments_result.scalars().all() if not assignments: return 0.5, {"error": "no_assignments"} assignment_ids = [a.id for a in assignments] # Get feedback for these assignments feedback_result = await self.db.execute( select(DriverFeedback) .where(DriverFeedback.assignment_id.in_(assignment_ids)) ) feedbacks = feedback_result.scalars().all() # Calculate metrics if not feedbacks: # No feedback yet, return neutral reward return 0.5, { "feedback_count": 0, "avg_fairness_rating": None, "avg_stress_level": None, "completion_rate": None, } # Average fairness rating (1-5 scale, normalize to 0-1) fairness_ratings = [f.fairness_rating for f in feedbacks if f.fairness_rating] avg_fairness = (np.mean(fairness_ratings) - 1) / 4 if fairness_ratings else 0.5 # Average stress level (1-10 scale, normalize and invert) stress_levels = [f.stress_level for f in feedbacks if f.stress_level] avg_stress = np.mean(stress_levels) / 10 if stress_levels else 0.5 stress_component = 1 - avg_stress # Completion rate (based on "would_take_similar_route_again") would_take = [f.would_take_similar_route_again for f in feedbacks if f.would_take_similar_route_again is not None] completion_rate = np.mean(would_take) if would_take else 0.5 # Retention score (based on tiredness, inverse relationship) tiredness_levels = [f.tiredness_level for f in feedbacks if f.tiredness_level] avg_tiredness = np.mean(tiredness_levels) / 5 if tiredness_levels else 0.5 retention_score = 1 - avg_tiredness # Compute final reward reward = ( self.FAIRNESS_WEIGHT * avg_fairness + self.STRESS_WEIGHT * stress_component + self.COMPLETION_WEIGHT * completion_rate + self.RETENTION_WEIGHT * retention_score ) # Clamp to [0, 1] reward = max(0.0, min(1.0, reward)) feedback_stats = { "feedback_count": len(feedbacks), "avg_fairness_rating": float(np.mean(fairness_ratings)) if fairness_ratings else None, "avg_stress_level": float(np.mean(stress_levels)) if stress_levels else None, "completion_rate": float(completion_rate), "retention_score": float(retention_score), } return reward, feedback_stats class DriverEffortLearner: """ Per-driver XGBoost models for personalized effort prediction. Each driver gets their own model trained on their historical assignment data. Models are retrained periodically (daily cron). """ MIN_TRAINING_SAMPLES = 10 MAX_TRAINING_SAMPLES = 100 FEATURE_NAMES = [ "num_packages", "total_weight_kg", "num_stops", "route_difficulty_score", "estimated_time_minutes", "experience_days", "recent_avg_workload", "recent_hard_days", ] def __init__(self, db: AsyncSession): self.db = db self._xgb_available = self._check_xgboost() def _check_xgboost(self) -> bool: """Check if XGBoost is available.""" try: import xgboost return True except ImportError: return False async def load_model(self, driver_id: uuid.UUID) -> Optional[Any]: """Load a driver's effort model from the database.""" result = await self.db.execute( select(DriverEffortModel) .where(DriverEffortModel.driver_id == driver_id) .where(DriverEffortModel.active == True) ) model_record = result.scalar_one_or_none() if not model_record or not model_record.model_pickle: return None try: model = pickle.loads(model_record.model_pickle) return model except Exception: return None async def get_model_version(self, driver_id: uuid.UUID) -> Optional[int]: """Get current model version for a driver.""" result = await self.db.execute( select(DriverEffortModel.model_version) .where(DriverEffortModel.driver_id == driver_id) .where(DriverEffortModel.active == True) ) version = result.scalar_one_or_none() return version async def predict_effort( self, driver_id: uuid.UUID, route_features: dict, ) -> Tuple[Optional[float], Optional[int]]: """ Predict effort for a driver-route pair. Args: driver_id: Driver's UUID route_features: Dict with route features Returns: Tuple of (predicted_effort, model_version) or (None, None) """ if not self._xgb_available: return None, None model = await self.load_model(driver_id) if model is None: return None, None version = await self.get_model_version(driver_id) try: import pandas as pd # Build feature vector features = {name: route_features.get(name, 0.0) for name in self.FEATURE_NAMES} X = pd.DataFrame([features]) prediction = float(model.predict(X)[0]) return prediction, version except Exception: return None, version async def update_model(self, driver_id: uuid.UUID) -> dict: """ Retrain XGBoost model for a driver using their history. Returns: Dict with training status and metrics """ if not self._xgb_available: return {"status": "skipped", "reason": "xgboost_not_available"} import xgboost as xgb import pandas as pd from sklearn.metrics import mean_squared_error, r2_score # Get driver's historical stats with actual effort result = await self.db.execute( select(DriverStatsDaily) .where(DriverStatsDaily.driver_id == driver_id) .where(DriverStatsDaily.actual_effort.isnot(None)) .order_by(DriverStatsDaily.date.desc()) .limit(self.MAX_TRAINING_SAMPLES) ) stats = result.scalars().all() if len(stats) < self.MIN_TRAINING_SAMPLES: return { "status": "skipped", "reason": "insufficient_data", "samples": len(stats), "required": self.MIN_TRAINING_SAMPLES, } # Build training data X_data = [] y_data = [] for stat in stats: # Get the assignment for this stat assignment_result = await self.db.execute( select(Assignment) .where(Assignment.driver_id == driver_id) .where(Assignment.date == stat.date) .limit(1) ) assignment = assignment_result.scalar_one_or_none() if assignment and assignment.route: route = assignment.route features = { "num_packages": route.num_packages, "total_weight_kg": route.total_weight_kg, "num_stops": route.num_stops, "route_difficulty_score": route.route_difficulty_score, "estimated_time_minutes": route.estimated_time_minutes, "experience_days": (stat.date - stat.driver.created_at.date()).days if stat.driver else 0, "recent_avg_workload": stat.avg_workload_score, "recent_hard_days": 1 if stat.is_hard_day else 0, } X_data.append(features) y_data.append(stat.actual_effort) if len(X_data) < self.MIN_TRAINING_SAMPLES: return { "status": "skipped", "reason": "insufficient_route_data", "samples": len(X_data), } # Convert to DataFrame X = pd.DataFrame(X_data) y = np.array(y_data) # Train XGBoost model model = xgb.XGBRegressor( n_estimators=50, max_depth=4, learning_rate=0.1, random_state=42, ) model.fit(X, y) # Compute metrics y_pred = model.predict(X) mse = float(mean_squared_error(y, y_pred)) r2 = float(r2_score(y, y_pred)) # Save model to database model_pickle = pickle.dumps(model) # Check if model record exists existing_result = await self.db.execute( select(DriverEffortModel) .where(DriverEffortModel.driver_id == driver_id) ) existing = existing_result.scalar_one_or_none() if existing: # Update existing record existing.model_version += 1 existing.model_pickle = model_pickle existing.training_samples = len(X_data) existing.feature_names = {"names": self.FEATURE_NAMES} existing.current_mse = mse existing.r2_score = r2 existing.last_trained_at = datetime.utcnow() # Update MSE history mse_history = existing.mse_history or [] if isinstance(mse_history, dict): mse_history = mse_history.get("values", []) mse_history.append(mse) mse_history = mse_history[-10:] # Keep last 10 existing.mse_history = {"values": mse_history} version = existing.model_version else: # Create new record new_model = DriverEffortModel( driver_id=driver_id, model_version=1, model_pickle=model_pickle, training_samples=len(X_data), feature_names={"names": self.FEATURE_NAMES}, mse_history={"values": [mse]}, current_mse=mse, r2_score=r2, active=True, last_trained_at=datetime.utcnow(), ) self.db.add(new_model) version = 1 return { "status": "success", "driver_id": str(driver_id), "model_version": version, "training_samples": len(X_data), "mse": mse, "r2_score": r2, } async def get_model_status(self, driver_id: uuid.UUID) -> Optional[dict]: """Get status of a driver's effort model.""" result = await self.db.execute( select(DriverEffortModel) .where(DriverEffortModel.driver_id == driver_id) ) model = result.scalar_one_or_none() if not model: return None return { "driver_id": str(driver_id), "model_version": model.model_version, "training_samples": model.training_samples, "current_mse": model.current_mse, "r2_score": model.r2_score, "mse_history": model.mse_history, "active": model.active, "last_trained_at": model.last_trained_at.isoformat() if model.last_trained_at else None, } class LearningAgent: """ Main Learning Agent that orchestrates bandit and per-driver models. """ def __init__(self, db: AsyncSession): self.db = db self.bandit = FairnessBandit(db) self.reward_computer = RewardComputer(db) self.effort_learner = DriverEffortLearner(db) async def create_episode( self, allocation_run_id: uuid.UUID, fairness_config: dict, num_drivers: int, num_routes: int, is_experimental: bool = False, ) -> LearningEpisode: """Create a learning episode for an allocation run.""" config_hash = hash_config(fairness_config) arm_idx = self.bandit.arm_to_idx.get(config_hash, -1) # Get current priors await self.bandit.load_priors() alpha = float(self.bandit.alphas[arm_idx]) if arm_idx >= 0 else 1.0 beta = float(self.bandit.betas[arm_idx]) if arm_idx >= 0 else 1.0 samples = int(self.bandit.samples[arm_idx]) if arm_idx >= 0 else 0 episode = LearningEpisode( allocation_run_id=allocation_run_id, config_hash=config_hash, fairness_config=fairness_config, arm_idx=arm_idx, num_drivers=num_drivers, num_routes=num_routes, alpha_prior=alpha, beta_prior=beta, samples_count=samples, is_experimental=is_experimental, ) self.db.add(episode) return episode async def process_episode_reward(self, episode_id: uuid.UUID) -> dict: """Process and update reward for an episode.""" # Get episode result = await self.db.execute( select(LearningEpisode) .where(LearningEpisode.id == episode_id) ) episode = result.scalar_one_or_none() if not episode: return {"status": "error", "reason": "episode_not_found"} if episode.episode_reward is not None: return {"status": "skipped", "reason": "already_computed"} # Compute reward reward, stats = await self.reward_computer.compute_episode_reward( episode.allocation_run_id ) # Update episode episode.episode_reward = reward episode.reward_computed_at = datetime.utcnow() episode.avg_fairness_rating = stats.get("avg_fairness_rating") episode.avg_stress_level = stats.get("avg_stress_level") episode.completion_rate = stats.get("completion_rate") episode.feedback_count = stats.get("feedback_count", 0) # Update bandit await self.bandit.update(episode.config_hash, reward) return { "status": "success", "episode_id": str(episode_id), "reward": reward, "feedback_stats": stats, } async def select_config(self, experimental: bool = False) -> dict: """Select a FairnessConfig using the bandit.""" config, arm_idx, alpha, beta = await self.bandit.select_arm(experimental) return { "config": config, "arm_idx": arm_idx, "alpha": alpha, "beta": beta, "config_hash": hash_config(config), } async def get_learning_status(self) -> dict: """Get overall learning status for admin API.""" await self.bandit.load_priors() # Get top configs top_configs = self.bandit.get_top_configs(5) # Count active driver models result = await self.db.execute( select(func.count(DriverEffortModel.driver_id)) .where(DriverEffortModel.active == True) ) active_models = result.scalar() or 0 # Get average MSE result = await self.db.execute( select(func.avg(DriverEffortModel.current_mse)) .where(DriverEffortModel.active == True) .where(DriverEffortModel.current_mse.isnot(None)) ) avg_mse = result.scalar() or 0.0 # Get recent episode count cutoff = datetime.utcnow() - timedelta(days=7) result = await self.db.execute( select(func.count(LearningEpisode.id)) .where(LearningEpisode.created_at >= cutoff) ) recent_episodes = result.scalar() or 0 # Get current active config result = await self.db.execute( select(FairnessConfig) .where(FairnessConfig.is_active == True) .limit(1) ) current_config_model = result.scalar_one_or_none() current_config = None if current_config_model: current_config = { "gini_threshold": current_config_model.gini_threshold, "stddev_threshold": current_config_model.stddev_threshold, "recovery_lightening_factor": current_config_model.recovery_lightening_factor, "ev_charging_penalty_weight": current_config_model.ev_charging_penalty_weight, } return { "current_config": current_config, "top_performing_configs": top_configs, "driver_models_active": active_models, "avg_prediction_mse": float(avg_mse), "recent_episodes_7d": recent_episodes, "total_arms": self.bandit.num_arms, "bandit_statistics": { "total_samples": int(np.sum(self.bandit.samples)), "explored_arms": int(np.sum(self.bandit.samples > 0)), } }