File size: 9,711 Bytes
ce82348
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
# src/layout_generator/planner.py

"""

Шаг пайплайна для математического расчета планограммы.

Парсит граф от LLM и подбирает реальные 3D-модели для абстрактных категорий,

опираясь на иерархию продуктов (products_hierarchy) из конфигурации проекта.

"""
import logging
import random
from typing import Any, Dict, List

from omegaconf import DictConfig

from dsynth.scene_gen.utils import RectFixture
from .base import BaseStep, LayoutContext

logger: logging.Logger = logging.getLogger(__name__)

class CalculatePlanogramStep(BaseStep):
    """Синтаксический анализатор графа и калькулятор умной выкладки."""

    def _build_catalog_from_config(self, cfg: DictConfig) -> Dict[str, List[str]]:
        """

        Собирает плоский словарь категорий и их SKU из конфига products_hierarchy.

        Формирует полные составные ключи, совпадающие с теми, что лежат в кэше ОЗУ.

        """
        catalog: Dict[str, List[str]] = {}
        if not cfg:
            return catalog
            
        assets_cfg = cfg.get("assets", cfg.get("asset_config", {}))
        hierarchy = assets_cfg.get("products_hierarchy", {})
        
        for group_name, group_data in hierarchy.items():
            for category_name, category_items in group_data.items():
                if category_items:
                    catalog[category_name] = [
                        f"{group_name}.{category_name}.{item_name}" 
                        for item_name in category_items.keys()
                    ]
                    
        return catalog

    def process(self, context: LayoutContext) -> LayoutContext:
        """

        Выполняет расчет планограммы, динамически сопоставляя узлы графа 

        с доступными 3D-моделями в ОЗУ. Учитывает физические габариты товаров.

        """
        logger.info("🧮 Шаг 2: Анализ графа и подбор 3D-моделей по конфигу products_hierarchy...")
        
        sku_catalog: Dict[str, List[str]] = self._build_catalog_from_config(context.cfg)
        available_assets_in_ram = set(context.product_assets_lib.keys())
        
        if not context.layout_graph or "nodes" not in context.layout_graph:
            logger.warning("⚠️ Граф от LLM не найден. Генерируем базовый граф-заглушку (4 объекта)...")
            
            fallback_category: str = "BEER" 
            for cat, assets in sku_catalog.items():
                if any(a in available_assets_in_ram for a in assets):
                    fallback_category = cat
                    break
            
            context.layout_graph = {
                "nodes": [
                    {"id": "fallback_fridge", "type": "large_showcase_fake", "items": {fallback_category: 5}},
                    {"id": "fallback_shelf_metal", "type": "shelf_metal", "items": {fallback_category: 10}},
                    {"id": "fallback_shelf_wood", "type": "shelf_low_res", "items": {fallback_category: 10}},
                    {"id": "fallback_island", "type": "small_shelf_two_sided", "items": {fallback_category: 15}}
                ],
                "edges": [
                    {"source": "fallback_fridge", "target": "fallback_shelf_metal", "relation": "next_to"},
                    {"source": "fallback_shelf_metal", "target": "fallback_shelf_wood", "relation": "next_to"},
                    {"source": "fallback_shelf_wood", "target": "fallback_island", "relation": "opposite"}
                ]
            }

        stats = {k: len(v) for k, v in sku_catalog.items()}
        logger.info(f"📦 Индекс товаров из конфига: {stats}")

        graph: Dict[str, Any] = context.layout_graph
        nodes_dict: Dict[str, RectFixture] = {}
        product_filling: Dict[str, Any] = {}
        fixed_zones: Dict[str, Any] = {"zone_0": {}}

        for node in graph.get("nodes", []):
            node_id: str = node["id"]
            fixture_type: str = node["type"]
            items: Dict[str, int] = node.get("items", {})
            
            asset_obj = context.product_assets_lib.get(fixture_type)
            if asset_obj is None:
                asset_obj = context.product_assets_lib.get(f"fixtures.{fixture_type}")
                
            if asset_obj:
                try:
                    l, w = asset_obj.trimesh_scene.extents[0], asset_obj.trimesh_scene.extents[1]
                except Exception:
                    l, w = 1.55, 0.6
            else:
                logger.warning(f"⚠️ Оборудование '{fixture_type}' не найдено в ОЗУ. Используем дефолтные габариты.")
                l, w = 1.55, 0.6

            rect = RectFixture(
                name=f"zone_0.{node_id}", x=0, y=0, l=l, w=w, 
                occupancy_width=0.5, asset_name=fixture_type
            )
            nodes_dict[node_id] = rect

            MAX_BOARDS: int = 5
            GAP: float = 0.05
            boards_space: List[float] = [l * 0.9 for _ in range(MAX_BOARDS)]
            board_items: List[List[str]] = [[] for _ in range(MAX_BOARDS)]

            # Переменные для динамического вычисления шага сетки
            max_item_len: float = 0.1
            max_item_depth: float = 0.1

            for category, count in items.items():
                mapped_assets: List[str] = sku_catalog.get(category, [])
                valid_assets: List[str] = [a for a in mapped_assets if a in available_assets_in_ram]
                
                if not valid_assets:
                    logger.warning(f"⚠️ Для категории '{category}' нет загруженных моделей! Полка останется пустой.")
                    continue
                    
                asset_dims: Dict[str, float] = {}
                for a in valid_assets:
                    try:
                        obj = context.product_assets_lib[a]
                        asset_dims[a] = obj.trimesh_scene.extents[0]
                    except Exception:
                        asset_dims[a] = 0.2
                        
                placed_count: int = 0
                while placed_count < count:
                    max_available_space: float = max(boards_space)
                    fit_assets: List[str] = [a for a in valid_assets if asset_dims[a] + GAP <= max_available_space]
                    
                    if not fit_assets:
                        break
                        
                    chosen_asset: str = random.choice(fit_assets)
                    chosen_len: float = asset_dims[chosen_asset]
                    
                    for b_idx in range(MAX_BOARDS):
                        if boards_space[b_idx] >= chosen_len + GAP:
                            board_items[b_idx].append(chosen_asset)
                            boards_space[b_idx] -= (chosen_len + GAP)
                            
                            # Динамически обновляем максимальные габариты для этой конкретной полки
                            max_item_len = max(max_item_len, chosen_len)
                            try:
                                obj_depth = context.product_assets_lib[chosen_asset].trimesh_scene.extents[1]
                                max_item_depth = max(max_item_depth, obj_depth)
                            except Exception:
                                pass
                                
                            break
                            
                    placed_count += 1
            
            chunked_items: List[List[str]] = [board for board in board_items if board]
            product_filling[f"zone_0.{node_id}"] = chunked_items

            # Динамическая конфигурация сетки для физического движка
            class DynamicShelfCfg:
                shelf_asset = fixture_type
                x_gap, y_gap = GAP, GAP
                # Шаг выставляется точно по ширине самого большого товара на этой полке
                delta_x = max_item_len + GAP
                delta_y = max_item_depth + GAP
                start_point_x, start_point_y = 0.0, 0.0
                filling_type = "random"
                noise_std_x, noise_std_y = 0.01, 0.01
                rotation_lower, rotation_upper = -5, 5
                
            fixed_zones["zone_0"][node_id] = DynamicShelfCfg()

        context.nodes_dict = nodes_dict
        context.product_filling = product_filling
        context.fixed_zones = fixed_zones
        
        total_items: int = sum([len(chunk) for chunks in product_filling.values() for chunk in chunks])
        context.report_matrix = {"total_items_placed": total_items}
        context.total_shelves = len(nodes_dict)
        
        logger.info(f"✅ Планограмма готова: {len(nodes_dict)} полок, {total_items} товаров.")
        return context