# xinjjj's picture
# chore(space): sync EmbodiedGen texture space updates
# 0562108
# Project EmbodiedGen
#
# Copyright (c) 2025 Horizon Robotics. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the License for the specific language governing
# permissions and limitations under the License.
from __future__ import annotations
import json
import logging
import sys
import warnings
import tyro
from ..api.floorplan_api import (
FloorplanConfig,
FloorplanManager,
)
# Suppress every RuntimeWarning raised in this process.
warnings.filterwarnings(action="ignore", category=RuntimeWarning)

# Configure the root logger with timestamped INFO-level output; force=True
# replaces any handlers that were already attached to the root logger.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    force=True,
)

# Module-level logger used by the functions below.
logger = logging.getLogger(__name__)
def _get_gpt_client() -> object | None:
"""Lazy-import GPT_CLIENT for semantic --on_instance resolution."""
try:
from embodied_gen.utils.gpt_clients import GPT_CLIENT
return GPT_CLIENT
except Exception:
return None
def entrypoint(cfg: FloorplanConfig) -> None:
    """Main entry point for floorplan visualization and scene manipulation.

    The requested actions are checked in this order:

    1. ``cfg.list_instances``: log the instance and room names, then return.
    2. ``cfg.batch_insert_config``: insert every object described in the
       JSON file, optionally render the floorplan, then return.
    3. Otherwise: resolve the ``on_instance`` / ``in_room`` /
       ``beside_instance`` names, then run (each only when requested) a
       deletion, a position query, a single-asset insertion and finally the
       floorplan visualization.

    Args:
        cfg: Configuration object with all parameters.

    Raises:
        SystemExit: With exit code 1 when a name cannot be resolved, a
            deletion fails, or an object (single or batch) cannot be placed.
    """
    manager = FloorplanManager(
        urdf_path=cfg.urdf_path,
        usd_path=cfg.usd_path,
        mesh_sample_num=cfg.mesh_sample_num,
        ignore_items=cfg.ignore_items,
        output_strategy=cfg.output_strategy,
    )

    # List instances/rooms and exit if requested
    if cfg.list_instances:
        names = manager.get_instance_names()
        rooms = manager.get_room_names()
        logger.info(f"instance_names: {names}")
        logger.info(f"room_names: {rooms}")
        return

    # Batch insertion
    if cfg.batch_insert_config is not None:
        logger.info(
            f"Loading batch insert config from {cfg.batch_insert_config}"
        )
        # JSON text is UTF-8; do not depend on the platform locale encoding.
        with open(cfg.batch_insert_config, "r", encoding="utf-8") as f:
            objects = json.load(f)

        logger.info(f"Batch inserting {len(objects)} objects...")
        centers = manager.batch_insert_objects(objects)

        # A None entry in the result marks an object that was not placed.
        success_count = sum(c is not None for c in centers)
        logger.info(
            f"✅ Successfully placed {success_count}/{len(objects)} objects"
        )
        if success_count < len(objects):
            logger.warning(
                f"⚠️ Failed to place {len(objects) - success_count} objects"
            )
            # NOTE: exits before the visualization step below is reached.
            sys.exit(1)

        # Generate floorplan visualization if requested
        if cfg.output_path is not None:
            manager.visualize(output_path=cfg.output_path)
        return

    # May be None; it is handed to the manager's resolve_* calls as-is.
    gpt_client = _get_gpt_client()

    # Resolve the "on top of" target to a name known to the scene.
    on_instance = cfg.on_instance
    if on_instance is not None:
        resolved = manager.resolve_on_instance(
            on_instance, gpt_client=gpt_client
        )
        if resolved is None:
            logger.error(
                "No object matched \"%s\"。Current scene instance name: %s。",
                on_instance,
                manager.get_instance_names(),
            )
            sys.exit(1)
        on_instance = resolved
        if resolved != cfg.on_instance:
            logger.info("\"%s\" -> \"%s\"", cfg.on_instance, resolved)

    # Resolve the room constraint; the result also scopes the
    # beside_instance lookup below.
    in_room = cfg.in_room
    if in_room is not None:
        resolved = manager.resolve_in_room(in_room, gpt_client=gpt_client)
        if resolved is None:
            logger.error(
                "No room matched \"%s\"。Current scene room names: %s。",
                in_room,
                manager.get_room_names(),
            )
            sys.exit(1)
        in_room = resolved
        if resolved != cfg.in_room:
            logger.info("\"%s\" -> \"%s\"", cfg.in_room, resolved)

    # Resolve the "next to" target, restricted to in_room when one is set.
    beside_instance = cfg.beside_instance
    if beside_instance is not None:
        resolved = manager.resolve_beside_instance(
            beside_instance, gpt_client=gpt_client, in_room=in_room
        )
        if resolved is None:
            # Report the same candidate set the lookup was restricted to.
            candidates = (
                manager.get_instance_names_in_room(in_room)
                if in_room
                else manager.get_instance_names()
            )
            logger.error(
                "No object matched \"%s\"。Current %sinstance name: %s。",
                beside_instance,
                f"room '{in_room}' " if in_room else "",
                candidates,
            )
            sys.exit(1)
        beside_instance = resolved
        if resolved != cfg.beside_instance:
            logger.info("\"%s\" -> \"%s\"", cfg.beside_instance, resolved)

    # Delete an instance if requested
    delete_instance = cfg.delete_instance
    delete_in_room = cfg.delete_in_room
    if delete_instance is not None:
        # Resolve room constraint if provided
        if delete_in_room is not None:
            resolved_room = manager.resolve_in_room(
                delete_in_room, gpt_client=gpt_client
            )
            if resolved_room is None:
                logger.error(
                    "No room matched \"%s\"。Current scene room names: %s。",
                    delete_in_room,
                    manager.get_room_names(),
                )
                sys.exit(1)
            delete_in_room = resolved_room
            if resolved_room != cfg.delete_in_room:
                logger.info(
                    "\"%s\" -> \"%s\"", cfg.delete_in_room, resolved_room
                )

        # Resolve delete_instance with fuzzy matching
        resolved = manager.resolve_delete_instance(
            delete_instance, gpt_client=gpt_client, in_room=delete_in_room
        )
        if resolved is None:
            candidates = (
                manager.get_instance_names_in_room(delete_in_room)
                if delete_in_room
                else manager.get_instance_names()
            )
            logger.error(
                "No object matched \"%s\"。Current %sinstance name: %s。",
                delete_instance,
                f"room '{delete_in_room}' " if delete_in_room else "",
                candidates,
            )
            sys.exit(1)
        delete_instance = resolved
        if resolved != cfg.delete_instance:
            logger.info("\"%s\" -> \"%s\"", cfg.delete_instance, resolved)

        # Perform deletion
        success = manager.delete_object(
            instance_key=delete_instance,
            in_room=delete_in_room,
        )
        if success:
            logger.info(
                f"✅ Successfully deleted '{delete_instance}' from scene."
            )
        else:
            logger.error(f"❌ Failed to delete '{delete_instance}'.")
            sys.exit(1)

    # Query instance position
    query_instance = cfg.query_instance
    if query_instance is not None:
        # Resolve instance with fuzzy matching
        resolved_name, center = manager.resolve_and_query_instance(
            query_instance, gpt_client=gpt_client
        )
        if resolved_name is None or center is None:
            logger.error(
                "No object matched \"%s\"。Current instance names: %s。",
                query_instance,
                manager.get_instance_names(),
            )
            sys.exit(1)
        if resolved_name != query_instance:
            logger.info("\"%s\" -> \"%s\"", query_instance, resolved_name)
        logger.info(
            f"📍 Instance '{resolved_name}' center: "
            f"({center[0]:.4f}, {center[1]:.4f}, {center[2]:.4f})"
        )

    # Add asset instance if specified
    if cfg.asset_path is not None:
        center = manager.insert_object(
            asset_path=cfg.asset_path,
            instance_key=cfg.instance_key,
            in_room=in_room,
            on_instance=on_instance,
            beside_instance=beside_instance,
            beside_distance=cfg.beside_distance,
            rotation_rpy=cfg.rotation_rpy,
            n_max_attempt=cfg.max_placement_attempts,
            place_strategy=cfg.place_strategy,
        )
        # insert_object returning None is treated as a failed placement.
        if center is not None:
            logger.info(
                f"Successfully placed '{cfg.instance_key}' at "
                f"({center[0]:.3f}, {center[1]:.3f}, {center[2]:.3f})"
            )
        else:
            logger.error(
                f"❌ Failed to place '{cfg.instance_key}' in the scene."
            )
            sys.exit(1)

    # Generate floorplan visualization
    if cfg.output_path is not None:
        manager.visualize(output_path=cfg.output_path)
if __name__ == "__main__":
    # Parse the command line into a FloorplanConfig and run it.
    entrypoint(tyro.cli(FloorplanConfig))