---
title: Warehouse Env Environment Server
emoji: 🏭
colorFrom: blue
colorTo: indigo
sdk: docker
pinned: false
app_port: 8000
base_path: /demo
tags:
  - openenv
  - reinforcement-learning
  - logistics
  - warehouse
  - robotics
---

# Warehouse Optimization Environment

A grid-based warehouse logistics optimization environment for reinforcement learning. This environment simulates a warehouse robot that must navigate through obstacles, pick up packages from pickup zones, and deliver them to designated dropoff zones while optimizing for time and efficiency.

## Overview

The Warehouse Environment is designed for training reinforcement learning agents on logistics and pathfinding tasks. It features:

- Grid-based navigation with walls and obstacles
- Package pickup and delivery mechanics
- Multi-objective optimization (speed, deliveries, efficiency)
- Scalable difficulty levels (1-5)
- Dense reward signals for effective learning
- ASCII visualization for debugging

## Quick Start

### Using Docker (Recommended)

```bash
# Build the Docker image (from OpenEnv root)
cd /path/to/OpenEnv
docker build -f src/envs/warehouse_env/server/Dockerfile -t warehouse-env:latest .

# Run with default settings (difficulty level 2)
docker run -p 8000:8000 warehouse-env:latest

# Run with custom difficulty
docker run -p 8000:8000 -e DIFFICULTY_LEVEL=3 warehouse-env:latest
```

### Using Python Client

```python
from envs.warehouse_env import WarehouseEnv, WarehouseAction

# Connect to server (or start from Docker)
env = WarehouseEnv.from_docker_image(
    "warehouse-env:latest",
    environment={"DIFFICULTY_LEVEL": "2"}
)

# Reset environment
result = env.reset()
print(f"Warehouse size: {len(result.observation.grid)}x{len(result.observation.grid[0])}")
print(f"Packages to deliver: {result.observation.total_packages}")

# Run episode
done = False
while not done:
    # Trivial policy: try PICK_UP when empty-handed, DROP_OFF when carrying
    if result.observation.robot_carrying is None:
        action = WarehouseAction(action_id=4)  # PICK_UP
    else:
        action = WarehouseAction(action_id=5)  # DROP_OFF

    result = env.step(action)
    print(f"Step {result.observation.step_count}: {result.observation.message}")
    print(f"Reward: {result.reward:.2f}")

    done = result.done

print("\nEpisode finished!")
print(f"Delivered: {result.observation.packages_delivered}/{result.observation.total_packages}")
print(f"Total reward: {env.state().cum_reward:.2f}")

env.close()
```

## Environment Specification

### State Space

The environment provides rich observations including:

- Grid layout: 2D array with cell types (empty, wall, shelf, pickup zone, dropoff zone)
- Robot state: Position, carrying status
- Package information: Locations, status (waiting/picked/delivered), priorities
- Episode metrics: Step count, deliveries, time remaining
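
For example, after a reset (with `env` connected as in the Quick Start), these fields can be read directly off the observation:

```python
# Inspect the observation fields (env connected as in the Quick Start).
result = env.reset()
obs = result.observation

print(f"Grid: {len(obs.grid)}x{len(obs.grid[0])} cells")
print(f"Robot at {obs.robot_position}, carrying: {obs.robot_carrying}")
print(f"Package statuses: {[p['status'] for p in obs.packages]}")
print(f"Time remaining: {obs.time_remaining} steps")
```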

### Action Space

6 discrete actions:

| Action ID | Action Name | Description |
|-----------|-------------|-------------|
| 0 | MOVE_UP | Move robot one cell up |
| 1 | MOVE_DOWN | Move robot one cell down |
| 2 | MOVE_LEFT | Move robot one cell left |
| 3 | MOVE_RIGHT | Move robot one cell right |
| 4 | PICK_UP | Pick up package at current location |
| 5 | DROP_OFF | Drop off package at current location |
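
If you prefer named constants over raw integers, a small `IntEnum` mirroring this table keeps agent code readable (the enum is a local convenience, not part of the shipped API):

```python
from enum import IntEnum

from envs.warehouse_env import WarehouseAction

class ActionId(IntEnum):
    """Local convenience mirroring the action table above."""
    MOVE_UP = 0
    MOVE_DOWN = 1
    MOVE_LEFT = 2
    MOVE_RIGHT = 3
    PICK_UP = 4
    DROP_OFF = 5

# IntEnum members are ints, so they can be passed as action_id directly
action = WarehouseAction(action_id=ActionId.PICK_UP)
```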

### Reward Structure

Multi-component reward function:

- +100: Successful package delivery
- +10: Successful package pickup
- +0.1 × time_remaining: Time bonus for fast deliveries
- +200: Completion bonus (all packages delivered)
- -0.1: Small step penalty (encourages efficiency)
- -1: Invalid action penalty
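
As a rough illustration of how these components combine per step (a sketch, not the server's actual implementation):

```python
def sketch_step_reward(picked_up, delivered, all_done, action_valid, time_remaining):
    """Illustrative only: combines the documented reward components."""
    reward = -0.1                                # step penalty
    if not action_valid:
        reward -= 1.0                            # invalid action penalty
    if picked_up:
        reward += 10.0                           # pickup bonus
    if delivered:
        reward += 100.0 + 0.1 * time_remaining   # delivery + time bonus
    if all_done:
        reward += 200.0                          # completion bonus
    return reward
```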

### Episode Termination

Episodes end when:

- All packages are delivered (success!)
- Maximum steps reached (timeout)
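
Since `done` alone does not say which of the two happened, you can tell success from timeout by comparing the delivery counters (continuing the Quick Start loop):

```python
# Distinguish success from timeout once the episode ends.
if result.done:
    obs = result.observation
    if obs.packages_delivered == obs.total_packages:
        print("Success: all packages delivered")
    else:
        print(f"Timeout after {obs.step_count} steps")
```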

## Difficulty Levels

### Level 1: Simple

- Grid: 5×5
- Packages: 1
- Obstacles: 0
- Max steps: 50
- Best for: Testing, debugging, quick validation

### Level 2: Easy (Default)

- Grid: 8×8
- Packages: 2
- Obstacles: 3
- Max steps: 100
- Best for: Initial training, curriculum learning start

### Level 3: Medium

- Grid: 10×10
- Packages: 3
- Obstacles: 8
- Max steps: 150
- Best for: Intermediate training, testing learned policies

### Level 4: Hard

- Grid: 15×15
- Packages: 5
- Obstacles: 20
- Max steps: 250
- Best for: Advanced training, evaluation

### Level 5: Expert

- Grid: 20×20
- Packages: 8
- Obstacles: 40
- Max steps: 400
- Best for: Final evaluation, research benchmarks
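
Summarized as a config map (an illustrative sketch; the server's internal representation may differ):

```python
# Illustrative summary of the difficulty table above (not the server's code).
DIFFICULTY_PRESETS = {
    1: {"grid": (5, 5),   "packages": 1, "obstacles": 0,  "max_steps": 50},
    2: {"grid": (8, 8),   "packages": 2, "obstacles": 3,  "max_steps": 100},
    3: {"grid": (10, 10), "packages": 3, "obstacles": 8,  "max_steps": 150},
    4: {"grid": (15, 15), "packages": 5, "obstacles": 20, "max_steps": 250},
    5: {"grid": (20, 20), "packages": 8, "obstacles": 40, "max_steps": 400},
}
```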

## Configuration

### Environment Variables

Configure the warehouse via environment variables:

```bash
# Difficulty level (1-5)
DIFFICULTY_LEVEL=2

# Custom grid size (overrides difficulty)
GRID_WIDTH=12
GRID_HEIGHT=12

# Custom package count (overrides difficulty)
NUM_PACKAGES=4

# Custom step limit (overrides difficulty)
MAX_STEPS=200

# Random seed for reproducibility
RANDOM_SEED=42
```
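
Because the custom variables override the difficulty presets, resolution presumably works like the sketch below (the server's actual parsing may differ):

```python
import os

# Sketch of the documented precedence: explicit variables beat the preset.
# DIFFICULTY_PRESETS is the illustrative map from the Difficulty Levels section.
level = int(os.environ.get("DIFFICULTY_LEVEL", "2"))
preset = DIFFICULTY_PRESETS[level]

grid_w = int(os.environ.get("GRID_WIDTH", preset["grid"][0]))
grid_h = int(os.environ.get("GRID_HEIGHT", preset["grid"][1]))
num_packages = int(os.environ.get("NUM_PACKAGES", preset["packages"]))
max_steps = int(os.environ.get("MAX_STEPS", preset["max_steps"]))
```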

### Docker Example

```bash
docker run -p 8000:8000 \
  -e DIFFICULTY_LEVEL=3 \
  -e RANDOM_SEED=42 \
  warehouse-env:latest
```

### Python Client Example

```python
env = WarehouseEnv.from_docker_image(
    "warehouse-env:latest",
    environment={
        "DIFFICULTY_LEVEL": "3",
        "GRID_WIDTH": "12",
        "GRID_HEIGHT": "12",
        "NUM_PACKAGES": "4",
        "MAX_STEPS": "200",
        "RANDOM_SEED": "42"
    }
)
```

## Visualization

### ASCII Rendering

Get a visual representation of the warehouse state:

```python
# Get ASCII visualization
ascii_art = env.render_ascii()
print(ascii_art)
```

Example output:

```
Step: 15/100 | Delivered: 1/2 | Reward: 109.9

█ █ █ █ █ █ █ █
█ P . . . # . █
█ . # . . . . █
█ . . R . # . █
█ . # . . . . █
█ . . . . D . █
█ . . . . . . █
█ █ █ █ █ █ █ █

Robot at (3, 3), carrying: 1
✓ Package #0: delivered (P(1,1)→D(5,5))
↻ Package #1: picked (P(1,1)→D(5,5))

Legend: r/R=Robot(empty/carrying), P=Pickup, D=Dropoff, #=Shelf, █=Wall
```

## Training Examples

### Random Agent

```python
import random
from envs.warehouse_env import WarehouseEnv, WarehouseAction

env = WarehouseEnv.from_docker_image("warehouse-env:latest")

for episode in range(100):
    result = env.reset()
    done = False

    while not done:
        # Random action
        action = WarehouseAction(action_id=random.randint(0, 5))
        result = env.step(action)
        done = result.done

    print(f"Episode {episode}: Delivered {result.observation.packages_delivered}")

env.close()
```

### Greedy Agent (Move toward target)

```python
from envs.warehouse_env import WarehouseEnv, WarehouseAction

def get_greedy_action(obs):
    """Greedy policy: head for the first waiting package, then its dropoff."""
    robot_x, robot_y = obs.robot_position

    # If not carrying, move toward the first waiting package
    if obs.robot_carrying is None:
        for pkg in obs.packages:
            if pkg["status"] == "waiting":
                target_x, target_y = pkg["pickup_location"]
                break
        else:
            return 4  # No waiting packages left; PICK_UP as a harmless no-op
    else:
        # Move toward the carried package's dropoff zone
        pkg = next(p for p in obs.packages if p["id"] == obs.robot_carrying)
        target_x, target_y = pkg["dropoff_location"]

    # Simple pathfinding: move closer on one axis (ignores obstacles)
    if robot_x < target_x:
        return 3  # RIGHT
    elif robot_x > target_x:
        return 2  # LEFT
    elif robot_y < target_y:
        return 1  # DOWN
    elif robot_y > target_y:
        return 0  # UP
    else:
        # At target location
        return 4 if obs.robot_carrying is None else 5

env = WarehouseEnv.from_docker_image("warehouse-env:latest")

for episode in range(10):
    result = env.reset()
    done = False

    while not done:
        action_id = get_greedy_action(result.observation)
        action = WarehouseAction(action_id=action_id)
        result = env.step(action)
        done = result.done

    state = env.state()
    print(f"Episode {episode}: {state.packages_delivered}/{state.total_packages} delivered, "
          f"reward: {state.cum_reward:.2f}")

env.close()
```

## Integration with RL Libraries

### Stable Baselines 3

```python
import gymnasium as gym
import numpy as np
from stable_baselines3 import PPO
from envs.warehouse_env import WarehouseEnv, WarehouseAction

class WarehouseGymWrapper(gym.Env):
    """Gymnasium wrapper for the Warehouse environment."""

    def __init__(self, base_url="http://localhost:8000"):
        super().__init__()
        self.env = WarehouseEnv(base_url=base_url)

        # Define spaces (simplified)
        self.action_space = gym.spaces.Discrete(6)

        # Observation: grid + robot state + package info,
        # flattened to a fixed-size vector for simplicity
        self.observation_space = gym.spaces.Box(
            low=0, high=255,
            shape=(200,),  # 196 grid cells + 2 position + 1 carrying + 1 progress
            dtype=np.float32
        )

    def reset(self, **kwargs):
        result = self.env.reset()
        obs = self._process_obs(result.observation)
        return obs, {}

    def step(self, action):
        result = self.env.step(WarehouseAction(action_id=int(action)))
        obs = self._process_obs(result.observation)
        return obs, result.reward, result.done, False, {}

    def _process_obs(self, observation):
        # Flatten the grid, then pad or truncate to 196 cells (max 14x14)
        grid_flat = np.asarray(observation.grid, dtype=np.float32).flatten()
        grid_fixed = np.zeros(196, dtype=np.float32)
        n = min(grid_flat.size, 196)
        grid_fixed[:n] = grid_flat[:n]

        robot_pos = np.asarray(observation.robot_position, dtype=np.float32)
        # robot_carrying may be package ID 0, so test against None explicitly
        carrying = np.array([0.0 if observation.robot_carrying is None else 1.0])

        obs = np.concatenate([
            grid_fixed,       # Grid (196)
            robot_pos,        # Robot position (2)
            carrying,         # Carrying status (1)
            [observation.packages_delivered]  # Progress (1)
        ])
        return obs.astype(np.float32)

    def close(self):
        self.env.close()

# Train with PPO
env = WarehouseGymWrapper()
model = PPO("MlpPolicy", env, verbose=1)
model.learn(total_timesteps=10000)
model.save("warehouse_ppo")

env.close()
```
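
Once training finishes, the saved policy can be evaluated through the same wrapper:

```python
# Evaluate the saved policy with the same wrapper.
env = WarehouseGymWrapper()
model = PPO.load("warehouse_ppo")

obs, _ = env.reset()
done = False
while not done:
    action, _ = model.predict(obs, deterministic=True)
    obs, reward, done, truncated, info = env.step(action)

env.close()
```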

## API Reference

### WarehouseAction

```python
@dataclass
class WarehouseAction(Action):
    action_id: int  # 0-5
```

### WarehouseObservation

```python
@dataclass
class WarehouseObservation(Observation):
    grid: List[List[int]]              # Warehouse layout
    robot_position: tuple[int, int]    # Robot (x, y)
    robot_carrying: Optional[int]      # Package ID or None
    packages: List[Dict[str, Any]]     # Package states
    step_count: int                    # Current step
    packages_delivered: int            # Successful deliveries
    total_packages: int                # Total packages
    time_remaining: int                # Steps left
    action_success: bool               # Last action valid
    message: str                       # Status message
```
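
Each entry in `packages` is a dict; the keys below are the ones exercised by the examples in this README (other keys, e.g. for priorities, may also be present):

```python
# Shape of one `packages` entry as used by the examples above (illustrative).
pkg = {
    "id": 0,                      # matches robot_carrying when picked up
    "status": "waiting",          # "waiting" | "picked" | "delivered"
    "pickup_location": (1, 1),    # (x, y) of the pickup zone
    "dropoff_location": (5, 5),   # (x, y) of the dropoff zone
}
```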

### WarehouseState

```python
@dataclass
class WarehouseState(State):
    episode_id: str                    # Unique episode ID
    step_count: int                    # Steps taken
    packages_delivered: int            # Deliveries
    total_packages: int                # Total packages
    difficulty_level: int              # Difficulty (1-5)
    grid_size: tuple[int, int]         # Grid dimensions
    cum_reward: float                  # Cumulative reward
    is_done: bool                      # Episode finished
```

## Development

### Local Setup (without Docker)

```bash
# Install dependencies
cd OpenEnv/src/envs/warehouse_env
pip install -r server/requirements.txt

# Run server
python -m uvicorn envs.warehouse_env.server.app:app --host 0.0.0.0 --port 8000
```

### Running Tests

```bash
# Run basic test
python examples/warehouse_simple.py
```

## Architecture

```
┌──────────────────────────────────────┐
│  RL Training Framework (Client)      │
│  ┌────────────────────────────────┐  │
│  │ Agent Policy (PPO/DQN/etc)     │  │
│  └───────────────┬────────────────┘  │
│                  │                   │
│  ┌───────────────▼────────────────┐  │
│  │ WarehouseEnv (HTTPEnvClient)   │  │
│  └───────────────┬────────────────┘  │
└──────────────────┼───────────────────┘
                   │ HTTP/JSON
┌──────────────────▼───────────────────┐
│  Docker Container                    │
│  ┌────────────────────────────────┐  │
│  │ FastAPI Server                 │  │
│  └───────────────┬────────────────┘  │
│  ┌───────────────▼────────────────┐  │
│  │ WarehouseEnvironment           │  │
│  │ - Grid generation              │  │
│  │ - Collision detection          │  │
│  │ - Reward calculation           │  │
│  │ - Package management           │  │
│  └────────────────────────────────┘  │
└──────────────────────────────────────┘
```
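
Because the transport is plain HTTP/JSON, the client can also attach to an already-running server instead of launching a container (the `base_url` constructor is the same one used in the Gym wrapper above):

```python
from envs.warehouse_env import WarehouseEnv, WarehouseAction

# Attach to a server started separately, e.g. via `docker run -p 8000:8000 ...`
env = WarehouseEnv(base_url="http://localhost:8000")

result = env.reset()
result = env.step(WarehouseAction(action_id=0))  # MOVE_UP
env.close()
```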

## Real-World Applications

This environment simulates real warehouse optimization problems:

- Amazon fulfillment centers: Robot pathfinding and package routing
- Manufacturing warehouses: Material handling optimization
- Distribution centers: Inventory management and delivery sequencing
- Automated storage: Efficient retrieval systems

## Research & Benchmarking

The warehouse environment is suitable for research on:

- Pathfinding algorithms: A*, Dijkstra, learned policies
- Multi-objective RL: Balancing speed, safety, and coverage
- Curriculum learning: Progressive difficulty scaling (see the sketch after this list)
- Transfer learning: Generalization across warehouse layouts
- Hierarchical RL: High-level planning + low-level control
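
For curriculum learning in particular, `DIFFICULTY_LEVEL` makes progressive scaling straightforward; a minimal sketch:

```python
from envs.warehouse_env import WarehouseEnv

# Minimal curriculum sketch: train on progressively harder warehouses.
for level in range(1, 6):
    env = WarehouseEnv.from_docker_image(
        "warehouse-env:latest",
        environment={"DIFFICULTY_LEVEL": str(level)},
    )
    # ... train on this level until a success threshold is met ...
    env.close()
```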

## Contributing

We welcome contributions! Areas for enhancement:

- Multi-robot coordination: Multiple robots working together
- Dynamic obstacles: Moving shelves or other robots
- Battery management: Energy constraints and charging stations
- Priority queuing: Handling different package urgencies
- 3D visualization: Enhanced rendering

## License

BSD 3-Clause License (see LICENSE file)

## Citation

If you use this environment in your research, please cite:

```bibtex
@software{warehouse_env_openenv,
  title = {Warehouse Optimization Environment for OpenEnv},
  author = {OpenEnv Contributors},
  year = {2024},
  url = {https://github.com/meta-pytorch/OpenEnv}
}
```
