Spaces:
Running
Running
| # -*- coding: utf-8 -*- | |
| """Shelf detection and grouping utilities.""" | |
| from __future__ import annotations | |
| from dataclasses import dataclass | |
| from typing import List, Tuple, Dict, Any | |
| import numpy as np | |
| from PIL import Image, ImageDraw | |
@dataclass
class ShelfMetadata:
    """Summary record describing one detected shelf.

    Declared as a dataclass so it can be constructed with keyword
    arguments (``ShelfMetadata(shelf_id=..., num_items=..., ...)``) and
    gets ``__repr__``/``__eq__`` for free.
    """

    # Identifier of the shelf within one processed image.
    shelf_id: int
    # Number of detected boxes assigned to this shelf.
    num_items: int
    # Vertical extent of the shelf as a (top, bottom) pair.
    y_range: Tuple[int, int]
    # Score attached to the shelf by whoever builds the record.
    confidence: float
    # Free-form status label for the shelf.
    status: str
class ShelfInventoryProcessor:
    """Group object-detection boxes into horizontal shelves.

    The pipeline (see :meth:`run`) is: run the detector, bucket boxes into
    shelves by vertical overlap, merge under-populated shelves that overlap
    each other, then draw the boxes and build per-shelf / per-box metadata.
    """

    def __init__(
        self,
        model,
        overlap_threshold: float = 0.5,
        min_box_height: int = 20,
        min_items_per_shelf: int = 8,
        merge_overlap_threshold: float = 0.3,
    ) -> None:
        """Store the detector and the grouping parameters.

        Args:
            model: Detector object; only ``model.predict(image, verbose=False)``
                is called on it (presumably an Ultralytics-style model --
                confirm against the caller).
            overlap_threshold: A box joins a shelf when the fraction of the
                box height overlapping the shelf band exceeds this value.
            min_box_height: Boxes shorter than this are ignored when grouping.
            min_items_per_shelf: A shelf with fewer boxes than this is "weak"
                and eligible for merging.
            merge_overlap_threshold: Two shelves are merge candidates when
                their vertical overlap ratio exceeds this value.
        """
        self.model = model
        self.overlap_threshold = overlap_threshold
        self.min_box_height = min_box_height
        self.min_items_per_shelf = min_items_per_shelf
        self.merge_overlap_threshold = merge_overlap_threshold

    # ---------- Geometry Utilities ----------
    @staticmethod
    def vertical_overlap(range1: Tuple[float, float], range2: Tuple[float, float]) -> float:
        """Return the fraction of ``range1`` that is covered by ``range2``.

        Both ranges are ``(top, bottom)`` pairs. The result is normalised by
        the height of ``range1`` only, so the measure is not symmetric.
        Returns ``0.0`` when the ranges do not intersect or ``range1`` has no
        positive height.
        """
        inter = min(range1[1], range2[1]) - max(range1[0], range2[0])
        if inter <= 0:
            return 0.0
        h1 = range1[1] - range1[0]
        return inter / h1 if h1 > 0 else 0.0

    # ---------- Inference ----------
    def run_inference(
        self, image: Image.Image
    ) -> Tuple[np.ndarray | None, Image.Image, ImageDraw.ImageDraw]:
        """Run the detector and return its boxes sorted top to bottom.

        Returns:
            ``(boxes, img, draw)`` where ``img`` is an RGB copy of ``image``,
            ``draw`` is an ``ImageDraw`` bound to ``img`` and ``boxes`` is the
            detector's ``xyxy`` array ordered by ascending ``y1`` -- or
            ``None`` when the detector produced no boxes.
        """
        results = self.model.predict(image, verbose=False)[0]
        img = image.convert("RGB")
        draw = ImageDraw.Draw(img)

        detections = results.boxes
        if detections is None or len(detections) == 0:
            return None, img, draw

        boxes = detections.xyxy.cpu().numpy()
        boxes = boxes[np.argsort(boxes[:, 1])]  # top -> bottom
        return boxes, img, draw

    # ---------- Initial Shelf Grouping ----------
    def group_boxes_into_shelves(self, boxes: np.ndarray) -> List[List[np.ndarray]]:
        """Assign each box to the first shelf it sufficiently overlaps.

        A shelf's band is the median ``y1`` / median ``y2`` of the boxes it
        already holds. Boxes shorter than ``min_box_height`` are dropped; a
        box that matches no existing shelf starts a new one.
        """
        shelves: List[List[np.ndarray]] = []
        for box in boxes:
            _, y1, _, y2 = box
            box_h = y2 - y1
            if box_h < self.min_box_height:
                continue

            for shelf in shelves:
                s_y1 = np.median([b[1] for b in shelf])
                s_y2 = np.median([b[3] for b in shelf])
                inter = min(y2, s_y2) - max(y1, s_y1)
                overlap_ratio = inter / box_h if box_h > 0 else 0
                if overlap_ratio > self.overlap_threshold:
                    shelf.append(box)
                    break
            else:
                # No existing shelf matched: this box opens a new shelf.
                shelves.append([box])
        return shelves

    # ---------- Shelf Object Builder ----------
    def build_shelf_objects(self, shelves: List[List[np.ndarray]]) -> List[Dict[str, Any]]:
        """Wrap each shelf as ``{"boxes": shelf, "y_range": (min_y, max_y)}``.

        ``y_range`` spans the smallest to the largest y coordinate (``y1`` or
        ``y2``) found among the shelf's boxes.
        """
        shelf_objs: List[Dict[str, Any]] = []
        for shelf in shelves:
            ys = [b[1] for b in shelf] + [b[3] for b in shelf]
            shelf_objs.append({"boxes": shelf, "y_range": (min(ys), max(ys))})
        return shelf_objs

    # ---------- Post-processing Merge ----------
    def merge_weak_shelves(self, shelf_objs: List[Dict[str, Any]]) -> List[List[np.ndarray]]:
        """Merge overlapping shelves when at least one of them is weak.

        Each not-yet-consumed shelf absorbs every later shelf whose
        ``y_range`` overlaps its own by more than ``merge_overlap_threshold``
        (measured relative to the absorbing shelf's original range), provided
        either side currently holds fewer than ``min_items_per_shelf`` boxes.

        The input objects are not modified; merged shelves are new lists.
        """
        merged: List[List[np.ndarray]] = []
        used = [False] * len(shelf_objs)

        for i, current in enumerate(shelf_objs):
            if used[i]:
                continue
            used[i] = True

            # Copy so that extending the merged shelf does not mutate the
            # caller's "boxes" list (which would desync it from "y_range").
            cur_boxes = list(current["boxes"])
            cur_range = current["y_range"]

            for j in range(i + 1, len(shelf_objs)):
                if used[j]:
                    continue
                candidate = shelf_objs[j]
                overlap = self.vertical_overlap(cur_range, candidate["y_range"])
                either_weak = (
                    len(cur_boxes) < self.min_items_per_shelf
                    or len(candidate["boxes"]) < self.min_items_per_shelf
                )
                if overlap > self.merge_overlap_threshold and either_weak:
                    cur_boxes.extend(candidate["boxes"])
                    used[j] = True

            merged.append(cur_boxes)
        return merged

    # ---------- Annotation & Metadata ----------
    def annotate_and_build_metadata(
        self,
        shelves: List[List[np.ndarray]],
        draw: ImageDraw.ImageDraw,
    ) -> Tuple[List[np.ndarray], List[ShelfMetadata], List[Dict[str, Any]]]:
        """Draw every box and build shelf-level and box-level metadata.

        Shelf ids start at 1 in the order given. A shelf's confidence is its
        item count divided by the mean item count over all shelves, rounded
        to two decimals; the shelf is "stable" when that is at least 0.5.
        Box ids are a running counter (from 0) across all shelves, so they
        index directly into the returned ``final_boxes`` list.

        Returns:
            ``(final_boxes, shelf_metadata, object_metadata)``.
        """
        final_boxes: List[np.ndarray] = []
        shelf_metadata: List[ShelfMetadata] = []
        object_metadata: List[Dict[str, Any]] = []

        avg_items = float(np.mean([len(s) for s in shelves])) if shelves else 1.0

        for shelf_id, shelf in enumerate(shelves, start=1):
            ys = [b[1] for b in shelf] + [b[3] for b in shelf]
            min_y, max_y = min(ys), max(ys)
            num_items = len(shelf)
            confidence = round(num_items / avg_items, 2)

            shelf_metadata.append(
                ShelfMetadata(
                    shelf_id=shelf_id,
                    num_items=num_items,
                    y_range=(int(min_y), int(max_y)),
                    confidence=confidence,
                    status="stable" if confidence >= 0.5 else "unstable",
                )
            )

            for b in shelf:
                draw.rectangle([b[0], b[1], b[2], b[3]], outline="red", width=2)
                draw.text((b[0], b[1] - 10), f"S{shelf_id}", fill="red")
                object_metadata.append(
                    {
                        # Position in final_boxes before this append.
                        "box_id": len(final_boxes),
                        "shelf_id": shelf_id,
                        "box": [int(v) for v in b],
                    }
                )
                final_boxes.append(b)

        return final_boxes, shelf_metadata, object_metadata

    # ---------- Crop Utilities ----------
    def crop_annotated_image_by_object(
        self,
        annotated_img: Image.Image,
        boxes: List[np.ndarray],
        box_id: int | None = None,
        padding: int = 5,
    ) -> Image.Image | Dict[int, Image.Image]:
        """Crop one box, or all boxes, out of the annotated image.

        Args:
            annotated_img: Image to crop from.
            boxes: ``(x1, y1, x2, y2)`` boxes; ``box_id`` indexes this list.
            box_id: Index of the single box to crop, or ``None`` for all.
            padding: Pixels added on every side, clamped to the image bounds.

        Returns:
            A single cropped image when ``box_id`` is given, otherwise a dict
            mapping each box index to its crop.

        Raises:
            IndexError: If ``box_id`` is outside ``range(len(boxes))``.
        """
        width, height = annotated_img.size

        def _safe_crop(x1, y1, x2, y2):
            # Expand by the padding but never step outside the image.
            left = max(0, int(x1 - padding))
            top = max(0, int(y1 - padding))
            right = min(width, int(x2 + padding))
            bottom = min(height, int(y2 + padding))
            return annotated_img.crop((left, top, right, bottom))

        if box_id is not None:
            if box_id < 0 or box_id >= len(boxes):
                raise IndexError(f"Box ID {box_id} out of range")
            x1, y1, x2, y2 = boxes[box_id]
            return _safe_crop(x1, y1, x2, y2)

        return {
            i: _safe_crop(x1, y1, x2, y2)
            for i, (x1, y1, x2, y2) in enumerate(boxes)
        }

    # ---------- Run Full Pipeline ----------
    def run(
        self, image: Image.Image
    ) -> Tuple[List[np.ndarray], List[ShelfMetadata], List[Dict[str, Any]], int, Image.Image]:
        """Run detection, grouping, merging and annotation on ``image``.

        Returns:
            ``(final_boxes, shelf_metadata, object_metadata, shelf_count, img)``
            where ``img`` is the annotated RGB copy. When the detector finds
            nothing the lists are empty, the count is 0 and ``img`` is the
            un-annotated RGB copy.
        """
        boxes, img, draw = self.run_inference(image)
        if boxes is None:
            return [], [], [], 0, img

        shelves = self.group_boxes_into_shelves(boxes)
        shelf_objs = self.build_shelf_objects(shelves)
        merged_shelves = self.merge_weak_shelves(shelf_objs)
        final_boxes, shelf_metadata, object_metadata = self.annotate_and_build_metadata(
            merged_shelves, draw
        )
        return final_boxes, shelf_metadata, object_metadata, len(merged_shelves), img