# Project EmbodiedGen # # Copyright (c) 2025 Horizon Robotics. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or # implied. See the License for the specific language governing # permissions and limitations under the License. from __future__ import annotations import logging import os from dataclasses import dataclass, field from datetime import datetime from typing import Literal from shapely.geometry import MultiPolygon, Polygon from embodied_gen.utils.llm_resolve import resolve_instance_with_llm from ..core import ( UrdfSemanticInfoCollector, ) from ..core.collector import ( DEFAULT_BESIDE_DISTANCE, DEFAULT_IGNORE_ITEMS, DEFAULT_MESH_SAMPLE_NUM, DEFAULT_ROTATION_RPY, ) from ..core.visualizer import ( FloorplanVisualizer, ) # Type aliases Geometry = Polygon | MultiPolygon logger = logging.getLogger(__name__) @dataclass class FloorplanConfig: """Configuration for floorplan operations.""" urdf_path: str """Path to the input URDF scene file.""" output_path: str | None = None """Path to save the floorplan visualization image.""" usd_path: str | None = None """Optional path to the USD scene file for USD export.""" asset_path: str | None = None """Optional path to the asset mesh file (.obj).""" instance_key: str = "inserted_object" """Unique key for the added instance.""" in_room: str | None = None """Optional room name to constrain asset placement.""" on_instance: str | None = None """Optional instance name to place the asset on top of (exact key from get_instance_names()).""" beside_instance: str | None = None """Optional instance name to place the asset beside (on floor, near the target).""" beside_distance: float = DEFAULT_BESIDE_DISTANCE """Max distance (meters) from the target instance for beside placement.""" place_strategy: Literal["top", "random"] = "random" """Placement strategy for the asset.""" rotation_rpy: tuple[float, float, float] = DEFAULT_ROTATION_RPY """Rotation in roll-pitch-yaw (radians).""" ignore_items: list[str] = field( default_factory=lambda: list(DEFAULT_IGNORE_ITEMS) ) """List of item name patterns to ignore during parsing.""" mesh_sample_num: int = DEFAULT_MESH_SAMPLE_NUM """Number of points to sample from meshes.""" max_placement_attempts: int = 2000 """Maximum attempts for asset placement.""" update_urdf: bool = True """Whether to update and save the URDF file.""" update_usd: bool = True """Whether to update and save the USD file.""" list_instances: bool = False """If True, print instance and room names then exit (no placement/visualization).""" delete_instance: str | None = None """Optional instance name to delete from the scene (supports fuzzy matching with LLM).""" delete_in_room: str | None = None """Optional room constraint for deletion - only delete if instance is in this room.""" query_instance: str | None = None """Optional instance name to query and return its center coordinates (supports fuzzy matching with LLM).""" output_strategy: Literal["suffix", "overwrite", "timestamp"] = "suffix" """File naming strategy for output files. - "suffix": Add '_updated' suffix (default, non-destructive) - "overwrite": Overwrite original files (use with caution) - "timestamp": Add timestamp suffix (e.g., '_20260311_171500') """ batch_insert_config: str | None = None """Path to JSON config file for batch insertion (3-4x faster than multiple CLI calls). JSON format example: [ { "asset_path": "path/to/chair1.obj", "instance_key": "chair_1", "beside_instance": "table_dining_7178300", "in_room": "dining_room_0_floor" }, { "asset_path": "path/to/chair2.obj", "instance_key": "chair_2", "beside_instance": "table_dining_7178300", "in_room": "dining_room_0_floor" } ] """ class FloorplanManager: """High-level API for floorplan operations. This class provides simplified methods for: - Loading and analyzing URDF scenes - Visualizing floorplans - Inserting objects into scenes - Updating URDF and USD files Example: >>> manager = FloorplanManager(urdf_path="scene.urdf", usd_path="scene.usdc") >>> manager.visualize(output_path="floorplan.png") >>> position = manager.insert_object( ... asset_path="chair.obj", ... instance_key="chair_1", ... in_room="kitchen" ... ) # URDF/USD are updated automatically after insert """ def __init__( self, urdf_path: str, usd_path: str | None = None, mesh_sample_num: int = DEFAULT_MESH_SAMPLE_NUM, ignore_items: list[str] | None = None, output_strategy: Literal[ "suffix", "overwrite", "timestamp" ] = "suffix", ) -> None: """Initialize the floorplan manager. Args: urdf_path: Path to the URDF file. usd_path: Optional path to the USD file for scene updates. mesh_sample_num: Number of points to sample from meshes. ignore_items: List of item name patterns to ignore. output_strategy: File naming strategy for output files. """ self.urdf_path = urdf_path self.usd_path = usd_path self.output_strategy = output_strategy self.collector = UrdfSemanticInfoCollector( mesh_sample_num=mesh_sample_num, ignore_items=ignore_items, ) self.collector.collect(urdf_path) self.pending_instance_data: dict | None = None def _get_output_path( self, input_path: str, custom_output_path: str | None = None, ) -> str: """Generate output path based on the naming strategy. Smart file naming strategy: - "suffix" (default): * If input already ends with "_updated", overwrite it (continuous operations) * Otherwise, add "_updated" suffix (first operation) - "timestamp": Add timestamp suffix for unique versioning - "overwrite": Always overwrite the input file Args: input_path: Original input file path. custom_output_path: Optional custom output path (highest priority). Returns: Generated output path based on strategy. """ if custom_output_path is not None: return custom_output_path name, ext = os.path.splitext(input_path) if self.output_strategy == "overwrite": return input_path elif self.output_strategy == "timestamp": timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") return f"{name}_{timestamp}{ext}" else: # "suffix" (default) - smart continuous operation support # If input already has "_updated" suffix, overwrite it (continuous operation) if name.endswith("_updated"): return input_path # Otherwise, add "_updated" suffix (first operation) else: return f"{name}_updated{ext}" def visualize( self, output_path: str, ) -> None: """Generate and save a floorplan visualization. Args: output_path: Path to save the output image. """ FloorplanVisualizer.plot( self.collector.rooms, self.collector.footprints, self.collector.occ_area, output_path, ) logger.info(f"✅ Floorplan visualization saved to {output_path}") def insert_object( self, asset_path: str, instance_key: str, in_room: str | None = None, on_instance: str | None = None, beside_instance: str | None = None, beside_distance: float = DEFAULT_BESIDE_DISTANCE, rotation_rpy: tuple[float, float, float] = DEFAULT_ROTATION_RPY, n_max_attempt: int = 2000, place_strategy: Literal["top", "random"] = "random", ) -> list[float] | None: """Insert an object into the scene with automatic placement. Args: asset_path: Path to the asset mesh file (.obj). instance_key: Unique key for the new instance. in_room: Optional room name to constrain placement. on_instance: Optional instance name to place on top of. beside_instance: Optional instance name to place beside (on floor). beside_distance: Max distance from target for beside placement. rotation_rpy: Initial rotation in roll-pitch-yaw. n_max_attempt: Maximum placement attempts. place_strategy: Either "top" or "random". Returns: List [x, y, z] of the placed instance center, or None if failed. """ center = self.collector.add_instance( asset_path=asset_path, instance_key=instance_key, in_room=in_room, on_instance=on_instance, beside_instance=beside_instance, beside_distance=beside_distance, rotation_rpy=rotation_rpy, n_max_attempt=n_max_attempt, place_strategy=place_strategy, ) if center is not None: self.pending_instance_data = { "asset_path": asset_path, "instance_key": instance_key, "center": center, "rotation_rpy": rotation_rpy, } self.update_scene() return center def batch_insert_objects( self, objects: list[dict], defer_update: bool = False, ) -> list[list[float] | None]: """Batch insert multiple objects into the scene efficiently. Args: objects: List of object configs, each containing: asset_path: Path to the asset mesh file (.obj). instance_key: Unique key for the new instance. in_room: Optional room name to constrain placement. on_instance: Optional instance name to place on top of. beside_instance: Optional instance name to place beside. beside_distance: Max distance from target (default: 0.5m). rotation_rpy: Initial rotation (default: (0, 0, 0)). place_strategy: Either "top" or "random" (default: "random"). defer_update: If True, don't update URDF/USD after each insertion. Useful when inserting many objects at once. Returns: List of centers [x, y, z] for each inserted object, or None if failed. Example: >>> objects = [ ... {"asset_path": "chair1.obj", ... "instance_key": "chair_1", ... "beside_instance": "table"}, ... ] >>> centers = manager.batch_insert_objects(objects) """ centers = [] usd_source = self.usd_path for i, obj_config in enumerate(objects, 1): logger.info( f"[{i}/{len(objects)}] Inserting '{obj_config.get('instance_key', 'unknown')}'..." ) center = self.collector.add_instance( asset_path=obj_config["asset_path"], instance_key=obj_config["instance_key"], in_room=obj_config.get("in_room"), on_instance=obj_config.get("on_instance"), beside_instance=obj_config.get("beside_instance"), beside_distance=obj_config.get( "beside_distance", DEFAULT_BESIDE_DISTANCE ), rotation_rpy=obj_config.get( "rotation_rpy", DEFAULT_ROTATION_RPY ), n_max_attempt=obj_config.get("n_max_attempt", 2000), place_strategy=obj_config.get("place_strategy", "random"), ) if center is not None: # Store instance data for later update collision_path = obj_config["asset_path"].replace( ".obj", "_collision.obj" ) if not os.path.exists(collision_path): collision_path = None # Update URDF incrementally if self.urdf_path and not defer_update: urdf_out = self._get_output_path(self.urdf_path) self.collector.update_urdf_info( output_path=urdf_out, instance_key=obj_config["instance_key"], visual_mesh_path=obj_config["asset_path"], collision_mesh_path=collision_path, trans_xyz=tuple(center), rot_rpy=obj_config.get( "rotation_rpy", DEFAULT_ROTATION_RPY ), joint_type="fixed", ) # Update USD incrementally if self.usd_path and not defer_update: usd_out = self._get_output_path(self.usd_path) self.collector.update_usd_info( usd_path=usd_source, output_path=usd_out, instance_key=obj_config["instance_key"], visual_mesh_path=obj_config["asset_path"], trans_xyz=center, rot_rpy=obj_config.get( "rotation_rpy", DEFAULT_ROTATION_RPY ), ) usd_source = usd_out logger.info(f"✅ Placed at {center}") else: logger.warning(f"❌ Failed to place object") centers.append(center) return centers def update_scene( self, urdf_output_path: str | None = None, usd_output_path: str | None = None, ) -> None: """Update URDF and/or USD with inserted instances. Updates URDF if self.urdf_path is set, USD if self.usd_path is set. Both are updated when both paths are set. No-op when no instance was inserted. Note: USD updates require Blender (bpy) to convert .obj to .usdc format. Args: urdf_output_path: Optional custom path for URDF output. usd_output_path: Optional custom path for USD output. Raises: ValueError: If no instance has been inserted. """ if self.pending_instance_data is None: raise ValueError( "No instance to update. Call insert_object() first." ) data = self.pending_instance_data collision_path = data["asset_path"].replace(".obj", "_collision.obj") if not os.path.exists(collision_path): collision_path = None if self.urdf_path: urdf_out = self._get_output_path(self.urdf_path, urdf_output_path) self.collector.update_urdf_info( output_path=urdf_out, instance_key=data["instance_key"], visual_mesh_path=data["asset_path"], collision_mesh_path=collision_path, trans_xyz=tuple(data["center"]), rot_rpy=data["rotation_rpy"], joint_type="fixed", ) if self.usd_path: usd_out = self._get_output_path(self.usd_path, usd_output_path) self.collector.update_usd_info( usd_path=self.usd_path, output_path=usd_out, instance_key=data["instance_key"], visual_mesh_path=data["asset_path"], trans_xyz=data["center"], rot_rpy=data["rotation_rpy"], ) def delete_object( self, instance_key: str, in_room: str | None = None, urdf_output_path: str | None = None, usd_output_path: str | None = None, ) -> bool: """Delete an object from the scene. Args: instance_key: Exact instance name to delete. in_room: Optional room constraint - only delete if instance is in this room. urdf_output_path: Optional custom path for URDF output. usd_output_path: Optional custom path for USD output. Returns: True if deletion succeeded, False otherwise. """ success = self.collector.remove_instance( instance_key=instance_key, in_room=in_room, ) if success: # Update URDF if self.urdf_path: urdf_out = self._get_output_path( self.urdf_path, urdf_output_path ) self.collector.save_urdf(urdf_out) # Update USD if self.usd_path: usd_out = self._get_output_path(self.usd_path, usd_output_path) self.collector.remove_usd_instance( usd_path=self.usd_path, output_path=usd_out, instance_key=instance_key, ) return success def get_instance_names(self) -> list[str]: """Get list of instance names in the scene. Returns: List of instance names. """ return [ k for k in self.collector.instances.keys() if k != "walls" and "floor" not in k.lower() ] def get_room_names(self) -> list[str]: """Get list of room names in the scene. Returns: List of room names. """ return list(self.collector.rooms.keys()) def get_instance_names_in_room(self, in_room: str) -> list[str]: """Get instance names that are spatially inside a given room. Buffers the room polygon slightly to handle mesh-sampling precision. Args: in_room: Exact room key (must exist in get_room_names()). Returns: List of instance names within the room. """ room_poly = self.collector.rooms.get(in_room) if room_poly is None: return self.get_instance_names() room_buffered = room_poly.buffer(0.1) all_names = self.get_instance_names() return [ k for k in all_names if room_buffered.contains( self.collector.instances[k].representative_point() ) ] def resolve_on_instance( self, on_instance: str, gpt_client: object | None = None, ) -> str | None: r"""Resolve on_instance to an exact key (for placement). If on_instance is already in get_instance_names(), return it. Otherwise if gpt_client is provided, use LLM to resolve user description (e.g. \"柜子\", \"书柜\") to one exact instance key. Args: on_instance: Exact instance key or semantic description. gpt_client: Optional GPT client for semantic resolve (e.g. GPT_CLIENT). Returns: Exact instance key, or None if not found / LLM returned NONE. """ names = self.get_instance_names() if on_instance in names: return on_instance if gpt_client is not None: return resolve_instance_with_llm( gpt_client, names, on_instance # type: ignore[arg-type] ) return None def resolve_in_room( self, in_room: str, gpt_client: object | None = None, ) -> str | None: r"""Resolve in_room to an exact room name (for placement). If in_room is already in get_room_names(), return it. Otherwise if gpt_client is provided, use LLM to resolve user description (e.g. \"kitchen\", \"the place for cooking\") to one exact room name. Args: in_room: Exact room name or semantic description. gpt_client: Optional GPT client for semantic resolve (e.g. GPT_CLIENT). Returns: Exact room name, or None if not found / LLM returned NONE. """ names = self.get_room_names() if in_room in names: return in_room if gpt_client is not None: return resolve_instance_with_llm( gpt_client, names, in_room # type: ignore[arg-type] ) return None def resolve_beside_instance( self, beside_instance: str, gpt_client: object | None = None, in_room: str | None = None, ) -> str | None: r"""Resolve beside_instance to an exact key (for beside placement). If beside_instance is already in get_instance_names(), return it. Otherwise if gpt_client is provided, use LLM to resolve user description (e.g. "桌子", "沙发") to one exact instance key. When `in_room` is given, only instances spatially inside that room are considered as candidates. Args: beside_instance: Exact instance key or semantic description. gpt_client: Optional GPT client for semantic resolve. in_room: Optional resolved room key to restrict candidate scope. Returns: Exact instance key, or None if not found / LLM returned NONE. """ if in_room is not None: names = self.get_instance_names_in_room(in_room) else: names = self.get_instance_names() if beside_instance in names: return beside_instance # Substring matching as fallback query_lower = beside_instance.lower() matches = [n for n in names if query_lower in n.lower()] if len(matches) == 1: logger.info( "Substring match: '%s' -> '%s'", beside_instance, matches[0] ) return matches[0] elif len(matches) > 1: logger.warning( "Multiple substring matches for '%s': %s. Using '%s'. " "Use exact name or LLM for better matching.", beside_instance, matches, matches[0], ) return matches[0] if gpt_client is not None: return resolve_instance_with_llm( gpt_client, names, beside_instance # type: ignore[arg-type] ) return None def resolve_delete_instance( self, delete_instance: str, gpt_client: object | None = None, in_room: str | None = None, ) -> str | None: r"""Resolve delete_instance to an exact key (for deletion). Similar to resolve_beside_instance but specifically for deletion. If delete_instance is already in get_instance_names(), return it. Otherwise if gpt_client is provided, use LLM to resolve user description (e.g. "桌子", "沙发") to one exact instance key. When `in_room` is given, only instances spatially inside that room are considered as candidates. Args: delete_instance: Exact instance key or semantic description. gpt_client: Optional GPT client for semantic resolve. in_room: Optional resolved room key to restrict candidate scope. Returns: Exact instance key, or None if not found / LLM returned NONE. """ if in_room is not None: names = self.get_instance_names_in_room(in_room) else: names = self.get_instance_names() if delete_instance in names: return delete_instance # Substring matching as fallback query_lower = delete_instance.lower() matches = [n for n in names if query_lower in n.lower()] if len(matches) == 1: logger.info( "Substring match: '%s' -> '%s'", delete_instance, matches[0] ) return matches[0] elif len(matches) > 1: logger.warning( "Multiple substring matches for '%s': %s. Using '%s'. " "Use exact name or LLM for better matching.", delete_instance, matches, matches[0], ) return matches[0] if gpt_client is not None: return resolve_instance_with_llm( gpt_client, names, delete_instance # type: ignore[arg-type] ) return None def query_instance_center( self, instance_key: str, ) -> list[float] | None: """Query the center coordinates of an instance. Args: instance_key: Exact instance name to query. Returns: List [x, y, z] of the instance center, or None if not found. """ return self.collector.get_instance_center(instance_key) def resolve_and_query_instance( self, query_instance: str, gpt_client: object | None = None, ) -> tuple[str | None, list[float] | None]: r"""Resolve instance name and return its center coordinates. Combines fuzzy matching with coordinate query. If query_instance is already in get_instance_names(), return its center. Otherwise if gpt_client is provided, use LLM to resolve user description (e.g. "床", "沙发") to one exact instance key. Args: query_instance: Exact instance key or semantic description. gpt_client: Optional GPT client for semantic resolve. Returns: Tuple of (resolved_instance_name, center_coordinates), or (None, None) if not found. """ names = self.get_instance_names() if query_instance in names: center = self.query_instance_center(query_instance) return query_instance, center # Substring matching as fallback query_lower = query_instance.lower() matches = [n for n in names if query_lower in n.lower()] if len(matches) == 1: logger.info( "Substring match: '%s' -> '%s'", query_instance, matches[0] ) center = self.query_instance_center(matches[0]) return matches[0], center elif len(matches) > 1: logger.warning( "Multiple substring matches for '%s': %s. Using '%s'. " "Use exact name or LLM for better matching.", query_instance, matches, matches[0], ) center = self.query_instance_center(matches[0]) return matches[0], center if gpt_client is not None: resolved = resolve_instance_with_llm( gpt_client, names, query_instance # type: ignore[arg-type] ) if resolved: center = self.query_instance_center(resolved) return resolved, center return None, None def get_occupied_area(self) -> Geometry: """Get the union of all occupied areas. Returns: Shapely geometry representing occupied areas. """ return self.collector.occ_area def get_floor_union(self) -> Geometry: """Get the union of all floor areas. Returns: Shapely geometry representing floor areas. """ return self.collector.floor_union def visualize_floorplan( urdf_path: str, output_path: str, mesh_sample_num: int = DEFAULT_MESH_SAMPLE_NUM, ignore_items: list[str] | None = None, ) -> None: """Quick function to visualize a floorplan. Args: urdf_path: Path to the URDF file. output_path: Path to save the output image. mesh_sample_num: Number of points to sample from meshes. ignore_items: List of item name patterns to ignore. """ manager = FloorplanManager( urdf_path=urdf_path, mesh_sample_num=mesh_sample_num, ignore_items=ignore_items, ) manager.visualize(output_path=output_path) def insert_object_to_scene( urdf_path: str, asset_path: str, instance_key: str, output_path: str, usd_path: str | None = None, in_room: str | None = None, on_instance: str | None = None, beside_instance: str | None = None, beside_distance: float = DEFAULT_BESIDE_DISTANCE, place_strategy: Literal["top", "random"] = "random", rotation_rpy: tuple[float, float, float] = DEFAULT_ROTATION_RPY, ) -> list[float] | None: """Quick function to insert an object and generate floorplan. Note: USD updates require Blender (bpy) to convert .obj to .usdc format. Args: urdf_path: Path to the URDF file. asset_path: Path to the asset mesh file (.obj). instance_key: Unique key for the new instance. output_path: Path to save the floorplan image. usd_path: Optional path to the USD file (requires Blender). in_room: Optional room name to constrain placement. on_instance: Optional instance name to place on top of. beside_instance: Optional instance name to place beside (on floor). beside_distance: Max distance for beside placement (meters). place_strategy: Either "top" or "random". rotation_rpy: Initial rotation in roll-pitch-yaw. Returns: List [x, y, z] of the placed instance center, or None if failed. """ manager = FloorplanManager(urdf_path=urdf_path, usd_path=usd_path) center = manager.insert_object( asset_path=asset_path, instance_key=instance_key, in_room=in_room, on_instance=on_instance, beside_instance=beside_instance, beside_distance=beside_distance, rotation_rpy=rotation_rpy, place_strategy=place_strategy, ) if center is not None: manager.visualize(output_path=output_path) return center def delete_object_from_scene( urdf_path: str, instance_key: str, in_room: str | None = None, output_path: str | None = None, ) -> bool: """Quick function to delete an object from scene. Args: urdf_path: Path to the URDF file. instance_key: Exact instance name to delete. in_room: Optional room constraint - only delete if instance is in this room. output_path: Optional path to save the floorplan image after deletion. Returns: True if deletion succeeded, False otherwise. """ manager = FloorplanManager(urdf_path=urdf_path) success = manager.delete_object( instance_key=instance_key, in_room=in_room, ) if success and output_path is not None: manager.visualize(output_path=output_path) return success def query_instance_position( urdf_path: str, instance_key: str, ) -> list[float] | None: """Quick function to query instance center coordinates. Args: urdf_path: Path to the URDF file. instance_key: Exact instance name to query. Returns: List [x, y, z] of the instance center, or None if not found. """ manager = FloorplanManager(urdf_path=urdf_path) return manager.query_instance_center(instance_key)