"""API routes for the delivery search application.""" from fastapi import APIRouter, HTTPException from typing import List from ..models.requests import ( GridConfig, SearchRequest, PathRequest, CompareRequest, Position, GenerateResponse, SearchResponse, PlanResponse, ComparisonResult, CompareResponse, AlgorithmInfo, AlgorithmsResponse, GridData, StoreData, DestinationData, TunnelData, SegmentData, ) from ..services import gen_grid, parse_full_state, measure_performance from ..core import DeliverySearch, DeliveryPlanner router = APIRouter() # Algorithm metadata ALGORITHMS = [ AlgorithmInfo( code="BF", name="Breadth-First Search", description="Explores all nodes at current depth before moving deeper. Finds shortest path in terms of steps.", ), AlgorithmInfo( code="DF", name="Depth-First Search", description="Explores as far as possible along each branch. Memory efficient but may not find optimal path.", ), AlgorithmInfo( code="ID", name="Iterative Deepening", description="Combines BFS completeness with DFS space efficiency. Good for unknown depth goals.", ), AlgorithmInfo( code="UC", name="Uniform Cost Search", description="Expands lowest-cost node first. Always finds the optimal (minimum cost) solution.", ), AlgorithmInfo( code="GR1", name="Greedy (Manhattan)", description="Uses Manhattan distance heuristic. Fast but may not find optimal path.", ), AlgorithmInfo( code="GR2", name="Greedy (Euclidean)", description="Uses Euclidean distance heuristic. Fast but may not find optimal path.", ), AlgorithmInfo( code="AS1", name="A* (Manhattan)", description="A* with Manhattan distance. Optimal and complete with admissible heuristic.", ), AlgorithmInfo( code="AS2", name="A* (Tunnel-Aware)", description="A* considering tunnel shortcuts. More informed for grids with tunnels.", ), ] @router.get("/api/health") async def health_check(): """Health check endpoint.""" return {"status": "ok"} @router.get("/api/algorithms", response_model=AlgorithmsResponse) async def list_algorithms(): """List available search algorithms.""" return AlgorithmsResponse(algorithms=ALGORITHMS) @router.post("/api/grid/generate", response_model=GenerateResponse) async def generate_grid(config: GridConfig): """Generate a random grid configuration.""" try: initial_state, traffic, state = gen_grid( width=config.width, height=config.height, num_stores=config.num_stores, num_destinations=config.num_destinations, num_tunnels=config.num_tunnels, obstacle_density=config.obstacle_density, ) # Convert to GridData for frontend parsed = GridData( width=state.grid.width, height=state.grid.height, stores=[ StoreData(id=s.id, position=Position(x=s.position[0], y=s.position[1])) for s in state.stores ], destinations=[ DestinationData( id=d.id, position=Position(x=d.position[0], y=d.position[1]) ) for d in state.destinations ], tunnels=[ TunnelData( entrance1=Position(x=t.entrance1[0], y=t.entrance1[1]), entrance2=Position(x=t.entrance2[0], y=t.entrance2[1]), cost=t.cost, ) for t in state.tunnels ], segments=[ SegmentData( src=Position(x=seg.src[0], y=seg.src[1]), dst=Position(x=seg.dst[0], y=seg.dst[1]), traffic=seg.traffic, ) for seg in state.grid.segments.values() ], ) return GenerateResponse( initial_state=initial_state, traffic=traffic, parsed=parsed ) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @router.post("/api/search/path", response_model=SearchResponse) async def find_path(request: PathRequest): """Find path from start to goal using specified strategy.""" try: from ..models.grid import Grid from ..models.entities import Tunnel # Build grid from request grid = Grid(width=request.grid_width, height=request.grid_height) for seg in request.segments: grid.add_segment( (seg.src.x, seg.src.y), (seg.dst.x, seg.dst.y), seg.traffic ) # Build tunnels tunnels = [ Tunnel( entrance1=(t.entrance1.x, t.entrance1.y), entrance2=(t.entrance2.x, t.entrance2.y), ) for t in request.tunnels ] # Run search with metrics with measure_performance() as metrics: result, steps = DeliverySearch.path( grid, (request.start.x, request.start.y), (request.goal.x, request.goal.y), tunnels, request.strategy.value, visualize=True, ) metrics.sample() return SearchResponse( plan=result.plan, cost=result.cost, nodes_expanded=result.nodes_expanded, runtime_ms=metrics.runtime_ms, memory_kb=max(0, metrics.memory_kb), cpu_percent=metrics.cpu_percent, path=[Position(x=p[0], y=p[1]) for p in result.path], steps=[s.to_dict() for s in steps] if steps else None, ) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @router.post("/api/search/plan", response_model=PlanResponse) async def create_plan(request: SearchRequest): """Create full delivery plan for all trucks and destinations.""" try: # Parse state state = parse_full_state(request.initial_state, request.traffic) # Run planner with metrics with measure_performance() as metrics: plan_result, viz_data = DeliveryPlanner.plan_from_state( state.grid, state.stores, state.destinations, state.tunnels, request.strategy.value, request.visualize, ) metrics.sample() return PlanResponse( output=plan_result.to_string(), assignments=[a.to_dict() for a in plan_result.assignments], total_cost=plan_result.total_cost, total_nodes_expanded=plan_result.total_nodes_expanded, runtime_ms=metrics.runtime_ms, memory_kb=max(0, metrics.memory_kb), cpu_percent=metrics.cpu_percent, ) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @router.post("/api/search/compare", response_model=CompareResponse) async def compare_algorithms(request: CompareRequest): """Run all algorithms on same problem and return comparison.""" try: state = parse_full_state(request.initial_state, request.traffic) results: List[ComparisonResult] = [] optimal_cost = float("inf") # Run each algorithm for algo_info in ALGORITHMS: with measure_performance() as metrics: plan_result, _ = DeliveryPlanner.plan_from_state( state.grid, state.stores, state.destinations, state.tunnels, algo_info.code, visualize=False, ) metrics.sample() # Track optimal cost (from UCS or A*) if algo_info.code in ["UC", "AS1", "AS2"]: optimal_cost = min(optimal_cost, plan_result.total_cost) results.append( ComparisonResult( algorithm=algo_info.code, name=algo_info.name, plan=plan_result.to_string(), cost=plan_result.total_cost, nodes_expanded=plan_result.total_nodes_expanded, runtime_ms=metrics.runtime_ms, memory_kb=max(0, metrics.memory_kb), cpu_percent=metrics.cpu_percent, is_optimal=False, # Will be set below ) ) # Mark optimal solutions for result in results: result.is_optimal = result.cost == optimal_cost return CompareResponse(comparisons=results, optimal_cost=optimal_cost) except Exception as e: raise HTTPException(status_code=500, detail=str(e))