layout-viewer / src /dsynth /scene_gen /layouts /layout_generator.py
Spyspook's picture
initial commit
ce82348 verified
import logging
log = logging.getLogger(__name__)
import random
from abc import ABC, abstractmethod
from typing import Tuple, Optional, Dict
from omegaconf import DictConfig, OmegaConf
from dataclasses import dataclass, asdict
from pathlib import Path
import numpy as np
import matplotlib.pyplot as plt
from dsynth.scene_gen.layouts.random_connectivity import add_many_zones, get_orientation
from dsynth.scene_gen.hydra_configs import LayoutGenType, LayoutContGenType
from dsynth.scene_gen.utils import RectFixture, check_collisions, flatten_dict
import dsynth.scene_gen.layouts.tensor_field as tfield
from dsynth.assets.asset import load_assets_lib
class LayoutGeneratorBase(ABC):
def __init__(self,
sizes_nm: Tuple[int],
start_coords: Tuple[int] = (0, 0),
rng: random.Random = random.Random(42),
max_tries = 5,
):
assert len(sizes_nm) == 2
assert len(start_coords) == 2
self.sizes_nm = sizes_nm
self.start_coords = start_coords
self.rng = rng
self.max_tries = max_tries
@abstractmethod
def __call__(self, *args, **kwargs):
pass
class RandomConnectedZones(LayoutGeneratorBase):
def __call__(self, *args, zones_dict: Dict = dict(), **kwargs):
n, m = self.sizes_nm
x, y = self.start_coords
for n_try in range(self.max_tries):
mat = [[0] * m for _ in range(n)]
if n_try > 0:
log.error(f"Can't generate! Retry...[{n_try + 1}/{self.max_tries}]")
is_gen, room = add_many_zones((x, y), mat, zones_dict, self.rng)
if is_gen:
break
if not is_gen:
log.error(f"Can't generate! Layout generation failed!")
return None
rotations = get_orientation((x, y), room)
return {
"darkstore": room,
"rotations": rotations
}
class FixedLayout(LayoutGeneratorBase):
def __call__(self, *args, zones_dict: Dict = dict(), darkstore_arrangement_cfg: Dict = dict(), **kwargs):
assert 'layout' in darkstore_arrangement_cfg
assert 'rotations' in darkstore_arrangement_cfg
n, m = self.sizes_nm
darkstore = [[0] * m for _ in range(n)]
rotations = [[0] * m for _ in range(n)]
darkstore_dict = OmegaConf.to_container(darkstore_arrangement_cfg['layout'], resolve = True)
rotations_dict = OmegaConf.to_container(darkstore_arrangement_cfg['rotations'], resolve = True)
assert n == len(darkstore_dict.keys())
assert n == len(rotations_dict.keys())
# convert to list of lists
for i in darkstore_dict.keys():
assert len(darkstore_dict[i]) == m
assert len(rotations_dict[i]) == m
darkstore[i] = darkstore_dict[i]
rotations[i] = rotations_dict[i]
return {
"darkstore": darkstore,
"rotations": rotations
}
LAYOUT_TYPES_TO_CLS = {
LayoutGenType.CONNECTED_ZONES: RandomConnectedZones,
LayoutGenType.DEFAULT: None,
LayoutGenType.FIXED_LAYOUT: FixedLayout,
}
class TensorFieldLayout(LayoutGeneratorBase):
def __init__(self,
name,
cfg,
rng: random.Random,
):
self.cfg = cfg
self.rng = rng
self.name = name
self.product_assets_lib = flatten_dict(load_assets_lib(cfg.assets, disable_caching=True), sep='.')
self.size_x = cfg.ds_continuous.size_x
self.size_y = cfg.ds_continuous.size_y
self.max_tries = cfg.ds_continuous.max_tries
self.all_fixtures = []
self.inactive_wall_shelvings_occupancy_width = cfg.ds_continuous.inactive_wall_shelvings_occupancy_width
self.inactive_shelvings_occupancy_width = cfg.ds_continuous.inactive_shelvings_occupancy_width
self.inactive_shelvings_skip_prob = cfg.ds_continuous.inactive_shelvings_skip_prob
self.passage_width = cfg.ds_continuous.passage_width
self.fixtures_occupancy_width = cfg.ds_continuous.fixtures_occupancy_width
def _all_fixtures_list(self):
all_fixtures = []
for key, val in self.all_fixtures.items():
all_fixtures.extend(val)
return all_fixtures
def __call__(self, *args, **kwargs):
self.all_fixtures = {
"service": [],
"scene_fixtures": [],
"inactive_wall_shelvings": [],
"active_wall_shelvings": [],
"inactive_shelvings": [],
"active_shelvings": []
}
self.all_fixtures['service'].append(
RectFixture('blocked_area',
x=self.size_x,
y=self.size_y - 2,
l=2, w=2, )
)
self.place_fixtures()
# self.place_inactive_wall_shelvings()
res = self.place_wall_shelvings()
if not res:
return None
res = self.place_shelvings()
if not res:
return None
return self.rect_fixture2dict(self.all_fixtures)
def rect_fixture2dict(self, rect_fixture_dict: list):
return {key: [asdict(r) for r in rect_fixture_list] for key, rect_fixture_list in rect_fixture_dict.items()}
def place_fixtures(self):
scene_fixtures_list = self.cfg.ds_continuous.scene_fixtures_list
for asset_name in scene_fixtures_list:
rect = RectFixture.make_from_asset(self.product_assets_lib[asset_name], name=f'scene_fixture:{asset_name}',
occupancy_width=self.fixtures_occupancy_width,
x=0., y=0., asset_name=asset_name)
success = False
for _ in range(self.max_tries):
if rect.is_valid(self.size_x, self.size_y) and not check_collisions(rect, self._all_fixtures_list()):
self.all_fixtures['scene_fixtures'].append(rect)
success = True
break
rect.x = self.rng.uniform(0.0, self.size_x)
rect.y = self.rng.uniform(0.0, self.size_y)
if not success:
log.warning('Failed to place scene fixture')
return True
def place_wall_shelvings(self):
half_perimeter = self.size_x + self.size_y
perimeter_points = np.linspace(0, half_perimeter, 40)
self.rng.shuffle(perimeter_points)
active_wall_shelvings_list = self.cfg.ds_continuous.active_wall_shelvings_list
for wall_active_shelving in active_wall_shelvings_list:
rect = RectFixture.make_from_asset(
self.product_assets_lib[wall_active_shelving.shelf_asset], name=f'{self.name}_{wall_active_shelving.name}',
occupancy_width=self.inactive_wall_shelvings_occupancy_width,
asset_name=wall_active_shelving.shelf_asset
)
success = False
for point in perimeter_points:
rect = self._place_rect_at_point(rect, point)
if rect.is_valid(self.size_x, self.size_y) and not check_collisions(rect, self._all_fixtures_list()):
success = True
self.all_fixtures['active_wall_shelvings'].append(rect)
break
if not success:
return False
inactive_wall_shelvings_list = self.cfg.ds_continuous.inactive_wall_shelvings_list
for asset_name in inactive_wall_shelvings_list:
rect = RectFixture.make_from_asset(self.product_assets_lib[asset_name], name=f'inactive_wall_shelving:{asset_name}',
occupancy_width=self.inactive_wall_shelvings_occupancy_width,
x=0., y=0., asset_name=asset_name)
success = False
for point in perimeter_points:
rect = self._place_rect_at_point(rect, point)
if rect.is_valid(self.size_x, self.size_y) and not check_collisions(rect, self._all_fixtures_list()):
success = True
self.all_fixtures['inactive_wall_shelvings'].append(rect)
break
if not success:
log.warning('Failed to place scene fixture')
return True
def _place_rect_at_point(self, rect, point):
if point <= self.size_x:
x = point
y = self.size_y
rect.orientation = 'horizontal'
elif self.size_x < point <= self.size_x + self.size_y:
x = 0
y = point - self.size_x
rect.orientation = 'vertical'
else:
raise RuntimeError
if rect.orientation == 'horizontal':
if y == self.size_y:
y -= rect.w / 2 + rect.occupancy_width + 1e-2
if rect.orientation == 'vertical':
if x == 0:
x += rect.w / 2 + rect.occupancy_width + 1e-2
rect.x = x
rect.y = y
return rect
def place_active_wall_shelvings(self):
active_wall_shelvings = self.cfg.ds_continuous.active_wall_shelvings_list
return []
def compose_tensor_field(self, decay):
tf = tfield.TensorField(self.size_x, self.size_y, decay=decay)
tf.add_boundary()
tf.add_fixture_list(self._all_fixtures_list())
return tf
def place_shelvings(self):
inactive_shelvings_list = self.cfg.ds_continuous.inactive_shelvings_list
assert len(self.cfg.ds_continuous.active_shelvings_list) <= 1
if len(self.cfg.ds_continuous.active_shelvings_list) > 0:
active_fixture = self.cfg.ds_continuous.active_shelvings_list[0]
if active_fixture['shelf_asset'] not in inactive_shelvings_list:
inactive_shelvings_list.append(active_fixture['shelf_asset'])
else:
active_fixture = None
sample_rects = []
for asset_name in inactive_shelvings_list:
rect = RectFixture.make_from_asset(self.product_assets_lib[asset_name], name=f'inactive_shelving:{asset_name}',
occupancy_width=self.inactive_shelvings_occupancy_width,
x=0., y=0., asset_name=asset_name)
sample_rects.append(rect)
success = False
for _ in range(self.max_tries):
self.rng.shuffle(sample_rects)
tf = self.compose_tensor_field(decay = self.cfg.ds_continuous.tf_blending_decay)
inactive_shelvings = tfield.place_shelves(tf,
sample_rects,
self.rng,
passage_width=self.passage_width,
skip_shelf_prob=self.inactive_shelvings_skip_prob,
scene_fixtures=self._all_fixtures_list()
)
if active_fixture is None:
success = True
break
elif active_fixture['shelf_asset'] in [r.asset_name for r in inactive_shelvings]:
success = True
break
if not success:
return False
self.all_fixtures['inactive_shelvings'] = inactive_shelvings
fig, ax = tf.vis_field()
for fixture in self._all_fixtures_list():
fixture.draw(ax[1], show_occupancy=False)
fig.savefig(Path(self.cfg.ds_continuous.output_dir) / f'{self.name}_tf.jpg')
if active_fixture is not None:
self.all_fixtures['active_shelvings'].append(RectFixture.make_from_asset(
self.product_assets_lib[active_fixture.shelf_asset], name=f'{self.name}_{active_fixture.name}',
asset_name=active_fixture.shelf_asset
))
return True
class TensorFieldHorisontalLayout(TensorFieldLayout):
def compose_tensor_field(self, decay):
tf = tfield.TensorField(self.size_x, self.size_y, decay=decay)
tf.add_line([[0, 0], [self.size_x, 0]], sample_step=0.5)
tf.add_line([[0, self.size_y], [self.size_x, self.size_y]], sample_step=0.5)
return tf
def __call__(self, *args, **kwargs):
self.all_fixtures = {
"service": [],
"scene_fixtures": [],
"inactive_wall_shelvings": [],
"active_wall_shelvings": [],
"inactive_shelvings": [],
"active_shelvings": []
}
self.all_fixtures['service'].append(
RectFixture('blocked_area',
x=self.size_x,
y=self.size_y - 2,
l=2, w=2, )
)
self.all_fixtures['service'].append(
RectFixture('blocked_area',
x=0.5,
y=self.size_y / 2,
l=self.size_y, w=0.5, orientation='vertical')
)
self.all_fixtures['service'].append(RectFixture('blocked_area',
x=self.size_x - 0.5,
y=self.size_y / 2,
l=self.size_y, w=0.5, orientation='vertical')
)
self.place_fixtures()
self.place_inactive_wall_shelvings()
self.place_active_wall_shelvings()
res = self.place_shelvings()
if not res:
return None
return self.rect_fixture2dict(self.all_fixtures)
class TensorFieldVerticallLayout(TensorFieldLayout):
def compose_tensor_field(self, decay):
tf = tfield.TensorField(self.size_x, self.size_y, decay=decay) # TODO: redo
tf.add_line([[0, 0], [0, self.size_y]], sample_step=0.5)
tf.add_line([[self.size_x, 0], [self.size_x, self.size_y]], sample_step=0.5)
return tf
LAYOUT_CONTINUOUS_TO_CLS = {
LayoutContGenType.PROCEDURAL_TENSOR_FIELD: TensorFieldLayout,
LayoutContGenType.PROCEDURAL_TENSOR_FIELD_HORIZONTAL: TensorFieldHorisontalLayout,
LayoutContGenType.PROCEDURAL_TENSOR_FIELD_VERTICAL: TensorFieldVerticallLayout
}