# Project EmbodiedGen # # Copyright (c) 2025 Horizon Robotics. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or # implied. See the License for the specific language governing # permissions and limitations under the License. from __future__ import annotations import logging import os import random import re import xml.etree.ElementTree as ET from dataclasses import dataclass, field from shutil import copy2, copytree from typing import TYPE_CHECKING, Literal import matplotlib.pyplot as plt import numpy as np import trimesh import tyro from scipy.spatial.transform import Rotation as R from shapely.affinity import translate from shapely.geometry import MultiPoint, MultiPolygon, Point, Polygon from shapely.ops import unary_union if TYPE_CHECKING: from matplotlib.axes import Axes logging.basicConfig( format="%(asctime)s - %(levelname)s - %(message)s", level=logging.INFO, ) logger = logging.getLogger(__name__) # Type aliases Geometry = Polygon | MultiPolygon # Constants DEFAULT_MESH_SAMPLE_NUM = 50000 DEFAULT_IGNORE_ITEMS = ("ceiling", "light", "exterior") DEFAULT_ROTATION_RPY = (1.57, 0.0, 0.0) DEFAULT_MAX_PLACEMENT_ATTEMPTS = 2000 __all__ = [ "points_to_polygon", "get_actionable_surface", "FloorplanVisualizer", "UrdfSemanticInfoCollector", "Scene3DGenConfig", ] @dataclass class Scene3DGenConfig: """Configuration for 3D scene generation and floorplan visualization.""" urdf_path: str """Path to the input URDF scene file.""" output_path: str """Path to save the floorplan visualization image.""" # Optional paths usd_path: str | None = None """Optional path to the USD scene file for USD export.""" asset_path: str | None = None """Optional path to the asset mesh to add to the scene.""" # Instance configuration instance_key: str = "inserted_object" """Unique key for the added instance.""" in_room: str | None = None """Optional room name to constrain asset placement.""" on_instance: str | None = None """Optional instance name to place the asset on top of.""" place_strategy: Literal["top", "random"] = "random" """Placement strategy for the asset.""" rotation_rpy: tuple[float, float, float] = DEFAULT_ROTATION_RPY """Rotation in roll-pitch-yaw (radians).""" # Collector configuration ignore_items: list[str] = field( default_factory=lambda: list(DEFAULT_IGNORE_ITEMS) ) """List of item name patterns to ignore during parsing.""" mesh_sample_num: int = DEFAULT_MESH_SAMPLE_NUM """Number of points to sample from meshes.""" max_placement_attempts: int = DEFAULT_MAX_PLACEMENT_ATTEMPTS """Maximum attempts for asset placement.""" # Output flags update_urdf: bool = True """Whether to update and save the URDF file.""" update_usd: bool = True """Whether to update and save the USD file.""" def points_to_polygon( points: np.ndarray, smooth_thresh: float = 0.2, scanline_step: float = 0.01, ) -> Polygon: """Convert point clouds into polygon contours using sweep line algorithm. Args: points: Array of 2D points with shape (N, 2). smooth_thresh: Buffer threshold for smoothing the polygon. scanline_step: Step size for the scanline sweep. Returns: A Shapely Polygon representing the contour of the point cloud. """ if len(points) == 0: return Polygon() ys = points[:, 1] y_min, y_max = ys.min(), ys.max() y_values = np.arange(y_min, y_max + scanline_step, scanline_step) upper: list[list[float]] = [] lower: list[list[float]] = [] for y in y_values: pts_in_strip = points[(ys >= y) & (ys < y + scanline_step)] if len(pts_in_strip) == 0: continue xs = pts_in_strip[:, 0] upper.append([xs.max(), y]) lower.append([xs.min(), y]) contour = upper + lower[::-1] if len(contour) < 3: return Polygon() poly = Polygon(contour) return poly.buffer(smooth_thresh).buffer(-smooth_thresh) def get_actionable_surface( mesh: trimesh.Trimesh, tol_angle: int = 10, tol_z: float = 0.02, area_tolerance: float = 0.15, place_strategy: Literal["top", "random"] = "random", ) -> tuple[float, Geometry]: """Extract the actionable (placeable) surface from a mesh. Finds upward-facing surfaces and returns the best one based on the placement strategy. Args: mesh: The input trimesh object. tol_angle: Angle tolerance in degrees for detecting up-facing normals. tol_z: Z-coordinate tolerance for clustering faces. area_tolerance: Tolerance for selecting candidate surfaces by area. place_strategy: Either "top" (highest surface) or "random". Returns: A tuple of (z_height, surface_polygon) representing the selected actionable surface. """ up_vec = np.array([0, 0, 1]) dots = np.dot(mesh.face_normals, up_vec) valid_mask = dots > np.cos(np.deg2rad(tol_angle)) if not np.any(valid_mask): logger.warning( "No up-facing surfaces found. Falling back to bounding box top." ) verts = mesh.vertices[:, :2] return mesh.bounds[1][2], MultiPoint(verts).convex_hull valid_faces_indices = np.where(valid_mask)[0] face_z = mesh.triangles_center[valid_mask][:, 2] face_areas = mesh.area_faces[valid_mask] z_clusters = _cluster_faces_by_z( face_z, face_areas, valid_faces_indices, tol_z ) if not z_clusters: return mesh.bounds[1][2], MultiPoint(mesh.vertices[:, :2]).convex_hull selected_z, selected_data = _select_surface_cluster( z_clusters, area_tolerance, place_strategy ) cluster_faces = mesh.faces[selected_data["indices"]] temp_mesh = trimesh.Trimesh(vertices=mesh.vertices, faces=cluster_faces) samples, _ = trimesh.sample.sample_surface(temp_mesh, 10000) if len(samples) < 3: logger.warning( f"Failed to sample enough points on layer Z={selected_z}. " "Returning empty polygon." ) return selected_z, Polygon() surface_poly = MultiPoint(samples[:, :2]).convex_hull return selected_z, surface_poly def _cluster_faces_by_z( face_z: np.ndarray, face_areas: np.ndarray, face_indices: np.ndarray, tol_z: float, ) -> dict[float, dict]: """Cluster mesh faces by their Z coordinate. Args: face_z: Z coordinates of face centers. face_areas: Areas of each face. face_indices: Original indices of the faces. tol_z: Tolerance for Z clustering. Returns: Dictionary mapping Z values to cluster data (area and indices). """ z_clusters: dict[float, dict] = {} for i, z in enumerate(face_z): key = round(z / tol_z) * tol_z if key not in z_clusters: z_clusters[key] = {"area": 0.0, "indices": []} z_clusters[key]["area"] += face_areas[i] z_clusters[key]["indices"].append(face_indices[i]) return z_clusters def _select_surface_cluster( z_clusters: dict[float, dict], area_tolerance: float, place_strategy: Literal["top", "random"], ) -> tuple[float, dict]: """Select the best surface cluster based on strategy. Args: z_clusters: Dictionary of Z clusters with area and indices. area_tolerance: Tolerance for candidate selection by area. place_strategy: Either "top" or "random". Returns: Tuple of (selected_z, cluster_data). """ max_area = max(c["area"] for c in z_clusters.values()) candidates = [ (z, data) for z, data in z_clusters.items() if data["area"] >= max_area * (1.0 - area_tolerance) ] if not candidates: best_item = max(z_clusters.items(), key=lambda x: x[1]["area"]) candidates = [best_item] if place_strategy == "random": selected_z, selected_data = random.choice(candidates) logger.info( f"Strategy 'random': Selected Z={selected_z:.3f} " f"(Area={selected_data['area']:.3f}) " f"from {len(candidates)} candidates." ) else: candidates.sort(key=lambda x: x[0], reverse=True) selected_z, selected_data = candidates[0] logger.info( f"Strategy 'top': Selected highest Z={selected_z:.3f} " f"(Area={selected_data['area']:.3f})" ) return selected_z, selected_data class FloorplanVisualizer: """Static utility class for visualizing floorplans.""" @staticmethod def draw_poly(ax: Axes, poly: Geometry, **kwargs) -> None: """Draw a polygon or multi-polygon on matplotlib axes. Args: ax: Matplotlib axes object. poly: Shapely Polygon or MultiPolygon to draw. **kwargs: Additional arguments passed to ax.fill(). """ if poly.is_empty: return geoms = poly.geoms if hasattr(poly, "geoms") else [poly] color = kwargs.pop("color", None) if color is None: cmap = plt.get_cmap("tab10") colors = [cmap(i) for i in range(len(geoms))] else: colors = [color] * len(geoms) for i, p in enumerate(geoms): if p.is_empty: continue x, y = p.exterior.xy ax.fill(x, y, facecolor=colors[i], **kwargs) @classmethod def plot( cls, rooms: dict[str, Geometry], footprints: dict[str, Geometry], occ_area: Geometry, save_path: str, ) -> None: """Generate and save a floorplan visualization. Args: rooms: Dictionary mapping room names to floor polygons. footprints: Dictionary mapping object names to footprint polygons. occ_area: Union of all occupied areas. save_path: Path to save the output image. """ fig, ax = plt.subplots(figsize=(10, 10)) ax.set_aspect("equal") cmap_rooms = plt.get_cmap("Pastel1") cls._draw_room_floors(ax, rooms, cmap_rooms) cls._draw_occupied_area(ax, occ_area) cls._draw_footprint_outlines(ax, footprints) cls._draw_footprint_labels(ax, footprints) cls._draw_room_labels(ax, rooms) cls._configure_axes(ax, rooms, occ_area) plt.tight_layout() plt.savefig(save_path, dpi=300) plt.close(fig) logger.info(f"Saved floorplan to: {save_path}") @classmethod def _draw_room_floors( cls, ax: Axes, rooms: dict[str, Geometry], cmap: plt.cm.ScalarMappable, ) -> None: """Draw colored room floor polygons (Layer 1).""" for i, (name, poly) in enumerate(rooms.items()): color = cmap(i % cmap.N) cls.draw_poly( ax, poly, color=color, alpha=0.6, edgecolor="black", linestyle="--", zorder=1, ) @classmethod def _draw_occupied_area(cls, ax: Axes, occ_area: Geometry) -> None: """Draw the occupied area overlay (Layer 2).""" cls.draw_poly( ax, occ_area, color="tab:blue", alpha=0.3, lw=0, zorder=2, ) @staticmethod def _draw_footprint_outlines( ax: Axes, footprints: dict[str, Geometry], ) -> None: """Draw footprint outlines (Layer 3).""" for poly in footprints.values(): if poly.is_empty: continue geoms = poly.geoms if hasattr(poly, "geoms") else [poly] for p in geoms: ax.plot(*p.exterior.xy, "--", lw=0.8, color="gray", zorder=3) @staticmethod def _draw_footprint_labels( ax: Axes, footprints: dict[str, Geometry], ) -> None: """Draw footprint text labels (Layer 4).""" for name, poly in footprints.items(): if poly.is_empty: continue ax.text( poly.centroid.x, poly.centroid.y, name, fontsize=5, ha="center", va="center", bbox={ "facecolor": "white", "alpha": 0.5, "edgecolor": "none", "pad": 0.1, }, zorder=4, ) @staticmethod def _draw_room_labels(ax: Axes, rooms: dict[str, Geometry]) -> None: """Draw room text labels (Layer 5).""" for name, poly in rooms.items(): if poly.is_empty: continue label = name.replace("_floor", "") ax.text( poly.centroid.x, poly.centroid.y, label, fontsize=9, color="black", weight="bold", ha="center", va="center", bbox={ "facecolor": "lightgray", "alpha": 0.7, "edgecolor": "black", "boxstyle": "round,pad=0.3", }, zorder=5, ) @staticmethod def _configure_axes( ax: Axes, rooms: dict[str, Geometry], occ_area: Geometry, ) -> None: """Configure axes limits and labels.""" total_geom = unary_union(list(rooms.values()) + [occ_area]) if total_geom.is_empty: minx, miny, maxx, maxy = -1, -1, 1, 1 else: minx, miny, maxx, maxy = total_geom.bounds margin_x = max((maxx - minx) * 0.05, 0.5) margin_y = max((maxy - miny) * 0.05, 0.5) ax.set_xlim(minx - margin_x, maxx + margin_x) ax.set_ylim(miny - margin_y, maxy + margin_y) ax.set_title("Floorplan Analysis", fontsize=14) ax.set_xlabel("X (m)") ax.set_ylabel("Y (m)") class UrdfSemanticInfoCollector: """Collector for URDF semantic information. Parses URDF files to extract room layouts, object footprints, and provides methods for adding new instances and updating URDF/USD files. Attributes: mesh_sample_num: Number of points to sample from meshes. ignore_items: List of item name patterns to ignore. instances: Dictionary of instance name to footprint polygon. instance_meta: Dictionary of instance metadata (mesh path, pose). rooms: Dictionary of room polygons. footprints: Dictionary of object footprints. occ_area: Union of all occupied areas. floor_union: Union of all floor polygons. """ def __init__( self, mesh_sample_num: int = DEFAULT_MESH_SAMPLE_NUM, ignore_items: list[str] | None = None, ) -> None: """Initialize the collector. Args: mesh_sample_num: Number of points to sample from meshes. ignore_items: List of item name patterns to ignore during parsing. """ self.mesh_sample_num = mesh_sample_num self.ignore_items = ignore_items or list(DEFAULT_IGNORE_ITEMS) self.instances: dict[str, Polygon] = {} self.instance_meta: dict[str, dict] = {} self.rooms: dict[str, Geometry] = {} self.footprints: dict[str, Geometry] = {} self.occ_area: Geometry = Polygon() self.floor_union: Geometry = Polygon() self.urdf_path: str = "" self._tree: ET.ElementTree | None = None self._root: ET.Element | None = None def _get_transform( self, joint_elem: ET.Element, ) -> tuple[np.ndarray, np.ndarray]: """Extract transform (xyz, rpy) from a joint element. Args: joint_elem: XML Element representing a URDF joint. Returns: Tuple of (xyz, rpy) arrays. """ origin = joint_elem.find("origin") if origin is not None: xyz = np.fromstring(origin.attrib.get("xyz", "0 0 0"), sep=" ") rpy = np.fromstring(origin.attrib.get("rpy", "0 0 0"), sep=" ") else: xyz, rpy = np.zeros(3), np.zeros(3) return xyz, rpy def _process_mesh_to_poly( self, mesh_path: str, xyz: np.ndarray, rpy: np.ndarray, ) -> Polygon: """Load mesh file and convert to 2D footprint polygon. Args: mesh_path: Path to the mesh file. xyz: Translation vector. rpy: Rotation in roll-pitch-yaw. Returns: Footprint polygon of the mesh. """ if not os.path.exists(mesh_path): return Polygon() mesh = trimesh.load(mesh_path, force="mesh", skip_materials=True) matrix = np.eye(4) matrix[:3, :3] = R.from_euler("xyz", rpy).as_matrix() matrix[:3, 3] = xyz mesh.apply_transform(matrix) verts = np.asarray(mesh.sample(self.mesh_sample_num))[:, :2] return points_to_polygon(verts) def collect(self, urdf_path: str) -> None: """Parse URDF file and collect semantic information. Args: urdf_path: Path to the URDF file. """ logger.info(f"Collecting URDF semantic info from {urdf_path}") self.urdf_path = urdf_path urdf_dir = os.path.dirname(urdf_path) self._tree = ET.parse(urdf_path) self._root = self._tree.getroot() link_transforms = self._build_link_transforms() self._process_links(urdf_dir, link_transforms) self._update_internal_state() def _build_link_transforms( self, ) -> dict[str, tuple[np.ndarray, np.ndarray]]: """Build mapping from link names to their transforms. Returns: Dictionary mapping link names to (xyz, rpy) tuples. """ link_transforms: dict[str, tuple[np.ndarray, np.ndarray]] = {} for joint in self._tree.findall("joint"): child = joint.find("child") if child is not None: link_name = child.attrib["link"] link_transforms[link_name] = self._get_transform(joint) return link_transforms def _process_links( self, urdf_dir: str, link_transforms: dict[str, tuple[np.ndarray, np.ndarray]], ) -> None: """Process all links in the URDF tree. Args: urdf_dir: Directory containing the URDF file. link_transforms: Dictionary of link transforms. """ self.instances = {} self.instance_meta = {} wall_polys: list[Polygon] = [] for link in self._tree.findall("link"): name = link.attrib.get("name", "").lower() if any(ign in name for ign in self.ignore_items): continue visual = link.find("visual") if visual is None: continue mesh_node = visual.find("geometry/mesh") if mesh_node is None: continue mesh_path = os.path.join(urdf_dir, mesh_node.attrib["filename"]) default_transform = (np.zeros(3), np.zeros(3)) xyz, rpy = link_transforms.get( link.attrib["name"], default_transform ) poly = self._process_mesh_to_poly(mesh_path, xyz, rpy) if poly.is_empty: continue if "wall" in name: wall_polys.append(poly) else: key = self._process_safe_key_robust(link.attrib["name"]) self.instances[key] = poly self.instance_meta[key] = { "mesh_path": mesh_path, "xyz": xyz, "rpy": rpy, } self.instances["walls"] = unary_union(wall_polys) def _update_internal_state(self) -> None: """Update derived state (rooms, footprints, occupied area).""" self.rooms = { k: v for k, v in self.instances.items() if "_floor" in k.lower() and not v.is_empty } self.footprints = { k: v for k, v in self.instances.items() if k != "walls" and "_floor" not in k.lower() and "rug" not in k.lower() and not v.is_empty } self.occ_area = unary_union(list(self.footprints.values())) self.floor_union = unary_union(list(self.rooms.values())) def _process_safe_key_robust(self, name: str) -> str: """Convert a link name to a safe, normalized key. Args: name: Original link name. Returns: Normalized key string. """ if name.endswith("_floor"): parts = name.split("_") return "_".join(parts[:-2] + ["floor"]) if "Factory" in name: # Handle infinigen naming convention prefix = name.split("Factory")[0] suffix = f"_{name.split('_')[-1]}" else: prefix, suffix = name, "" res = prefix.replace(" ", "_") res = re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", res) res = res.lower() res = re.sub(r"_+", "_", res).strip("_ ") return f"{res}{suffix}" def add_instance( self, asset_path: str, instance_key: str, in_room: str | None = None, on_instance: str | None = None, rotation_rpy: tuple[float, float, float] = DEFAULT_ROTATION_RPY, n_max_attempt: int = DEFAULT_MAX_PLACEMENT_ATTEMPTS, place_strategy: Literal["top", "random"] = "random", ) -> list[float] | None: """Add a new instance to the scene with automatic placement. Args: asset_path: Path to the asset mesh file. instance_key: Unique key for the new instance. in_room: Optional room name to constrain placement. on_instance: Optional instance name to place on top of. rotation_rpy: Initial rotation in roll-pitch-yaw. n_max_attempt: Maximum placement attempts. place_strategy: Either "top" or "random". Returns: List [x, y, z] of the placed instance center, or None if failed. Raises: ValueError: If instance_key already exists or room/instance not found. """ if instance_key in self.instances: raise ValueError(f"Instance key '{instance_key}' already exists.") room_poly = self._resolve_room_polygon(in_room) target_area, obstacles, base_z = self._resolve_placement_target( on_instance, room_poly, place_strategy ) if target_area.is_empty: logger.error("Target area for placement is empty.") return None mesh = trimesh.load(asset_path, force="mesh") mesh.apply_transform( trimesh.transformations.euler_matrix(*rotation_rpy, "sxyz") ) verts = np.asarray(mesh.sample(self.mesh_sample_num))[:, :2] base_poly = points_to_polygon(verts) centroid = base_poly.centroid base_poly = translate(base_poly, xoff=-centroid.x, yoff=-centroid.y) placement = self._try_place_polygon( base_poly, target_area, obstacles, n_max_attempt ) if placement is None: logger.error( f"Failed to place {asset_path} after {n_max_attempt} attempts." ) return None x, y, candidate = placement self.instances[instance_key] = candidate final_z = base_z + mesh.extents[2] / 2 self._update_internal_state() return [round(v, 4) for v in (x, y, final_z)] def _resolve_room_polygon(self, in_room: str | None) -> Geometry | None: """Resolve room name to polygon. Args: in_room: Room name query string. Returns: Room polygon or None if not specified. Raises: ValueError: If room not found. """ if in_room is None: return None query_room = in_room.lower() room_matches = [ k for k in self.rooms.keys() if query_room in k.lower() ] if not room_matches: raise ValueError(f"Room '{in_room}' not found.") return unary_union([self.rooms[k] for k in room_matches]) def _resolve_placement_target( self, on_instance: str | None, room_poly: Geometry | None, place_strategy: Literal["top", "random"], ) -> tuple[Geometry, Geometry, float]: """Resolve the target placement area and obstacles. Args: on_instance: Instance name to place on. room_poly: Room polygon constraint. place_strategy: Placement strategy. Returns: Tuple of (target_area, obstacles, base_z_height). Raises: ValueError: If on_instance not found. """ if on_instance is None: if room_poly is not None: return room_poly, self.occ_area, 0.0 return self.floor_union, self.occ_area, 0.0 query_obj = on_instance.lower() possible_matches = [ k for k in self.instances.keys() if query_obj in k.lower() and k != "walls" ] if room_poly is not None: possible_matches = [ k for k in possible_matches if self.instances[k].intersects(room_poly) ] if not possible_matches: location_msg = f" in room '{on_instance}'" if room_poly else "" raise ValueError( f"No instance matching '{on_instance}' found{location_msg}." ) if place_strategy == "random": target_parent_key = random.choice(possible_matches) else: target_parent_key = possible_matches[0] if len(possible_matches) > 1: logger.warning( f"Multiple matches for '{on_instance}': {possible_matches}. " f"Using '{target_parent_key}'." ) meta = self.instance_meta[target_parent_key] parent_mesh = trimesh.load(meta["mesh_path"], force="mesh") matrix = np.eye(4) matrix[:3, :3] = R.from_euler("xyz", meta["rpy"]).as_matrix() matrix[:3, 3] = meta["xyz"] parent_mesh.apply_transform(matrix) best_z, surface_poly = get_actionable_surface( parent_mesh, place_strategy=place_strategy ) obstacles = self.occ_area.difference(self.instances[target_parent_key]) logger.info(f"Placing on '{target_parent_key}' (Z={best_z:.3f})") return surface_poly, obstacles, best_z def _try_place_polygon( self, base_poly: Polygon, target_area: Geometry, obstacles: Geometry, n_max_attempt: int, ) -> tuple[float, float, Polygon] | None: """Try to place polygon in target area avoiding obstacles. Args: base_poly: Polygon to place (centered at origin). target_area: Area where placement is allowed. obstacles: Areas to avoid. n_max_attempt: Maximum attempts. Returns: Tuple of (x, y, placed_polygon) or None if failed. """ minx, miny, maxx, maxy = target_area.bounds for _ in range(n_max_attempt): x = np.random.uniform(minx, maxx) y = np.random.uniform(miny, maxy) candidate = translate(base_poly, xoff=x, yoff=y) if target_area.contains(candidate) and not candidate.intersects( obstacles ): return x, y, candidate return None def update_urdf_info( self, output_path: str, instance_key: str, visual_mesh_path: str, collision_mesh_path: str | None = None, trans_xyz: tuple[float, float, float] = (0, 0, 0), rot_rpy: tuple[float, float, float] = DEFAULT_ROTATION_RPY, joint_type: str = "fixed", ) -> None: """Add a new link to the URDF tree and save. Args: output_path: Path to save the updated URDF. instance_key: Name for the new link. visual_mesh_path: Path to the visual mesh file. collision_mesh_path: Optional path to collision mesh. trans_xyz: Translation (x, y, z). rot_rpy: Rotation (roll, pitch, yaw). joint_type: Type of joint (e.g., "fixed"). """ if self._root is None: return logger.info(f"Updating URDF for instance '{instance_key}'.") urdf_dir = os.path.dirname(self.urdf_path) # Copy mesh files copytree( os.path.dirname(visual_mesh_path), f"{urdf_dir}/{instance_key}", dirs_exist_ok=True, ) visual_rel_path = ( f"{instance_key}/{os.path.basename(visual_mesh_path)}" ) collision_rel_path = None if collision_mesh_path is not None: copytree( os.path.dirname(collision_mesh_path), f"{urdf_dir}/{instance_key}", dirs_exist_ok=True, ) collision_rel_path = ( f"{instance_key}/{os.path.basename(collision_mesh_path)}" ) # Create link element link = ET.SubElement(self._root, "link", attrib={"name": instance_key}) visual = ET.SubElement(link, "visual") v_geo = ET.SubElement(visual, "geometry") ET.SubElement(v_geo, "mesh", attrib={"filename": visual_rel_path}) if collision_rel_path is not None: collision = ET.SubElement(link, "collision") c_geo = ET.SubElement(collision, "geometry") ET.SubElement( c_geo, "mesh", attrib={"filename": collision_rel_path} ) # Create joint element joint_name = f"joint_{instance_key}" joint = ET.SubElement( self._root, "joint", attrib={"name": joint_name, "type": joint_type}, ) ET.SubElement(joint, "parent", attrib={"link": "base"}) ET.SubElement(joint, "child", attrib={"link": instance_key}) xyz_str = f"{trans_xyz[0]:.4f} {trans_xyz[1]:.4f} {trans_xyz[2]:.4f}" rpy_str = f"{rot_rpy[0]:.4f} {rot_rpy[1]:.4f} {rot_rpy[2]:.4f}" ET.SubElement(joint, "origin", attrib={"xyz": xyz_str, "rpy": rpy_str}) self.save_urdf(output_path) def update_usd_info( self, usd_path: str, output_path: str, instance_key: str, visual_mesh_path: str, trans_xyz: list[float], rot_rpy: tuple[float, float, float] = DEFAULT_ROTATION_RPY, ) -> None: """Add a mesh instance to an existing USD file. Args: usd_path: Path to the source USD file. output_path: Path to save the modified USD. instance_key: Prim path name for the new instance. visual_mesh_path: Path to the visual mesh (OBJ format). trans_xyz: Translation [x, y, z]. rot_rpy: Rotation (roll, pitch, yaw). """ import bpy from pxr import Gf, Usd, UsdGeom prim_path = f"/{instance_key}" out_dir = os.path.dirname(output_path) target_dir = os.path.join(out_dir, instance_key) os.makedirs(target_dir, exist_ok=True) mesh_filename = os.path.basename(visual_mesh_path) usdc_filename = os.path.splitext(mesh_filename)[0] + ".usdc" target_usdc_path = os.path.join(target_dir, usdc_filename) logger.info( f"Converting with Blender (bpy): " f"{visual_mesh_path} -> {target_usdc_path}" ) bpy.ops.wm.read_factory_settings(use_empty=True) bpy.ops.wm.obj_import( filepath=visual_mesh_path, forward_axis="Y", up_axis="Z", ) bpy.ops.wm.usd_export( filepath=target_usdc_path, selected_objects_only=False, ) # Copy texture files src_dir = os.path.dirname(visual_mesh_path) for f in os.listdir(src_dir): if f.lower().endswith((".png", ".jpg", ".jpeg", ".mtl")): copy2(os.path.join(src_dir, f), target_dir) final_rel_path = f"./{instance_key}/{usdc_filename}" # Update USD stage stage = Usd.Stage.Open(usd_path) mesh_prim = UsdGeom.Xform.Define(stage, prim_path) ref_prim = UsdGeom.Mesh.Define(stage, f"{prim_path}/Mesh") ref_prim.GetPrim().GetReferences().AddReference(final_rel_path) # Build transform matrix translation_mat = Gf.Matrix4d().SetTranslate( Gf.Vec3d(trans_xyz[0], trans_xyz[1], trans_xyz[2]) ) rx = Gf.Matrix4d().SetRotate( Gf.Rotation(Gf.Vec3d(1, 0, 0), np.degrees(rot_rpy[0])) ) ry = Gf.Matrix4d().SetRotate( Gf.Rotation(Gf.Vec3d(0, 1, 0), np.degrees(rot_rpy[1])) ) rz = Gf.Matrix4d().SetRotate( Gf.Rotation(Gf.Vec3d(0, 0, 1), np.degrees(rot_rpy[2])) ) rotation_mat = rx * ry * rz transform = rotation_mat * translation_mat mesh_prim.AddTransformOp().Set(transform) stage.GetRootLayer().Export(output_path) logger.info(f"Exported: {output_path}") def save_urdf(self, output_path: str) -> None: """Save the current URDF tree to file. Args: output_path: Path to save the URDF file. """ if self._tree is None: return if hasattr(ET, "indent"): ET.indent(self._tree, space=" ", level=0) self._tree.write(output_path, encoding="utf-8", xml_declaration=True) logger.info(f"Saved updated URDF to {output_path}") def entrypoint(cfg: Scene3DGenConfig) -> None: """Main entry point for floorplan visualization and scene manipulation. Args: cfg: Configuration object with all parameters. """ # Initialize collector and parse URDF collector = UrdfSemanticInfoCollector( mesh_sample_num=cfg.mesh_sample_num, ignore_items=cfg.ignore_items, ) collector.collect(cfg.urdf_path) # Add asset instance if specified center = None if cfg.asset_path is not None: center = collector.add_instance( asset_path=cfg.asset_path, instance_key=cfg.instance_key, in_room=cfg.in_room, on_instance=cfg.on_instance, rotation_rpy=cfg.rotation_rpy, n_max_attempt=cfg.max_placement_attempts, place_strategy=cfg.place_strategy, ) if center is not None: logger.info( f"Placed '{cfg.instance_key}' at position: " f"({center[0]:.3f}, {center[1]:.3f}, {center[2]:.3f})" ) # Update URDF if requested if cfg.update_urdf: urdf_output = cfg.urdf_path.replace(".urdf", "_updated.urdf") collision_path = cfg.asset_path.replace( ".obj", "_collision.obj" ) # Use collision mesh only if it exists if not os.path.exists(collision_path): collision_path = None collector.update_urdf_info( output_path=urdf_output, instance_key=cfg.instance_key, visual_mesh_path=cfg.asset_path, collision_mesh_path=collision_path, trans_xyz=tuple(center), rot_rpy=cfg.rotation_rpy, joint_type="fixed", ) # Update USD if requested and path is provided if cfg.update_usd and cfg.usd_path is not None: usd_output = cfg.usd_path.replace(".usdc", "_updated.usdc") collector.update_usd_info( usd_path=cfg.usd_path, output_path=usd_output, instance_key=cfg.instance_key, visual_mesh_path=cfg.asset_path, trans_xyz=center, rot_rpy=cfg.rotation_rpy, ) else: logger.warning( f"Failed to place '{cfg.instance_key}' in the scene." ) # Generate floorplan visualization FloorplanVisualizer.plot( collector.rooms, collector.footprints, collector.occ_area, cfg.output_path, ) if __name__ == "__main__": config = tyro.cli(Scene3DGenConfig) entrypoint(config) """ python embodied_gen/scripts/room_gen/visualize_floorplan.py \ --urdf_path outputs/rooms/Office_seed68661/urdf/export_scene/scene.urdf \ --output_path outputs/rooms/Office_seed68661/floorplan.png \ --usd_path outputs/rooms_v2/Kitchen_seed0/usd/export_scene/export_scene.usdc \ --asset_path /home/users/xinjie.wang/xinjie/asset3d-gen/outputs/semantics_tasks/task_0059/asset3d/red_apple/result/mesh/red_apple.obj \ --instance_key red_apple \ --in_room kitchen \ --on_instance oven \ --place_strategy top """