Spyspook's picture
initial commit
ce82348 verified
# src/layout_generator/planner.py
"""
Шаг пайплайна для математического расчета планограммы.
Парсит граф от LLM и подбирает реальные 3D-модели для абстрактных категорий,
опираясь на иерархию продуктов (products_hierarchy) из конфигурации проекта.
"""
import logging
import random
from typing import Any, Dict, List
from omegaconf import DictConfig
from dsynth.scene_gen.utils import RectFixture
from .base import BaseStep, LayoutContext
logger: logging.Logger = logging.getLogger(__name__)
class CalculatePlanogramStep(BaseStep):
"""Синтаксический анализатор графа и калькулятор умной выкладки."""
def _build_catalog_from_config(self, cfg: DictConfig) -> Dict[str, List[str]]:
"""
Собирает плоский словарь категорий и их SKU из конфига products_hierarchy.
Формирует полные составные ключи, совпадающие с теми, что лежат в кэше ОЗУ.
"""
catalog: Dict[str, List[str]] = {}
if not cfg:
return catalog
assets_cfg = cfg.get("assets", cfg.get("asset_config", {}))
hierarchy = assets_cfg.get("products_hierarchy", {})
for group_name, group_data in hierarchy.items():
for category_name, category_items in group_data.items():
if category_items:
catalog[category_name] = [
f"{group_name}.{category_name}.{item_name}"
for item_name in category_items.keys()
]
return catalog
def process(self, context: LayoutContext) -> LayoutContext:
"""
Выполняет расчет планограммы, динамически сопоставляя узлы графа
с доступными 3D-моделями в ОЗУ. Учитывает физические габариты товаров.
"""
logger.info("🧮 Шаг 2: Анализ графа и подбор 3D-моделей по конфигу products_hierarchy...")
sku_catalog: Dict[str, List[str]] = self._build_catalog_from_config(context.cfg)
available_assets_in_ram = set(context.product_assets_lib.keys())
if not context.layout_graph or "nodes" not in context.layout_graph:
logger.warning("⚠️ Граф от LLM не найден. Генерируем базовый граф-заглушку (4 объекта)...")
fallback_category: str = "BEER"
for cat, assets in sku_catalog.items():
if any(a in available_assets_in_ram for a in assets):
fallback_category = cat
break
context.layout_graph = {
"nodes": [
{"id": "fallback_fridge", "type": "large_showcase_fake", "items": {fallback_category: 5}},
{"id": "fallback_shelf_metal", "type": "shelf_metal", "items": {fallback_category: 10}},
{"id": "fallback_shelf_wood", "type": "shelf_low_res", "items": {fallback_category: 10}},
{"id": "fallback_island", "type": "small_shelf_two_sided", "items": {fallback_category: 15}}
],
"edges": [
{"source": "fallback_fridge", "target": "fallback_shelf_metal", "relation": "next_to"},
{"source": "fallback_shelf_metal", "target": "fallback_shelf_wood", "relation": "next_to"},
{"source": "fallback_shelf_wood", "target": "fallback_island", "relation": "opposite"}
]
}
stats = {k: len(v) for k, v in sku_catalog.items()}
logger.info(f"📦 Индекс товаров из конфига: {stats}")
graph: Dict[str, Any] = context.layout_graph
nodes_dict: Dict[str, RectFixture] = {}
product_filling: Dict[str, Any] = {}
fixed_zones: Dict[str, Any] = {"zone_0": {}}
for node in graph.get("nodes", []):
node_id: str = node["id"]
fixture_type: str = node["type"]
items: Dict[str, int] = node.get("items", {})
asset_obj = context.product_assets_lib.get(fixture_type)
if asset_obj is None:
asset_obj = context.product_assets_lib.get(f"fixtures.{fixture_type}")
if asset_obj:
try:
l, w = asset_obj.trimesh_scene.extents[0], asset_obj.trimesh_scene.extents[1]
except Exception:
l, w = 1.55, 0.6
else:
logger.warning(f"⚠️ Оборудование '{fixture_type}' не найдено в ОЗУ. Используем дефолтные габариты.")
l, w = 1.55, 0.6
rect = RectFixture(
name=f"zone_0.{node_id}", x=0, y=0, l=l, w=w,
occupancy_width=0.5, asset_name=fixture_type
)
nodes_dict[node_id] = rect
MAX_BOARDS: int = 5
GAP: float = 0.05
boards_space: List[float] = [l * 0.9 for _ in range(MAX_BOARDS)]
board_items: List[List[str]] = [[] for _ in range(MAX_BOARDS)]
# Переменные для динамического вычисления шага сетки
max_item_len: float = 0.1
max_item_depth: float = 0.1
for category, count in items.items():
mapped_assets: List[str] = sku_catalog.get(category, [])
valid_assets: List[str] = [a for a in mapped_assets if a in available_assets_in_ram]
if not valid_assets:
logger.warning(f"⚠️ Для категории '{category}' нет загруженных моделей! Полка останется пустой.")
continue
asset_dims: Dict[str, float] = {}
for a in valid_assets:
try:
obj = context.product_assets_lib[a]
asset_dims[a] = obj.trimesh_scene.extents[0]
except Exception:
asset_dims[a] = 0.2
placed_count: int = 0
while placed_count < count:
max_available_space: float = max(boards_space)
fit_assets: List[str] = [a for a in valid_assets if asset_dims[a] + GAP <= max_available_space]
if not fit_assets:
break
chosen_asset: str = random.choice(fit_assets)
chosen_len: float = asset_dims[chosen_asset]
for b_idx in range(MAX_BOARDS):
if boards_space[b_idx] >= chosen_len + GAP:
board_items[b_idx].append(chosen_asset)
boards_space[b_idx] -= (chosen_len + GAP)
# Динамически обновляем максимальные габариты для этой конкретной полки
max_item_len = max(max_item_len, chosen_len)
try:
obj_depth = context.product_assets_lib[chosen_asset].trimesh_scene.extents[1]
max_item_depth = max(max_item_depth, obj_depth)
except Exception:
pass
break
placed_count += 1
chunked_items: List[List[str]] = [board for board in board_items if board]
product_filling[f"zone_0.{node_id}"] = chunked_items
# Динамическая конфигурация сетки для физического движка
class DynamicShelfCfg:
shelf_asset = fixture_type
x_gap, y_gap = GAP, GAP
# Шаг выставляется точно по ширине самого большого товара на этой полке
delta_x = max_item_len + GAP
delta_y = max_item_depth + GAP
start_point_x, start_point_y = 0.0, 0.0
filling_type = "random"
noise_std_x, noise_std_y = 0.01, 0.01
rotation_lower, rotation_upper = -5, 5
fixed_zones["zone_0"][node_id] = DynamicShelfCfg()
context.nodes_dict = nodes_dict
context.product_filling = product_filling
context.fixed_zones = fixed_zones
total_items: int = sum([len(chunk) for chunks in product_filling.values() for chunk in chunks])
context.report_matrix = {"total_items_placed": total_items}
context.total_shelves = len(nodes_dict)
logger.info(f"✅ Планограмма готова: {len(nodes_dict)} полок, {total_items} товаров.")
return context