Spaces:
Sleeping
Sleeping
| from itertools import cycle | |
| import re | |
| import numpy as np | |
| import random | |
| from scene_synthesizer.utils import PositionIterator2D | |
| from shapely.geometry import Point | |
| from dsynth.assets.ss_assets import WIDTH, DEPTH | |
| from dataclasses import dataclass | |
| import numpy as np | |
| from dsynth.assets.asset import Asset | |
| class PositionIteratorPI(PositionIterator2D): | |
| def __init__( | |
| self, | |
| step_x, | |
| step_y, | |
| noise_std_x=0.0, | |
| noise_std_y=0.0, | |
| direction="x", | |
| stop_on_new_line=False, | |
| seed=None, | |
| shelf_width=WIDTH, | |
| shelf_depth=DEPTH | |
| ): | |
| super().__init__(seed=seed) | |
| self.step = np.array([step_x, step_y]) | |
| self.noise_std_x = noise_std_x | |
| self.noise_std_y = noise_std_y | |
| self.direction = direction | |
| self.new_line = False | |
| self.stop_on_new_line = stop_on_new_line | |
| # if self.direction | |
| # raise ValueError(f"Unknown direction: {self.direction}") | |
| self.start_point = None | |
| self.end_point = None | |
| self.i = 0 | |
| self.j = 0 | |
| self.lst_of_pos = [(1.45 * shelf_width / 4, shelf_depth / 3), | |
| (1.45 * shelf_width / 4, 2 * shelf_depth / 3), | |
| (shelf_width / 2, 2 * shelf_depth / 3), | |
| (3 * shelf_width / 4 - 0.45 * shelf_width / 4, 2 * shelf_depth / 3), | |
| (3 * shelf_width / 4 - 0.45 * shelf_width / 4, shelf_depth / 3)] | |
| self.counter = 0 | |
| def __next__(self): | |
| while True: | |
| if self.stop_on_new_line and self.new_line: | |
| self.new_line = False | |
| raise StopIteration | |
| current_point = self.lst_of_pos[self.counter] | |
| self.counter += 1 | |
| p = Point(current_point) | |
| if np.all(current_point > self.end_point): | |
| break | |
| if p.within(self.polygon): | |
| return np.array([p.x, p.y]) | |
| raise StopIteration | |
| def __call__(self, support): | |
| if support.polygon != self.polygon: | |
| self.polygon = support.polygon | |
| minx, miny, maxx, maxy = self.polygon.bounds | |
| self.start_point = np.array([minx, miny]) | |
| self.end_point = np.array([maxx, maxy]) | |
| self.i = 0 | |
| self.j = 0 | |
| self.new_line = False | |
| return self | |
| def flatten_dict(d, sep: str = None, parent_key: str = ''): | |
| items = {} | |
| for k, v in d.items(): | |
| if sep is None: | |
| new_key = (*parent_key, k) if parent_key else (k,) | |
| else: | |
| new_key = parent_key + sep + k if parent_key else k | |
| if isinstance(v, dict) and len(v) > 0: | |
| items.update(flatten_dict(v, parent_key=new_key, sep=sep).items()) | |
| else: | |
| items[new_key] = v | |
| return items | |
| def get_needed_names(regexp, all_products): | |
| return list(filter(lambda x: re.match(regexp, x), all_products)) | |
| class ProductnameIterator: | |
| def __init__(self, queries, all_product_names, shuffle=True, rng=random.Random(42)): | |
| self.queries = queries | |
| products = [] | |
| for query in self.queries: | |
| products.extend(get_needed_names(rf'products_hierarchy.{query}', all_product_names)) | |
| if shuffle: | |
| rng.shuffle(products) | |
| self.products_iterator = iter(products) | |
| def __iter__(self): | |
| return self | |
| def __next__(self,): | |
| return next(self.products_iterator) | |
| class ProductnameIteratorInfinite(ProductnameIterator): | |
| def __init__(self, *args, **kwargs): | |
| super().__init__(*args, **kwargs) | |
| self.products_iterator = cycle(self.products_iterator) | |
| class PositionIteratorGridColumns(PositionIterator2D): | |
| def __init__( | |
| self, | |
| obj_width, | |
| obj_depth, | |
| x_gap, | |
| y_gap, | |
| delta_x, | |
| delta_y, | |
| current_point, | |
| num_cols, | |
| seed, | |
| noise_std_x=0.0, | |
| noise_std_y=0.0, | |
| ): | |
| super().__init__(seed) | |
| self.obj_w = obj_width | |
| self.obj_d = obj_depth | |
| self.x_gap = x_gap | |
| self.y_gap = y_gap | |
| self.delta_x = delta_x | |
| self.delta_y = delta_y | |
| self.start_point = None | |
| self.end_point = None | |
| self.current_point = current_point | |
| self.num_cols = num_cols | |
| self.stop_iter = False | |
| self.cur_col = 0 | |
| self.cur_row = 0 | |
| self.noise_std_x = noise_std_x | |
| self.noise_std_y = noise_std_y | |
| def __next__(self): | |
| while not self.stop_iter: | |
| if self.num_cols <= 0: | |
| self.current_point[0] -= self.obj_w/2 | |
| self.current_point[1] -= self.obj_d/2 | |
| self.stop_iter = True | |
| break | |
| if self.current_point[0] + self.obj_w/2 < self.end_point[0]: | |
| x = self.current_point[0] | |
| y = self.current_point[1] | |
| self.current_point[1] += self.obj_d + self.y_gap | |
| self.cur_row += 1 | |
| if self.current_point[1] + self.obj_d/2 >= self.end_point[1]: | |
| self.current_point[0] += self.obj_w + self.x_gap | |
| self.current_point[1] = self.start_point[1] + self.obj_d/2 | |
| self.num_cols -= 1 | |
| self.cur_col += 1 | |
| self.cur_row = 0 | |
| if self.noise_std_x > 0 or self.noise_std_y > 0: | |
| p = self.rng.normal([x, y], [self.noise_std_x, self.noise_std_y]) | |
| else: | |
| p = np.array([x, y]) | |
| return p | |
| elif self.current_point[0] + self.obj_w/2 >= self.end_point[0]: | |
| self.current_point[0] -= self.obj_w/2 | |
| self.current_point[1] -= self.obj_d/2 | |
| self.stop_iter = True | |
| break | |
| raise StopIteration | |
| def __call__(self, support): | |
| if support.polygon != self.polygon: | |
| self.polygon = support.polygon | |
| minx, miny, maxx, maxy = self.polygon.bounds | |
| self.start_point = np.array([minx, miny]) | |
| self.end_point = np.array([maxx, maxy]) | |
| if self.current_point[0] == -1: | |
| self.current_point[0] = minx + self.delta_x + self.obj_w/2 | |
| else: | |
| self.current_point[0] += self.obj_w/2 | |
| if self.current_point[1] == -1: | |
| self.current_point[1] = miny + self.delta_y + self.obj_d/2 | |
| else: | |
| self.current_point[1] += self.obj_d/2 | |
| return self | |
| def update(self, *args, **kwargs): | |
| pass | |
| def object_id_generator(base_name, pos_generator: PositionIteratorGridColumns): | |
| while True: | |
| yield f"{base_name}{pos_generator.cur_col}:{pos_generator.cur_row}" | |
| def is_valid_cell(x, y, N, M): | |
| if x < 0 or y < 0 or x >= N or y >= M: | |
| return False | |
| return True | |
| def find_paths_util(maze, source, destination, visited, path, paths): | |
| """Find paths using Breadth First Search algorith """ | |
| # Done if destination is found | |
| if source == destination: | |
| paths.append(path[:]) # append copy of current path | |
| return paths | |
| # mark current cell as visited | |
| N = len(maze) | |
| M = len(maze[0]) | |
| x, y = source | |
| visited[x][y] = True | |
| # if current cell is a valid and open cell, | |
| if is_valid_cell(x, y, N, M) and maze[x][y] == 0: | |
| # Using Breadth First Search on path extension in all direction | |
| # go right (x, y) --> (x + 1, y) | |
| if x + 1 < N and (not visited[x + 1][y]): | |
| path.append((x + 1, y)) | |
| find_paths_util(maze,(x + 1, y), destination, visited, path, paths) | |
| path.pop() | |
| # go left (x, y) --> (x - 1, y) | |
| if x - 1 >= 0 and (not visited[x - 1][y]): | |
| path.append((x - 1, y)) | |
| find_paths_util(maze, (x - 1, y), destination, visited, path, paths) | |
| path.pop() | |
| # go up (x, y) --> (x, y + 1) | |
| if y + 1 < M and (not visited[x][y + 1]): | |
| path.append((x, y + 1)) | |
| find_paths_util(maze, (x, y + 1), destination, visited, path, paths) | |
| path.pop() | |
| # go down (x, y) --> (x, y - 1) | |
| if y - 1 >= 0 and (not visited[x][y - 1]): | |
| path.append((x, y - 1)) | |
| find_paths_util(maze, (x, y - 1), destination, visited, path, paths) | |
| path.pop() | |
| # Unmark current cell as visited | |
| visited[x][y] = False | |
| return paths | |
| def find_paths(maze, source, destination): | |
| """ Sets up and searches for paths""" | |
| N = len(maze) # size of Maze is N x N | |
| M = len(maze[0]) | |
| # 2D matrix to keep track of cells involved in current path | |
| visited = [[False]*M for _ in range(N)] | |
| path = [source] | |
| paths = [] | |
| paths = find_paths_util(maze, source, destination, visited, path, paths) | |
| return paths | |
| class RectFixture: | |
| name: str = None | |
| x: float = 0 | |
| y: float = 0 | |
| l: float = 1.55 | |
| w: float = 0.6 | |
| orientation: str = 'horizontal' | |
| occupancy_width: float = 0.0 | |
| asset_name: str = None | |
| # horizontal: y+ y- | |
| # vertical: x+ x- | |
| def __post_init__(self): | |
| if not self.orientation in ['horizontal', 'vertical']: | |
| raise RuntimeError(f"Wrong orientation: {self.orientation}") | |
| def make_from_asset(cls, asset: Asset, | |
| name=None, x=0, y=0, | |
| orientation='horizontal', | |
| occupancy_width=0.2, | |
| asset_name=None): | |
| extents = asset.trimesh_scene.extents | |
| return cls(name, x, y, l = extents[0], w=extents[1], orientation=orientation, | |
| occupancy_width=occupancy_width, asset_name=asset_name) | |
| def get_polygon(self): | |
| if self.orientation == 'horizontal': | |
| polygon = [ | |
| [self.x - self.l / 2, self.y - self.w / 2], | |
| [self.x + self.l / 2, self.y - self.w / 2], | |
| [self.x + self.l / 2, self.y + self.w / 2], | |
| [self.x - self.l / 2, self.y + self.w / 2], | |
| ] | |
| occupancy_polygon = [ | |
| [polygon[0][0], polygon[0][1] - self.occupancy_width], | |
| [polygon[1][0], polygon[1][1] - self.occupancy_width], | |
| [polygon[2][0], polygon[2][1] + self.occupancy_width], | |
| [polygon[3][0], polygon[3][1] + self.occupancy_width] | |
| ] | |
| elif self.orientation == 'vertical': | |
| polygon = [ | |
| [self.x - self.w / 2, self.y - self.l / 2], | |
| [self.x + self.w / 2, self.y - self.l / 2], | |
| [self.x + self.w / 2, self.y + self.l / 2], | |
| [self.x - self.w / 2, self.y + self.l / 2], | |
| ] | |
| occupancy_polygon = [ | |
| [polygon[0][0] - self.occupancy_width, polygon[0][1]], | |
| [polygon[1][0] + self.occupancy_width, polygon[1][1]], | |
| [polygon[2][0] + self.occupancy_width, polygon[2][1]], | |
| [polygon[3][0] - self.occupancy_width, polygon[3][1]] | |
| ] | |
| else: | |
| raise RuntimeError("Wrong orientation") | |
| return np.array(polygon), np.array(occupancy_polygon) | |
| def is_valid(self, size_x, size_y): | |
| polygon, occupancy_polygon = self.get_polygon() | |
| if np.any(occupancy_polygon[:, 0] > size_x) or np.any(occupancy_polygon[:, 0] < 0) or \ | |
| np.any(occupancy_polygon[:, 1] > size_y) or np.any(occupancy_polygon[:, 1] < 0): | |
| return False | |
| return True | |
| def draw(self, axes, show_occupancy=True, facecolor='skyblue', | |
| edgecolor='blue', linewidth=2 | |
| ): | |
| polygon, occupancy_polygon = self.get_polygon() | |
| if show_occupancy: | |
| axes.fill(occupancy_polygon[:, 0], occupancy_polygon[:, 1], facecolor='gray', edgecolor='black', linewidth=linewidth) | |
| axes.fill(polygon[:, 0], polygon[:, 1], facecolor=facecolor, edgecolor=edgecolor, linewidth=linewidth) | |
| def check_overlap(l1, r1, l2, r2): | |
| if r2[0] < l1[0] or r1[0] < l2[0]: | |
| return False | |
| if r2[1] < l1[1] or r1[1] < l2[1]: | |
| return False | |
| return True | |
| def check_shelfs_overlap(s1: RectFixture, s2: RectFixture): | |
| poly1, occup1 = s1.get_polygon() | |
| poly2, occup2 = s2.get_polygon() | |
| if check_overlap(poly1[0], poly1[2], occup2[0], occup2[2]): | |
| return True | |
| if check_overlap(occup1[0], occup1[2], poly2[0], poly2[2]): | |
| return True | |
| if s1.orientation != s2.orientation: # !!! | |
| if check_overlap(occup1[0], occup1[2], occup2[0], occup2[2]): | |
| return True | |
| return False | |
| def check_collisions(new_shelf: RectFixture, shelves_list: list): | |
| for shelf in shelves_list: | |
| if check_shelfs_overlap(new_shelf, shelf): | |
| return True | |
| return False | |