# Project EmbodiedGen # # Copyright (c) 2025 Horizon Robotics. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or # implied. See the License for the specific language governing # permissions and limitations under the License. from __future__ import annotations import argparse import logging import math import shutil import tempfile from collections.abc import Callable from pathlib import Path import bpy import cv2 import numpy as np from mathutils import Euler, Matrix, Vector logger = logging.getLogger(__name__) def build_arg_parser() -> argparse.ArgumentParser: """Build the CLI parser for USD rendering.""" parser = argparse.ArgumentParser() parser.add_argument("--usd_path", required=True, type=Path) parser.add_argument("--glb_path", type=str, default="") parser.add_argument( "--glb_xyz", type=float, nargs=3, metavar=("X", "Y", "Z"), ) parser.add_argument( "--glb_rotation_deg", type=float, nargs=3, metavar=("RX", "RY", "RZ"), ) parser.add_argument("--output_dir", required=True, type=Path) parser.add_argument( "--render_passes", nargs="+", choices=("rgb", "depth", "normal", "mesh", "instance_seg", "flow"), default=("rgb",), ) parser.add_argument( "--depth_mode", choices=("normalized", "metric"), default="normalized", ) parser.add_argument( "--resolution", type=int, nargs=2, metavar=("WIDTH", "HEIGHT"), default=(1920, 1080), ) parser.add_argument("--samples", type=int, default=1024) parser.add_argument( "--camera_xyz", type=float, nargs=3, metavar=("X", "Y", "Z"), required=True, ) parser.add_argument( "--camera_rotation_deg", type=float, nargs=3, metavar=("RX", "RY", "RZ"), required=True, ) parser.add_argument( "--flow_camera_xyz", type=float, nargs=3, metavar=("X", "Y", "Z"), ) parser.add_argument( "--flow_camera_rotation_deg", type=float, nargs=3, metavar=("RX", "RY", "RZ"), ) parser.add_argument("--focal_length_mm", type=float, default=20.0) parser.add_argument("--exposure", type=float, default=2.2) parser.add_argument("--world_strength", type=float, default=8.0) parser.add_argument("--fill_light_energy", type=float, default=14000.0) return parser def _parse_args() -> argparse.Namespace: return build_arg_parser().parse_args() class RenderUsd: """USD renderer for RGB, depth, normal, mesh, segmentation, and flow.""" def __init__( self, *, usd_path: Path, glb_path: Path | str | None, glb_xyz: tuple[float, float, float] | list[float] | None, glb_rotation_deg: tuple[float, float, float] | list[float] | None, output_dir: Path, render_passes: tuple[str, ...] | list[str], depth_mode: str, resolution: tuple[int, int] | list[int], samples: int, camera_xyz: tuple[float, float, float] | list[float], camera_rotation_deg: tuple[float, float, float] | list[float], flow_camera_xyz: tuple[float, float, float] | list[float] | None, flow_camera_rotation_deg: ( tuple[float, float, float] | list[float] | None ), focal_length_mm: float, exposure: float, world_strength: float, fill_light_energy: float, ) -> None: """Initialize renderer configuration independent of CLI parsing.""" self.usd_path = usd_path self.glb_path = self.normalize_optional_path(glb_path) self.glb_xyz = tuple(glb_xyz) if glb_xyz is not None else None self.glb_rotation_deg = ( tuple(glb_rotation_deg) if glb_rotation_deg is not None else None ) self.output_dir = output_dir self.render_passes = tuple(render_passes) self.depth_mode = depth_mode self.resolution = tuple(resolution) self.samples = samples self.camera_xyz = tuple(camera_xyz) self.camera_rotation_deg = tuple(camera_rotation_deg) self.flow_camera_xyz = ( tuple(flow_camera_xyz) if flow_camera_xyz is not None else None ) self.flow_camera_rotation_deg = ( tuple(flow_camera_rotation_deg) if flow_camera_rotation_deg is not None else None ) self.focal_length_mm = focal_length_mm self.exposure = exposure self.world_strength = world_strength self.fill_light_energy = fill_light_energy self.temp_dir: Path | None = None @classmethod def from_args(cls, args: argparse.Namespace) -> RenderUsd: """Build a renderer from parsed CLI arguments.""" return cls( usd_path=args.usd_path, glb_path=args.glb_path, glb_xyz=args.glb_xyz, glb_rotation_deg=args.glb_rotation_deg, output_dir=args.output_dir, render_passes=args.render_passes, depth_mode=args.depth_mode, resolution=args.resolution, samples=args.samples, camera_xyz=args.camera_xyz, camera_rotation_deg=args.camera_rotation_deg, flow_camera_xyz=args.flow_camera_xyz, flow_camera_rotation_deg=args.flow_camera_rotation_deg, focal_length_mm=args.focal_length_mm, exposure=args.exposure, world_strength=args.world_strength, fill_light_energy=args.fill_light_energy, ) @property def scene(self) -> bpy.types.Scene: return bpy.context.scene def normalize_optional_path( self, path_value: Path | str | None ) -> Path | None: """Normalize an optional CLI path, treating empty strings as missing.""" if path_value is None: return None if isinstance(path_value, Path): return path_value normalized = path_value.strip() if not normalized: return None return Path(normalized) def build_output_path(self, filename: str) -> Path: """Build a normalized output path under the render directory.""" return self.output_dir / filename def build_temp_path(self, filename: str) -> Path: """Build a temporary path outside the final output directory.""" if self.temp_dir is None: raise RuntimeError( "Temporary render directory is not initialized." ) return self.temp_dir / filename def get_rgb_output_path(self) -> Path: return self.build_output_path("render_rgb.png") def get_depth_vis_output_path(self, output_path: Path) -> Path: del output_path return self.build_output_path("render_depth.png") def get_normal_output_path(self, output_path: Path) -> Path: del output_path return self.build_output_path("render_normal.png") def get_mesh_output_path(self, output_path: Path) -> Path: del output_path return self.build_output_path("render_mesh.png") def get_instance_seg_vis_output_path(self, output_path: Path) -> Path: del output_path return self.build_output_path("render_instance_seg_vis.png") def get_instance_seg_temp_path(self, output_path: Path) -> Path: del output_path return self.build_temp_path("render_instance_seg_raw_0001.exr") def get_flow_output_path(self, output_path: Path) -> Path: del output_path return self.build_output_path("render_flow.npy") def get_flow_valid_output_path(self, output_path: Path) -> Path: del output_path return self.build_output_path("render_flow_valid.npy") def get_flow_vis_output_path(self, output_path: Path) -> Path: del output_path return self.build_output_path("render_flow_vis.png") def get_flow_depth_temp_path(self, output_path: Path) -> Path: del output_path return self.build_temp_path("render_flow_depth_raw_0001.exr") def get_depth_gray_temp_path(self, output_path: Path) -> Path: del output_path return self.build_temp_path("render_depth_gray_0001.png") def get_composite_output_path( self, render_passes: list[str] | tuple[str, ...] ) -> Path: pass_names = "_".join(render_passes) return self.build_output_path(f"render_composite_{pass_names}.png") def build_occurrence_output_path( self, output_path: Path, occurrence_index: int ) -> Path: """Build an occurrence-specific path for repeated preview outputs.""" if occurrence_index < 1: raise ValueError("occurrence_index must be greater than 0.") if occurrence_index == 1: return output_path return output_path.with_name( f"{output_path.stem}_{occurrence_index}{output_path.suffix}" ) def iter_render_pass_occurrences(self) -> list[tuple[str, int]]: """Return requested render passes with 1-based occurrence indices.""" occurrence_counts: dict[str, int] = {} render_pass_occurrences: list[tuple[str, int]] = [] for render_pass_name in self.render_passes: occurrence_index = occurrence_counts.get(render_pass_name, 0) + 1 occurrence_counts[render_pass_name] = occurrence_index render_pass_occurrences.append( (render_pass_name, occurrence_index) ) return render_pass_occurrences def get_temp_output_slot_prefix(self, temp_output_path: Path) -> str: """Return the compositor slot prefix without the frame suffix.""" stem_parts = temp_output_path.stem.rsplit("_", maxsplit=1) if len(stem_parts) != 2 or not stem_parts[1].isdigit(): raise ValueError( f"Unexpected temporary output filename: {temp_output_path.name}" ) return f"{stem_parts[0]}_" def get_mesh_objects(self) -> list[bpy.types.Object]: return [obj for obj in self.scene.objects if obj.type == "MESH"] def clear_scene(self) -> None: bpy.ops.wm.read_factory_settings(use_empty=True) def import_usd(self) -> None: if not self.usd_path.exists(): raise FileNotFoundError(f"USD file not found: {self.usd_path}") bpy.ops.wm.usd_import(filepath=str(self.usd_path)) def validate_glb_args(self) -> None: """Normalize optional GLB arguments and ensure all-or-none usage.""" has_glb_path = self.glb_path is not None has_glb_xyz = self.glb_xyz is not None has_glb_rotation = self.glb_rotation_deg is not None if len({has_glb_path, has_glb_xyz, has_glb_rotation}) != 1: raise ValueError( "--glb_path, --glb_xyz, and --glb_rotation_deg must be " "provided together." ) if not has_glb_path: return if not self.glb_path.exists(): raise FileNotFoundError(f"GLB file not found: {self.glb_path}") if self.glb_path.suffix.lower() != ".glb": raise ValueError( f"Expected a .glb asset, but got: {self.glb_path}" ) def enable_gltf_importer(self) -> None: """Ensure Blender's glTF importer add-on is available.""" addon_name = "io_scene_gltf2" if addon_name in bpy.context.preferences.addons: return try: bpy.ops.preferences.addon_enable(module=addon_name) except Exception as exc: raise RuntimeError( "Failed to enable Blender glTF importer add-on." ) from exc def import_glb_asset(self) -> list[bpy.types.Object]: """Import the optional GLB asset and return created objects.""" if self.glb_path is None: return [] self.enable_gltf_importer() existing_object_ids = {obj.as_pointer() for obj in bpy.data.objects} result = bpy.ops.import_scene.gltf(filepath=str(self.glb_path)) if "FINISHED" not in result: raise RuntimeError(f"Failed to import GLB asset: {self.glb_path}") imported_objects = [ obj for obj in bpy.data.objects if obj.as_pointer() not in existing_object_ids ] if not imported_objects: raise ValueError( f"No objects were imported from GLB asset: {self.glb_path}" ) return imported_objects def get_imported_root_objects( self, imported_objects: list[bpy.types.Object] ) -> list[bpy.types.Object]: """Return top-level imported objects so transforms apply as one asset.""" imported_ids = {obj.as_pointer() for obj in imported_objects} root_objects = [ obj for obj in imported_objects if obj.parent is None or obj.parent.as_pointer() not in imported_ids ] return root_objects or imported_objects def place_glb_asset( self, imported_objects: list[bpy.types.Object] ) -> None: """Place the imported GLB asset using the requested world transform.""" if not imported_objects: return if self.glb_xyz is None or self.glb_rotation_deg is None: raise ValueError("GLB transform arguments are not initialized.") asset_transform = self.build_camera_matrix_world( self.glb_xyz, self.glb_rotation_deg, ) for obj in self.get_imported_root_objects(imported_objects): obj.matrix_world = asset_transform @ obj.matrix_world.copy() bpy.context.view_layer.update() def get_scene_bbox(self) -> tuple[Vector, Vector]: """Compute the world-space bounding box across all mesh objects.""" mesh_objects = self.get_mesh_objects() if not mesh_objects: raise ValueError("No mesh objects found after USD import.") points: list[Vector] = [] for obj in mesh_objects: points.extend( obj.matrix_world @ Vector(corner) for corner in obj.bound_box ) min_corner = Vector( ( min(p.x for p in points), min(p.y for p in points), min(p.z for p in points), ) ) max_corner = Vector( ( max(p.x for p in points), max(p.y for p in points), max(p.z for p in points), ) ) return min_corner, max_corner def create_camera(self) -> bpy.types.Object: """Create and configure the primary render camera.""" if self.camera_xyz is None: raise ValueError("--camera_xyz is required.") location = Vector(tuple(self.camera_xyz)) rotation_rad = self.get_rotation_radians(self.camera_rotation_deg) bpy.ops.object.camera_add(location=location, rotation=rotation_rad) camera = bpy.context.object camera.rotation_mode = "XYZ" camera.data.lens = self.focal_length_mm camera.data.clip_start = 0.01 camera.data.clip_end = 1000.0 self.scene.camera = camera return camera def add_light_rig( self, diagonal: float, center: Vector, top_z: float, *, area_energy: float, sun_energy: float, prefix: str, ) -> None: bpy.ops.object.light_add( type="AREA", location=(center.x, center.y, top_z + 0.5 * diagonal), ) area = bpy.context.object area.name = f"{prefix}Area" area.data.energy = area_energy area.data.shape = "DISK" area.data.size = max(diagonal, 2.0) bpy.ops.object.light_add( type="SUN", location=( center.x + diagonal, center.y - diagonal, top_z + diagonal, ), ) sun = bpy.context.object sun.name = f"{prefix}Sun" sun.data.energy = sun_energy def add_fill_light( self, diagonal: float, center: Vector, top_z: float, energy: float, ) -> None: if energy <= 0.0: return bpy.ops.object.light_add( type="AREA", location=(center.x, center.y, top_z + 0.35 * diagonal), rotation=(0.0, 0.0, 0.0), ) area = bpy.context.object area.name = "GlobalFillArea" area.data.energy = energy area.data.shape = "DISK" area.data.size = max(diagonal * 0.9, 3.0) def ensure_lighting( self, diagonal: float, center: Vector, top_z: float ) -> None: if any(obj.type == "LIGHT" for obj in self.scene.objects): return self.add_light_rig( diagonal, center, top_z, area_energy=5000.0, sun_energy=1.5, prefix="Fallback", ) def set_world_strength(self, strength: float) -> None: world = self.scene.world if world is None: return if not world.use_nodes: world.use_nodes = True tree = world.node_tree background_nodes = [ node for node in tree.nodes if node.type == "BACKGROUND" ] if not background_nodes: background = tree.nodes.new(type="ShaderNodeBackground") output = next( (node for node in tree.nodes if node.type == "OUTPUT_WORLD"), None, ) if output is None: output = tree.nodes.new(type="ShaderNodeOutputWorld") tree.links.new( background.outputs["Background"], output.inputs["Surface"] ) background_nodes = [background] for background in background_nodes: background.inputs["Strength"].default_value = strength def ensure_world(self) -> bool: """Ensure the scene has a world shader and return whether it was created.""" if self.scene.world is not None: self.set_world_strength(self.world_strength) return False world = bpy.data.worlds.new(name="RenderWorld") world.use_nodes = True tree = world.node_tree tree.nodes.clear() output = tree.nodes.new(type="ShaderNodeOutputWorld") background = tree.nodes.new(type="ShaderNodeBackground") sky = tree.nodes.new(type="ShaderNodeTexSky") background.inputs["Strength"].default_value = self.world_strength tree.links.new(sky.outputs["Color"], background.inputs["Color"]) tree.links.new( background.outputs["Background"], output.inputs["Surface"] ) self.scene.world = world return True def configure_cycles(self) -> None: self.scene.render.engine = "CYCLES" self.scene.cycles.device = "GPU" self.scene.cycles.samples = self.samples self.scene.render.resolution_x = self.resolution[0] self.scene.render.resolution_y = self.resolution[1] self.scene.render.image_settings.file_format = "PNG" self.scene.render.film_transparent = False prefs = bpy.context.preferences.addons["cycles"].preferences prefs.compute_device_type = "CUDA" prefs.get_devices() cuda_devices = [ device for device in prefs.devices if device.type == "CUDA" ] if not cuda_devices: raise RuntimeError("No CUDA device found in Blender Cycles.") for device in prefs.devices: device.use = device.type == "CUDA" def configure_color_management(self) -> None: self.scene.view_settings.exposure = self.exposure def snapshot_render_state( self, view_layer: bpy.types.ViewLayer, *, include_filepath: bool = False, include_material_override: bool = False, include_use_pass_z: bool = False, include_use_pass_object_index: bool = False, ) -> dict[str, object]: """Capture the render state that temporary passes need to restore.""" state: dict[str, object] = { "film_transparent": self.scene.render.film_transparent, "view_transform": self.scene.view_settings.view_transform, "look": self.scene.view_settings.look, "exposure": self.scene.view_settings.exposure, "gamma": self.scene.view_settings.gamma, "file_format": self.scene.render.image_settings.file_format, "color_mode": self.scene.render.image_settings.color_mode, "color_depth": self.scene.render.image_settings.color_depth, "use_nodes": self.scene.use_nodes, "samples": self.scene.cycles.samples, } if include_filepath: state["filepath"] = self.scene.render.filepath if include_material_override: state["material_override"] = view_layer.material_override if include_use_pass_z: state["use_pass_z"] = view_layer.use_pass_z if include_use_pass_object_index: state["use_pass_object_index"] = view_layer.use_pass_object_index return state def restore_render_state( self, state: dict[str, object], view_layer: bpy.types.ViewLayer ) -> None: """Restore a render state captured by ``snapshot_render_state``.""" self.scene.render.film_transparent = state["film_transparent"] self.scene.view_settings.view_transform = state["view_transform"] self.scene.view_settings.look = state["look"] self.scene.view_settings.exposure = state["exposure"] self.scene.view_settings.gamma = state["gamma"] self.scene.render.image_settings.file_format = state["file_format"] self.scene.render.image_settings.color_mode = state["color_mode"] self.scene.render.image_settings.color_depth = state["color_depth"] self.scene.use_nodes = state["use_nodes"] self.scene.cycles.samples = state["samples"] if "filepath" in state: self.scene.render.filepath = state["filepath"] if "material_override" in state: view_layer.material_override = state["material_override"] if "use_pass_z" in state: view_layer.use_pass_z = state["use_pass_z"] if "use_pass_object_index" in state: view_layer.use_pass_object_index = state["use_pass_object_index"] def apply_raw_preview_settings( self, *, use_nodes: bool, samples: int, color_mode: str, color_depth: str, ) -> None: """Apply the shared render settings for auxiliary preview passes.""" self.scene.render.film_transparent = True self.scene.view_settings.view_transform = "Raw" self.scene.view_settings.look = "None" self.scene.view_settings.exposure = 0.0 self.scene.view_settings.gamma = 1.0 self.scene.use_nodes = use_nodes self.scene.cycles.samples = samples self.scene.render.image_settings.file_format = "PNG" self.scene.render.image_settings.color_mode = color_mode self.scene.render.image_settings.color_depth = color_depth def clear_compositor_tree(self) -> bpy.types.NodeTree: """Reset the compositor tree so each pass starts from a clean slate.""" self.scene.use_nodes = True tree = self.scene.node_tree tree.nodes.clear() return tree def remove_render_nodes(self, created_nodes: list[bpy.types.Node]) -> None: """Remove compositor nodes created for a temporary render pass.""" if not created_nodes: return node_tree = self.scene.node_tree if node_tree is None: return for node in created_nodes: if node.name in node_tree.nodes: node_tree.nodes.remove(node) def render_material_override_pass( self, preview_output_path: Path, material_factory: Callable[[], bpy.types.Material], *, color_mode: str, ) -> None: """Render a pass with a temporary material override.""" preview_output_path.parent.mkdir(parents=True, exist_ok=True) view_layer = self.scene.view_layers["ViewLayer"] state = self.snapshot_render_state( view_layer, include_filepath=True, include_material_override=True, ) material = material_factory() try: self.apply_raw_preview_settings( use_nodes=False, samples=min(int(state["samples"]), 64), color_mode=color_mode, color_depth="8", ) self.scene.render.filepath = str(preview_output_path) view_layer.material_override = material bpy.ops.render.render(write_still=True) finally: self.restore_render_state(state, view_layer) bpy.data.materials.remove(material, do_unlink=True) def render_temp_output_pass( self, output_path: Path, temp_output_path: Path, *, add_output_node: Callable[ [Path], tuple[bpy.types.NodeTree, list[bpy.types.Node]] ], load_temp_output: Callable[[Path], np.ndarray], finalize_output: Callable[[np.ndarray], None], color_mode: str, color_depth: str, enable_depth_pass: bool = False, enable_object_index_pass: bool = False, ) -> None: """Render a temporary compositor output and finalize it.""" output_path.parent.mkdir(parents=True, exist_ok=True) view_layer = self.scene.view_layers["ViewLayer"] state = self.snapshot_render_state( view_layer, include_use_pass_z=enable_depth_pass, include_use_pass_object_index=enable_object_index_pass, ) created_nodes: list[bpy.types.Node] = [] try: if temp_output_path.exists(): temp_output_path.unlink() self.apply_raw_preview_settings( use_nodes=True, samples=1, color_mode=color_mode, color_depth=color_depth, ) if enable_depth_pass: view_layer.use_pass_z = True if enable_object_index_pass: view_layer.use_pass_object_index = True self.clear_compositor_tree() _, created_nodes = add_output_node(output_path) bpy.ops.render.render(write_still=False) finalize_output(load_temp_output(temp_output_path)) finally: self.remove_render_nodes(created_nodes) if temp_output_path.exists(): temp_output_path.unlink() self.restore_render_state(state, view_layer) def get_rotation_radians( self, rotation_deg: tuple[float, float, float] | list[float] ) -> tuple[float, float, float]: return tuple(math.radians(angle_deg) for angle_deg in rotation_deg) def validate_flow_args(self) -> None: """Normalize optional flow-camera arguments and fill defaults.""" has_flow_xyz = self.flow_camera_xyz is not None has_flow_rotation = self.flow_camera_rotation_deg is not None if has_flow_xyz != has_flow_rotation: raise ValueError( "--flow_camera_xyz and --flow_camera_rotation_deg must be " "provided together." ) if not has_flow_xyz: xyz = list(self.camera_xyz) xyz[0] += 0.5 self.flow_camera_xyz = tuple(xyz) self.flow_camera_rotation_deg = tuple(self.camera_rotation_deg) def build_depth_preview_node( self, tree: bpy.types.NodeTree, render_layers: bpy.types.CompositorNodeRLayers, camera: bpy.types.Camera, depth_mode: str, ) -> bpy.types.Node: """Build the compositor node that converts raw depth to a previewable map.""" if depth_mode == "normalized": normalize = tree.nodes.new(type="CompositorNodeNormalize") tree.links.new(render_layers.outputs["Depth"], normalize.inputs[0]) return normalize if depth_mode != "metric": raise ValueError(f"Unsupported depth mode: {depth_mode}") depth_map = tree.nodes.new(type="CompositorNodeMapRange") depth_map.inputs["From Min"].default_value = camera.clip_start depth_map.inputs["From Max"].default_value = camera.clip_end depth_map.inputs["To Min"].default_value = 0.0 depth_map.inputs["To Max"].default_value = 1.0 depth_map.use_clamp = True tree.links.new(render_layers.outputs["Depth"], depth_map.inputs[0]) return depth_map def build_depth_vis_output( self, tree: bpy.types.NodeTree, depth_preview_node: bpy.types.Node, output_path: Path, ) -> Path: temp_output_path = self.get_depth_gray_temp_path(output_path) output_node = tree.nodes.new(type="CompositorNodeOutputFile") output_node.base_path = str(temp_output_path.parent) output_node.file_slots[0].path = self.get_temp_output_slot_prefix( temp_output_path ) output_node.format.file_format = "PNG" output_node.format.color_mode = "BW" output_node.format.color_depth = "8" tree.links.new(depth_preview_node.outputs[0], output_node.inputs[0]) return temp_output_path def configure_auxiliary_outputs( self, output_path: Path, render_passes: tuple[str, ...] | list[str], depth_mode: str, ) -> list[tuple[Path, Path]]: """Configure compositor outputs needed during the base render.""" view_layer = self.scene.view_layers["ViewLayer"] if "depth" in render_passes: view_layer.use_pass_z = True if "depth" not in render_passes: return [] tree = self.clear_compositor_tree() render_layers = tree.nodes.new(type="CompositorNodeRLayers") temp_outputs: list[tuple[Path, Path]] = [] depth_preview_node = self.build_depth_preview_node( tree, render_layers, self.scene.camera.data, depth_mode, ) temp_path = self.build_depth_vis_output( tree=tree, depth_preview_node=depth_preview_node, output_path=output_path, ) temp_outputs.append( (temp_path, self.get_depth_vis_output_path(output_path)) ) return temp_outputs def finalize_depth_output( self, temp_path: Path, output_path: Path ) -> None: """Convert the grayscale depth temp image into the final colored preview.""" if output_path.exists(): output_path.unlink() if not temp_path.exists(): raise FileNotFoundError(f"Depth file not generated: {temp_path}") try: depth = cv2.imread(str(temp_path), cv2.IMREAD_GRAYSCALE) if depth is None: raise FileNotFoundError( f"Failed to read depth image: {temp_path}" ) depth_uint8 = np.ascontiguousarray(depth) depth_colormap = cv2.applyColorMap(depth_uint8, cv2.COLORMAP_JET) if not cv2.imwrite(str(output_path), depth_colormap): raise RuntimeError( f"Failed to write depth visualization: {output_path}" ) finally: if temp_path.exists(): temp_path.unlink() def create_clean_material(self, material_name: str) -> bpy.types.Material: """Create a material with a cleared node tree.""" existing = bpy.data.materials.get(material_name) if existing is not None: bpy.data.materials.remove(existing, do_unlink=True) material = bpy.data.materials.new(name=material_name) material.use_nodes = True material.shadow_method = "NONE" tree = material.node_tree tree.nodes.clear() return material def create_view_normal_material(self) -> bpy.types.Material: material = self.create_clean_material("EmbodiedGenViewNormal") tree = material.node_tree geometry = tree.nodes.new(type="ShaderNodeNewGeometry") invert = tree.nodes.new(type="ShaderNodeVectorMath") invert.operation = "MULTIPLY" invert.inputs[1].default_value = (-1.0, -1.0, -1.0) face_mix = tree.nodes.new(type="ShaderNodeMix") face_mix.data_type = "VECTOR" face_mix.clamp_factor = True face_mix.factor_mode = "UNIFORM" view_transform = tree.nodes.new(type="ShaderNodeVectorTransform") view_transform.vector_type = "NORMAL" view_transform.convert_from = "WORLD" view_transform.convert_to = "CAMERA" flip_x = tree.nodes.new(type="ShaderNodeVectorMath") flip_x.operation = "MULTIPLY" flip_x.inputs[1].default_value = (-1.0, 1.0, -1.0) scale_bias = tree.nodes.new(type="ShaderNodeVectorMath") scale_bias.operation = "MULTIPLY_ADD" scale_bias.inputs[1].default_value = (0.5, 0.5, 0.5) scale_bias.inputs[2].default_value = (0.5, 0.5, 0.5) emission = tree.nodes.new(type="ShaderNodeEmission") output = tree.nodes.new(type="ShaderNodeOutputMaterial") tree.links.new(geometry.outputs["True Normal"], invert.inputs[0]) tree.links.new( geometry.outputs["Backfacing"], face_mix.inputs["Factor"] ) tree.links.new(geometry.outputs["True Normal"], face_mix.inputs["A"]) tree.links.new(invert.outputs["Vector"], face_mix.inputs["B"]) tree.links.new( face_mix.outputs["Result"], view_transform.inputs["Vector"] ) tree.links.new(view_transform.outputs["Vector"], flip_x.inputs[0]) tree.links.new(flip_x.outputs["Vector"], scale_bias.inputs[0]) tree.links.new(scale_bias.outputs["Vector"], emission.inputs["Color"]) tree.links.new(emission.outputs["Emission"], output.inputs["Surface"]) return material def create_mesh_preview_material(self) -> bpy.types.Material: material = self.create_clean_material("EmbodiedGenMeshPreview") tree = material.node_tree layer_weight = tree.nodes.new(type="ShaderNodeLayerWeight") layer_weight.inputs["Blend"].default_value = 0.35 base_ramp = tree.nodes.new(type="ShaderNodeValToRGB") base_ramp.color_ramp.elements[0].position = 0.1 base_ramp.color_ramp.elements[0].color = (0.78, 0.81, 0.87, 1.0) base_ramp.color_ramp.elements[1].position = 0.9 base_ramp.color_ramp.elements[1].color = (0.42, 0.48, 0.58, 1.0) emission = tree.nodes.new(type="ShaderNodeEmission") emission.inputs["Strength"].default_value = 0.82 output = tree.nodes.new(type="ShaderNodeOutputMaterial") tree.links.new(layer_weight.outputs["Facing"], base_ramp.inputs["Fac"]) tree.links.new(base_ramp.outputs["Color"], emission.inputs["Color"]) tree.links.new(emission.outputs["Emission"], output.inputs["Surface"]) return material def assign_instance_ids(self) -> dict[str, int]: """Assign stable per-object pass indices for instance segmentation.""" mesh_objects = sorted( self.get_mesh_objects(), key=lambda obj: obj.name ) if not mesh_objects: raise ValueError( "No mesh objects found for instance segmentation." ) instance_id_map: dict[str, int] = {} for instance_id, obj in enumerate(mesh_objects, start=1): obj.pass_index = instance_id instance_id_map[obj.name] = instance_id return instance_id_map def snapshot_object_pass_indices( self, ) -> list[tuple[bpy.types.Object, int]]: """Capture original object pass indices before a temporary override.""" return [(obj, obj.pass_index) for obj in self.get_mesh_objects()] def restore_object_pass_indices( self, original_pass_indices: list[tuple[bpy.types.Object, int]] ) -> None: """Restore object pass indices captured earlier.""" for obj, pass_index in original_pass_indices: obj.pass_index = pass_index def add_instance_seg_output_node( self, output_path: Path, ) -> tuple[bpy.types.NodeTree, list[bpy.types.Node]]: return self.add_exr_output_node( output_path=output_path, temp_output_path=self.get_instance_seg_temp_path(output_path), render_output_name="IndexOB", ) def add_flow_depth_output_node( self, output_path: Path, ) -> tuple[bpy.types.NodeTree, list[bpy.types.Node]]: return self.add_exr_output_node( output_path=output_path, temp_output_path=self.get_flow_depth_temp_path(output_path), render_output_name="Depth", ) def add_exr_output_node( self, output_path: Path, temp_output_path: Path, render_output_name: str, ) -> tuple[bpy.types.NodeTree, list[bpy.types.Node]]: """Attach a file-output EXR node for a specific render-layer socket.""" tree = self.scene.node_tree render_layers = tree.nodes.new(type="CompositorNodeRLayers") output_node = tree.nodes.new(type="CompositorNodeOutputFile") output_node.base_path = str(temp_output_path.parent) output_node.file_slots[0].path = self.get_temp_output_slot_prefix( temp_output_path ) output_node.format.file_format = "OPEN_EXR" output_node.format.color_mode = "RGB" output_node.format.color_depth = "32" output_node.format.exr_codec = "NONE" tree.links.new( render_layers.outputs[render_output_name], output_node.inputs[0] ) return tree, [render_layers, output_node] def load_temp_exr_first_channel( self, temp_path: Path, error_message: str, ) -> np.ndarray: """Load the first channel from a temporary EXR and flip to image space.""" if not temp_path.exists(): raise FileNotFoundError(error_message.format(path=temp_path)) temp_image = bpy.data.images.load(str(temp_path), check_existing=False) try: width, height = temp_image.size channels = temp_image.channels pixels = np.array(temp_image.pixels[:], dtype=np.float32) if pixels.size != width * height * channels: raise RuntimeError( f"Unexpected EXR image layout for {temp_path}." ) image = pixels.reshape(height, width, channels)[..., 0] return np.flipud(image) finally: bpy.data.images.remove(temp_image) def load_instance_seg_temp_output(self, temp_path: Path) -> np.ndarray: instance_seg = self.load_temp_exr_first_channel( temp_path, "Instance segmentation file not generated: {path}", ) return np.ascontiguousarray(np.rint(instance_seg).astype(np.uint16)) def load_flow_depth_temp_output(self, temp_path: Path) -> np.ndarray: depth = self.load_temp_exr_first_channel( temp_path, "Flow depth file not generated: {path}", ) depth = np.ascontiguousarray(depth.astype(np.float32)) depth[~np.isfinite(depth)] = 0.0 return depth def build_instance_seg_visualization( self, instance_seg: np.ndarray, max_instance_id: int ) -> np.ndarray: """Map instance ids to deterministic RGB colors for visualization.""" color_lut = np.zeros((max_instance_id + 1, 3), dtype=np.uint8) for instance_id in range(1, max_instance_id + 1): color_lut[instance_id] = ( (instance_id * 37) % 256, (instance_id * 67) % 256, (instance_id * 97) % 256, ) return color_lut[instance_seg] def save_instance_seg_outputs( self, output_path: Path, instance_seg: np.ndarray, ) -> None: output_path.parent.mkdir(parents=True, exist_ok=True) vis_output_path = self.get_instance_seg_vis_output_path(output_path) visualization = self.build_instance_seg_visualization( instance_seg=instance_seg, max_instance_id=int(instance_seg.max(initial=0)), ) if not cv2.imwrite(str(vis_output_path), visualization): raise RuntimeError( f"Failed to write instance segmentation preview: " f"{vis_output_path}" ) def build_flow_visualization(self, flow: np.ndarray) -> np.ndarray: flow_float = flow.astype(np.float32) magnitude, angle = cv2.cartToPolar( flow_float[..., 0], flow_float[..., 1], angleInDegrees=True, ) max_magnitude = float(np.percentile(magnitude, 99.0)) if max_magnitude <= 1e-6: max_magnitude = 1.0 magnitude_norm = np.clip(magnitude / max_magnitude, 0.0, 1.0) hsv = np.zeros((*flow.shape[:2], 3), dtype=np.float32) hsv[..., 0] = np.mod(angle, 360.0) hsv[..., 1] = magnitude_norm hsv[..., 2] = 1.0 bgr = cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR) return np.clip(bgr * 255.0, 0.0, 255.0).astype(np.uint8) def get_camera_intrinsics( self, camera: bpy.types.Object, width: int, height: int ) -> tuple[float, float, float, float]: camera_data = camera.data fx = width / (2.0 * math.tan(camera_data.angle_x * 0.5)) fy = height / (2.0 * math.tan(camera_data.angle_y * 0.5)) cx = (width - 1.0) * 0.5 cy = (height - 1.0) * 0.5 return fx, fy, cx, cy def build_camera_matrix_world( self, xyz: tuple[float, float, float] | list[float], rotation_deg: tuple[float, float, float] | list[float], ) -> Matrix: rotation = Euler(self.get_rotation_radians(rotation_deg), "XYZ") translation = Matrix.Translation(Vector(tuple(xyz))) return translation @ rotation.to_matrix().to_4x4() def compute_flow_from_depth( self, depth: np.ndarray, camera: bpy.types.Object, ) -> tuple[np.ndarray, np.ndarray]: """Project depth into a target camera and derive dense 2D flow.""" height, width = depth.shape fx, fy, cx, cy = self.get_camera_intrinsics(camera, width, height) valid = np.isfinite(depth) & (depth > 0.0) valid_mask = np.zeros((height, width), dtype=bool) if not np.any(valid): return np.zeros((height, width, 2), dtype=np.float32), valid_mask u_coords, v_coords = np.meshgrid( np.arange(width, dtype=np.float32), np.arange(height, dtype=np.float32), ) depth_valid = depth[valid] x_cam = ((u_coords[valid] - cx) / fx) * depth_valid y_cam = (-(v_coords[valid] - cy) / fy) * depth_valid z_cam = -depth_valid camera_points = np.stack( [x_cam, y_cam, z_cam, np.ones_like(z_cam)], axis=1 ) source_matrix_world = np.array(camera.matrix_world, dtype=np.float64) target_matrix_world = np.array( self.build_camera_matrix_world( self.flow_camera_xyz, self.flow_camera_rotation_deg, ), dtype=np.float64, ) target_world_to_camera = np.linalg.inv(target_matrix_world) world_points = camera_points @ source_matrix_world.T target_camera_points = world_points @ target_world_to_camera.T target_z = target_camera_points[:, 2] positive_depth = target_z < -1e-6 flow = np.zeros((height, width, 2), dtype=np.float32) if not np.any(positive_depth): return flow, valid_mask projected_x = ( fx * ( target_camera_points[positive_depth, 0] / -target_z[positive_depth] ) + cx ) projected_y = ( -fy * ( target_camera_points[positive_depth, 1] / -target_z[positive_depth] ) + cy ) in_frame = ( (projected_x >= 0.0) & (projected_x < width) & (projected_y >= 0.0) & (projected_y < height) ) if not np.any(in_frame): return flow, valid_mask source_x = u_coords[valid][positive_depth] source_y = v_coords[valid][positive_depth] flow_valid = np.stack( [ projected_x[in_frame] - source_x[in_frame], projected_y[in_frame] - source_y[in_frame], ], axis=1, ).astype(np.float32) flow_buffer = flow[valid] positive_depth_buffer = flow_buffer[positive_depth] positive_depth_buffer[in_frame] = flow_valid flow_buffer[positive_depth] = positive_depth_buffer flow[valid] = flow_buffer valid_mask_buffer = valid_mask[valid] positive_depth_mask = valid_mask_buffer[positive_depth] positive_depth_mask[in_frame] = True valid_mask_buffer[positive_depth] = positive_depth_mask valid_mask[valid] = valid_mask_buffer return flow, valid_mask def save_numpy_array(self, output_path: Path, array: np.ndarray) -> None: """Persist a NumPy array atomically to avoid partial writes.""" temp_output_path = output_path.with_suffix(".tmp.npy") if temp_output_path.exists(): temp_output_path.unlink() np.save(temp_output_path, array) temp_output_path.replace(output_path) def save_flow_outputs( self, output_path: Path, flow: np.ndarray, valid_mask: np.ndarray, ) -> None: output_path.parent.mkdir(parents=True, exist_ok=True) flow_output_path = self.get_flow_output_path(output_path) flow_valid_output_path = self.get_flow_valid_output_path(output_path) flow_vis_output_path = self.get_flow_vis_output_path(output_path) self.save_numpy_array(flow_output_path, flow) self.save_numpy_array(flow_valid_output_path, valid_mask) flow_vis = self.build_flow_visualization(flow) if not cv2.imwrite(str(flow_vis_output_path), flow_vis): raise RuntimeError( f"Failed to write flow preview: {flow_vis_output_path}" ) def get_preview_output_path( self, output_path: Path, render_pass_name: str, occurrence_index: int = 1, ) -> Path | None: preview_output_paths = { "rgb": output_path, "depth": self.get_depth_vis_output_path(output_path), "normal": self.get_normal_output_path(output_path), "mesh": self.get_mesh_output_path(output_path), "instance_seg": self.get_instance_seg_vis_output_path(output_path), "flow": self.get_flow_vis_output_path(output_path), } preview_output_path = preview_output_paths.get(render_pass_name) if preview_output_path is None: return None return self.build_occurrence_output_path( preview_output_path, occurrence_index ) def load_preview_image(self, image_path: Path) -> np.ndarray: image = cv2.imread(str(image_path), cv2.IMREAD_COLOR) if image is None: raise FileNotFoundError( f"Failed to read preview image: {image_path}" ) return image def collect_composite_images( self, output_path: Path ) -> list[tuple[str, np.ndarray]]: composite_images: list[tuple[str, np.ndarray]] = [] for ( render_pass_name, occurrence_index, ) in self.iter_render_pass_occurrences(): preview_output_path = self.get_preview_output_path( output_path, render_pass_name, occurrence_index, ) if preview_output_path is None or not preview_output_path.exists(): continue composite_images.append( ( render_pass_name, self.load_preview_image(preview_output_path), ) ) return composite_images def replicate_duplicate_preview_outputs(self, output_path: Path) -> None: """Materialize repeated preview outputs without re-rendering.""" for ( render_pass_name, occurrence_index, ) in self.iter_render_pass_occurrences(): if occurrence_index == 1: continue source_output_path = self.get_preview_output_path( output_path, render_pass_name ) duplicate_output_path = self.get_preview_output_path( output_path, render_pass_name, occurrence_index, ) if source_output_path is None or duplicate_output_path is None: continue if not source_output_path.exists(): raise FileNotFoundError( f"Preview output not generated for repeated pass " f"{render_pass_name}: {source_output_path}" ) if duplicate_output_path.exists(): duplicate_output_path.unlink() shutil.copyfile(source_output_path, duplicate_output_path) def get_composite_separator_boundaries( self, render_pass_names: list[str] | tuple[str, ...], boundaries: np.ndarray, ) -> list[float]: """Return separator boundaries for adjacent passes that differ.""" if len(boundaries) != len(render_pass_names) + 1: raise ValueError( "boundaries length must match the number of render passes + 1." ) separator_boundaries: list[float] = [] for index, boundary in enumerate(boundaries[1:-1], start=1): if render_pass_names[index - 1] == render_pass_names[index]: continue separator_boundaries.append(float(boundary)) return separator_boundaries def build_composite_image( self, images: list[np.ndarray], render_pass_names: list[str] | tuple[str, ...], separator_width_px: int = 6, ) -> np.ndarray: if not images: raise ValueError("At least one image is required for composition.") if len(images) != len(render_pass_names): raise ValueError( "images and render_pass_names must have the same length." ) base_height, base_width = images[0].shape[:2] resized_images = [ ( image if image.shape[:2] == (base_height, base_width) else cv2.resize( image, (base_width, base_height), interpolation=cv2.INTER_LINEAR, ) ) for image in images ] x_coords = np.broadcast_to( np.arange(base_width, dtype=np.float32), (base_height, base_width), ) y_coords = np.broadcast_to( np.arange(base_height, dtype=np.float32)[:, None], (base_height, base_width), ) slash_slope = 0.28 * (base_width / base_height) diagonal_coord = x_coords + y_coords * slash_slope diagonal_min = float(diagonal_coord.min()) diagonal_max = float(diagonal_coord.max()) boundaries = np.linspace( diagonal_min, diagonal_max, len(resized_images) + 1 ) composite = np.zeros_like(resized_images[0]) region_indices = np.digitize( diagonal_coord, boundaries[1:-1], right=False ) for image_index, image in enumerate(resized_images): composite[region_indices == image_index] = image[ region_indices == image_index ] slash_mask = np.zeros((base_height, base_width), dtype=bool) separator_boundaries = self.get_composite_separator_boundaries( render_pass_names, boundaries ) for boundary in separator_boundaries: slash_mask |= ( np.abs(diagonal_coord - boundary) <= separator_width_px ) composite[slash_mask] = 255 return composite def save_composite_preview(self, output_path: Path) -> None: composite_images = self.collect_composite_images(output_path) if len(composite_images) < 2: return composite_output_path = self.get_composite_output_path( tuple(render_pass_name for render_pass_name, _ in composite_images) ) composite_image = self.build_composite_image( [image for _, image in composite_images], [render_pass_name for render_pass_name, _ in composite_images], ) if not cv2.imwrite(str(composite_output_path), composite_image): raise RuntimeError( f"Failed to write composite preview: {composite_output_path}" ) def render_flow_pass(self, output_path: Path) -> None: self.validate_flow_args() camera = self.scene.camera if camera is None: raise ValueError("Scene camera is required for flow rendering.") temp_output_path = self.get_flow_depth_temp_path(output_path) def finalize_flow_output(depth: np.ndarray) -> None: flow, valid_mask = self.compute_flow_from_depth( depth=depth, camera=camera ) self.save_flow_outputs( output_path=output_path, flow=flow, valid_mask=valid_mask, ) self.render_temp_output_pass( output_path=output_path, temp_output_path=temp_output_path, add_output_node=self.add_flow_depth_output_node, load_temp_output=self.load_flow_depth_temp_output, finalize_output=finalize_flow_output, color_mode="RGB", color_depth="8", enable_depth_pass=True, ) def render_normal_pass(self, output_path: Path) -> None: normal_output_path = self.get_normal_output_path(output_path) self.render_material_override_pass( preview_output_path=normal_output_path, material_factory=self.create_view_normal_material, color_mode="RGB", ) def render_mesh_pass(self, output_path: Path) -> None: mesh_output_path = self.get_mesh_output_path(output_path) self.render_material_override_pass( preview_output_path=mesh_output_path, material_factory=self.create_mesh_preview_material, color_mode="RGBA", ) def render_instance_seg_pass(self, output_path: Path) -> None: original_pass_indices = self.snapshot_object_pass_indices() self.assign_instance_ids() temp_output_path = self.get_instance_seg_temp_path(output_path) def finalize_instance_seg_output(instance_seg: np.ndarray) -> None: self.save_instance_seg_outputs( output_path=output_path, instance_seg=instance_seg, ) try: self.render_temp_output_pass( output_path=output_path, temp_output_path=temp_output_path, add_output_node=self.add_instance_seg_output_node, load_temp_output=self.load_instance_seg_temp_output, finalize_output=finalize_instance_seg_output, color_mode="BW", color_depth="16", enable_object_index_pass=True, ) finally: self.restore_object_pass_indices(original_pass_indices) def render(self, output_path: Path) -> None: """Run the requested render passes and write final outputs.""" self.scene.use_nodes = False auxiliary_outputs: list[tuple[Path, Path]] = [] needs_base_render = bool({"rgb", "depth"} & set(self.render_passes)) if "depth" in self.render_passes: auxiliary_outputs = self.configure_auxiliary_outputs( output_path, self.render_passes, self.depth_mode ) output_path.parent.mkdir(parents=True, exist_ok=True) if "rgb" in self.render_passes: self.scene.render.filepath = str(output_path) if needs_base_render: bpy.ops.render.render(write_still="rgb" in self.render_passes) for temp_path, final_path in auxiliary_outputs: if final_path == self.get_depth_vis_output_path(output_path): self.finalize_depth_output(temp_path, final_path) continue raise ValueError(f"Unsupported render output target: {final_path}") if auxiliary_outputs: self.clear_compositor_tree() self.scene.use_nodes = False if "normal" in self.render_passes: self.render_normal_pass(output_path) if "mesh" in self.render_passes: self.render_mesh_pass(output_path) if "instance_seg" in self.render_passes: self.render_instance_seg_pass(output_path) if "flow" in self.render_passes: self.render_flow_pass(output_path) self.replicate_duplicate_preview_outputs(output_path) self.save_composite_preview(output_path) def run(self) -> None: """Prepare the scene, configure rendering, and execute all passes.""" rgb_output_path = self.get_rgb_output_path() self.output_dir.mkdir(parents=True, exist_ok=True) self.clear_scene() self.import_usd() self.validate_glb_args() imported_glb_objects = self.import_glb_asset() self.place_glb_asset(imported_glb_objects) min_corner, max_corner = self.get_scene_bbox() center = (min_corner + max_corner) * 0.5 diagonal = (max_corner - min_corner).length self.create_camera() self.ensure_lighting(diagonal, center, max_corner.z) world_created = self.ensure_world() self.add_fill_light( diagonal, center, max_corner.z, energy=self.fill_light_energy, ) if world_created: self.add_light_rig( diagonal, center, max_corner.z, area_energy=1500.0, sun_energy=0.35, prefix="Fill", ) self.configure_color_management() self.configure_cycles() with tempfile.TemporaryDirectory( prefix="render_usd_", dir=None ) as temp_dir: self.temp_dir = Path(temp_dir) self.render(rgb_output_path) self.temp_dir = None logger.info("Rendered outputs to %s", self.output_dir) def main() -> None: logging.basicConfig(level=logging.INFO) args = _parse_args() RenderUsd.from_args(args).run() if __name__ == "__main__": main()