planolyzer / app.py
chelleboyer's picture
Move PP_background.jpg to data folder
2be9d49
import chainlit as cl
import cv2
import numpy as np
import json
import os
import logging
from pathlib import Path
from PIL import Image, ImageDraw
import io
import torch
from transformers import CLIPProcessor, CLIPModel
from huggingface_hub import hf_hub_download
import requests
# Set up logging
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler() # Only use console logging
]
)
logger = logging.getLogger(__name__)
# Print environment information
logger.info(f"Python version: {os.sys.version}")
logger.info(f"Current working directory: {os.getcwd()}")
logger.info(f"Directory contents: {os.listdir('.')}")
def download_file_from_github(url, local_path):
"""Download a file from GitHub and save it locally."""
# Create directory if it doesn't exist
os.makedirs(os.path.dirname(local_path), exist_ok=True)
response = requests.get(url)
response.raise_for_status()
with open(local_path, 'wb') as f:
f.write(response.content)
logger.info(f"Downloaded {url} to {local_path}")
def download_all_required_files():
"""Download all required files from GitHub repository."""
base_url = "https://raw.githubusercontent.com/chelleboyer/planolyzer/main"
files_to_download = {
f"{base_url}/data/product_positions_adjusted_v10.json": "data/product_positions_adjusted_v10.json",
f"{base_url}/data/shelf_overlay_adjusted_v10.jpg": "data/shelf_overlay_adjusted_v10.jpg",
f"{base_url}/data/planogram001/planogram.png": "data/planogram001/planogram.png",
f"{base_url}/data/planogram001/empty-space.png": "data/planogram001/empty-space.png",
f"{base_url}/data/test_shelf_image_cig_003.png": "data/test_shelf_image_cig_003.png",
f"{base_url}/data/PP_backgound.jpg": "data/PP_backgound.jpg" # Updated path to data folder
}
for url, local_path in files_to_download.items():
try:
download_file_from_github(url, local_path)
except Exception as e:
logger.error(f"Failed to download {url}: {str(e)}")
raise
# Initialize the application by downloading required files
try:
download_all_required_files()
logger.info("Successfully downloaded all required files")
except Exception as e:
logger.error(f"Failed to initialize application: {str(e)}")
raise
# Use relative paths
BASE_DIR = Path(__file__).parent
PLANOGRAM_JSON = BASE_DIR / 'data' / 'product_positions_adjusted_v10.json'
PLANOGRAM_IMAGE = BASE_DIR / 'data' / 'shelf_overlay_adjusted_v10.jpg'
REFERENCE_IMAGE = BASE_DIR / 'data' / 'planogram001' / "planogram.png"
# Initialize CLIP model and processor with a smaller model
device = "cuda" if torch.cuda.is_available() else "cpu"
logger.info(f"Using device: {device}")
model_id = "openai/clip-vit-base-patch16" # Smaller model variant
logger.info(f"Loading CLIP model: {model_id}")
model = CLIPModel.from_pretrained(model_id).to(device)
processor = CLIPProcessor.from_pretrained(model_id)
logger.info("CLIP model and processor initialized successfully")
def compress_image(image, max_size=(400, 400)):
"""Compress image while maintaining aspect ratio."""
if isinstance(image, np.ndarray):
image = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
# Calculate new dimensions
ratio = min(max_size[0]/image.size[0], max_size[1]/image.size[1])
new_size = tuple(int(dim * ratio) for dim in image.size)
# Resize image
return image.resize(new_size, Image.Resampling.LANCZOS)
def validate_image(image, name="image"):
"""Validate that an image is properly loaded and has valid dimensions."""
try:
if image is None:
raise ValueError(f"Failed to load {name}")
if isinstance(image, np.ndarray):
if image.size == 0:
raise ValueError(f"{name} is empty")
if len(image.shape) != 3:
raise ValueError(f"{name} must be a color image (3 channels)")
logger.info(f"{name} shape: {image.shape}")
elif isinstance(image, Image.Image):
if image.size[0] == 0 or image.size[1] == 0:
raise ValueError(f"{name} has invalid dimensions")
logger.info(f"{name} size: {image.size}")
else:
raise ValueError(f"{name} must be a numpy array or PIL Image")
return True
except Exception as e:
logger.error(f"Error validating {name}: {str(e)}", exc_info=True)
raise
# Load planogram metadata
try:
with open(PLANOGRAM_JSON, 'r') as f:
planogram_data = json.load(f)
if not planogram_data:
raise ValueError("Planogram data is empty")
logger.info(f"Successfully loaded planogram data from {PLANOGRAM_JSON}")
except FileNotFoundError:
logger.error(f"Planogram JSON file not found at {PLANOGRAM_JSON}")
raise
except json.JSONDecodeError:
logger.error(f"Invalid JSON in planogram file {PLANOGRAM_JSON}")
raise
# Load and compress planogram image
try:
planogram_image = cv2.imread(str(PLANOGRAM_IMAGE))
validate_image(planogram_image, "planogram image")
planogram_image = compress_image(planogram_image)
logger.info(f"Successfully loaded and compressed planogram image from {PLANOGRAM_IMAGE}")
except Exception as e:
logger.error(f"Error loading planogram image: {str(e)}")
raise
def load_reference_image():
"""Load and preprocess the reference planogram image."""
try:
ref_img = Image.open(REFERENCE_IMAGE)
if ref_img is None:
raise FileNotFoundError(f"Reference image {REFERENCE_IMAGE} not found")
return compress_image(ref_img)
except Exception as e:
print(f"Error loading reference image: {e}")
return None
def compare_images(ref_img, uploaded_img):
"""Compare the uploaded image with the reference image using CLIP."""
try:
# Convert OpenCV image to PIL Image and compress
if isinstance(uploaded_img, np.ndarray):
uploaded_img = Image.fromarray(cv2.cvtColor(uploaded_img, cv2.COLOR_BGR2RGB))
uploaded_img = compress_image(uploaded_img)
# Process images with CLIP
inputs = processor(
images=[ref_img, uploaded_img],
return_tensors="pt",
padding=True
).to(device)
# Get image features
with torch.no_grad():
image_features = model.get_image_features(**inputs)
image_features = image_features / image_features.norm(dim=1, keepdim=True)
# Calculate similarity
similarity = torch.nn.functional.cosine_similarity(
image_features[0].unsqueeze(0),
image_features[1].unsqueeze(0)
).item()
# Calculate difference percentage (inverse of similarity)
diff_percentage = (1 - similarity) * 100
return {
'similarity_score': similarity,
'difference_percentage': diff_percentage,
'is_similar': similarity > 0.85
}
except Exception as e:
logger.error(f"Error in CLIP comparison: {str(e)}")
raise
@cl.on_chat_start
async def start():
"""Initialize the chat session."""
ref_img = load_reference_image()
if ref_img is None:
await cl.Message(
content="Error: Reference planogram image not found. Please ensure planogram.png exists in the project directory."
).send()
return
# Create a welcome message with instructions
welcome_msg = """
# Welcome to Planolyzer! πŸ›οΈ
## Quick Start:
1. Download the test image below
2. Upload it back to see how the system works
3. Try creating your own test image by adding empty spaces to the reference planogram
"""
# Send welcome message
await cl.Message(content=welcome_msg).send()
# Send reference planogram
await cl.Message(
content="## Reference Planogram:",
elements=[
cl.Image(
name="planogram",
path=str(REFERENCE_IMAGE),
display="inline",
size="medium"
),
cl.File(
name="planogram.png",
path=str(REFERENCE_IMAGE),
display="inline"
),
cl.Image(
name="empty_space",
path=str(BASE_DIR / 'data' / 'planogram001' / 'empty-space.png'),
display="inline",
size="small"
),
cl.File(
name="empty-space.png",
path=str(BASE_DIR / 'data' / 'planogram001' / 'empty-space.png'),
display="inline"
)
]
).send()
# Send test image
await cl.Message(
content="## Test Image:",
elements=[
cl.Image(
name="test_shelf",
path=str(BASE_DIR / 'data' / 'test_shelf_image_cig_003.png'),
display="inline",
size="medium"
),
cl.File(
name="test_shelf_image_cig_003.png",
path=str(BASE_DIR / 'data' / 'test_shelf_image_cig_003.png'),
display="inline"
)
]
).send()
# Send additional instructions
await cl.Message(
content="Try downloading and uploading the test image first to see how the system works!"
).send()
@cl.on_message
async def main(message: cl.Message):
"""Handle incoming messages and image uploads."""
logger.info("Received message")
# If there are no elements and no message content, do nothing
if not message.elements and not message.content:
logger.info("No elements or content in message, returning")
return
# If there are elements, process them regardless of message content
if message.elements:
logger.info(f"Processing message with {len(message.elements)} elements")
# Get the uploaded image
uploaded_image = message.elements[0]
logger.info(f"Uploaded image mime type: {uploaded_image.mime}")
if not uploaded_image.mime.startswith('image/'):
logger.warning(f"Invalid mime type: {uploaded_image.mime}")
await cl.Message(
content="Please upload a valid image file."
).send()
return
try:
# Send initial processing message
logger.info("Sending initial processing message")
processing_msg = await cl.Message(
content="πŸ”„ Processing your image... This may take a few moments as we analyze the shelf layout."
).send()
logger.info("Initial processing message sent")
# Convert uploaded image to OpenCV format
img_path = uploaded_image.path
logger.info(f"Reading image from path: {img_path}")
uploaded_img = cv2.imread(img_path)
if uploaded_img is None:
logger.error(f"Failed to read image from path: {img_path}")
await cl.Message(content="❌ Error: Could not read the uploaded image. Please try again with a different image.").send()
return
logger.info(f"Successfully read image, shape: {uploaded_img.shape}")
# Validate uploaded image
try:
validate_image(uploaded_img, "uploaded image")
logger.info("Uploaded image validation successful")
except ValueError as e:
logger.error(f"Invalid uploaded image: {str(e)}")
await cl.Message(content=f"❌ Error: {str(e)}").send()
return
# Load reference image
logger.info("Loading reference image")
ref_img = load_reference_image()
if ref_img is None:
logger.error("Failed to load reference image")
await cl.Message(content="❌ Error: Reference planogram image not found.").send()
return
# Validate reference image
try:
validate_image(ref_img, "reference image")
logger.info("Reference image validation successful")
except ValueError as e:
logger.error(f"Invalid reference image: {str(e)}")
await cl.Message(content=f"❌ Error: {str(e)}").send()
return
# Compare images using CLIP
await cl.Message(content="πŸ”„ Comparing with reference planogram...").send()
logger.info("Starting CLIP comparison")
try:
comparison_result = compare_images(ref_img, uploaded_img)
logger.info(f"CLIP comparison completed with result: {comparison_result}")
except Exception as e:
logger.error(f"CLIP comparison failed: {str(e)}", exc_info=True)
await cl.Message(content="❌ Error during image comparison. Please try again.").send()
return
# Prepare response
if comparison_result['is_similar']:
# Image is accepted, proceed with empty space analysis
await cl.Message(content="βœ… Image accepted! Analyzing empty spaces...").send()
logger.info("Starting empty space analysis")
try:
# Perform empty space analysis
analysis_result = check_empty_spaces(uploaded_img)
logger.info("Empty space analysis completed successfully")
await cl.Message(content=analysis_result).send()
except Exception as e:
logger.error(f"Empty space analysis failed: {str(e)}", exc_info=True)
await cl.Message(content="❌ Error during empty space analysis. Please try again.").send()
return
else:
# Image is rejected
response = f"❌ No go! Image rejected.\n"
response += f"Similarity score: {comparison_result['similarity_score']:.2f}\n"
response += f"Difference percentage: {comparison_result['difference_percentage']:.2f}%\n\n"
response += "Please upload a different image that better matches the reference planogram."
await cl.Message(content=response).send()
except Exception as e:
logger.error(f"Error processing image: {str(e)}", exc_info=True)
await cl.Message(content=f"❌ Error processing image: {str(e)}").send()
else:
# Handle text-only messages
await cl.Message(
content="πŸ‘‹ Hi! I'm here to help you analyze planogram images. Please upload an image to get started. You can use the test image provided above to try out the system!"
).send()
def check_empty_spaces(shelf_img):
try:
logger.info("Starting empty space analysis")
logger.info(f"Shelf image shape: {shelf_img.shape}")
shelf_hsv = cv2.cvtColor(shelf_img, cv2.COLOR_BGR2HSV)
logger.info(f"Converted to HSV, shape: {shelf_hsv.shape}")
report_lines = []
total_spots = len(planogram_data)
empty_spots = 0
debug_lines = [] # For debugging output
for item in planogram_data:
x, y, w, h = item['x'], item['y'], item['width'], item['height']
h_img, w_img = shelf_hsv.shape[:2]
x = max(0, min(x, w_img - 1))
y = max(0, min(y, h_img - 1))
w = min(w, w_img - x)
h = min(h, h_img - y)
logger.info(f"Processing item {item['name']}: x={x}, y={y}, w={w}, h={h}")
if w <= 0 or h <= 0:
logger.warning(f"Invalid dimensions for {item['name']}: x={x}, y={y}, w={w}, h={h}")
continue
shelf_crop = shelf_hsv[y:y+h, x:x+w]
avg_brightness = np.mean(shelf_crop[:, :, 2])
avg_saturation = np.mean(shelf_crop[:, :, 1])
debug_line = f"{item['name']} (SKU {item['sku']}): Brightness={avg_brightness:.1f}, Saturation={avg_saturation:.1f}"
# New rule: low brightness and moderate/low saturation
if avg_brightness < 70 and avg_saturation < 160:
empty_spots += 1
report_lines.append(f"❌ {item['name']} (SKU {item['sku']}) is missing!")
debug_line += " <-- Detected as empty"
debug_lines.append(debug_line)
for line in debug_lines:
logger.info(line)
if not report_lines:
result = "βœ… All spots look filled! Nice work!"
else:
summary = f"\n\nπŸ“Š Summary: {empty_spots} out of {total_spots} spots are empty ({empty_spots/total_spots*100:.1f}%)"
result = "\n".join(report_lines) + summary
logger.info("Empty space analysis completed")
return result
except Exception as e:
logger.error(f"Error in check_empty_spaces: {str(e)}", exc_info=True)
raise