File size: 10,661 Bytes
7078c69
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5e3ab5d
 
 
 
 
 
 
 
 
7078c69
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5e3ab5d
 
 
 
 
 
 
 
 
 
 
 
 
 
7078c69
 
5e3ab5d
7078c69
 
5e3ab5d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7078c69
5e3ab5d
 
 
7078c69
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
# Copyright (c) Space Robotics Lab, SnT, University of Luxembourg, SpaceR
# RANS: arXiv:2310.07393 β€” OpenEnv-compatible implementation

"""
RANSEnvironment
===============
OpenEnv ``Environment`` subclass that wraps the 2-D spacecraft simulator and
the RANS task suite.

Supported tasks (set via RANS_TASK env-var or constructor argument):
  β€’ GoToPosition            β€” reach a target (x, y)
  β€’ GoToPose                β€” reach a target (x, y, ΞΈ)
  β€’ TrackLinearVelocity     β€” maintain (vx_t, vy_t)
  β€’ TrackLinearAngularVelocity β€” maintain (vx_t, vy_t, Ο‰_t)

The environment follows the RANS paper (arXiv:2310.07393) physics and reward
formulations, adapted to run in CPU-only Docker containers without Isaac Gym.
"""

from __future__ import annotations

import math
import os
import uuid
from typing import Any, Dict, Optional

import numpy as np

try:
    from openenv.core.env_server.interfaces import Action, Environment, Observation
except ImportError:
    from pydantic import BaseModel as Action  # type: ignore[assignment]
    from pydantic import BaseModel as Environment  # type: ignore[assignment]
    from pydantic import BaseModel as Observation  # type: ignore[assignment]

try:
    # Installed package import
    from rans_env.models import SpacecraftAction, SpacecraftObservation, SpacecraftState
    from rans_env.server.spacecraft_physics import Spacecraft2D, SpacecraftConfig
    from rans_env.server.tasks import TASK_REGISTRY
except ImportError:
    # Development / test import (package not yet installed, RANS dir on sys.path)
    import sys, os as _os
    sys.path.insert(0, _os.path.dirname(_os.path.dirname(__file__)))
    from models import SpacecraftAction, SpacecraftObservation, SpacecraftState  # type: ignore[no-redef]
    from server.spacecraft_physics import Spacecraft2D, SpacecraftConfig  # type: ignore[no-redef]
    from server.tasks import TASK_REGISTRY  # type: ignore[no-redef]


class RANSEnvironment(Environment):
    """
    RANS spacecraft navigation environment for OpenEnv.

    Wraps the 2-D spacecraft simulator (``Spacecraft2D``) plus one task from
    ``TASK_REGISTRY`` and exposes the OpenEnv ``reset`` / ``step`` / ``state``
    interface.

    References
    ----------
    El-Hariry, Richard, Olivares-Mendez (2023).
    "RANS: Highly-Parallelised Simulator for Reinforcement Learning based
    Autonomous Navigating Spacecrafts."  arXiv:2310.07393.
    """

    def __init__(
        self,
        task: str = "GoToPosition",
        spacecraft_config: Optional[SpacecraftConfig] = None,
        task_config: Optional[Dict[str, Any]] = None,
        max_episode_steps: int = 500,
        initial_pos_range: float = 2.0,
        initial_vel_range: float = 0.1,
    ) -> None:
        """
        Parameters
        ----------
        task:
            One of TASK_REGISTRY keys.  Overridden by the RANS_TASK env-var.
        spacecraft_config:
            Physical platform configuration.  Uses 8-thruster MFP2D default
            unless the RANS_NUM_THRUSTERS env-var is set.
        task_config:
            Dict of task hyper-parameters forwarded to the task constructor.
        max_episode_steps:
            Hard step limit per episode.  The RANS_MAX_STEPS env-var, when
            set, takes precedence over this argument (same precedence rule
            as RANS_TASK).
        initial_pos_range:
            Half-width of the uniform distribution for random initial position.
        initial_vel_range:
            Half-width for random initial velocities.

        Raises
        ------
        ValueError
            If the resolved task name is not a TASK_REGISTRY key.
        """
        # Env-vars win over constructor arguments — useful for Docker
        # deployments where only the process environment is configurable.
        task = os.environ.get("RANS_TASK", task)
        max_episode_steps = int(
            os.environ.get("RANS_MAX_STEPS", str(max_episode_steps))
        )

        if task not in TASK_REGISTRY:
            raise ValueError(
                f"Unknown task '{task}'. "
                f"Available: {sorted(TASK_REGISTRY.keys())}"
            )

        self._task_name = task
        self._max_episode_steps = max_episode_steps
        self._initial_pos_range = initial_pos_range
        self._initial_vel_range = initial_vel_range

        # Physics simulator — RANS_NUM_THRUSTERS overrides spacecraft_config
        if spacecraft_config is None:
            n_env = os.environ.get("RANS_NUM_THRUSTERS")
            if n_env is not None:
                spacecraft_config = SpacecraftConfig.from_num_thrusters(
                    int(n_env)
                )
            else:
                spacecraft_config = SpacecraftConfig.default_8_thruster()
        self._spacecraft = Spacecraft2D(spacecraft_config)

        # Task (reward/observation logic) for this episode series
        self._task = TASK_REGISTRY[task](task_config or {})

        # Episode bookkeeping; _ep_state is rebuilt on every step because
        # the pydantic state model is treated as immutable.
        self._step_count: int = 0
        self._total_reward: float = 0.0
        self._ep_state = SpacecraftState(task=self._task_name)

    # ------------------------------------------------------------------
    # OpenEnv Environment interface
    # ------------------------------------------------------------------

    def reset(self) -> Observation:
        """Start a new episode with a randomised initial spacecraft state."""
        init_state = self._sample_initial_state()
        self._spacecraft.reset(init_state)

        # Task may randomise its own goal; its reset info is surfaced in
        # the first observation.
        task_info = self._task.reset(self._spacecraft.state)

        self._step_count = 0
        self._total_reward = 0.0
        self._ep_state = SpacecraftState(
            episode_id=str(uuid.uuid4()),
            step_count=0,
            task=self._task_name,
            **self._physical_state_dict(),
        )

        return self._make_observation(reward=0.0, done=False, info=task_info)

    def step(self, action: Action) -> Observation:
        """
        Advance the simulation by one step.

        Control mode is selected automatically based on which fields are set:

        1. **Thrusters** (``action.thrusters`` is not None):
           List of per-thruster activations ∈ [0, 1].
        2. **Force/torque** (``action.fx``, ``action.fy``, or ``action.torque``
           is not None):
           Direct world-frame force/torque — bypasses thruster geometry.
        3. **Velocity target** (``action.vx_target``, ``action.vy_target``, or
           ``action.omega_target`` is not None):
           PD controller drives the spacecraft toward the requested velocities.

        If no field is set, the simulation advances with all thrusters off.

        Raises
        ------
        ValueError
            If ``action`` does not look like a SpacecraftAction (duck-typed
            via the ``thrusters`` attribute).
        """
        if not hasattr(action, "thrusters"):
            raise ValueError(
                f"Expected SpacecraftAction, received {type(action).__name__}."
            )

        # ── Mode 1: thruster activations ─────────────────────────────────
        if action.thrusters is not None:
            # atleast_1d guards against a scalar payload (0-d array), for
            # which len() below would raise TypeError.
            activations = np.atleast_1d(
                np.asarray(action.thrusters, dtype=np.float64)
            )
            n = self._spacecraft.n_thrusters
            if len(activations) != n:
                # Zero-pad short vectors / truncate long ones so the
                # physics always receives exactly n activations.
                padded = np.zeros(n, dtype=np.float64)
                k = min(len(activations), n)
                padded[:k] = activations[:k]
                activations = padded
            self._spacecraft.step(activations)

        # ── Mode 2: direct force / torque ────────────────────────────────
        elif any(v is not None for v in [action.fx, action.fy, action.torque]):
            # `x or 0.0` maps None to 0.0 (0.0 itself is preserved).
            self._spacecraft.step_force_torque(
                fx_world=float(action.fx or 0.0),
                fy_world=float(action.fy or 0.0),
                torque=float(action.torque or 0.0),
            )

        # ── Mode 3: velocity target ───────────────────────────────────────
        elif any(
            v is not None
            for v in [action.vx_target, action.vy_target, action.omega_target]
        ):
            self._spacecraft.step_velocity_target(
                vx_target=float(action.vx_target or 0.0),
                vy_target=float(action.vy_target or 0.0),
                omega_target=float(action.omega_target or 0.0),
            )

        # ── No action — advance with zero forces ──────────────────────────
        else:
            self._spacecraft.step(np.zeros(self._spacecraft.n_thrusters))
        self._step_count += 1

        # Compute task reward
        reward, goal_reached, info = self._task.compute_reward(
            self._spacecraft.state
        )
        self._total_reward += reward

        # Episode ends on success or on the hard step limit.
        done = goal_reached or (self._step_count >= self._max_episode_steps)

        # Rebuild persistent state (Pydantic models are immutable by default)
        self._ep_state = SpacecraftState(
            episode_id=self._ep_state.episode_id,
            step_count=self._step_count,
            task=self._task_name,
            total_reward=self._total_reward,
            goal_reached=goal_reached,
            **self._physical_state_dict(),
        )

        return self._make_observation(reward=reward, done=done, info=info)

    @property
    def state(self) -> SpacecraftState:
        """Snapshot of the current episode (id, step count, pose, totals)."""
        return self._ep_state

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    def _sample_initial_state(self) -> np.ndarray:
        """Uniform random initial state (small velocities, random pose).

        Returns a length-6 float64 vector [x, y, θ, vx, vy, ω].  Uses the
        global NumPy RNG, so reproducibility requires seeding np.random
        externally.
        """
        r = self._initial_pos_range
        v = self._initial_vel_range
        return np.array(
            [
                np.random.uniform(-r, r),               # x
                np.random.uniform(-r, r),               # y
                np.random.uniform(-math.pi, math.pi),   # θ
                np.random.uniform(-v, v),               # vx
                np.random.uniform(-v, v),               # vy
                np.random.uniform(-v, v),               # ω
            ],
            dtype=np.float64,
        )

    def _physical_state_dict(self) -> Dict[str, float]:
        """Map the simulator's 6-vector state onto SpacecraftState fields."""
        s = self._spacecraft.state
        return {
            "x": float(s[0]),
            "y": float(s[1]),
            "heading_rad": float(s[2]),
            "vx": float(s[3]),
            "vy": float(s[4]),
            "angular_velocity_rads": float(s[5]),
        }

    def _make_observation(
        self, reward: float, done: bool, info: Dict[str, Any]
    ) -> SpacecraftObservation:
        """Assemble the observation returned by ``reset``/``step``.

        Combines the task-specific observation vector with static platform
        data (thruster geometry, mass, inertia) and the step counter.
        """
        task_obs = self._task.get_observation(self._spacecraft.state)
        return SpacecraftObservation(
            state_obs=task_obs.tolist(),
            thruster_transforms=self._spacecraft.get_thruster_transforms().tolist(),
            thruster_masks=self._spacecraft.get_thruster_masks().tolist(),
            mass=self._spacecraft.config.mass,
            inertia=self._spacecraft.config.inertia,
            task=self._task_name,
            reward=float(reward),
            done=bool(done),
            info={**info, "step": self._step_count},
        )