| from __future__ import annotations |
|
|
| import argparse |
| import json |
| import warnings |
| from dataclasses import replace |
| from pathlib import Path |
|
|
| import cv2 |
| import numpy as np |
| import pandas as pd |
| from PIL import Image |
| from scipy.optimize import minimize |
|
|
| from scene_tools import ( |
| DEFAULT_ROBOT_WORLD_RVEC, |
| DEFAULT_ROBOT_WORLD_TVEC, |
| SceneCalibration, |
| load_scene_calibration, |
| render_robot_mask, |
| render_scene, |
| ) |
|
|
|
|
| def parse_rows(spec: str) -> list[int]: |
| return [int(chunk.strip()) for chunk in spec.split(",") if chunk.strip()] |
|
|
|
|
| def load_rgb_frames(session_root: Path, sync: pd.DataFrame, rows: list[int]) -> dict[int, np.ndarray]: |
| return {row_idx: np.array(Image.open(session_root / sync.iloc[row_idx]["azure_rgb_file"]).convert("RGB")) for row_idx in rows} |
|
|
|
|
| def load_q_rows(sync: pd.DataFrame, rows: list[int]) -> dict[int, np.ndarray]: |
| return {row_idx: sync.iloc[row_idx][[f"q{i}_deg" for i in range(1, 8)]].to_numpy(np.float64) for row_idx in rows} |
|
|
|
|
| def compute_motion_mask(base_rgb: np.ndarray, rgb: np.ndarray, roi_mask: np.ndarray, threshold: int = 18) -> np.ndarray: |
| diff = np.abs(rgb.astype(np.int16) - base_rgb.astype(np.int16)).mean(axis=2).astype(np.uint8) |
| mask = ((diff > threshold) & roi_mask).astype(np.uint8) |
| mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, np.ones((5, 5), dtype=np.uint8)) |
| mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, np.ones((11, 11), dtype=np.uint8)) |
| return mask > 0 |
|
|
|
|
| def mask_iou(lhs: np.ndarray, rhs: np.ndarray) -> float: |
| intersection = np.logical_and(lhs, rhs).sum() |
| union = np.logical_or(lhs, rhs).sum() |
| return float(intersection / union) if union else 0.0 |
|
|
|
|
| def fit_robot_transform( |
| calibration: SceneCalibration, rgbs: dict[int, np.ndarray], q_by_row: dict[int, np.ndarray], rows: list[int] |
| ) -> tuple[np.ndarray, dict[int, np.ndarray], dict[int, float], float]: |
| start_row = rows[0] |
| base_rgb = rgbs[start_row] |
| roi_mask = np.zeros(base_rgb.shape[:2], dtype=bool) |
| roi_mask[250:1079, 850:1919] = True |
| motion_masks = {row_idx: compute_motion_mask(base_rgb, rgbs[row_idx], roi_mask) for row_idx in rows[1:]} |
|
|
| def score(params: np.ndarray) -> float: |
| robot_world_rvec = params[:3] |
| robot_world_tvec = params[3:] |
| start_mask = render_robot_mask( |
| base_rgb.shape, |
| q_by_row[start_row], |
| calibration.camera_matrix, |
| calibration.rvec, |
| calibration.tvec, |
| robot_world_rvec, |
| robot_world_tvec, |
| extra_dilate=35, |
| ) > 0 |
| total = 0.0 |
| for row_idx in rows[1:]: |
| current_mask = render_robot_mask( |
| base_rgb.shape, |
| q_by_row[row_idx], |
| calibration.camera_matrix, |
| calibration.rvec, |
| calibration.tvec, |
| robot_world_rvec, |
| robot_world_tvec, |
| extra_dilate=35, |
| ) > 0 |
| predicted_change = np.logical_xor(start_mask, current_mask) |
| total += 1.0 - mask_iou(predicted_change, motion_masks[row_idx]) |
| return total |
|
|
| start_params = np.concatenate([DEFAULT_ROBOT_WORLD_RVEC.copy(), DEFAULT_ROBOT_WORLD_TVEC.copy()]) |
| result = minimize(score, start_params, method="Powell", options={"maxiter": 40, "disp": False}) |
| best_params = np.asarray(result.x, dtype=np.float64) |
|
|
| start_mask = render_robot_mask( |
| base_rgb.shape, |
| q_by_row[start_row], |
| calibration.camera_matrix, |
| calibration.rvec, |
| calibration.tvec, |
| best_params[:3], |
| best_params[3:], |
| extra_dilate=35, |
| ) > 0 |
| ious: dict[int, float] = {} |
| for row_idx in rows[1:]: |
| current_mask = render_robot_mask( |
| base_rgb.shape, |
| q_by_row[row_idx], |
| calibration.camera_matrix, |
| calibration.rvec, |
| calibration.tvec, |
| best_params[:3], |
| best_params[3:], |
| extra_dilate=35, |
| ) > 0 |
| ious[row_idx] = mask_iou(np.logical_xor(start_mask, current_mask), motion_masks[row_idx]) |
| return best_params, motion_masks, ious, float(result.fun) |
|
|
|
|
| def build_background( |
| calibration: SceneCalibration, |
| rgbs: dict[int, np.ndarray], |
| q_by_row: dict[int, np.ndarray], |
| rows: list[int], |
| robot_world_rvec: np.ndarray, |
| robot_world_tvec: np.ndarray, |
| ) -> np.ndarray: |
| base_rgb = rgbs[rows[0]] |
| masked_frames = [] |
| valid_masks = [] |
| for row_idx in rows: |
| robot_mask = render_robot_mask( |
| base_rgb.shape, |
| q_by_row[row_idx], |
| calibration.camera_matrix, |
| calibration.rvec, |
| calibration.tvec, |
| robot_world_rvec, |
| robot_world_tvec, |
| extra_dilate=65, |
| ) > 0 |
| masked_frames.append(rgbs[row_idx].astype(np.float32)) |
| valid_masks.append(~robot_mask) |
|
|
| frame_stack = np.stack(masked_frames, axis=0) |
| valid_stack = np.stack(valid_masks, axis=0) |
| with warnings.catch_warnings(): |
| warnings.simplefilter("ignore", category=RuntimeWarning) |
| background = np.zeros_like(base_rgb, dtype=np.float32) |
| for channel in range(3): |
| channel_values = np.where(valid_stack, frame_stack[..., channel], np.nan) |
| background[..., channel] = np.nanmedian(channel_values, axis=0) |
| hole_mask = np.isnan(background[..., 0]) |
| background = np.nan_to_num(background, nan=0.0).astype(np.uint8) |
| if np.any(hole_mask): |
| background = cv2.cvtColor( |
| cv2.inpaint(cv2.cvtColor(background, cv2.COLOR_RGB2BGR), hole_mask.astype(np.uint8) * 255, 9, cv2.INPAINT_TELEA), |
| cv2.COLOR_BGR2RGB, |
| ) |
| return background |
|
|
|
|
| def main() -> None: |
| parser = argparse.ArgumentParser() |
| parser.add_argument("--session-root", default="/workspace/data/teddybear_raw/session_20260327_165944_bear") |
| parser.add_argument("--fit-rows", default="0,52,60,68,76,84,92,100,108,116,124") |
| parser.add_argument("--output-dir", default="/workspace/kinova_scene_sim/outputs") |
| args = parser.parse_args() |
|
|
| session_root = Path(args.session_root) |
| output_dir = Path(args.output_dir) |
| output_dir.mkdir(parents=True, exist_ok=True) |
|
|
| sync = pd.read_csv(session_root / "sync_index.csv") |
| fit_rows = parse_rows(args.fit_rows) |
| calibration = load_scene_calibration(session_root, sync_row_index=0) |
| rgbs = load_rgb_frames(session_root, sync, fit_rows) |
| q_by_row = load_q_rows(sync, fit_rows) |
|
|
| best_params, motion_masks, ious, fit_score = fit_robot_transform(calibration, rgbs, q_by_row, fit_rows) |
| calibration = replace( |
| calibration, |
| robot_world_rvec=best_params[:3].copy(), |
| robot_world_tvec=best_params[3:].copy(), |
| ) |
| background = build_background(calibration, rgbs, q_by_row, fit_rows, best_params[:3], best_params[3:]) |
|
|
| start_row = fit_rows[0] |
| representative_row = fit_rows[min(len(fit_rows) - 1, 4)] |
| start_q = q_by_row[start_row] |
| start_rgb = rgbs[start_row] |
| start_render = render_scene(calibration, start_q, background=background.copy(), alpha=1.0) |
|
|
| start_mask = render_robot_mask( |
| start_rgb.shape, |
| start_q, |
| calibration.camera_matrix, |
| calibration.rvec, |
| calibration.tvec, |
| calibration.robot_world_rvec, |
| calibration.robot_world_tvec, |
| ) > 0 |
| representative_mask = render_robot_mask( |
| start_rgb.shape, |
| q_by_row[representative_row], |
| calibration.camera_matrix, |
| calibration.rvec, |
| calibration.tvec, |
| calibration.robot_world_rvec, |
| calibration.robot_world_tvec, |
| ) > 0 |
| predicted_change = np.logical_xor(start_mask, representative_mask) |
| motion_overlay = rgbs[representative_row].copy() |
| motion_overlay[motion_masks[representative_row]] = np.array([0, 255, 0], dtype=np.uint8) |
| motion_overlay[predicted_change] = np.array([255, 0, 0], dtype=np.uint8) |
|
|
| background_path = output_dir / "scene_background_model.png" |
| start_render_path = output_dir / "scene_start_render_fitted.png" |
| real_vs_scene_path = output_dir / "real_vs_scene_start_fitted.png" |
| motion_overlay_path = output_dir / "robot_motion_fit_overlay.png" |
| scene_model_json = output_dir / "robot_scene_model.json" |
|
|
| Image.fromarray(background).save(background_path) |
| Image.fromarray(start_render).save(start_render_path) |
| Image.fromarray(np.concatenate([start_rgb, start_render], axis=1)).save(real_vs_scene_path) |
| Image.fromarray(motion_overlay).save(motion_overlay_path) |
|
|
| payload = { |
| "session_root": str(session_root), |
| "fit_rows": fit_rows, |
| "robot_world_rvec": calibration.robot_world_rvec.tolist(), |
| "robot_world_tvec": calibration.robot_world_tvec.tolist(), |
| "fit_score": fit_score, |
| "motion_iou_by_row": {str(key): value for key, value in ious.items()}, |
| "motion_iou_mean": float(np.mean(list(ious.values()))), |
| "background_path": str(background_path), |
| "scene_start_render_path": str(start_render_path), |
| "real_vs_scene_path": str(real_vs_scene_path), |
| "motion_overlay_path": str(motion_overlay_path), |
| } |
| scene_model_json.write_text(json.dumps(payload, indent=2)) |
|
|
| print(f"saved scene model to {scene_model_json}") |
| print(f"robot_world_rvec={calibration.robot_world_rvec.tolist()}") |
| print(f"robot_world_tvec={calibration.robot_world_tvec.tolist()}") |
| print(f"motion_iou_mean={payload['motion_iou_mean']:.4f}") |
|
|
|
|
| if __name__ == "__main__": |
| main() |
|
|