Spaces:
Sleeping
Sleeping
| # src/layout_generator/planner.py | |
| """ | |
| Шаг пайплайна для математического расчета планограммы. | |
| Парсит граф от LLM и подбирает реальные 3D-модели для абстрактных категорий, | |
| опираясь на иерархию продуктов (products_hierarchy) из конфигурации проекта. | |
| """ | |
| import logging | |
| import random | |
| from typing import Any, Dict, List | |
| from omegaconf import DictConfig | |
| from dsynth.scene_gen.utils import RectFixture | |
| from .base import BaseStep, LayoutContext | |
| logger: logging.Logger = logging.getLogger(__name__) | |
| class CalculatePlanogramStep(BaseStep): | |
| """Синтаксический анализатор графа и калькулятор умной выкладки.""" | |
| def _build_catalog_from_config(self, cfg: DictConfig) -> Dict[str, List[str]]: | |
| """ | |
| Собирает плоский словарь категорий и их SKU из конфига products_hierarchy. | |
| Формирует полные составные ключи, совпадающие с теми, что лежат в кэше ОЗУ. | |
| """ | |
| catalog: Dict[str, List[str]] = {} | |
| if not cfg: | |
| return catalog | |
| assets_cfg = cfg.get("assets", cfg.get("asset_config", {})) | |
| hierarchy = assets_cfg.get("products_hierarchy", {}) | |
| for group_name, group_data in hierarchy.items(): | |
| for category_name, category_items in group_data.items(): | |
| if category_items: | |
| catalog[category_name] = [ | |
| f"{group_name}.{category_name}.{item_name}" | |
| for item_name in category_items.keys() | |
| ] | |
| return catalog | |
| def process(self, context: LayoutContext) -> LayoutContext: | |
| """ | |
| Выполняет расчет планограммы, динамически сопоставляя узлы графа | |
| с доступными 3D-моделями в ОЗУ. Учитывает физические габариты товаров. | |
| """ | |
| logger.info("🧮 Шаг 2: Анализ графа и подбор 3D-моделей по конфигу products_hierarchy...") | |
| sku_catalog: Dict[str, List[str]] = self._build_catalog_from_config(context.cfg) | |
| available_assets_in_ram = set(context.product_assets_lib.keys()) | |
| if not context.layout_graph or "nodes" not in context.layout_graph: | |
| logger.warning("⚠️ Граф от LLM не найден. Генерируем базовый граф-заглушку (4 объекта)...") | |
| fallback_category: str = "BEER" | |
| for cat, assets in sku_catalog.items(): | |
| if any(a in available_assets_in_ram for a in assets): | |
| fallback_category = cat | |
| break | |
| context.layout_graph = { | |
| "nodes": [ | |
| {"id": "fallback_fridge", "type": "large_showcase_fake", "items": {fallback_category: 5}}, | |
| {"id": "fallback_shelf_metal", "type": "shelf_metal", "items": {fallback_category: 10}}, | |
| {"id": "fallback_shelf_wood", "type": "shelf_low_res", "items": {fallback_category: 10}}, | |
| {"id": "fallback_island", "type": "small_shelf_two_sided", "items": {fallback_category: 15}} | |
| ], | |
| "edges": [ | |
| {"source": "fallback_fridge", "target": "fallback_shelf_metal", "relation": "next_to"}, | |
| {"source": "fallback_shelf_metal", "target": "fallback_shelf_wood", "relation": "next_to"}, | |
| {"source": "fallback_shelf_wood", "target": "fallback_island", "relation": "opposite"} | |
| ] | |
| } | |
| stats = {k: len(v) for k, v in sku_catalog.items()} | |
| logger.info(f"📦 Индекс товаров из конфига: {stats}") | |
| graph: Dict[str, Any] = context.layout_graph | |
| nodes_dict: Dict[str, RectFixture] = {} | |
| product_filling: Dict[str, Any] = {} | |
| fixed_zones: Dict[str, Any] = {"zone_0": {}} | |
| for node in graph.get("nodes", []): | |
| node_id: str = node["id"] | |
| fixture_type: str = node["type"] | |
| items: Dict[str, int] = node.get("items", {}) | |
| asset_obj = context.product_assets_lib.get(fixture_type) | |
| if asset_obj is None: | |
| asset_obj = context.product_assets_lib.get(f"fixtures.{fixture_type}") | |
| if asset_obj: | |
| try: | |
| l, w = asset_obj.trimesh_scene.extents[0], asset_obj.trimesh_scene.extents[1] | |
| except Exception: | |
| l, w = 1.55, 0.6 | |
| else: | |
| logger.warning(f"⚠️ Оборудование '{fixture_type}' не найдено в ОЗУ. Используем дефолтные габариты.") | |
| l, w = 1.55, 0.6 | |
| rect = RectFixture( | |
| name=f"zone_0.{node_id}", x=0, y=0, l=l, w=w, | |
| occupancy_width=0.5, asset_name=fixture_type | |
| ) | |
| nodes_dict[node_id] = rect | |
| MAX_BOARDS: int = 5 | |
| GAP: float = 0.05 | |
| boards_space: List[float] = [l * 0.9 for _ in range(MAX_BOARDS)] | |
| board_items: List[List[str]] = [[] for _ in range(MAX_BOARDS)] | |
| # Переменные для динамического вычисления шага сетки | |
| max_item_len: float = 0.1 | |
| max_item_depth: float = 0.1 | |
| for category, count in items.items(): | |
| mapped_assets: List[str] = sku_catalog.get(category, []) | |
| valid_assets: List[str] = [a for a in mapped_assets if a in available_assets_in_ram] | |
| if not valid_assets: | |
| logger.warning(f"⚠️ Для категории '{category}' нет загруженных моделей! Полка останется пустой.") | |
| continue | |
| asset_dims: Dict[str, float] = {} | |
| for a in valid_assets: | |
| try: | |
| obj = context.product_assets_lib[a] | |
| asset_dims[a] = obj.trimesh_scene.extents[0] | |
| except Exception: | |
| asset_dims[a] = 0.2 | |
| placed_count: int = 0 | |
| while placed_count < count: | |
| max_available_space: float = max(boards_space) | |
| fit_assets: List[str] = [a for a in valid_assets if asset_dims[a] + GAP <= max_available_space] | |
| if not fit_assets: | |
| break | |
| chosen_asset: str = random.choice(fit_assets) | |
| chosen_len: float = asset_dims[chosen_asset] | |
| for b_idx in range(MAX_BOARDS): | |
| if boards_space[b_idx] >= chosen_len + GAP: | |
| board_items[b_idx].append(chosen_asset) | |
| boards_space[b_idx] -= (chosen_len + GAP) | |
| # Динамически обновляем максимальные габариты для этой конкретной полки | |
| max_item_len = max(max_item_len, chosen_len) | |
| try: | |
| obj_depth = context.product_assets_lib[chosen_asset].trimesh_scene.extents[1] | |
| max_item_depth = max(max_item_depth, obj_depth) | |
| except Exception: | |
| pass | |
| break | |
| placed_count += 1 | |
| chunked_items: List[List[str]] = [board for board in board_items if board] | |
| product_filling[f"zone_0.{node_id}"] = chunked_items | |
| # Динамическая конфигурация сетки для физического движка | |
| class DynamicShelfCfg: | |
| shelf_asset = fixture_type | |
| x_gap, y_gap = GAP, GAP | |
| # Шаг выставляется точно по ширине самого большого товара на этой полке | |
| delta_x = max_item_len + GAP | |
| delta_y = max_item_depth + GAP | |
| start_point_x, start_point_y = 0.0, 0.0 | |
| filling_type = "random" | |
| noise_std_x, noise_std_y = 0.01, 0.01 | |
| rotation_lower, rotation_upper = -5, 5 | |
| fixed_zones["zone_0"][node_id] = DynamicShelfCfg() | |
| context.nodes_dict = nodes_dict | |
| context.product_filling = product_filling | |
| context.fixed_zones = fixed_zones | |
| total_items: int = sum([len(chunk) for chunks in product_filling.values() for chunk in chunks]) | |
| context.report_matrix = {"total_items_placed": total_items} | |
| context.total_shelves = len(nodes_dict) | |
| logger.info(f"✅ Планограмма готова: {len(nodes_dict)} полок, {total_items} товаров.") | |
| return context |