File size: 7,429 Bytes
11306a6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
# -*- coding: utf-8 -*-
"""Shelf detection and grouping utilities."""

from __future__ import annotations

from dataclasses import dataclass
from typing import List, Tuple, Dict, Any

import numpy as np
from PIL import Image, ImageDraw


@dataclass
class ShelfMetadata:
    shelf_id: int
    num_items: int
    y_range: Tuple[int, int]
    confidence: float
    status: str


class ShelfInventoryProcessor:
    def __init__(
        self,
        model,
        overlap_threshold: float = 0.5,
        min_box_height: int = 20,
        min_items_per_shelf: int = 8,
        merge_overlap_threshold: float = 0.3,
    ) -> None:
        self.model = model
        self.overlap_threshold = overlap_threshold
        self.min_box_height = min_box_height
        self.min_items_per_shelf = min_items_per_shelf
        self.merge_overlap_threshold = merge_overlap_threshold

    # ---------- Geometry Utilities ----------

    @staticmethod
    def vertical_overlap(range1: Tuple[float, float], range2: Tuple[float, float]) -> float:
        inter = min(range1[1], range2[1]) - max(range1[0], range2[0])
        if inter <= 0:
            return 0.0
        h1 = range1[1] - range1[0]
        return inter / h1 if h1 > 0 else 0.0

    # ---------- Inference ----------

    def run_inference(self, image: Image.Image) -> Tuple[np.ndarray | None, Image.Image, ImageDraw.ImageDraw]:
        results = self.model.predict(image, verbose=False)[0]
        img = image.convert("RGB")
        draw = ImageDraw.Draw(img)

        if not results.boxes:
            return None, img, draw

        boxes = results.boxes.xyxy.cpu().numpy()
        boxes = boxes[np.argsort(boxes[:, 1])]  # top → bottom

        return boxes, img, draw

    # ---------- Initial Shelf Grouping ----------

    def group_boxes_into_shelves(self, boxes: np.ndarray) -> List[List[np.ndarray]]:
        shelves: List[List[np.ndarray]] = []

        for box in boxes:
            x1, y1, x2, y2 = box
            box_h = y2 - y1

            if box_h < self.min_box_height:
                continue

            matched = False

            for shelf in shelves:
                s_y1 = np.median([b[1] for b in shelf])
                s_y2 = np.median([b[3] for b in shelf])

                inter = min(y2, s_y2) - max(y1, s_y1)
                overlap_ratio = inter / box_h if box_h > 0 else 0

                if overlap_ratio > self.overlap_threshold:
                    shelf.append(box)
                    matched = True
                    break

            if not matched:
                shelves.append([box])

        return shelves

    # ---------- Shelf Object Builder ----------

    def build_shelf_objects(self, shelves: List[List[np.ndarray]]) -> List[Dict[str, Any]]:
        shelf_objs: List[Dict[str, Any]] = []

        for shelf in shelves:
            ys = [b[1] for b in shelf] + [b[3] for b in shelf]
            shelf_objs.append({
                "boxes": shelf,
                "y_range": (min(ys), max(ys)),
            })

        return shelf_objs

    # ---------- Post-processing Merge ----------

    def merge_weak_shelves(self, shelf_objs: List[Dict[str, Any]]) -> List[List[np.ndarray]]:
        merged: List[List[np.ndarray]] = []
        used = [False] * len(shelf_objs)

        for i in range(len(shelf_objs)):
            if used[i]:
                continue

            cur_boxes = shelf_objs[i]["boxes"]
            cur_range = shelf_objs[i]["y_range"]

            for j in range(i + 1, len(shelf_objs)):
                if used[j]:
                    continue

                overlap = self.vertical_overlap(cur_range, shelf_objs[j]["y_range"])

                if (
                    overlap > self.merge_overlap_threshold
                    and (
                        len(cur_boxes) < self.min_items_per_shelf
                        or len(shelf_objs[j]["boxes"]) < self.min_items_per_shelf
                    )
                ):
                    cur_boxes.extend(shelf_objs[j]["boxes"])
                    used[j] = True

            merged.append(cur_boxes)
            used[i] = True

        return merged

    # ---------- Annotation & Metadata ----------

    def annotate_and_build_metadata(
        self,
        shelves: List[List[np.ndarray]],
        draw: ImageDraw.ImageDraw,
    ) -> Tuple[List[np.ndarray], List[ShelfMetadata], List[Dict[str, Any]]]:
        final_boxes: List[np.ndarray] = []
        shelf_metadata: List[ShelfMetadata] = []
        object_metadata: List[Dict[str, Any]] = []

        avg_items = float(np.mean([len(s) for s in shelves])) if shelves else 1.0
        box_counter = 0

        for shelf_id, shelf in enumerate(shelves, start=1):
            ys = [b[1] for b in shelf] + [b[3] for b in shelf]
            min_y, max_y = min(ys), max(ys)

            num_items = len(shelf)
            confidence = round(num_items / avg_items, 2)

            shelf_metadata.append(
                ShelfMetadata(
                    shelf_id=shelf_id,
                    num_items=num_items,
                    y_range=(int(min_y), int(max_y)),
                    confidence=confidence,
                    status="stable" if confidence >= 0.5 else "unstable",
                )
            )

            for b in shelf:
                draw.rectangle([b[0], b[1], b[2], b[3]], outline="red", width=2)
                draw.text((b[0], b[1] - 10), f"S{shelf_id}", fill="red")

                final_boxes.append(b)

                object_metadata.append(
                    {
                        "box_id": box_counter,
                        "shelf_id": shelf_id,
                        "box": [int(v) for v in b],
                    }
                )

                box_counter += 1

        return final_boxes, shelf_metadata, object_metadata

    # ---------- Crop Utilities ----------

    def crop_annotated_image_by_object(
        self,
        annotated_img: Image.Image,
        boxes: List[np.ndarray],
        box_id: int | None = None,
        padding: int = 5,
    ) -> Image.Image | Dict[int, Image.Image]:
        width, height = annotated_img.size

        def _safe_crop(x1, y1, x2, y2):
            x1 = max(0, int(x1 - padding))
            y1 = max(0, int(y1 - padding))
            x2 = min(width, int(x2 + padding))
            y2 = min(height, int(y2 + padding))
            return annotated_img.crop((x1, y1, x2, y2))

        if box_id is not None:
            if box_id < 0 or box_id >= len(boxes):
                raise IndexError(f"Box ID {box_id} out of range")

            x1, y1, x2, y2 = boxes[box_id]
            return _safe_crop(x1, y1, x2, y2)

        cropped: Dict[int, Image.Image] = {}
        for i, (x1, y1, x2, y2) in enumerate(boxes):
            cropped[i] = _safe_crop(x1, y1, x2, y2)

        return cropped

    # ---------- Run Full Pipeline ----------

    def run(self, image: Image.Image):
        boxes, img, draw = self.run_inference(image)

        if boxes is None:
            return [], [], [], 0, img

        shelves = self.group_boxes_into_shelves(boxes)
        shelf_objs = self.build_shelf_objects(shelves)
        merged_shelves = self.merge_weak_shelves(shelf_objs)
        final_boxes, shelf_metadata, object_metadata = self.annotate_and_build_metadata(
            merged_shelves, draw
        )

        return final_boxes, shelf_metadata, object_metadata, len(merged_shelves), img