TeddyBearKinova / bundle /sim_model_based_20260329 /code /fit_robot_scene_model.py
lsnu's picture
Upload folder using huggingface_hub
f81aa8a verified
from __future__ import annotations
import argparse
import json
import warnings
from dataclasses import replace
from pathlib import Path
import cv2
import numpy as np
import pandas as pd
from PIL import Image
from scipy.optimize import minimize
from scene_tools import (
DEFAULT_ROBOT_WORLD_RVEC,
DEFAULT_ROBOT_WORLD_TVEC,
SceneCalibration,
load_scene_calibration,
render_robot_mask,
render_scene,
)
def parse_rows(spec: str) -> list[int]:
return [int(chunk.strip()) for chunk in spec.split(",") if chunk.strip()]
def load_rgb_frames(session_root: Path, sync: pd.DataFrame, rows: list[int]) -> dict[int, np.ndarray]:
return {row_idx: np.array(Image.open(session_root / sync.iloc[row_idx]["azure_rgb_file"]).convert("RGB")) for row_idx in rows}
def load_q_rows(sync: pd.DataFrame, rows: list[int]) -> dict[int, np.ndarray]:
return {row_idx: sync.iloc[row_idx][[f"q{i}_deg" for i in range(1, 8)]].to_numpy(np.float64) for row_idx in rows}
def compute_motion_mask(base_rgb: np.ndarray, rgb: np.ndarray, roi_mask: np.ndarray, threshold: int = 18) -> np.ndarray:
diff = np.abs(rgb.astype(np.int16) - base_rgb.astype(np.int16)).mean(axis=2).astype(np.uint8)
mask = ((diff > threshold) & roi_mask).astype(np.uint8)
mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, np.ones((5, 5), dtype=np.uint8))
mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, np.ones((11, 11), dtype=np.uint8))
return mask > 0
def mask_iou(lhs: np.ndarray, rhs: np.ndarray) -> float:
intersection = np.logical_and(lhs, rhs).sum()
union = np.logical_or(lhs, rhs).sum()
return float(intersection / union) if union else 0.0
def fit_robot_transform(
calibration: SceneCalibration, rgbs: dict[int, np.ndarray], q_by_row: dict[int, np.ndarray], rows: list[int]
) -> tuple[np.ndarray, dict[int, np.ndarray], dict[int, float], float]:
start_row = rows[0]
base_rgb = rgbs[start_row]
roi_mask = np.zeros(base_rgb.shape[:2], dtype=bool)
roi_mask[250:1079, 850:1919] = True
motion_masks = {row_idx: compute_motion_mask(base_rgb, rgbs[row_idx], roi_mask) for row_idx in rows[1:]}
def score(params: np.ndarray) -> float:
robot_world_rvec = params[:3]
robot_world_tvec = params[3:]
start_mask = render_robot_mask(
base_rgb.shape,
q_by_row[start_row],
calibration.camera_matrix,
calibration.rvec,
calibration.tvec,
robot_world_rvec,
robot_world_tvec,
extra_dilate=35,
) > 0
total = 0.0
for row_idx in rows[1:]:
current_mask = render_robot_mask(
base_rgb.shape,
q_by_row[row_idx],
calibration.camera_matrix,
calibration.rvec,
calibration.tvec,
robot_world_rvec,
robot_world_tvec,
extra_dilate=35,
) > 0
predicted_change = np.logical_xor(start_mask, current_mask)
total += 1.0 - mask_iou(predicted_change, motion_masks[row_idx])
return total
start_params = np.concatenate([DEFAULT_ROBOT_WORLD_RVEC.copy(), DEFAULT_ROBOT_WORLD_TVEC.copy()])
result = minimize(score, start_params, method="Powell", options={"maxiter": 40, "disp": False})
best_params = np.asarray(result.x, dtype=np.float64)
start_mask = render_robot_mask(
base_rgb.shape,
q_by_row[start_row],
calibration.camera_matrix,
calibration.rvec,
calibration.tvec,
best_params[:3],
best_params[3:],
extra_dilate=35,
) > 0
ious: dict[int, float] = {}
for row_idx in rows[1:]:
current_mask = render_robot_mask(
base_rgb.shape,
q_by_row[row_idx],
calibration.camera_matrix,
calibration.rvec,
calibration.tvec,
best_params[:3],
best_params[3:],
extra_dilate=35,
) > 0
ious[row_idx] = mask_iou(np.logical_xor(start_mask, current_mask), motion_masks[row_idx])
return best_params, motion_masks, ious, float(result.fun)
def build_background(
calibration: SceneCalibration,
rgbs: dict[int, np.ndarray],
q_by_row: dict[int, np.ndarray],
rows: list[int],
robot_world_rvec: np.ndarray,
robot_world_tvec: np.ndarray,
) -> np.ndarray:
base_rgb = rgbs[rows[0]]
masked_frames = []
valid_masks = []
for row_idx in rows:
robot_mask = render_robot_mask(
base_rgb.shape,
q_by_row[row_idx],
calibration.camera_matrix,
calibration.rvec,
calibration.tvec,
robot_world_rvec,
robot_world_tvec,
extra_dilate=65,
) > 0
masked_frames.append(rgbs[row_idx].astype(np.float32))
valid_masks.append(~robot_mask)
frame_stack = np.stack(masked_frames, axis=0)
valid_stack = np.stack(valid_masks, axis=0)
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=RuntimeWarning)
background = np.zeros_like(base_rgb, dtype=np.float32)
for channel in range(3):
channel_values = np.where(valid_stack, frame_stack[..., channel], np.nan)
background[..., channel] = np.nanmedian(channel_values, axis=0)
hole_mask = np.isnan(background[..., 0])
background = np.nan_to_num(background, nan=0.0).astype(np.uint8)
if np.any(hole_mask):
background = cv2.cvtColor(
cv2.inpaint(cv2.cvtColor(background, cv2.COLOR_RGB2BGR), hole_mask.astype(np.uint8) * 255, 9, cv2.INPAINT_TELEA),
cv2.COLOR_BGR2RGB,
)
return background
def main() -> None:
parser = argparse.ArgumentParser()
parser.add_argument("--session-root", default="/workspace/data/teddybear_raw/session_20260327_165944_bear")
parser.add_argument("--fit-rows", default="0,52,60,68,76,84,92,100,108,116,124")
parser.add_argument("--output-dir", default="/workspace/kinova_scene_sim/outputs")
args = parser.parse_args()
session_root = Path(args.session_root)
output_dir = Path(args.output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
sync = pd.read_csv(session_root / "sync_index.csv")
fit_rows = parse_rows(args.fit_rows)
calibration = load_scene_calibration(session_root, sync_row_index=0)
rgbs = load_rgb_frames(session_root, sync, fit_rows)
q_by_row = load_q_rows(sync, fit_rows)
best_params, motion_masks, ious, fit_score = fit_robot_transform(calibration, rgbs, q_by_row, fit_rows)
calibration = replace(
calibration,
robot_world_rvec=best_params[:3].copy(),
robot_world_tvec=best_params[3:].copy(),
)
background = build_background(calibration, rgbs, q_by_row, fit_rows, best_params[:3], best_params[3:])
start_row = fit_rows[0]
representative_row = fit_rows[min(len(fit_rows) - 1, 4)]
start_q = q_by_row[start_row]
start_rgb = rgbs[start_row]
start_render = render_scene(calibration, start_q, background=background.copy(), alpha=1.0)
start_mask = render_robot_mask(
start_rgb.shape,
start_q,
calibration.camera_matrix,
calibration.rvec,
calibration.tvec,
calibration.robot_world_rvec,
calibration.robot_world_tvec,
) > 0
representative_mask = render_robot_mask(
start_rgb.shape,
q_by_row[representative_row],
calibration.camera_matrix,
calibration.rvec,
calibration.tvec,
calibration.robot_world_rvec,
calibration.robot_world_tvec,
) > 0
predicted_change = np.logical_xor(start_mask, representative_mask)
motion_overlay = rgbs[representative_row].copy()
motion_overlay[motion_masks[representative_row]] = np.array([0, 255, 0], dtype=np.uint8)
motion_overlay[predicted_change] = np.array([255, 0, 0], dtype=np.uint8)
background_path = output_dir / "scene_background_model.png"
start_render_path = output_dir / "scene_start_render_fitted.png"
real_vs_scene_path = output_dir / "real_vs_scene_start_fitted.png"
motion_overlay_path = output_dir / "robot_motion_fit_overlay.png"
scene_model_json = output_dir / "robot_scene_model.json"
Image.fromarray(background).save(background_path)
Image.fromarray(start_render).save(start_render_path)
Image.fromarray(np.concatenate([start_rgb, start_render], axis=1)).save(real_vs_scene_path)
Image.fromarray(motion_overlay).save(motion_overlay_path)
payload = {
"session_root": str(session_root),
"fit_rows": fit_rows,
"robot_world_rvec": calibration.robot_world_rvec.tolist(),
"robot_world_tvec": calibration.robot_world_tvec.tolist(),
"fit_score": fit_score,
"motion_iou_by_row": {str(key): value for key, value in ious.items()},
"motion_iou_mean": float(np.mean(list(ious.values()))),
"background_path": str(background_path),
"scene_start_render_path": str(start_render_path),
"real_vs_scene_path": str(real_vs_scene_path),
"motion_overlay_path": str(motion_overlay_path),
}
scene_model_json.write_text(json.dumps(payload, indent=2))
print(f"saved scene model to {scene_model_json}")
print(f"robot_world_rvec={calibration.robot_world_rvec.tolist()}")
print(f"robot_world_tvec={calibration.robot_world_tvec.tolist()}")
print(f"motion_iou_mean={payload['motion_iou_mean']:.4f}")
if __name__ == "__main__":
main()