Spaces:
Running on Zero
Running on Zero
| # Project EmbodiedGen | |
| # | |
| # Copyright (c) 2025 Horizon Robotics. All Rights Reserved. | |
| # | |
| # Licensed under the Apache License, Version 2.0 (the "License"); | |
| # you may not use this file except in compliance with the License. | |
| # You may obtain a copy of the License at | |
| # | |
| # http://www.apache.org/licenses/LICENSE-2.0 | |
| # | |
| # Unless required by applicable law or agreed to in writing, software | |
| # distributed under the License is distributed on an "AS IS" BASIS, | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or | |
| # implied. See the License for the specific language governing | |
| # permissions and limitations under the License. | |
| from __future__ import annotations | |
| import argparse | |
| import logging | |
| import math | |
| import shutil | |
| import tempfile | |
| from collections.abc import Callable | |
| from pathlib import Path | |
| import bpy | |
| import cv2 | |
| import numpy as np | |
| from mathutils import Euler, Matrix, Vector | |
| logger = logging.getLogger(__name__) | |
def build_arg_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the USD render script.

    Returns:
        A parser exposing the scene inputs (USD file plus an optional GLB
        asset and its pose), the output directory, the requested render
        passes, the camera poses, and the image/lighting settings.
    """
    xyz_names = ("X", "Y", "Z")
    rot_names = ("RX", "RY", "RZ")
    cli = argparse.ArgumentParser()

    def add_triplet(
        flag: str, names: tuple[str, str, str], **extra: object
    ) -> None:
        # Every pose-like option consumes exactly three floats.
        cli.add_argument(flag, type=float, nargs=3, metavar=names, **extra)

    # Scene inputs.
    cli.add_argument("--usd_path", required=True, type=Path)
    cli.add_argument("--glb_path", type=str, default="")
    add_triplet("--glb_xyz", xyz_names)
    add_triplet("--glb_rotation_deg", rot_names)

    # Outputs and pass selection.
    cli.add_argument("--output_dir", required=True, type=Path)
    cli.add_argument(
        "--render_passes",
        nargs="+",
        choices=("rgb", "depth", "normal", "mesh", "instance_seg", "flow"),
        default=("rgb",),
    )
    cli.add_argument(
        "--depth_mode",
        choices=("normalized", "metric"),
        default="normalized",
    )
    cli.add_argument(
        "--resolution",
        type=int,
        nargs=2,
        metavar=("WIDTH", "HEIGHT"),
        default=(1920, 1080),
    )
    cli.add_argument("--samples", type=int, default=1024)

    # Primary camera pose (mandatory) and optional flow camera pose.
    add_triplet("--camera_xyz", xyz_names, required=True)
    add_triplet("--camera_rotation_deg", rot_names, required=True)
    add_triplet("--flow_camera_xyz", xyz_names)
    add_triplet("--flow_camera_rotation_deg", rot_names)

    # Lens, exposure, and lighting.
    cli.add_argument("--focal_length_mm", type=float, default=20.0)
    cli.add_argument("--exposure", type=float, default=2.2)
    cli.add_argument("--world_strength", type=float, default=8.0)
    cli.add_argument("--fill_light_energy", type=float, default=14000.0)
    return cli
def _parse_args() -> argparse.Namespace:
    """Parse the process command line with the ``build_arg_parser`` parser."""
    parser = build_arg_parser()
    return parser.parse_args()
| class RenderUsd: | |
| """USD renderer for RGB, depth, normal, mesh, segmentation, and flow.""" | |
| def __init__( | |
| self, | |
| *, | |
| usd_path: Path, | |
| glb_path: Path | str | None, | |
| glb_xyz: tuple[float, float, float] | list[float] | None, | |
| glb_rotation_deg: tuple[float, float, float] | list[float] | None, | |
| output_dir: Path, | |
| render_passes: tuple[str, ...] | list[str], | |
| depth_mode: str, | |
| resolution: tuple[int, int] | list[int], | |
| samples: int, | |
| camera_xyz: tuple[float, float, float] | list[float], | |
| camera_rotation_deg: tuple[float, float, float] | list[float], | |
| flow_camera_xyz: tuple[float, float, float] | list[float] | None, | |
| flow_camera_rotation_deg: ( | |
| tuple[float, float, float] | list[float] | None | |
| ), | |
| focal_length_mm: float, | |
| exposure: float, | |
| world_strength: float, | |
| fill_light_energy: float, | |
| ) -> None: | |
| """Initialize renderer configuration independent of CLI parsing.""" | |
| self.usd_path = usd_path | |
| self.glb_path = self.normalize_optional_path(glb_path) | |
| self.glb_xyz = tuple(glb_xyz) if glb_xyz is not None else None | |
| self.glb_rotation_deg = ( | |
| tuple(glb_rotation_deg) if glb_rotation_deg is not None else None | |
| ) | |
| self.output_dir = output_dir | |
| self.render_passes = tuple(render_passes) | |
| self.depth_mode = depth_mode | |
| self.resolution = tuple(resolution) | |
| self.samples = samples | |
| self.camera_xyz = tuple(camera_xyz) | |
| self.camera_rotation_deg = tuple(camera_rotation_deg) | |
| self.flow_camera_xyz = ( | |
| tuple(flow_camera_xyz) if flow_camera_xyz is not None else None | |
| ) | |
| self.flow_camera_rotation_deg = ( | |
| tuple(flow_camera_rotation_deg) | |
| if flow_camera_rotation_deg is not None | |
| else None | |
| ) | |
| self.focal_length_mm = focal_length_mm | |
| self.exposure = exposure | |
| self.world_strength = world_strength | |
| self.fill_light_energy = fill_light_energy | |
| self.temp_dir: Path | None = None | |
| def from_args(cls, args: argparse.Namespace) -> RenderUsd: | |
| """Build a renderer from parsed CLI arguments.""" | |
| return cls( | |
| usd_path=args.usd_path, | |
| glb_path=args.glb_path, | |
| glb_xyz=args.glb_xyz, | |
| glb_rotation_deg=args.glb_rotation_deg, | |
| output_dir=args.output_dir, | |
| render_passes=args.render_passes, | |
| depth_mode=args.depth_mode, | |
| resolution=args.resolution, | |
| samples=args.samples, | |
| camera_xyz=args.camera_xyz, | |
| camera_rotation_deg=args.camera_rotation_deg, | |
| flow_camera_xyz=args.flow_camera_xyz, | |
| flow_camera_rotation_deg=args.flow_camera_rotation_deg, | |
| focal_length_mm=args.focal_length_mm, | |
| exposure=args.exposure, | |
| world_strength=args.world_strength, | |
| fill_light_energy=args.fill_light_energy, | |
| ) | |
@property
def scene(self) -> bpy.types.Scene:
    """Return the scene of the current Blender context.

    Exposed as a property because the rest of the class reads it as an
    attribute (for example ``self.scene.objects`` and ``self.scene.render``).
    The scene is looked up on every access rather than cached.
    """
    return bpy.context.scene
def normalize_optional_path(
    self, path_value: Path | str | None
) -> Path | None:
    """Convert an optional path argument into a ``Path`` or ``None``.

    ``None`` is passed through, ``Path`` instances are returned untouched,
    and strings are stripped of surrounding whitespace; a string that is
    empty after stripping counts as "not provided".
    """
    if path_value is None or isinstance(path_value, Path):
        return path_value
    stripped = path_value.strip()
    return Path(stripped) if stripped else None
| def build_output_path(self, filename: str) -> Path: | |
| """Build a normalized output path under the render directory.""" | |
| return self.output_dir / filename | |
| def build_temp_path(self, filename: str) -> Path: | |
| """Build a temporary path outside the final output directory.""" | |
| if self.temp_dir is None: | |
| raise RuntimeError( | |
| "Temporary render directory is not initialized." | |
| ) | |
| return self.temp_dir / filename | |
| def get_rgb_output_path(self) -> Path: | |
| return self.build_output_path("render_rgb.png") | |
| def get_depth_vis_output_path(self, output_path: Path) -> Path: | |
| del output_path | |
| return self.build_output_path("render_depth.png") | |
| def get_normal_output_path(self, output_path: Path) -> Path: | |
| del output_path | |
| return self.build_output_path("render_normal.png") | |
| def get_mesh_output_path(self, output_path: Path) -> Path: | |
| del output_path | |
| return self.build_output_path("render_mesh.png") | |
| def get_instance_seg_vis_output_path(self, output_path: Path) -> Path: | |
| del output_path | |
| return self.build_output_path("render_instance_seg_vis.png") | |
| def get_instance_seg_temp_path(self, output_path: Path) -> Path: | |
| del output_path | |
| return self.build_temp_path("render_instance_seg_raw_0001.exr") | |
| def get_flow_output_path(self, output_path: Path) -> Path: | |
| del output_path | |
| return self.build_output_path("render_flow.npy") | |
| def get_flow_valid_output_path(self, output_path: Path) -> Path: | |
| del output_path | |
| return self.build_output_path("render_flow_valid.npy") | |
| def get_flow_vis_output_path(self, output_path: Path) -> Path: | |
| del output_path | |
| return self.build_output_path("render_flow_vis.png") | |
| def get_flow_depth_temp_path(self, output_path: Path) -> Path: | |
| del output_path | |
| return self.build_temp_path("render_flow_depth_raw_0001.exr") | |
| def get_depth_gray_temp_path(self, output_path: Path) -> Path: | |
| del output_path | |
| return self.build_temp_path("render_depth_gray_0001.png") | |
| def get_composite_output_path( | |
| self, render_passes: list[str] | tuple[str, ...] | |
| ) -> Path: | |
| pass_names = "_".join(render_passes) | |
| return self.build_output_path(f"render_composite_{pass_names}.png") | |
def build_occurrence_output_path(
    self, output_path: Path, occurrence_index: int
) -> Path:
    """Return the path to use for the n-th occurrence of an output.

    The first occurrence keeps ``output_path`` unchanged; later ones get
    ``_<occurrence_index>`` appended to the stem, before the suffix.

    Raises:
        ValueError: If ``occurrence_index`` is smaller than 1.
    """
    if occurrence_index < 1:
        raise ValueError("occurrence_index must be greater than 0.")
    if occurrence_index == 1:
        return output_path
    stem, suffix = output_path.stem, output_path.suffix
    numbered_name = f"{stem}_{occurrence_index}{suffix}"
    return output_path.with_name(numbered_name)
| def iter_render_pass_occurrences(self) -> list[tuple[str, int]]: | |
| """Return requested render passes with 1-based occurrence indices.""" | |
| occurrence_counts: dict[str, int] = {} | |
| render_pass_occurrences: list[tuple[str, int]] = [] | |
| for render_pass_name in self.render_passes: | |
| occurrence_index = occurrence_counts.get(render_pass_name, 0) + 1 | |
| occurrence_counts[render_pass_name] = occurrence_index | |
| render_pass_occurrences.append( | |
| (render_pass_name, occurrence_index) | |
| ) | |
| return render_pass_occurrences | |
def get_temp_output_slot_prefix(self, temp_output_path: Path) -> str:
    """Return the file stem up to and including the last underscore.

    The stem must end in ``_<digits>`` (the frame suffix); that numeric part
    is dropped and the trailing underscore is kept.

    Raises:
        ValueError: If the stem does not end in an underscore followed by
            digits.
    """
    head, separator, frame = temp_output_path.stem.rpartition("_")
    if not separator or not frame.isdigit():
        raise ValueError(
            f"Unexpected temporary output filename: {temp_output_path.name}"
        )
    return f"{head}_"
def get_mesh_objects(self) -> list[bpy.types.Object]:
    """Return every object in the scene whose type is ``"MESH"``."""
    scene_objects = self.scene.objects
    return [item for item in scene_objects if item.type == "MESH"]
def clear_scene(self) -> None:
    """Reload Blender's factory settings, requesting an empty scene."""
    reset_to_factory = bpy.ops.wm.read_factory_settings
    reset_to_factory(use_empty=True)
| def import_usd(self) -> None: | |
| if not self.usd_path.exists(): | |
| raise FileNotFoundError(f"USD file not found: {self.usd_path}") | |
| bpy.ops.wm.usd_import(filepath=str(self.usd_path)) | |
| def validate_glb_args(self) -> None: | |
| """Normalize optional GLB arguments and ensure all-or-none usage.""" | |
| has_glb_path = self.glb_path is not None | |
| has_glb_xyz = self.glb_xyz is not None | |
| has_glb_rotation = self.glb_rotation_deg is not None | |
| if len({has_glb_path, has_glb_xyz, has_glb_rotation}) != 1: | |
| raise ValueError( | |
| "--glb_path, --glb_xyz, and --glb_rotation_deg must be " | |
| "provided together." | |
| ) | |
| if not has_glb_path: | |
| return | |
| if not self.glb_path.exists(): | |
| raise FileNotFoundError(f"GLB file not found: {self.glb_path}") | |
| if self.glb_path.suffix.lower() != ".glb": | |
| raise ValueError( | |
| f"Expected a .glb asset, but got: {self.glb_path}" | |
| ) | |
def enable_gltf_importer(self) -> None:
    """Enable Blender's ``io_scene_gltf2`` add-on unless already enabled.

    Raises:
        RuntimeError: If enabling the add-on raises any exception; the
            original exception is chained as the cause.
    """
    addon_name = "io_scene_gltf2"
    already_enabled = addon_name in bpy.context.preferences.addons
    if not already_enabled:
        try:
            bpy.ops.preferences.addon_enable(module=addon_name)
        except Exception as exc:
            raise RuntimeError(
                "Failed to enable Blender glTF importer add-on."
            ) from exc
| def import_glb_asset(self) -> list[bpy.types.Object]: | |
| """Import the optional GLB asset and return created objects.""" | |
| if self.glb_path is None: | |
| return [] | |
| self.enable_gltf_importer() | |
| existing_object_ids = {obj.as_pointer() for obj in bpy.data.objects} | |
| result = bpy.ops.import_scene.gltf(filepath=str(self.glb_path)) | |
| if "FINISHED" not in result: | |
| raise RuntimeError(f"Failed to import GLB asset: {self.glb_path}") | |
| imported_objects = [ | |
| obj | |
| for obj in bpy.data.objects | |
| if obj.as_pointer() not in existing_object_ids | |
| ] | |
| if not imported_objects: | |
| raise ValueError( | |
| f"No objects were imported from GLB asset: {self.glb_path}" | |
| ) | |
| return imported_objects | |
def get_imported_root_objects(
    self, imported_objects: list[bpy.types.Object]
) -> list[bpy.types.Object]:
    """Return the top-level objects among ``imported_objects``.

    An object counts as top-level when it has no parent or when its parent
    is not itself one of the imported objects. If no object qualifies, the
    input list is returned unchanged.
    """
    member_pointers = {item.as_pointer() for item in imported_objects}

    def is_root(candidate) -> bool:
        parent = candidate.parent
        return parent is None or parent.as_pointer() not in member_pointers

    roots = [item for item in imported_objects if is_root(item)]
    return roots if roots else imported_objects
def place_glb_asset(
    self, imported_objects: list[bpy.types.Object]
) -> None:
    """Apply the requested GLB transform to the imported asset.

    The transform is built by ``build_camera_matrix_world`` from
    ``glb_xyz`` and ``glb_rotation_deg`` and pre-multiplied onto the world
    matrix of every root object. Does nothing for an empty list.

    Raises:
        ValueError: If objects were imported but the GLB position or
            rotation is missing.
    """
    if not imported_objects:
        return
    position, rotation_deg = self.glb_xyz, self.glb_rotation_deg
    if position is None or rotation_deg is None:
        raise ValueError("GLB transform arguments are not initialized.")

    world_from_asset = self.build_camera_matrix_world(position, rotation_deg)
    for root in self.get_imported_root_objects(imported_objects):
        root.matrix_world = world_from_asset @ root.matrix_world.copy()
    bpy.context.view_layer.update()
def get_scene_bbox(self) -> tuple[Vector, Vector]:
    """Return the axis-aligned bounds enclosing all mesh objects.

    Each mesh's ``bound_box`` corners are transformed by the object's
    ``matrix_world`` before the per-axis minimum and maximum are taken.

    Returns:
        ``(min_corner, max_corner)``.

    Raises:
        ValueError: If the scene contains no mesh objects.
    """
    meshes = self.get_mesh_objects()
    if not meshes:
        raise ValueError("No mesh objects found after USD import.")

    corners: list[Vector] = [
        mesh.matrix_world @ Vector(corner)
        for mesh in meshes
        for corner in mesh.bound_box
    ]
    xs = [point.x for point in corners]
    ys = [point.y for point in corners]
    zs = [point.z for point in corners]
    lower = Vector((min(xs), min(ys), min(zs)))
    upper = Vector((max(xs), max(ys), max(zs)))
    return lower, upper
| def create_camera(self) -> bpy.types.Object: | |
| """Create and configure the primary render camera.""" | |
| if self.camera_xyz is None: | |
| raise ValueError("--camera_xyz is required.") | |
| location = Vector(tuple(self.camera_xyz)) | |
| rotation_rad = self.get_rotation_radians(self.camera_rotation_deg) | |
| bpy.ops.object.camera_add(location=location, rotation=rotation_rad) | |
| camera = bpy.context.object | |
| camera.rotation_mode = "XYZ" | |
| camera.data.lens = self.focal_length_mm | |
| camera.data.clip_start = 0.01 | |
| camera.data.clip_end = 1000.0 | |
| self.scene.camera = camera | |
| return camera | |
def add_light_rig(
    self,
    diagonal: float,
    center: Vector,
    top_z: float,
    *,
    area_energy: float,
    sun_energy: float,
    prefix: str,
) -> None:
    """Add one disk-shaped area light and one sun light to the scene.

    Args:
        diagonal: Length used to scale the light offsets and area size.
        center: Point whose x/y the lights are positioned relative to.
        top_z: Height the lights are placed above.
        area_energy: Energy of the area light.
        sun_energy: Energy of the sun light.
        prefix: Prefix for the object names (``<prefix>Area`` and
            ``<prefix>Sun``).
    """
    # Area light centred over ``center``, half a diagonal above ``top_z``.
    overhead = (center.x, center.y, top_z + 0.5 * diagonal)
    bpy.ops.object.light_add(type="AREA", location=overhead)
    area_light = bpy.context.object
    area_light.name = f"{prefix}Area"
    area_settings = area_light.data
    area_settings.energy = area_energy
    area_settings.shape = "DISK"
    area_settings.size = max(diagonal, 2.0)

    # Sun light offset by one diagonal along +x, -y, and +z.
    offset_corner = (
        center.x + diagonal,
        center.y - diagonal,
        top_z + diagonal,
    )
    bpy.ops.object.light_add(type="SUN", location=offset_corner)
    sun_light = bpy.context.object
    sun_light.name = f"{prefix}Sun"
    sun_light.data.energy = sun_energy
def add_fill_light(
    self,
    diagonal: float,
    center: Vector,
    top_z: float,
    energy: float,
) -> None:
    """Add a disk-shaped area light named ``GlobalFillArea``.

    No light is added when ``energy`` is zero or negative. Otherwise the
    light sits at ``center``'s x/y, 0.35 diagonals above ``top_z``, with
    zero rotation.
    """
    if energy <= 0.0:
        return

    position = (center.x, center.y, top_z + 0.35 * diagonal)
    bpy.ops.object.light_add(
        type="AREA",
        location=position,
        rotation=(0.0, 0.0, 0.0),
    )
    fill = bpy.context.object
    fill.name = "GlobalFillArea"
    fill_settings = fill.data
    fill_settings.energy = energy
    fill_settings.shape = "DISK"
    fill_settings.size = max(diagonal * 0.9, 3.0)
def ensure_lighting(
    self, diagonal: float, center: Vector, top_z: float
) -> None:
    """Add a fallback light rig when the scene has no light objects.

    If any scene object has type ``"LIGHT"`` nothing is changed.
    """
    for item in self.scene.objects:
        if item.type == "LIGHT":
            return
    self.add_light_rig(
        diagonal,
        center,
        top_z,
        area_energy=5000.0,
        sun_energy=1.5,
        prefix="Fallback",
    )
def set_world_strength(self, strength: float) -> None:
    """Set the strength of every background node in the scene's world.

    Does nothing when the scene has no world. Node usage is enabled on the
    world if needed. When the world tree has no background node, one is
    created and linked to a world output node (also created if absent).
    """
    world = self.scene.world
    if world is None:
        return
    if not world.use_nodes:
        world.use_nodes = True

    node_tree = world.node_tree
    backgrounds = [
        node for node in node_tree.nodes if node.type == "BACKGROUND"
    ]
    if not backgrounds:
        new_background = node_tree.nodes.new(type="ShaderNodeBackground")
        world_output = None
        for node in node_tree.nodes:
            if node.type == "OUTPUT_WORLD":
                world_output = node
                break
        if world_output is None:
            world_output = node_tree.nodes.new(type="ShaderNodeOutputWorld")
        node_tree.links.new(
            new_background.outputs["Background"],
            world_output.inputs["Surface"],
        )
        backgrounds = [new_background]

    for node in backgrounds:
        node.inputs["Strength"].default_value = strength
def ensure_world(self) -> bool:
    """Make sure the scene has a world shader at the configured strength.

    An existing world only has its background strength updated. Otherwise
    a world named ``RenderWorld`` is created with a sky texture feeding a
    background shader.

    Returns:
        ``True`` if a new world was created, ``False`` if one existed.
    """
    if self.scene.world is not None:
        self.set_world_strength(self.world_strength)
        return False

    new_world = bpy.data.worlds.new(name="RenderWorld")
    new_world.use_nodes = True
    node_tree = new_world.node_tree
    node_tree.nodes.clear()

    add_node = node_tree.nodes.new
    world_output = add_node(type="ShaderNodeOutputWorld")
    background = add_node(type="ShaderNodeBackground")
    sky_texture = add_node(type="ShaderNodeTexSky")
    background.inputs["Strength"].default_value = self.world_strength

    connect = node_tree.links.new
    connect(sky_texture.outputs["Color"], background.inputs["Color"])
    connect(background.outputs["Background"], world_output.inputs["Surface"])

    self.scene.world = new_world
    return True
def configure_cycles(self) -> None:
    """Configure Cycles GPU rendering, restricted to CUDA devices.

    Sets the render engine, sample count, resolution, PNG output, and an
    opaque film, then enables every CUDA device and disables all others.

    Raises:
        RuntimeError: If the Cycles preferences list no CUDA device.
    """
    scene = self.scene
    scene.render.engine = "CYCLES"
    scene.cycles.device = "GPU"
    scene.cycles.samples = self.samples

    width, height = self.resolution[0], self.resolution[1]
    self.scene.render.resolution_x = width
    self.scene.render.resolution_y = height
    self.scene.render.image_settings.file_format = "PNG"
    self.scene.render.film_transparent = False

    cycles_prefs = bpy.context.preferences.addons["cycles"].preferences
    cycles_prefs.compute_device_type = "CUDA"
    cycles_prefs.get_devices()
    if not any(dev.type == "CUDA" for dev in cycles_prefs.devices):
        raise RuntimeError("No CUDA device found in Blender Cycles.")
    for dev in cycles_prefs.devices:
        dev.use = dev.type == "CUDA"
def configure_color_management(self) -> None:
    """Apply the configured exposure to the scene's view settings."""
    view_settings = self.scene.view_settings
    view_settings.exposure = self.exposure
def snapshot_render_state(
    self,
    view_layer: bpy.types.ViewLayer,
    *,
    include_filepath: bool = False,
    include_material_override: bool = False,
    include_use_pass_z: bool = False,
    include_use_pass_object_index: bool = False,
) -> dict[str, object]:
    """Record the render settings that temporary passes overwrite.

    Film, view-transform, image-format, node-usage, and sample settings are
    always captured. The ``include_*`` flags additionally capture the
    render filepath and the given view layer's material override, Z-pass
    flag, and object-index-pass flag.

    Returns:
        A dict suitable for ``restore_render_state``.
    """
    render = self.scene.render
    view = self.scene.view_settings
    image = render.image_settings

    snapshot: dict[str, object] = {
        "film_transparent": render.film_transparent,
        "view_transform": view.view_transform,
        "look": view.look,
        "exposure": view.exposure,
        "gamma": view.gamma,
        "file_format": image.file_format,
        "color_mode": image.color_mode,
        "color_depth": image.color_depth,
        "use_nodes": self.scene.use_nodes,
        "samples": self.scene.cycles.samples,
    }
    # Optional entries are stored only on request, so restoring later does
    # not touch settings the pass never changed.
    if include_filepath:
        snapshot["filepath"] = render.filepath
    if include_material_override:
        snapshot["material_override"] = view_layer.material_override
    if include_use_pass_z:
        snapshot["use_pass_z"] = view_layer.use_pass_z
    if include_use_pass_object_index:
        snapshot["use_pass_object_index"] = view_layer.use_pass_object_index
    return snapshot
def restore_render_state(
    self, state: dict[str, object], view_layer: bpy.types.ViewLayer
) -> None:
    """Write back settings captured by ``snapshot_render_state``.

    Mandatory entries are always restored; the optional entries
    (``filepath``, ``material_override``, ``use_pass_z``,
    ``use_pass_object_index``) only when present in ``state``.
    """
    render = self.scene.render
    view = self.scene.view_settings
    image = render.image_settings

    render.film_transparent = state["film_transparent"]
    for field in ("view_transform", "look", "exposure", "gamma"):
        setattr(view, field, state[field])
    # The file format is restored before colour mode and depth.
    for field in ("file_format", "color_mode", "color_depth"):
        setattr(image, field, state[field])
    self.scene.use_nodes = state["use_nodes"]
    self.scene.cycles.samples = state["samples"]

    if "filepath" in state:
        render.filepath = state["filepath"]
    for field in ("material_override", "use_pass_z", "use_pass_object_index"):
        if field in state:
            setattr(view_layer, field, state[field])
def apply_raw_preview_settings(
    self,
    *,
    use_nodes: bool,
    samples: int,
    color_mode: str,
    color_depth: str,
) -> None:
    """Apply the render settings shared by the auxiliary preview passes.

    Enables a transparent film, sets a raw view transform with no look,
    zero exposure and unit gamma, and selects PNG output with the given
    colour mode and depth. Compositor node usage and the sample count are
    taken from the arguments.
    """
    scene = self.scene
    scene.render.film_transparent = True

    view = scene.view_settings
    view.view_transform = "Raw"
    view.look = "None"
    view.exposure = 0.0
    view.gamma = 1.0

    scene.use_nodes = use_nodes
    scene.cycles.samples = samples

    image = scene.render.image_settings
    image.file_format = "PNG"
    image.color_mode = color_mode
    image.color_depth = color_depth
def clear_compositor_tree(self) -> bpy.types.NodeTree:
    """Enable compositor nodes and empty the scene's node tree.

    Returns:
        The now-empty compositor node tree.
    """
    scene = self.scene
    scene.use_nodes = True
    compositor_tree = scene.node_tree
    compositor_tree.nodes.clear()
    return compositor_tree
def remove_render_nodes(self, created_nodes: list[bpy.types.Node]) -> None:
    """Remove the given compositor nodes from the scene's node tree.

    Does nothing for an empty list or when the scene has no node tree.
    Nodes whose name is no longer present in the tree are skipped.
    """
    # The tree is only looked up when there is something to remove.
    compositor_tree = self.scene.node_tree if created_nodes else None
    if compositor_tree is None:
        return
    for stale_node in created_nodes:
        if stale_node.name in compositor_tree.nodes:
            compositor_tree.nodes.remove(stale_node)
def render_material_override_pass(
    self,
    preview_output_path: Path,
    material_factory: Callable[[], bpy.types.Material],
    *,
    color_mode: str,
) -> None:
    """Render one still with every surface replaced by a temporary material.

    The output directory is created if needed. Render state (including the
    filepath and material override) is snapshotted first and always
    restored afterwards, and the material is always removed.

    Args:
        preview_output_path: Destination of the rendered still.
        material_factory: Zero-argument callable producing the override
            material.
        color_mode: Image colour mode used for this pass.
    """
    preview_output_path.parent.mkdir(parents=True, exist_ok=True)
    layer = self.scene.view_layers["ViewLayer"]
    saved_state = self.snapshot_render_state(
        layer,
        include_filepath=True,
        include_material_override=True,
    )
    override_material = material_factory()
    try:
        # The pass renders with at most 64 samples.
        capped_samples = min(int(saved_state["samples"]), 64)
        self.apply_raw_preview_settings(
            use_nodes=False,
            samples=capped_samples,
            color_mode=color_mode,
            color_depth="8",
        )
        self.scene.render.filepath = str(preview_output_path)
        layer.material_override = override_material
        bpy.ops.render.render(write_still=True)
    finally:
        self.restore_render_state(saved_state, layer)
        bpy.data.materials.remove(override_material, do_unlink=True)
def render_temp_output_pass(
    self,
    output_path: Path,
    temp_output_path: Path,
    *,
    add_output_node: Callable[
        [Path], tuple[bpy.types.NodeTree, list[bpy.types.Node]]
    ],
    load_temp_output: Callable[[Path], np.ndarray],
    finalize_output: Callable[[np.ndarray], None],
    color_mode: str,
    color_depth: str,
    enable_depth_pass: bool = False,
    enable_object_index_pass: bool = False,
) -> None:
    """Render through a temporary compositor output and post-process it.

    The sequence is: snapshot render state, delete any stale temp file,
    apply raw preview settings with one sample, enable the requested view
    layer passes, rebuild the compositor via ``add_output_node``, render,
    then pass the loaded temp file to ``finalize_output``. The created
    nodes, the temp file, and the render state are always cleaned up.

    Args:
        output_path: Final output location; its parent is created and the
            path is handed to ``add_output_node``.
        temp_output_path: File the compositor is expected to write.
        add_output_node: Builds the compositor nodes and returns the tree
            together with the nodes it created.
        load_temp_output: Reads the temp file into an array.
        finalize_output: Consumes the loaded array.
        color_mode: Image colour mode for the pass.
        color_depth: Image colour depth for the pass.
        enable_depth_pass: Turn on the view layer's Z pass.
        enable_object_index_pass: Turn on the object-index pass.
    """
    output_path.parent.mkdir(parents=True, exist_ok=True)
    layer = self.scene.view_layers["ViewLayer"]
    saved_state = self.snapshot_render_state(
        layer,
        include_use_pass_z=enable_depth_pass,
        include_use_pass_object_index=enable_object_index_pass,
    )
    pass_nodes: list[bpy.types.Node] = []
    try:
        # A leftover file from an earlier run must not be read as this
        # render's result.
        if temp_output_path.exists():
            temp_output_path.unlink()
        self.apply_raw_preview_settings(
            use_nodes=True,
            samples=1,
            color_mode=color_mode,
            color_depth=color_depth,
        )
        if enable_depth_pass:
            layer.use_pass_z = True
        if enable_object_index_pass:
            layer.use_pass_object_index = True
        self.clear_compositor_tree()
        _, pass_nodes = add_output_node(output_path)
        bpy.ops.render.render(write_still=False)
        raw_result = load_temp_output(temp_output_path)
        finalize_output(raw_result)
    finally:
        self.remove_render_nodes(pass_nodes)
        if temp_output_path.exists():
            temp_output_path.unlink()
        self.restore_render_state(saved_state, layer)
def get_rotation_radians(
    self, rotation_deg: tuple[float, float, float] | list[float]
) -> tuple[float, float, float]:
    """Convert each angle in ``rotation_deg`` from degrees to radians."""
    return tuple(map(math.radians, rotation_deg))
| def validate_flow_args(self) -> None: | |
| """Normalize optional flow-camera arguments and fill defaults.""" | |
| has_flow_xyz = self.flow_camera_xyz is not None | |
| has_flow_rotation = self.flow_camera_rotation_deg is not None | |
| if has_flow_xyz != has_flow_rotation: | |
| raise ValueError( | |
| "--flow_camera_xyz and --flow_camera_rotation_deg must be " | |
| "provided together." | |
| ) | |
| if not has_flow_xyz: | |
| xyz = list(self.camera_xyz) | |
| xyz[0] += 0.5 | |
| self.flow_camera_xyz = tuple(xyz) | |
| self.flow_camera_rotation_deg = tuple(self.camera_rotation_deg) | |
def build_depth_preview_node(
    self,
    tree: bpy.types.NodeTree,
    render_layers: bpy.types.CompositorNodeRLayers,
    camera: bpy.types.Camera,
    depth_mode: str,
) -> bpy.types.Node:
    """Create the compositor node that maps raw depth into a preview range.

    ``"normalized"`` feeds the depth output through a normalize node;
    ``"metric"`` maps the camera's clip range onto 0..1 with clamping.

    Returns:
        The created node, already linked to the ``Depth`` output.

    Raises:
        ValueError: If ``depth_mode`` is neither value above.
    """
    if depth_mode == "metric":
        remap = tree.nodes.new(type="CompositorNodeMapRange")
        range_values = (
            ("From Min", camera.clip_start),
            ("From Max", camera.clip_end),
            ("To Min", 0.0),
            ("To Max", 1.0),
        )
        for socket_name, value in range_values:
            remap.inputs[socket_name].default_value = value
        remap.use_clamp = True
        tree.links.new(render_layers.outputs["Depth"], remap.inputs[0])
        return remap
    if depth_mode == "normalized":
        normalizer = tree.nodes.new(type="CompositorNodeNormalize")
        tree.links.new(render_layers.outputs["Depth"], normalizer.inputs[0])
        return normalizer
    raise ValueError(f"Unsupported depth mode: {depth_mode}")
def build_depth_vis_output(
    self,
    tree: bpy.types.NodeTree,
    depth_preview_node: bpy.types.Node,
    output_path: Path,
) -> Path:
    """Add a file-output node writing the depth preview as 8-bit grayscale PNG.

    The node writes into the directory of the temporary depth path; its
    slot path is that file's name without the trailing frame number.

    Returns:
        The temporary path the grayscale image is expected at.
    """
    gray_temp_path = self.get_depth_gray_temp_path(output_path)

    file_output = tree.nodes.new(type="CompositorNodeOutputFile")
    file_output.base_path = str(gray_temp_path.parent)
    slot_prefix = self.get_temp_output_slot_prefix(gray_temp_path)
    file_output.file_slots[0].path = slot_prefix

    image_format = file_output.format
    image_format.file_format = "PNG"
    image_format.color_mode = "BW"
    image_format.color_depth = "8"

    tree.links.new(depth_preview_node.outputs[0], file_output.inputs[0])
    return gray_temp_path
def configure_auxiliary_outputs(
    self,
    output_path: Path,
    render_passes: tuple[str, ...] | list[str],
    depth_mode: str,
) -> list[tuple[Path, Path]]:
    """Set up compositor outputs that are produced during the base render.

    Only the ``depth`` pass needs one: the Z pass is enabled and the
    compositor is rebuilt with a depth preview feeding a grayscale file
    output.

    Returns:
        ``(temp_path, final_path)`` pairs to finalize after rendering;
        empty when ``depth`` was not requested.
    """
    layer = self.scene.view_layers["ViewLayer"]
    if "depth" not in render_passes:
        return []
    layer.use_pass_z = True

    compositor_tree = self.clear_compositor_tree()
    layers_node = compositor_tree.nodes.new(type="CompositorNodeRLayers")
    preview_node = self.build_depth_preview_node(
        compositor_tree,
        layers_node,
        self.scene.camera.data,
        depth_mode,
    )
    gray_temp_path = self.build_depth_vis_output(
        tree=compositor_tree,
        depth_preview_node=preview_node,
        output_path=output_path,
    )
    final_path = self.get_depth_vis_output_path(output_path)
    return [(gray_temp_path, final_path)]
def finalize_depth_output(
    self, temp_path: Path, output_path: Path
) -> None:
    """Turn the grayscale depth temp image into the colour-mapped preview.

    Any existing file at ``output_path`` is deleted first. The temp image
    is read as grayscale, mapped through OpenCV's JET colormap, and
    written to ``output_path``. Once reading has been attempted, the temp
    file is removed whether or not the conversion succeeds.

    Raises:
        FileNotFoundError: If the temp image is missing or unreadable.
        RuntimeError: If the colour-mapped image cannot be written.
    """
    if output_path.exists():
        output_path.unlink()
    if not temp_path.exists():
        raise FileNotFoundError(f"Depth file not generated: {temp_path}")

    try:
        gray = cv2.imread(str(temp_path), cv2.IMREAD_GRAYSCALE)
        if gray is None:
            raise FileNotFoundError(
                f"Failed to read depth image: {temp_path}"
            )
        colored = cv2.applyColorMap(
            np.ascontiguousarray(gray), cv2.COLORMAP_JET
        )
        written = cv2.imwrite(str(output_path), colored)
        if not written:
            raise RuntimeError(
                f"Failed to write depth visualization: {output_path}"
            )
    finally:
        if temp_path.exists():
            temp_path.unlink()
def create_clean_material(self, material_name: str) -> bpy.types.Material:
    """Create a node-based material with an empty node tree.

    A material already registered under ``material_name`` is removed
    first. The new material has ``shadow_method`` set to ``"NONE"``.
    """
    materials = bpy.data.materials
    stale = materials.get(material_name)
    if stale is not None:
        materials.remove(stale, do_unlink=True)

    fresh = materials.new(name=material_name)
    fresh.use_nodes = True
    fresh.shadow_method = "NONE"
    fresh.node_tree.nodes.clear()
    return fresh
def create_view_normal_material(self) -> bpy.types.Material:
    """Build an emission material that colours surfaces by their normal.

    Node chain: the geometry node's true normal is negated on back faces
    (mix driven by ``Backfacing``), transformed from world to camera
    space, multiplied per component by ``(-1, 1, -1)``, then scaled and
    biased by 0.5 before driving the emission colour.
    """
    material = self.create_clean_material("EmbodiedGenViewNormal")
    node_tree = material.node_tree
    add_node = node_tree.nodes.new
    connect = node_tree.links.new

    geometry = add_node(type="ShaderNodeNewGeometry")

    # Negated copy of the normal, selected for back faces.
    negate = add_node(type="ShaderNodeVectorMath")
    negate.operation = "MULTIPLY"
    negate.inputs[1].default_value = (-1.0, -1.0, -1.0)

    facing_mix = add_node(type="ShaderNodeMix")
    facing_mix.data_type = "VECTOR"
    facing_mix.clamp_factor = True
    facing_mix.factor_mode = "UNIFORM"

    to_camera = add_node(type="ShaderNodeVectorTransform")
    to_camera.vector_type = "NORMAL"
    to_camera.convert_from = "WORLD"
    to_camera.convert_to = "CAMERA"

    axis_flip = add_node(type="ShaderNodeVectorMath")
    axis_flip.operation = "MULTIPLY"
    axis_flip.inputs[1].default_value = (-1.0, 1.0, -1.0)

    # Multiply by 0.5, then add 0.5.
    remap = add_node(type="ShaderNodeVectorMath")
    remap.operation = "MULTIPLY_ADD"
    remap.inputs[1].default_value = (0.5, 0.5, 0.5)
    remap.inputs[2].default_value = (0.5, 0.5, 0.5)

    emission = add_node(type="ShaderNodeEmission")
    material_output = add_node(type="ShaderNodeOutputMaterial")

    true_normal = geometry.outputs["True Normal"]
    connect(true_normal, negate.inputs[0])
    connect(geometry.outputs["Backfacing"], facing_mix.inputs["Factor"])
    connect(true_normal, facing_mix.inputs["A"])
    connect(negate.outputs["Vector"], facing_mix.inputs["B"])
    connect(facing_mix.outputs["Result"], to_camera.inputs["Vector"])
    connect(to_camera.outputs["Vector"], axis_flip.inputs[0])
    connect(axis_flip.outputs["Vector"], remap.inputs[0])
    connect(remap.outputs["Vector"], emission.inputs["Color"])
    connect(emission.outputs["Emission"], material_output.inputs["Surface"])
    return material
def create_mesh_preview_material(self) -> bpy.types.Material:
    """Build the emission material used for the untextured mesh preview.

    A Layer Weight "Facing" value drives a two-stop color ramp whose color
    feeds an emission shader with strength 0.82.

    Returns:
        The freshly created ``EmbodiedGenMeshPreview`` material.
    """
    material = self.create_clean_material("EmbodiedGenMeshPreview")
    nodes = material.node_tree.nodes
    links = material.node_tree.links

    facing = nodes.new(type="ShaderNodeLayerWeight")
    facing.inputs["Blend"].default_value = 0.35

    # (position, RGBA) for the two existing ramp stops, in index order.
    ramp = nodes.new(type="ShaderNodeValToRGB")
    ramp_stops = (
        (0.1, (0.78, 0.81, 0.87, 1.0)),
        (0.9, (0.42, 0.48, 0.58, 1.0)),
    )
    for stop_index, (position, color) in enumerate(ramp_stops):
        ramp.color_ramp.elements[stop_index].position = position
        ramp.color_ramp.elements[stop_index].color = color

    glow = nodes.new(type="ShaderNodeEmission")
    glow.inputs["Strength"].default_value = 0.82
    surface_output = nodes.new(type="ShaderNodeOutputMaterial")

    links.new(facing.outputs["Facing"], ramp.inputs["Fac"])
    links.new(ramp.outputs["Color"], glow.inputs["Color"])
    links.new(glow.outputs["Emission"], surface_output.inputs["Surface"])
    return material
def assign_instance_ids(self) -> dict[str, int]:
    """Give every mesh object a 1-based pass index, ordered by object name.

    Returns:
        Mapping from object name to the pass index written to that object.

    Raises:
        ValueError: If ``get_mesh_objects`` yields no objects.
    """
    ordered_meshes = list(self.get_mesh_objects())
    ordered_meshes.sort(key=lambda mesh: mesh.name)
    if len(ordered_meshes) == 0:
        raise ValueError("No mesh objects found for instance segmentation.")

    instance_id_map: dict[str, int] = {}
    next_id = 1  # 0 is never assigned to an object
    for mesh in ordered_meshes:
        mesh.pass_index = next_id
        instance_id_map[mesh.name] = next_id
        next_id += 1
    return instance_id_map
def snapshot_object_pass_indices(
    self,
) -> list[tuple[bpy.types.Object, int]]:
    """Record the current ``pass_index`` of every mesh object.

    Returns:
        ``(object, pass_index)`` pairs in the order ``get_mesh_objects``
        yields them, suitable for ``restore_object_pass_indices``.
    """
    snapshot: list[tuple[bpy.types.Object, int]] = []
    for mesh in self.get_mesh_objects():
        snapshot.append((mesh, mesh.pass_index))
    return snapshot
def restore_object_pass_indices(
    self, original_pass_indices: list[tuple[bpy.types.Object, int]]
) -> None:
    """Write previously captured pass indices back onto their objects.

    Args:
        original_pass_indices: ``(object, pass_index)`` pairs, as produced
            by ``snapshot_object_pass_indices``.
    """
    for entry in original_pass_indices:
        target, saved_index = entry
        target.pass_index = saved_index
def add_instance_seg_output_node(
    self,
    output_path: Path,
) -> tuple[bpy.types.NodeTree, list[bpy.types.Node]]:
    """Attach an EXR output node fed by the ``IndexOB`` render-layer socket.

    Args:
        output_path: Output path the instance-segmentation temp path is
            derived from.

    Returns:
        Whatever ``add_exr_output_node`` returns for this configuration.
    """
    temp_exr_path = self.get_instance_seg_temp_path(output_path)
    return self.add_exr_output_node(
        output_path=output_path,
        temp_output_path=temp_exr_path,
        render_output_name="IndexOB",
    )
def add_flow_depth_output_node(
    self,
    output_path: Path,
) -> tuple[bpy.types.NodeTree, list[bpy.types.Node]]:
    """Attach an EXR output node fed by the ``Depth`` render-layer socket.

    Args:
        output_path: Output path the flow-depth temp path is derived from.

    Returns:
        Whatever ``add_exr_output_node`` returns for this configuration.
    """
    temp_exr_path = self.get_flow_depth_temp_path(output_path)
    return self.add_exr_output_node(
        output_path=output_path,
        temp_output_path=temp_exr_path,
        render_output_name="Depth",
    )
def add_exr_output_node(
    self,
    output_path: Path,
    temp_output_path: Path,
    render_output_name: str,
) -> tuple[bpy.types.NodeTree, list[bpy.types.Node]]:
    """Wire one render-layer socket into a new EXR file-output node.

    Args:
        output_path: Not used by this method; kept in the signature.
        temp_output_path: EXR destination; its parent becomes the node's
            base path and its slot prefix comes from
            ``get_temp_output_slot_prefix``.
        render_output_name: Name of the render-layers output socket to save.

    Returns:
        The scene compositor tree and the two nodes created here (render
        layers node first, file-output node second).
    """
    compositor = self.scene.node_tree
    layers_node = compositor.nodes.new(type="CompositorNodeRLayers")
    file_node = compositor.nodes.new(type="CompositorNodeOutputFile")

    file_node.base_path = str(temp_output_path.parent)
    slot_prefix = self.get_temp_output_slot_prefix(temp_output_path)
    file_node.file_slots[0].path = slot_prefix

    # 32-bit RGB EXR with the codec set to "NONE".
    exr_format = file_node.format
    exr_format.file_format = "OPEN_EXR"
    exr_format.color_mode = "RGB"
    exr_format.color_depth = "32"
    exr_format.exr_codec = "NONE"

    source_socket = layers_node.outputs[render_output_name]
    compositor.links.new(source_socket, file_node.inputs[0])
    return compositor, [layers_node, file_node]
def load_temp_exr_first_channel(
    self,
    temp_path: Path,
    error_message: str,
) -> np.ndarray:
    """Load the first channel from a temporary EXR and flip to image space.

    Args:
        temp_path: EXR file to read.
        error_message: Template formatted with ``path=temp_path`` when the
            file does not exist.

    Returns:
        A ``(height, width)`` float32 array holding channel 0, flipped
        vertically relative to the loaded pixel buffer.

    Raises:
        FileNotFoundError: If ``temp_path`` does not exist.
        RuntimeError: If the pixel buffer length does not equal
            ``width * height * channels``.
    """
    if not temp_path.exists():
        raise FileNotFoundError(error_message.format(path=temp_path))
    temp_image = bpy.data.images.load(str(temp_path), check_existing=False)
    try:
        width, height = temp_image.size
        channels = temp_image.channels
        expected_size = width * height * channels
        if len(temp_image.pixels) != expected_size:
            raise RuntimeError(
                f"Unexpected EXR image layout for {temp_path}."
            )
        # Copy straight into a NumPy buffer instead of slicing ``pixels``,
        # which would first build a Python sequence of every float.
        pixels = np.empty(expected_size, dtype=np.float32)
        temp_image.pixels.foreach_get(pixels)
        image = pixels.reshape(height, width, channels)[..., 0]
        return np.flipud(image)
    finally:
        # Always drop the temporary image datablock again.
        bpy.data.images.remove(temp_image)
def load_instance_seg_temp_output(self, temp_path: Path) -> np.ndarray:
    """Read the instance-id EXR and return it as contiguous uint16 ids.

    Args:
        temp_path: EXR file written by the instance-segmentation pass.

    Returns:
        C-contiguous ``uint16`` array of ids, rounded to the nearest integer.
    """
    raw_ids = self.load_temp_exr_first_channel(
        temp_path,
        "Instance segmentation file not generated: {path}",
    )
    rounded_ids = np.rint(raw_ids)
    return np.ascontiguousarray(rounded_ids.astype(np.uint16))
def load_flow_depth_temp_output(self, temp_path: Path) -> np.ndarray:
    """Read the flow depth EXR and zero out non-finite samples.

    Args:
        temp_path: EXR file written by the flow depth pass.

    Returns:
        C-contiguous ``float32`` depth array in which NaN and infinite
        values have been replaced by ``0.0``.
    """
    raw_depth = self.load_temp_exr_first_channel(
        temp_path,
        "Flow depth file not generated: {path}",
    )
    # Always work on a private C-ordered float32 copy.
    cleaned = np.array(raw_depth, dtype=np.float32, order="C")
    non_finite = np.logical_not(np.isfinite(cleaned))
    cleaned[non_finite] = 0.0
    return cleaned
def build_instance_seg_visualization(
    self, instance_seg: np.ndarray, max_instance_id: int
) -> np.ndarray:
    """Map instance ids to deterministic 3-channel colors for visualization.

    Id ``i`` gets the color ``((i * 37) % 256, (i * 67) % 256,
    (i * 97) % 256)``, so id 0 maps to black.

    Args:
        instance_seg: Integer array of instance ids; every value must be
            within ``[0, max_instance_id]``.
        max_instance_id: Largest id the lookup table has to cover.

    Returns:
        ``uint8`` array of shape ``instance_seg.shape + (3,)``.
    """
    # Build the whole lookup table in one vectorized step instead of a
    # Python loop over every id.
    instance_ids = np.arange(max_instance_id + 1, dtype=np.int64)
    channel_multipliers = np.array([37, 67, 97], dtype=np.int64)
    color_lut = (
        (instance_ids[:, None] * channel_multipliers) % 256
    ).astype(np.uint8)
    return color_lut[instance_seg]
def save_instance_seg_outputs(
    self,
    output_path: Path,
    instance_seg: np.ndarray,
) -> None:
    """Write the color-coded instance segmentation preview image.

    Args:
        output_path: Output path the preview path is derived from; its
            parent directory is created if missing.
        instance_seg: Integer instance-id image to visualize.

    Raises:
        RuntimeError: If ``cv2.imwrite`` reports failure.
    """
    output_path.parent.mkdir(parents=True, exist_ok=True)
    preview_path = self.get_instance_seg_vis_output_path(output_path)

    # ``initial=0`` keeps ``max`` defined for an empty image.
    highest_id = int(instance_seg.max(initial=0))
    preview = self.build_instance_seg_visualization(
        instance_seg=instance_seg,
        max_instance_id=highest_id,
    )

    written = cv2.imwrite(str(preview_path), preview)
    if not written:
        raise RuntimeError(
            f"Failed to write instance segmentation preview: {preview_path}"
        )
def build_flow_visualization(self, flow: np.ndarray) -> np.ndarray:
    """Render a flow field as a BGR image using an HSV color wheel.

    Hue is the flow direction in degrees, saturation is the magnitude
    normalized by its 99th percentile (clipped to [0, 1]) and value is 1.

    Args:
        flow: Array whose last axis holds the two flow components.

    Returns:
        ``uint8`` BGR image with the same leading two dimensions as ``flow``.
    """
    components = flow.astype(np.float32)
    magnitude, angle = cv2.cartToPolar(
        components[..., 0],
        components[..., 1],
        angleInDegrees=True,
    )

    # Normalize by the 99th percentile; fall back to 1.0 for near-zero flow
    # to avoid dividing by (almost) zero.
    scale = float(np.percentile(magnitude, 99.0))
    if scale <= 1e-6:
        scale = 1.0

    hsv = np.zeros(flow.shape[:2] + (3,), dtype=np.float32)
    hsv[..., 0] = np.mod(angle, 360.0)
    hsv[..., 1] = np.clip(magnitude / scale, 0.0, 1.0)
    hsv[..., 2] = 1.0

    bgr_unit = cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)
    bgr_scaled = np.clip(bgr_unit * 255.0, 0.0, 255.0)
    return bgr_scaled.astype(np.uint8)
def get_camera_intrinsics(
    self, camera: bpy.types.Object, width: int, height: int
) -> tuple[float, float, float, float]:
    """Derive pinhole intrinsics from the camera's field-of-view angles.

    Args:
        camera: Object whose ``data`` exposes ``angle_x`` and ``angle_y``.
        width: Image width in pixels.
        height: Image height in pixels.

    Returns:
        ``(fx, fy, cx, cy)`` where the principal point is
        ``((width - 1) / 2, (height - 1) / 2)``.
    """

    def focal_length(extent_px: int, field_of_view: float) -> float:
        # Pinhole relation: extent / (2 * tan(fov / 2)).
        return extent_px / (2.0 * math.tan(field_of_view * 0.5))

    lens = camera.data
    principal_x = (width - 1.0) * 0.5
    principal_y = (height - 1.0) * 0.5
    return (
        focal_length(width, lens.angle_x),
        focal_length(height, lens.angle_y),
        principal_x,
        principal_y,
    )
def build_camera_matrix_world(
    self,
    xyz: tuple[float, float, float] | list[float],
    rotation_deg: tuple[float, float, float] | list[float],
) -> Matrix:
    """Compose a 4x4 matrix from a position and XYZ Euler angles.

    Args:
        xyz: Translation components.
        rotation_deg: Rotation, converted through ``get_rotation_radians``
            before being used as an ``"XYZ"`` Euler rotation.

    Returns:
        ``translation @ rotation`` as a 4x4 ``Matrix``.
    """
    euler_angles = self.get_rotation_radians(rotation_deg)
    rotation_matrix = Euler(euler_angles, "XYZ").to_matrix().to_4x4()
    translation_matrix = Matrix.Translation(Vector(tuple(xyz)))
    return translation_matrix @ rotation_matrix
def compute_flow_from_depth(
    self,
    depth: np.ndarray,
    camera: bpy.types.Object,
) -> tuple[np.ndarray, np.ndarray]:
    """Project depth into a target camera and derive dense 2D flow.

    Each pixel with finite, positive depth is back-projected into the
    source camera frame (camera looking down -Z, image v axis opposite to
    camera Y), moved to world space with ``camera.matrix_world`` and
    re-projected into the camera described by ``self.flow_camera_xyz`` /
    ``self.flow_camera_rotation_deg`` using the same intrinsics.

    NOTE(review): the back-projection treats ``depth`` as distance along
    the viewing axis, not ray length -- confirm this matches the render
    pass that produced it.

    Args:
        depth: ``(height, width)`` depth image.
        camera: Source camera object.

    Returns:
        ``(flow, valid_mask)``: ``flow`` is ``(height, width, 2)`` float32
        holding target-minus-source pixel coordinates, ``valid_mask`` is a
        ``(height, width)`` bool array that is True only where the pixel
        has valid depth, lies in front of the target camera and lands
        inside the frame. Flow is zero wherever the mask is False.
    """
    height, width = depth.shape
    fx, fy, cx, cy = self.get_camera_intrinsics(camera, width, height)

    flow = np.zeros((height, width, 2), dtype=np.float32)
    valid_mask = np.zeros((height, width), dtype=bool)

    valid = np.isfinite(depth) & (depth > 0.0)
    if not valid.any():
        return flow, valid_mask

    # Row-major pixel coordinates of the valid samples; this is the same
    # ordering boolean indexing with ``valid`` produces.
    rows, cols = np.nonzero(valid)
    source_u = cols.astype(np.float32)
    source_v = rows.astype(np.float32)
    depth_valid = depth[valid]

    x_cam = ((source_u - cx) / fx) * depth_valid
    y_cam = (-(source_v - cy) / fy) * depth_valid
    z_cam = -depth_valid
    camera_points = np.stack(
        [x_cam, y_cam, z_cam, np.ones_like(z_cam)], axis=1
    )

    source_to_world = np.array(camera.matrix_world, dtype=np.float64)
    target_to_world = np.array(
        self.build_camera_matrix_world(
            self.flow_camera_xyz,
            self.flow_camera_rotation_deg,
        ),
        dtype=np.float64,
    )
    world_to_target = np.linalg.inv(target_to_world)
    world_points = camera_points @ source_to_world.T
    target_points = world_points @ world_to_target.T

    # Keep only points strictly in front of the target camera (-Z forward).
    target_z = target_points[:, 2]
    in_front = target_z < -1e-6
    if not in_front.any():
        return flow, valid_mask

    forward_distance = -target_z[in_front]
    projected_x = fx * (target_points[in_front, 0] / forward_distance) + cx
    projected_y = -fy * (target_points[in_front, 1] / forward_distance) + cy

    in_frame = (
        (projected_x >= 0.0)
        & (projected_x < width)
        & (projected_y >= 0.0)
        & (projected_y < height)
    )
    if not in_frame.any():
        return flow, valid_mask

    # Narrow the source pixels through both filters, then scatter directly.
    kept_rows = rows[in_front][in_frame]
    kept_cols = cols[in_front][in_frame]
    displacement = np.stack(
        [
            projected_x[in_frame] - source_u[in_front][in_frame],
            projected_y[in_frame] - source_v[in_front][in_frame],
        ],
        axis=1,
    ).astype(np.float32)

    flow[kept_rows, kept_cols] = displacement
    valid_mask[kept_rows, kept_cols] = True
    return flow, valid_mask
def save_numpy_array(self, output_path: Path, array: np.ndarray) -> None:
    """Persist a NumPy array atomically to avoid partial writes.

    The array is first written to a sibling ``*.tmp.npy`` file, which is
    then renamed over ``output_path``. If writing or renaming fails, the
    temporary file is removed before the error propagates.

    Args:
        output_path: Final destination of the ``.npy`` file.
        array: Data to store with ``np.save``.
    """
    temp_output_path = output_path.with_suffix(".tmp.npy")
    # Clear any leftover from an earlier interrupted run.
    temp_output_path.unlink(missing_ok=True)
    try:
        np.save(temp_output_path, array)
        temp_output_path.replace(output_path)
    except BaseException:
        # Do not leave a half-written temp file next to the real output.
        temp_output_path.unlink(missing_ok=True)
        raise
def save_flow_outputs(
    self,
    output_path: Path,
    flow: np.ndarray,
    valid_mask: np.ndarray,
) -> None:
    """Write the flow array, its validity mask and a color preview.

    Args:
        output_path: Output path the three flow paths are derived from; its
            parent directory is created if missing.
        flow: Flow field saved as-is and visualized.
        valid_mask: Validity mask saved alongside the flow.

    Raises:
        RuntimeError: If ``cv2.imwrite`` reports failure for the preview.
    """
    output_path.parent.mkdir(parents=True, exist_ok=True)
    array_targets = (
        (self.get_flow_output_path(output_path), flow),
        (self.get_flow_valid_output_path(output_path), valid_mask),
    )
    preview_path = self.get_flow_vis_output_path(output_path)

    for array_path, payload in array_targets:
        self.save_numpy_array(array_path, payload)

    preview = self.build_flow_visualization(flow)
    written = cv2.imwrite(str(preview_path), preview)
    if not written:
        raise RuntimeError(f"Failed to write flow preview: {preview_path}")
def get_preview_output_path(
    self,
    output_path: Path,
    render_pass_name: str,
    occurrence_index: int = 1,
) -> Path | None:
    """Resolve the preview image path for one occurrence of a render pass.

    Args:
        output_path: Base (RGB) output path.
        render_pass_name: One of ``rgb``, ``depth``, ``normal``, ``mesh``,
            ``instance_seg`` or ``flow``.
        occurrence_index: Occurrence number forwarded to
            ``build_occurrence_output_path``.

    Returns:
        The path from ``build_occurrence_output_path``, or ``None`` when
        the pass name has no preview path.
    """
    preview_by_pass = dict(
        rgb=output_path,
        depth=self.get_depth_vis_output_path(output_path),
        normal=self.get_normal_output_path(output_path),
        mesh=self.get_mesh_output_path(output_path),
        instance_seg=self.get_instance_seg_vis_output_path(output_path),
        flow=self.get_flow_vis_output_path(output_path),
    )
    base_preview_path = preview_by_pass.get(render_pass_name)
    if base_preview_path is not None:
        return self.build_occurrence_output_path(
            base_preview_path, occurrence_index
        )
    return None
def load_preview_image(self, image_path: Path) -> np.ndarray:
    """Read a preview image from disk as a 3-channel color image.

    Args:
        image_path: Image file to read.

    Returns:
        The array returned by ``cv2.imread`` with ``IMREAD_COLOR``.

    Raises:
        FileNotFoundError: If ``cv2.imread`` returns ``None``.
    """
    loaded = cv2.imread(str(image_path), cv2.IMREAD_COLOR)
    if loaded is not None:
        return loaded
    raise FileNotFoundError(f"Failed to read preview image: {image_path}")
def collect_composite_images(
    self, output_path: Path
) -> list[tuple[str, np.ndarray]]:
    """Load every existing preview image, in render-pass occurrence order.

    Args:
        output_path: Base output path the preview paths are derived from.

    Returns:
        ``(render_pass_name, image)`` pairs; occurrences without a preview
        path or whose file does not exist are skipped.
    """
    loaded_previews: list[tuple[str, np.ndarray]] = []
    for pass_name, occurrence in self.iter_render_pass_occurrences():
        preview_path = self.get_preview_output_path(
            output_path,
            pass_name,
            occurrence,
        )
        if preview_path is not None and preview_path.exists():
            preview = self.load_preview_image(preview_path)
            loaded_previews.append((pass_name, preview))
    return loaded_previews
def replicate_duplicate_preview_outputs(self, output_path: Path) -> None:
    """Copy first-occurrence previews to the paths of repeated passes.

    Occurrences with index 1 are skipped. For every later occurrence the
    first occurrence's preview file is copied to the repeated
    occurrence's path, replacing any file already there.

    Args:
        output_path: Base output path the preview paths are derived from.

    Raises:
        FileNotFoundError: If the first-occurrence preview does not exist.
    """
    for pass_name, occurrence in self.iter_render_pass_occurrences():
        if occurrence == 1:
            continue

        first_path = self.get_preview_output_path(output_path, pass_name)
        repeat_path = self.get_preview_output_path(
            output_path,
            pass_name,
            occurrence,
        )
        if first_path is None or repeat_path is None:
            continue

        if not first_path.exists():
            raise FileNotFoundError(
                f"Preview output not generated for repeated pass "
                f"{pass_name}: {first_path}"
            )
        if repeat_path.exists():
            repeat_path.unlink()
        shutil.copyfile(first_path, repeat_path)
def get_composite_separator_boundaries(
    self,
    render_pass_names: list[str] | tuple[str, ...],
    boundaries: np.ndarray,
) -> list[float]:
    """Select the interior boundaries that separate two different passes.

    Args:
        render_pass_names: Pass name of each composite region, in order.
        boundaries: Region edges; must hold one more entry than there are
            regions. The first and last entries are never returned.

    Returns:
        The interior boundaries, as floats, whose neighbouring regions
        have different pass names.

    Raises:
        ValueError: If ``boundaries`` has the wrong length.
    """
    if len(boundaries) != len(render_pass_names) + 1:
        raise ValueError(
            "boundaries length must match the number of render passes + 1."
        )
    interior_edges = boundaries[1:-1]
    neighbouring_passes = zip(render_pass_names, render_pass_names[1:])
    return [
        float(edge)
        for edge, (left_pass, right_pass) in zip(
            interior_edges, neighbouring_passes
        )
        if left_pass != right_pass
    ]
def build_composite_image(
    self,
    images: list[np.ndarray],
    render_pass_names: list[str] | tuple[str, ...],
    separator_width_px: int = 6,
) -> np.ndarray:
    """Merge preview images into one frame split by slanted bands.

    Every image is resized to the first image's size. The frame is divided
    into equally wide bands along the coordinate ``x + y * slope`` and
    band ``i`` shows image ``i``. Band edges between different passes are
    painted with value 255.

    Args:
        images: Preview images, one per region.
        render_pass_names: Pass name of each image, in the same order.
        separator_width_px: Maximum distance (in the slanted coordinate)
            from a separator boundary that is still painted.

    Returns:
        The composite image, with the shape and dtype of the first image.

    Raises:
        ValueError: If ``images`` is empty or the two lists differ in length.
    """
    if not images:
        raise ValueError("At least one image is required for composition.")
    if len(images) != len(render_pass_names):
        raise ValueError(
            "images and render_pass_names must have the same length."
        )

    base_height, base_width = images[0].shape[:2]
    frame_shape = (base_height, base_width)
    tiles = []
    for candidate in images:
        if candidate.shape[:2] != frame_shape:
            candidate = cv2.resize(
                candidate,
                (base_width, base_height),
                interpolation=cv2.INTER_LINEAR,
            )
        tiles.append(candidate)

    # Slanted coordinate: constant along each diagonal separator line.
    column_coords = np.arange(base_width, dtype=np.float32)[None, :]
    row_coords = np.arange(base_height, dtype=np.float32)[:, None]
    slash_slope = 0.28 * (base_width / base_height)
    diagonal_coord = column_coords + row_coords * slash_slope

    boundaries = np.linspace(
        float(diagonal_coord.min()),
        float(diagonal_coord.max()),
        len(tiles) + 1,
    )
    region_indices = np.digitize(
        diagonal_coord, boundaries[1:-1], right=False
    )

    composite = np.zeros_like(tiles[0])
    for tile_index, tile in enumerate(tiles):
        in_region = region_indices == tile_index
        composite[in_region] = tile[in_region]

    slash_mask = np.zeros(frame_shape, dtype=bool)
    for boundary in self.get_composite_separator_boundaries(
        render_pass_names, boundaries
    ):
        near_boundary = np.abs(diagonal_coord - boundary) <= separator_width_px
        slash_mask = slash_mask | near_boundary
    composite[slash_mask] = 255
    return composite
def save_composite_preview(self, output_path: Path) -> None:
    """Write a combined preview when at least two previews are available.

    Args:
        output_path: Base output path used to collect the preview images.

    Raises:
        RuntimeError: If ``cv2.imwrite`` reports failure.
    """
    collected = self.collect_composite_images(output_path)
    if len(collected) < 2:
        # A single preview needs no composite.
        return

    pass_names = [pass_name for pass_name, _ in collected]
    frames = [frame for _, frame in collected]

    composite_path = self.get_composite_output_path(tuple(pass_names))
    composite = self.build_composite_image(frames, pass_names)
    written = cv2.imwrite(str(composite_path), composite)
    if not written:
        raise RuntimeError(
            f"Failed to write composite preview: {composite_path}"
        )
def render_flow_pass(self, output_path: Path) -> None:
    """Render a temporary depth pass and turn it into flow outputs.

    Args:
        output_path: Base output path the flow paths are derived from.

    Raises:
        ValueError: If the scene has no camera.
    """
    self.validate_flow_args()
    source_camera = self.scene.camera
    if source_camera is None:
        raise ValueError("Scene camera is required for flow rendering.")
    depth_temp_path = self.get_flow_depth_temp_path(output_path)

    def write_flow_from_depth(depth: np.ndarray) -> None:
        # Runs once the temporary depth image has been loaded.
        flow_field, flow_valid = self.compute_flow_from_depth(
            depth=depth, camera=source_camera
        )
        self.save_flow_outputs(
            output_path=output_path,
            flow=flow_field,
            valid_mask=flow_valid,
        )

    self.render_temp_output_pass(
        output_path=output_path,
        temp_output_path=depth_temp_path,
        add_output_node=self.add_flow_depth_output_node,
        load_temp_output=self.load_flow_depth_temp_output,
        finalize_output=write_flow_from_depth,
        color_mode="RGB",
        color_depth="8",
        enable_depth_pass=True,
    )
def render_normal_pass(self, output_path: Path) -> None:
    """Render the view-space normal preview with a material override.

    Args:
        output_path: Base output path the normal image path is derived from.
    """
    preview_path = self.get_normal_output_path(output_path)
    override_settings = dict(
        preview_output_path=preview_path,
        material_factory=self.create_view_normal_material,
        color_mode="RGB",
    )
    self.render_material_override_pass(**override_settings)
def render_mesh_pass(self, output_path: Path) -> None:
    """Render the untextured mesh preview with a material override.

    Args:
        output_path: Base output path the mesh image path is derived from.
    """
    preview_path = self.get_mesh_output_path(output_path)
    override_settings = dict(
        preview_output_path=preview_path,
        material_factory=self.create_mesh_preview_material,
        color_mode="RGBA",
    )
    self.render_material_override_pass(**override_settings)
def render_instance_seg_pass(self, output_path: Path) -> None:
    """Render instance ids via temporary object pass indices.

    Object pass indices are captured, overwritten with instance ids and
    restored again once the render call has finished or raised.

    Args:
        output_path: Base output path the segmentation paths are derived
            from.
    """
    saved_indices = self.snapshot_object_pass_indices()
    self.assign_instance_ids()
    seg_temp_path = self.get_instance_seg_temp_path(output_path)

    def write_instance_seg(instance_seg: np.ndarray) -> None:
        # Runs once the temporary id image has been loaded.
        self.save_instance_seg_outputs(
            output_path=output_path,
            instance_seg=instance_seg,
        )

    render_settings = dict(
        output_path=output_path,
        temp_output_path=seg_temp_path,
        add_output_node=self.add_instance_seg_output_node,
        load_temp_output=self.load_instance_seg_temp_output,
        finalize_output=write_instance_seg,
        color_mode="BW",
        color_depth="16",
        enable_object_index_pass=True,
    )
    try:
        self.render_temp_output_pass(**render_settings)
    finally:
        self.restore_object_pass_indices(saved_indices)
def render(self, output_path: Path) -> None:
    """Run the requested render passes and write final outputs.

    The base render covers ``rgb`` and ``depth``. The ``normal``,
    ``mesh``, ``instance_seg`` and ``flow`` passes then run in that order,
    followed by duplicate-preview replication and the composite preview.

    Args:
        output_path: RGB output path; other outputs are derived from it.

    Raises:
        ValueError: If an auxiliary output targets anything other than the
            depth visualization path.
    """
    self.scene.use_nodes = False
    wants_rgb = "rgb" in self.render_passes
    wants_depth = "depth" in self.render_passes

    auxiliary_outputs: list[tuple[Path, Path]] = []
    if wants_depth:
        auxiliary_outputs = self.configure_auxiliary_outputs(
            output_path, self.render_passes, self.depth_mode
        )

    output_path.parent.mkdir(parents=True, exist_ok=True)
    if wants_rgb:
        self.scene.render.filepath = str(output_path)
    if wants_rgb or wants_depth:
        # Only write the still image when RGB itself was requested.
        bpy.ops.render.render(write_still=wants_rgb)

    for temp_path, final_path in auxiliary_outputs:
        if final_path != self.get_depth_vis_output_path(output_path):
            raise ValueError(
                f"Unsupported render output target: {final_path}"
            )
        self.finalize_depth_output(temp_path, final_path)

    if auxiliary_outputs:
        self.clear_compositor_tree()
        self.scene.use_nodes = False

    follow_up_passes = (
        ("normal", self.render_normal_pass),
        ("mesh", self.render_mesh_pass),
        ("instance_seg", self.render_instance_seg_pass),
        ("flow", self.render_flow_pass),
    )
    for pass_name, run_pass in follow_up_passes:
        if pass_name in self.render_passes:
            run_pass(output_path)

    self.replicate_duplicate_preview_outputs(output_path)
    self.save_composite_preview(output_path)
def run(self) -> None:
    """Prepare the scene, configure rendering, and execute all passes.

    The scene is cleared, the USD and GLB assets are imported and placed,
    then camera, lighting, world, color management and Cycles are set up.
    Rendering happens while ``self.temp_dir`` points at a temporary
    directory; ``self.temp_dir`` is reset to ``None`` afterwards, also
    when rendering raises.
    """
    rgb_output_path = self.get_rgb_output_path()
    self.output_dir.mkdir(parents=True, exist_ok=True)

    self.clear_scene()
    self.import_usd()
    self.validate_glb_args()
    imported_glb_objects = self.import_glb_asset()
    self.place_glb_asset(imported_glb_objects)

    min_corner, max_corner = self.get_scene_bbox()
    center = (min_corner + max_corner) * 0.5
    diagonal = (max_corner - min_corner).length

    self.create_camera()
    self.ensure_lighting(diagonal, center, max_corner.z)
    world_created = self.ensure_world()
    self.add_fill_light(
        diagonal,
        center,
        max_corner.z,
        energy=self.fill_light_energy,
    )
    if world_created:
        # Extra light rig, added only when ``ensure_world`` reports truthy.
        self.add_light_rig(
            diagonal,
            center,
            max_corner.z,
            area_energy=1500.0,
            sun_energy=0.35,
            prefix="Fill",
        )
    self.configure_color_management()
    self.configure_cycles()

    with tempfile.TemporaryDirectory(
        prefix="render_usd_", dir=None
    ) as temp_dir:
        self.temp_dir = Path(temp_dir)
        try:
            self.render(rgb_output_path)
        finally:
            # Never leave temp_dir pointing at a directory that the
            # context manager is about to delete.
            self.temp_dir = None
    logger.info("Rendered outputs to %s", self.output_dir)
def main() -> None:
    """Configure logging, parse the CLI arguments and run the renderer."""
    logging.basicConfig(level=logging.INFO)
    cli_args = _parse_args()
    renderer = RenderUsd.from_args(cli_args)
    renderer.run()
# Run the command-line entry point only when executed as a script.
if __name__ == "__main__":
    main()