# -*- coding: utf-8 -*- """Shelf detection and grouping utilities.""" from __future__ import annotations from dataclasses import dataclass from typing import List, Tuple, Dict, Any import numpy as np from PIL import Image, ImageDraw @dataclass class ShelfMetadata: shelf_id: int num_items: int y_range: Tuple[int, int] confidence: float status: str class ShelfInventoryProcessor: def __init__( self, model, overlap_threshold: float = 0.5, min_box_height: int = 20, min_items_per_shelf: int = 8, merge_overlap_threshold: float = 0.3, ) -> None: self.model = model self.overlap_threshold = overlap_threshold self.min_box_height = min_box_height self.min_items_per_shelf = min_items_per_shelf self.merge_overlap_threshold = merge_overlap_threshold # ---------- Geometry Utilities ---------- @staticmethod def vertical_overlap(range1: Tuple[float, float], range2: Tuple[float, float]) -> float: inter = min(range1[1], range2[1]) - max(range1[0], range2[0]) if inter <= 0: return 0.0 h1 = range1[1] - range1[0] return inter / h1 if h1 > 0 else 0.0 # ---------- Inference ---------- def run_inference(self, image: Image.Image) -> Tuple[np.ndarray | None, Image.Image, ImageDraw.ImageDraw]: results = self.model.predict(image, verbose=False)[0] img = image.convert("RGB") draw = ImageDraw.Draw(img) if not results.boxes: return None, img, draw boxes = results.boxes.xyxy.cpu().numpy() boxes = boxes[np.argsort(boxes[:, 1])] # top → bottom return boxes, img, draw # ---------- Initial Shelf Grouping ---------- def group_boxes_into_shelves(self, boxes: np.ndarray) -> List[List[np.ndarray]]: shelves: List[List[np.ndarray]] = [] for box in boxes: x1, y1, x2, y2 = box box_h = y2 - y1 if box_h < self.min_box_height: continue matched = False for shelf in shelves: s_y1 = np.median([b[1] for b in shelf]) s_y2 = np.median([b[3] for b in shelf]) inter = min(y2, s_y2) - max(y1, s_y1) overlap_ratio = inter / box_h if box_h > 0 else 0 if overlap_ratio > self.overlap_threshold: shelf.append(box) matched = True break if not matched: shelves.append([box]) return shelves # ---------- Shelf Object Builder ---------- def build_shelf_objects(self, shelves: List[List[np.ndarray]]) -> List[Dict[str, Any]]: shelf_objs: List[Dict[str, Any]] = [] for shelf in shelves: ys = [b[1] for b in shelf] + [b[3] for b in shelf] shelf_objs.append({ "boxes": shelf, "y_range": (min(ys), max(ys)), }) return shelf_objs # ---------- Post-processing Merge ---------- def merge_weak_shelves(self, shelf_objs: List[Dict[str, Any]]) -> List[List[np.ndarray]]: merged: List[List[np.ndarray]] = [] used = [False] * len(shelf_objs) for i in range(len(shelf_objs)): if used[i]: continue cur_boxes = shelf_objs[i]["boxes"] cur_range = shelf_objs[i]["y_range"] for j in range(i + 1, len(shelf_objs)): if used[j]: continue overlap = self.vertical_overlap(cur_range, shelf_objs[j]["y_range"]) if ( overlap > self.merge_overlap_threshold and ( len(cur_boxes) < self.min_items_per_shelf or len(shelf_objs[j]["boxes"]) < self.min_items_per_shelf ) ): cur_boxes.extend(shelf_objs[j]["boxes"]) used[j] = True merged.append(cur_boxes) used[i] = True return merged # ---------- Annotation & Metadata ---------- def annotate_and_build_metadata( self, shelves: List[List[np.ndarray]], draw: ImageDraw.ImageDraw, ) -> Tuple[List[np.ndarray], List[ShelfMetadata], List[Dict[str, Any]]]: final_boxes: List[np.ndarray] = [] shelf_metadata: List[ShelfMetadata] = [] object_metadata: List[Dict[str, Any]] = [] avg_items = float(np.mean([len(s) for s in shelves])) if shelves else 1.0 box_counter = 0 for shelf_id, shelf in enumerate(shelves, start=1): ys = [b[1] for b in shelf] + [b[3] for b in shelf] min_y, max_y = min(ys), max(ys) num_items = len(shelf) confidence = round(num_items / avg_items, 2) shelf_metadata.append( ShelfMetadata( shelf_id=shelf_id, num_items=num_items, y_range=(int(min_y), int(max_y)), confidence=confidence, status="stable" if confidence >= 0.5 else "unstable", ) ) for b in shelf: draw.rectangle([b[0], b[1], b[2], b[3]], outline="red", width=2) draw.text((b[0], b[1] - 10), f"S{shelf_id}", fill="red") final_boxes.append(b) object_metadata.append( { "box_id": box_counter, "shelf_id": shelf_id, "box": [int(v) for v in b], } ) box_counter += 1 return final_boxes, shelf_metadata, object_metadata # ---------- Crop Utilities ---------- def crop_annotated_image_by_object( self, annotated_img: Image.Image, boxes: List[np.ndarray], box_id: int | None = None, padding: int = 5, ) -> Image.Image | Dict[int, Image.Image]: width, height = annotated_img.size def _safe_crop(x1, y1, x2, y2): x1 = max(0, int(x1 - padding)) y1 = max(0, int(y1 - padding)) x2 = min(width, int(x2 + padding)) y2 = min(height, int(y2 + padding)) return annotated_img.crop((x1, y1, x2, y2)) if box_id is not None: if box_id < 0 or box_id >= len(boxes): raise IndexError(f"Box ID {box_id} out of range") x1, y1, x2, y2 = boxes[box_id] return _safe_crop(x1, y1, x2, y2) cropped: Dict[int, Image.Image] = {} for i, (x1, y1, x2, y2) in enumerate(boxes): cropped[i] = _safe_crop(x1, y1, x2, y2) return cropped # ---------- Run Full Pipeline ---------- def run(self, image: Image.Image): boxes, img, draw = self.run_inference(image) if boxes is None: return [], [], [], 0, img shelves = self.group_boxes_into_shelves(boxes) shelf_objs = self.build_shelf_objects(shelves) merged_shelves = self.merge_weak_shelves(shelf_objs) final_boxes, shelf_metadata, object_metadata = self.annotate_and_build_metadata( merged_shelves, draw ) return final_boxes, shelf_metadata, object_metadata, len(merged_shelves), img