Spaces:
Sleeping
Sleeping
File size: 9,140 Bytes
bb89e8d f2c79b8 bb89e8d eff6f73 bb89e8d f2c79b8 bb89e8d 8fdd693 6e845f4 bb89e8d 8fdd693 eff6f73 9abdea0 eff6f73 9abdea0 eff6f73 9abdea0 bb89e8d eff6f73 bb89e8d f2c79b8 bb89e8d f2c79b8 bb89e8d | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 | """
FastAPI server for Warehouse Optimization Environment.
This module creates the HTTP server that exposes the warehouse environment
via REST API endpoints.
"""
import os
from core.env_server import create_app
from envs.warehouse_env.models import WarehouseAction, WarehouseObservation
from envs.warehouse_env.server.warehouse_environment import WarehouseEnvironment
from fastapi import FastAPI
from fastapi.responses import JSONResponse, HTMLResponse, FileResponse
# Get configuration from environment variables
DIFFICULTY_LEVEL = int(os.getenv("DIFFICULTY_LEVEL", "2"))
GRID_WIDTH = int(os.getenv("GRID_WIDTH", "0")) or None
GRID_HEIGHT = int(os.getenv("GRID_HEIGHT", "0")) or None
NUM_PACKAGES = int(os.getenv("NUM_PACKAGES", "0")) or None
MAX_STEPS = int(os.getenv("MAX_STEPS", "0")) or None
RANDOM_SEED = int(os.getenv("RANDOM_SEED", "0")) or None
# Create the warehouse environment instance
warehouse_env = WarehouseEnvironment(
difficulty_level=DIFFICULTY_LEVEL,
grid_width=GRID_WIDTH,
grid_height=GRID_HEIGHT,
num_packages=NUM_PACKAGES,
max_steps=MAX_STEPS,
random_seed=RANDOM_SEED,
)
# Create FastAPI app using OpenEnv's helper (with web interface if enabled)
app = create_app(warehouse_env, WarehouseAction, WarehouseObservation, env_name="warehouse_env")
# Add custom render endpoints
@app.post("/set-difficulty")
async def set_difficulty(request: dict):
"""Change the difficulty level and reset the environment."""
try:
difficulty = int(request.get("difficulty", 2))
if difficulty < 1 or difficulty > 5:
return JSONResponse(
status_code=400,
content={"error": "Difficulty must be between 1 and 5"}
)
# Recreate the warehouse environment with new difficulty
global warehouse_env
warehouse_env = WarehouseEnvironment(
difficulty_level=difficulty,
grid_width=None,
grid_height=None,
num_packages=None,
max_steps=None,
random_seed=None,
)
# Reset the environment
observation = warehouse_env.reset()
return JSONResponse(content={
"success": True,
"difficulty": difficulty,
"grid_size": (warehouse_env.grid_width, warehouse_env.grid_height),
"num_packages": warehouse_env.num_packages,
"max_steps": warehouse_env.max_steps,
"observation": {
"step_count": observation.step_count,
"packages_delivered": observation.packages_delivered,
"total_packages": observation.total_packages,
"robot_position": observation.robot_position,
}
})
except Exception as e:
return JSONResponse(
status_code=500,
content={"error": f"Failed to set difficulty: {str(e)}"}
)
@app.get("/render")
async def render():
"""Get ASCII visualization of warehouse state."""
try:
ascii_art = warehouse_env.render_ascii()
return JSONResponse(content={"ascii": ascii_art})
except Exception as e:
return JSONResponse(
status_code=500, content={"error": f"Failed to render: {str(e)}"}
)
@app.get("/render/html")
async def render_html():
"""Get HTML visualization of warehouse state."""
try:
html_content = warehouse_env.render_html()
return HTMLResponse(content=html_content)
except Exception as e:
return JSONResponse(
status_code=500, content={"error": f"Failed to render HTML: {str(e)}"}
)
@app.post("/auto-step")
async def auto_step():
"""Execute one step using a greedy agent."""
try:
# Get current observation
if warehouse_env.is_done:
return JSONResponse(content={
"done": True,
"message": "Episode finished. Reset to start a new episode."
})
# Simple greedy policy
action_id = _get_greedy_action()
action = WarehouseAction(action_id=action_id)
# Execute step
result = warehouse_env.step(action)
return JSONResponse(content={
"action": action.action_name,
"message": result.message,
"reward": result.reward,
"done": result.done,
"step_count": result.step_count,
"packages_delivered": result.packages_delivered,
"robot_position": result.robot_position,
})
except Exception as e:
return JSONResponse(
status_code=500, content={"error": f"Failed to execute auto-step: {str(e)}"}
)
def _get_greedy_action() -> int:
"""Simple greedy policy with obstacle avoidance."""
robot_x, robot_y = warehouse_env.robot_position
# Determine target location
if warehouse_env.robot_carrying is None:
# Not carrying: move toward nearest waiting package
target = None
min_dist = float('inf')
for package in warehouse_env.packages:
if package.status == "waiting":
px, py = package.pickup_location
dist = abs(robot_x - px) + abs(robot_y - py)
if dist < min_dist:
min_dist = dist
target = (px, py)
if target is None:
return 4 # Try to pick up if at location
target_x, target_y = target
else:
# Carrying: move toward dropoff zone
package = next((p for p in warehouse_env.packages if p.id == warehouse_env.robot_carrying), None)
if package:
target_x, target_y = package.dropoff_location
else:
return 5 # Try to drop off
# Check if at target location
if robot_x == target_x and robot_y == target_y:
return 4 if warehouse_env.robot_carrying is None else 5
# Try to move toward target, checking for obstacles
# Priority: move on axis with larger distance first
dx = target_x - robot_x
dy = target_y - robot_y
# List of possible moves in order of preference
moves = []
if abs(dx) > abs(dy):
# Prioritize horizontal movement
if dx > 0:
moves.append((3, robot_x + 1, robot_y)) # RIGHT
elif dx < 0:
moves.append((2, robot_x - 1, robot_y)) # LEFT
if dy > 0:
moves.append((1, robot_x, robot_y + 1)) # DOWN
elif dy < 0:
moves.append((0, robot_x, robot_y - 1)) # UP
else:
# Prioritize vertical movement
if dy > 0:
moves.append((1, robot_x, robot_y + 1)) # DOWN
elif dy < 0:
moves.append((0, robot_x, robot_y - 1)) # UP
if dx > 0:
moves.append((3, robot_x + 1, robot_y)) # RIGHT
elif dx < 0:
moves.append((2, robot_x - 1, robot_y)) # LEFT
# Add perpendicular moves as fallback
if dx == 0 and dy != 0:
moves.append((3, robot_x + 1, robot_y)) # RIGHT
moves.append((2, robot_x - 1, robot_y)) # LEFT
elif dy == 0 and dx != 0:
moves.append((1, robot_x, robot_y + 1)) # DOWN
moves.append((0, robot_x, robot_y - 1)) # UP
# Try moves in order until we find a valid one
WALL = 1
SHELF = 2
for action_id, new_x, new_y in moves:
# Check bounds
if 0 <= new_x < warehouse_env.grid_width and 0 <= new_y < warehouse_env.grid_height:
# Check if cell is passable
if warehouse_env.grid[new_y][new_x] not in [WALL, SHELF]:
return action_id
# If no valid move toward target, try any valid move
for action_id, dx, dy in [(0, 0, -1), (1, 0, 1), (2, -1, 0), (3, 1, 0)]:
new_x, new_y = robot_x + dx, robot_y + dy
if 0 <= new_x < warehouse_env.grid_width and 0 <= new_y < warehouse_env.grid_height:
if warehouse_env.grid[new_y][new_x] not in [WALL, SHELF]:
return action_id
# Last resort: try pickup/dropoff
return 4 if warehouse_env.robot_carrying is None else 5
# Add health check endpoint
@app.get("/health")
async def health():
"""Health check endpoint."""
return {
"status": "healthy",
"environment": "warehouse_env",
"difficulty_level": DIFFICULTY_LEVEL,
"grid_size": (warehouse_env.grid_width, warehouse_env.grid_height),
"num_packages": warehouse_env.num_packages,
"max_steps": warehouse_env.max_steps,
}
@app.get("/demo")
async def demo():
"""Serve the interactive demo page."""
import pathlib
demo_path = pathlib.Path(__file__).parent / "demo.html"
if demo_path.exists():
return FileResponse(demo_path)
else:
return HTMLResponse(content="<h1>Demo page not found</h1><p>Please check the server configuration.</p>")
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
def main():
"""Entry point for warehouse-server command."""
import uvicorn
import os
port = int(os.getenv("PORT", "8000"))
host = os.getenv("HOST", "0.0.0.0")
uvicorn.run(app, host=host, port=port)
|