Spaces:
Paused
Paused
| import logging | |
| log = logging.getLogger(__name__) | |
| import os | |
| import random | |
| from functools import partial | |
| import traceback | |
| from multiprocessing import Pool | |
| import json | |
| from pathlib import Path | |
| import hashlib | |
| from tqdm import tqdm | |
| from typing import Optional, Union, BinaryIO, IO, Dict, Tuple | |
| import numpy as np | |
| import scene_synthesizer as synth | |
| from omegaconf import DictConfig | |
| from omegaconf import OmegaConf | |
| from dsynth.scene_gen.layouts.layout_generator import LayoutGeneratorBase | |
| from dsynth.scene_gen.hydra_configs import ShelfConfig, FillingType | |
| from dsynth.scene_gen.arrangements import shelf_placement_v2 | |
| from dsynth.scene_gen.utils import flatten_dict, ProductnameIteratorInfinite, ProductnameIterator | |
| from dsynth.scene_gen.layouts.layout_generator import LAYOUT_CONTINUOUS_TO_CLS | |
| from dsynth.scene_gen.hydra_configs import DsContinuousConfig, ShelfConfig | |
| from dsynth.assets.asset import load_assets_lib | |
| from dsynth.assets.ss_assets import DefaultShelf | |
| from dsynth.scene_gen.arrangements import set_shelf, add_objects_to_shelf_v2 | |
| import os | |
| os.environ['PYTHONHASHSEED'] = '42' | |
| class SceneGenerator: | |
| def __init__(self, | |
| layout_generator: LayoutGeneratorBase, | |
| product_assets_lib: Dict, | |
| darkstore_arrangement_cfg: DictConfig, | |
| num_scenes: int, | |
| num_workers: int = 1, | |
| output_dir: Optional[Union[str, os.PathLike, BinaryIO, IO[bytes]]] = None, | |
| randomize_layout: bool = False, | |
| randomize_arrangements: bool = True, | |
| random_seed: int = 42, | |
| show: bool = False | |
| ): | |
| self.num_workers = num_workers | |
| config_hash = hashlib.sha1(OmegaConf.to_yaml(darkstore_arrangement_cfg).encode()).hexdigest()[-8:] | |
| seeds_layout = [random_seed] * num_scenes | |
| if randomize_layout: | |
| seeds_layout = np.arange(num_scenes) + random_seed | |
| seeds_arrangements = [random_seed] * num_scenes | |
| if randomize_arrangements: | |
| seeds_arrangements = np.arange(num_scenes) + random_seed | |
| self.task_params = [] | |
| for n, (seed_layout, seed_arr) in enumerate(zip(seeds_layout, seeds_arrangements)): | |
| self.task_params.append({ | |
| 'layout_gen_params': {}, | |
| 'seed_layout': seed_layout, | |
| 'seed_arrangement': seed_arr, | |
| 'output_name': f'scene_config_{config_hash}_{n}.json' | |
| }) | |
| self.generate_routine = partial( | |
| _generate_routine, | |
| layout_generator = layout_generator, | |
| product_assets_lib = product_assets_lib, | |
| darkstore_arrangement_cfg = darkstore_arrangement_cfg, | |
| output_dir = output_dir, | |
| show = show | |
| ) | |
| def generate(self): | |
| if self.num_workers == 1: | |
| results = [] | |
| for task_param in tqdm(self.task_params): | |
| results.append(self.generate_routine(task_param)) | |
| # results = list(map(self.generate_routine, self.task_params)) | |
| else: | |
| with Pool(self.num_workers) as p: | |
| total_samples = len(self.task_params) | |
| results = list(tqdm(p.imap(self.generate_routine, self.task_params), total=total_samples)) | |
| return results | |
| class SceneGeneratorContinuous: | |
| def __init__(self, cfg, output_dir): | |
| self.cfg = cfg | |
| self.output_dir = Path(output_dir) | |
| if not self.output_dir.exists(): | |
| self.output_dir.mkdir(parents=True) | |
| random_seed = cfg.ds_continuous.random_seed | |
| num_scenes = cfg.ds_continuous.num_scenes | |
| self.num_workers = cfg.ds_continuous.num_workers | |
| seeds_layout = [random_seed] * num_scenes | |
| if cfg.ds_continuous.randomize_layout: | |
| seeds_layout = np.arange(num_scenes) + random_seed | |
| seeds_arrangements = [random_seed] * num_scenes | |
| if cfg.ds_continuous.randomize_arrangements: | |
| seeds_arrangements = np.arange(num_scenes) + random_seed | |
| with open(self.output_dir / f"input_config.yaml", "w") as f: | |
| f.write(OmegaConf.to_yaml(cfg)) | |
| self.task_params = [] | |
| for n, (seed_layout, seed_arr) in enumerate(zip(seeds_layout, seeds_arrangements)): | |
| self.task_params.append({ | |
| 'scene_id': n, | |
| 'cfg': cfg, | |
| 'output_dir': self.output_dir, | |
| 'seed_layout_gen': seed_layout, | |
| 'seed_arrangement_gen': seed_arr | |
| }) | |
| def generate(self): | |
| if self.num_workers == 1: | |
| results = [] | |
| for task_param in tqdm(self.task_params): | |
| results.append(_generate_continuous_routine(task_param)) | |
| else: | |
| with Pool(self.num_workers) as p: | |
| total_samples = len(self.task_params) | |
| results = list(tqdm(p.imap(_generate_continuous_routine, self.task_params), total=total_samples)) | |
| return results | |
| def _generate_continuous_routine(task_params): | |
| scene_id = task_params['scene_id'] | |
| cfg = task_params['cfg'] | |
| output_dir = task_params['output_dir'] | |
| seed_layout_gen = task_params['seed_layout_gen'] | |
| seed_arrangement_gen = task_params['seed_arrangement_gen'] | |
| config_hash = hashlib.sha1(OmegaConf.to_yaml(cfg).encode()).hexdigest()[-8:] | |
| scene_name = f'{cfg.ds_continuous.name}_{config_hash}_{scene_id}' | |
| try: | |
| layout_gen_cls = LAYOUT_CONTINUOUS_TO_CLS[cfg.ds_continuous.layout_gen_type] | |
| layout_generator = layout_gen_cls(name=scene_name, | |
| cfg=cfg, | |
| rng=random.Random(seed_layout_gen), | |
| ) | |
| layout_data = layout_generator() | |
| if layout_data is None: | |
| log.error(f"Failed to generate {scene_name}!") | |
| return False | |
| fake_arrangements_mapping = OmegaConf.to_container(cfg.ds_continuous.fake_arrangements_mapping, resolve = True) | |
| results = dict(layout_data=layout_data, | |
| size_x=cfg.ds_continuous.size_x, | |
| size_y=cfg.ds_continuous.size_y, | |
| fake_arrangements_mapping=fake_arrangements_mapping) | |
| results['hash'] = config_hash | |
| results['scene_id'] = scene_id | |
| results['scene_name'] = scene_name | |
| with open(Path(output_dir) / f'layout_data_{scene_name}.json', "w") as f: | |
| json.dump(results, f, indent=4) | |
| product_assets_lib = flatten_dict(load_assets_lib(cfg.assets, disable_caching=True), sep='.') | |
| for shelvings_list in [cfg.ds_continuous.active_shelvings_list, cfg.ds_continuous.active_wall_shelvings_list]: | |
| for active_fixture_cfg in shelvings_list: | |
| filling, shelf_name, shelf_type = product_filling_from_shelf_config(active_fixture_cfg, | |
| list(product_assets_lib.keys()), rng=random.Random(seed_arrangement_gen) | |
| ) | |
| scene = synth.Scene() | |
| shelf_name = f'{scene_name}_{active_fixture_cfg.name}' | |
| shelf_asset_name = active_fixture_cfg.shelf_asset | |
| if shelf_asset_name is None: | |
| shelf = DefaultShelf | |
| shelf_asset_name = 'fixtures.shelf' | |
| else: | |
| shelf = product_assets_lib[shelf_asset_name].ss_asset | |
| support_data = set_shelf( | |
| scene, | |
| shelf, | |
| 0, | |
| 0, | |
| 0, | |
| f'SHELF_{shelf_name}', | |
| f'support_SHELF_{shelf_name}', | |
| ) | |
| add_objects_to_shelf_v2( | |
| scene, | |
| 0, | |
| filling, | |
| product_assets_lib, | |
| support_data, | |
| active_fixture_cfg.x_gap, | |
| active_fixture_cfg.y_gap, | |
| active_fixture_cfg.delta_x, | |
| active_fixture_cfg.delta_y, | |
| active_fixture_cfg.start_point_x, | |
| active_fixture_cfg.start_point_y, | |
| active_fixture_cfg.filling_type, | |
| seed_arrangement_gen, | |
| active_fixture_cfg.noise_std_x, | |
| active_fixture_cfg.noise_std_y, | |
| active_fixture_cfg.rotation_lower, | |
| active_fixture_cfg.rotation_upper, | |
| ) | |
| json_str = synth.exchange.export.export_json(scene, include_metadata=False) | |
| data = json.loads(json_str) | |
| del data["geometry"] | |
| output_name = f'{shelf_name}.json' | |
| with open(Path(output_dir) / output_name, "w") as f: | |
| json.dump(data, f, indent=4) | |
| return True | |
| except Exception as e: | |
| log.error(f"Failed to generate {scene_name}! Unexpected error: {e}") | |
| traceback.print_exc() | |
| return False | |
| def _generate_routine( | |
| task_params: Tuple, | |
| layout_generator: LayoutGeneratorBase, | |
| product_assets_lib: Dict, | |
| darkstore_arrangement_cfg: DictConfig, | |
| output_dir: Optional[Union[str, os.PathLike, BinaryIO, IO[bytes]]] = None, | |
| show: bool = False | |
| ): | |
| layout_gen_params = task_params['layout_gen_params'] | |
| seed_layout = task_params['seed_layout'] | |
| seed_arrangement = task_params['seed_arrangement'] | |
| output_name = task_params['output_name'] | |
| product_filling, ds_names = product_filling_from_darkstore_config( | |
| darkstore_arrangement_cfg.zones, | |
| list(product_assets_lib.keys()), | |
| rng=random.Random(seed_arrangement) | |
| ) | |
| zones_dict = {key: list(val.keys()) for key, val in product_filling.items()} | |
| product_filling_flattened = flatten_dict(product_filling, sep='.') | |
| layout_generator.rng = random.Random(seed_layout) | |
| layout_data = layout_generator(**layout_gen_params, zones_dict=zones_dict, darkstore_arrangement_cfg=darkstore_arrangement_cfg) | |
| if layout_data is None: | |
| log.error(f"Can't generate {output_name}!") | |
| return False | |
| scene_meta = shelf_placement_v2( | |
| product_filling_flattened=product_filling_flattened, | |
| product_assets_lib=product_assets_lib, | |
| is_showed=show, | |
| darkstore_cfg=darkstore_arrangement_cfg, | |
| **layout_data | |
| ) | |
| scene_meta["meta"]["ds_names"] = ds_names | |
| if output_dir is not None: | |
| with open(Path(output_dir) / output_name, "w") as f: | |
| json.dump(scene_meta, f, indent=4) | |
| return True | |
| else: | |
| return scene_meta | |
| def product_filling_from_shelf_config(shelf_config: ShelfConfig, all_product_names, rng): | |
| assert 0 <= shelf_config.start_filling_board <= shelf_config.end_filling_from_board <= shelf_config.num_boards | |
| shelf_name = shelf_config.name | |
| shelf_type = shelf_config.shelf_type.name | |
| # all_product_names = ['products_hierarchy.' + name for name in all_product_names] | |
| filling = [[] for _ in range(shelf_config.start_filling_board)] | |
| if '_INFINITE' in str(shelf_config.filling_type): | |
| product_iterator = ProductnameIteratorInfinite(shelf_config.queries, all_product_names, rng=rng) | |
| else: | |
| product_iterator = ProductnameIterator(shelf_config.queries, all_product_names, rng=rng) | |
| if shelf_config.filling_type == FillingType.FULL_AUTO: | |
| product = next(product_iterator) #pick first suitable product | |
| for _ in range(shelf_config.start_filling_board, shelf_config.end_filling_from_board): | |
| filling.append([product for _ in range(shelf_config.num_products_per_board)]) | |
| elif shelf_config.filling_type in [FillingType.BOARDWISE_AUTO, FillingType.BOARDWISE_AUTO_INFINITE]: | |
| for _ in range(shelf_config.start_filling_board, shelf_config.end_filling_from_board): | |
| try: | |
| product = next(product_iterator) | |
| except StopIteration: | |
| break | |
| filling.append([product for _ in range(shelf_config.num_products_per_board)]) | |
| elif shelf_config.filling_type in [FillingType.BLOCKWISE_AUTO, FillingType.BLOCKWISE_AUTO_INFINITE]: | |
| cur_board = shelf_config.start_filling_board | |
| cur_product = next(product_iterator) | |
| left_products_to_put = shelf_config.num_products_per_block | |
| left_space_on_board = shelf_config.num_products_per_board | |
| while True: | |
| num_products = min(left_products_to_put, left_space_on_board) | |
| if len(filling) <= cur_board: | |
| filling.append([cur_product for _ in range(num_products)]) | |
| else: | |
| filling[cur_board].extend([cur_product for _ in range(num_products)]) | |
| left_space_on_board -= num_products | |
| left_products_to_put -= num_products | |
| if left_space_on_board <= 0: | |
| cur_board += 1 | |
| left_space_on_board = shelf_config.num_products_per_board | |
| if left_products_to_put <= 0: | |
| try: | |
| cur_product = next(product_iterator) | |
| except StopIteration: | |
| break | |
| left_products_to_put = shelf_config.num_products_per_block | |
| if cur_board >= shelf_config.end_filling_from_board: | |
| break | |
| elif shelf_config.filling_type == FillingType.BOARDWISE_COLUMNS: | |
| board_product_numcol = OmegaConf.to_container(shelf_config['board_product_numcol']) | |
| # filling = shelf_config['board_product_numcol'] | |
| for board_idx in range(shelf_config.start_filling_board, shelf_config.end_filling_from_board): | |
| if not board_idx in board_product_numcol: | |
| filling.append([]) | |
| continue | |
| cur_board_arrangement = list(board_product_numcol[board_idx].items()) | |
| rng.shuffle(cur_board_arrangement) | |
| filling.append([f'{key}:{val}' for key, val in cur_board_arrangement]) | |
| for _ in range(shelf_config.end_filling_from_board, shelf_config.num_boards): | |
| filling.append([]) | |
| if shelf_config.shuffle_boards: | |
| # shuffle only non-empty boards | |
| non_empty_boards_idxs = [i for i in range(len(filling)) if len(filling[i]) > 0] | |
| non_empty_boards_filling = [filling[i] for i in range(len(filling)) if len(filling[i]) > 0] | |
| rng.shuffle(non_empty_boards_filling) | |
| for i, board_filling in zip(non_empty_boards_idxs, non_empty_boards_filling): | |
| filling[i] = board_filling | |
| if shelf_config.shuffle_items_on_board: | |
| for i in range(len(filling)): | |
| rng.shuffle(filling[i]) | |
| return filling, shelf_name, shelf_type | |
| def product_filling_from_zone_config(zone_config, all_product_names, rng): | |
| filling = {} | |
| if 'name' in zone_config: | |
| zone_names = {'zone_name': zone_config['name'], 'shelf_names': {}, 'shelf_types': {}} | |
| else: | |
| zone_names = {'zone_name': 'Unnamed', 'shelf_names': {}, 'shelf_types': {}} | |
| for key, val in zone_config.items(): | |
| if key != 'name': | |
| filling[key], \ | |
| zone_names['shelf_names'][key], \ | |
| zone_names['shelf_types'][key] = product_filling_from_shelf_config(val, all_product_names, rng) | |
| return filling, zone_names | |
| def product_filling_from_darkstore_config(darkstore_config: DictConfig, all_product_names, rng): | |
| filling = {} | |
| ds_names = {} | |
| for zone_name, zone_config in darkstore_config.items(): | |
| filling[zone_name], ds_names[zone_name] = product_filling_from_zone_config(zone_config, all_product_names, rng) | |
| return filling, ds_names | |