# xinjie.wang
# update
# 7778681
# Project EmbodiedGen
#
# Copyright (c) 2025 Horizon Robotics. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the License for the specific language governing
# permissions and limitations under the License.
from __future__ import annotations

import logging
import os
import random
import re
import xml.etree.ElementTree as ET
from dataclasses import dataclass, field
from shutil import copy2, copytree
from typing import TYPE_CHECKING, Literal

import matplotlib.pyplot as plt
import numpy as np
import trimesh
import tyro
from scipy.spatial.transform import Rotation as R
from shapely.affinity import translate
from shapely.geometry import MultiPoint, MultiPolygon, Point, Polygon
from shapely.ops import unary_union

if TYPE_CHECKING:
    from matplotlib.axes import Axes
    from matplotlib.colors import Colormap
# Configure the root logger when this module is imported so that INFO-level
# messages are printed with a timestamp and level prefix.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Type aliases
# A 2D region expressed as a Shapely Polygon or MultiPolygon.
Geometry = Polygon | MultiPolygon

# Constants
# Default number of points sampled from a mesh when building its footprint.
DEFAULT_MESH_SAMPLE_NUM = 50000
# Default substrings of link names that are skipped while parsing a URDF.
DEFAULT_IGNORE_ITEMS = ("ceiling", "light", "exterior")
# Default (roll, pitch, yaw) in radians: a 1.57 rad roll, no pitch or yaw.
# NOTE(review): presumably converts a Y-up asset to Z-up -- confirm.
DEFAULT_ROTATION_RPY = (1.57, 0.0, 0.0)
# Default number of random positions tried before a placement is abandoned.
DEFAULT_MAX_PLACEMENT_ATTEMPTS = 2000

# Names exported by ``from <module> import *``.
__all__ = [
    "points_to_polygon",
    "get_actionable_surface",
    "FloorplanVisualizer",
    "UrdfSemanticInfoCollector",
    "Scene3DGenConfig",
]
@dataclass
class Scene3DGenConfig:
    """Configuration for 3D scene generation and floorplan visualization.

    An instance is built from command-line flags by ``tyro.cli`` when this
    file runs as a script and is then passed to ``entrypoint``.
    ``urdf_path`` and ``output_path`` have no default and must be supplied;
    every other field is optional.
    """

    urdf_path: str
    """Path to the input URDF scene file."""
    output_path: str
    """Path to save the floorplan visualization image."""

    # Optional paths. Asset placement only runs when ``asset_path`` is set.
    usd_path: str | None = None
    """Optional path to the USD scene file for USD export."""
    asset_path: str | None = None
    """Optional path to the asset mesh to add to the scene."""

    # Instance configuration: forwarded to
    # ``UrdfSemanticInfoCollector.add_instance``.
    instance_key: str = "inserted_object"
    """Unique key for the added instance."""
    in_room: str | None = None
    """Optional room name to constrain asset placement."""
    on_instance: str | None = None
    """Optional instance name to place the asset on top of."""
    place_strategy: Literal["top", "random"] = "random"
    """Placement strategy for the asset."""
    rotation_rpy: tuple[float, float, float] = DEFAULT_ROTATION_RPY
    """Rotation in roll-pitch-yaw (radians)."""

    # Collector configuration: forwarded to ``UrdfSemanticInfoCollector``.
    # ``default_factory`` gives every config its own list instead of sharing
    # one mutable default.
    ignore_items: list[str] = field(
        default_factory=lambda: list(DEFAULT_IGNORE_ITEMS)
    )
    """List of item name patterns to ignore during parsing."""
    mesh_sample_num: int = DEFAULT_MESH_SAMPLE_NUM
    """Number of points to sample from meshes."""
    max_placement_attempts: int = DEFAULT_MAX_PLACEMENT_ATTEMPTS
    """Maximum attempts for asset placement."""

    # Output flags. Both only take effect after a successful placement;
    # ``update_usd`` additionally requires ``usd_path`` to be set.
    update_urdf: bool = True
    """Whether to update and save the URDF file."""
    update_usd: bool = True
    """Whether to update and save the USD file."""
def points_to_polygon(
    points: np.ndarray,
    smooth_thresh: float = 0.2,
    scanline_step: float = 0.01,
) -> Polygon:
    """Trace the outline of a 2D point cloud with a horizontal scanline.

    The cloud is cut into strips of height ``scanline_step`` along Y. For
    every non-empty strip the largest and smallest X are recorded; the
    largest-X points (bottom to top) followed by the smallest-X points (top
    to bottom) form the outline ring. The ring is finally buffered outwards
    and back inwards by ``smooth_thresh``.

    Args:
        points: Array of 2D points with shape (N, 2).
        smooth_thresh: Distance used for the outward/inward buffer pass.
        scanline_step: Height of each scanline strip.

    Returns:
        The smoothed outline, or an empty ``Polygon`` when there are no
        points or fewer than three outline vertices were collected.
    """
    if len(points) == 0:
        return Polygon()

    x_coords, y_coords = points[:, 0], points[:, 1]
    strip_starts = np.arange(
        y_coords.min(), y_coords.max() + scanline_step, scanline_step
    )

    right_edge: list[list[float]] = []
    left_edge: list[list[float]] = []
    for y0 in strip_starts:
        # Half-open strip [y0, y0 + step).
        in_strip = (y_coords >= y0) & (y_coords < y0 + scanline_step)
        if not in_strip.any():
            continue
        strip_x = x_coords[in_strip]
        right_edge.append([strip_x.max(), y0])
        left_edge.append([strip_x.min(), y0])

    # Walk up the right side, then back down the left side.
    outline = right_edge + left_edge[::-1]
    if len(outline) < 3:
        return Polygon()

    grown = Polygon(outline).buffer(smooth_thresh)
    return grown.buffer(-smooth_thresh)
def get_actionable_surface(
    mesh: trimesh.Trimesh,
    tol_angle: int = 10,
    tol_z: float = 0.02,
    area_tolerance: float = 0.15,
    place_strategy: Literal["top", "random"] = "random",
) -> tuple[float, Geometry]:
    """Pick a surface of ``mesh`` that an object can be placed on.

    Faces whose normal lies within ``tol_angle`` degrees of +Z are grouped
    by height, one group is chosen according to ``place_strategy``, and the
    convex hull of points sampled on that group is returned as the surface.

    Args:
        mesh: The input trimesh object.
        tol_angle: Angle tolerance in degrees for detecting up-facing normals.
        tol_z: Z-coordinate tolerance for clustering faces.
        area_tolerance: Tolerance for selecting candidate surfaces by area.
        place_strategy: Either "top" (highest surface) or "random".

    Returns:
        A tuple of (z_height, surface_polygon). When no up-facing face (or
        no cluster) exists, the top of the bounding box and the convex hull
        of all vertices are returned instead.
    """

    def bbox_top_fallback() -> tuple[float, Geometry]:
        # Whole-mesh footprint at the highest Z of the bounding box.
        footprint = MultiPoint(mesh.vertices[:, :2]).convex_hull
        return mesh.bounds[1][2], footprint

    z_axis = np.array([0, 0, 1])
    cos_limit = np.cos(np.deg2rad(tol_angle))
    facing_up = np.dot(mesh.face_normals, z_axis) > cos_limit

    if not np.any(facing_up):
        logger.warning(
            "No up-facing surfaces found. Falling back to bounding box top."
        )
        return bbox_top_fallback()

    up_face_ids = np.where(facing_up)[0]
    up_face_heights = mesh.triangles_center[facing_up][:, 2]
    up_face_areas = mesh.area_faces[facing_up]

    layers = _cluster_faces_by_z(
        up_face_heights, up_face_areas, up_face_ids, tol_z
    )
    if not layers:
        return bbox_top_fallback()

    selected_z, layer = _select_surface_cluster(
        layers, area_tolerance, place_strategy
    )

    # Build a mesh of only the chosen faces and sample its surface.
    layer_mesh = trimesh.Trimesh(
        vertices=mesh.vertices,
        faces=mesh.faces[layer["indices"]],
    )
    samples, _ = trimesh.sample.sample_surface(layer_mesh, 10000)
    if len(samples) < 3:
        logger.warning(
            f"Failed to sample enough points on layer Z={selected_z}. "
            "Returning empty polygon."
        )
        return selected_z, Polygon()

    return selected_z, MultiPoint(samples[:, :2]).convex_hull
def _cluster_faces_by_z(
face_z: np.ndarray,
face_areas: np.ndarray,
face_indices: np.ndarray,
tol_z: float,
) -> dict[float, dict]:
"""Cluster mesh faces by their Z coordinate.
Args:
face_z: Z coordinates of face centers.
face_areas: Areas of each face.
face_indices: Original indices of the faces.
tol_z: Tolerance for Z clustering.
Returns:
Dictionary mapping Z values to cluster data (area and indices).
"""
z_clusters: dict[float, dict] = {}
for i, z in enumerate(face_z):
key = round(z / tol_z) * tol_z
if key not in z_clusters:
z_clusters[key] = {"area": 0.0, "indices": []}
z_clusters[key]["area"] += face_areas[i]
z_clusters[key]["indices"].append(face_indices[i])
return z_clusters
def _select_surface_cluster(
    z_clusters: dict[float, dict],
    area_tolerance: float,
    place_strategy: Literal["top", "random"],
) -> tuple[float, dict]:
    """Choose one height layer from ``z_clusters``.

    Layers whose area is at least ``(1 - area_tolerance)`` times the largest
    layer area are candidates; if that leaves nothing, the largest layer is
    the only candidate. "random" draws one candidate with ``random.choice``;
    any other strategy takes the candidate with the greatest Z.

    Args:
        z_clusters: Dictionary of Z clusters with area and indices.
        area_tolerance: Tolerance for candidate selection by area.
        place_strategy: Either "top" or "random".

    Returns:
        Tuple of (selected_z, cluster_data).
    """
    largest_area = max(info["area"] for info in z_clusters.values())
    min_area = largest_area * (1.0 - area_tolerance)

    shortlist = [
        (level, info)
        for level, info in z_clusters.items()
        if info["area"] >= min_area
    ]
    if not shortlist:
        shortlist = [max(z_clusters.items(), key=lambda kv: kv[1]["area"])]

    if place_strategy == "random":
        selected_z, selected_data = random.choice(shortlist)
        logger.info(
            f"Strategy 'random': Selected Z={selected_z:.3f} "
            f"(Area={selected_data['area']:.3f}) "
            f"from {len(shortlist)} candidates."
        )
        return selected_z, selected_data

    # Highest layer first.
    by_height = sorted(shortlist, key=lambda kv: kv[0], reverse=True)
    selected_z, selected_data = by_height[0]
    logger.info(
        f"Strategy 'top': Selected highest Z={selected_z:.3f} "
        f"(Area={selected_data['area']:.3f})"
    )
    return selected_z, selected_data
class FloorplanVisualizer:
    """Static utility class for visualizing floorplans.

    ``plot`` is the entry point. It draws, from back to front: room floors
    (zorder 1), the occupied-area overlay (2), footprint outlines (3),
    footprint labels (4) and room labels (5), then saves the figure.
    """

    @staticmethod
    def _parts(geom: Geometry) -> list:
        """Return the member geometries of ``geom`` (itself if single-part)."""
        return list(geom.geoms) if hasattr(geom, "geoms") else [geom]

    @staticmethod
    def draw_poly(ax: Axes, poly: Geometry, **kwargs) -> None:
        """Draw a polygon or multi-polygon on matplotlib axes.

        Only the exterior ring of each part is filled; interior rings
        (holes) are not drawn.

        Args:
            ax: Matplotlib axes object.
            poly: Shapely Polygon or MultiPolygon to draw.
            **kwargs: Additional arguments passed to ax.fill(). A ``color``
                entry is applied to every part; without it each part gets
                its own colour from the "tab10" colormap.
        """
        if poly.is_empty:
            return

        parts = FloorplanVisualizer._parts(poly)
        color = kwargs.pop("color", None)
        if color is None:
            cmap = plt.get_cmap("tab10")
            colors = [cmap(i) for i in range(len(parts))]
        else:
            colors = [color] * len(parts)

        for part, face_color in zip(parts, colors):
            # Skip empty parts and anything without an exterior ring (e.g.
            # stray lines or points inside a geometry collection).
            if part.is_empty or not hasattr(part, "exterior"):
                continue
            x, y = part.exterior.xy
            ax.fill(x, y, facecolor=face_color, **kwargs)

    @classmethod
    def plot(
        cls,
        rooms: dict[str, Geometry],
        footprints: dict[str, Geometry],
        occ_area: Geometry,
        save_path: str,
    ) -> None:
        """Generate and save a floorplan visualization.

        Args:
            rooms: Dictionary mapping room names to floor polygons.
            footprints: Dictionary mapping object names to footprint polygons.
            occ_area: Union of all occupied areas.
            save_path: Path to save the output image.
        """
        fig, ax = plt.subplots(figsize=(10, 10))
        try:
            ax.set_aspect("equal")
            cmap_rooms = plt.get_cmap("Pastel1")

            cls._draw_room_floors(ax, rooms, cmap_rooms)
            cls._draw_occupied_area(ax, occ_area)
            cls._draw_footprint_outlines(ax, footprints)
            cls._draw_footprint_labels(ax, footprints)
            cls._draw_room_labels(ax, rooms)
            cls._configure_axes(ax, rooms, occ_area)

            # Operate on this figure explicitly rather than on pyplot's
            # notion of the "current" figure.
            fig.tight_layout()
            fig.savefig(save_path, dpi=300)
        finally:
            # Always release the figure, even when drawing or saving fails.
            plt.close(fig)
        logger.info(f"Saved floorplan to: {save_path}")

    @classmethod
    def _draw_room_floors(
        cls,
        ax: Axes,
        rooms: dict[str, Geometry],
        cmap: Colormap,
    ) -> None:
        """Draw colored room floor polygons (Layer 1).

        Room ``i`` uses colour ``i % cmap.N`` so the palette wraps around
        when there are more rooms than colours.
        """
        for i, poly in enumerate(rooms.values()):
            cls.draw_poly(
                ax,
                poly,
                color=cmap(i % cmap.N),
                alpha=0.6,
                edgecolor="black",
                linestyle="--",
                zorder=1,
            )

    @classmethod
    def _draw_occupied_area(cls, ax: Axes, occ_area: Geometry) -> None:
        """Draw the occupied area overlay (Layer 2)."""
        cls.draw_poly(
            ax,
            occ_area,
            color="tab:blue",
            alpha=0.3,
            lw=0,
            zorder=2,
        )

    @staticmethod
    def _draw_footprint_outlines(
        ax: Axes,
        footprints: dict[str, Geometry],
    ) -> None:
        """Draw footprint outlines (Layer 3)."""
        for poly in footprints.values():
            if poly.is_empty:
                continue
            for part in FloorplanVisualizer._parts(poly):
                if not hasattr(part, "exterior"):
                    continue
                ax.plot(
                    *part.exterior.xy, "--", lw=0.8, color="gray", zorder=3
                )

    @staticmethod
    def _draw_footprint_labels(
        ax: Axes,
        footprints: dict[str, Geometry],
    ) -> None:
        """Draw footprint text labels (Layer 4) at each footprint centroid."""
        for name, poly in footprints.items():
            if poly.is_empty:
                continue
            center = poly.centroid
            ax.text(
                center.x,
                center.y,
                name,
                fontsize=5,
                ha="center",
                va="center",
                bbox={
                    "facecolor": "white",
                    "alpha": 0.5,
                    "edgecolor": "none",
                    "pad": 0.1,
                },
                zorder=4,
            )

    @staticmethod
    def _draw_room_labels(ax: Axes, rooms: dict[str, Geometry]) -> None:
        """Draw room text labels (Layer 5).

        The label is the room key with every "_floor" occurrence removed.
        """
        for name, poly in rooms.items():
            if poly.is_empty:
                continue
            center = poly.centroid
            ax.text(
                center.x,
                center.y,
                name.replace("_floor", ""),
                fontsize=9,
                color="black",
                weight="bold",
                ha="center",
                va="center",
                bbox={
                    "facecolor": "lightgray",
                    "alpha": 0.7,
                    "edgecolor": "black",
                    "boxstyle": "round,pad=0.3",
                },
                zorder=5,
            )

    @staticmethod
    def _configure_axes(
        ax: Axes,
        rooms: dict[str, Geometry],
        occ_area: Geometry,
    ) -> None:
        """Configure axes limits and labels.

        The limits are the bounds of all rooms plus the occupied area,
        padded on each side by 5% of the extent but at least 0.5. With
        nothing to show, the bounds default to [-1, 1] on both axes.
        """
        total_geom = unary_union(list(rooms.values()) + [occ_area])
        if total_geom.is_empty:
            minx, miny, maxx, maxy = -1, -1, 1, 1
        else:
            minx, miny, maxx, maxy = total_geom.bounds

        margin_x = max((maxx - minx) * 0.05, 0.5)
        margin_y = max((maxy - miny) * 0.05, 0.5)
        ax.set_xlim(minx - margin_x, maxx + margin_x)
        ax.set_ylim(miny - margin_y, maxy + margin_y)

        ax.set_title("Floorplan Analysis", fontsize=14)
        ax.set_xlabel("X (m)")
        ax.set_ylabel("Y (m)")
class UrdfSemanticInfoCollector:
    """Collector for URDF semantic information.

    Parses URDF files to extract room layouts, object footprints, and
    provides methods for adding new instances and updating URDF/USD files.

    Attributes:
        mesh_sample_num: Number of points to sample from meshes.
        ignore_items: List of item name patterns to ignore.
        instances: Dictionary of instance name to footprint polygon. The
            special key "walls" holds the union of all wall footprints.
        instance_meta: Dictionary of instance metadata (mesh path, pose).
        rooms: Dictionary of room polygons.
        footprints: Dictionary of object footprints.
        occ_area: Union of all occupied areas.
        floor_union: Union of all floor polygons.
        urdf_path: Path of the URDF most recently passed to ``collect``.
    """

    def __init__(
        self,
        mesh_sample_num: int = DEFAULT_MESH_SAMPLE_NUM,
        ignore_items: list[str] | None = None,
    ) -> None:
        """Initialize the collector.

        Args:
            mesh_sample_num: Number of points to sample from meshes.
            ignore_items: List of item name patterns to ignore during
                parsing. ``None`` selects ``DEFAULT_IGNORE_ITEMS``; an empty
                list means "ignore nothing".
        """
        self.mesh_sample_num = mesh_sample_num
        # Test against None rather than truthiness so that an explicit empty
        # list is honoured instead of being replaced by the defaults.
        self.ignore_items = (
            list(DEFAULT_IGNORE_ITEMS) if ignore_items is None else ignore_items
        )
        self.instances: dict[str, Geometry] = {}
        self.instance_meta: dict[str, dict] = {}
        self.rooms: dict[str, Geometry] = {}
        self.footprints: dict[str, Geometry] = {}
        self.occ_area: Geometry = Polygon()
        self.floor_union: Geometry = Polygon()
        self.urdf_path: str = ""
        self._tree: ET.ElementTree | None = None
        self._root: ET.Element | None = None

    def _get_transform(
        self,
        joint_elem: ET.Element,
    ) -> tuple[np.ndarray, np.ndarray]:
        """Extract transform (xyz, rpy) from a joint element.

        Args:
            joint_elem: XML Element representing a URDF joint.

        Returns:
            Tuple of (xyz, rpy) arrays read from the joint's ``<origin>``
            child. A missing element or attribute yields zeros.
        """
        origin = joint_elem.find("origin")
        if origin is None:
            return np.zeros(3), np.zeros(3)
        xyz = np.fromstring(origin.attrib.get("xyz", "0 0 0"), sep=" ")
        rpy = np.fromstring(origin.attrib.get("rpy", "0 0 0"), sep=" ")
        return xyz, rpy

    def _process_mesh_to_poly(
        self,
        mesh_path: str,
        xyz: np.ndarray,
        rpy: np.ndarray,
    ) -> Polygon:
        """Load mesh file and convert to 2D footprint polygon.

        The mesh is rotated by ``rpy``, translated by ``xyz``, sampled
        ``mesh_sample_num`` times and the XY coordinates of the samples are
        turned into an outline.

        Args:
            mesh_path: Path to the mesh file.
            xyz: Translation vector.
            rpy: Rotation in roll-pitch-yaw.

        Returns:
            Footprint polygon of the mesh; an empty polygon when the file
            does not exist.
        """
        if not os.path.exists(mesh_path):
            return Polygon()

        mesh = trimesh.load(mesh_path, force="mesh", skip_materials=True)
        matrix = np.eye(4)
        matrix[:3, :3] = R.from_euler("xyz", rpy).as_matrix()
        matrix[:3, 3] = xyz
        mesh.apply_transform(matrix)

        verts = np.asarray(mesh.sample(self.mesh_sample_num))[:, :2]
        return points_to_polygon(verts)

    def collect(self, urdf_path: str) -> None:
        """Parse URDF file and collect semantic information.

        Replaces any previously collected instances and rebuilds the derived
        room / footprint / occupancy state.

        Args:
            urdf_path: Path to the URDF file.
        """
        logger.info(f"Collecting URDF semantic info from {urdf_path}")
        self.urdf_path = urdf_path
        urdf_dir = os.path.dirname(urdf_path)

        self._tree = ET.parse(urdf_path)
        self._root = self._tree.getroot()

        link_transforms = self._build_link_transforms()
        self._process_links(urdf_dir, link_transforms)
        self._update_internal_state()

    def _build_link_transforms(
        self,
    ) -> dict[str, tuple[np.ndarray, np.ndarray]]:
        """Build mapping from link names to their transforms.

        Each joint contributes the origin of its own ``<origin>`` element
        for its child link; transforms are not chained through parents.

        Returns:
            Dictionary mapping link names to (xyz, rpy) tuples.
        """
        link_transforms: dict[str, tuple[np.ndarray, np.ndarray]] = {}
        for joint in self._tree.findall("joint"):
            child = joint.find("child")
            if child is not None:
                link_transforms[child.attrib["link"]] = self._get_transform(
                    joint
                )
        return link_transforms

    def _process_links(
        self,
        urdf_dir: str,
        link_transforms: dict[str, tuple[np.ndarray, np.ndarray]],
    ) -> None:
        """Process all links in the URDF tree.

        Links are skipped when their name contains an ignore pattern, when
        they have no ``visual/geometry/mesh`` node, or when the footprint
        comes out empty. Footprints of links whose name contains "wall" are
        merged under the "walls" key; all others become instances.

        Args:
            urdf_dir: Directory containing the URDF file.
            link_transforms: Dictionary of link transforms.
        """
        self.instances = {}
        self.instance_meta = {}
        wall_polys: list[Polygon] = []
        # Link names are lower-cased before matching, so the patterns must
        # be lower-cased as well or mixed-case patterns could never match.
        ignore_patterns = [pattern.lower() for pattern in self.ignore_items]

        for link in self._tree.findall("link"):
            name = link.attrib.get("name", "").lower()
            if any(pattern in name for pattern in ignore_patterns):
                continue

            visual = link.find("visual")
            if visual is None:
                continue
            mesh_node = visual.find("geometry/mesh")
            if mesh_node is None:
                continue

            mesh_path = os.path.join(urdf_dir, mesh_node.attrib["filename"])
            default_transform = (np.zeros(3), np.zeros(3))
            xyz, rpy = link_transforms.get(
                link.attrib["name"], default_transform
            )

            poly = self._process_mesh_to_poly(mesh_path, xyz, rpy)
            if poly.is_empty:
                continue

            if "wall" in name:
                wall_polys.append(poly)
            else:
                key = self._process_safe_key_robust(link.attrib["name"])
                self.instances[key] = poly
                self.instance_meta[key] = {
                    "mesh_path": mesh_path,
                    "xyz": xyz,
                    "rpy": rpy,
                }

        self.instances["walls"] = unary_union(wall_polys)

    def _update_internal_state(self) -> None:
        """Update derived state (rooms, footprints, occupied area).

        Rooms are the non-empty instances whose key contains "_floor".
        Footprints are the remaining non-empty instances, excluding "walls"
        and anything whose key contains "rug".
        """
        self.rooms = {
            k: v
            for k, v in self.instances.items()
            if "_floor" in k.lower() and not v.is_empty
        }
        self.footprints = {
            k: v
            for k, v in self.instances.items()
            if k != "walls"
            and "_floor" not in k.lower()
            and "rug" not in k.lower()
            and not v.is_empty
        }
        self.occ_area = unary_union(list(self.footprints.values()))
        self.floor_union = unary_union(list(self.rooms.values()))

    def _process_safe_key_robust(self, name: str) -> str:
        """Convert a link name to a safe, normalized key.

        Names ending in "_floor" drop the underscore-separated part just
        before "floor". Names containing "Factory" keep the text before
        "Factory" and the last underscore-separated part as a suffix. The
        remaining text is converted from CamelCase / spaces to lower-case
        snake_case.

        Args:
            name: Original link name.

        Returns:
            Normalized key string.
        """
        if name.endswith("_floor"):
            parts = name.split("_")
            return "_".join(parts[:-2] + ["floor"])

        if "Factory" in name:
            # Handle infinigen naming convention
            prefix = name.split("Factory")[0]
            suffix = f"_{name.split('_')[-1]}"
        else:
            prefix, suffix = name, ""

        res = prefix.replace(" ", "_")
        res = re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", res)
        res = res.lower()
        res = re.sub(r"_+", "_", res).strip("_ ")
        return f"{res}{suffix}"

    def add_instance(
        self,
        asset_path: str,
        instance_key: str,
        in_room: str | None = None,
        on_instance: str | None = None,
        rotation_rpy: tuple[float, float, float] = DEFAULT_ROTATION_RPY,
        n_max_attempt: int = DEFAULT_MAX_PLACEMENT_ATTEMPTS,
        place_strategy: Literal["top", "random"] = "random",
    ) -> list[float] | None:
        """Add a new instance to the scene with automatic placement.

        On success the footprint and the pose of the new instance are
        recorded, so it counts as an obstacle and can itself be used as an
        ``on_instance`` target in later calls.

        Args:
            asset_path: Path to the asset mesh file.
            instance_key: Unique key for the new instance.
            in_room: Optional room name to constrain placement.
            on_instance: Optional instance name to place on top of.
            rotation_rpy: Initial rotation in roll-pitch-yaw.
            n_max_attempt: Maximum placement attempts.
            place_strategy: Either "top" or "random".

        Returns:
            List [x, y, z] of the placed instance center, or None if failed.

        Raises:
            ValueError: If instance_key already exists or room/instance not found.
        """
        if instance_key in self.instances:
            raise ValueError(f"Instance key '{instance_key}' already exists.")

        room_poly = self._resolve_room_polygon(in_room)
        target_area, obstacles, base_z = self._resolve_placement_target(
            on_instance, room_poly, place_strategy
        )
        if target_area.is_empty:
            logger.error("Target area for placement is empty.")
            return None

        mesh = trimesh.load(asset_path, force="mesh")
        mesh.apply_transform(
            trimesh.transformations.euler_matrix(*rotation_rpy, "sxyz")
        )
        verts = np.asarray(mesh.sample(self.mesh_sample_num))[:, :2]
        base_poly = points_to_polygon(verts)
        if base_poly.is_empty:
            # An empty footprint has no usable centroid and cannot be placed.
            logger.error(f"Could not compute a footprint for {asset_path}.")
            return None

        # Center the footprint on the origin so that a translation by
        # (x, y) puts its centroid at (x, y).
        centroid = base_poly.centroid
        base_poly = translate(base_poly, xoff=-centroid.x, yoff=-centroid.y)

        placement = self._try_place_polygon(
            base_poly, target_area, obstacles, n_max_attempt
        )
        if placement is None:
            logger.error(
                f"Failed to place {asset_path} after {n_max_attempt} attempts."
            )
            return None

        x, y, candidate = placement
        # NOTE(review): assumes the rotated mesh is vertically centered on
        # its origin, so half its height lifts it onto the surface -- confirm.
        final_z = base_z + mesh.extents[2] / 2

        self.instances[instance_key] = candidate
        # Record the pose as well: _resolve_placement_target looks instances
        # up in instance_meta and would otherwise fail with a KeyError when
        # this instance is later chosen as an ``on_instance`` target.
        self.instance_meta[instance_key] = {
            "mesh_path": asset_path,
            "xyz": np.array([x, y, final_z], dtype=float),
            "rpy": np.asarray(rotation_rpy, dtype=float),
        }
        self._update_internal_state()
        return [round(float(v), 4) for v in (x, y, final_z)]

    def _resolve_room_polygon(self, in_room: str | None) -> Geometry | None:
        """Resolve room name to polygon.

        Matching is a case-insensitive substring test against the room
        keys; all matching rooms are merged.

        Args:
            in_room: Room name query string.

        Returns:
            Room polygon or None if not specified.

        Raises:
            ValueError: If room not found.
        """
        if in_room is None:
            return None

        query_room = in_room.lower()
        room_matches = [k for k in self.rooms if query_room in k.lower()]
        if not room_matches:
            raise ValueError(f"Room '{in_room}' not found.")
        return unary_union([self.rooms[k] for k in room_matches])

    def _resolve_placement_target(
        self,
        on_instance: str | None,
        room_poly: Geometry | None,
        place_strategy: Literal["top", "random"],
    ) -> tuple[Geometry, Geometry, float]:
        """Resolve the target placement area and obstacles.

        Without ``on_instance`` the target is the room (or all floors) at
        height 0 with every footprint as an obstacle. With ``on_instance``
        the target is an up-facing surface of the matched object, and the
        object's own footprint is removed from the obstacles.

        Args:
            on_instance: Instance name to place on.
            room_poly: Room polygon constraint.
            place_strategy: Placement strategy.

        Returns:
            Tuple of (target_area, obstacles, base_z_height).

        Raises:
            ValueError: If on_instance not found.
        """
        if on_instance is None:
            if room_poly is not None:
                return room_poly, self.occ_area, 0.0
            return self.floor_union, self.occ_area, 0.0

        query_obj = on_instance.lower()
        possible_matches = [
            k
            for k in self.instances
            if query_obj in k.lower() and k != "walls"
        ]
        if room_poly is not None:
            possible_matches = [
                k
                for k in possible_matches
                if self.instances[k].intersects(room_poly)
            ]

        if not possible_matches:
            # Only the room polygon is known here, not the name that was
            # requested, so describe the constraint generically.
            location_msg = (
                " in the requested room" if room_poly is not None else ""
            )
            raise ValueError(
                f"No instance matching '{on_instance}' found{location_msg}."
            )

        if place_strategy == "random":
            target_parent_key = random.choice(possible_matches)
        else:
            target_parent_key = possible_matches[0]

        if len(possible_matches) > 1:
            logger.warning(
                f"Multiple matches for '{on_instance}': {possible_matches}. "
                f"Using '{target_parent_key}'."
            )

        # Re-load the parent mesh in its scene pose to find its surfaces.
        meta = self.instance_meta[target_parent_key]
        parent_mesh = trimesh.load(meta["mesh_path"], force="mesh")
        matrix = np.eye(4)
        matrix[:3, :3] = R.from_euler("xyz", meta["rpy"]).as_matrix()
        matrix[:3, 3] = meta["xyz"]
        parent_mesh.apply_transform(matrix)

        best_z, surface_poly = get_actionable_surface(
            parent_mesh, place_strategy=place_strategy
        )
        obstacles = self.occ_area.difference(self.instances[target_parent_key])
        logger.info(f"Placing on '{target_parent_key}' (Z={best_z:.3f})")
        return surface_poly, obstacles, best_z

    def _try_place_polygon(
        self,
        base_poly: Polygon,
        target_area: Geometry,
        obstacles: Geometry,
        n_max_attempt: int,
    ) -> tuple[float, float, Polygon] | None:
        """Try to place polygon in target area avoiding obstacles.

        Positions are drawn uniformly from the bounding box of
        ``target_area``; the first one where the shifted polygon lies
        entirely inside the target and touches no obstacle wins.

        Args:
            base_poly: Polygon to place (centered at origin).
            target_area: Area where placement is allowed.
            obstacles: Areas to avoid.
            n_max_attempt: Maximum attempts.

        Returns:
            Tuple of (x, y, placed_polygon) or None if failed.
        """
        minx, miny, maxx, maxy = target_area.bounds
        for _ in range(n_max_attempt):
            x = np.random.uniform(minx, maxx)
            y = np.random.uniform(miny, maxy)
            candidate = translate(base_poly, xoff=x, yoff=y)
            if target_area.contains(candidate) and not candidate.intersects(
                obstacles
            ):
                return x, y, candidate
        return None

    def update_urdf_info(
        self,
        output_path: str,
        instance_key: str,
        visual_mesh_path: str,
        collision_mesh_path: str | None = None,
        trans_xyz: tuple[float, float, float] = (0, 0, 0),
        rot_rpy: tuple[float, float, float] = DEFAULT_ROTATION_RPY,
        joint_type: str = "fixed",
    ) -> None:
        """Add a new link to the URDF tree and save.

        The directory containing each mesh is copied to
        ``<urdf dir>/<instance_key>`` and the new link references the meshes
        relative to the URDF. The link is attached to the link named "base"
        by a joint called ``joint_<instance_key>``.

        Args:
            output_path: Path to save the updated URDF.
            instance_key: Name for the new link.
            visual_mesh_path: Path to the visual mesh file.
            collision_mesh_path: Optional path to collision mesh.
            trans_xyz: Translation (x, y, z).
            rot_rpy: Rotation (roll, pitch, yaw).
            joint_type: Type of joint (e.g., "fixed").
        """
        if self._root is None:
            logger.warning(
                "No URDF has been collected; skipping URDF update for "
                f"'{instance_key}'."
            )
            return

        logger.info(f"Updating URDF for instance '{instance_key}'.")
        urdf_dir = os.path.dirname(self.urdf_path)
        # os.path.join keeps the target relative when urdf_dir is "" (URDF
        # given as a bare filename); "/".join-style formatting would point
        # at the filesystem root in that case.
        target_dir = os.path.join(urdf_dir, instance_key)

        # Copy mesh files
        visual_src_dir = os.path.dirname(visual_mesh_path)
        copytree(visual_src_dir, target_dir, dirs_exist_ok=True)
        visual_rel_path = (
            f"{instance_key}/{os.path.basename(visual_mesh_path)}"
        )

        collision_rel_path = None
        if collision_mesh_path is not None:
            collision_src_dir = os.path.dirname(collision_mesh_path)
            # The directory was already copied when both meshes share it.
            if collision_src_dir != visual_src_dir:
                copytree(collision_src_dir, target_dir, dirs_exist_ok=True)
            collision_rel_path = (
                f"{instance_key}/{os.path.basename(collision_mesh_path)}"
            )

        # Create link element
        link = ET.SubElement(self._root, "link", attrib={"name": instance_key})
        visual = ET.SubElement(link, "visual")
        v_geo = ET.SubElement(visual, "geometry")
        ET.SubElement(v_geo, "mesh", attrib={"filename": visual_rel_path})

        if collision_rel_path is not None:
            collision = ET.SubElement(link, "collision")
            c_geo = ET.SubElement(collision, "geometry")
            ET.SubElement(
                c_geo, "mesh", attrib={"filename": collision_rel_path}
            )

        # Create joint element
        joint_name = f"joint_{instance_key}"
        joint = ET.SubElement(
            self._root,
            "joint",
            attrib={"name": joint_name, "type": joint_type},
        )
        ET.SubElement(joint, "parent", attrib={"link": "base"})
        ET.SubElement(joint, "child", attrib={"link": instance_key})

        xyz_str = f"{trans_xyz[0]:.4f} {trans_xyz[1]:.4f} {trans_xyz[2]:.4f}"
        rpy_str = f"{rot_rpy[0]:.4f} {rot_rpy[1]:.4f} {rot_rpy[2]:.4f}"
        ET.SubElement(joint, "origin", attrib={"xyz": xyz_str, "rpy": rpy_str})

        self.save_urdf(output_path)

    def update_usd_info(
        self,
        usd_path: str,
        output_path: str,
        instance_key: str,
        visual_mesh_path: str,
        trans_xyz: list[float],
        rot_rpy: tuple[float, float, float] = DEFAULT_ROTATION_RPY,
    ) -> None:
        """Add a mesh instance to an existing USD file.

        The mesh is converted to ``.usdc`` with Blender's Python API, placed
        next to ``output_path`` under ``<instance_key>/`` together with the
        texture/material files found beside the source mesh, and referenced
        from a new ``/<instance_key>`` prim in the stage.

        Args:
            usd_path: Path to the source USD file.
            output_path: Path to save the modified USD.
            instance_key: Prim path name for the new instance.
            visual_mesh_path: Path to the visual mesh (OBJ format).
            trans_xyz: Translation [x, y, z].
            rot_rpy: Rotation (roll, pitch, yaw).
        """
        # Imported here rather than at module level; presumably so that the
        # rest of the module works without Blender / USD installed.
        import bpy
        from pxr import Gf, Usd, UsdGeom

        prim_path = f"/{instance_key}"
        out_dir = os.path.dirname(output_path)
        target_dir = os.path.join(out_dir, instance_key)
        os.makedirs(target_dir, exist_ok=True)

        mesh_filename = os.path.basename(visual_mesh_path)
        usdc_filename = os.path.splitext(mesh_filename)[0] + ".usdc"
        target_usdc_path = os.path.join(target_dir, usdc_filename)

        logger.info(
            f"Converting with Blender (bpy): "
            f"{visual_mesh_path} -> {target_usdc_path}"
        )
        # Start from an empty Blender scene so only this mesh is exported.
        bpy.ops.wm.read_factory_settings(use_empty=True)
        bpy.ops.wm.obj_import(
            filepath=visual_mesh_path,
            forward_axis="Y",
            up_axis="Z",
        )
        bpy.ops.wm.usd_export(
            filepath=target_usdc_path,
            selected_objects_only=False,
        )

        # Copy texture files
        # A bare filename has an empty dirname; list the current directory
        # in that case instead of failing on os.listdir("").
        src_dir = os.path.dirname(visual_mesh_path) or "."
        for f in os.listdir(src_dir):
            if f.lower().endswith((".png", ".jpg", ".jpeg", ".mtl")):
                copy2(os.path.join(src_dir, f), target_dir)

        final_rel_path = f"./{instance_key}/{usdc_filename}"

        # Update USD stage
        stage = Usd.Stage.Open(usd_path)
        mesh_prim = UsdGeom.Xform.Define(stage, prim_path)
        ref_prim = UsdGeom.Mesh.Define(stage, f"{prim_path}/Mesh")
        ref_prim.GetPrim().GetReferences().AddReference(final_rel_path)

        # Build transform matrix
        translation_mat = Gf.Matrix4d().SetTranslate(
            Gf.Vec3d(trans_xyz[0], trans_xyz[1], trans_xyz[2])
        )
        rx = Gf.Matrix4d().SetRotate(
            Gf.Rotation(Gf.Vec3d(1, 0, 0), np.degrees(rot_rpy[0]))
        )
        ry = Gf.Matrix4d().SetRotate(
            Gf.Rotation(Gf.Vec3d(0, 1, 0), np.degrees(rot_rpy[1]))
        )
        rz = Gf.Matrix4d().SetRotate(
            Gf.Rotation(Gf.Vec3d(0, 0, 1), np.degrees(rot_rpy[2]))
        )
        # NOTE(review): Gf matrices act on row vectors, so the left-most
        # factor is applied first (roll, pitch, yaw, then translation) --
        # confirm against the OpenUSD documentation.
        rotation_mat = rx * ry * rz
        transform = rotation_mat * translation_mat
        mesh_prim.AddTransformOp().Set(transform)

        stage.GetRootLayer().Export(output_path)
        logger.info(f"Exported: {output_path}")

    def save_urdf(self, output_path: str) -> None:
        """Save the current URDF tree to file.

        Args:
            output_path: Path to save the URDF file.
        """
        if self._tree is None:
            logger.warning("No URDF has been collected; nothing to save.")
            return

        # ET.indent does not exist on older Python versions.
        if hasattr(ET, "indent"):
            ET.indent(self._tree, space=" ", level=0)

        self._tree.write(output_path, encoding="utf-8", xml_declaration=True)
        logger.info(f"Saved updated URDF to {output_path}")
def entrypoint(cfg: Scene3DGenConfig) -> None:
    """Main entry point for floorplan visualization and scene manipulation.

    Parses the URDF, optionally places ``cfg.asset_path`` in the scene and
    writes ``*_updated`` copies of the URDF / USD next to the originals,
    then renders the floorplan to ``cfg.output_path``. The floorplan is
    drawn even when no asset was given or the placement failed.

    Args:
        cfg: Configuration object with all parameters.
    """
    # Initialize collector and parse URDF
    collector = UrdfSemanticInfoCollector(
        mesh_sample_num=cfg.mesh_sample_num,
        ignore_items=cfg.ignore_items,
    )
    collector.collect(cfg.urdf_path)

    # Add asset instance if specified
    if cfg.asset_path is not None:
        center = collector.add_instance(
            asset_path=cfg.asset_path,
            instance_key=cfg.instance_key,
            in_room=cfg.in_room,
            on_instance=cfg.on_instance,
            rotation_rpy=cfg.rotation_rpy,
            n_max_attempt=cfg.max_placement_attempts,
            place_strategy=cfg.place_strategy,
        )

        if center is None:
            logger.warning(
                f"Failed to place '{cfg.instance_key}' in the scene."
            )
        else:
            logger.info(
                f"Placed '{cfg.instance_key}' at position: "
                f"({center[0]:.3f}, {center[1]:.3f}, {center[2]:.3f})"
            )

            # Update URDF if requested
            if cfg.update_urdf:
                # Use collision mesh only if it exists
                collision_path = _append_to_stem(cfg.asset_path, "_collision")
                if not os.path.exists(collision_path):
                    collision_path = None

                collector.update_urdf_info(
                    output_path=_append_to_stem(cfg.urdf_path, "_updated"),
                    instance_key=cfg.instance_key,
                    visual_mesh_path=cfg.asset_path,
                    collision_mesh_path=collision_path,
                    trans_xyz=tuple(center),
                    rot_rpy=cfg.rotation_rpy,
                    joint_type="fixed",
                )

            # Update USD if requested and path is provided
            if cfg.update_usd and cfg.usd_path is not None:
                collector.update_usd_info(
                    usd_path=cfg.usd_path,
                    output_path=_append_to_stem(cfg.usd_path, "_updated"),
                    instance_key=cfg.instance_key,
                    visual_mesh_path=cfg.asset_path,
                    trans_xyz=center,
                    rot_rpy=cfg.rotation_rpy,
                )

    # Generate floorplan visualization
    FloorplanVisualizer.plot(
        collector.rooms,
        collector.footprints,
        collector.occ_area,
        cfg.output_path,
    )


def _append_to_stem(path: str, suffix: str) -> str:
    """Insert ``suffix`` between the file stem of ``path`` and its extension.

    Unlike ``str.replace`` on the extension, this touches only the final
    path component and works for any extension, so the result never equals
    the input path (e.g. "a/scene.usd" -> "a/scene_updated.usd").
    """
    stem, ext = os.path.splitext(path)
    return f"{stem}{suffix}{ext}"
if __name__ == "__main__":
    # Build the config from command-line flags and run the pipeline.
    entrypoint(tyro.cli(Scene3DGenConfig))
"""
Example usage (adjust the paths to your own outputs):

python embodied_gen/scripts/room_gen/visualize_floorplan.py \
--urdf_path outputs/rooms/Office_seed68661/urdf/export_scene/scene.urdf \
--output_path outputs/rooms/Office_seed68661/floorplan.png \
--usd_path outputs/rooms_v2/Kitchen_seed0/usd/export_scene/export_scene.usdc \
--asset_path /home/users/xinjie.wang/xinjie/asset3d-gen/outputs/semantics_tasks/task_0059/asset3d/red_apple/result/mesh/red_apple.obj \
--instance_key red_apple \
--in_room kitchen \
--on_instance oven \
--place_strategy top
"""