""" Resource Allocation Engine ========================== Two-stage allocation pipeline: Stage 1 — ASSIGNMENT : Hungarian Algorithm (scipy.optimize.linear_sum_assignment) - Builds a cost matrix: teams × tasks - Cost = distance + priority_penalty + skill_mismatch_penalty - Solves the full assignment in O(n³) — optimal, not greedy - Replaces the previous greedy loop which could strand high-priority tasks Stage 2 — ROUTING : Priority-Weighted Nearest Neighbor + 2-opt - Same algorithms as before but with the O(n³) 2-opt bug fixed - Only two affected edges compared per swap (not full route) Why Hungarian over greedy? Greedy allocation can assign a high-capacity team to low-priority tasks just because they appear first, leaving critical tasks unassigned. Hungarian solves the full bipartite matching at once, minimizing total cost. Allocation Strategies: - manual : explicit team → task assignments from caller - priority_based: auto-assign highest priority tasks first (Hungarian) - proximity_based: minimize total travel distance (Hungarian) - balanced : weighted combination of both (Hungarian) """ from typing import List, Dict, Optional, Tuple from pydantic import BaseModel, Field from fastapi import HTTPException from datetime import datetime import numpy as np try: from scipy.optimize import linear_sum_assignment SCIPY_AVAILABLE = True except ImportError: SCIPY_AVAILABLE = False from enum import Enum # ============================================================================ # DATA MODELS # ============================================================================ class TaskStatus(str, Enum): UNASSIGNED = "unassigned" ASSIGNED = "assigned" IN_PROGRESS = "in_progress" COMPLETED = "completed" CANCELLED = "cancelled" class FieldTeam(BaseModel): id: str name: str base_location: Tuple[float, float] capacity: int = Field(ge=1) skills: Optional[List[str]] = Field(default_factory=list) status: str = "available" current_load: int = 0 class Task(BaseModel): id: str district: str location: Tuple[float, float] priority: float = Field(ge=0.0, le=1.0) status: TaskStatus = TaskStatus.UNASSIGNED required_skills: Optional[List[str]] = Field(default_factory=list) estimated_duration_hours: float = 2.0 disaster_type: Optional[str] = None # NEW: flood/cyclone/landslide/earthquake risk_score: Optional[float] = None # NEW: from FNN prediction description: Optional[str] = None class Allocation(BaseModel): team_id: str task_ids: List[str] assigned_at: datetime = Field(default_factory=datetime.now) optimized_route: Optional[List[str]] = None total_distance_km: Optional[float] = None estimated_completion_hours: Optional[float] = None assignment_method: str = "manual" class AllocationRequest(BaseModel): team_assignments: Dict[str, List[str]] optimize_routes: bool = True respect_capacity: bool = True class AutoAllocationRequest(BaseModel): strategy: str = Field( default="priority_based", description="Strategy: priority_based, proximity_based, or balanced" ) optimize_routes: bool = True priority_weight: float = Field(default=0.5, ge=0.0, le=1.0, description="For 'balanced': weight given to priority vs proximity") # ============================================================================ # IN-MEMORY STORAGE # ============================================================================ TEAMS: Dict[str, FieldTeam] = {} TASKS: Dict[str, Task] = {} ALLOCATIONS: Dict[str, Allocation] = {} # ============================================================================ # DISTANCE UTILITY # ============================================================================ def haversine_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> float: R = 6371 phi1, phi2 = np.radians(lat1), np.radians(lat2) delta_phi = np.radians(lat2 - lat1) delta_lambda = np.radians(lon2 - lon1) a = np.sin(delta_phi/2)**2 + np.cos(phi1)*np.cos(phi2)*np.sin(delta_lambda/2)**2 return R * 2 * np.arctan2(np.sqrt(a), np.sqrt(1-a)) def calculate_route_distance(locations: List[Tuple[float, float]]) -> float: return sum( haversine_distance( locations[i][0], locations[i][1], locations[i+1][0], locations[i+1][1] ) for i in range(len(locations) - 1) ) # ============================================================================ # ROUTE OPTIMIZATION # ============================================================================ class RouteOptimizer: @staticmethod def priority_weighted_nearest_neighbor( start_location: Tuple[float, float], tasks: List[Task] ) -> Tuple[List[str], float]: """Greedy nearest-neighbor weighted by priority. O(n²).""" if not tasks: return [], 0.0 unvisited = tasks.copy() route = [] current = start_location total_distance = 0.0 while unvisited: best_idx, best_score = None, float('inf') for idx, task in enumerate(unvisited): dist = haversine_distance(*current, *task.location) score = dist * (1.0 - task.priority + 0.1) if score < best_score: best_score, best_idx = score, idx task = unvisited.pop(best_idx) dist = haversine_distance(*current, *task.location) total_distance += dist current = task.location route.append(task.id) return route, total_distance @staticmethod def two_opt_optimization( route: List[str], task_dict: Dict[str, Task], start_location: Tuple[float, float], max_iterations: int = 100 ) -> Tuple[List[str], float]: """ 2-opt local search. Fix: compares only the two swapped edges, not the full route. Reduces complexity from O(n³) to O(n²) per iteration. """ if len(route) <= 2: locs = [start_location] + [task_dict[tid].location for tid in route] return route, calculate_route_distance(locs) def get_locs(r): return [start_location] + [task_dict[tid].location for tid in r] current_route = route.copy() all_locs = get_locs(current_route) for _ in range(max_iterations): improved = False for i in range(len(current_route) - 1): for j in range(i + 2, len(current_route)): loc_i = all_locs[i] loc_i1 = all_locs[i + 1] loc_j = all_locs[j] loc_j1 = all_locs[j + 1] if j + 1 < len(all_locs) else None old = haversine_distance(*loc_i, *loc_i1) old += haversine_distance(*loc_j, *loc_j1) if loc_j1 else 0 new = haversine_distance(*loc_i, *loc_j) new += haversine_distance(*loc_i1, *loc_j1) if loc_j1 else 0 if new < old - 1e-10: current_route[i+1:j+1] = current_route[i+1:j+1][::-1] all_locs = get_locs(current_route) improved = True break if improved: break if not improved: break return current_route, calculate_route_distance(all_locs) # ============================================================================ # HUNGARIAN ASSIGNMENT # ============================================================================ class HungarianAssigner: """ Builds a cost matrix and solves team-task assignment optimally. Cost matrix dimensions: (n_teams_expanded × n_tasks) Each team is expanded into [capacity] slots so one team can take multiple tasks while respecting capacity constraints. Cost function (configurable via strategy): priority_based : cost = (1 - priority) — minimize ignored priority proximity_based : cost = distance(team_base, task_location) balanced : cost = α * normalized_distance + (1-α) * (1-priority) Skill mismatch → cost = 1e9 (effectively forbidden assignment) """ LARGE = 1e9 @staticmethod def build_cost_matrix( teams: List[FieldTeam], tasks: List[Task], strategy: str = "balanced", priority_weight: float = 0.5 ) -> Tuple[np.ndarray, List[Tuple[str, int]], List[str]]: """ Returns: cost_matrix : (n_slots, n_tasks) numpy array slot_index : maps row → (team_id, slot_number) task_index : maps col → task_id """ # Expand teams into capacity slots slot_index = [] for team in teams: remaining = team.capacity - team.current_load for slot in range(max(0, remaining)): slot_index.append((team.id, slot)) task_index = [task.id for task in tasks] n_slots = len(slot_index) n_tasks = len(task_index) if n_slots == 0 or n_tasks == 0: return np.zeros((0, 0)), slot_index, task_index # Precompute max distance for normalization all_dists = [] for team in teams: for task in tasks: all_dists.append(haversine_distance( *team.base_location, *task.location )) max_dist = max(all_dists) if all_dists else 1.0 # Build team lookup team_map = {t.id: t for t in teams} cost_matrix = np.full((n_slots, n_tasks), HungarianAssigner.LARGE) for row, (team_id, _) in enumerate(slot_index): team = team_map[team_id] for col, task in enumerate(tasks): # Skill check if task.required_skills: if not all(s in team.skills for s in task.required_skills): continue # Leaves LARGE — forbidden dist = haversine_distance(*team.base_location, *task.location) norm_dist = dist / max_dist if strategy == "priority_based": cost = 1.0 - task.priority elif strategy == "proximity_based": cost = norm_dist else: # balanced α = priority_weight cost = α * (1.0 - task.priority) + (1.0 - α) * norm_dist cost_matrix[row, col] = cost return cost_matrix, slot_index, task_index @staticmethod def solve( teams: List[FieldTeam], tasks: List[Task], strategy: str = "balanced", priority_weight: float = 0.5 ) -> Dict[str, List[str]]: """ Returns dict: {team_id: [task_id, ...]} Uses scipy.optimize.linear_sum_assignment (Hungarian algorithm). Falls back to greedy if scipy unavailable. """ if not SCIPY_AVAILABLE: return HungarianAssigner._greedy_fallback(teams, tasks) cost_matrix, slot_index, task_index = HungarianAssigner.build_cost_matrix( teams, tasks, strategy, priority_weight ) if cost_matrix.size == 0: return {} # Pad to square if needed (Hungarian works on rectangular too with scipy) row_ind, col_ind = linear_sum_assignment(cost_matrix) assignments: Dict[str, List[str]] = {} for row, col in zip(row_ind, col_ind): if cost_matrix[row, col] >= HungarianAssigner.LARGE: continue # Forbidden assignment (skill mismatch) team_id = slot_index[row][0] task_id = task_index[col] if team_id not in assignments: assignments[team_id] = [] assignments[team_id].append(task_id) return assignments @staticmethod def _greedy_fallback( teams: List[FieldTeam], tasks: List[Task] ) -> Dict[str, List[str]]: """Simple greedy fallback if scipy is unavailable.""" assignments: Dict[str, List[str]] = {} unassigned = tasks.copy() unassigned.sort(key=lambda t: t.priority, reverse=True) for team in sorted(teams, key=lambda t: t.current_load): capacity_left = team.capacity - team.current_load assignable = [] for task in unassigned: if len(assignable) >= capacity_left: break if task.required_skills: if not all(s in team.skills for s in task.required_skills): continue assignable.append(task) if assignable: assignments[team.id] = [t.id for t in assignable] for t in assignable: unassigned.remove(t) return assignments # ============================================================================ # ALLOCATION ENGINE # ============================================================================ class AllocationEngine: @staticmethod def validate_allocation( team: FieldTeam, task_ids: List[str], respect_capacity: bool = True ) -> bool: if respect_capacity and (team.current_load + len(task_ids)) > team.capacity: return False for task_id in task_ids: if task_id not in TASKS: return False task = TASKS[task_id] if task.required_skills: if not all(s in team.skills for s in task.required_skills): return False return True @staticmethod def manual_allocation( team_id: str, task_ids: List[str], optimize_route: bool = True, respect_capacity: bool = True ) -> Allocation: if team_id not in TEAMS: raise HTTPException(404, f"Team {team_id} not found") team = TEAMS[team_id] tasks = [] for task_id in task_ids: if task_id not in TASKS: raise HTTPException(404, f"Task {task_id} not found") task = TASKS[task_id] if task.status != TaskStatus.UNASSIGNED: raise HTTPException(400, f"Task {task_id} is already {task.status}") tasks.append(task) if not AllocationEngine.validate_allocation(team, task_ids, respect_capacity): raise HTTPException(400, f"Team {team_id} cannot handle these tasks") optimized_route = task_ids total_distance = 0.0 estimated_hours = sum(TASKS[tid].estimated_duration_hours for tid in task_ids) if optimize_route and len(tasks) > 1: optimized_route, total_distance = RouteOptimizer.priority_weighted_nearest_neighbor( team.base_location, tasks ) task_dict = {t.id: t for t in tasks} optimized_route, total_distance = RouteOptimizer.two_opt_optimization( optimized_route, task_dict, team.base_location ) estimated_hours += total_distance / 60 # assume 60 km/h allocation = Allocation( team_id=team_id, task_ids=optimized_route, optimized_route=optimized_route, total_distance_km=round(total_distance, 2), estimated_completion_hours=round(estimated_hours, 2), assignment_method="manual" ) for task_id in task_ids: TASKS[task_id].status = TaskStatus.ASSIGNED team.current_load += len(task_ids) alloc_id = f"{team_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}" ALLOCATIONS[alloc_id] = allocation return allocation @staticmethod def auto_allocation( strategy: str = "balanced", optimize_routes: bool = True, priority_weight: float = 0.5 ) -> List[Allocation]: """ Hungarian-based auto allocation. Solves optimal team-task assignment, then optimizes routes. """ unassigned_tasks = [t for t in TASKS.values() if t.status == TaskStatus.UNASSIGNED] available_teams = [t for t in TEAMS.values() if t.current_load < t.capacity] if not unassigned_tasks or not available_teams: return [] assignments = HungarianAssigner.solve( available_teams, unassigned_tasks, strategy, priority_weight ) allocations = [] for team_id, task_ids in assignments.items(): if not task_ids: continue try: allocation = AllocationEngine.manual_allocation( team_id, task_ids, optimize_route=optimize_routes, respect_capacity=False # Already handled by Hungarian ) allocation.assignment_method = f"hungarian_{strategy}" allocations.append(allocation) except Exception as e: print(f"Allocation failed for {team_id}: {e}") return allocations # Kept for backward compatibility @staticmethod def auto_allocation_priority_based() -> List[Allocation]: return AllocationEngine.auto_allocation("priority_based") @staticmethod def auto_allocation_proximity_based() -> List[Allocation]: return AllocationEngine.auto_allocation("proximity_based") # ============================================================================ # INITIALIZATION & HELPERS # ============================================================================ def initialize_from_districts(districts_df) -> list: import pandas as pd tasks = [] for _, row in districts_df.iterrows(): task = Task( id=f"task_{row['District'].lower().replace(' ', '_')}", district=row['District'], location=(row['Latitude'], row['Longitude']), priority=row.get('Vulnerability_Index', 0.5), description=f"Relief operations in {row['District']}" ) TASKS[task.id] = task tasks.append(task) return tasks def initialize_default_teams(): default_teams = [ FieldTeam(id="team_alpha", name="Team Alpha", base_location=(20.2961, 85.8245), capacity=5, skills=["medical", "rescue", "evacuation"]), FieldTeam(id="team_beta", name="Team Beta", base_location=(20.4625, 85.8828), capacity=4, skills=["rescue", "relief_distribution"]), FieldTeam(id="team_gamma", name="Team Gamma", base_location=(19.3150, 84.7941), capacity=4, skills=["medical", "infrastructure_assessment"]), FieldTeam(id="team_delta", name="Team Delta", base_location=(21.4934, 86.9336), capacity=3, skills=["rescue", "evacuation", "coastal_operations"]), ] for team in default_teams: TEAMS[team.id] = team return default_teams def get_allocation_summary() -> Dict: total = len(TASKS) assigned = sum(1 for t in TASKS.values() if t.status == TaskStatus.ASSIGNED) return { "total_teams": len(TEAMS), "active_allocations": len(ALLOCATIONS), "total_tasks": total, "assigned_tasks": assigned, "unassigned_tasks": total - assigned, "assignment_algorithm": "Hungarian (O(n³) optimal)" if SCIPY_AVAILABLE else "Greedy fallback", "teams": [ { "id": t.id, "name": t.name, "current_load": t.current_load, "capacity": t.capacity, "utilization_pct": round(t.current_load / t.capacity * 100, 1) } for t in TEAMS.values() ] } def reset_all_allocations(): for task in TASKS.values(): task.status = TaskStatus.UNASSIGNED for team in TEAMS.values(): team.current_load = 0 ALLOCATIONS.clear()