Spaces:
Sleeping
Sleeping
| import chainlit as cl | |
| import cv2 | |
| import numpy as np | |
| import json | |
| import os | |
| import logging | |
| from pathlib import Path | |
| from PIL import Image, ImageDraw | |
| import io | |
| import torch | |
| from transformers import CLIPProcessor, CLIPModel | |
| from huggingface_hub import hf_hub_download | |
| import requests | |
| # Set up logging | |
| logging.basicConfig( | |
| level=logging.DEBUG, | |
| format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', | |
| handlers=[ | |
| logging.StreamHandler() # Only use console logging | |
| ] | |
| ) | |
| logger = logging.getLogger(__name__) | |
| # Print environment information | |
| logger.info(f"Python version: {os.sys.version}") | |
| logger.info(f"Current working directory: {os.getcwd()}") | |
| logger.info(f"Directory contents: {os.listdir('.')}") | |
| def download_file_from_github(url, local_path): | |
| """Download a file from GitHub and save it locally.""" | |
| # Create directory if it doesn't exist | |
| os.makedirs(os.path.dirname(local_path), exist_ok=True) | |
| response = requests.get(url) | |
| response.raise_for_status() | |
| with open(local_path, 'wb') as f: | |
| f.write(response.content) | |
| logger.info(f"Downloaded {url} to {local_path}") | |
| def download_all_required_files(): | |
| """Download all required files from GitHub repository.""" | |
| base_url = "https://raw.githubusercontent.com/chelleboyer/planolyzer/main" | |
| files_to_download = { | |
| f"{base_url}/data/product_positions_adjusted_v10.json": "data/product_positions_adjusted_v10.json", | |
| f"{base_url}/data/shelf_overlay_adjusted_v10.jpg": "data/shelf_overlay_adjusted_v10.jpg", | |
| f"{base_url}/data/planogram001/planogram.png": "data/planogram001/planogram.png", | |
| f"{base_url}/data/planogram001/empty-space.png": "data/planogram001/empty-space.png", | |
| f"{base_url}/data/test_shelf_image_cig_003.png": "data/test_shelf_image_cig_003.png", | |
| f"{base_url}/data/PP_backgound.jpg": "data/PP_backgound.jpg" # Updated path to data folder | |
| } | |
| for url, local_path in files_to_download.items(): | |
| try: | |
| download_file_from_github(url, local_path) | |
| except Exception as e: | |
| logger.error(f"Failed to download {url}: {str(e)}") | |
| raise | |
| # Initialize the application by downloading required files | |
| try: | |
| download_all_required_files() | |
| logger.info("Successfully downloaded all required files") | |
| except Exception as e: | |
| logger.error(f"Failed to initialize application: {str(e)}") | |
| raise | |
| # Use relative paths | |
| BASE_DIR = Path(__file__).parent | |
| PLANOGRAM_JSON = BASE_DIR / 'data' / 'product_positions_adjusted_v10.json' | |
| PLANOGRAM_IMAGE = BASE_DIR / 'data' / 'shelf_overlay_adjusted_v10.jpg' | |
| REFERENCE_IMAGE = BASE_DIR / 'data' / 'planogram001' / "planogram.png" | |
| # Initialize CLIP model and processor with a smaller model | |
| device = "cuda" if torch.cuda.is_available() else "cpu" | |
| logger.info(f"Using device: {device}") | |
| model_id = "openai/clip-vit-base-patch16" # Smaller model variant | |
| logger.info(f"Loading CLIP model: {model_id}") | |
| model = CLIPModel.from_pretrained(model_id).to(device) | |
| processor = CLIPProcessor.from_pretrained(model_id) | |
| logger.info("CLIP model and processor initialized successfully") | |
| def compress_image(image, max_size=(400, 400)): | |
| """Compress image while maintaining aspect ratio.""" | |
| if isinstance(image, np.ndarray): | |
| image = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB)) | |
| # Calculate new dimensions | |
| ratio = min(max_size[0]/image.size[0], max_size[1]/image.size[1]) | |
| new_size = tuple(int(dim * ratio) for dim in image.size) | |
| # Resize image | |
| return image.resize(new_size, Image.Resampling.LANCZOS) | |
| def validate_image(image, name="image"): | |
| """Validate that an image is properly loaded and has valid dimensions.""" | |
| try: | |
| if image is None: | |
| raise ValueError(f"Failed to load {name}") | |
| if isinstance(image, np.ndarray): | |
| if image.size == 0: | |
| raise ValueError(f"{name} is empty") | |
| if len(image.shape) != 3: | |
| raise ValueError(f"{name} must be a color image (3 channels)") | |
| logger.info(f"{name} shape: {image.shape}") | |
| elif isinstance(image, Image.Image): | |
| if image.size[0] == 0 or image.size[1] == 0: | |
| raise ValueError(f"{name} has invalid dimensions") | |
| logger.info(f"{name} size: {image.size}") | |
| else: | |
| raise ValueError(f"{name} must be a numpy array or PIL Image") | |
| return True | |
| except Exception as e: | |
| logger.error(f"Error validating {name}: {str(e)}", exc_info=True) | |
| raise | |
| # Load planogram metadata | |
| try: | |
| with open(PLANOGRAM_JSON, 'r') as f: | |
| planogram_data = json.load(f) | |
| if not planogram_data: | |
| raise ValueError("Planogram data is empty") | |
| logger.info(f"Successfully loaded planogram data from {PLANOGRAM_JSON}") | |
| except FileNotFoundError: | |
| logger.error(f"Planogram JSON file not found at {PLANOGRAM_JSON}") | |
| raise | |
| except json.JSONDecodeError: | |
| logger.error(f"Invalid JSON in planogram file {PLANOGRAM_JSON}") | |
| raise | |
| # Load and compress planogram image | |
| try: | |
| planogram_image = cv2.imread(str(PLANOGRAM_IMAGE)) | |
| validate_image(planogram_image, "planogram image") | |
| planogram_image = compress_image(planogram_image) | |
| logger.info(f"Successfully loaded and compressed planogram image from {PLANOGRAM_IMAGE}") | |
| except Exception as e: | |
| logger.error(f"Error loading planogram image: {str(e)}") | |
| raise | |
| def load_reference_image(): | |
| """Load and preprocess the reference planogram image.""" | |
| try: | |
| ref_img = Image.open(REFERENCE_IMAGE) | |
| if ref_img is None: | |
| raise FileNotFoundError(f"Reference image {REFERENCE_IMAGE} not found") | |
| return compress_image(ref_img) | |
| except Exception as e: | |
| print(f"Error loading reference image: {e}") | |
| return None | |
| def compare_images(ref_img, uploaded_img): | |
| """Compare the uploaded image with the reference image using CLIP.""" | |
| try: | |
| # Convert OpenCV image to PIL Image and compress | |
| if isinstance(uploaded_img, np.ndarray): | |
| uploaded_img = Image.fromarray(cv2.cvtColor(uploaded_img, cv2.COLOR_BGR2RGB)) | |
| uploaded_img = compress_image(uploaded_img) | |
| # Process images with CLIP | |
| inputs = processor( | |
| images=[ref_img, uploaded_img], | |
| return_tensors="pt", | |
| padding=True | |
| ).to(device) | |
| # Get image features | |
| with torch.no_grad(): | |
| image_features = model.get_image_features(**inputs) | |
| image_features = image_features / image_features.norm(dim=1, keepdim=True) | |
| # Calculate similarity | |
| similarity = torch.nn.functional.cosine_similarity( | |
| image_features[0].unsqueeze(0), | |
| image_features[1].unsqueeze(0) | |
| ).item() | |
| # Calculate difference percentage (inverse of similarity) | |
| diff_percentage = (1 - similarity) * 100 | |
| return { | |
| 'similarity_score': similarity, | |
| 'difference_percentage': diff_percentage, | |
| 'is_similar': similarity > 0.85 | |
| } | |
| except Exception as e: | |
| logger.error(f"Error in CLIP comparison: {str(e)}") | |
| raise | |
| async def start(): | |
| """Initialize the chat session.""" | |
| ref_img = load_reference_image() | |
| if ref_img is None: | |
| await cl.Message( | |
| content="Error: Reference planogram image not found. Please ensure planogram.png exists in the project directory." | |
| ).send() | |
| return | |
| # Create a welcome message with instructions | |
| welcome_msg = """ | |
| # Welcome to Planolyzer! ποΈ | |
| ## Quick Start: | |
| 1. Download the test image below | |
| 2. Upload it back to see how the system works | |
| 3. Try creating your own test image by adding empty spaces to the reference planogram | |
| """ | |
| # Send welcome message | |
| await cl.Message(content=welcome_msg).send() | |
| # Send reference planogram | |
| await cl.Message( | |
| content="## Reference Planogram:", | |
| elements=[ | |
| cl.Image( | |
| name="planogram", | |
| path=str(REFERENCE_IMAGE), | |
| display="inline", | |
| size="medium" | |
| ), | |
| cl.File( | |
| name="planogram.png", | |
| path=str(REFERENCE_IMAGE), | |
| display="inline" | |
| ), | |
| cl.Image( | |
| name="empty_space", | |
| path=str(BASE_DIR / 'data' / 'planogram001' / 'empty-space.png'), | |
| display="inline", | |
| size="small" | |
| ), | |
| cl.File( | |
| name="empty-space.png", | |
| path=str(BASE_DIR / 'data' / 'planogram001' / 'empty-space.png'), | |
| display="inline" | |
| ) | |
| ] | |
| ).send() | |
| # Send test image | |
| await cl.Message( | |
| content="## Test Image:", | |
| elements=[ | |
| cl.Image( | |
| name="test_shelf", | |
| path=str(BASE_DIR / 'data' / 'test_shelf_image_cig_003.png'), | |
| display="inline", | |
| size="medium" | |
| ), | |
| cl.File( | |
| name="test_shelf_image_cig_003.png", | |
| path=str(BASE_DIR / 'data' / 'test_shelf_image_cig_003.png'), | |
| display="inline" | |
| ) | |
| ] | |
| ).send() | |
| # Send additional instructions | |
| await cl.Message( | |
| content="Try downloading and uploading the test image first to see how the system works!" | |
| ).send() | |
| async def main(message: cl.Message): | |
| """Handle incoming messages and image uploads.""" | |
| logger.info("Received message") | |
| # If there are no elements and no message content, do nothing | |
| if not message.elements and not message.content: | |
| logger.info("No elements or content in message, returning") | |
| return | |
| # If there are elements, process them regardless of message content | |
| if message.elements: | |
| logger.info(f"Processing message with {len(message.elements)} elements") | |
| # Get the uploaded image | |
| uploaded_image = message.elements[0] | |
| logger.info(f"Uploaded image mime type: {uploaded_image.mime}") | |
| if not uploaded_image.mime.startswith('image/'): | |
| logger.warning(f"Invalid mime type: {uploaded_image.mime}") | |
| await cl.Message( | |
| content="Please upload a valid image file." | |
| ).send() | |
| return | |
| try: | |
| # Send initial processing message | |
| logger.info("Sending initial processing message") | |
| processing_msg = await cl.Message( | |
| content="π Processing your image... This may take a few moments as we analyze the shelf layout." | |
| ).send() | |
| logger.info("Initial processing message sent") | |
| # Convert uploaded image to OpenCV format | |
| img_path = uploaded_image.path | |
| logger.info(f"Reading image from path: {img_path}") | |
| uploaded_img = cv2.imread(img_path) | |
| if uploaded_img is None: | |
| logger.error(f"Failed to read image from path: {img_path}") | |
| await cl.Message(content="β Error: Could not read the uploaded image. Please try again with a different image.").send() | |
| return | |
| logger.info(f"Successfully read image, shape: {uploaded_img.shape}") | |
| # Validate uploaded image | |
| try: | |
| validate_image(uploaded_img, "uploaded image") | |
| logger.info("Uploaded image validation successful") | |
| except ValueError as e: | |
| logger.error(f"Invalid uploaded image: {str(e)}") | |
| await cl.Message(content=f"β Error: {str(e)}").send() | |
| return | |
| # Load reference image | |
| logger.info("Loading reference image") | |
| ref_img = load_reference_image() | |
| if ref_img is None: | |
| logger.error("Failed to load reference image") | |
| await cl.Message(content="β Error: Reference planogram image not found.").send() | |
| return | |
| # Validate reference image | |
| try: | |
| validate_image(ref_img, "reference image") | |
| logger.info("Reference image validation successful") | |
| except ValueError as e: | |
| logger.error(f"Invalid reference image: {str(e)}") | |
| await cl.Message(content=f"β Error: {str(e)}").send() | |
| return | |
| # Compare images using CLIP | |
| await cl.Message(content="π Comparing with reference planogram...").send() | |
| logger.info("Starting CLIP comparison") | |
| try: | |
| comparison_result = compare_images(ref_img, uploaded_img) | |
| logger.info(f"CLIP comparison completed with result: {comparison_result}") | |
| except Exception as e: | |
| logger.error(f"CLIP comparison failed: {str(e)}", exc_info=True) | |
| await cl.Message(content="β Error during image comparison. Please try again.").send() | |
| return | |
| # Prepare response | |
| if comparison_result['is_similar']: | |
| # Image is accepted, proceed with empty space analysis | |
| await cl.Message(content="β Image accepted! Analyzing empty spaces...").send() | |
| logger.info("Starting empty space analysis") | |
| try: | |
| # Perform empty space analysis | |
| analysis_result = check_empty_spaces(uploaded_img) | |
| logger.info("Empty space analysis completed successfully") | |
| await cl.Message(content=analysis_result).send() | |
| except Exception as e: | |
| logger.error(f"Empty space analysis failed: {str(e)}", exc_info=True) | |
| await cl.Message(content="β Error during empty space analysis. Please try again.").send() | |
| return | |
| else: | |
| # Image is rejected | |
| response = f"β No go! Image rejected.\n" | |
| response += f"Similarity score: {comparison_result['similarity_score']:.2f}\n" | |
| response += f"Difference percentage: {comparison_result['difference_percentage']:.2f}%\n\n" | |
| response += "Please upload a different image that better matches the reference planogram." | |
| await cl.Message(content=response).send() | |
| except Exception as e: | |
| logger.error(f"Error processing image: {str(e)}", exc_info=True) | |
| await cl.Message(content=f"β Error processing image: {str(e)}").send() | |
| else: | |
| # Handle text-only messages | |
| await cl.Message( | |
| content="π Hi! I'm here to help you analyze planogram images. Please upload an image to get started. You can use the test image provided above to try out the system!" | |
| ).send() | |
| def check_empty_spaces(shelf_img): | |
| try: | |
| logger.info("Starting empty space analysis") | |
| logger.info(f"Shelf image shape: {shelf_img.shape}") | |
| shelf_hsv = cv2.cvtColor(shelf_img, cv2.COLOR_BGR2HSV) | |
| logger.info(f"Converted to HSV, shape: {shelf_hsv.shape}") | |
| report_lines = [] | |
| total_spots = len(planogram_data) | |
| empty_spots = 0 | |
| debug_lines = [] # For debugging output | |
| for item in planogram_data: | |
| x, y, w, h = item['x'], item['y'], item['width'], item['height'] | |
| h_img, w_img = shelf_hsv.shape[:2] | |
| x = max(0, min(x, w_img - 1)) | |
| y = max(0, min(y, h_img - 1)) | |
| w = min(w, w_img - x) | |
| h = min(h, h_img - y) | |
| logger.info(f"Processing item {item['name']}: x={x}, y={y}, w={w}, h={h}") | |
| if w <= 0 or h <= 0: | |
| logger.warning(f"Invalid dimensions for {item['name']}: x={x}, y={y}, w={w}, h={h}") | |
| continue | |
| shelf_crop = shelf_hsv[y:y+h, x:x+w] | |
| avg_brightness = np.mean(shelf_crop[:, :, 2]) | |
| avg_saturation = np.mean(shelf_crop[:, :, 1]) | |
| debug_line = f"{item['name']} (SKU {item['sku']}): Brightness={avg_brightness:.1f}, Saturation={avg_saturation:.1f}" | |
| # New rule: low brightness and moderate/low saturation | |
| if avg_brightness < 70 and avg_saturation < 160: | |
| empty_spots += 1 | |
| report_lines.append(f"β {item['name']} (SKU {item['sku']}) is missing!") | |
| debug_line += " <-- Detected as empty" | |
| debug_lines.append(debug_line) | |
| for line in debug_lines: | |
| logger.info(line) | |
| if not report_lines: | |
| result = "β All spots look filled! Nice work!" | |
| else: | |
| summary = f"\n\nπ Summary: {empty_spots} out of {total_spots} spots are empty ({empty_spots/total_spots*100:.1f}%)" | |
| result = "\n".join(report_lines) + summary | |
| logger.info("Empty space analysis completed") | |
| return result | |
| except Exception as e: | |
| logger.error(f"Error in check_empty_spaces: {str(e)}", exc_info=True) | |
| raise |