from __future__ import annotations import argparse import json import warnings from dataclasses import replace from pathlib import Path import cv2 import numpy as np import pandas as pd from PIL import Image from scipy.optimize import minimize from scene_tools import ( DEFAULT_ROBOT_WORLD_RVEC, DEFAULT_ROBOT_WORLD_TVEC, SceneCalibration, load_scene_calibration, render_robot_mask, render_scene, ) def parse_rows(spec: str) -> list[int]: return [int(chunk.strip()) for chunk in spec.split(",") if chunk.strip()] def load_rgb_frames(session_root: Path, sync: pd.DataFrame, rows: list[int]) -> dict[int, np.ndarray]: return {row_idx: np.array(Image.open(session_root / sync.iloc[row_idx]["azure_rgb_file"]).convert("RGB")) for row_idx in rows} def load_q_rows(sync: pd.DataFrame, rows: list[int]) -> dict[int, np.ndarray]: return {row_idx: sync.iloc[row_idx][[f"q{i}_deg" for i in range(1, 8)]].to_numpy(np.float64) for row_idx in rows} def compute_motion_mask(base_rgb: np.ndarray, rgb: np.ndarray, roi_mask: np.ndarray, threshold: int = 18) -> np.ndarray: diff = np.abs(rgb.astype(np.int16) - base_rgb.astype(np.int16)).mean(axis=2).astype(np.uint8) mask = ((diff > threshold) & roi_mask).astype(np.uint8) mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, np.ones((5, 5), dtype=np.uint8)) mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, np.ones((11, 11), dtype=np.uint8)) return mask > 0 def mask_iou(lhs: np.ndarray, rhs: np.ndarray) -> float: intersection = np.logical_and(lhs, rhs).sum() union = np.logical_or(lhs, rhs).sum() return float(intersection / union) if union else 0.0 def fit_robot_transform( calibration: SceneCalibration, rgbs: dict[int, np.ndarray], q_by_row: dict[int, np.ndarray], rows: list[int] ) -> tuple[np.ndarray, dict[int, np.ndarray], dict[int, float], float]: start_row = rows[0] base_rgb = rgbs[start_row] roi_mask = np.zeros(base_rgb.shape[:2], dtype=bool) roi_mask[250:1079, 850:1919] = True motion_masks = {row_idx: compute_motion_mask(base_rgb, rgbs[row_idx], roi_mask) for row_idx in rows[1:]} def score(params: np.ndarray) -> float: robot_world_rvec = params[:3] robot_world_tvec = params[3:] start_mask = render_robot_mask( base_rgb.shape, q_by_row[start_row], calibration.camera_matrix, calibration.rvec, calibration.tvec, robot_world_rvec, robot_world_tvec, extra_dilate=35, ) > 0 total = 0.0 for row_idx in rows[1:]: current_mask = render_robot_mask( base_rgb.shape, q_by_row[row_idx], calibration.camera_matrix, calibration.rvec, calibration.tvec, robot_world_rvec, robot_world_tvec, extra_dilate=35, ) > 0 predicted_change = np.logical_xor(start_mask, current_mask) total += 1.0 - mask_iou(predicted_change, motion_masks[row_idx]) return total start_params = np.concatenate([DEFAULT_ROBOT_WORLD_RVEC.copy(), DEFAULT_ROBOT_WORLD_TVEC.copy()]) result = minimize(score, start_params, method="Powell", options={"maxiter": 40, "disp": False}) best_params = np.asarray(result.x, dtype=np.float64) start_mask = render_robot_mask( base_rgb.shape, q_by_row[start_row], calibration.camera_matrix, calibration.rvec, calibration.tvec, best_params[:3], best_params[3:], extra_dilate=35, ) > 0 ious: dict[int, float] = {} for row_idx in rows[1:]: current_mask = render_robot_mask( base_rgb.shape, q_by_row[row_idx], calibration.camera_matrix, calibration.rvec, calibration.tvec, best_params[:3], best_params[3:], extra_dilate=35, ) > 0 ious[row_idx] = mask_iou(np.logical_xor(start_mask, current_mask), motion_masks[row_idx]) return best_params, motion_masks, ious, float(result.fun) def build_background( calibration: SceneCalibration, rgbs: dict[int, np.ndarray], q_by_row: dict[int, np.ndarray], rows: list[int], robot_world_rvec: np.ndarray, robot_world_tvec: np.ndarray, ) -> np.ndarray: base_rgb = rgbs[rows[0]] masked_frames = [] valid_masks = [] for row_idx in rows: robot_mask = render_robot_mask( base_rgb.shape, q_by_row[row_idx], calibration.camera_matrix, calibration.rvec, calibration.tvec, robot_world_rvec, robot_world_tvec, extra_dilate=65, ) > 0 masked_frames.append(rgbs[row_idx].astype(np.float32)) valid_masks.append(~robot_mask) frame_stack = np.stack(masked_frames, axis=0) valid_stack = np.stack(valid_masks, axis=0) with warnings.catch_warnings(): warnings.simplefilter("ignore", category=RuntimeWarning) background = np.zeros_like(base_rgb, dtype=np.float32) for channel in range(3): channel_values = np.where(valid_stack, frame_stack[..., channel], np.nan) background[..., channel] = np.nanmedian(channel_values, axis=0) hole_mask = np.isnan(background[..., 0]) background = np.nan_to_num(background, nan=0.0).astype(np.uint8) if np.any(hole_mask): background = cv2.cvtColor( cv2.inpaint(cv2.cvtColor(background, cv2.COLOR_RGB2BGR), hole_mask.astype(np.uint8) * 255, 9, cv2.INPAINT_TELEA), cv2.COLOR_BGR2RGB, ) return background def main() -> None: parser = argparse.ArgumentParser() parser.add_argument("--session-root", default="/workspace/data/teddybear_raw/session_20260327_165944_bear") parser.add_argument("--fit-rows", default="0,52,60,68,76,84,92,100,108,116,124") parser.add_argument("--output-dir", default="/workspace/kinova_scene_sim/outputs") args = parser.parse_args() session_root = Path(args.session_root) output_dir = Path(args.output_dir) output_dir.mkdir(parents=True, exist_ok=True) sync = pd.read_csv(session_root / "sync_index.csv") fit_rows = parse_rows(args.fit_rows) calibration = load_scene_calibration(session_root, sync_row_index=0) rgbs = load_rgb_frames(session_root, sync, fit_rows) q_by_row = load_q_rows(sync, fit_rows) best_params, motion_masks, ious, fit_score = fit_robot_transform(calibration, rgbs, q_by_row, fit_rows) calibration = replace( calibration, robot_world_rvec=best_params[:3].copy(), robot_world_tvec=best_params[3:].copy(), ) background = build_background(calibration, rgbs, q_by_row, fit_rows, best_params[:3], best_params[3:]) start_row = fit_rows[0] representative_row = fit_rows[min(len(fit_rows) - 1, 4)] start_q = q_by_row[start_row] start_rgb = rgbs[start_row] start_render = render_scene(calibration, start_q, background=background.copy(), alpha=1.0) start_mask = render_robot_mask( start_rgb.shape, start_q, calibration.camera_matrix, calibration.rvec, calibration.tvec, calibration.robot_world_rvec, calibration.robot_world_tvec, ) > 0 representative_mask = render_robot_mask( start_rgb.shape, q_by_row[representative_row], calibration.camera_matrix, calibration.rvec, calibration.tvec, calibration.robot_world_rvec, calibration.robot_world_tvec, ) > 0 predicted_change = np.logical_xor(start_mask, representative_mask) motion_overlay = rgbs[representative_row].copy() motion_overlay[motion_masks[representative_row]] = np.array([0, 255, 0], dtype=np.uint8) motion_overlay[predicted_change] = np.array([255, 0, 0], dtype=np.uint8) background_path = output_dir / "scene_background_model.png" start_render_path = output_dir / "scene_start_render_fitted.png" real_vs_scene_path = output_dir / "real_vs_scene_start_fitted.png" motion_overlay_path = output_dir / "robot_motion_fit_overlay.png" scene_model_json = output_dir / "robot_scene_model.json" Image.fromarray(background).save(background_path) Image.fromarray(start_render).save(start_render_path) Image.fromarray(np.concatenate([start_rgb, start_render], axis=1)).save(real_vs_scene_path) Image.fromarray(motion_overlay).save(motion_overlay_path) payload = { "session_root": str(session_root), "fit_rows": fit_rows, "robot_world_rvec": calibration.robot_world_rvec.tolist(), "robot_world_tvec": calibration.robot_world_tvec.tolist(), "fit_score": fit_score, "motion_iou_by_row": {str(key): value for key, value in ious.items()}, "motion_iou_mean": float(np.mean(list(ious.values()))), "background_path": str(background_path), "scene_start_render_path": str(start_render_path), "real_vs_scene_path": str(real_vs_scene_path), "motion_overlay_path": str(motion_overlay_path), } scene_model_json.write_text(json.dumps(payload, indent=2)) print(f"saved scene model to {scene_model_json}") print(f"robot_world_rvec={calibration.robot_world_rvec.tolist()}") print(f"robot_world_tvec={calibration.robot_world_tvec.tolist()}") print(f"motion_iou_mean={payload['motion_iou_mean']:.4f}") if __name__ == "__main__": main()