# Kacemath's picture
# feat: update with latest changes
# 47bba68
"""API routes for the delivery search application."""
from fastapi import APIRouter, HTTPException
from typing import List
from ..models.requests import (
GridConfig,
SearchRequest,
PathRequest,
CompareRequest,
Position,
GenerateResponse,
SearchResponse,
PlanResponse,
ComparisonResult,
CompareResponse,
AlgorithmInfo,
AlgorithmsResponse,
GridData,
StoreData,
DestinationData,
TunnelData,
SegmentData,
)
from ..services import gen_grid, parse_full_state, measure_performance
from ..core import DeliverySearch, DeliveryPlanner
router = APIRouter()

# Algorithm metadata, built from (code, name, description) rows.
# The row order is the order in which algorithms are listed and compared.
ALGORITHMS = [
    AlgorithmInfo(code=code, name=name, description=description)
    for code, name, description in (
        (
            "BF",
            "Breadth-First Search",
            "Explores all nodes at current depth before moving deeper. Finds shortest path in terms of steps.",
        ),
        (
            "DF",
            "Depth-First Search",
            "Explores as far as possible along each branch. Memory efficient but may not find optimal path.",
        ),
        (
            "ID",
            "Iterative Deepening",
            "Combines BFS completeness with DFS space efficiency. Good for unknown depth goals.",
        ),
        (
            "UC",
            "Uniform Cost Search",
            "Expands lowest-cost node first. Always finds the optimal (minimum cost) solution.",
        ),
        (
            "GR1",
            "Greedy (Manhattan)",
            "Uses Manhattan distance heuristic. Fast but may not find optimal path.",
        ),
        (
            "GR2",
            "Greedy (Euclidean)",
            "Uses Euclidean distance heuristic. Fast but may not find optimal path.",
        ),
        (
            "AS1",
            "A* (Manhattan)",
            "A* with Manhattan distance. Optimal and complete with admissible heuristic.",
        ),
        (
            "AS2",
            "A* (Tunnel-Aware)",
            "A* considering tunnel shortcuts. More informed for grids with tunnels.",
        ),
    )
]
@router.get("/api/health")
async def health_check():
    """Liveness probe: always answers with a fixed ``{"status": "ok"}`` payload."""
    status_payload = {"status": "ok"}
    return status_payload
@router.get("/api/algorithms", response_model=AlgorithmsResponse)
async def list_algorithms():
    """Return the module-level ``ALGORITHMS`` catalogue wrapped in its response model."""
    catalogue = AlgorithmsResponse(algorithms=ALGORITHMS)
    return catalogue
def _to_position(point) -> Position:
    """Wrap an indexable ``(x, y)`` pair in a ``Position`` model."""
    return Position(x=point[0], y=point[1])


def _grid_data_from_state(state) -> GridData:
    """Convert a generated state object into the ``GridData`` payload for the frontend."""
    return GridData(
        width=state.grid.width,
        height=state.grid.height,
        stores=[
            StoreData(id=store.id, position=_to_position(store.position))
            for store in state.stores
        ],
        destinations=[
            DestinationData(id=dest.id, position=_to_position(dest.position))
            for dest in state.destinations
        ],
        tunnels=[
            TunnelData(
                entrance1=_to_position(tunnel.entrance1),
                entrance2=_to_position(tunnel.entrance2),
                cost=tunnel.cost,
            )
            for tunnel in state.tunnels
        ],
        segments=[
            SegmentData(
                src=_to_position(seg.src),
                dst=_to_position(seg.dst),
                traffic=seg.traffic,
            )
            for seg in state.grid.segments.values()
        ],
    )


@router.post("/api/grid/generate", response_model=GenerateResponse)
async def generate_grid(config: GridConfig) -> GenerateResponse:
    """Generate a random grid configuration.

    Args:
        config: Requested grid size, number of stores, destinations and
            tunnels, and obstacle density; forwarded to ``gen_grid``.

    Returns:
        A ``GenerateResponse`` carrying the ``initial_state`` and ``traffic``
        values returned by ``gen_grid`` plus a structured ``GridData`` view of
        the generated state.

    Raises:
        HTTPException: With status 500 and the error text as detail when
            generation or conversion fails.
    """
    try:
        initial_state, traffic, state = gen_grid(
            width=config.width,
            height=config.height,
            num_stores=config.num_stores,
            num_destinations=config.num_destinations,
            num_tunnels=config.num_tunnels,
            obstacle_density=config.obstacle_density,
        )
        return GenerateResponse(
            initial_state=initial_state,
            traffic=traffic,
            parsed=_grid_data_from_state(state),
        )
    except HTTPException:
        # Let deliberately raised HTTP errors keep their own status code.
        raise
    except Exception as e:
        # Chain the cause so the original traceback is preserved in logs.
        raise HTTPException(status_code=500, detail=str(e)) from e
@router.post("/api/search/path", response_model=SearchResponse)
async def find_path(request: PathRequest) -> SearchResponse:
    """Find a path from ``request.start`` to ``request.goal`` using the requested strategy.

    The grid and tunnels are rebuilt from the request payload, the search is
    run with visualization enabled, and performance metrics are attached to
    the response.

    Args:
        request: Grid dimensions, road segments, tunnels, start/goal positions
            and the search strategy.

    Returns:
        A ``SearchResponse`` with the plan, cost, nodes expanded, runtime,
        memory and CPU metrics, the path as positions, and the visualization
        steps (``None`` when the search produced no steps).

    Raises:
        HTTPException: With status 500 and the error text as detail when
            building the grid or running the search fails.
    """
    try:
        # Kept function-local as in the original code.
        # NOTE(review): presumably avoids an import cycle - confirm.
        from ..models.grid import Grid
        from ..models.entities import Tunnel

        # Build grid from request
        grid = Grid(width=request.grid_width, height=request.grid_height)
        for seg in request.segments:
            grid.add_segment(
                (seg.src.x, seg.src.y), (seg.dst.x, seg.dst.y), seg.traffic
            )

        # Build tunnels
        # NOTE(review): no cost is passed here although TunnelData carries one
        # in generate_grid; presumably Tunnel derives it - confirm.
        tunnels = [
            Tunnel(
                entrance1=(t.entrance1.x, t.entrance1.y),
                entrance2=(t.entrance2.x, t.entrance2.y),
            )
            for t in request.tunnels
        ]

        # Run search with metrics
        with measure_performance() as metrics:
            result, steps = DeliverySearch.path(
                grid,
                (request.start.x, request.start.y),
                (request.goal.x, request.goal.y),
                tunnels,
                request.strategy.value,
                visualize=True,
            )
            metrics.sample()

        return SearchResponse(
            plan=result.plan,
            cost=result.cost,
            nodes_expanded=result.nodes_expanded,
            runtime_ms=metrics.runtime_ms,
            # Clamp so a negative memory reading is reported as 0.
            memory_kb=max(0, metrics.memory_kb),
            cpu_percent=metrics.cpu_percent,
            path=[Position(x=p[0], y=p[1]) for p in result.path],
            steps=[s.to_dict() for s in steps] if steps else None,
        )
    except HTTPException:
        # Let deliberately raised HTTP errors keep their own status code.
        raise
    except Exception as e:
        # Chain the cause so the original traceback is preserved in logs.
        raise HTTPException(status_code=500, detail=str(e)) from e
@router.post("/api/search/plan", response_model=PlanResponse)
async def create_plan(request: SearchRequest) -> PlanResponse:
    """Create full delivery plan for all trucks and destinations.

    Args:
        request: Encoded ``initial_state`` and ``traffic`` strings, the search
            strategy, and the ``visualize`` flag forwarded to the planner.

    Returns:
        A ``PlanResponse`` with the plan text, per-assignment dictionaries,
        total cost, total nodes expanded, and runtime/memory/CPU metrics.

    Raises:
        HTTPException: With status 500 and the error text as detail when
            parsing the state or planning fails.
    """
    try:
        # Parse state
        state = parse_full_state(request.initial_state, request.traffic)

        # Run planner with metrics. The second return value (visualization
        # data) is not part of the response, so it is discarded.
        with measure_performance() as metrics:
            plan_result, _ = DeliveryPlanner.plan_from_state(
                state.grid,
                state.stores,
                state.destinations,
                state.tunnels,
                request.strategy.value,
                request.visualize,
            )
            metrics.sample()

        return PlanResponse(
            output=plan_result.to_string(),
            assignments=[a.to_dict() for a in plan_result.assignments],
            total_cost=plan_result.total_cost,
            total_nodes_expanded=plan_result.total_nodes_expanded,
            runtime_ms=metrics.runtime_ms,
            # Clamp so a negative memory reading is reported as 0.
            memory_kb=max(0, metrics.memory_kb),
            cpu_percent=metrics.cpu_percent,
        )
    except HTTPException:
        # Let deliberately raised HTTP errors keep their own status code.
        raise
    except Exception as e:
        # Chain the cause so the original traceback is preserved in logs.
        raise HTTPException(status_code=500, detail=str(e)) from e
@router.post("/api/search/compare", response_model=CompareResponse)
async def compare_algorithms(request: CompareRequest) -> CompareResponse:
    """Run all algorithms on same problem and return comparison.

    Every entry of ``ALGORITHMS`` is run on the parsed state with
    visualization disabled. The reference optimal cost is the minimum total
    cost among the UC, AS1 and AS2 runs; each result whose cost equals it is
    flagged ``is_optimal``.

    Args:
        request: Encoded ``initial_state`` and ``traffic`` strings describing
            the problem.

    Returns:
        A ``CompareResponse`` with one ``ComparisonResult`` per algorithm and
        the reference optimal cost.

    Raises:
        HTTPException: With status 500 and the error text as detail when
            parsing or any single algorithm run fails.
    """
    try:
        state = parse_full_state(request.initial_state, request.traffic)

        # Algorithms whose cost defines the optimum (UCS or A*).
        optimal_codes = ("UC", "AS1", "AS2")
        results: List[ComparisonResult] = []
        optimal_cost = float("inf")

        # Run each algorithm
        for algo_info in ALGORITHMS:
            with measure_performance() as metrics:
                plan_result, _ = DeliveryPlanner.plan_from_state(
                    state.grid,
                    state.stores,
                    state.destinations,
                    state.tunnels,
                    algo_info.code,
                    visualize=False,
                )
                metrics.sample()

            # Track optimal cost (from UCS or A*)
            if algo_info.code in optimal_codes:
                optimal_cost = min(optimal_cost, plan_result.total_cost)

            results.append(
                ComparisonResult(
                    algorithm=algo_info.code,
                    name=algo_info.name,
                    plan=plan_result.to_string(),
                    cost=plan_result.total_cost,
                    nodes_expanded=plan_result.total_nodes_expanded,
                    runtime_ms=metrics.runtime_ms,
                    # Clamp so a negative memory reading is reported as 0.
                    memory_kb=max(0, metrics.memory_kb),
                    cpu_percent=metrics.cpu_percent,
                    is_optimal=False,  # Will be set below
                )
            )

        # Mark optimal solutions; this needs the final optimal cost, hence
        # the second pass.
        for result in results:
            result.is_optimal = result.cost == optimal_cost

        return CompareResponse(comparisons=results, optimal_cost=optimal_cost)
    except HTTPException:
        # Let deliberately raised HTTP errors keep their own status code.
        raise
    except Exception as e:
        # Chain the cause so the original traceback is preserved in logs.
        raise HTTPException(status_code=500, detail=str(e)) from e