dpang committed on
Commit
2f2f4a9
·
verified ·
1 Parent(s): 8992764

Add server/tasks/track_linear_velocity.py

Browse files
server/tasks/track_linear_velocity.py ADDED
@@ -0,0 +1,84 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # Copyright (c) Space Robotics Lab, SnT, University of Luxembourg, SpaceR
2
+ # RANS: arXiv:2310.07393
3
+
4
+ """
5
+ TrackLinearVelocity Task
6
+ ========================
7
+ The spacecraft must maintain a randomly sampled target linear velocity (vx_t, vy_t).
8
+
9
+ Observation (6 values):
10
+ [Δvx, Δvy, cos(θ), sin(θ), vx, vy]
11
+ where Δv = v_target − v_current.
12
+
13
+ Reward:
14
+ r = exp(-‖v_error‖² / (2·σ_v²))
15
+
16
+ Episode terminates when ‖v_error‖ < tolerance.
17
+ """
18
+
19
+ from __future__ import annotations
20
+
21
+ import math
22
+ from typing import Any, Dict, Tuple
23
+
24
+ import numpy as np
25
+
26
+ from .base import BaseTask
27
+
28
+
29
class TrackLinearVelocityTask(BaseTask):
    """Track a target 2-D linear velocity expressed in the world frame.

    A fresh target velocity is sampled on every :meth:`reset`.  The agent
    is rewarded for shrinking the velocity error, and the goal flag is
    raised once that error falls below ``tolerance``.
    """

    # Default hyper-parameters; any entry may be overridden via ``config``.
    _DEFAULTS: Dict[str, Any] = {
        "tolerance": 0.05,        # success threshold (m/s)
        "reward_sigma": 0.50,     # velocity reward width
        "max_target_speed": 1.00, # maximum sampled target speed (m/s)
    }

    def __init__(self, config: Dict[str, Any] | None = None) -> None:
        super().__init__(config)
        merged = {**self._DEFAULTS, **(config or {})}
        self.tolerance: float = merged["tolerance"]
        self.reward_sigma: float = merged["reward_sigma"]
        self.max_target_speed: float = merged["max_target_speed"]

        # Current target velocity [vx_t, vy_t]; overwritten by reset().
        self._target_vel = np.zeros(2, dtype=np.float64)

    # ------------------------------------------------------------------
    # BaseTask interface
    # ------------------------------------------------------------------

    def reset(self, spacecraft_state: np.ndarray) -> Dict[str, Any]:
        """Sample a new random target velocity and return task metadata."""
        # Sample polar coordinates (magnitude first, then heading) so the
        # draw order — and hence any seeded RNG stream — stays stable.
        magnitude = np.random.uniform(0.0, self.max_target_speed)
        heading = np.random.uniform(0.0, 2.0 * math.pi)
        vx_t = magnitude * math.cos(heading)
        vy_t = magnitude * math.sin(heading)
        self._target_vel = np.array([vx_t, vy_t])
        return {"target_linear_velocity": self._target_vel.tolist()}

    def get_observation(self, spacecraft_state: np.ndarray) -> np.ndarray:
        """Build the 6-value observation [Δvx, Δvy, cosθ, sinθ, vx, vy]."""
        # Full unpack doubles as a length check on the incoming state.
        _px, _py, theta, vx, vy, _omega = spacecraft_state
        obs = [
            self._target_vel[0] - vx,
            self._target_vel[1] - vy,
            math.cos(theta),
            math.sin(theta),
            vx,
            vy,
        ]
        return np.array(obs, dtype=np.float32)

    def compute_reward(
        self, spacecraft_state: np.ndarray
    ) -> Tuple[float, bool, Dict[str, Any]]:
        """Return ``(reward, goal_reached, info)`` for the current state.

        Reward is the exponential kernel of the velocity-error norm with
        width ``reward_sigma``; the goal is reached when the error drops
        below ``tolerance``.
        """
        current = np.array([spacecraft_state[3], spacecraft_state[4]])
        vel_error = float(np.linalg.norm(self._target_vel - current))
        reward = self._reward_exponential(vel_error, self.reward_sigma)
        goal_reached = vel_error < self.tolerance
        info = {
            "velocity_error_ms": vel_error,
            "goal_reached": goal_reached,
            "target_linear_velocity": self._target_vel.tolist(),
        }
        return reward, goal_reached, info

    @property
    def num_observations(self) -> int:
        """Length of the vector produced by :meth:`get_observation`."""
        return 6