dpang committed on
Commit
9cb40fe
·
verified ·
1 Parent(s): 0adb9e2

Add server/tasks/base.py

Browse files
Files changed (1) hide show
  1. server/tasks/base.py +110 -0
server/tasks/base.py ADDED
@@ -0,0 +1,110 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # Copyright (c) Space Robotics Lab, SnT, University of Luxembourg, SpaceR
2
+ # RANS: Reinforcement Learning based Autonomous Navigation for Spacecrafts
3
+ # arXiv:2310.07393 — El-Hariry, Richard, Olivares-Mendez
4
+ #
5
+ # OpenEnv-compatible implementation
6
+
7
+ """Base class for RANS spacecraft navigation tasks."""
8
+
9
+ from __future__ import annotations
10
+
11
+ import math
12
+ from abc import ABC, abstractmethod
13
+ from typing import Any, Dict, Tuple
14
+
15
+ import numpy as np
16
+
17
+
18
class BaseTask(ABC):
    """
    Abstract base class for all RANS spacecraft navigation tasks.

    Subclasses define:
      - The task-specific observation vector
      - The reward function (matching the RANS paper's formulations)
      - Target generation and episode reset logic
    """

    def __init__(self, config: Dict[str, Any] | None = None) -> None:
        # Task configuration; falls back to an empty dict so subclasses can
        # use ``self.config.get(...)`` without None checks.
        self.config: Dict[str, Any] = config or {}
        # Current episode target (position, pose, ... — subclass-defined);
        # populated by the subclass's ``reset`` implementation.
        self._target: Any = None

    # ------------------------------------------------------------------
    # Abstract interface
    # ------------------------------------------------------------------

    @abstractmethod
    def reset(self, spacecraft_state: np.ndarray) -> Dict[str, Any]:
        """
        Sample a new target and reset internal episode state.

        Args:
            spacecraft_state: Current state vector [x, y, θ, vx, vy, ω].

        Returns:
            Dictionary with task metadata (target values, etc.).
        """

    @abstractmethod
    def get_observation(self, spacecraft_state: np.ndarray) -> np.ndarray:
        """
        Compute the task-specific observation vector from the spacecraft state.

        Args:
            spacecraft_state: Current state [x, y, θ, vx, vy, ω].

        Returns:
            1-D float32 array of length ``num_observations``.
        """

    @abstractmethod
    def compute_reward(
        self, spacecraft_state: np.ndarray
    ) -> Tuple[float, bool, Dict[str, Any]]:
        """
        Compute reward, done flag, and diagnostic info.

        Args:
            spacecraft_state: Current state [x, y, θ, vx, vy, ω].

        Returns:
            (reward, done, info) tuple.
        """

    # ------------------------------------------------------------------
    # Common helpers
    # ------------------------------------------------------------------

    @property
    def num_observations(self) -> int:
        """Size of the task-specific state observation vector.

        Placeholder default of 0; subclasses override to report their
        actual observation length.
        """
        return 0

    @property
    def name(self) -> str:
        """Human-readable task identifier (the concrete class name)."""
        return self.__class__.__name__

    # ------------------------------------------------------------------
    # Shared reward primitives (from RANS paper Sec. IV-B)
    # ------------------------------------------------------------------

    @staticmethod
    def _reward_exponential(error: float, sigma: float) -> float:
        """exp(-error² / (2·σ²)) — tight peak near zero.

        The denominator is clamped to 1e-9 so that sigma == 0 does not
        raise ZeroDivisionError.
        """
        return math.exp(-(error**2) / max(2.0 * sigma**2, 1e-9))

    @staticmethod
    def _reward_inverse(error: float) -> float:
        """1 / (1 + error) — smooth monotone decay."""
        return 1.0 / (1.0 + error)

    @staticmethod
    def _wrap_angle(angle: float) -> float:
        """Wrap angle to [−π, π).

        Note: both +π and −π map to −π, so the interval is half-open at
        +π (the formula ``(a + π) mod 2π − π`` can never return +π).
        """
        return (angle + math.pi) % (2.0 * math.pi) - math.pi

    @staticmethod
    def _world_to_body(dx: float, dy: float, theta: float) -> Tuple[float, float]:
        """Rotate world-frame displacement into body frame.

        Applies R(−θ) to (dx, dy), where θ is the spacecraft heading.
        """
        c, s = math.cos(theta), math.sin(theta)
        return c * dx + s * dy, -s * dx + c * dy