import numpy as np
import cv2
import torch
import yaml
import os
import sys
# Add project root to path
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")))
from wm.dataset.dataset import RoboticsDatasetWrapper
def _integrate_unicycle(actions, dt=0.1):
    """Integrate per-step (v, w) unicycle actions into a 2D path.

    Args:
        actions: [T, 2] array of (linear velocity v, angular velocity w).
        dt: integration timestep in seconds (default 0.1, i.e. 10 FPS).

    Returns:
        (path, thetas): path is a [T, 2] float array of (x, y) positions
        starting at the origin; thetas is a list of T headings (radians).
    """
    x, y, theta = 0.0, 0.0, 0.0
    path = [[x, y]]
    thetas = [theta]
    for t in range(len(actions) - 1):
        v = actions[t, 0]
        w = actions[t, 1]
        # Euler step: rotate first, then advance along the new heading.
        theta += w * dt
        x += v * np.cos(theta) * dt
        y += v * np.sin(theta) * dt
        path.append([x, y])
        thetas.append(theta)
    return np.array(path), thetas


def _path_to_pixels(path, height, width):
    """Map a metric (x, y) path into image pixel coordinates.

    Convention: robot X (forward) -> pixel -Y (up); robot Y (left) ->
    pixel -X (left). The path is centered on the image and scaled so its
    largest absolute displacement spans 30% of the smaller image side.

    Args:
        path: [T, 2] float array of (x, y) positions.
        height, width: image dimensions in pixels.

    Returns:
        [T, 2] float array of (pixel_x, pixel_y) coordinates.
    """
    max_disp = np.abs(path).max()
    # Guard against a zero-motion trajectory (division by zero).
    scale = (min(height, width) * 0.3) / max_disp if max_disp > 0 else 1.0
    pixel_path = np.zeros_like(path)
    pixel_path[:, 0] = width // 2 - path[:, 1] * scale
    pixel_path[:, 1] = height // 2 - path[:, 0] * scale
    return pixel_path


def _draw_overlay(frame, pixel_path, thetas, actions, t, num_frames):
    """Draw path history, current pose, heading arrow, and text onto frame.

    Mutates ``frame`` (BGR uint8 image) in place.
    """
    # Path history up to time t; color fades Blue (start) -> Red (end)
    # in BGR channel order.
    if t > 0:
        for i in range(1, t + 1):
            pt1 = (int(pixel_path[i - 1, 0]), int(pixel_path[i - 1, 1]))
            pt2 = (int(pixel_path[i, 0]), int(pixel_path[i, 1]))
            color = (int(255 * (1 - i / num_frames)), 0, int(255 * (i / num_frames)))
            cv2.line(frame, pt1, pt2, color, 2, cv2.LINE_AA)
    # Current position (green dot).
    curr_pos = (int(pixel_path[t, 0]), int(pixel_path[t, 1]))
    cv2.circle(frame, curr_pos, 5, (0, 255, 0), -1, cv2.LINE_AA)
    # Heading arrow (white). Robot X-axis is UP (-Y pixels), robot Y-axis
    # is LEFT (-X pixels):
    #   theta = 0    -> (adx, ady) = (0, -len)   points up
    #   theta = pi/2 -> (adx, ady) = (-len, 0)   points left
    curr_theta = thetas[t]
    arrow_len = 20
    adx = -np.sin(curr_theta) * arrow_len
    ady = -np.cos(curr_theta) * arrow_len
    arrow_end = (int(curr_pos[0] + adx), int(curr_pos[1] + ady))
    cv2.arrowedLine(frame, curr_pos, arrow_end, (255, 255, 255), 2, tipLength=0.3)
    # Step label and raw action values.
    cv2.putText(frame, f"Step: {t}", (10, 25), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
    v, w = actions[t]
    cv2.putText(frame, f"v: {v:.2f} w: {w:.2f}", (10, 50), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)


def create_recon_overlay_video(config_path, sample_idx=0, output_dir="results/visualizations"):
    """Render a RECON dataset sample as a video with the integrated action
    trajectory overlaid on each frame.

    Args:
        config_path: path to a YAML config file. NOTE(review): the config is
            loaded but currently unused — confirm whether it should drive
            dataset/FPS settings or be removed from the interface.
        sample_idx: index of the dataset sample to visualize.
        output_dir: directory where the .mp4 is written (created if missing).

    Raises:
        RuntimeError: if the video writer cannot be opened.
    """
    os.makedirs(output_dir, exist_ok=True)

    # 1. Load config (kept for interface compatibility; see NOTE above).
    with open(config_path, 'r') as f:
        config = yaml.safe_load(f)

    # 2. Setup dataset (force RECON).
    print("Loading dataset: recon")
    dataset = RoboticsDatasetWrapper.get_dataset("recon")

    # 3. Fetch one sample and convert to numpy.
    sample = dataset[sample_idx]
    obs = sample['obs']        # [T, 3, H, W] in [0, 1]
    actions = sample['action'] # [T, 2] (v, w)
    if isinstance(obs, torch.Tensor):
        obs = obs.numpy()
    if isinstance(actions, torch.Tensor):
        actions = actions.numpy()
    T, C, H, W = obs.shape

    # 4. Integrate actions into a path, assuming dt = 1/fps at 10 FPS.
    path, thetas = _integrate_unicycle(actions, dt=0.1)

    # 5. Map the metric path into pixel coordinates.
    pixel_path = _path_to_pixels(path, H, W)

    # 6. Render frames into an mp4.
    video_path = os.path.join(output_dir, f"recon_overlay_video_sample_{sample_idx}.mp4")
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(video_path, fourcc, 10.0, (W, H))
    if not out.isOpened():
        raise RuntimeError(f"Failed to open video writer for {video_path}")
    print(f"Generating RECON overlay video: {W}x{H}, {T} frames")
    try:
        for t in range(T):
            # [C, H, W] float in [0, 1] -> [H, W, C] uint8 BGR for OpenCV.
            frame = (np.transpose(obs[t], (1, 2, 0)) * 255).astype(np.uint8)
            frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
            _draw_overlay(frame, pixel_path, thetas, actions, t, T)
            out.write(frame)
    finally:
        # Always release so a partial video is flushed even on error.
        out.release()
    print(f"RECON overlay video saved to: {video_path}")
if __name__ == "__main__":
    import argparse

    # CLI entry point: pick a config file and a sample index to render.
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument(
        "--config",
        type=str,
        default="wm/config/fulltraj_dit/lang_table.yaml",
    )
    arg_parser.add_argument("--sample", type=int, default=0)
    cli_args = arg_parser.parse_args()
    create_recon_overlay_video(cli_args.config, cli_args.sample)