|
|
""" |
|
|
Visual Training Mode with Interactive HOLD Control |
|
|
=================================================== |
|
|
|
|
|
Watch DreamerV3 train in real-time with the Panda3D visualizer. |
|
|
|
|
|
INTERACTIVE CONTROLS: |
|
|
--------------------- |
|
|
SPACE - Pause/Resume (Full HOLD) |
|
|
F - Freeze AI only (environment continues) |
|
|
. - Step one frame |
|
|
|
|
|
Speed: |
|
|
+/- - Speed up/down |
|
|
|
|
|
SHOWCASE MODES (demonstrate capabilities): |
|
|
1 - Systems Check (test all actuators) |
|
|
2 - Defensive Sphere (max coverage) |
|
|
3 - Interception Drill (threat pursuit) |
|
|
4 - Max Extension (cable limits) |
|
|
5 - Corkscrew Spin (propulsion) |
|
|
6 - Formation Parade (tactical patterns) |
|
|
7 - Throttle Test (0 to max) |
|
|
8 - Evasive Demo (agility) |
|
|
9 - Sacrifice Drill (TAB expendability) |
|
|
0 - SLINGSHOT / NUNCHAKU mode (Bruce Lee bola swing!) |
|
|
|
|
|
CAMERA MODES: |
|
|
V - Chase cam (behind Buzzard) |
|
|
B - Orbit cam (circles around) |
|
|
N - Tactical (top-down) |
|
|
M - Dynamic (automatic transitions) |
|
|
, - Cockpit (first person) |
|
|
C - Cycle through modes |
|
|
|
|
|
ESC - Stop showcase, return to training |
|
|
R - Reset episode |
|
|
""" |
|
|
|
|
|
import numpy as np |
|
|
import time |
|
|
import threading |
|
|
from typing import Dict, Optional, List |
|
|
from collections import deque |
|
|
|
|
|
import sys |
|
|
import os |
|
|
|
|
|
_parent = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) |
|
|
_root = os.path.dirname(_parent) |
|
|
if _parent not in sys.path: |
|
|
sys.path.insert(0, _parent) |
|
|
if _root not in sys.path: |
|
|
sys.path.insert(0, _root) |
|
|
|
|
|
try: |
|
|
from src.training.exploration_env import ExplorationKAPSEnv |
|
|
from src.training.threat_environment import Threat, ThreatType |
|
|
from src.training.hold_control import ( |
|
|
KAPSHoldController, HoldMode, ShowcaseMode, HOLD_KEYBINDS |
|
|
) |
|
|
from src.visualization.cinematic_camera import ( |
|
|
CinematicCamera, CameraMode, CAMERA_KEYBINDS |
|
|
) |
|
|
from src.visualization.simple_geometry import ( |
|
|
create_visible_buzzard, create_visible_airfoil, |
|
|
create_visible_cable, create_visible_bola, create_visible_grid_fin, |
|
|
VISIBLE_TAB_COLORS |
|
|
) |
|
|
|
|
|
create_cable_geometry = create_visible_cable |
|
|
create_airfoil_geometry = create_visible_airfoil |
|
|
create_buzzard_geometry = create_visible_buzzard |
|
|
TAB_COLORS = VISIBLE_TAB_COLORS |
|
|
|
|
|
from src.physics.slingshot_dynamics import ( |
|
|
SlingshotController, SlingshotState, AirfoiledBuzzard, |
|
|
DeployableGridFin, AirfoilDeployState |
|
|
) |
|
|
except ImportError: |
|
|
from training.exploration_env import ExplorationKAPSEnv |
|
|
from training.threat_environment import Threat, ThreatType |
|
|
from training.hold_control import ( |
|
|
KAPSHoldController, HoldMode, ShowcaseMode, HOLD_KEYBINDS |
|
|
) |
|
|
try: |
|
|
from visualization.cinematic_camera import ( |
|
|
CinematicCamera, CameraMode, CAMERA_KEYBINDS |
|
|
) |
|
|
from visualization.simple_geometry import ( |
|
|
create_visible_buzzard, create_visible_airfoil, |
|
|
create_visible_cable, create_visible_bola, create_visible_grid_fin, |
|
|
VISIBLE_TAB_COLORS |
|
|
) |
|
|
create_cable_geometry = create_visible_cable |
|
|
create_airfoil_geometry = create_visible_airfoil |
|
|
create_buzzard_geometry = create_visible_buzzard |
|
|
TAB_COLORS = VISIBLE_TAB_COLORS |
|
|
except ImportError: |
|
|
|
|
|
CinematicCamera = None |
|
|
CameraMode = None |
|
|
create_cable_geometry = None |
|
|
create_buzzard_geometry = None |
|
|
create_airfoil_geometry = None |
|
|
TAB_COLORS = None |
|
|
|
|
|
try: |
|
|
from physics.slingshot_dynamics import ( |
|
|
SlingshotController, SlingshotState, AirfoiledBuzzard, |
|
|
DeployableGridFin, AirfoilDeployState |
|
|
) |
|
|
except ImportError: |
|
|
SlingshotController = None |
|
|
SlingshotState = None |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
try: |
|
|
from direct.showbase.ShowBase import ShowBase |
|
|
from panda3d.core import ( |
|
|
Vec3, Vec4, Point3, |
|
|
LineSegs, NodePath, GeomNode, |
|
|
AmbientLight, DirectionalLight, PointLight, |
|
|
TextNode, ClockObject, |
|
|
GeomVertexFormat, GeomVertexData, GeomVertexWriter, |
|
|
Geom, GeomTriangles, Fog |
|
|
) |
|
|
from direct.task import Task |
|
|
PANDA3D_AVAILABLE = True |
|
|
globalClock = ClockObject.getGlobalClock() |
|
|
except ImportError: |
|
|
PANDA3D_AVAILABLE = False |
|
|
print("Panda3D not available for visual training") |
|
|
|
|
|
|
|
|
def create_sphere_geom(radius: float = 1.0, color: tuple = (1, 1, 1, 1), segments: int = 12) -> GeomNode: |
|
|
"""Create sphere geometry""" |
|
|
format = GeomVertexFormat.getV3n3c4() |
|
|
vdata = GeomVertexData("sphere", format, Geom.UHStatic) |
|
|
|
|
|
vertex = GeomVertexWriter(vdata, "vertex") |
|
|
normal = GeomVertexWriter(vdata, "normal") |
|
|
col = GeomVertexWriter(vdata, "color") |
|
|
|
|
|
for i in range(segments + 1): |
|
|
lat = np.pi * (-0.5 + float(i) / segments) |
|
|
for j in range(segments + 1): |
|
|
lon = 2 * np.pi * float(j) / segments |
|
|
|
|
|
x = radius * np.cos(lat) * np.cos(lon) |
|
|
y = radius * np.cos(lat) * np.sin(lon) |
|
|
z = radius * np.sin(lat) |
|
|
|
|
|
vertex.addData3f(x, y, z) |
|
|
normal.addData3f(x/radius, y/radius, z/radius) |
|
|
col.addData4f(*color) |
|
|
|
|
|
prim = GeomTriangles(Geom.UHStatic) |
|
|
for i in range(segments): |
|
|
for j in range(segments): |
|
|
v0 = i * (segments + 1) + j |
|
|
v1 = v0 + 1 |
|
|
v2 = v0 + segments + 1 |
|
|
v3 = v2 + 1 |
|
|
prim.addVertices(v0, v2, v1) |
|
|
prim.addVertices(v1, v2, v3) |
|
|
|
|
|
geom = Geom(vdata) |
|
|
geom.addPrimitive(prim) |
|
|
node = GeomNode("sphere") |
|
|
node.addGeom(geom) |
|
|
return node |
|
|
|
|
|
|
|
|
def create_airfoil_geom(wingspan: float = 1.5, chord: float = 0.3, color: tuple = (1, 1, 1, 1)) -> GeomNode: |
|
|
""" |
|
|
Create an airfoil/wing geometry for TABs. |
|
|
|
|
|
TABs are AIRFOILS - flying wings with visible thickness! |
|
|
""" |
|
|
format = GeomVertexFormat.getV3n3c4() |
|
|
vdata = GeomVertexData("airfoil", format, Geom.UHStatic) |
|
|
|
|
|
vertex = GeomVertexWriter(vdata, "vertex") |
|
|
normal = GeomVertexWriter(vdata, "normal") |
|
|
col = GeomVertexWriter(vdata, "color") |
|
|
|
|
|
|
|
|
half_span = wingspan / 2 |
|
|
thickness = 0.15 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
verts = [ |
|
|
|
|
|
(0, half_span, 0), |
|
|
(-half_span, -half_span * 0.5, thickness), |
|
|
(half_span, -half_span * 0.5, thickness), |
|
|
|
|
|
(-half_span, -half_span * 0.5, -thickness), |
|
|
(half_span, -half_span * 0.5, -thickness), |
|
|
(0, -half_span * 0.3, 0), |
|
|
] |
|
|
|
|
|
normals = [ |
|
|
(0, 1, 0), |
|
|
(0, 0, 1), |
|
|
(0, 0, 1), |
|
|
(0, 0, -1), |
|
|
(0, 0, -1), |
|
|
(0, -1, 0), |
|
|
] |
|
|
|
|
|
for i, v in enumerate(verts): |
|
|
vertex.addData3f(*v) |
|
|
normal.addData3f(*normals[i]) |
|
|
col.addData4f(*color) |
|
|
|
|
|
prim = GeomTriangles(Geom.UHStatic) |
|
|
|
|
|
prim.addVertices(0, 1, 2) |
|
|
prim.addVertices(1, 5, 2) |
|
|
|
|
|
prim.addVertices(0, 4, 3) |
|
|
prim.addVertices(3, 4, 5) |
|
|
|
|
|
prim.addVertices(1, 3, 5) |
|
|
prim.addVertices(2, 5, 4) |
|
|
prim.addVertices(0, 3, 1) |
|
|
prim.addVertices(0, 2, 4) |
|
|
|
|
|
geom = Geom(vdata) |
|
|
geom.addPrimitive(prim) |
|
|
node = GeomNode("airfoil") |
|
|
node.addGeom(geom) |
|
|
return node |
|
|
|
|
|
|
|
|
if PANDA3D_AVAILABLE: |
|
|
|
|
|
class VisualTrainer(ShowBase): |
|
|
""" |
|
|
Panda3D-based visual training interface with HOLD control. |
|
|
|
|
|
Features: |
|
|
- Cinematic camera system with multiple perspectives |
|
|
- Enhanced 3D geometry for cables and airfoils |
|
|
- Interactive HOLD control for pause/step/speed |
|
|
- Showcase modes for capability demonstration |
|
|
""" |
|
|
|
|
|
def __init__(self, |
|
|
dreamer_agent=None, |
|
|
episode_steps: int = 3000, |
|
|
threat_interval: int = 100, |
|
|
speed_multiplier: float = 1.0): |
|
|
ShowBase.__init__(self) |
|
|
|
|
|
self.dreamer_agent = dreamer_agent |
|
|
self.speed_multiplier = speed_multiplier |
|
|
|
|
|
|
|
|
self.hold = KAPSHoldController() |
|
|
|
|
|
|
|
|
if CinematicCamera is not None: |
|
|
self.cinema_cam = CinematicCamera() |
|
|
else: |
|
|
self.cinema_cam = None |
|
|
|
|
|
|
|
|
self.env = ExplorationKAPSEnv( |
|
|
episode_steps=episode_steps, |
|
|
threat_spawn_interval=threat_interval |
|
|
) |
|
|
|
|
|
|
|
|
self.obs = None |
|
|
self.episode_reward = 0 |
|
|
self.episode_count = 0 |
|
|
self.total_steps = 0 |
|
|
self.recent_rewards = deque(maxlen=100) |
|
|
self.recent_intercepts = deque(maxlen=100) |
|
|
|
|
|
|
|
|
self.disableMouse() |
|
|
self.camera_distance = 150 |
|
|
self.camera_heading = 45 |
|
|
self.camera_pitch = 25 |
|
|
self.camera_target = np.array([0.0, 0.0, 1000.0]) |
|
|
self.camera_mode_index = 0 |
|
|
|
|
|
|
|
|
self.setBackgroundColor(0.4, 0.6, 0.9) |
|
|
|
|
|
|
|
|
self._setup_lighting() |
|
|
|
|
|
|
|
|
self.mother_node = None |
|
|
self.tab_nodes: Dict[str, NodePath] = {} |
|
|
self.cable_nodes: Dict[str, NodePath] = {} |
|
|
self.threat_nodes: Dict[str, NodePath] = {} |
|
|
self.hud_texts = {} |
|
|
|
|
|
|
|
|
self.slingshot = SlingshotController(cable_length=50.0) if SlingshotController else None |
|
|
self.airfoiled_buzzard = AirfoiledBuzzard() if SlingshotController else None |
|
|
self.bola_node = None |
|
|
self.grid_fin_nodes: Dict[str, NodePath] = {} |
|
|
self.buzzard_wing_nodes = [] |
|
|
self.slingshot_cable_node = None |
|
|
|
|
|
|
|
|
self._create_ground() |
|
|
self._create_entities() |
|
|
self._create_hud() |
|
|
self._create_control_panel() |
|
|
|
|
|
|
|
|
self.accept("wheel_up", self._zoom_in) |
|
|
self.accept("wheel_down", self._zoom_out) |
|
|
self.accept("mouse1", self._start_drag) |
|
|
self.accept("mouse1-up", self._stop_drag) |
|
|
|
|
|
|
|
|
self.accept("c", self._cycle_camera_mode) |
|
|
self.accept("v", lambda: self._set_camera_mode(CameraMode.CHASE)) |
|
|
self.accept("b", lambda: self._set_camera_mode(CameraMode.ORBIT)) |
|
|
self.accept("n", lambda: self._set_camera_mode(CameraMode.TACTICAL)) |
|
|
self.accept("m", lambda: self._set_camera_mode(CameraMode.DYNAMIC)) |
|
|
self.accept(",", lambda: self._set_camera_mode(CameraMode.COCKPIT)) |
|
|
|
|
|
|
|
|
self.accept("space", self._toggle_pause) |
|
|
self.accept("f", self._toggle_ai_freeze) |
|
|
self.accept(".", self._step_once) |
|
|
|
|
|
|
|
|
self.accept("=", self._speed_up) |
|
|
self.accept("+", self._speed_up) |
|
|
self.accept("-", self._slow_down) |
|
|
|
|
|
|
|
|
self.accept("s", self._toggle_slingshot_mode) |
|
|
self.accept("x", self._slingshot_release) |
|
|
self.accept("z", self._deploy_grid_fins) |
|
|
self.accept("a", self._extend_buzzard_wings) |
|
|
|
|
|
|
|
|
self.accept("1", lambda: self._set_showcase(ShowcaseMode.SYSTEMS_CHECK)) |
|
|
self.accept("2", lambda: self._set_showcase(ShowcaseMode.DEFENSIVE_SPHERE)) |
|
|
self.accept("3", lambda: self._set_showcase(ShowcaseMode.INTERCEPTION_DRILL)) |
|
|
self.accept("4", lambda: self._set_showcase(ShowcaseMode.MAX_EXTENSION)) |
|
|
self.accept("5", lambda: self._set_showcase(ShowcaseMode.CORKSCREW_SPIN)) |
|
|
self.accept("6", lambda: self._set_showcase(ShowcaseMode.FORMATION_PARADE)) |
|
|
self.accept("7", lambda: self._set_showcase(ShowcaseMode.THROTTLE_TEST)) |
|
|
self.accept("8", lambda: self._set_showcase(ShowcaseMode.EVASIVE_DEMO)) |
|
|
self.accept("9", lambda: self._set_showcase(ShowcaseMode.SACRIFICE_DRILL)) |
|
|
self.accept("0", lambda: self._set_showcase(ShowcaseMode.SLINGSHOT_DEMO)) |
|
|
|
|
|
|
|
|
self.accept("escape", self._stop_showcase) |
|
|
self.accept("backspace", self._stop_showcase) |
|
|
self.accept("r", self._reset_episode) |
|
|
self.accept("q", self.userExit) |
|
|
|
|
|
self.mouse_dragging = False |
|
|
self.last_mouse_x = 0 |
|
|
self.last_mouse_y = 0 |
|
|
|
|
|
|
|
|
self.last_frame_time = time.time() |
|
|
|
|
|
|
|
|
self.taskMgr.add(self._update_task, "training_update") |
|
|
self.taskMgr.add(self._camera_task, "camera_update") |
|
|
|
|
|
|
|
|
self._reset_episode() |
|
|
|
|
|
self._print_controls() |
|
|
|
|
|
def _print_controls(self): |
|
|
"""Print control instructions to terminal.""" |
|
|
print("=" * 70) |
|
|
print(" KAPS VISUAL TRAINER - Cinematic Mode") |
|
|
print("=" * 70) |
|
|
print("") |
|
|
print(" CAMERA MODES:") |
|
|
print(" V - Chase (behind Buzzard, follows velocity)") |
|
|
print(" B - Orbit (circles around)") |
|
|
print(" N - Tactical (top-down overview)") |
|
|
print(" M - Dynamic (automatic cinematic transitions)") |
|
|
print(" , - Cockpit (first person)") |
|
|
print(" C - Cycle through modes") |
|
|
print("") |
|
|
print(" HOLD CONTROLS:") |
|
|
print(" SPACE - Pause/Resume (Full HOLD)") |
|
|
print(" F - Freeze AI only (environment continues)") |
|
|
print(" . - Step one frame") |
|
|
print("") |
|
|
print(" SPEED:") |
|
|
print(" +/- - Speed up/down") |
|
|
print("") |
|
|
print(" SHOWCASE DEMOS:") |
|
|
print(" 1 - Systems Check 2 - Defensive Sphere") |
|
|
print(" 3 - Intercept Drill 4 - Max Extension") |
|
|
print(" 5 - Corkscrew Spin 6 - Formation Parade") |
|
|
print(" 7 - Throttle Test 8 - Evasive Demo") |
|
|
print(" 9 - Sacrifice Drill 0 - Slingshot/Bola Demo") |
|
|
print("") |
|
|
print(" SLINGSHOT / BOLA MODE (Bruce Lee nunchaku!):") |
|
|
print(" S - Toggle Slingshot mode (consolidate TABs)") |
|
|
print(" X - Release bola at max velocity") |
|
|
print(" Z - Deploy grid fins (post-release steering)") |
|
|
print(" A - Extend Buzzard wings (airfoiled mode)") |
|
|
print("") |
|
|
print(" OTHER:") |
|
|
print(" ESC/BKSP - Stop showcase") |
|
|
print(" R - Reset episode Q - Quit") |
|
|
print("=" * 70) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _cycle_camera_mode(self): |
|
|
"""Cycle through camera modes.""" |
|
|
if self.cinema_cam is None: |
|
|
return |
|
|
modes = [CameraMode.CHASE, CameraMode.ORBIT, CameraMode.TACTICAL, |
|
|
CameraMode.DYNAMIC, CameraMode.COCKPIT, CameraMode.SIDE] |
|
|
self.camera_mode_index = (self.camera_mode_index + 1) % len(modes) |
|
|
self.cinema_cam.set_mode(modes[self.camera_mode_index]) |
|
|
self._update_camera_display() |
|
|
|
|
|
def _set_camera_mode(self, mode): |
|
|
"""Set specific camera mode.""" |
|
|
if self.cinema_cam is None or mode is None: |
|
|
return |
|
|
self.cinema_cam.set_mode(mode) |
|
|
self._update_camera_display() |
|
|
|
|
|
def _update_camera_display(self): |
|
|
"""Update camera mode in HUD.""" |
|
|
if "camera" in self.hud_texts and self.cinema_cam: |
|
|
self.hud_texts["camera"].setText(f"CAM: {self.cinema_cam.mode.name}") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _toggle_pause(self): |
|
|
"""Toggle full pause.""" |
|
|
self.hold.toggle_pause() |
|
|
self._update_mode_display() |
|
|
|
|
|
def _toggle_ai_freeze(self): |
|
|
"""Toggle AI freeze (environment continues).""" |
|
|
if self.hold.state.mode == HoldMode.AI_PAUSE: |
|
|
self.hold.set_mode(HoldMode.CONTINUOUS) |
|
|
else: |
|
|
self.hold.set_mode(HoldMode.AI_PAUSE) |
|
|
self._update_mode_display() |
|
|
|
|
|
def _step_once(self): |
|
|
"""Step one frame.""" |
|
|
self.hold.step_once() |
|
|
|
|
|
def _speed_up(self): |
|
|
"""Increase simulation speed.""" |
|
|
self.hold.speed_up() |
|
|
self._update_mode_display() |
|
|
|
|
|
def _slow_down(self): |
|
|
"""Decrease simulation speed.""" |
|
|
self.hold.slow_down() |
|
|
self._update_mode_display() |
|
|
|
|
|
def _reset_speed(self): |
|
|
"""Reset to 1x speed.""" |
|
|
self.hold.set_speed(1.0) |
|
|
self._update_mode_display() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _toggle_slingshot_mode(self): |
|
|
"""Toggle between dispersed and bola mode.""" |
|
|
if self.slingshot is None: |
|
|
print("[SLINGSHOT] Not available") |
|
|
return |
|
|
|
|
|
if self.slingshot.state == SlingshotState.DISPERSED: |
|
|
|
|
|
print("[SLINGSHOT] Consolidating TABs into BOLA mode...") |
|
|
|
|
|
|
|
|
if self.env and self.env.sim: |
|
|
tab_states = {} |
|
|
for tab_id, tab in self.env.sim.tab_array.tabs.items(): |
|
|
tab_states[tab_id] = { |
|
|
'position': tab.position, |
|
|
'velocity': tab.velocity, |
|
|
'mass': getattr(tab, 'mass', 5.0), |
|
|
'is_attached': tab.is_attached |
|
|
} |
|
|
|
|
|
self.slingshot.consolidate_tabs(tab_states) |
|
|
self._create_bola_visual() |
|
|
print(f"[SLINGSHOT] BOLA READY! Mass: {self.slingshot.bola.mass} kg") |
|
|
|
|
|
|
|
|
buzzard_pos = self.env.sim.mother_drone.position |
|
|
self.slingshot.begin_swing(buzzard_pos, initial_angular_rate=1.0) |
|
|
print("[SLINGSHOT] SWINGING - Use X to release!") |
|
|
else: |
|
|
|
|
|
print("[SLINGSHOT] Dispersing to standard TAB mode...") |
|
|
self.slingshot.disperse_tabs() |
|
|
self._remove_bola_visual() |
|
|
|
|
|
self._update_mode_display() |
|
|
|
|
|
def _slingshot_release(self): |
|
|
"""Release the bola at current angle.""" |
|
|
if self.slingshot is None: |
|
|
return |
|
|
|
|
|
if self.slingshot.state == SlingshotState.SWINGING: |
|
|
pos, vel, speed = self.slingshot.release() |
|
|
print(f"[SLINGSHOT] RELEASED at {speed:.1f} m/s!") |
|
|
self._update_mode_display() |
|
|
|
|
|
def _deploy_grid_fins(self): |
|
|
"""Deploy grid fins for post-release steering.""" |
|
|
if self.slingshot is None: |
|
|
return |
|
|
|
|
|
if self.slingshot.state in [SlingshotState.BALLISTIC, SlingshotState.SAILING]: |
|
|
self.slingshot.deploy_grid_fins() |
|
|
self._update_grid_fin_visuals() |
|
|
print("[SLINGSHOT] Grid fins DEPLOYED!") |
|
|
|
|
|
|
|
|
if self.airfoiled_buzzard: |
|
|
self.airfoiled_buzzard.deploy_all_fins() |
|
|
print("[BUZZARD] Grid fins deployed!") |
|
|
|
|
|
def _extend_buzzard_wings(self): |
|
|
"""Extend Buzzard's lifting surfaces.""" |
|
|
if self.airfoiled_buzzard is None: |
|
|
return |
|
|
|
|
|
|
|
|
if self.airfoiled_buzzard.wing_extension < 0.5: |
|
|
target = 1.0 |
|
|
print("[BUZZARD] Extending wings to full span!") |
|
|
else: |
|
|
target = 0.0 |
|
|
print("[BUZZARD] Retracting wings!") |
|
|
|
|
|
|
|
|
self._buzzard_wing_target = target |
|
|
|
|
|
def _create_bola_visual(self): |
|
|
"""Create visual representation of the consolidated bola.""" |
|
|
if self.bola_node: |
|
|
self.bola_node.removeNode() |
|
|
|
|
|
|
|
|
bola_geom = create_sphere_geom(radius=6, color=(0.9, 0.6, 0.2, 1)) |
|
|
self.bola_node = self.render.attachNewNode(bola_geom) |
|
|
self.bola_node.setScale(1.5) |
|
|
|
|
|
|
|
|
if self.slingshot and self.slingshot.bola.grid_fins: |
|
|
for fin_id, fin in self.slingshot.bola.grid_fins.items(): |
|
|
fin_geom = create_airfoil_geom(wingspan=1.5, chord=0.4, |
|
|
color=(0.6, 0.6, 0.7, 1)) |
|
|
fin_node = self.bola_node.attachNewNode(fin_geom) |
|
|
fin_node.setPos(*fin.position_local * 5) |
|
|
fin_node.setScale(0.5) |
|
|
self.grid_fin_nodes[fin_id] = fin_node |
|
|
|
|
|
print("[VIZ] Created BOLA visual with grid fins") |
|
|
|
|
|
def _remove_bola_visual(self): |
|
|
"""Remove bola visual, show individual TABs again.""" |
|
|
if self.bola_node: |
|
|
self.bola_node.removeNode() |
|
|
self.bola_node = None |
|
|
|
|
|
for node in self.grid_fin_nodes.values(): |
|
|
node.removeNode() |
|
|
self.grid_fin_nodes.clear() |
|
|
|
|
|
|
|
|
for node in self.tab_nodes.values(): |
|
|
node.show() |
|
|
|
|
|
def _update_grid_fin_visuals(self): |
|
|
"""Update grid fin orientations based on deployment/deflection.""" |
|
|
if not self.slingshot or not self.slingshot.bola.grid_fins: |
|
|
return |
|
|
|
|
|
for fin_id, fin_node in self.grid_fin_nodes.items(): |
|
|
fin = self.slingshot.bola.grid_fins.get(fin_id) |
|
|
if fin: |
|
|
|
|
|
scale = 0.3 + 0.7 * fin.deploy_fraction |
|
|
fin_node.setScale(scale) |
|
|
|
|
|
|
|
|
fin_node.setR(fin.deflection) |
|
|
|
|
|
def _update_slingshot_visuals(self, dt: float): |
|
|
"""Update all slingshot-related visuals each frame.""" |
|
|
if self.slingshot is None: |
|
|
return |
|
|
|
|
|
state = self.slingshot.state |
|
|
|
|
|
if state == SlingshotState.DISPERSED: |
|
|
|
|
|
if self.bola_node: |
|
|
self.bola_node.hide() |
|
|
for node in self.tab_nodes.values(): |
|
|
node.show() |
|
|
return |
|
|
|
|
|
|
|
|
for node in self.tab_nodes.values(): |
|
|
node.hide() |
|
|
|
|
|
if self.bola_node: |
|
|
self.bola_node.show() |
|
|
|
|
|
|
|
|
if self.env and self.env.sim: |
|
|
buzzard_pos = self.env.sim.mother_drone.position |
|
|
buzzard_vel = self.env.sim.mother_drone.velocity |
|
|
|
|
|
if state == SlingshotState.SWINGING: |
|
|
|
|
|
self.slingshot.accelerate_swing(torque=3000.0, dt=dt) |
|
|
bola_pos, bola_vel, tension = self.slingshot.update_swing_physics( |
|
|
buzzard_pos, buzzard_vel, dt |
|
|
) |
|
|
|
|
|
|
|
|
if self.bola_node: |
|
|
self.bola_node.setPos(*bola_pos) |
|
|
|
|
|
|
|
|
angle_deg = np.degrees(self.slingshot.swing_angle) |
|
|
self.bola_node.setH(angle_deg) |
|
|
|
|
|
|
|
|
self._update_slingshot_cable(buzzard_pos, bola_pos, tension) |
|
|
|
|
|
elif state in [SlingshotState.BALLISTIC, SlingshotState.SAILING]: |
|
|
|
|
|
bola_pos = self.slingshot.update_ballistic(dt) |
|
|
|
|
|
if self.bola_node: |
|
|
self.bola_node.setPos(*bola_pos) |
|
|
|
|
|
|
|
|
self._update_grid_fin_visuals() |
|
|
|
|
|
|
|
|
if self.slingshot_cable_node: |
|
|
self.slingshot_cable_node.removeNode() |
|
|
self.slingshot_cable_node = None |
|
|
|
|
|
|
|
|
if self.env.threat_spawner: |
|
|
threats = self.env.threat_spawner.get_active_threats() |
|
|
if threats: |
|
|
target = threats[0].position |
|
|
self.slingshot.steer_to_target(target) |
|
|
|
|
|
|
|
|
if hasattr(self, '_buzzard_wing_target') and self.airfoiled_buzzard: |
|
|
self.airfoiled_buzzard.extend_wings( |
|
|
self._buzzard_wing_target, dt, rate=2.0 |
|
|
) |
|
|
self._update_buzzard_wing_visuals() |
|
|
|
|
|
def _update_slingshot_cable(self, start: np.ndarray, end: np.ndarray, tension: float): |
|
|
"""Draw the braided cable in slingshot mode.""" |
|
|
if self.slingshot_cable_node: |
|
|
self.slingshot_cable_node.removeNode() |
|
|
|
|
|
|
|
|
max_tension = 10000 |
|
|
tension_ratio = min(1.0, tension / max_tension) |
|
|
cable_color = ( |
|
|
0.4 + tension_ratio * 0.5, |
|
|
0.4 * (1 - tension_ratio), |
|
|
0.2, |
|
|
1.0 |
|
|
) |
|
|
|
|
|
|
|
|
if create_cable_geometry: |
|
|
cable_geom = create_cable_geometry( |
|
|
start=start, |
|
|
end=end, |
|
|
radius=0.8, |
|
|
segments=8, |
|
|
color=cable_color |
|
|
) |
|
|
if cable_geom: |
|
|
self.slingshot_cable_node = self.render.attachNewNode(cable_geom) |
|
|
return |
|
|
|
|
|
|
|
|
lines = LineSegs() |
|
|
lines.setThickness(5.0) |
|
|
lines.setColor(*cable_color) |
|
|
lines.moveTo(Point3(*start)) |
|
|
lines.drawTo(Point3(*end)) |
|
|
self.slingshot_cable_node = self.render.attachNewNode(lines.create()) |
|
|
|
|
|
def _update_buzzard_wing_visuals(self): |
|
|
"""Update Buzzard wing geometry based on extension.""" |
|
|
|
|
|
|
|
|
if self.mother_node and self.airfoiled_buzzard: |
|
|
ext = self.airfoiled_buzzard.wing_extension |
|
|
|
|
|
self.mother_node.setScale(1.0 + ext * 0.5, 1.0, 1.0) |
|
|
|
|
|
def _set_showcase(self, mode: ShowcaseMode): |
|
|
"""Start a showcase demonstration.""" |
|
|
self.hold.set_showcase(mode) |
|
|
self._update_mode_display() |
|
|
|
|
|
def _stop_showcase(self): |
|
|
"""Stop showcase, return to training.""" |
|
|
self.hold.stop_showcase() |
|
|
self._update_mode_display() |
|
|
|
|
|
def _update_mode_display(self): |
|
|
"""Update the mode indicator in HUD.""" |
|
|
if "mode" not in self.hud_texts: |
|
|
return |
|
|
|
|
|
state = self.hold.state |
|
|
|
|
|
|
|
|
slingshot_active = (self.slingshot and |
|
|
self.slingshot.state != SlingshotState.DISPERSED) |
|
|
|
|
|
if slingshot_active: |
|
|
|
|
|
ss = self.slingshot |
|
|
if ss.state == SlingshotState.SWINGING: |
|
|
speed = ss.get_release_velocity() |
|
|
text = f"[SLINGSHOT] SWINGING - {speed:.0f} m/s - Press X to release!" |
|
|
color = (1.0, 0.6, 0.0, 1) |
|
|
elif ss.state == SlingshotState.BOLA_READY: |
|
|
text = "[SLINGSHOT] BOLA READY - Building momentum..." |
|
|
color = (0.8, 0.8, 0.0, 1) |
|
|
elif ss.state == SlingshotState.BALLISTIC: |
|
|
text = "[SLINGSHOT] BALLISTIC - Press Z for grid fins!" |
|
|
color = (0.0, 0.8, 1.0, 1) |
|
|
elif ss.state == SlingshotState.SAILING: |
|
|
text = "[SLINGSHOT] SAILING - Grid fins active!" |
|
|
color = (0.0, 1.0, 0.5, 1) |
|
|
else: |
|
|
text = f"[SLINGSHOT] {ss.state.name}" |
|
|
color = (0.8, 0.6, 0.2, 1) |
|
|
elif state.showcase != ShowcaseMode.NONE: |
|
|
|
|
|
phase = state.showcase_phase or state.showcase.name |
|
|
text = f"[SHOWCASE] {phase}" |
|
|
color = (1.0, 0.5, 0.0, 1) |
|
|
elif state.mode == HoldMode.FULL_PAUSE: |
|
|
text = "[PAUSED] Press SPACE to resume" |
|
|
color = (1.0, 0.2, 0.2, 1) |
|
|
elif state.mode == HoldMode.AI_PAUSE: |
|
|
text = "[AI FROZEN] Environment running - Press F to resume" |
|
|
color = (0.2, 0.5, 1.0, 1) |
|
|
elif state.mode == HoldMode.STEP: |
|
|
text = "[STEP] Press . to advance" |
|
|
color = (1.0, 1.0, 0.2, 1) |
|
|
else: |
|
|
|
|
|
speed = state.time_scale |
|
|
if self.dreamer_agent: |
|
|
text = f"[TRAINING] DreamerV3 | {speed:.1f}x" |
|
|
else: |
|
|
text = f"[TRAINING] Random | {speed:.1f}x" |
|
|
color = (0.2, 1.0, 0.2, 1) |
|
|
|
|
|
self.hud_texts["mode"].setText(text) |
|
|
|
|
|
def _setup_lighting(self): |
|
|
ambient = AmbientLight("ambient") |
|
|
ambient.setColor(Vec4(0.4, 0.4, 0.5, 1)) |
|
|
self.render.setLight(self.render.attachNewNode(ambient)) |
|
|
|
|
|
sun = DirectionalLight("sun") |
|
|
sun.setColor(Vec4(1.0, 0.95, 0.9, 1)) |
|
|
sun_np = self.render.attachNewNode(sun) |
|
|
sun_np.setHpr(45, -45, 0) |
|
|
self.render.setLight(sun_np) |
|
|
|
|
|
def _create_ground(self): |
|
|
"""Ocean surface""" |
|
|
lines = LineSegs() |
|
|
lines.setThickness(1.0) |
|
|
lines.setColor(0.1, 0.3, 0.5, 0.5) |
|
|
|
|
|
for i in range(-1000, 1001, 100): |
|
|
lines.moveTo(Point3(i, -1000, 0)) |
|
|
lines.drawTo(Point3(i, 1000, 0)) |
|
|
lines.moveTo(Point3(-1000, i, 0)) |
|
|
lines.drawTo(Point3(1000, i, 0)) |
|
|
|
|
|
self.render.attachNewNode(lines.create()) |
|
|
|
|
|
def _create_entities(self): |
|
|
"""Create Buzzard (mother drone) and TAB airfoils with VISIBLE geometry""" |
|
|
|
|
|
|
|
|
|
|
|
buzzard_created = False |
|
|
if create_buzzard_geometry: |
|
|
try: |
|
|
buzzard_geom = create_buzzard_geometry( |
|
|
length=15.0, |
|
|
width=6.0, |
|
|
height=4.0, |
|
|
color=(0.2, 0.4, 0.9, 1.0) |
|
|
) |
|
|
if buzzard_geom: |
|
|
self.mother_node = self.render.attachNewNode(buzzard_geom) |
|
|
self.mother_node.setScale(3.0) |
|
|
self.mother_node.setTwoSided(True) |
|
|
|
|
|
self.mother_node.setH(90) |
|
|
buzzard_created = True |
|
|
print(f"[VIZ] Created VISIBLE Buzzard - BIG elongated fuselage, scale 3.0") |
|
|
except Exception as e: |
|
|
print(f"[VIZ] Error creating buzzard geometry: {e}") |
|
|
|
|
|
if not buzzard_created: |
|
|
|
|
|
|
|
|
from panda3d.core import CardMaker |
|
|
cm = CardMaker("buzzard_card") |
|
|
cm.setFrame(-10, 10, -5, 5) |
|
|
cm.setColor(0.2, 0.4, 0.9, 1.0) |
|
|
|
|
|
self.mother_node = self.render.attachNewNode("buzzard_fallback") |
|
|
|
|
|
|
|
|
|
|
|
top = self.mother_node.attachNewNode(cm.generate()) |
|
|
top.setPos(0, 0, 4) |
|
|
top.setP(-90) |
|
|
|
|
|
bottom = self.mother_node.attachNewNode(cm.generate()) |
|
|
bottom.setPos(0, 0, -4) |
|
|
bottom.setP(90) |
|
|
|
|
|
front = self.mother_node.attachNewNode(cm.generate()) |
|
|
front.setPos(0, 10, 0) |
|
|
|
|
|
back = self.mother_node.attachNewNode(cm.generate()) |
|
|
back.setPos(0, -10, 0) |
|
|
back.setH(180) |
|
|
|
|
|
left = self.mother_node.attachNewNode(cm.generate()) |
|
|
left.setPos(-5, 0, 0) |
|
|
left.setH(-90) |
|
|
|
|
|
right = self.mother_node.attachNewNode(cm.generate()) |
|
|
right.setPos(5, 0, 0) |
|
|
right.setH(90) |
|
|
|
|
|
self.mother_node.setTwoSided(True) |
|
|
print("[VIZ] Created fallback BOX Buzzard (cards)") |
|
|
|
|
|
|
|
|
tab_colors = TAB_COLORS if TAB_COLORS else { |
|
|
"UP": (0.2, 1.0, 0.2, 1.0), |
|
|
"DOWN": (1.0, 0.2, 0.2, 1.0), |
|
|
"LEFT": (1.0, 1.0, 0.2, 1.0), |
|
|
"RIGHT": (1.0, 0.2, 1.0, 1.0), |
|
|
} |
|
|
|
|
|
for tab_id, color in tab_colors.items(): |
|
|
|
|
|
if create_airfoil_geometry: |
|
|
geom = create_airfoil_geometry( |
|
|
wingspan=12.0, |
|
|
chord=4.0, |
|
|
thickness=1.2, |
|
|
color=color |
|
|
) |
|
|
if geom: |
|
|
node = self.render.attachNewNode(geom) |
|
|
node.setScale(2.5) |
|
|
node.setTwoSided(True) |
|
|
|
|
|
node.setH(90) |
|
|
self.tab_nodes[tab_id] = node |
|
|
self.cable_nodes[tab_id] = None |
|
|
print(f"[VIZ] Created BIG delta wing for {tab_id}") |
|
|
continue |
|
|
|
|
|
|
|
|
geom = create_airfoil_geom(wingspan=8.0, chord=2.5, color=color) |
|
|
node = self.render.attachNewNode(geom) |
|
|
node.setScale(3.0) |
|
|
node.setTwoSided(True) |
|
|
self.tab_nodes[tab_id] = node |
|
|
self.cable_nodes[tab_id] = None |
|
|
print(f"[VIZ] Created fallback airfoil for {tab_id}") |
|
|
|
|
|
def _create_hud(self): |
|
|
"""Create training statistics HUD - BUZZARD HEALTH is PRIMARY""" |
|
|
|
|
|
|
|
|
health_txt = TextNode("buzzard_health") |
|
|
health_txt.setText("BUZZARD: 100%") |
|
|
health_txt.setAlign(TextNode.ACenter) |
|
|
health_np = self.aspect2d.attachNewNode(health_txt) |
|
|
health_np.setScale(0.08) |
|
|
health_np.setPos(0, 0, 0.85) |
|
|
health_np.setColor(0.2, 1.0, 0.2, 1) |
|
|
self.hud_texts["buzzard_health"] = health_txt |
|
|
|
|
|
|
|
|
texts = [ |
|
|
("episode", "Episode: 0", -1.3, 0.9), |
|
|
("step", "Step: 0", -1.3, 0.85), |
|
|
("reward", "Reward: 0.0", -1.3, 0.8), |
|
|
("avg_reward", "Avg Reward: 0.0", -1.3, 0.75), |
|
|
] |
|
|
|
|
|
for name, text, x, y in texts: |
|
|
txt = TextNode(name) |
|
|
txt.setText(text) |
|
|
txt.setAlign(TextNode.ALeft) |
|
|
txt_np = self.aspect2d.attachNewNode(txt) |
|
|
txt_np.setScale(0.05) |
|
|
txt_np.setPos(x, 0, y) |
|
|
txt_np.setColor(1, 1, 1, 1) |
|
|
self.hud_texts[name] = txt |
|
|
|
|
|
|
|
|
explore_texts = [ |
|
|
("intercepts", "Kills: 0", 0.7, 0.9), |
|
|
("sacrificed", "Sacrificed: 0", 0.7, 0.85), |
|
|
("threats", "Threats: 0", 0.7, 0.8), |
|
|
("impacts", "Hits Taken: 0", 0.7, 0.75), |
|
|
] |
|
|
|
|
|
for name, text, x, y in explore_texts: |
|
|
txt = TextNode(name) |
|
|
txt.setText(text) |
|
|
txt.setAlign(TextNode.ALeft) |
|
|
txt_np = self.aspect2d.attachNewNode(txt) |
|
|
txt_np.setScale(0.05) |
|
|
txt_np.setPos(x, 0, y) |
|
|
txt_np.setColor(0.2, 1.0, 0.5, 1) |
|
|
self.hud_texts[name] = txt |
|
|
|
|
|
|
|
|
for i, tab_id in enumerate(["UP", "DOWN", "LEFT", "RIGHT"]): |
|
|
txt = TextNode(f"tab_{tab_id}") |
|
|
txt.setText(f"{tab_id}: READY") |
|
|
txt.setAlign(TextNode.ACenter) |
|
|
txt_np = self.aspect2d.attachNewNode(txt) |
|
|
txt_np.setScale(0.04) |
|
|
txt_np.setPos(-0.6 + i * 0.4, 0, -0.85) |
|
|
self.hud_texts[f"tab_{tab_id}"] = txt |
|
|
|
|
|
|
|
|
mode_txt = TextNode("mode") |
|
|
mode_txt.setText("[TRAINING] Initializing...") |
|
|
mode_txt.setAlign(TextNode.ACenter) |
|
|
mode_np = self.aspect2d.attachNewNode(mode_txt) |
|
|
mode_np.setScale(0.06) |
|
|
mode_np.setPos(0, 0, 0.95) |
|
|
mode_np.setColor(1, 0.8, 0.2, 1) |
|
|
self.hud_texts["mode"] = mode_txt |
|
|
|
|
|
|
|
|
cam_txt = TextNode("camera") |
|
|
cam_txt.setText("CAM: CHASE") |
|
|
cam_txt.setAlign(TextNode.ARight) |
|
|
cam_np = self.aspect2d.attachNewNode(cam_txt) |
|
|
cam_np.setScale(0.05) |
|
|
cam_np.setPos(1.3, 0, -0.9) |
|
|
cam_np.setColor(0.7, 0.9, 1.0, 1) |
|
|
self.hud_texts["camera"] = cam_txt |
|
|
|
|
|
def _create_control_panel(self): |
|
|
"""Create showcase selection help text (bottom right).""" |
|
|
help_lines = [ |
|
|
"SHOWCASES:", |
|
|
"1-Systems 2-Defense 3-Intercept", |
|
|
"4-Extend 5-Corkscrew 6-Formation", |
|
|
"7-Throttle 8-Evasive 9-Sacrifice", |
|
|
"", |
|
|
"SPACE-Pause F-AI Freeze .-Step", |
|
|
] |
|
|
|
|
|
for i, line in enumerate(help_lines): |
|
|
txt = TextNode(f"help_{i}") |
|
|
txt.setText(line) |
|
|
txt.setAlign(TextNode.ARight) |
|
|
txt_np = self.aspect2d.attachNewNode(txt) |
|
|
txt_np.setScale(0.035) |
|
|
txt_np.setPos(1.3, 0, -0.5 - i * 0.05) |
|
|
txt_np.setColor(0.7, 0.7, 0.7, 0.8) |
|
|
|
|
|
def _reset_episode(self): |
|
|
"""Reset to new episode""" |
|
|
self.obs, info = self.env.reset() |
|
|
self.episode_reward = 0 |
|
|
self.episode_count += 1 |
|
|
print(f"\n[EPISODE {self.episode_count}] Starting...") |
|
|
|
|
|
def _update_task(self, task): |
|
|
"""Main training update loop""" |
|
|
if self.obs is None: |
|
|
return Task.cont |
|
|
|
|
|
|
|
|
if not self.hold.should_step(): |
|
|
|
|
|
return Task.cont |
|
|
|
|
|
|
|
|
ai_action = None |
|
|
if self.hold.should_ai_act(): |
|
|
if self.dreamer_agent is not None: |
|
|
|
|
|
obs_dict = self._obs_to_dict(self.obs) |
|
|
ai_action = self.dreamer_agent.infer(obs_dict) |
|
|
ai_action = np.clip(ai_action, -1.0, 1.0) |
|
|
if len(ai_action) < 18: |
|
|
ai_action = np.concatenate([ai_action, np.zeros(18 - len(ai_action))]) |
|
|
elif len(ai_action) > 18: |
|
|
ai_action = ai_action[:18] |
|
|
else: |
|
|
|
|
|
ai_action = self.env.action_space.sample() |
|
|
|
|
|
|
|
|
action = self.hold.get_action(self.obs, ai_action) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
self.obs, reward, terminated, truncated, info = self.env.step(action) |
|
|
self.episode_reward += reward |
|
|
self.total_steps += 1 |
|
|
|
|
|
|
|
|
if self.hold.state.showcase != ShowcaseMode.NONE: |
|
|
self.hold.state.showcase_phase = self.hold.showcase.phase |
|
|
self._update_mode_display() |
|
|
|
|
|
|
|
|
self._update_entities() |
|
|
self._update_threats() |
|
|
self._update_hud(info) |
|
|
|
|
|
|
|
|
dt = globalClock.getDt() |
|
|
self._update_slingshot_visuals(dt) |
|
|
|
|
|
|
|
|
if terminated or truncated: |
|
|
self.recent_rewards.append(self.episode_reward) |
|
|
self.recent_intercepts.append(info.get('intercepts', 0)) |
|
|
print(f"[EPISODE {self.episode_count}] " |
|
|
f"Reward: {self.episode_reward:.1f} | " |
|
|
f"Intercepts: {info.get('intercepts', 0)} | " |
|
|
f"Novel: {info.get('novel_states', 0)}") |
|
|
self._reset_episode() |
|
|
|
|
|
return Task.cont |
|
|
|
|
|
def _update_entities(self): |
|
|
"""Update visual positions""" |
|
|
sim = self.env.sim |
|
|
if sim is None: |
|
|
return |
|
|
|
|
|
|
|
|
md_pos = sim.mother_drone.position |
|
|
self.mother_node.setPos(md_pos[0], md_pos[1], md_pos[2]) |
|
|
|
|
|
|
|
|
self.camera_target = md_pos.copy() |
|
|
|
|
|
|
|
|
for tab_id, node in self.tab_nodes.items(): |
|
|
tab = sim.tab_array.tabs[tab_id] |
|
|
pos = tab.position |
|
|
vel = tab.velocity |
|
|
node.setPos(pos[0], pos[1], pos[2]) |
|
|
|
|
|
|
|
|
|
|
|
if np.linalg.norm(vel) > 0.1: |
|
|
|
|
|
heading = np.degrees(np.arctan2(vel[0], vel[1])) |
|
|
pitch = np.degrees(np.arctan2(vel[2], np.sqrt(vel[0]**2 + vel[1]**2))) |
|
|
node.setHpr(heading, pitch, 0) |
|
|
|
|
|
|
|
|
if not tab.is_attached: |
|
|
node.setColorScale(0.5, 0.5, 0.5, 0.5) |
|
|
else: |
|
|
node.setColorScale(1, 1, 1, 1) |
|
|
|
|
|
|
|
|
if self.cable_nodes[tab_id]: |
|
|
self.cable_nodes[tab_id].removeNode() |
|
|
|
|
|
if tab.is_attached: |
|
|
|
|
|
cable_length = np.linalg.norm(pos - md_pos) |
|
|
max_length = 60.0 |
|
|
tension_ratio = min(1.0, cable_length / max_length) |
|
|
|
|
|
|
|
|
cable_color = ( |
|
|
0.3 + tension_ratio * 0.6, |
|
|
0.6 * (1 - tension_ratio), |
|
|
0.2, |
|
|
1.0 |
|
|
) |
|
|
|
|
|
|
|
|
lines = LineSegs("cable") |
|
|
lines.setThickness(6.0) |
|
|
lines.setColor(*cable_color) |
|
|
lines.moveTo(Point3(float(md_pos[0]), float(md_pos[1]), float(md_pos[2]))) |
|
|
lines.drawTo(Point3(float(pos[0]), float(pos[1]), float(pos[2]))) |
|
|
self.cable_nodes[tab_id] = self.render.attachNewNode(lines.create()) |
|
|
|
|
|
def _update_threats(self): |
|
|
"""Update threat visuals""" |
|
|
|
|
|
for node in self.threat_nodes.values(): |
|
|
node.removeNode() |
|
|
self.threat_nodes.clear() |
|
|
|
|
|
|
|
|
if self.env.threat_spawner is None: |
|
|
return |
|
|
|
|
|
for threat in self.env.threat_spawner.get_active_threats(): |
|
|
|
|
|
if threat.profile.type == ThreatType.IR_MISSILE: |
|
|
color = (1.0, 0.3, 0.1, 1) |
|
|
size = 2 |
|
|
elif threat.profile.type == ThreatType.RADAR_MISSILE: |
|
|
color = (1.0, 0.1, 0.1, 1) |
|
|
size = 2.5 |
|
|
elif threat.profile.type == ThreatType.ATTACK_DRONE: |
|
|
color = (0.8, 0.4, 0.8, 1) |
|
|
size = 3 |
|
|
elif threat.profile.type == ThreatType.SWARM_ELEMENT: |
|
|
color = (1.0, 0.6, 0.0, 1) |
|
|
size = 1 |
|
|
else: |
|
|
color = (0.7, 0.7, 0.7, 1) |
|
|
size = 2 |
|
|
|
|
|
geom = create_sphere_geom(radius=size, color=color) |
|
|
node = self.render.attachNewNode(geom) |
|
|
pos = threat.position |
|
|
node.setPos(pos[0], pos[1], pos[2]) |
|
|
self.threat_nodes[threat.threat_id] = node |
|
|
|
|
|
def _update_hud(self, info: Dict): |
|
|
"""Update HUD texts - BUZZARD HEALTH is PRIMARY""" |
|
|
|
|
|
|
|
|
health = info.get('buzzard_health', 100.0) |
|
|
self.hud_texts["buzzard_health"].setText(f"BUZZARD: {health:.0f}%") |
|
|
|
|
|
|
|
|
if health > 70: |
|
|
|
|
|
pass |
|
|
elif health > 30: |
|
|
|
|
|
pass |
|
|
else: |
|
|
|
|
|
pass |
|
|
|
|
|
|
|
|
self.hud_texts["episode"].setText(f"Episode: {self.episode_count}") |
|
|
self.hud_texts["step"].setText(f"Step: {info.get('step', 0)}") |
|
|
self.hud_texts["reward"].setText(f"Reward: {self.episode_reward:.1f}") |
|
|
|
|
|
if self.recent_rewards: |
|
|
avg = np.mean(self.recent_rewards) |
|
|
self.hud_texts["avg_reward"].setText(f"Avg Reward (100): {avg:.1f}") |
|
|
|
|
|
|
|
|
self.hud_texts["intercepts"].setText(f"Kills: {info.get('intercepts', 0)}") |
|
|
self.hud_texts["sacrificed"].setText(f"Sacrificed: {info.get('tabs_sacrificed', 0)}") |
|
|
self.hud_texts["threats"].setText(f"Threats: {info.get('threats_active', 0)}") |
|
|
self.hud_texts["impacts"].setText(f"Hits Taken: {info.get('impacts', 0)}") |
|
|
|
|
|
|
|
|
sim = self.env.sim |
|
|
if sim: |
|
|
for tab_id in ["UP", "DOWN", "LEFT", "RIGHT"]: |
|
|
tab = sim.tab_array.tabs[tab_id] |
|
|
if tab.is_attached: |
|
|
status = "READY" |
|
|
else: |
|
|
status = "LAUNCHED" |
|
|
self.hud_texts[f"tab_{tab_id}"].setText(f"{tab_id}: {status}") |
|
|
|
|
|
def _camera_task(self, task): |
|
|
"""Camera control with cinematic modes""" |
|
|
dt = globalClock.getDt() |
|
|
|
|
|
|
|
|
if self.cinema_cam.mode == CameraMode.FREE: |
|
|
if self.mouse_dragging and self.mouseWatcherNode.hasMouse(): |
|
|
mx = self.mouseWatcherNode.getMouseX() |
|
|
my = self.mouseWatcherNode.getMouseY() |
|
|
|
|
|
dx = (mx - self.last_mouse_x) * 200 |
|
|
dy = (my - self.last_mouse_y) * 200 |
|
|
|
|
|
self.camera_heading += dx |
|
|
self.camera_pitch = np.clip(self.camera_pitch + dy, -85, 85) |
|
|
|
|
|
self.last_mouse_x = mx |
|
|
self.last_mouse_y = my |
|
|
|
|
|
|
|
|
rad_h = np.radians(self.camera_heading) |
|
|
rad_p = np.radians(self.camera_pitch) |
|
|
|
|
|
x = self.camera_distance * np.cos(rad_p) * np.sin(rad_h) |
|
|
y = self.camera_distance * np.cos(rad_p) * np.cos(rad_h) |
|
|
z = self.camera_distance * np.sin(rad_p) |
|
|
|
|
|
cam_pos = self.camera_target + np.array([x, -y, z]) |
|
|
self.camera.setPos(cam_pos[0], cam_pos[1], cam_pos[2]) |
|
|
self.camera.lookAt(Point3(*self.camera_target)) |
|
|
else: |
|
|
|
|
|
|
|
|
buzzard_pos = self.camera_target |
|
|
buzzard_vel = np.array([0, 50, 0]) |
|
|
|
|
|
|
|
|
if self.env and self.env.sim: |
|
|
buzzard_vel = self.env.sim.mother_drone.velocity |
|
|
|
|
|
|
|
|
threats = [] |
|
|
if self.env and self.env.threat_spawner: |
|
|
threats = self.env.threat_spawner.get_active_threats() |
|
|
|
|
|
|
|
|
cam_pos, look_at, fov = self.cinema_cam.update( |
|
|
target_pos=buzzard_pos, |
|
|
target_vel=buzzard_vel, |
|
|
dt=dt, |
|
|
threats=threats |
|
|
) |
|
|
|
|
|
|
|
|
self.camera.setPos(cam_pos[0], cam_pos[1], cam_pos[2]) |
|
|
self.camera.lookAt(Point3(*look_at)) |
|
|
|
|
|
|
|
|
if hasattr(self, 'camLens'): |
|
|
self.camLens.setFov(fov) |
|
|
|
|
|
return Task.cont |
|
|
|
|
|
def _start_drag(self): |
|
|
if self.mouseWatcherNode.hasMouse(): |
|
|
self.mouse_dragging = True |
|
|
self.last_mouse_x = self.mouseWatcherNode.getMouseX() |
|
|
self.last_mouse_y = self.mouseWatcherNode.getMouseY() |
|
|
|
|
|
def _stop_drag(self): |
|
|
self.mouse_dragging = False |
|
|
|
|
|
def _zoom_in(self): |
|
|
self.camera_distance = max(30, self.camera_distance * 0.85) |
|
|
|
|
|
def _zoom_out(self): |
|
|
self.camera_distance = min(500, self.camera_distance * 1.15) |
|
|
|
|
|
def _obs_to_dict(self, obs: np.ndarray) -> Dict: |
|
|
"""Convert flat observation to dict for DreamerV3""" |
|
|
return { |
|
|
'mother_drone': { |
|
|
'position': obs[0:3] * 1000, |
|
|
'velocity': obs[3:6] * 100, |
|
|
}, |
|
|
'tabs': {}, |
|
|
'threats': [] |
|
|
} |
|
|
|
|
|
|
|
|
def run_visual_training(use_dreamer: bool = True, explore: bool = True): |
|
|
""" |
|
|
Launch visual training mode. |
|
|
|
|
|
Args: |
|
|
use_dreamer: Load DreamerV3 weights (gen42) |
|
|
explore: Use ExplorationController instead of frozen inference |
|
|
NOTE: DreamerV3 gen42 was NOT trained on KAPS physics. |
|
|
Set explore=True to actually see the airfoils demonstrate |
|
|
what they can do! |
|
|
""" |
|
|
if not PANDA3D_AVAILABLE: |
|
|
print("Panda3D not available!") |
|
|
return |
|
|
|
|
|
print("=" * 70) |
|
|
print("KAPS VISUAL TRAINING with HOLD Control") |
|
|
print("=" * 70) |
|
|
|
|
|
dreamer_agent = None |
|
|
if use_dreamer: |
|
|
try: |
|
|
from src.ai.dreamer_interface import DreamerBrainInterface |
|
|
dreamer_agent = DreamerBrainInterface() |
|
|
print("[✓] DreamerV3 agent loaded (trainable)") |
|
|
except Exception as e: |
|
|
print(f"[!] Could not load DreamerV3: {e}") |
|
|
print("[!] Running with random baseline") |
|
|
|
|
|
print("") |
|
|
print("Interactive HOLD controls enabled:") |
|
|
print(" - SPACE to pause/resume") |
|
|
print(" - F to freeze AI (environment continues)") |
|
|
print(" - 1-9 for showcase demonstrations") |
|
|
print("=" * 70) |
|
|
|
|
|
trainer = VisualTrainer(dreamer_agent=dreamer_agent) |
|
|
trainer._update_mode_display() |
|
|
trainer.run() |
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
import argparse |
|
|
parser = argparse.ArgumentParser(description="KAPS Visual Training with HOLD Control") |
|
|
parser.add_argument("--dreamer", action="store_true", |
|
|
help="Load DreamerV3 agent for training") |
|
|
parser.add_argument("--random", action="store_true", |
|
|
help="Use random actions only (no DreamerV3)") |
|
|
args = parser.parse_args() |
|
|
|
|
|
run_visual_training(use_dreamer=args.dreamer and not args.random) |
|
|
|
|
|
|