warehouse_env / server /warehouse_environment.py
omaryashraf's picture
Upload folder using huggingface_hub
8fdd693 verified
"""
Core warehouse environment implementation.
This module implements the warehouse logistics optimization environment
with grid-based navigation, package pickup/delivery, and reward calculation.
"""
import random
import uuid
from dataclasses import asdict
from typing import Any, Dict, List, Optional, Tuple
from core.client_types import StepResult
from core.env_server import Environment
from envs.warehouse_env.models import (
Package,
WarehouseAction,
WarehouseObservation,
WarehouseState,
)
# Cell types
EMPTY = 0
WALL = 1
SHELF = 2
PICKUP_ZONE = 3
DROPOFF_ZONE = 4
# Difficulty configurations
DIFFICULTY_CONFIGS = {
1: {"grid_size": (5, 5), "num_packages": 1, "num_obstacles": 0, "max_steps": 50},
2: {"grid_size": (8, 8), "num_packages": 2, "num_obstacles": 3, "max_steps": 100},
3: {"grid_size": (10, 10), "num_packages": 3, "num_obstacles": 8, "max_steps": 150},
4: {
"grid_size": (15, 15),
"num_packages": 5,
"num_obstacles": 20,
"max_steps": 250,
},
5: {
"grid_size": (20, 20),
"num_packages": 8,
"num_obstacles": 40,
"max_steps": 400,
},
}
class WarehouseEnvironment(Environment):
"""
Warehouse optimization environment.
A grid-based environment where a robot must navigate a warehouse,
pick up packages from pickup zones, and deliver them to dropoff zones
while avoiding obstacles.
"""
def __init__(
self,
difficulty_level: int = 2,
grid_width: Optional[int] = None,
grid_height: Optional[int] = None,
num_packages: Optional[int] = None,
max_steps: Optional[int] = None,
random_seed: Optional[int] = None,
):
"""
Initialize the warehouse environment.
Args:
difficulty_level: Preset difficulty (1-5)
grid_width: Custom grid width (overrides difficulty)
grid_height: Custom grid height (overrides difficulty)
num_packages: Custom package count (overrides difficulty)
max_steps: Custom step limit (overrides difficulty)
random_seed: Random seed for reproducibility
"""
super().__init__()
# Get config from difficulty or use custom values
config = DIFFICULTY_CONFIGS.get(difficulty_level, DIFFICULTY_CONFIGS[2])
self.difficulty_level = difficulty_level
self.grid_width = grid_width or config["grid_size"][0]
self.grid_height = grid_height or config["grid_size"][1]
self.num_packages = num_packages or config["num_packages"]
self.max_steps = max_steps or config["max_steps"]
self.num_obstacles = config["num_obstacles"]
if random_seed is not None:
random.seed(random_seed)
# Episode state
self.episode_id: str = ""
self.step_count: int = 0
self.grid: List[List[int]] = []
self.robot_position: Tuple[int, int] = (0, 0)
self.robot_carrying: Optional[int] = None
self.packages: List[Package] = []
self.packages_delivered: int = 0
self.cum_reward: float = 0.0
self.is_done: bool = False
# Pickup and dropoff zones
self.pickup_zones: List[Tuple[int, int]] = []
self.dropoff_zones: List[Tuple[int, int]] = []
def reset(self) -> WarehouseObservation:
"""Reset the environment for a new episode."""
self.episode_id = str(uuid.uuid4())
self.step_count = 0
self.packages_delivered = 0
self.cum_reward = 0.0
self.is_done = False
self.robot_carrying = None
# Generate warehouse layout
self._generate_warehouse()
# Place robot at start position (usually near center)
self.robot_position = (self.grid_width // 2, self.grid_height // 2)
# Generate packages
self._generate_packages()
observation = self._get_observation(
action_success=True,
message="Warehouse environment ready! Navigate to pickup zones to collect packages.",
)
return observation
def step(self, action: WarehouseAction) -> WarehouseObservation:
"""Execute an action and return the result."""
if self.is_done:
return self._get_observation(False, "Episode already finished")
self.step_count += 1
reward = 0.0
action_success = False
message = ""
# Track state before action
packages_delivered_before = self.packages_delivered
# Execute action
if action.action_id in [0, 1, 2, 3]: # Movement actions
action_success, message = self._move_robot(action.action_id)
reward = -0.1 # Small step penalty
if not action_success:
reward = -1.0 # Penalty for invalid move
elif action.action_id == 4: # PICK_UP
action_success, message = self._pickup_package()
if action_success:
reward = 10.0 # Reward for successful pickup
else:
reward = -1.0 # Penalty for invalid pickup
elif action.action_id == 5: # DROP_OFF
action_success, message = self._dropoff_package()
if action_success:
# Major reward for delivery
reward = 100.0
# Time bonus
time_bonus = (self.max_steps - self.step_count) * 0.1
reward += time_bonus
# Check if all packages delivered
if self.packages_delivered == self.num_packages:
reward += 200.0 # Completion bonus
self.is_done = True
message += " All packages delivered! Episode complete!"
else:
reward = -1.0
# Update package waiting times
for package in self.packages:
if package.status == "waiting":
package.time_waiting += 1
# Check timeout
if self.step_count >= self.max_steps:
self.is_done = True
message += " Maximum steps reached. Episode terminated."
self.cum_reward += reward
observation = self._get_observation(action_success, message)
# Set reward and done in observation (these are expected by Observation base class)
observation.reward = reward
observation.done = self.is_done
return observation
@property
def state(self) -> WarehouseState:
"""Get current episode state."""
return WarehouseState(
episode_id=self.episode_id,
step_count=self.step_count,
packages_delivered=self.packages_delivered,
total_packages=self.num_packages,
difficulty_level=self.difficulty_level,
grid_size=(self.grid_width, self.grid_height),
cum_reward=self.cum_reward,
is_done=self.is_done,
)
def _generate_warehouse(self):
"""Generate the warehouse grid layout."""
# Initialize empty grid
self.grid = [
[EMPTY for _ in range(self.grid_width)] for _ in range(self.grid_height)
]
# Add walls around perimeter
for x in range(self.grid_width):
self.grid[0][x] = WALL
self.grid[self.grid_height - 1][x] = WALL
for y in range(self.grid_height):
self.grid[y][0] = WALL
self.grid[y][self.grid_width - 1] = WALL
# Add random shelves/obstacles
obstacles_placed = 0
attempts = 0
while (
obstacles_placed < self.num_obstacles and attempts < self.num_obstacles * 10
):
x = random.randint(2, self.grid_width - 3)
y = random.randint(2, self.grid_height - 3)
# Don't place near center (robot start)
if abs(x - self.grid_width // 2) < 2 and abs(y - self.grid_height // 2) < 2:
attempts += 1
continue
if self.grid[y][x] == EMPTY:
self.grid[y][x] = SHELF
obstacles_placed += 1
attempts += 1
# Create pickup zones (top-left area)
self.pickup_zones = []
for _ in range(min(3, self.num_packages)):
x = random.randint(1, self.grid_width // 3)
y = random.randint(1, self.grid_height // 3)
if self.grid[y][x] == EMPTY:
self.grid[y][x] = PICKUP_ZONE
self.pickup_zones.append((x, y))
# Create dropoff zones (bottom-right area)
self.dropoff_zones = []
for _ in range(min(3, self.num_packages)):
x = random.randint(2 * self.grid_width // 3, self.grid_width - 2)
y = random.randint(2 * self.grid_height // 3, self.grid_height - 2)
if self.grid[y][x] == EMPTY:
self.grid[y][x] = DROPOFF_ZONE
self.dropoff_zones.append((x, y))
def _generate_packages(self):
"""Generate packages with random pickup/dropoff locations."""
self.packages = []
for i in range(self.num_packages):
pickup_loc = (
random.choice(self.pickup_zones) if self.pickup_zones else (1, 1)
)
dropoff_loc = (
random.choice(self.dropoff_zones)
if self.dropoff_zones
else (self.grid_width - 2, self.grid_height - 2)
)
package = Package(
id=i,
status="waiting",
pickup_location=pickup_loc,
dropoff_location=dropoff_loc,
priority=random.randint(1, 3),
time_waiting=0,
)
self.packages.append(package)
def _move_robot(self, direction: int) -> Tuple[bool, str]:
"""
Move robot in specified direction.
Args:
direction: 0=UP, 1=DOWN, 2=LEFT, 3=RIGHT
Returns:
(success, message)
"""
x, y = self.robot_position
if direction == 0: # UP
new_pos = (x, y - 1)
elif direction == 1: # DOWN
new_pos = (x, y + 1)
elif direction == 2: # LEFT
new_pos = (x - 1, y)
elif direction == 3: # RIGHT
new_pos = (x + 1, y)
else:
return False, "Invalid direction"
new_x, new_y = new_pos
# Check bounds
if (
new_x < 0
or new_x >= self.grid_width
or new_y < 0
or new_y >= self.grid_height
):
return False, "Cannot move outside warehouse bounds"
# Check collision with walls/shelves
if self.grid[new_y][new_x] in [WALL, SHELF]:
return False, "Cannot move into obstacle"
self.robot_position = new_pos
return True, f"Moved {WarehouseAction.ACTION_NAMES[direction]}"
def _pickup_package(self) -> Tuple[bool, str]:
"""Attempt to pick up a package."""
if self.robot_carrying is not None:
return False, "Robot already carrying a package"
# Check if at pickup zone
x, y = self.robot_position
if self.grid[y][x] != PICKUP_ZONE:
return False, "Not at a pickup zone"
# Find available package at this location
for package in self.packages:
if package.status == "waiting" and package.pickup_location == (x, y):
package.status = "picked"
self.robot_carrying = package.id
return True, f"Picked up package #{package.id}"
return False, "No packages available at this location"
def _dropoff_package(self) -> Tuple[bool, str]:
"""Attempt to drop off a package."""
if self.robot_carrying is None:
return False, "Not carrying any package"
# Check if at dropoff zone
x, y = self.robot_position
if self.grid[y][x] != DROPOFF_ZONE:
return False, "Not at a dropoff zone"
# Find the package being carried
package = next((p for p in self.packages if p.id == self.robot_carrying), None)
if package is None:
return False, "Package not found"
# Check if correct dropoff location
if package.dropoff_location == (x, y):
package.status = "delivered"
self.packages_delivered += 1
self.robot_carrying = None
return True, f"Successfully delivered package #{package.id}!"
else:
return False, f"Wrong dropoff zone for package #{package.id}"
def _get_observation(
self, action_success: bool, message: str
) -> WarehouseObservation:
"""Create observation object."""
packages_data = [
{
"id": p.id,
"status": p.status,
"pickup_location": p.pickup_location,
"dropoff_location": p.dropoff_location,
"priority": p.priority,
"time_waiting": p.time_waiting,
}
for p in self.packages
]
return WarehouseObservation(
grid=self.grid,
robot_position=self.robot_position,
robot_carrying=self.robot_carrying,
packages=packages_data,
step_count=self.step_count,
packages_delivered=self.packages_delivered,
total_packages=self.num_packages,
time_remaining=self.max_steps - self.step_count,
action_success=action_success,
message=message,
)
def render_ascii(self) -> str:
"""Render warehouse as ASCII art."""
symbols = {
EMPTY: ".",
WALL: "█",
SHELF: "#",
PICKUP_ZONE: "P",
DROPOFF_ZONE: "D",
}
lines = []
lines.append("=" * (self.grid_width * 2 + 1))
lines.append(
f"Step: {self.step_count}/{self.max_steps} | Delivered: {self.packages_delivered}/{self.num_packages} | Reward: {self.cum_reward:.1f}"
)
lines.append("=" * (self.grid_width * 2 + 1))
for y in range(self.grid_height):
row = ""
for x in range(self.grid_width):
if (x, y) == self.robot_position:
if self.robot_carrying is not None:
row += "R " # Robot carrying package
else:
row += "r " # Robot empty
else:
row += symbols[self.grid[y][x]] + " "
lines.append(row)
lines.append("=" * (self.grid_width * 2 + 1))
lines.append(f"Robot at {self.robot_position}, carrying: {self.robot_carrying}")
# Show package info
for package in self.packages:
status_icon = (
"✓"
if package.status == "delivered"
else ("↻" if package.status == "picked" else "○")
)
lines.append(
f"{status_icon} Package #{package.id}: {package.status} (P{package.pickup_location}→D{package.dropoff_location})"
)
lines.append("=" * (self.grid_width * 2 + 1))
lines.append(
"Legend: r/R=Robot(empty/carrying), P=Pickup, D=Dropoff, #=Shelf, █=Wall"
)
return "\n".join(lines)
def render_html(self) -> str:
"""Render warehouse as HTML with CSS styling for web interface."""
cell_size = 40 # pixels
colors = {
EMPTY: "#f0f0f0",
WALL: "#333333",
SHELF: "#8B4513",
PICKUP_ZONE: "#4CAF50",
DROPOFF_ZONE: "#2196F3",
}
html_parts = []
html_parts.append('<div style="font-family: Arial, sans-serif; padding: 20px;">')
html_parts.append(f'<h3>Warehouse Environment - Step {self.step_count}/{self.max_steps}</h3>')
html_parts.append(f'<p><strong>Delivered:</strong> {self.packages_delivered}/{self.num_packages} | ')
html_parts.append(f'<strong>Reward:</strong> {self.cum_reward:.1f}</p>')
# Grid visualization
html_parts.append(f'<div style="display: inline-block; border: 2px solid #333; margin: 10px 0;">')
html_parts.append(f'<svg width="{self.grid_width * cell_size}" height="{self.grid_height * cell_size}">')
# Draw cells
for y in range(self.grid_height):
for x in range(self.grid_width):
cell_type = self.grid[y][x]
color = colors.get(cell_type, "#ffffff")
# Draw cell
html_parts.append(
f'<rect x="{x * cell_size}" y="{y * cell_size}" '
f'width="{cell_size}" height="{cell_size}" '
f'fill="{color}" stroke="#666" stroke-width="1"/>'
)
# Add labels for special cells
text_color = "#fff" if cell_type in [WALL, SHELF] else "#333"
if cell_type == PICKUP_ZONE:
html_parts.append(
f'<text x="{x * cell_size + cell_size/2}" y="{y * cell_size + cell_size/2}" '
f'text-anchor="middle" dominant-baseline="middle" '
f'fill="{text_color}" font-size="16" font-weight="bold">P</text>'
)
elif cell_type == DROPOFF_ZONE:
html_parts.append(
f'<text x="{x * cell_size + cell_size/2}" y="{y * cell_size + cell_size/2}" '
f'text-anchor="middle" dominant-baseline="middle" '
f'fill="{text_color}" font-size="16" font-weight="bold">D</text>'
)
# Draw robot
robot_x, robot_y = self.robot_position
robot_color = "#FF5722" if self.robot_carrying is not None else "#FFC107"
robot_label = "R" if self.robot_carrying is not None else "r"
html_parts.append(
f'<circle cx="{robot_x * cell_size + cell_size/2}" cy="{robot_y * cell_size + cell_size/2}" '
f'r="{cell_size/3}" fill="{robot_color}" stroke="#000" stroke-width="2"/>'
)
html_parts.append(
f'<text x="{robot_x * cell_size + cell_size/2}" y="{robot_y * cell_size + cell_size/2}" '
f'text-anchor="middle" dominant-baseline="middle" '
f'fill="#fff" font-size="20" font-weight="bold">{robot_label}</text>'
)
html_parts.append('</svg>')
html_parts.append('</div>')
# Legend
html_parts.append('<div style="margin: 20px 0;">')
html_parts.append('<h4>Legend:</h4>')
html_parts.append('<div style="display: flex; gap: 20px; flex-wrap: wrap;">')
legend_items = [
("r", "#FFC107", "Robot (empty)"),
("R", "#FF5722", "Robot (carrying)"),
("P", colors[PICKUP_ZONE], "Pickup Zone"),
("D", colors[DROPOFF_ZONE], "Dropoff Zone"),
("#", colors[SHELF], "Shelf"),
("█", colors[WALL], "Wall"),
]
for symbol, color, label in legend_items:
html_parts.append(f'<div style="display: flex; align-items: center; gap: 5px;">')
html_parts.append(f'<div style="width: 30px; height: 30px; background: {color}; border: 1px solid #666; display: flex; align-items: center; justify-content: center; color: #fff; font-weight: bold;">{symbol}</div>')
html_parts.append(f'<span>{label}</span>')
html_parts.append('</div>')
html_parts.append('</div>')
html_parts.append('</div>')
# Package status
html_parts.append('<div style="margin: 20px 0;">')
html_parts.append('<h4>Packages:</h4>')
for package in self.packages:
status_color = {
"waiting": "#FFA726",
"picked": "#42A5F5",
"delivered": "#66BB6A"
}.get(package.status, "#999")
status_icon = {
"waiting": "○",
"picked": "↻",
"delivered": "✓"
}.get(package.status, "")
html_parts.append(
f'<div style="padding: 5px; margin: 5px 0; background: {status_color}22; border-left: 3px solid {status_color};">'
)
html_parts.append(
f'{status_icon} Package #{package.id}: <strong>{package.status}</strong> '
f'(Pickup: {package.pickup_location} → Dropoff: {package.dropoff_location})'
)
html_parts.append('</div>')
html_parts.append('</div>')
html_parts.append('</div>')
return "".join(html_parts)