bug-detector / app.py
MuhammadTayyab0143's picture
Replace project with Retail Product Detection
bee67a2
Raw
History Blame Contribute Delete
8.89 kB
import gradio as gr
import cv2
import easyocr
import numpy as np
from rfdetr import RFDETRSmall
import tempfile
import supervision as sv
import pandas as pd
# Load models globally so they're only initialized once
model = RFDETRSmall(pretrain_weights="rfdetr_small_best.pth")
reader = easyocr.Reader(['en', 'es'], gpu=True)
CLASSES = {
0: "Diana Product",
1: "Gallo Product",
2: "Raptor bottel",
3: "Tortrix Product",
4: "cocacola pepsi",
5: "laky ice cream"
}
PRODUCT_KEYWORDS = {
"diana": "Diana Product",
"gallo": "Gallo Product",
"raptor": "Raptor bottel",
"tortrix": "Tortrix Product",
"coca": "cocacola pepsi",
"pepsi": "cocacola pepsi",
"laky": "laky ice cream",
}
box_annotator = sv.BoxAnnotator()
label_annotator = sv.LabelAnnotator()
def calculate_iou(boxA, boxB):
xA = max(boxA[0], boxB[0])
yA = max(boxA[1], boxB[1])
xB = min(boxA[2], boxB[2])
yB = min(boxA[3], boxB[3])
interArea = max(0, xB - xA) * max(0, yB - yA)
boxAArea = (boxA[2] - boxA[0]) * (boxA[3] - boxA[1])
boxBArea = (boxB[2] - boxB[0]) * (boxB[3] - boxB[1])
iou = interArea / float(boxAArea + boxBArea - interArea)
return iou
def process_frame(frame, conf_threshold):
"""Processes a single BGR frame, applying confidence, area, and OCR logic."""
height, width, _ = frame.shape
total_area = height * width
# Predict using RF-DETR
detections = model.predict(frame)
# Apply confidence filter
detections = detections[detections.confidence >= conf_threshold]
# Area filter (>30% rejection)
valid_indices = []
for i, bbox in enumerate(detections.xyxy):
x1, y1, x2, y2 = bbox
area = (x2 - x1) * (y2 - y1)
if area <= 0.30 * total_area:
valid_indices.append(i)
detections = detections[valid_indices]
# Resolve conflicting classes via EasyOCR
final_class_ids = detections.class_id.copy()
for i in range(len(detections)):
for j in range(i + 1, len(detections)):
if detections.class_id[i] == detections.class_id[j]:
continue
iou = calculate_iou(detections.xyxy[i], detections.xyxy[j])
if iou > 0.7:
# OCR on union crop
boxA = detections.xyxy[i]
boxB = detections.xyxy[j]
crop_x1 = int(min(boxA[0], boxB[0]))
crop_y1 = int(min(boxA[1], boxB[1]))
crop_x2 = int(max(boxA[2], boxB[2]))
crop_y2 = int(max(boxA[3], boxB[3]))
cropped_img = frame[max(0, crop_y1):min(height, crop_y2), max(0, crop_x1):min(width, crop_x2)]
# Try OCR if crop is valid
matched_class = None
if cropped_img.size > 0:
ocr_results = reader.readtext(cropped_img)
ocr_text = " ".join([res[1].lower() for res in ocr_results])
for keyword, product in PRODUCT_KEYWORDS.items():
if keyword in ocr_text:
matched_class = product
break
if matched_class:
# Match OCR text to class ID
class_id_matched = next((k for k, v in CLASSES.items() if v == matched_class), None)
if class_id_matched is not None:
final_class_ids[i] = class_id_matched
final_class_ids[j] = class_id_matched
else:
# Fallback: keep the one with higher confidence
if detections.confidence[i] > detections.confidence[j]:
final_class_ids[j] = final_class_ids[i]
else:
final_class_ids[i] = final_class_ids[j]
detections.class_id = final_class_ids
# Annotate frame
labels = [f"{CLASSES.get(class_id, 'Unknown')} {conf:.2f}" for class_id, conf in zip(detections.class_id, detections.confidence)]
annotated_frame = box_annotator.annotate(scene=frame.copy(), detections=detections)
annotated_frame = label_annotator.annotate(scene=annotated_frame, detections=detections, labels=labels)
# Generate summary
counts = {}
for class_id in detections.class_id:
name = CLASSES.get(class_id, "Unknown")
counts[name] = counts.get(name, 0) + 1
return annotated_frame, counts
def process_image(image, conf_threshold):
if image is None:
return None, pd.DataFrame(columns=["Class Name", "Count"])
bgr_image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
annotated_bgr, counts = process_frame(bgr_image, conf_threshold)
annotated_rgb = cv2.cvtColor(annotated_bgr, cv2.COLOR_BGR2RGB)
summary_data = [{"Class Name": name, "Count": count} for name, count in counts.items()]
df = pd.DataFrame(summary_data)
if df.empty:
df = pd.DataFrame(columns=["Class Name", "Count"])
return annotated_rgb, df
def process_video(video_path, conf_threshold):
if not video_path:
return None, pd.DataFrame(columns=["Class Name", "Count"])
cap = cv2.VideoCapture(video_path)
fps = cap.get(cv2.CAP_PROP_FPS)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
frame_interval = 15
out_fps = fps / frame_interval if fps > 0 else 2.0
temp_out = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4")
temp_out_path = temp_out.name
temp_out.close()
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(temp_out_path, fourcc, out_fps, (width, height))
frame_count = 0
max_counts = {}
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
if frame_count % frame_interval == 0:
annotated_frame, counts = process_frame(frame, conf_threshold)
out.write(annotated_frame)
# Keep track of the maximum count of each item seen simultaneously in any frame
for name, count in counts.items():
if count > max_counts.get(name, 0):
max_counts[name] = count
frame_count += 1
cap.release()
out.release()
summary_data = [{"Class Name": name, "Max Count (per frame)": count} for name, count in max_counts.items()]
df = pd.DataFrame(summary_data)
if df.empty:
df = pd.DataFrame(columns=["Class Name", "Max Count (per frame)"])
return temp_out_path, df
# Gradio Interface
theme = gr.themes.Soft(
primary_hue="blue",
secondary_hue="slate",
neutral_hue="slate",
).set(
body_background_fill="*neutral_950",
body_text_color="*neutral_100",
block_background_fill="*neutral_900",
block_label_text_color="*neutral_200",
)
with gr.Blocks(theme=theme) as app:
gr.Markdown("# 🛒 Retail Product Detection System — Demo")
gr.Markdown("### please upload product images/videos")
with gr.Tab("Image Detection"):
with gr.Row():
with gr.Column():
image_input = gr.Image(type="numpy", label="Upload Product Image")
img_conf_slider = gr.Slider(minimum=0.0, maximum=1.0, value=0.5, step=0.05, label="Confidence Threshold")
img_submit_btn = gr.Button("Detect Products", variant="primary")
with gr.Column():
image_output = gr.Image(type="numpy", label="Annotated Output")
img_summary_table = gr.Dataframe(headers=["Class Name", "Count"], label="Detection Summary")
img_submit_btn.click(
fn=process_image,
inputs=[image_input, img_conf_slider],
outputs=[image_output, img_summary_table]
)
with gr.Tab("Video Detection"):
with gr.Row():
with gr.Column():
video_input = gr.Video(label="Upload Counter Video")
vid_conf_slider = gr.Slider(minimum=0.0, maximum=1.0, value=0.5, step=0.05, label="Confidence Threshold")
vid_submit_btn = gr.Button("Detect Products in Video", variant="primary")
with gr.Column():
video_output = gr.Video(label="Annotated Output (15th frame intervals)")
vid_summary_table = gr.Dataframe(headers=["Class Name", "Max Count (per frame)"], label="Detection Summary")
vid_submit_btn.click(
fn=process_video,
inputs=[video_input, vid_conf_slider],
outputs=[video_output, vid_summary_table]
)
if __name__ == "__main__":
app.launch()