Spaces:
Sleeping
Sleeping
| """ | |
| Resource Allocation Engine | |
| ========================== | |
| Two-stage allocation pipeline: | |
| Stage 1 — ASSIGNMENT : Hungarian Algorithm (scipy.optimize.linear_sum_assignment) | |
| - Builds a cost matrix: teams × tasks | |
| - Cost = strategy-dependent mix of normalized distance and (1 - priority); a skill mismatch is priced at 1e9 (effectively forbidden) | |
| - Solves the full assignment in O(n³) — optimal, not greedy | |
| - Replaces the previous greedy loop which could strand high-priority tasks | |
| Stage 2 — ROUTING : Priority-Weighted Nearest Neighbor + 2-opt | |
| - Same algorithms as before but with the O(n³) 2-opt bug fixed | |
| - Only two affected edges compared per swap (not full route) | |
| Why Hungarian over greedy? | |
| Greedy allocation can assign a high-capacity team to low-priority tasks | |
| just because they appear first, leaving critical tasks unassigned. | |
| Hungarian solves the full bipartite matching at once, minimizing total cost. | |
| Allocation Strategies: | |
| - manual : explicit team → task assignments from caller | |
| - priority_based: auto-assign highest priority tasks first (Hungarian) | |
| - proximity_based: minimize total travel distance (Hungarian) | |
| - balanced : weighted combination of both (Hungarian) | |
| """ | |
| from typing import List, Dict, Optional, Tuple | |
| from pydantic import BaseModel, Field | |
| from fastapi import HTTPException | |
| from datetime import datetime | |
| import numpy as np | |
| try: | |
| from scipy.optimize import linear_sum_assignment | |
| SCIPY_AVAILABLE = True | |
| except ImportError: | |
| SCIPY_AVAILABLE = False | |
| from enum import Enum | |
| # ============================================================================ | |
| # DATA MODELS | |
| # ============================================================================ | |
class TaskStatus(str, Enum):
    """Lifecycle states of a task; members double as plain strings."""

    UNASSIGNED = "unassigned"    # default for a new Task; the only state that may be allocated
    ASSIGNED = "assigned"        # set by AllocationEngine.manual_allocation
    IN_PROGRESS = "in_progress"  # not set anywhere in the visible code
    COMPLETED = "completed"      # not set anywhere in the visible code
    CANCELLED = "cancelled"      # not set anywhere in the visible code
class FieldTeam(BaseModel):
    """A field team that can be given tasks, up to ``capacity`` at a time."""

    id: str
    name: str
    # (latitude, longitude) in degrees; used as the start point of the team's route.
    base_location: Tuple[float, float]
    # Maximum number of tasks the team may hold; must be at least 1.
    capacity: int = Field(ge=1)
    # Compared against Task.required_skills; the type allows None.
    skills: Optional[List[str]] = Field(default_factory=list)
    # Free-form label; not read by the allocation code visible in this file.
    status: str = "available"
    # Number of tasks currently allocated; incremented by manual_allocation
    # and zeroed by reset_all_allocations.
    current_load: int = 0
class Task(BaseModel):
    """A unit of field work at a single location that can be given to a team."""

    id: str
    district: str
    # (latitude, longitude) in degrees.
    location: Tuple[float, float]
    # 0.0-1.0, higher is more urgent (assignment cost uses 1 - priority).
    priority: float = Field(ge=0.0, le=1.0)
    status: TaskStatus = TaskStatus.UNASSIGNED
    # Every listed skill must be present on a team for it to take the task.
    required_skills: Optional[List[str]] = Field(default_factory=list)
    # On-site work time; summed into Allocation.estimated_completion_hours.
    estimated_duration_hours: float = 2.0
    disaster_type: Optional[str] = None  # NEW: flood/cyclone/landslide/earthquake
    risk_score: Optional[float] = None  # NEW: from FNN prediction
    description: Optional[str] = None
class Allocation(BaseModel):
    """Record of one batch of tasks given to one team."""

    team_id: str
    # Task ids in visiting order (manual_allocation stores the routed order here).
    task_ids: List[str]
    # Naive local timestamp taken when the record is created.
    assigned_at: datetime = Field(default_factory=datetime.now)
    optimized_route: Optional[List[str]] = None
    total_distance_km: Optional[float] = None
    # On-site hours plus travel time.
    estimated_completion_hours: Optional[float] = None
    # "manual", or a strategy-specific label written by auto_allocation.
    assignment_method: str = "manual"
class AllocationRequest(BaseModel):
    """Request payload for manual allocation.

    NOTE(review): this model is not consumed anywhere in the visible code;
    the field meanings below are inferred from AllocationEngine.manual_allocation's
    parameters and should be confirmed against the API layer.
    """

    # team_id -> list of task ids to give that team.
    team_assignments: Dict[str, List[str]]
    optimize_routes: bool = True
    respect_capacity: bool = True
class AutoAllocationRequest(BaseModel):
    """Request payload for automatic allocation.

    NOTE(review): not consumed in the visible code; presumably mapped onto
    AllocationEngine.auto_allocation's parameters by the API layer.
    """

    strategy: str = Field(
        default="priority_based",
        description="Strategy: priority_based, proximity_based, or balanced"
    )
    optimize_routes: bool = True
    # Only meaningful for the "balanced" strategy; constrained to [0, 1].
    priority_weight: float = Field(default=0.5, ge=0.0, le=1.0,
                                   description="For 'balanced': weight given to priority vs proximity")
| # ============================================================================ | |
| # IN-MEMORY STORAGE | |
| # ============================================================================ | |
# Module-level registries shared by every function in this file.
# They are plain dicts held in memory only.
TEAMS: Dict[str, FieldTeam] = {}  # team id -> FieldTeam
TASKS: Dict[str, Task] = {}  # task id -> Task
ALLOCATIONS: Dict[str, Allocation] = {}  # allocation id (built in manual_allocation) -> Allocation
| # ============================================================================ | |
| # DISTANCE UTILITY | |
| # ============================================================================ | |
def haversine_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Return the great-circle distance in kilometres between two points.

    Args:
        lat1, lon1: Latitude and longitude of the first point, in degrees.
        lat2, lon2: Latitude and longitude of the second point, in degrees.

    Returns:
        Distance along the sphere's surface in km, using a mean Earth
        radius of 6371 km. The value is a NumPy float.
    """
    earth_radius_km = 6371
    phi1, phi2 = np.radians(lat1), np.radians(lat2)
    delta_phi = np.radians(lat2 - lat1)
    delta_lambda = np.radians(lon2 - lon1)
    a = (
        np.sin(delta_phi / 2) ** 2
        + np.cos(phi1) * np.cos(phi2) * np.sin(delta_lambda / 2) ** 2
    )
    # Rounding can push `a` marginally outside [0, 1] for (near-)antipodal or
    # identical points, which would make sqrt(1 - a) NaN; clamp it first.
    a = np.clip(a, 0.0, 1.0)
    return earth_radius_km * 2 * np.arctan2(np.sqrt(a), np.sqrt(1 - a))
def calculate_route_distance(locations: List[Tuple[float, float]]) -> float:
    """Sum the haversine length (km) of each consecutive leg of a path.

    ``locations`` is an ordered sequence of (latitude, longitude) points.
    A path with fewer than two points has no legs and yields 0.
    """
    consecutive_legs = zip(locations, locations[1:])
    return sum(
        haversine_distance(origin[0], origin[1], destination[0], destination[1])
        for origin, destination in consecutive_legs
    )
| # ============================================================================ | |
| # ROUTE OPTIMIZATION | |
| # ============================================================================ | |
class RouteOptimizer:
    """Stage-2 routing: order one team's tasks into a short visiting sequence.

    Routes are open paths that start at the team's base location and end at
    the last task (there is no return leg). All methods are static.
    """

    @staticmethod
    def priority_weighted_nearest_neighbor(
        start_location: Tuple[float, float],
        tasks: List[Task]
    ) -> Tuple[List[str], float]:
        """Build a route greedily, favouring near and high-priority tasks. O(n²).

        At each step the next task is the one minimising
        ``distance * (1.0 - priority + 0.1)``, so a high priority shrinks a
        task's effective distance. Ties keep the earliest task in ``tasks``.

        Args:
            start_location: (latitude, longitude) the route starts from.
            tasks: Tasks to visit; the list itself is not modified.

        Returns:
            ``(route, total_distance)`` where ``route`` is the list of task
            ids in visiting order and ``total_distance`` is the path length
            in km. An empty ``tasks`` yields ``([], 0.0)``.
        """
        if not tasks:
            return [], 0.0

        unvisited = list(tasks)
        route: List[str] = []
        current = start_location
        total_distance = 0.0

        while unvisited:
            best_idx, best_dist, best_score = None, 0.0, float('inf')
            for idx, task in enumerate(unvisited):
                dist = haversine_distance(*current, *task.location)
                score = dist * (1.0 - task.priority + 0.1)
                # Always accept the first candidate so a non-finite score can
                # never leave best_idx unset; afterwards require a strict
                # improvement, which keeps the earliest task on ties.
                if best_idx is None or score < best_score:
                    best_idx, best_dist, best_score = idx, dist, score

            chosen = unvisited.pop(best_idx)
            total_distance += best_dist  # reuse the distance measured above
            current = chosen.location
            route.append(chosen.id)

        return route, total_distance

    @staticmethod
    def two_opt_optimization(
        route: List[str],
        task_dict: Dict[str, Task],
        start_location: Tuple[float, float],
        max_iterations: int = 100
    ) -> Tuple[List[str], float]:
        """Shorten a route with 2-opt local search.

        Each candidate move reverses a contiguous run of tasks. Only the
        edges that change are measured: the edge entering the run and, when
        the run does not reach the end of the route, the edge leaving it.
        The first improving move found is applied and the scan restarts.

        Args:
            route: Task ids in the current visiting order (not modified).
            task_dict: Maps every id in ``route`` to its Task.
            start_location: Fixed (latitude, longitude) start of the path.
            max_iterations: Upper bound on the number of moves applied.

        Returns:
            ``(route, total_distance_km)``. Routes of two or fewer tasks are
            returned unchanged with their measured length.
        """
        def path_points(order):
            # Index 0 is the start; index k (k >= 1) is the task at order[k - 1].
            return [start_location] + [task_dict[tid].location for tid in order]

        if len(route) <= 2:
            return route, calculate_route_distance(path_points(route))

        current_route = list(route)
        n = len(current_route)
        points = path_points(current_route)

        def first_improving_move(pts):
            """Return (i, j) such that reversing current_route[i:j] shortens the path."""
            # i and j index `pts`: the run pts[i+1 .. j] is the reversal
            # candidate, which is current_route[i .. j-1] because of the
            # leading start point. j == n means the run reaches the open end.
            for i in range(n - 1):
                for j in range(i + 2, n + 1):
                    before_run, run_first, run_last = pts[i], pts[i + 1], pts[j]
                    removed = haversine_distance(*before_run, *run_first)
                    added = haversine_distance(*before_run, *run_last)
                    if j < n:
                        after_run = pts[j + 1]
                        removed += haversine_distance(*run_last, *after_run)
                        added += haversine_distance(*run_first, *after_run)
                    # The small tolerance stops rounding noise from
                    # triggering endless swaps.
                    if added < removed - 1e-10:
                        return i, j
            return None

        for _ in range(max_iterations):
            move = first_improving_move(points)
            if move is None:
                break
            i, j = move
            current_route[i:j] = current_route[i:j][::-1]
            points = path_points(current_route)

        return current_route, calculate_route_distance(points)
| # ============================================================================ | |
| # HUNGARIAN ASSIGNMENT | |
| # ============================================================================ | |
class HungarianAssigner:
    """
    Builds a cost matrix and solves team-task assignment optimally.

    Cost matrix dimensions: (n_slots × n_tasks). Each team is expanded into
    one row ("slot") per unit of remaining capacity, so one team can take
    several tasks without exceeding its capacity.

    Cost function (selected by ``strategy``, α = ``priority_weight``):
        priority_based  : cost = 1 - priority
        proximity_based : cost = normalized_distance
        balanced        : cost = α * (1 - priority) + (1 - α) * normalized_distance
    Any other strategy name is treated as ``balanced``.

    normalized_distance is the team-base → task distance divided by the
    largest such distance over all team/task pairs.

    Skill mismatch → cost = 1e9 (effectively forbidden assignment); such
    pairings are dropped from the result even if the solver selects them.
    """

    LARGE = 1e9

    @staticmethod
    def _has_required_skills(team: FieldTeam, task: Task) -> bool:
        """Return True when the team holds every skill the task requires."""
        if not task.required_skills:
            return True
        # FieldTeam.skills is Optional, so tolerate None.
        team_skills = set(team.skills or ())
        return all(skill in team_skills for skill in task.required_skills)

    @staticmethod
    def build_cost_matrix(
        teams: List[FieldTeam],
        tasks: List[Task],
        strategy: str = "balanced",
        priority_weight: float = 0.5
    ) -> Tuple[np.ndarray, List[Tuple[str, int]], List[str]]:
        """
        Build the slot × task cost matrix.

        Returns:
            cost_matrix : (n_slots, n_tasks) numpy array, or a (0, 0) array
                          when there are no free slots or no tasks
            slot_index  : maps row → (team_id, slot_number)
            task_index  : maps col → task_id
        """
        # Expand teams into capacity slots (none for a team already full).
        slot_index = [
            (team.id, slot)
            for team in teams
            for slot in range(max(0, team.capacity - team.current_load))
        ]
        task_index = [task.id for task in tasks]

        n_slots = len(slot_index)
        n_tasks = len(task_index)
        if n_slots == 0 or n_tasks == 0:
            return np.zeros((0, 0)), slot_index, task_index

        # Measure each team→task distance once; every slot of a team shares it.
        distances = {
            team.id: [
                haversine_distance(*team.base_location, *task.location)
                for task in tasks
            ]
            for team in teams
        }
        max_dist = max(max(row) for row in distances.values())
        if not max_dist > 0:
            # All teams sit exactly on all tasks: avoid 0/0 = NaN, which
            # linear_sum_assignment rejects as an invalid cost.
            max_dist = 1.0

        alpha = priority_weight
        team_rows = {}
        for team in teams:
            row = np.full(n_tasks, HungarianAssigner.LARGE)
            for col, task in enumerate(tasks):
                if not HungarianAssigner._has_required_skills(team, task):
                    continue  # Leaves LARGE — forbidden
                norm_dist = distances[team.id][col] / max_dist
                if strategy == "priority_based":
                    cost = 1.0 - task.priority
                elif strategy == "proximity_based":
                    cost = norm_dist
                else:  # balanced (also the fallback for unknown names)
                    cost = alpha * (1.0 - task.priority) + (1.0 - alpha) * norm_dist
                row[col] = cost
            team_rows[team.id] = row

        cost_matrix = np.array([team_rows[team_id] for team_id, _ in slot_index])
        return cost_matrix, slot_index, task_index

    @staticmethod
    def solve(
        teams: List[FieldTeam],
        tasks: List[Task],
        strategy: str = "balanced",
        priority_weight: float = 0.5
    ) -> Dict[str, List[str]]:
        """
        Assign tasks to teams at minimum total cost.

        Uses scipy.optimize.linear_sum_assignment when scipy is importable,
        otherwise falls back to ``_greedy_fallback`` (which ignores
        ``strategy`` and ``priority_weight``).

        Returns:
            dict {team_id: [task_id, ...]} containing only teams that
            received at least one task. Tasks left out (more tasks than
            slots, or no skilled team) simply do not appear.
        """
        if not SCIPY_AVAILABLE:
            return HungarianAssigner._greedy_fallback(teams, tasks)

        cost_matrix, slot_index, task_index = HungarianAssigner.build_cost_matrix(
            teams, tasks, strategy, priority_weight
        )
        if cost_matrix.size == 0:
            return {}

        # linear_sum_assignment accepts rectangular matrices directly and
        # matches min(n_slots, n_tasks) pairs.
        row_ind, col_ind = linear_sum_assignment(cost_matrix)

        assignments: Dict[str, List[str]] = {}
        for row, col in zip(row_ind, col_ind):
            if cost_matrix[row, col] >= HungarianAssigner.LARGE:
                continue  # Forbidden assignment (skill mismatch)
            team_id = slot_index[row][0]
            assignments.setdefault(team_id, []).append(task_index[col])
        return assignments

    @staticmethod
    def _greedy_fallback(
        teams: List[FieldTeam],
        tasks: List[Task]
    ) -> Dict[str, List[str]]:
        """Simple greedy fallback if scipy is unavailable.

        Teams are visited from least to most loaded; each takes the
        highest-priority remaining tasks it has the skills for, up to its
        remaining capacity.
        """
        assignments: Dict[str, List[str]] = {}
        remaining = sorted(tasks, key=lambda t: t.priority, reverse=True)

        for team in sorted(teams, key=lambda t: t.current_load):
            capacity_left = team.capacity - team.current_load
            chosen = []
            for task in remaining:
                if len(chosen) >= capacity_left:
                    break
                if HungarianAssigner._has_required_skills(team, task):
                    chosen.append(task)
            if chosen:
                assignments[team.id] = [t.id for t in chosen]
                # Drop exactly the chosen objects from the pool.
                chosen_ids = {id(t) for t in chosen}
                remaining = [t for t in remaining if id(t) not in chosen_ids]
        return assignments
| # ============================================================================ | |
| # ALLOCATION ENGINE | |
| # ============================================================================ | |
class AllocationEngine:
    """Creates allocations and keeps TEAMS / TASKS / ALLOCATIONS in sync.

    All methods are static and operate on the module-level registries.
    """

    @staticmethod
    def validate_allocation(
        team: FieldTeam,
        task_ids: List[str],
        respect_capacity: bool = True
    ) -> bool:
        """Check whether ``team`` may take ``task_ids``.

        Returns False when (with ``respect_capacity``) the tasks would push
        the team past its capacity, when any id is not in TASKS, or when the
        team lacks a skill that one of the tasks requires.
        """
        if respect_capacity and (team.current_load + len(task_ids)) > team.capacity:
            return False

        # FieldTeam.skills is Optional, so tolerate None.
        team_skills = set(team.skills or ())
        for task_id in task_ids:
            task = TASKS.get(task_id)
            if task is None:
                return False
            if any(skill not in team_skills for skill in (task.required_skills or ())):
                return False
        return True

    @staticmethod
    def manual_allocation(
        team_id: str,
        task_ids: List[str],
        optimize_route: bool = True,
        respect_capacity: bool = True
    ) -> Allocation:
        """Give ``task_ids`` to ``team_id`` and record the allocation.

        Args:
            team_id: Id of a team present in TEAMS.
            task_ids: Ids of tasks in TASKS, all currently UNASSIGNED.
                Repeated ids are collapsed to their first occurrence.
            optimize_route: When True and there is more than one task, order
                the tasks with nearest-neighbour + 2-opt; otherwise keep the
                caller's order.
            respect_capacity: Enforce the team's capacity limit.

        Returns:
            The stored Allocation. ``total_distance_km`` is the path length
            from the team's base through the tasks in route order, and
            ``estimated_completion_hours`` adds travel at 60 km/h to the
            tasks' estimated durations.

        Raises:
            HTTPException: 404 for an unknown team or task; 400 when a task
                is not UNASSIGNED or validation fails.

        Side effects: marks the tasks ASSIGNED, increases the team's
        ``current_load`` and adds an entry to ALLOCATIONS.
        """
        team = TEAMS.get(team_id)
        if team is None:
            raise HTTPException(404, f"Team {team_id} not found")

        # A repeated id would otherwise be counted twice against the team's load.
        task_ids = list(dict.fromkeys(task_ids))

        tasks = []
        for task_id in task_ids:
            task = TASKS.get(task_id)
            if task is None:
                raise HTTPException(404, f"Task {task_id} not found")
            if task.status != TaskStatus.UNASSIGNED:
                raise HTTPException(400, f"Task {task_id} is already {task.status}")
            tasks.append(task)

        if not AllocationEngine.validate_allocation(team, task_ids, respect_capacity):
            raise HTTPException(400, f"Team {team_id} cannot handle these tasks")

        optimized_route = task_ids
        if optimize_route and len(tasks) > 1:
            optimized_route, _ = RouteOptimizer.priority_weighted_nearest_neighbor(
                team.base_location, tasks
            )
            task_dict = {t.id: t for t in tasks}
            optimized_route, total_distance = RouteOptimizer.two_opt_optimization(
                optimized_route, task_dict, team.base_location
            )
        else:
            # Still measure the path so single-task and caller-ordered routes
            # do not report zero distance and zero travel time.
            total_distance = calculate_route_distance(
                [team.base_location] + [t.location for t in tasks]
            )

        estimated_hours = sum(t.estimated_duration_hours for t in tasks)
        estimated_hours += total_distance / 60  # assume 60 km/h

        allocation = Allocation(
            team_id=team_id,
            task_ids=optimized_route,
            optimized_route=optimized_route,
            total_distance_km=round(total_distance, 2),
            estimated_completion_hours=round(estimated_hours, 2),
            assignment_method="manual"
        )

        for task in tasks:
            task.status = TaskStatus.ASSIGNED
        team.current_load += len(task_ids)

        # The timestamp has one-second resolution; add a counter rather than
        # overwrite an earlier allocation made for this team in the same second.
        base_id = f"{team_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        alloc_id, attempt = base_id, 1
        while alloc_id in ALLOCATIONS:
            attempt += 1
            alloc_id = f"{base_id}_{attempt}"
        ALLOCATIONS[alloc_id] = allocation
        return allocation

    @staticmethod
    def auto_allocation(
        strategy: str = "balanced",
        optimize_routes: bool = True,
        priority_weight: float = 0.5
    ) -> List[Allocation]:
        """
        Assign every UNASSIGNED task it can to teams with spare capacity.

        HungarianAssigner.solve picks the team for each task, then each
        team's batch goes through manual_allocation (which also routes it).
        A batch that fails is reported on stdout and skipped, so one bad
        team does not abort the rest.

        Returns:
            The allocations created, possibly empty. Each one's
            ``assignment_method`` is ``hungarian_<strategy>``, or
            ``greedy_<strategy>`` when scipy is unavailable and the greedy
            fallback did the assignment.
        """
        unassigned_tasks = [t for t in TASKS.values() if t.status == TaskStatus.UNASSIGNED]
        available_teams = [t for t in TEAMS.values() if t.current_load < t.capacity]
        if not unassigned_tasks or not available_teams:
            return []

        assignments = HungarianAssigner.solve(
            available_teams, unassigned_tasks, strategy, priority_weight
        )
        # Label the result by the algorithm that actually ran.
        method_prefix = "hungarian" if SCIPY_AVAILABLE else "greedy"

        allocations = []
        for team_id, task_ids in assignments.items():
            if not task_ids:
                continue
            try:
                allocation = AllocationEngine.manual_allocation(
                    team_id, task_ids,
                    optimize_route=optimize_routes,
                    respect_capacity=False  # Already handled by the assigner
                )
            except Exception as e:  # best-effort: keep allocating other teams
                print(f"Allocation failed for {team_id}: {e}")
                continue
            allocation.assignment_method = f"{method_prefix}_{strategy}"
            allocations.append(allocation)
        return allocations

    # Kept for backward compatibility
    @staticmethod
    def auto_allocation_priority_based() -> List[Allocation]:
        """Run auto_allocation with the ``priority_based`` strategy."""
        return AllocationEngine.auto_allocation("priority_based")

    @staticmethod
    def auto_allocation_proximity_based() -> List[Allocation]:
        """Run auto_allocation with the ``proximity_based`` strategy."""
        return AllocationEngine.auto_allocation("proximity_based")
| # ============================================================================ | |
| # INITIALIZATION & HELPERS | |
| # ============================================================================ | |
def initialize_from_districts(districts_df) -> list:
    """Create one relief Task per row of ``districts_df`` and register it.

    Args:
        districts_df: DataFrame with ``District``, ``Latitude`` and
            ``Longitude`` columns and an optional ``Vulnerability_Index``
            column used as the task priority.

    Returns:
        The list of Task objects created, in row order. Each is also stored
        in TASKS under ``task_<district lower-cased, spaces → underscores>``,
        replacing any existing task with the same id.
    """
    import pandas as pd  # local import: only needed for the missing-value check

    tasks = []
    for _, row in districts_df.iterrows():
        district = row['District']
        priority = row.get('Vulnerability_Index', 0.5)
        if pd.isna(priority):
            # A blank cell arrives as NaN, which fails Task's 0..1 range
            # validation; treat it like a missing column.
            priority = 0.5
        task = Task(
            id=f"task_{district.lower().replace(' ', '_')}",
            district=district,
            location=(row['Latitude'], row['Longitude']),
            priority=priority,
            description=f"Relief operations in {district}"
        )
        TASKS[task.id] = task
        tasks.append(task)
    return tasks
def initialize_default_teams():
    """Register the four built-in field teams in TEAMS and return them.

    Existing entries with the same ids are replaced.
    """
    # (id, name, base (lat, lon), capacity, skills)
    roster = (
        ("team_alpha", "Team Alpha", (20.2961, 85.8245), 5,
         ["medical", "rescue", "evacuation"]),
        ("team_beta", "Team Beta", (20.4625, 85.8828), 4,
         ["rescue", "relief_distribution"]),
        ("team_gamma", "Team Gamma", (19.3150, 84.7941), 4,
         ["medical", "infrastructure_assessment"]),
        ("team_delta", "Team Delta", (21.4934, 86.9336), 3,
         ["rescue", "evacuation", "coastal_operations"]),
    )
    default_teams = [
        FieldTeam(id=team_id, name=name, base_location=base,
                  capacity=capacity, skills=skills)
        for team_id, name, base, capacity, skills in roster
    ]
    TEAMS.update((team.id, team) for team in default_teams)
    return default_teams
def get_allocation_summary() -> Dict:
    """Return a snapshot of teams, tasks and allocations for reporting.

    ``assigned_tasks`` and ``unassigned_tasks`` count tasks whose status is
    exactly ASSIGNED / UNASSIGNED, so tasks in any other state (in progress,
    completed, cancelled) appear in ``total_tasks`` only.
    ``utilization_pct`` is current_load / capacity as a percentage rounded
    to one decimal.
    """
    statuses = [task.status for task in TASKS.values()]
    assigned = sum(1 for status in statuses if status == TaskStatus.ASSIGNED)
    unassigned = sum(1 for status in statuses if status == TaskStatus.UNASSIGNED)

    team_rows = [
        {
            "id": team.id, "name": team.name,
            "current_load": team.current_load, "capacity": team.capacity,
            "utilization_pct": round(team.current_load / team.capacity * 100, 1)
        }
        for team in TEAMS.values()
    ]
    return {
        "total_teams": len(TEAMS),
        "active_allocations": len(ALLOCATIONS),
        "total_tasks": len(statuses),
        "assigned_tasks": assigned,
        "unassigned_tasks": unassigned,
        "assignment_algorithm": "Hungarian (O(n³) optimal)" if SCIPY_AVAILABLE else "Greedy fallback",
        "teams": team_rows
    }
def reset_all_allocations():
    """Return the registries to a clean slate.

    Forgets every stored allocation, zeroes each team's load and marks every
    task UNASSIGNED regardless of its current status.
    """
    ALLOCATIONS.clear()
    for registered_team in TEAMS.values():
        registered_team.current_load = 0
    for registered_task in TASKS.values():
        registered_task.status = TaskStatus.UNASSIGNED