Spaces:
Running
on
Zero
Running
on
Zero
| # Project EmbodiedGen | |
| # | |
| # Copyright (c) 2025 Horizon Robotics. All Rights Reserved. | |
| # | |
| # Licensed under the Apache License, Version 2.0 (the "License"); | |
| # you may not use this file except in compliance with the License. | |
| # You may obtain a copy of the License at | |
| # | |
| # http://www.apache.org/licenses/LICENSE-2.0 | |
| # | |
| # Unless required by applicable law or agreed to in writing, software | |
| # distributed under the License is distributed on an "AS IS" BASIS, | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or | |
| # implied. See the License for the specific language governing | |
| # permissions and limitations under the License. | |
| from __future__ import annotations | |
| import logging | |
| import os | |
| import random | |
| import re | |
| import xml.etree.ElementTree as ET | |
| from dataclasses import dataclass, field | |
| from shutil import copy2, copytree | |
| from typing import TYPE_CHECKING, Literal | |
| import matplotlib.pyplot as plt | |
| import numpy as np | |
| import trimesh | |
| import tyro | |
| from scipy.spatial.transform import Rotation as R | |
| from shapely.affinity import translate | |
| from shapely.geometry import MultiPoint, MultiPolygon, Point, Polygon | |
| from shapely.ops import unary_union | |
| if TYPE_CHECKING: | |
| from matplotlib.axes import Axes | |
| logging.basicConfig( | |
| format="%(asctime)s - %(levelname)s - %(message)s", | |
| level=logging.INFO, | |
| ) | |
| logger = logging.getLogger(__name__) | |
| # Type aliases | |
| Geometry = Polygon | MultiPolygon | |
| # Constants | |
| DEFAULT_MESH_SAMPLE_NUM = 50000 | |
| DEFAULT_IGNORE_ITEMS = ("ceiling", "light", "exterior") | |
| DEFAULT_ROTATION_RPY = (1.57, 0.0, 0.0) | |
| DEFAULT_MAX_PLACEMENT_ATTEMPTS = 2000 | |
| __all__ = [ | |
| "points_to_polygon", | |
| "get_actionable_surface", | |
| "FloorplanVisualizer", | |
| "UrdfSemanticInfoCollector", | |
| "Scene3DGenConfig", | |
| ] | |
| class Scene3DGenConfig: | |
| """Configuration for 3D scene generation and floorplan visualization.""" | |
| urdf_path: str | |
| """Path to the input URDF scene file.""" | |
| output_path: str | |
| """Path to save the floorplan visualization image.""" | |
| # Optional paths | |
| usd_path: str | None = None | |
| """Optional path to the USD scene file for USD export.""" | |
| asset_path: str | None = None | |
| """Optional path to the asset mesh to add to the scene.""" | |
| # Instance configuration | |
| instance_key: str = "inserted_object" | |
| """Unique key for the added instance.""" | |
| in_room: str | None = None | |
| """Optional room name to constrain asset placement.""" | |
| on_instance: str | None = None | |
| """Optional instance name to place the asset on top of.""" | |
| place_strategy: Literal["top", "random"] = "random" | |
| """Placement strategy for the asset.""" | |
| rotation_rpy: tuple[float, float, float] = DEFAULT_ROTATION_RPY | |
| """Rotation in roll-pitch-yaw (radians).""" | |
| # Collector configuration | |
| ignore_items: list[str] = field( | |
| default_factory=lambda: list(DEFAULT_IGNORE_ITEMS) | |
| ) | |
| """List of item name patterns to ignore during parsing.""" | |
| mesh_sample_num: int = DEFAULT_MESH_SAMPLE_NUM | |
| """Number of points to sample from meshes.""" | |
| max_placement_attempts: int = DEFAULT_MAX_PLACEMENT_ATTEMPTS | |
| """Maximum attempts for asset placement.""" | |
| # Output flags | |
| update_urdf: bool = True | |
| """Whether to update and save the URDF file.""" | |
| update_usd: bool = True | |
| """Whether to update and save the USD file.""" | |
| def points_to_polygon( | |
| points: np.ndarray, | |
| smooth_thresh: float = 0.2, | |
| scanline_step: float = 0.01, | |
| ) -> Polygon: | |
| """Convert point clouds into polygon contours using sweep line algorithm. | |
| Args: | |
| points: Array of 2D points with shape (N, 2). | |
| smooth_thresh: Buffer threshold for smoothing the polygon. | |
| scanline_step: Step size for the scanline sweep. | |
| Returns: | |
| A Shapely Polygon representing the contour of the point cloud. | |
| """ | |
| if len(points) == 0: | |
| return Polygon() | |
| ys = points[:, 1] | |
| y_min, y_max = ys.min(), ys.max() | |
| y_values = np.arange(y_min, y_max + scanline_step, scanline_step) | |
| upper: list[list[float]] = [] | |
| lower: list[list[float]] = [] | |
| for y in y_values: | |
| pts_in_strip = points[(ys >= y) & (ys < y + scanline_step)] | |
| if len(pts_in_strip) == 0: | |
| continue | |
| xs = pts_in_strip[:, 0] | |
| upper.append([xs.max(), y]) | |
| lower.append([xs.min(), y]) | |
| contour = upper + lower[::-1] | |
| if len(contour) < 3: | |
| return Polygon() | |
| poly = Polygon(contour) | |
| return poly.buffer(smooth_thresh).buffer(-smooth_thresh) | |
| def get_actionable_surface( | |
| mesh: trimesh.Trimesh, | |
| tol_angle: int = 10, | |
| tol_z: float = 0.02, | |
| area_tolerance: float = 0.15, | |
| place_strategy: Literal["top", "random"] = "random", | |
| ) -> tuple[float, Geometry]: | |
| """Extract the actionable (placeable) surface from a mesh. | |
| Finds upward-facing surfaces and returns the best one based on the | |
| placement strategy. | |
| Args: | |
| mesh: The input trimesh object. | |
| tol_angle: Angle tolerance in degrees for detecting up-facing normals. | |
| tol_z: Z-coordinate tolerance for clustering faces. | |
| area_tolerance: Tolerance for selecting candidate surfaces by area. | |
| place_strategy: Either "top" (highest surface) or "random". | |
| Returns: | |
| A tuple of (z_height, surface_polygon) representing the selected | |
| actionable surface. | |
| """ | |
| up_vec = np.array([0, 0, 1]) | |
| dots = np.dot(mesh.face_normals, up_vec) | |
| valid_mask = dots > np.cos(np.deg2rad(tol_angle)) | |
| if not np.any(valid_mask): | |
| logger.warning( | |
| "No up-facing surfaces found. Falling back to bounding box top." | |
| ) | |
| verts = mesh.vertices[:, :2] | |
| return mesh.bounds[1][2], MultiPoint(verts).convex_hull | |
| valid_faces_indices = np.where(valid_mask)[0] | |
| face_z = mesh.triangles_center[valid_mask][:, 2] | |
| face_areas = mesh.area_faces[valid_mask] | |
| z_clusters = _cluster_faces_by_z( | |
| face_z, face_areas, valid_faces_indices, tol_z | |
| ) | |
| if not z_clusters: | |
| return mesh.bounds[1][2], MultiPoint(mesh.vertices[:, :2]).convex_hull | |
| selected_z, selected_data = _select_surface_cluster( | |
| z_clusters, area_tolerance, place_strategy | |
| ) | |
| cluster_faces = mesh.faces[selected_data["indices"]] | |
| temp_mesh = trimesh.Trimesh(vertices=mesh.vertices, faces=cluster_faces) | |
| samples, _ = trimesh.sample.sample_surface(temp_mesh, 10000) | |
| if len(samples) < 3: | |
| logger.warning( | |
| f"Failed to sample enough points on layer Z={selected_z}. " | |
| "Returning empty polygon." | |
| ) | |
| return selected_z, Polygon() | |
| surface_poly = MultiPoint(samples[:, :2]).convex_hull | |
| return selected_z, surface_poly | |
| def _cluster_faces_by_z( | |
| face_z: np.ndarray, | |
| face_areas: np.ndarray, | |
| face_indices: np.ndarray, | |
| tol_z: float, | |
| ) -> dict[float, dict]: | |
| """Cluster mesh faces by their Z coordinate. | |
| Args: | |
| face_z: Z coordinates of face centers. | |
| face_areas: Areas of each face. | |
| face_indices: Original indices of the faces. | |
| tol_z: Tolerance for Z clustering. | |
| Returns: | |
| Dictionary mapping Z values to cluster data (area and indices). | |
| """ | |
| z_clusters: dict[float, dict] = {} | |
| for i, z in enumerate(face_z): | |
| key = round(z / tol_z) * tol_z | |
| if key not in z_clusters: | |
| z_clusters[key] = {"area": 0.0, "indices": []} | |
| z_clusters[key]["area"] += face_areas[i] | |
| z_clusters[key]["indices"].append(face_indices[i]) | |
| return z_clusters | |
| def _select_surface_cluster( | |
| z_clusters: dict[float, dict], | |
| area_tolerance: float, | |
| place_strategy: Literal["top", "random"], | |
| ) -> tuple[float, dict]: | |
| """Select the best surface cluster based on strategy. | |
| Args: | |
| z_clusters: Dictionary of Z clusters with area and indices. | |
| area_tolerance: Tolerance for candidate selection by area. | |
| place_strategy: Either "top" or "random". | |
| Returns: | |
| Tuple of (selected_z, cluster_data). | |
| """ | |
| max_area = max(c["area"] for c in z_clusters.values()) | |
| candidates = [ | |
| (z, data) | |
| for z, data in z_clusters.items() | |
| if data["area"] >= max_area * (1.0 - area_tolerance) | |
| ] | |
| if not candidates: | |
| best_item = max(z_clusters.items(), key=lambda x: x[1]["area"]) | |
| candidates = [best_item] | |
| if place_strategy == "random": | |
| selected_z, selected_data = random.choice(candidates) | |
| logger.info( | |
| f"Strategy 'random': Selected Z={selected_z:.3f} " | |
| f"(Area={selected_data['area']:.3f}) " | |
| f"from {len(candidates)} candidates." | |
| ) | |
| else: | |
| candidates.sort(key=lambda x: x[0], reverse=True) | |
| selected_z, selected_data = candidates[0] | |
| logger.info( | |
| f"Strategy 'top': Selected highest Z={selected_z:.3f} " | |
| f"(Area={selected_data['area']:.3f})" | |
| ) | |
| return selected_z, selected_data | |
| class FloorplanVisualizer: | |
| """Static utility class for visualizing floorplans.""" | |
| def draw_poly(ax: Axes, poly: Geometry, **kwargs) -> None: | |
| """Draw a polygon or multi-polygon on matplotlib axes. | |
| Args: | |
| ax: Matplotlib axes object. | |
| poly: Shapely Polygon or MultiPolygon to draw. | |
| **kwargs: Additional arguments passed to ax.fill(). | |
| """ | |
| if poly.is_empty: | |
| return | |
| geoms = poly.geoms if hasattr(poly, "geoms") else [poly] | |
| color = kwargs.pop("color", None) | |
| if color is None: | |
| cmap = plt.get_cmap("tab10") | |
| colors = [cmap(i) for i in range(len(geoms))] | |
| else: | |
| colors = [color] * len(geoms) | |
| for i, p in enumerate(geoms): | |
| if p.is_empty: | |
| continue | |
| x, y = p.exterior.xy | |
| ax.fill(x, y, facecolor=colors[i], **kwargs) | |
| def plot( | |
| cls, | |
| rooms: dict[str, Geometry], | |
| footprints: dict[str, Geometry], | |
| occ_area: Geometry, | |
| save_path: str, | |
| ) -> None: | |
| """Generate and save a floorplan visualization. | |
| Args: | |
| rooms: Dictionary mapping room names to floor polygons. | |
| footprints: Dictionary mapping object names to footprint polygons. | |
| occ_area: Union of all occupied areas. | |
| save_path: Path to save the output image. | |
| """ | |
| fig, ax = plt.subplots(figsize=(10, 10)) | |
| ax.set_aspect("equal") | |
| cmap_rooms = plt.get_cmap("Pastel1") | |
| cls._draw_room_floors(ax, rooms, cmap_rooms) | |
| cls._draw_occupied_area(ax, occ_area) | |
| cls._draw_footprint_outlines(ax, footprints) | |
| cls._draw_footprint_labels(ax, footprints) | |
| cls._draw_room_labels(ax, rooms) | |
| cls._configure_axes(ax, rooms, occ_area) | |
| plt.tight_layout() | |
| plt.savefig(save_path, dpi=300) | |
| plt.close(fig) | |
| logger.info(f"Saved floorplan to: {save_path}") | |
| def _draw_room_floors( | |
| cls, | |
| ax: Axes, | |
| rooms: dict[str, Geometry], | |
| cmap: plt.cm.ScalarMappable, | |
| ) -> None: | |
| """Draw colored room floor polygons (Layer 1).""" | |
| for i, (name, poly) in enumerate(rooms.items()): | |
| color = cmap(i % cmap.N) | |
| cls.draw_poly( | |
| ax, | |
| poly, | |
| color=color, | |
| alpha=0.6, | |
| edgecolor="black", | |
| linestyle="--", | |
| zorder=1, | |
| ) | |
| def _draw_occupied_area(cls, ax: Axes, occ_area: Geometry) -> None: | |
| """Draw the occupied area overlay (Layer 2).""" | |
| cls.draw_poly( | |
| ax, | |
| occ_area, | |
| color="tab:blue", | |
| alpha=0.3, | |
| lw=0, | |
| zorder=2, | |
| ) | |
| def _draw_footprint_outlines( | |
| ax: Axes, | |
| footprints: dict[str, Geometry], | |
| ) -> None: | |
| """Draw footprint outlines (Layer 3).""" | |
| for poly in footprints.values(): | |
| if poly.is_empty: | |
| continue | |
| geoms = poly.geoms if hasattr(poly, "geoms") else [poly] | |
| for p in geoms: | |
| ax.plot(*p.exterior.xy, "--", lw=0.8, color="gray", zorder=3) | |
| def _draw_footprint_labels( | |
| ax: Axes, | |
| footprints: dict[str, Geometry], | |
| ) -> None: | |
| """Draw footprint text labels (Layer 4).""" | |
| for name, poly in footprints.items(): | |
| if poly.is_empty: | |
| continue | |
| ax.text( | |
| poly.centroid.x, | |
| poly.centroid.y, | |
| name, | |
| fontsize=5, | |
| ha="center", | |
| va="center", | |
| bbox={ | |
| "facecolor": "white", | |
| "alpha": 0.5, | |
| "edgecolor": "none", | |
| "pad": 0.1, | |
| }, | |
| zorder=4, | |
| ) | |
| def _draw_room_labels(ax: Axes, rooms: dict[str, Geometry]) -> None: | |
| """Draw room text labels (Layer 5).""" | |
| for name, poly in rooms.items(): | |
| if poly.is_empty: | |
| continue | |
| label = name.replace("_floor", "") | |
| ax.text( | |
| poly.centroid.x, | |
| poly.centroid.y, | |
| label, | |
| fontsize=9, | |
| color="black", | |
| weight="bold", | |
| ha="center", | |
| va="center", | |
| bbox={ | |
| "facecolor": "lightgray", | |
| "alpha": 0.7, | |
| "edgecolor": "black", | |
| "boxstyle": "round,pad=0.3", | |
| }, | |
| zorder=5, | |
| ) | |
| def _configure_axes( | |
| ax: Axes, | |
| rooms: dict[str, Geometry], | |
| occ_area: Geometry, | |
| ) -> None: | |
| """Configure axes limits and labels.""" | |
| total_geom = unary_union(list(rooms.values()) + [occ_area]) | |
| if total_geom.is_empty: | |
| minx, miny, maxx, maxy = -1, -1, 1, 1 | |
| else: | |
| minx, miny, maxx, maxy = total_geom.bounds | |
| margin_x = max((maxx - minx) * 0.05, 0.5) | |
| margin_y = max((maxy - miny) * 0.05, 0.5) | |
| ax.set_xlim(minx - margin_x, maxx + margin_x) | |
| ax.set_ylim(miny - margin_y, maxy + margin_y) | |
| ax.set_title("Floorplan Analysis", fontsize=14) | |
| ax.set_xlabel("X (m)") | |
| ax.set_ylabel("Y (m)") | |
| class UrdfSemanticInfoCollector: | |
| """Collector for URDF semantic information. | |
| Parses URDF files to extract room layouts, object footprints, and | |
| provides methods for adding new instances and updating URDF/USD files. | |
| Attributes: | |
| mesh_sample_num: Number of points to sample from meshes. | |
| ignore_items: List of item name patterns to ignore. | |
| instances: Dictionary of instance name to footprint polygon. | |
| instance_meta: Dictionary of instance metadata (mesh path, pose). | |
| rooms: Dictionary of room polygons. | |
| footprints: Dictionary of object footprints. | |
| occ_area: Union of all occupied areas. | |
| floor_union: Union of all floor polygons. | |
| """ | |
| def __init__( | |
| self, | |
| mesh_sample_num: int = DEFAULT_MESH_SAMPLE_NUM, | |
| ignore_items: list[str] | None = None, | |
| ) -> None: | |
| """Initialize the collector. | |
| Args: | |
| mesh_sample_num: Number of points to sample from meshes. | |
| ignore_items: List of item name patterns to ignore during parsing. | |
| """ | |
| self.mesh_sample_num = mesh_sample_num | |
| self.ignore_items = ignore_items or list(DEFAULT_IGNORE_ITEMS) | |
| self.instances: dict[str, Polygon] = {} | |
| self.instance_meta: dict[str, dict] = {} | |
| self.rooms: dict[str, Geometry] = {} | |
| self.footprints: dict[str, Geometry] = {} | |
| self.occ_area: Geometry = Polygon() | |
| self.floor_union: Geometry = Polygon() | |
| self.urdf_path: str = "" | |
| self._tree: ET.ElementTree | None = None | |
| self._root: ET.Element | None = None | |
| def _get_transform( | |
| self, | |
| joint_elem: ET.Element, | |
| ) -> tuple[np.ndarray, np.ndarray]: | |
| """Extract transform (xyz, rpy) from a joint element. | |
| Args: | |
| joint_elem: XML Element representing a URDF joint. | |
| Returns: | |
| Tuple of (xyz, rpy) arrays. | |
| """ | |
| origin = joint_elem.find("origin") | |
| if origin is not None: | |
| xyz = np.fromstring(origin.attrib.get("xyz", "0 0 0"), sep=" ") | |
| rpy = np.fromstring(origin.attrib.get("rpy", "0 0 0"), sep=" ") | |
| else: | |
| xyz, rpy = np.zeros(3), np.zeros(3) | |
| return xyz, rpy | |
| def _process_mesh_to_poly( | |
| self, | |
| mesh_path: str, | |
| xyz: np.ndarray, | |
| rpy: np.ndarray, | |
| ) -> Polygon: | |
| """Load mesh file and convert to 2D footprint polygon. | |
| Args: | |
| mesh_path: Path to the mesh file. | |
| xyz: Translation vector. | |
| rpy: Rotation in roll-pitch-yaw. | |
| Returns: | |
| Footprint polygon of the mesh. | |
| """ | |
| if not os.path.exists(mesh_path): | |
| return Polygon() | |
| mesh = trimesh.load(mesh_path, force="mesh", skip_materials=True) | |
| matrix = np.eye(4) | |
| matrix[:3, :3] = R.from_euler("xyz", rpy).as_matrix() | |
| matrix[:3, 3] = xyz | |
| mesh.apply_transform(matrix) | |
| verts = np.asarray(mesh.sample(self.mesh_sample_num))[:, :2] | |
| return points_to_polygon(verts) | |
| def collect(self, urdf_path: str) -> None: | |
| """Parse URDF file and collect semantic information. | |
| Args: | |
| urdf_path: Path to the URDF file. | |
| """ | |
| logger.info(f"Collecting URDF semantic info from {urdf_path}") | |
| self.urdf_path = urdf_path | |
| urdf_dir = os.path.dirname(urdf_path) | |
| self._tree = ET.parse(urdf_path) | |
| self._root = self._tree.getroot() | |
| link_transforms = self._build_link_transforms() | |
| self._process_links(urdf_dir, link_transforms) | |
| self._update_internal_state() | |
| def _build_link_transforms( | |
| self, | |
| ) -> dict[str, tuple[np.ndarray, np.ndarray]]: | |
| """Build mapping from link names to their transforms. | |
| Returns: | |
| Dictionary mapping link names to (xyz, rpy) tuples. | |
| """ | |
| link_transforms: dict[str, tuple[np.ndarray, np.ndarray]] = {} | |
| for joint in self._tree.findall("joint"): | |
| child = joint.find("child") | |
| if child is not None: | |
| link_name = child.attrib["link"] | |
| link_transforms[link_name] = self._get_transform(joint) | |
| return link_transforms | |
| def _process_links( | |
| self, | |
| urdf_dir: str, | |
| link_transforms: dict[str, tuple[np.ndarray, np.ndarray]], | |
| ) -> None: | |
| """Process all links in the URDF tree. | |
| Args: | |
| urdf_dir: Directory containing the URDF file. | |
| link_transforms: Dictionary of link transforms. | |
| """ | |
| self.instances = {} | |
| self.instance_meta = {} | |
| wall_polys: list[Polygon] = [] | |
| for link in self._tree.findall("link"): | |
| name = link.attrib.get("name", "").lower() | |
| if any(ign in name for ign in self.ignore_items): | |
| continue | |
| visual = link.find("visual") | |
| if visual is None: | |
| continue | |
| mesh_node = visual.find("geometry/mesh") | |
| if mesh_node is None: | |
| continue | |
| mesh_path = os.path.join(urdf_dir, mesh_node.attrib["filename"]) | |
| default_transform = (np.zeros(3), np.zeros(3)) | |
| xyz, rpy = link_transforms.get( | |
| link.attrib["name"], default_transform | |
| ) | |
| poly = self._process_mesh_to_poly(mesh_path, xyz, rpy) | |
| if poly.is_empty: | |
| continue | |
| if "wall" in name: | |
| wall_polys.append(poly) | |
| else: | |
| key = self._process_safe_key_robust(link.attrib["name"]) | |
| self.instances[key] = poly | |
| self.instance_meta[key] = { | |
| "mesh_path": mesh_path, | |
| "xyz": xyz, | |
| "rpy": rpy, | |
| } | |
| self.instances["walls"] = unary_union(wall_polys) | |
| def _update_internal_state(self) -> None: | |
| """Update derived state (rooms, footprints, occupied area).""" | |
| self.rooms = { | |
| k: v | |
| for k, v in self.instances.items() | |
| if "_floor" in k.lower() and not v.is_empty | |
| } | |
| self.footprints = { | |
| k: v | |
| for k, v in self.instances.items() | |
| if k != "walls" | |
| and "_floor" not in k.lower() | |
| and "rug" not in k.lower() | |
| and not v.is_empty | |
| } | |
| self.occ_area = unary_union(list(self.footprints.values())) | |
| self.floor_union = unary_union(list(self.rooms.values())) | |
| def _process_safe_key_robust(self, name: str) -> str: | |
| """Convert a link name to a safe, normalized key. | |
| Args: | |
| name: Original link name. | |
| Returns: | |
| Normalized key string. | |
| """ | |
| if name.endswith("_floor"): | |
| parts = name.split("_") | |
| return "_".join(parts[:-2] + ["floor"]) | |
| if "Factory" in name: | |
| # Handle infinigen naming convention | |
| prefix = name.split("Factory")[0] | |
| suffix = f"_{name.split('_')[-1]}" | |
| else: | |
| prefix, suffix = name, "" | |
| res = prefix.replace(" ", "_") | |
| res = re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", res) | |
| res = res.lower() | |
| res = re.sub(r"_+", "_", res).strip("_ ") | |
| return f"{res}{suffix}" | |
| def add_instance( | |
| self, | |
| asset_path: str, | |
| instance_key: str, | |
| in_room: str | None = None, | |
| on_instance: str | None = None, | |
| rotation_rpy: tuple[float, float, float] = DEFAULT_ROTATION_RPY, | |
| n_max_attempt: int = DEFAULT_MAX_PLACEMENT_ATTEMPTS, | |
| place_strategy: Literal["top", "random"] = "random", | |
| ) -> list[float] | None: | |
| """Add a new instance to the scene with automatic placement. | |
| Args: | |
| asset_path: Path to the asset mesh file. | |
| instance_key: Unique key for the new instance. | |
| in_room: Optional room name to constrain placement. | |
| on_instance: Optional instance name to place on top of. | |
| rotation_rpy: Initial rotation in roll-pitch-yaw. | |
| n_max_attempt: Maximum placement attempts. | |
| place_strategy: Either "top" or "random". | |
| Returns: | |
| List [x, y, z] of the placed instance center, or None if failed. | |
| Raises: | |
| ValueError: If instance_key already exists or room/instance not found. | |
| """ | |
| if instance_key in self.instances: | |
| raise ValueError(f"Instance key '{instance_key}' already exists.") | |
| room_poly = self._resolve_room_polygon(in_room) | |
| target_area, obstacles, base_z = self._resolve_placement_target( | |
| on_instance, room_poly, place_strategy | |
| ) | |
| if target_area.is_empty: | |
| logger.error("Target area for placement is empty.") | |
| return None | |
| mesh = trimesh.load(asset_path, force="mesh") | |
| mesh.apply_transform( | |
| trimesh.transformations.euler_matrix(*rotation_rpy, "sxyz") | |
| ) | |
| verts = np.asarray(mesh.sample(self.mesh_sample_num))[:, :2] | |
| base_poly = points_to_polygon(verts) | |
| centroid = base_poly.centroid | |
| base_poly = translate(base_poly, xoff=-centroid.x, yoff=-centroid.y) | |
| placement = self._try_place_polygon( | |
| base_poly, target_area, obstacles, n_max_attempt | |
| ) | |
| if placement is None: | |
| logger.error( | |
| f"Failed to place {asset_path} after {n_max_attempt} attempts." | |
| ) | |
| return None | |
| x, y, candidate = placement | |
| self.instances[instance_key] = candidate | |
| final_z = base_z + mesh.extents[2] / 2 | |
| self._update_internal_state() | |
| return [round(v, 4) for v in (x, y, final_z)] | |
| def _resolve_room_polygon(self, in_room: str | None) -> Geometry | None: | |
| """Resolve room name to polygon. | |
| Args: | |
| in_room: Room name query string. | |
| Returns: | |
| Room polygon or None if not specified. | |
| Raises: | |
| ValueError: If room not found. | |
| """ | |
| if in_room is None: | |
| return None | |
| query_room = in_room.lower() | |
| room_matches = [ | |
| k for k in self.rooms.keys() if query_room in k.lower() | |
| ] | |
| if not room_matches: | |
| raise ValueError(f"Room '{in_room}' not found.") | |
| return unary_union([self.rooms[k] for k in room_matches]) | |
| def _resolve_placement_target( | |
| self, | |
| on_instance: str | None, | |
| room_poly: Geometry | None, | |
| place_strategy: Literal["top", "random"], | |
| ) -> tuple[Geometry, Geometry, float]: | |
| """Resolve the target placement area and obstacles. | |
| Args: | |
| on_instance: Instance name to place on. | |
| room_poly: Room polygon constraint. | |
| place_strategy: Placement strategy. | |
| Returns: | |
| Tuple of (target_area, obstacles, base_z_height). | |
| Raises: | |
| ValueError: If on_instance not found. | |
| """ | |
| if on_instance is None: | |
| if room_poly is not None: | |
| return room_poly, self.occ_area, 0.0 | |
| return self.floor_union, self.occ_area, 0.0 | |
| query_obj = on_instance.lower() | |
| possible_matches = [ | |
| k | |
| for k in self.instances.keys() | |
| if query_obj in k.lower() and k != "walls" | |
| ] | |
| if room_poly is not None: | |
| possible_matches = [ | |
| k | |
| for k in possible_matches | |
| if self.instances[k].intersects(room_poly) | |
| ] | |
| if not possible_matches: | |
| location_msg = f" in room '{on_instance}'" if room_poly else "" | |
| raise ValueError( | |
| f"No instance matching '{on_instance}' found{location_msg}." | |
| ) | |
| if place_strategy == "random": | |
| target_parent_key = random.choice(possible_matches) | |
| else: | |
| target_parent_key = possible_matches[0] | |
| if len(possible_matches) > 1: | |
| logger.warning( | |
| f"Multiple matches for '{on_instance}': {possible_matches}. " | |
| f"Using '{target_parent_key}'." | |
| ) | |
| meta = self.instance_meta[target_parent_key] | |
| parent_mesh = trimesh.load(meta["mesh_path"], force="mesh") | |
| matrix = np.eye(4) | |
| matrix[:3, :3] = R.from_euler("xyz", meta["rpy"]).as_matrix() | |
| matrix[:3, 3] = meta["xyz"] | |
| parent_mesh.apply_transform(matrix) | |
| best_z, surface_poly = get_actionable_surface( | |
| parent_mesh, place_strategy=place_strategy | |
| ) | |
| obstacles = self.occ_area.difference(self.instances[target_parent_key]) | |
| logger.info(f"Placing on '{target_parent_key}' (Z={best_z:.3f})") | |
| return surface_poly, obstacles, best_z | |
| def _try_place_polygon( | |
| self, | |
| base_poly: Polygon, | |
| target_area: Geometry, | |
| obstacles: Geometry, | |
| n_max_attempt: int, | |
| ) -> tuple[float, float, Polygon] | None: | |
| """Try to place polygon in target area avoiding obstacles. | |
| Args: | |
| base_poly: Polygon to place (centered at origin). | |
| target_area: Area where placement is allowed. | |
| obstacles: Areas to avoid. | |
| n_max_attempt: Maximum attempts. | |
| Returns: | |
| Tuple of (x, y, placed_polygon) or None if failed. | |
| """ | |
| minx, miny, maxx, maxy = target_area.bounds | |
| for _ in range(n_max_attempt): | |
| x = np.random.uniform(minx, maxx) | |
| y = np.random.uniform(miny, maxy) | |
| candidate = translate(base_poly, xoff=x, yoff=y) | |
| if target_area.contains(candidate) and not candidate.intersects( | |
| obstacles | |
| ): | |
| return x, y, candidate | |
| return None | |
| def update_urdf_info( | |
| self, | |
| output_path: str, | |
| instance_key: str, | |
| visual_mesh_path: str, | |
| collision_mesh_path: str | None = None, | |
| trans_xyz: tuple[float, float, float] = (0, 0, 0), | |
| rot_rpy: tuple[float, float, float] = DEFAULT_ROTATION_RPY, | |
| joint_type: str = "fixed", | |
| ) -> None: | |
| """Add a new link to the URDF tree and save. | |
| Args: | |
| output_path: Path to save the updated URDF. | |
| instance_key: Name for the new link. | |
| visual_mesh_path: Path to the visual mesh file. | |
| collision_mesh_path: Optional path to collision mesh. | |
| trans_xyz: Translation (x, y, z). | |
| rot_rpy: Rotation (roll, pitch, yaw). | |
| joint_type: Type of joint (e.g., "fixed"). | |
| """ | |
| if self._root is None: | |
| return | |
| logger.info(f"Updating URDF for instance '{instance_key}'.") | |
| urdf_dir = os.path.dirname(self.urdf_path) | |
| # Copy mesh files | |
| copytree( | |
| os.path.dirname(visual_mesh_path), | |
| f"{urdf_dir}/{instance_key}", | |
| dirs_exist_ok=True, | |
| ) | |
| visual_rel_path = ( | |
| f"{instance_key}/{os.path.basename(visual_mesh_path)}" | |
| ) | |
| collision_rel_path = None | |
| if collision_mesh_path is not None: | |
| copytree( | |
| os.path.dirname(collision_mesh_path), | |
| f"{urdf_dir}/{instance_key}", | |
| dirs_exist_ok=True, | |
| ) | |
| collision_rel_path = ( | |
| f"{instance_key}/{os.path.basename(collision_mesh_path)}" | |
| ) | |
| # Create link element | |
| link = ET.SubElement(self._root, "link", attrib={"name": instance_key}) | |
| visual = ET.SubElement(link, "visual") | |
| v_geo = ET.SubElement(visual, "geometry") | |
| ET.SubElement(v_geo, "mesh", attrib={"filename": visual_rel_path}) | |
| if collision_rel_path is not None: | |
| collision = ET.SubElement(link, "collision") | |
| c_geo = ET.SubElement(collision, "geometry") | |
| ET.SubElement( | |
| c_geo, "mesh", attrib={"filename": collision_rel_path} | |
| ) | |
| # Create joint element | |
| joint_name = f"joint_{instance_key}" | |
| joint = ET.SubElement( | |
| self._root, | |
| "joint", | |
| attrib={"name": joint_name, "type": joint_type}, | |
| ) | |
| ET.SubElement(joint, "parent", attrib={"link": "base"}) | |
| ET.SubElement(joint, "child", attrib={"link": instance_key}) | |
| xyz_str = f"{trans_xyz[0]:.4f} {trans_xyz[1]:.4f} {trans_xyz[2]:.4f}" | |
| rpy_str = f"{rot_rpy[0]:.4f} {rot_rpy[1]:.4f} {rot_rpy[2]:.4f}" | |
| ET.SubElement(joint, "origin", attrib={"xyz": xyz_str, "rpy": rpy_str}) | |
| self.save_urdf(output_path) | |
| def update_usd_info( | |
| self, | |
| usd_path: str, | |
| output_path: str, | |
| instance_key: str, | |
| visual_mesh_path: str, | |
| trans_xyz: list[float], | |
| rot_rpy: tuple[float, float, float] = DEFAULT_ROTATION_RPY, | |
| ) -> None: | |
| """Add a mesh instance to an existing USD file. | |
| Args: | |
| usd_path: Path to the source USD file. | |
| output_path: Path to save the modified USD. | |
| instance_key: Prim path name for the new instance. | |
| visual_mesh_path: Path to the visual mesh (OBJ format). | |
| trans_xyz: Translation [x, y, z]. | |
| rot_rpy: Rotation (roll, pitch, yaw). | |
| """ | |
| import bpy | |
| from pxr import Gf, Usd, UsdGeom | |
| prim_path = f"/{instance_key}" | |
| out_dir = os.path.dirname(output_path) | |
| target_dir = os.path.join(out_dir, instance_key) | |
| os.makedirs(target_dir, exist_ok=True) | |
| mesh_filename = os.path.basename(visual_mesh_path) | |
| usdc_filename = os.path.splitext(mesh_filename)[0] + ".usdc" | |
| target_usdc_path = os.path.join(target_dir, usdc_filename) | |
| logger.info( | |
| f"Converting with Blender (bpy): " | |
| f"{visual_mesh_path} -> {target_usdc_path}" | |
| ) | |
| bpy.ops.wm.read_factory_settings(use_empty=True) | |
| bpy.ops.wm.obj_import( | |
| filepath=visual_mesh_path, | |
| forward_axis="Y", | |
| up_axis="Z", | |
| ) | |
| bpy.ops.wm.usd_export( | |
| filepath=target_usdc_path, | |
| selected_objects_only=False, | |
| ) | |
| # Copy texture files | |
| src_dir = os.path.dirname(visual_mesh_path) | |
| for f in os.listdir(src_dir): | |
| if f.lower().endswith((".png", ".jpg", ".jpeg", ".mtl")): | |
| copy2(os.path.join(src_dir, f), target_dir) | |
| final_rel_path = f"./{instance_key}/{usdc_filename}" | |
| # Update USD stage | |
| stage = Usd.Stage.Open(usd_path) | |
| mesh_prim = UsdGeom.Xform.Define(stage, prim_path) | |
| ref_prim = UsdGeom.Mesh.Define(stage, f"{prim_path}/Mesh") | |
| ref_prim.GetPrim().GetReferences().AddReference(final_rel_path) | |
| # Build transform matrix | |
| translation_mat = Gf.Matrix4d().SetTranslate( | |
| Gf.Vec3d(trans_xyz[0], trans_xyz[1], trans_xyz[2]) | |
| ) | |
| rx = Gf.Matrix4d().SetRotate( | |
| Gf.Rotation(Gf.Vec3d(1, 0, 0), np.degrees(rot_rpy[0])) | |
| ) | |
| ry = Gf.Matrix4d().SetRotate( | |
| Gf.Rotation(Gf.Vec3d(0, 1, 0), np.degrees(rot_rpy[1])) | |
| ) | |
| rz = Gf.Matrix4d().SetRotate( | |
| Gf.Rotation(Gf.Vec3d(0, 0, 1), np.degrees(rot_rpy[2])) | |
| ) | |
| rotation_mat = rx * ry * rz | |
| transform = rotation_mat * translation_mat | |
| mesh_prim.AddTransformOp().Set(transform) | |
| stage.GetRootLayer().Export(output_path) | |
| logger.info(f"Exported: {output_path}") | |
| def save_urdf(self, output_path: str) -> None: | |
| """Save the current URDF tree to file. | |
| Args: | |
| output_path: Path to save the URDF file. | |
| """ | |
| if self._tree is None: | |
| return | |
| if hasattr(ET, "indent"): | |
| ET.indent(self._tree, space=" ", level=0) | |
| self._tree.write(output_path, encoding="utf-8", xml_declaration=True) | |
| logger.info(f"Saved updated URDF to {output_path}") | |
| def entrypoint(cfg: Scene3DGenConfig) -> None: | |
| """Main entry point for floorplan visualization and scene manipulation. | |
| Args: | |
| cfg: Configuration object with all parameters. | |
| """ | |
| # Initialize collector and parse URDF | |
| collector = UrdfSemanticInfoCollector( | |
| mesh_sample_num=cfg.mesh_sample_num, | |
| ignore_items=cfg.ignore_items, | |
| ) | |
| collector.collect(cfg.urdf_path) | |
| # Add asset instance if specified | |
| center = None | |
| if cfg.asset_path is not None: | |
| center = collector.add_instance( | |
| asset_path=cfg.asset_path, | |
| instance_key=cfg.instance_key, | |
| in_room=cfg.in_room, | |
| on_instance=cfg.on_instance, | |
| rotation_rpy=cfg.rotation_rpy, | |
| n_max_attempt=cfg.max_placement_attempts, | |
| place_strategy=cfg.place_strategy, | |
| ) | |
| if center is not None: | |
| logger.info( | |
| f"Placed '{cfg.instance_key}' at position: " | |
| f"({center[0]:.3f}, {center[1]:.3f}, {center[2]:.3f})" | |
| ) | |
| # Update URDF if requested | |
| if cfg.update_urdf: | |
| urdf_output = cfg.urdf_path.replace(".urdf", "_updated.urdf") | |
| collision_path = cfg.asset_path.replace( | |
| ".obj", "_collision.obj" | |
| ) | |
| # Use collision mesh only if it exists | |
| if not os.path.exists(collision_path): | |
| collision_path = None | |
| collector.update_urdf_info( | |
| output_path=urdf_output, | |
| instance_key=cfg.instance_key, | |
| visual_mesh_path=cfg.asset_path, | |
| collision_mesh_path=collision_path, | |
| trans_xyz=tuple(center), | |
| rot_rpy=cfg.rotation_rpy, | |
| joint_type="fixed", | |
| ) | |
| # Update USD if requested and path is provided | |
| if cfg.update_usd and cfg.usd_path is not None: | |
| usd_output = cfg.usd_path.replace(".usdc", "_updated.usdc") | |
| collector.update_usd_info( | |
| usd_path=cfg.usd_path, | |
| output_path=usd_output, | |
| instance_key=cfg.instance_key, | |
| visual_mesh_path=cfg.asset_path, | |
| trans_xyz=center, | |
| rot_rpy=cfg.rotation_rpy, | |
| ) | |
| else: | |
| logger.warning( | |
| f"Failed to place '{cfg.instance_key}' in the scene." | |
| ) | |
| # Generate floorplan visualization | |
| FloorplanVisualizer.plot( | |
| collector.rooms, | |
| collector.footprints, | |
| collector.occ_area, | |
| cfg.output_path, | |
| ) | |
| if __name__ == "__main__": | |
| config = tyro.cli(Scene3DGenConfig) | |
| entrypoint(config) | |
| """ | |
| python embodied_gen/scripts/room_gen/visualize_floorplan.py \ | |
| --urdf_path outputs/rooms/Office_seed68661/urdf/export_scene/scene.urdf \ | |
| --output_path outputs/rooms/Office_seed68661/floorplan.png \ | |
| --usd_path outputs/rooms_v2/Kitchen_seed0/usd/export_scene/export_scene.usdc \ | |
| --asset_path /home/users/xinjie.wang/xinjie/asset3d-gen/outputs/semantics_tasks/task_0059/asset3d/red_apple/result/mesh/red_apple.obj \ | |
| --instance_key red_apple \ | |
| --in_room kitchen \ | |
| --on_instance oven \ | |
| --place_strategy top | |
| """ | |