Spaces:
Paused
Paused
| import logging | |
| log = logging.getLogger(__name__) | |
| import random | |
| from abc import ABC, abstractmethod | |
| from typing import Tuple, Optional, Dict | |
| from omegaconf import DictConfig, OmegaConf | |
| from dataclasses import dataclass, asdict | |
| from pathlib import Path | |
| import numpy as np | |
| import matplotlib.pyplot as plt | |
| from dsynth.scene_gen.layouts.random_connectivity import add_many_zones, get_orientation | |
| from dsynth.scene_gen.hydra_configs import LayoutGenType, LayoutContGenType | |
| from dsynth.scene_gen.utils import RectFixture, check_collisions, flatten_dict | |
| import dsynth.scene_gen.layouts.tensor_field as tfield | |
| from dsynth.assets.asset import load_assets_lib | |
| class LayoutGeneratorBase(ABC): | |
| def __init__(self, | |
| sizes_nm: Tuple[int], | |
| start_coords: Tuple[int] = (0, 0), | |
| rng: random.Random = random.Random(42), | |
| max_tries = 5, | |
| ): | |
| assert len(sizes_nm) == 2 | |
| assert len(start_coords) == 2 | |
| self.sizes_nm = sizes_nm | |
| self.start_coords = start_coords | |
| self.rng = rng | |
| self.max_tries = max_tries | |
| def __call__(self, *args, **kwargs): | |
| pass | |
| class RandomConnectedZones(LayoutGeneratorBase): | |
| def __call__(self, *args, zones_dict: Dict = dict(), **kwargs): | |
| n, m = self.sizes_nm | |
| x, y = self.start_coords | |
| for n_try in range(self.max_tries): | |
| mat = [[0] * m for _ in range(n)] | |
| if n_try > 0: | |
| log.error(f"Can't generate! Retry...[{n_try + 1}/{self.max_tries}]") | |
| is_gen, room = add_many_zones((x, y), mat, zones_dict, self.rng) | |
| if is_gen: | |
| break | |
| if not is_gen: | |
| log.error(f"Can't generate! Layout generation failed!") | |
| return None | |
| rotations = get_orientation((x, y), room) | |
| return { | |
| "darkstore": room, | |
| "rotations": rotations | |
| } | |
| class FixedLayout(LayoutGeneratorBase): | |
| def __call__(self, *args, zones_dict: Dict = dict(), darkstore_arrangement_cfg: Dict = dict(), **kwargs): | |
| assert 'layout' in darkstore_arrangement_cfg | |
| assert 'rotations' in darkstore_arrangement_cfg | |
| n, m = self.sizes_nm | |
| darkstore = [[0] * m for _ in range(n)] | |
| rotations = [[0] * m for _ in range(n)] | |
| darkstore_dict = OmegaConf.to_container(darkstore_arrangement_cfg['layout'], resolve = True) | |
| rotations_dict = OmegaConf.to_container(darkstore_arrangement_cfg['rotations'], resolve = True) | |
| assert n == len(darkstore_dict.keys()) | |
| assert n == len(rotations_dict.keys()) | |
| # convert to list of lists | |
| for i in darkstore_dict.keys(): | |
| assert len(darkstore_dict[i]) == m | |
| assert len(rotations_dict[i]) == m | |
| darkstore[i] = darkstore_dict[i] | |
| rotations[i] = rotations_dict[i] | |
| return { | |
| "darkstore": darkstore, | |
| "rotations": rotations | |
| } | |
| LAYOUT_TYPES_TO_CLS = { | |
| LayoutGenType.CONNECTED_ZONES: RandomConnectedZones, | |
| LayoutGenType.DEFAULT: None, | |
| LayoutGenType.FIXED_LAYOUT: FixedLayout, | |
| } | |
| class TensorFieldLayout(LayoutGeneratorBase): | |
| def __init__(self, | |
| name, | |
| cfg, | |
| rng: random.Random, | |
| ): | |
| self.cfg = cfg | |
| self.rng = rng | |
| self.name = name | |
| self.product_assets_lib = flatten_dict(load_assets_lib(cfg.assets, disable_caching=True), sep='.') | |
| self.size_x = cfg.ds_continuous.size_x | |
| self.size_y = cfg.ds_continuous.size_y | |
| self.max_tries = cfg.ds_continuous.max_tries | |
| self.all_fixtures = [] | |
| self.inactive_wall_shelvings_occupancy_width = cfg.ds_continuous.inactive_wall_shelvings_occupancy_width | |
| self.inactive_shelvings_occupancy_width = cfg.ds_continuous.inactive_shelvings_occupancy_width | |
| self.inactive_shelvings_skip_prob = cfg.ds_continuous.inactive_shelvings_skip_prob | |
| self.passage_width = cfg.ds_continuous.passage_width | |
| self.fixtures_occupancy_width = cfg.ds_continuous.fixtures_occupancy_width | |
| def _all_fixtures_list(self): | |
| all_fixtures = [] | |
| for key, val in self.all_fixtures.items(): | |
| all_fixtures.extend(val) | |
| return all_fixtures | |
| def __call__(self, *args, **kwargs): | |
| self.all_fixtures = { | |
| "service": [], | |
| "scene_fixtures": [], | |
| "inactive_wall_shelvings": [], | |
| "active_wall_shelvings": [], | |
| "inactive_shelvings": [], | |
| "active_shelvings": [] | |
| } | |
| self.all_fixtures['service'].append( | |
| RectFixture('blocked_area', | |
| x=self.size_x, | |
| y=self.size_y - 2, | |
| l=2, w=2, ) | |
| ) | |
| self.place_fixtures() | |
| # self.place_inactive_wall_shelvings() | |
| res = self.place_wall_shelvings() | |
| if not res: | |
| return None | |
| res = self.place_shelvings() | |
| if not res: | |
| return None | |
| return self.rect_fixture2dict(self.all_fixtures) | |
| def rect_fixture2dict(self, rect_fixture_dict: list): | |
| return {key: [asdict(r) for r in rect_fixture_list] for key, rect_fixture_list in rect_fixture_dict.items()} | |
| def place_fixtures(self): | |
| scene_fixtures_list = self.cfg.ds_continuous.scene_fixtures_list | |
| for asset_name in scene_fixtures_list: | |
| rect = RectFixture.make_from_asset(self.product_assets_lib[asset_name], name=f'scene_fixture:{asset_name}', | |
| occupancy_width=self.fixtures_occupancy_width, | |
| x=0., y=0., asset_name=asset_name) | |
| success = False | |
| for _ in range(self.max_tries): | |
| if rect.is_valid(self.size_x, self.size_y) and not check_collisions(rect, self._all_fixtures_list()): | |
| self.all_fixtures['scene_fixtures'].append(rect) | |
| success = True | |
| break | |
| rect.x = self.rng.uniform(0.0, self.size_x) | |
| rect.y = self.rng.uniform(0.0, self.size_y) | |
| if not success: | |
| log.warning('Failed to place scene fixture') | |
| return True | |
| def place_wall_shelvings(self): | |
| half_perimeter = self.size_x + self.size_y | |
| perimeter_points = np.linspace(0, half_perimeter, 40) | |
| self.rng.shuffle(perimeter_points) | |
| active_wall_shelvings_list = self.cfg.ds_continuous.active_wall_shelvings_list | |
| for wall_active_shelving in active_wall_shelvings_list: | |
| rect = RectFixture.make_from_asset( | |
| self.product_assets_lib[wall_active_shelving.shelf_asset], name=f'{self.name}_{wall_active_shelving.name}', | |
| occupancy_width=self.inactive_wall_shelvings_occupancy_width, | |
| asset_name=wall_active_shelving.shelf_asset | |
| ) | |
| success = False | |
| for point in perimeter_points: | |
| rect = self._place_rect_at_point(rect, point) | |
| if rect.is_valid(self.size_x, self.size_y) and not check_collisions(rect, self._all_fixtures_list()): | |
| success = True | |
| self.all_fixtures['active_wall_shelvings'].append(rect) | |
| break | |
| if not success: | |
| return False | |
| inactive_wall_shelvings_list = self.cfg.ds_continuous.inactive_wall_shelvings_list | |
| for asset_name in inactive_wall_shelvings_list: | |
| rect = RectFixture.make_from_asset(self.product_assets_lib[asset_name], name=f'inactive_wall_shelving:{asset_name}', | |
| occupancy_width=self.inactive_wall_shelvings_occupancy_width, | |
| x=0., y=0., asset_name=asset_name) | |
| success = False | |
| for point in perimeter_points: | |
| rect = self._place_rect_at_point(rect, point) | |
| if rect.is_valid(self.size_x, self.size_y) and not check_collisions(rect, self._all_fixtures_list()): | |
| success = True | |
| self.all_fixtures['inactive_wall_shelvings'].append(rect) | |
| break | |
| if not success: | |
| log.warning('Failed to place scene fixture') | |
| return True | |
| def _place_rect_at_point(self, rect, point): | |
| if point <= self.size_x: | |
| x = point | |
| y = self.size_y | |
| rect.orientation = 'horizontal' | |
| elif self.size_x < point <= self.size_x + self.size_y: | |
| x = 0 | |
| y = point - self.size_x | |
| rect.orientation = 'vertical' | |
| else: | |
| raise RuntimeError | |
| if rect.orientation == 'horizontal': | |
| if y == self.size_y: | |
| y -= rect.w / 2 + rect.occupancy_width + 1e-2 | |
| if rect.orientation == 'vertical': | |
| if x == 0: | |
| x += rect.w / 2 + rect.occupancy_width + 1e-2 | |
| rect.x = x | |
| rect.y = y | |
| return rect | |
| def place_active_wall_shelvings(self): | |
| active_wall_shelvings = self.cfg.ds_continuous.active_wall_shelvings_list | |
| return [] | |
| def compose_tensor_field(self, decay): | |
| tf = tfield.TensorField(self.size_x, self.size_y, decay=decay) | |
| tf.add_boundary() | |
| tf.add_fixture_list(self._all_fixtures_list()) | |
| return tf | |
| def place_shelvings(self): | |
| inactive_shelvings_list = self.cfg.ds_continuous.inactive_shelvings_list | |
| assert len(self.cfg.ds_continuous.active_shelvings_list) <= 1 | |
| if len(self.cfg.ds_continuous.active_shelvings_list) > 0: | |
| active_fixture = self.cfg.ds_continuous.active_shelvings_list[0] | |
| if active_fixture['shelf_asset'] not in inactive_shelvings_list: | |
| inactive_shelvings_list.append(active_fixture['shelf_asset']) | |
| else: | |
| active_fixture = None | |
| sample_rects = [] | |
| for asset_name in inactive_shelvings_list: | |
| rect = RectFixture.make_from_asset(self.product_assets_lib[asset_name], name=f'inactive_shelving:{asset_name}', | |
| occupancy_width=self.inactive_shelvings_occupancy_width, | |
| x=0., y=0., asset_name=asset_name) | |
| sample_rects.append(rect) | |
| success = False | |
| for _ in range(self.max_tries): | |
| self.rng.shuffle(sample_rects) | |
| tf = self.compose_tensor_field(decay = self.cfg.ds_continuous.tf_blending_decay) | |
| inactive_shelvings = tfield.place_shelves(tf, | |
| sample_rects, | |
| self.rng, | |
| passage_width=self.passage_width, | |
| skip_shelf_prob=self.inactive_shelvings_skip_prob, | |
| scene_fixtures=self._all_fixtures_list() | |
| ) | |
| if active_fixture is None: | |
| success = True | |
| break | |
| elif active_fixture['shelf_asset'] in [r.asset_name for r in inactive_shelvings]: | |
| success = True | |
| break | |
| if not success: | |
| return False | |
| self.all_fixtures['inactive_shelvings'] = inactive_shelvings | |
| fig, ax = tf.vis_field() | |
| for fixture in self._all_fixtures_list(): | |
| fixture.draw(ax[1], show_occupancy=False) | |
| fig.savefig(Path(self.cfg.ds_continuous.output_dir) / f'{self.name}_tf.jpg') | |
| if active_fixture is not None: | |
| self.all_fixtures['active_shelvings'].append(RectFixture.make_from_asset( | |
| self.product_assets_lib[active_fixture.shelf_asset], name=f'{self.name}_{active_fixture.name}', | |
| asset_name=active_fixture.shelf_asset | |
| )) | |
| return True | |
| class TensorFieldHorisontalLayout(TensorFieldLayout): | |
| def compose_tensor_field(self, decay): | |
| tf = tfield.TensorField(self.size_x, self.size_y, decay=decay) | |
| tf.add_line([[0, 0], [self.size_x, 0]], sample_step=0.5) | |
| tf.add_line([[0, self.size_y], [self.size_x, self.size_y]], sample_step=0.5) | |
| return tf | |
| def __call__(self, *args, **kwargs): | |
| self.all_fixtures = { | |
| "service": [], | |
| "scene_fixtures": [], | |
| "inactive_wall_shelvings": [], | |
| "active_wall_shelvings": [], | |
| "inactive_shelvings": [], | |
| "active_shelvings": [] | |
| } | |
| self.all_fixtures['service'].append( | |
| RectFixture('blocked_area', | |
| x=self.size_x, | |
| y=self.size_y - 2, | |
| l=2, w=2, ) | |
| ) | |
| self.all_fixtures['service'].append( | |
| RectFixture('blocked_area', | |
| x=0.5, | |
| y=self.size_y / 2, | |
| l=self.size_y, w=0.5, orientation='vertical') | |
| ) | |
| self.all_fixtures['service'].append(RectFixture('blocked_area', | |
| x=self.size_x - 0.5, | |
| y=self.size_y / 2, | |
| l=self.size_y, w=0.5, orientation='vertical') | |
| ) | |
| self.place_fixtures() | |
| self.place_inactive_wall_shelvings() | |
| self.place_active_wall_shelvings() | |
| res = self.place_shelvings() | |
| if not res: | |
| return None | |
| return self.rect_fixture2dict(self.all_fixtures) | |
| class TensorFieldVerticallLayout(TensorFieldLayout): | |
| def compose_tensor_field(self, decay): | |
| tf = tfield.TensorField(self.size_x, self.size_y, decay=decay) # TODO: redo | |
| tf.add_line([[0, 0], [0, self.size_y]], sample_step=0.5) | |
| tf.add_line([[self.size_x, 0], [self.size_x, self.size_y]], sample_step=0.5) | |
| return tf | |
| LAYOUT_CONTINUOUS_TO_CLS = { | |
| LayoutContGenType.PROCEDURAL_TENSOR_FIELD: TensorFieldLayout, | |
| LayoutContGenType.PROCEDURAL_TENSOR_FIELD_HORIZONTAL: TensorFieldHorisontalLayout, | |
| LayoutContGenType.PROCEDURAL_TENSOR_FIELD_VERTICAL: TensorFieldVerticallLayout | |
| } | |