File size: 5,071 Bytes
bb89e8d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
"""
HTTP client for Warehouse Optimization Environment.

This module provides the client-side interface for interacting with
the warehouse environment server.
"""

from typing import Optional

import requests

from core.client_types import StepResult
from core.http_env_client import HTTPEnvClient
from envs.warehouse_env.models import (
    WarehouseAction,
    WarehouseObservation,
    WarehouseState,
)


class WarehouseEnv(HTTPEnvClient[WarehouseAction, WarehouseObservation]):
    """
    HTTP client for the Warehouse Optimization environment.

    This environment simulates a warehouse robot that must pick up and deliver
    packages while navigating a grid-based warehouse with obstacles.

    Example usage:
        ```python
        from envs.warehouse_env import WarehouseEnv, WarehouseAction

        # Start environment from Docker image
        env = WarehouseEnv.from_docker_image(
            "warehouse-env:latest",
            environment={"DIFFICULTY_LEVEL": "2"}
        )

        # Reset environment
        result = env.reset()
        print(f"Grid size: {len(result.observation.grid)}x{len(result.observation.grid[0])}")
        print(f"Packages: {result.observation.total_packages}")

        # Take actions
        for step in range(100):
            # Example: move randomly or pick/drop based on state
            if result.observation.robot_carrying is None:
                action = WarehouseAction(action_id=4)  # Try to pick up
            else:
                action = WarehouseAction(action_id=5)  # Try to drop off

            result = env.step(action)
            print(f"Step {step}: {result.observation.message}")

            if result.done:
                print(f"Episode finished! Delivered {result.observation.packages_delivered} packages")
                break

        env.close()
        ```

    Configuration (via environment variables):
        - DIFFICULTY_LEVEL: 1-5 (default: 2)
          1 = Simple (5x5, 1 package)
          2 = Easy (8x8, 2 packages)
          3 = Medium (10x10, 3 packages)
          4 = Hard (15x15, 5 packages)
          5 = Expert (20x20, 8 packages)

        - GRID_WIDTH: Custom grid width (overrides difficulty)
        - GRID_HEIGHT: Custom grid height (overrides difficulty)
        - NUM_PACKAGES: Custom number of packages (overrides difficulty)
        - MAX_STEPS: Maximum steps per episode (default: based on difficulty)
        - RANDOM_SEED: Random seed for reproducibility (default: None)
    """

    def __init__(self, base_url: str = "http://localhost:8000", **kwargs):
        """Initialize warehouse environment client."""
        super().__init__(base_url=base_url, **kwargs)
        self.base_url = base_url  # Store for render_ascii() method

    def _step_payload(self, action: WarehouseAction) -> dict:
        """Convert WarehouseAction to JSON payload for step request."""
        return {"action_id": action.action_id}

    def _parse_result(self, payload: dict) -> StepResult[WarehouseObservation]:
        """Parse server response into StepResult[WarehouseObservation]."""
        obs_data = payload.get("observation", {})

        observation = WarehouseObservation(
            grid=obs_data.get("grid", []),
            robot_position=tuple(obs_data.get("robot_position", [0, 0])),
            robot_carrying=obs_data.get("robot_carrying"),
            packages=obs_data.get("packages", []),
            step_count=obs_data.get("step_count", 0),
            packages_delivered=obs_data.get("packages_delivered", 0),
            total_packages=obs_data.get("total_packages", 0),
            time_remaining=obs_data.get("time_remaining", 0),
            action_success=obs_data.get("action_success", False),
            message=obs_data.get("message", ""),
            metadata=obs_data.get("metadata", {}),
        )

        return StepResult(
            observation=observation,
            reward=payload.get("reward", 0.0),
            done=payload.get("done", False),
        )

    def _parse_state(self, payload: dict) -> WarehouseState:
        """Parse server response into WarehouseState object."""
        return WarehouseState(
            episode_id=payload.get("episode_id", ""),
            step_count=payload.get("step_count", 0),
            packages_delivered=payload.get("packages_delivered", 0),
            total_packages=payload.get("total_packages", 0),
            difficulty_level=payload.get("difficulty_level", 2),
            grid_size=tuple(payload.get("grid_size", [0, 0])),
            cum_reward=payload.get("cum_reward", 0.0),
            is_done=payload.get("is_done", False),
        )

    def render_ascii(self) -> str:
        """
        Get ASCII visualization of the current warehouse state.

        Returns:
            String representation of the warehouse grid
        """
        try:
            response = requests.get(f"{self.base_url}/render")
            response.raise_for_status()
            return response.json()["ascii"]
        except Exception as e:
            return f"Error rendering: {str(e)}"