File size: 17,234 Bytes
34d3234
 
 
 
 
 
 
44c4536
34d3234
 
 
 
 
 
 
5ebebc8
 
cbc4795
 
fbe9ea3
cbc4795
5ebebc8
34d3234
 
5ebebc8
 
 
 
 
34d3234
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5718f74
2be9d49
34d3234
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5ebebc8
34d3234
5ebebc8
34d3234
 
5ebebc8
34d3234
a519d4c
34d3234
 
 
 
 
 
 
 
 
 
 
 
 
b605fb0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
34d3234
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
cbc4795
 
34d3234
 
cbc4795
34d3234
 
 
 
cbc4795
34d3234
 
cbc4795
 
34d3234
cbc4795
34d3234
 
 
 
 
 
a519d4c
cbc4795
a519d4c
 
 
cbc4795
a519d4c
34d3234
 
67694da
34d3234
 
67694da
da3175d
34d3234
b605fb0
cbc4795
 
b605fb0
 
 
cbc4795
b605fb0
 
da3175d
b605fb0
34d3234
 
67694da
34d3234
 
67694da
da3175d
34d3234
b605fb0
 
 
 
cbc4795
b605fb0
 
da3175d
b605fb0
34d3234
 
da3175d
67694da
cbc4795
 
 
 
 
da3175d
cbc4795
34d3234
 
 
 
da3175d
67694da
34d3234
cbc4795
 
 
 
da3175d
cbc4795
 
da3175d
cbc4795
34d3234
 
 
 
 
 
da3175d
34d3234
 
67694da
da3175d
34d3234
 
 
 
 
 
 
 
67694da
cbc4795
 
34d3234
cbc4795
 
34d3234
 
 
 
 
 
 
 
 
 
 
 
 
cbc4795
 
 
34d3234
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
67694da
34d3234
 
67694da
 
 
 
34d3234
 
67694da
34d3234
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
import io
import json
import logging
import os
import sys
from pathlib import Path

import chainlit as cl
import cv2
import numpy as np
import requests
import torch
from huggingface_hub import hf_hub_download
from PIL import Image, ImageDraw
from transformers import CLIPProcessor, CLIPModel

# Set up logging.
# Everything goes to the console (stderr) through a single StreamHandler; the
# root level is DEBUG, so third-party libraries log verbosely as well.
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler()  # Only use console logging
    ]
)
logger = logging.getLogger(__name__)

# Log environment information once at import time to help diagnose deployments.
# `sys.version` is read from `sys` directly rather than through `os.sys`, which
# only works because the `os` module happens to import `sys` internally.
logger.info(f"Python version: {sys.version}")
logger.info(f"Current working directory: {os.getcwd()}")
logger.info(f"Directory contents: {os.listdir('.')}")

def download_file_from_github(url, local_path, timeout=30):
    """Download a file over HTTP and save it locally.

    Args:
        url: Address of the file to fetch.
        local_path: Destination path. Missing parent directories are created.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout `requests.get` can block indefinitely on a stalled
            connection.

    Raises:
        requests.RequestException: On connection errors, timeouts, or a
            non-success HTTP status (via `raise_for_status`).
        OSError: If the destination cannot be created or written.
    """
    # A bare filename has an empty dirname, and os.makedirs("") raises.
    parent_dir = os.path.dirname(local_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    response = requests.get(url, timeout=timeout)
    response.raise_for_status()
    with open(local_path, 'wb') as f:
        f.write(response.content)
    logger.info(f"Downloaded {url} to {local_path}")

def download_all_required_files():
    """Download all required data files from the GitHub repository.

    Each file is stored under the directory that contains this module, using
    the same relative path it has in the repository. The rest of this file
    reads its assets from `Path(__file__).parent / 'data'`, so the downloads
    must land there even when the app is started from another working
    directory.

    Raises:
        Exception: Whatever the first failing download raised; it is logged
            and re-raised, and the remaining files are not attempted.
    """
    base_url = "https://raw.githubusercontent.com/chelleboyer/planolyzer/main"
    # Anchor at the module directory rather than the current working directory.
    target_root = Path(__file__).parent
    relative_paths = (
        "data/product_positions_adjusted_v10.json",
        "data/shelf_overlay_adjusted_v10.jpg",
        "data/planogram001/planogram.png",
        "data/planogram001/empty-space.png",
        "data/test_shelf_image_cig_003.png",
        "data/PP_backgound.jpg",  # file name as spelled in the URL this app has always requested
    )

    for relative_path in relative_paths:
        url = f"{base_url}/{relative_path}"
        try:
            download_file_from_github(url, str(target_root / relative_path))
        except Exception as e:
            logger.error(f"Failed to download {url}: {str(e)}")
            raise

# Initialize the application by downloading required files.
# This runs at import time, before any asset is read below. A failed download
# is logged and re-raised, so the module fails to import instead of starting
# with missing data.
try:
    download_all_required_files()
    logger.info("Successfully downloaded all required files")
except Exception as e:
    logger.error(f"Failed to initialize application: {str(e)}")
    raise

# Asset locations. All paths are anchored at the directory containing this
# file (BASE_DIR), not at the current working directory.
BASE_DIR = Path(__file__).parent
# JSON list of product slots, iterated by check_empty_spaces().
PLANOGRAM_JSON = BASE_DIR / 'data' / 'product_positions_adjusted_v10.json'
# Shelf overlay image loaded once at import time.
PLANOGRAM_IMAGE = BASE_DIR / 'data' / 'shelf_overlay_adjusted_v10.jpg'
# Reference planogram that uploads are compared against.
REFERENCE_IMAGE = BASE_DIR / 'data' / 'planogram001' / "planogram.png"

# Initialize the CLIP model and processor once at import time; compare_images()
# uses these module-level objects for every comparison.
# Prefer the GPU when torch reports CUDA is available, otherwise use the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"
logger.info(f"Using device: {device}")
model_id = "openai/clip-vit-base-patch16"  # Smaller model variant
logger.info(f"Loading CLIP model: {model_id}")
# NOTE(review): from_pretrained presumably fetches the weights on first use and
# caches them afterwards - confirm for the deployment environment.
model = CLIPModel.from_pretrained(model_id).to(device)
processor = CLIPProcessor.from_pretrained(model_id)
logger.info("CLIP model and processor initialized successfully")

def compress_image(image, max_size=(400, 400)):
    """Resize an image to fit inside `max_size` while keeping its aspect ratio.

    Args:
        image: A PIL image, or a NumPy array in OpenCV's BGR channel order
            (converted to an RGB PIL image first).
        max_size: `(max_width, max_height)` bounding box in pixels.

    Returns:
        A new PIL image whose larger relative side touches the bounding box.
        The scale factor is not capped at 1, so images smaller than
        `max_size` are enlarged.
    """
    if isinstance(image, np.ndarray):
        image = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))

    # One scale factor for both axes keeps the aspect ratio.
    width, height = image.size
    ratio = min(max_size[0] / width, max_size[1] / height)

    # int() truncates, so an extremely thin image could end up with a
    # zero-pixel side, which resize() rejects. Keep at least one pixel.
    new_size = (max(1, int(width * ratio)), max(1, int(height * ratio)))

    return image.resize(new_size, Image.Resampling.LANCZOS)

def validate_image(image, name="image"):
    """Check that `image` is a usable NumPy array or PIL image.

    Args:
        image: The object to check; `None` is treated as a failed load.
        name: Label used in log and error messages.

    Returns:
        True when the image passes every check.

    Raises:
        ValueError: If the image is `None`, of an unsupported type, an empty
            array, an array that is not 3-dimensional, or a PIL image with a
            zero-length side. The error is logged before it is re-raised.
    """
    try:
        if image is None:
            raise ValueError(f"Failed to load {name}")
        if not isinstance(image, (np.ndarray, Image.Image)):
            raise ValueError(f"{name} must be a numpy array or PIL Image")

        if isinstance(image, np.ndarray):
            if not image.size:
                raise ValueError(f"{name} is empty")
            # Only the number of dimensions is checked, not the channel count.
            if image.ndim != 3:
                raise ValueError(f"{name} must be a color image (3 channels)")
            logger.info(f"{name} shape: {image.shape}")
            return True

        # Remaining case: a PIL image, whose size is (width, height).
        width, height = image.size
        if 0 in (width, height):
            raise ValueError(f"{name} has invalid dimensions")
        logger.info(f"{name} size: {image.size}")
        return True
    except Exception as problem:
        logger.error(f"Error validating {name}: {str(problem)}", exc_info=True)
        raise

# Load planogram metadata at import time.
# check_empty_spaces() iterates this value and reads the keys 'x', 'y',
# 'width', 'height', 'name' and 'sku' from each entry.
# A missing file or malformed JSON is logged and re-raised. The ValueError for
# empty data is not handled by either except clause and propagates unlogged.
try:
    with open(PLANOGRAM_JSON, 'r') as f:
        planogram_data = json.load(f)
    if not planogram_data:
        raise ValueError("Planogram data is empty")
    logger.info(f"Successfully loaded planogram data from {PLANOGRAM_JSON}")
except FileNotFoundError:
    logger.error(f"Planogram JSON file not found at {PLANOGRAM_JSON}")
    raise
except json.JSONDecodeError:
    logger.error(f"Invalid JSON in planogram file {PLANOGRAM_JSON}")
    raise

# Load and compress the planogram overlay image at import time.
# cv2.imread returns None instead of raising when the file cannot be read, so
# validate_image() is what turns a failed load into an error here.
# After compress_image() the value is a PIL image, not an OpenCV array.
# NOTE(review): planogram_image is not referenced anywhere else in the visible
# part of this file - confirm whether it is still needed.
try:
    planogram_image = cv2.imread(str(PLANOGRAM_IMAGE))
    validate_image(planogram_image, "planogram image")
    planogram_image = compress_image(planogram_image)
    logger.info(f"Successfully loaded and compressed planogram image from {PLANOGRAM_IMAGE}")
except Exception as e:
    logger.error(f"Error loading planogram image: {str(e)}")
    raise

def load_reference_image():
    """Load the reference planogram image and scale it down.

    Returns:
        The resized PIL image, or None if the file is missing or cannot be
        decoded. Callers check for None, so failures are logged here and are
        deliberately not raised.
    """
    try:
        # Image.open raises on failure (it never returns None) and keeps the
        # file open until the pixel data is read. compress_image() returns a
        # new in-memory image, so the file can be closed as soon as it is done.
        with Image.open(REFERENCE_IMAGE) as ref_img:
            return compress_image(ref_img)
    except Exception as e:
        logger.error(f"Error loading reference image: {e}", exc_info=True)
        return None

def compare_images(ref_img, uploaded_img):
    """Score how closely an uploaded image matches the reference using CLIP.

    Args:
        ref_img: Reference image, passed to the CLIP processor unchanged.
        uploaded_img: Uploaded image; a NumPy array is treated as OpenCV BGR
            and converted to an RGB PIL image. It is resized before encoding.

    Returns:
        A dict with 'similarity_score' (cosine similarity of the two image
        embeddings), 'difference_percentage' ((1 - similarity) * 100) and
        'is_similar' (True when the similarity exceeds 0.85).

    Raises:
        Exception: Any failure is logged and re-raised unchanged.
    """
    try:
        # Bring the upload into the form the processor receives: a resized PIL image
        candidate = uploaded_img
        if isinstance(candidate, np.ndarray):
            rgb_pixels = cv2.cvtColor(candidate, cv2.COLOR_BGR2RGB)
            candidate = Image.fromarray(rgb_pixels)
        candidate = compress_image(candidate)

        # Encode both images in a single batch: index 0 = reference, 1 = upload
        batch = processor(
            images=[ref_img, candidate],
            return_tensors="pt",
            padding=True
        ).to(device)

        # Inference only, so no gradients are tracked
        with torch.no_grad():
            embeddings = model.get_image_features(**batch)
            embeddings = embeddings / embeddings.norm(dim=1, keepdim=True)

        ref_vec = embeddings[0].unsqueeze(0)
        candidate_vec = embeddings[1].unsqueeze(0)
        score = torch.nn.functional.cosine_similarity(ref_vec, candidate_vec).item()

        return {
            'similarity_score': score,
            'difference_percentage': (1 - score) * 100,
            'is_similar': score > 0.85
        }
    except Exception as err:
        logger.error(f"Error in CLIP comparison: {str(err)}")
        raise

@cl.on_chat_start
async def start():
    """Greet the user when a chat session starts.

    Sends, in order: a welcome message with quick-start steps, the reference
    planogram and the empty-space sample (each as an inline image plus a
    downloadable file), the test shelf image, and a closing hint. If the
    reference planogram cannot be loaded, only an error message is sent.
    """
    # The reference image is loaded here only to check that it is usable.
    if load_reference_image() is None:
        await cl.Message(
            content="Error: Reference planogram image could not be loaded. Please ensure data/planogram001/planogram.png exists."
        ).send()
        return

    # Build each asset path once; every asset is sent as an image and a file.
    reference_path = str(REFERENCE_IMAGE)
    empty_space_path = str(BASE_DIR / 'data' / 'planogram001' / 'empty-space.png')
    test_image_path = str(BASE_DIR / 'data' / 'test_shelf_image_cig_003.png')

    # Create a welcome message with instructions
    welcome_msg = """
# Welcome to Planolyzer! 🛍️

## Quick Start:
1. Download the test image below
2. Upload it back to see how the system works
3. Try creating your own test image by adding empty spaces to the reference planogram
"""

    # Send welcome message
    await cl.Message(content=welcome_msg).send()

    # Send reference planogram
    await cl.Message(
        content="## Reference Planogram:",
        elements=[
            cl.Image(
                name="planogram",
                path=reference_path,
                display="inline",
                size="medium"
            ),
            cl.File(
                name="planogram.png",
                path=reference_path,
                display="inline"
            ),
            cl.Image(
                name="empty_space",
                path=empty_space_path,
                display="inline",
                size="small"
            ),
            cl.File(
                name="empty-space.png",
                path=empty_space_path,
                display="inline"
            )
        ]
    ).send()

    # Send test image
    await cl.Message(
        content="## Test Image:",
        elements=[
            cl.Image(
                name="test_shelf",
                path=test_image_path,
                display="inline",
                size="medium"
            ),
            cl.File(
                name="test_shelf_image_cig_003.png",
                path=test_image_path,
                display="inline"
            )
        ]
    ).send()

    # Send additional instructions
    await cl.Message(
        content="Try downloading and uploading the test image first to see how the system works!"
    ).send()

@cl.on_message
async def main(message: cl.Message):
    """Handle an incoming chat message and any image attached to it.

    Behaviour:
      * A message with neither text nor attachments is ignored.
      * A text-only message gets a short greeting asking for an upload.
      * Otherwise the first attachment is processed: it must have an image
        MIME type, be readable by OpenCV and pass validate_image(). It is then
        compared with the reference planogram; if similar enough it is
        analysed for empty spots, otherwise it is rejected with its scores.

    Only the first attachment is considered. Every failure is reported to the
    user as a chat message; nothing is raised to the caller.
    """
    logger.info("Received message")

    # If there are no elements and no message content, do nothing
    if not message.elements and not message.content:
        logger.info("No elements or content in message, returning")
        return

    # Handle text-only messages
    if not message.elements:
        await cl.Message(
            content="👋 Hi! I'm here to help you analyze planogram images. Please upload an image to get started. You can use the test image provided above to try out the system!"
        ).send()
        return

    # There are elements: process them regardless of message content
    logger.info(f"Processing message with {len(message.elements)} elements")
    uploaded_image = message.elements[0]
    logger.info(f"Uploaded image mime type: {uploaded_image.mime}")

    # NOTE(review): mime is presumably optional on Chainlit elements; treat a
    # missing value as "not an image" rather than failing on None.startswith.
    if not (uploaded_image.mime or "").startswith('image/'):
        logger.warning(f"Invalid mime type: {uploaded_image.mime}")
        await cl.Message(
            content="Please upload a valid image file."
        ).send()
        return

    try:
        # Send initial processing message
        logger.info("Sending initial processing message")
        await cl.Message(
            content="🔄 Processing your image... This may take a few moments as we analyze the shelf layout."
        ).send()
        logger.info("Initial processing message sent")

        # Read the uploaded file into OpenCV (BGR) format
        img_path = uploaded_image.path
        logger.info(f"Reading image from path: {img_path}")
        uploaded_img = cv2.imread(img_path)
        if uploaded_img is None:
            # cv2.imread signals an unreadable file by returning None
            logger.error(f"Failed to read image from path: {img_path}")
            await cl.Message(content="❌ Error: Could not read the uploaded image. Please try again with a different image.").send()
            return

        logger.info(f"Successfully read image, shape: {uploaded_img.shape}")

        # Validate uploaded image
        try:
            validate_image(uploaded_img, "uploaded image")
            logger.info("Uploaded image validation successful")
        except ValueError as e:
            logger.error(f"Invalid uploaded image: {str(e)}")
            await cl.Message(content=f"❌ Error: {str(e)}").send()
            return

        # Load reference image
        logger.info("Loading reference image")
        ref_img = load_reference_image()
        if ref_img is None:
            logger.error("Failed to load reference image")
            await cl.Message(content="❌ Error: Reference planogram image not found.").send()
            return

        # Validate reference image
        try:
            validate_image(ref_img, "reference image")
            logger.info("Reference image validation successful")
        except ValueError as e:
            logger.error(f"Invalid reference image: {str(e)}")
            await cl.Message(content=f"❌ Error: {str(e)}").send()
            return

        # Compare images using CLIP
        await cl.Message(content="🔄 Comparing with reference planogram...").send()
        logger.info("Starting CLIP comparison")
        try:
            comparison_result = compare_images(ref_img, uploaded_img)
            logger.info(f"CLIP comparison completed with result: {comparison_result}")
        except Exception as e:
            logger.error(f"CLIP comparison failed: {str(e)}", exc_info=True)
            await cl.Message(content="❌ Error during image comparison. Please try again.").send()
            return

        if not comparison_result['is_similar']:
            # Image is rejected: report the scores so the user can see why
            response = (
                "❌ No go! Image rejected.\n"
                f"Similarity score: {comparison_result['similarity_score']:.2f}\n"
                f"Difference percentage: {comparison_result['difference_percentage']:.2f}%\n\n"
                "Please upload a different image that better matches the reference planogram."
            )
            await cl.Message(content=response).send()
            return

        # Image is accepted, proceed with empty space analysis
        await cl.Message(content="✅ Image accepted! Analyzing empty spaces...").send()
        logger.info("Starting empty space analysis")
        try:
            analysis_result = check_empty_spaces(uploaded_img)
            logger.info("Empty space analysis completed successfully")
            await cl.Message(content=analysis_result).send()
        except Exception as e:
            logger.error(f"Empty space analysis failed: {str(e)}", exc_info=True)
            await cl.Message(content="❌ Error during empty space analysis. Please try again.").send()

    except Exception as e:
        # Last-resort handler so an unexpected failure still reaches the user
        logger.error(f"Error processing image: {str(e)}", exc_info=True)
        await cl.Message(content=f"❌ Error processing image: {str(e)}").send()

def check_empty_spaces(shelf_img, brightness_threshold=70, saturation_threshold=160):
    """Report which planogram slots look empty in a shelf photo.

    Each entry of the module-level `planogram_data` describes one slot through
    its 'x', 'y', 'width', 'height', 'name' and 'sku' keys. The slot rectangle
    is clipped to the image, and the slot counts as empty when both its mean
    brightness (HSV value channel) and mean saturation are below the given
    thresholds.

    Args:
        shelf_img: Shelf photo as an OpenCV BGR array.
        brightness_threshold: Mean V below this counts as dark.
        saturation_threshold: Mean S below this counts as low/moderate colour.
            NOTE(review): both thresholds assume OpenCV's 8-bit 0-255 scale.

    Returns:
        A user-facing report: one line per missing product followed by a
        summary, or a single success line when nothing is missing.

    Raises:
        Exception: Any failure is logged and re-raised unchanged.

    NOTE(review): slot coordinates are applied as-is, so the photo is assumed
    to have the resolution the coordinates were authored for - confirm.
    """
    try:
        logger.info("Starting empty space analysis")
        logger.info(f"Shelf image shape: {shelf_img.shape}")

        shelf_hsv = cv2.cvtColor(shelf_img, cv2.COLOR_BGR2HSV)
        logger.info(f"Converted to HSV, shape: {shelf_hsv.shape}")

        # Image bounds do not change between slots, so read them once.
        h_img, w_img = shelf_hsv.shape[:2]
        # Slots skipped for invalid dimensions still count toward the total.
        total_spots = len(planogram_data)

        report_lines = []
        debug_lines = []  # Per-slot measurements, logged after the loop

        for item in planogram_data:
            # Clip the slot rectangle so it lies inside the image
            x = max(0, min(item['x'], w_img - 1))
            y = max(0, min(item['y'], h_img - 1))
            w = min(item['width'], w_img - x)
            h = min(item['height'], h_img - y)

            logger.info(f"Processing item {item['name']}: x={x}, y={y}, w={w}, h={h}")

            if w <= 0 or h <= 0:
                logger.warning(f"Invalid dimensions for {item['name']}: x={x}, y={y}, w={w}, h={h}")
                continue

            shelf_crop = shelf_hsv[y:y+h, x:x+w]
            avg_saturation = np.mean(shelf_crop[:, :, 1])
            avg_brightness = np.mean(shelf_crop[:, :, 2])

            label = f"{item['name']} (SKU {item['sku']})"
            debug_line = f"{label}: Brightness={avg_brightness:.1f}, Saturation={avg_saturation:.1f}"

            # Rule: low brightness and moderate/low saturation means empty
            if avg_brightness < brightness_threshold and avg_saturation < saturation_threshold:
                report_lines.append(f"❌ {label} is missing!")
                debug_line += " <-- Detected as empty"
            debug_lines.append(debug_line)

        for line in debug_lines:
            logger.info(line)

        if not report_lines:
            result = "✅ All spots look filled! Nice work!"
        else:
            empty_spots = len(report_lines)
            summary = f"\n\n📊 Summary: {empty_spots} out of {total_spots} spots are empty ({empty_spots/total_spots*100:.1f}%)"
            result = "\n".join(report_lines) + summary

        logger.info("Empty space analysis completed")
        return result

    except Exception as e:
        logger.error(f"Error in check_empty_spaces: {str(e)}", exc_info=True)
        raise