Spaces:
Sleeping
Sleeping
| """API routes for the delivery search application.""" | |
| from fastapi import APIRouter, HTTPException | |
| from typing import List | |
| from ..models.requests import ( | |
| GridConfig, | |
| SearchRequest, | |
| PathRequest, | |
| CompareRequest, | |
| Position, | |
| GenerateResponse, | |
| SearchResponse, | |
| PlanResponse, | |
| ComparisonResult, | |
| CompareResponse, | |
| AlgorithmInfo, | |
| AlgorithmsResponse, | |
| GridData, | |
| StoreData, | |
| DestinationData, | |
| TunnelData, | |
| SegmentData, | |
| ) | |
| from ..services import gen_grid, parse_full_state, measure_performance | |
| from ..core import DeliverySearch, DeliveryPlanner | |
# Router that collects this module's endpoints.
# NOTE(review): none of the handlers visible in this file carries a
# ``@router.<method>(...)`` decorator -- confirm where routes are registered.
router = APIRouter()

# Search strategy catalogue as (code, display name, description) rows.
# ALGORITHMS preserves this order.
_ALGORITHM_ROWS = (
    (
        "BF",
        "Breadth-First Search",
        "Explores all nodes at current depth before moving deeper. Finds shortest path in terms of steps.",
    ),
    (
        "DF",
        "Depth-First Search",
        "Explores as far as possible along each branch. Memory efficient but may not find optimal path.",
    ),
    (
        "ID",
        "Iterative Deepening",
        "Combines BFS completeness with DFS space efficiency. Good for unknown depth goals.",
    ),
    (
        "UC",
        "Uniform Cost Search",
        "Expands lowest-cost node first. Always finds the optimal (minimum cost) solution.",
    ),
    (
        "GR1",
        "Greedy (Manhattan)",
        "Uses Manhattan distance heuristic. Fast but may not find optimal path.",
    ),
    (
        "GR2",
        "Greedy (Euclidean)",
        "Uses Euclidean distance heuristic. Fast but may not find optimal path.",
    ),
    (
        "AS1",
        "A* (Manhattan)",
        "A* with Manhattan distance. Optimal and complete with admissible heuristic.",
    ),
    (
        "AS2",
        "A* (Tunnel-Aware)",
        "A* considering tunnel shortcuts. More informed for grids with tunnels.",
    ),
)

# Algorithm metadata: one AlgorithmInfo per catalogue row.
ALGORITHMS = [
    AlgorithmInfo(code=code, name=name, description=description)
    for code, name, description in _ALGORITHM_ROWS
]
async def health_check():
    """Report that the service is reachable.

    Returns:
        A dict with the single key ``"status"`` mapped to ``"ok"``.
    """
    payload = {"status": "ok"}
    return payload
async def list_algorithms():
    """Return the module-level algorithm catalogue.

    Returns:
        An ``AlgorithmsResponse`` whose ``algorithms`` field is the
        ``ALGORITHMS`` list.
    """
    catalogue = AlgorithmsResponse(algorithms=ALGORITHMS)
    return catalogue
def _position_of(point):
    """Wrap an indexable ``(x, y)`` pair in a ``Position`` model."""
    return Position(x=point[0], y=point[1])


def _grid_data_from_state(state):
    """Convert a generated state object into the ``GridData`` payload."""
    return GridData(
        width=state.grid.width,
        height=state.grid.height,
        stores=[
            StoreData(id=store.id, position=_position_of(store.position))
            for store in state.stores
        ],
        destinations=[
            DestinationData(id=dest.id, position=_position_of(dest.position))
            for dest in state.destinations
        ],
        tunnels=[
            TunnelData(
                entrance1=_position_of(tunnel.entrance1),
                entrance2=_position_of(tunnel.entrance2),
                cost=tunnel.cost,
            )
            for tunnel in state.tunnels
        ],
        segments=[
            SegmentData(
                src=_position_of(segment.src),
                dst=_position_of(segment.dst),
                traffic=segment.traffic,
            )
            for segment in state.grid.segments.values()
        ],
    )


async def generate_grid(config: GridConfig):
    """Generate a random grid configuration.

    Calls ``gen_grid`` with the dimensions, entity counts and obstacle
    density from ``config``, then converts the resulting state into a
    ``GridData`` payload.

    Args:
        config: Requested grid parameters (width, height, num_stores,
            num_destinations, num_tunnels, obstacle_density).

    Returns:
        A ``GenerateResponse`` carrying the ``initial_state`` and ``traffic``
        values returned by ``gen_grid`` plus the converted ``parsed`` grid.

    Raises:
        HTTPException: With status 500 and the error text as ``detail`` when
            generation or conversion raises any exception.
    """
    try:
        initial_state, traffic, state = gen_grid(
            width=config.width,
            height=config.height,
            num_stores=config.num_stores,
            num_destinations=config.num_destinations,
            num_tunnels=config.num_tunnels,
            obstacle_density=config.obstacle_density,
        )
        return GenerateResponse(
            initial_state=initial_state,
            traffic=traffic,
            parsed=_grid_data_from_state(state),
        )
    except Exception as exc:
        # Chain the cause so the original traceback is kept for logs/debuggers.
        raise HTTPException(status_code=500, detail=str(exc)) from exc
async def find_path(request: PathRequest):
    """Find a path from start to goal using the requested strategy.

    Rebuilds a ``Grid`` and the ``Tunnel`` list from the request, runs
    ``DeliverySearch.path`` with visualization enabled inside a
    ``measure_performance`` context, and packages the outcome.

    Args:
        request: Grid dimensions, segments, tunnels, start/goal positions
            and the search strategy.

    Returns:
        A ``SearchResponse`` with the plan, cost, expanded-node count,
        performance metrics, the path as ``Position`` objects, and the
        serialized search steps (``None`` when no steps were returned).

    Raises:
        HTTPException: With status 500 and the error text as ``detail`` when
            any step raises an exception.
    """
    try:
        # Imported inside the handler, as in the original implementation.
        from ..models.grid import Grid
        from ..models.entities import Tunnel

        # Build grid from request
        grid = Grid(width=request.grid_width, height=request.grid_height)
        for seg in request.segments:
            grid.add_segment(
                (seg.src.x, seg.src.y), (seg.dst.x, seg.dst.y), seg.traffic
            )

        # Build tunnels
        # NOTE(review): generate_grid emits a ``cost`` for each tunnel, but no
        # cost is passed to ``Tunnel`` here -- confirm Tunnel derives it.
        tunnels = [
            Tunnel(
                entrance1=(t.entrance1.x, t.entrance1.y),
                entrance2=(t.entrance2.x, t.entrance2.y),
            )
            for t in request.tunnels
        ]

        # Run search with metrics
        with measure_performance() as metrics:
            result, steps = DeliverySearch.path(
                grid,
                (request.start.x, request.start.y),
                (request.goal.x, request.goal.y),
                tunnels,
                request.strategy.value,
                visualize=True,
            )
            metrics.sample()

        return SearchResponse(
            plan=result.plan,
            cost=result.cost,
            nodes_expanded=result.nodes_expanded,
            runtime_ms=metrics.runtime_ms,
            # Clamp so a negative memory reading is never reported.
            memory_kb=max(0, metrics.memory_kb),
            cpu_percent=metrics.cpu_percent,
            path=[Position(x=p[0], y=p[1]) for p in result.path],
            steps=[s.to_dict() for s in steps] if steps else None,
        )
    except Exception as exc:
        # Chain the cause so the original traceback is kept for logs/debuggers.
        raise HTTPException(status_code=500, detail=str(exc)) from exc
async def create_plan(request: SearchRequest):
    """Create a full delivery plan for all trucks and destinations.

    Parses the request's ``initial_state`` and ``traffic`` with
    ``parse_full_state`` and runs ``DeliveryPlanner.plan_from_state`` inside
    a ``measure_performance`` context.

    Args:
        request: Initial state, traffic, search strategy and the
            ``visualize`` flag forwarded to the planner.

    Returns:
        A ``PlanResponse`` with the plan text, per-assignment dicts, total
        cost, total expanded nodes and performance metrics.

    Raises:
        HTTPException: With status 500 and the error text as ``detail`` when
            parsing or planning raises any exception.
    """
    try:
        # Parse state
        state = parse_full_state(request.initial_state, request.traffic)

        # Run planner with metrics; the second return value is not used here.
        with measure_performance() as metrics:
            plan_result, _ = DeliveryPlanner.plan_from_state(
                state.grid,
                state.stores,
                state.destinations,
                state.tunnels,
                request.strategy.value,
                request.visualize,
            )
            metrics.sample()

        return PlanResponse(
            output=plan_result.to_string(),
            assignments=[a.to_dict() for a in plan_result.assignments],
            total_cost=plan_result.total_cost,
            total_nodes_expanded=plan_result.total_nodes_expanded,
            runtime_ms=metrics.runtime_ms,
            # Clamp so a negative memory reading is never reported.
            memory_kb=max(0, metrics.memory_kb),
            cpu_percent=metrics.cpu_percent,
        )
    except Exception as exc:
        # Chain the cause so the original traceback is kept for logs/debuggers.
        raise HTTPException(status_code=500, detail=str(exc)) from exc
async def compare_algorithms(request: CompareRequest):
    """Run every algorithm in ``ALGORITHMS`` on one problem and compare them.

    Each algorithm is run through ``DeliveryPlanner.plan_from_state`` with
    visualization disabled inside its own ``measure_performance`` context.
    The reference optimum is the lowest total cost among the "UC", "AS1"
    and "AS2" runs; a result is flagged optimal when its cost equals it.

    Args:
        request: Initial state and traffic describing the problem.

    Returns:
        A ``CompareResponse`` with one ``ComparisonResult`` per algorithm
        and the reference ``optimal_cost``.

    Raises:
        HTTPException: With status 500 and the error text as ``detail`` when
            parsing or any algorithm run raises an exception.
    """
    # Strategies whose cost is used as the reference optimum (UCS and A*).
    reference_codes = ("UC", "AS1", "AS2")

    try:
        state = parse_full_state(request.initial_state, request.traffic)
        results: List[ComparisonResult] = []
        optimal_cost = float("inf")

        # Run each algorithm
        for algo_info in ALGORITHMS:
            with measure_performance() as metrics:
                plan_result, _ = DeliveryPlanner.plan_from_state(
                    state.grid,
                    state.stores,
                    state.destinations,
                    state.tunnels,
                    algo_info.code,
                    visualize=False,
                )
                metrics.sample()

            # Track optimal cost (from UCS or A*)
            if algo_info.code in reference_codes:
                optimal_cost = min(optimal_cost, plan_result.total_cost)

            results.append(
                ComparisonResult(
                    algorithm=algo_info.code,
                    name=algo_info.name,
                    plan=plan_result.to_string(),
                    cost=plan_result.total_cost,
                    nodes_expanded=plan_result.total_nodes_expanded,
                    runtime_ms=metrics.runtime_ms,
                    # Clamp so a negative memory reading is never reported.
                    memory_kb=max(0, metrics.memory_kb),
                    cpu_percent=metrics.cpu_percent,
                    is_optimal=False,  # Filled in once every run has finished
                )
            )

        # Mark optimal solutions now that the reference cost is final.
        # NOTE(review): exact equality; if costs can be floats, confirm that
        # different algorithms produce bit-identical totals.
        for result in results:
            result.is_optimal = result.cost == optimal_cost

        return CompareResponse(comparisons=results, optimal_cost=optimal_cost)
    except Exception as exc:
        # Chain the cause so the original traceback is kept for logs/debuggers.
        raise HTTPException(status_code=500, detail=str(exc)) from exc