dpang commited on
Commit
8992764
·
verified ·
1 Parent(s): bd4ceb2

Add server/tasks/go_to_pose.py

Browse files
Files changed (1) hide show
  1. server/tasks/go_to_pose.py +105 -0
server/tasks/go_to_pose.py ADDED
@@ -0,0 +1,105 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # Copyright (c) Space Robotics Lab, SnT, University of Luxembourg, SpaceR
2
+ # RANS: arXiv:2310.07393
3
+
4
+ """
5
+ GoToPose Task
6
+ =============
7
+ The spacecraft must reach a target position AND heading (x_t, y_t, θ_t).
8
+
9
+ Observation (7 values):
10
+ [Δx_body, Δy_body, cos(Δθ), sin(Δθ), vx, vy, ω]
11
+
12
+ Reward:
13
+ r = w_p · exp(-‖p_error‖² / (2·σ_p²))
14
+ + w_h · exp(-|heading_error|² / (2·σ_h²))
15
+
16
+ Episode terminates when ‖p_error‖ < tol_p AND |heading_error| < tol_h.
17
+ """
18
+
19
+ from __future__ import annotations
20
+
21
+ import math
22
+ from typing import Any, Dict, Tuple
23
+
24
+ import numpy as np
25
+
26
+ from .base import BaseTask
27
+
28
+
29
+ class GoToPoseTask(BaseTask):
30
+ """Navigate spacecraft to a target 2-D pose (position + heading)."""
31
+
32
+ _DEFAULTS: Dict[str, Any] = {
33
+ "tolerance_pos": 0.10, # position success threshold (m)
34
+ "tolerance_heading": 0.10, # heading success threshold (rad)
35
+ "reward_sigma_pos": 1.00, # position reward width
36
+ "reward_sigma_heading": 0.50, # heading reward width
37
+ "position_weight": 0.70, # w_p
38
+ "heading_weight": 0.30, # w_h
39
+ "spawn_min_radius": 0.50,
40
+ "spawn_max_radius": 3.00,
41
+ }
42
+
43
+ def __init__(self, config: Dict[str, Any] | None = None) -> None:
44
+ super().__init__(config)
45
+ cfg = {**self._DEFAULTS, **(config or {})}
46
+ self.tolerance_pos: float = cfg["tolerance_pos"]
47
+ self.tolerance_heading: float = cfg["tolerance_heading"]
48
+ self.reward_sigma_pos: float = cfg["reward_sigma_pos"]
49
+ self.reward_sigma_heading: float = cfg["reward_sigma_heading"]
50
+ self.position_weight: float = cfg["position_weight"]
51
+ self.heading_weight: float = cfg["heading_weight"]
52
+ self.spawn_min_radius: float = cfg["spawn_min_radius"]
53
+ self.spawn_max_radius: float = cfg["spawn_max_radius"]
54
+
55
+ self._target_pos = np.zeros(2, dtype=np.float64)
56
+ self._target_heading: float = 0.0
57
+
58
+ # ------------------------------------------------------------------
59
+ # BaseTask interface
60
+ # ------------------------------------------------------------------
61
+
62
+ def reset(self, spacecraft_state: np.ndarray) -> Dict[str, Any]:
63
+ r = np.random.uniform(self.spawn_min_radius, self.spawn_max_radius)
64
+ angle = np.random.uniform(0.0, 2.0 * math.pi)
65
+ self._target_pos = np.array([r * math.cos(angle), r * math.sin(angle)])
66
+ self._target_heading = np.random.uniform(-math.pi, math.pi)
67
+ return {
68
+ "target_position": self._target_pos.tolist(),
69
+ "target_heading_rad": self._target_heading,
70
+ }
71
+
72
+ def get_observation(self, spacecraft_state: np.ndarray) -> np.ndarray:
73
+ x, y, theta, vx, vy, omega = spacecraft_state
74
+ dx, dy = self._target_pos[0] - x, self._target_pos[1] - y
75
+ dx_b, dy_b = self._world_to_body(dx, dy, theta)
76
+ d_theta = self._wrap_angle(self._target_heading - theta)
77
+ return np.array(
78
+ [dx_b, dy_b, math.cos(d_theta), math.sin(d_theta), vx, vy, omega],
79
+ dtype=np.float32,
80
+ )
81
+
82
+ def compute_reward(
83
+ self, spacecraft_state: np.ndarray
84
+ ) -> Tuple[float, bool, Dict[str, Any]]:
85
+ x, y, theta = spacecraft_state[0], spacecraft_state[1], spacecraft_state[2]
86
+ pos_error = float(np.linalg.norm(self._target_pos - np.array([x, y])))
87
+ heading_error = abs(self._wrap_angle(self._target_heading - theta))
88
+
89
+ r_pos = self._reward_exponential(pos_error, self.reward_sigma_pos)
90
+ r_head = self._reward_exponential(heading_error, self.reward_sigma_heading)
91
+ reward = self.position_weight * r_pos + self.heading_weight * r_head
92
+
93
+ goal_reached = (
94
+ pos_error < self.tolerance_pos and heading_error < self.tolerance_heading
95
+ )
96
+ info = {
97
+ "position_error_m": pos_error,
98
+ "heading_error_rad": heading_error,
99
+ "goal_reached": goal_reached,
100
+ }
101
+ return reward, goal_reached, info
102
+
103
+ @property
104
+ def num_observations(self) -> int:
105
+ return 7