Spaces:
Sleeping
Sleeping
File size: 9,711 Bytes
ce82348 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 | # src/layout_generator/planner.py
"""
Шаг пайплайна для математического расчета планограммы.
Парсит граф от LLM и подбирает реальные 3D-модели для абстрактных категорий,
опираясь на иерархию продуктов (products_hierarchy) из конфигурации проекта.
"""
import logging
import random
from typing import Any, Dict, List
from omegaconf import DictConfig
from dsynth.scene_gen.utils import RectFixture
from .base import BaseStep, LayoutContext
logger: logging.Logger = logging.getLogger(__name__)
class CalculatePlanogramStep(BaseStep):
"""Синтаксический анализатор графа и калькулятор умной выкладки."""
def _build_catalog_from_config(self, cfg: DictConfig) -> Dict[str, List[str]]:
"""
Собирает плоский словарь категорий и их SKU из конфига products_hierarchy.
Формирует полные составные ключи, совпадающие с теми, что лежат в кэше ОЗУ.
"""
catalog: Dict[str, List[str]] = {}
if not cfg:
return catalog
assets_cfg = cfg.get("assets", cfg.get("asset_config", {}))
hierarchy = assets_cfg.get("products_hierarchy", {})
for group_name, group_data in hierarchy.items():
for category_name, category_items in group_data.items():
if category_items:
catalog[category_name] = [
f"{group_name}.{category_name}.{item_name}"
for item_name in category_items.keys()
]
return catalog
def process(self, context: LayoutContext) -> LayoutContext:
"""
Выполняет расчет планограммы, динамически сопоставляя узлы графа
с доступными 3D-моделями в ОЗУ. Учитывает физические габариты товаров.
"""
logger.info("🧮 Шаг 2: Анализ графа и подбор 3D-моделей по конфигу products_hierarchy...")
sku_catalog: Dict[str, List[str]] = self._build_catalog_from_config(context.cfg)
available_assets_in_ram = set(context.product_assets_lib.keys())
if not context.layout_graph or "nodes" not in context.layout_graph:
logger.warning("⚠️ Граф от LLM не найден. Генерируем базовый граф-заглушку (4 объекта)...")
fallback_category: str = "BEER"
for cat, assets in sku_catalog.items():
if any(a in available_assets_in_ram for a in assets):
fallback_category = cat
break
context.layout_graph = {
"nodes": [
{"id": "fallback_fridge", "type": "large_showcase_fake", "items": {fallback_category: 5}},
{"id": "fallback_shelf_metal", "type": "shelf_metal", "items": {fallback_category: 10}},
{"id": "fallback_shelf_wood", "type": "shelf_low_res", "items": {fallback_category: 10}},
{"id": "fallback_island", "type": "small_shelf_two_sided", "items": {fallback_category: 15}}
],
"edges": [
{"source": "fallback_fridge", "target": "fallback_shelf_metal", "relation": "next_to"},
{"source": "fallback_shelf_metal", "target": "fallback_shelf_wood", "relation": "next_to"},
{"source": "fallback_shelf_wood", "target": "fallback_island", "relation": "opposite"}
]
}
stats = {k: len(v) for k, v in sku_catalog.items()}
logger.info(f"📦 Индекс товаров из конфига: {stats}")
graph: Dict[str, Any] = context.layout_graph
nodes_dict: Dict[str, RectFixture] = {}
product_filling: Dict[str, Any] = {}
fixed_zones: Dict[str, Any] = {"zone_0": {}}
for node in graph.get("nodes", []):
node_id: str = node["id"]
fixture_type: str = node["type"]
items: Dict[str, int] = node.get("items", {})
asset_obj = context.product_assets_lib.get(fixture_type)
if asset_obj is None:
asset_obj = context.product_assets_lib.get(f"fixtures.{fixture_type}")
if asset_obj:
try:
l, w = asset_obj.trimesh_scene.extents[0], asset_obj.trimesh_scene.extents[1]
except Exception:
l, w = 1.55, 0.6
else:
logger.warning(f"⚠️ Оборудование '{fixture_type}' не найдено в ОЗУ. Используем дефолтные габариты.")
l, w = 1.55, 0.6
rect = RectFixture(
name=f"zone_0.{node_id}", x=0, y=0, l=l, w=w,
occupancy_width=0.5, asset_name=fixture_type
)
nodes_dict[node_id] = rect
MAX_BOARDS: int = 5
GAP: float = 0.05
boards_space: List[float] = [l * 0.9 for _ in range(MAX_BOARDS)]
board_items: List[List[str]] = [[] for _ in range(MAX_BOARDS)]
# Переменные для динамического вычисления шага сетки
max_item_len: float = 0.1
max_item_depth: float = 0.1
for category, count in items.items():
mapped_assets: List[str] = sku_catalog.get(category, [])
valid_assets: List[str] = [a for a in mapped_assets if a in available_assets_in_ram]
if not valid_assets:
logger.warning(f"⚠️ Для категории '{category}' нет загруженных моделей! Полка останется пустой.")
continue
asset_dims: Dict[str, float] = {}
for a in valid_assets:
try:
obj = context.product_assets_lib[a]
asset_dims[a] = obj.trimesh_scene.extents[0]
except Exception:
asset_dims[a] = 0.2
placed_count: int = 0
while placed_count < count:
max_available_space: float = max(boards_space)
fit_assets: List[str] = [a for a in valid_assets if asset_dims[a] + GAP <= max_available_space]
if not fit_assets:
break
chosen_asset: str = random.choice(fit_assets)
chosen_len: float = asset_dims[chosen_asset]
for b_idx in range(MAX_BOARDS):
if boards_space[b_idx] >= chosen_len + GAP:
board_items[b_idx].append(chosen_asset)
boards_space[b_idx] -= (chosen_len + GAP)
# Динамически обновляем максимальные габариты для этой конкретной полки
max_item_len = max(max_item_len, chosen_len)
try:
obj_depth = context.product_assets_lib[chosen_asset].trimesh_scene.extents[1]
max_item_depth = max(max_item_depth, obj_depth)
except Exception:
pass
break
placed_count += 1
chunked_items: List[List[str]] = [board for board in board_items if board]
product_filling[f"zone_0.{node_id}"] = chunked_items
# Динамическая конфигурация сетки для физического движка
class DynamicShelfCfg:
shelf_asset = fixture_type
x_gap, y_gap = GAP, GAP
# Шаг выставляется точно по ширине самого большого товара на этой полке
delta_x = max_item_len + GAP
delta_y = max_item_depth + GAP
start_point_x, start_point_y = 0.0, 0.0
filling_type = "random"
noise_std_x, noise_std_y = 0.01, 0.01
rotation_lower, rotation_upper = -5, 5
fixed_zones["zone_0"][node_id] = DynamicShelfCfg()
context.nodes_dict = nodes_dict
context.product_filling = product_filling
context.fixed_zones = fixed_zones
total_items: int = sum([len(chunk) for chunks in product_filling.values() for chunk in chunks])
context.report_matrix = {"total_items_placed": total_items}
context.total_shelves = len(nodes_dict)
logger.info(f"✅ Планограмма готова: {len(nodes_dict)} полок, {total_items} товаров.")
return context |