|
|
import gradio as gr |
|
|
from PIL import Image, ImageDraw |
|
|
import io |
|
|
from fpdf import FPDF |
|
|
import tempfile |
|
|
import cv2 |
|
|
import os |
|
|
import torch |
|
|
import ultralytics.nn.tasks as tasks |
|
|
from urllib.request import urlretrieve |
|
|
from ultralytics import YOLO |
|
|
import numpy as np |
|
|
|
|
|
|
|
|
def custom_torch_safe_load(weight): |
|
|
from ultralytics.utils.checks import check_file |
|
|
file = check_file(weight) |
|
|
return torch.load(file, map_location='cpu', weights_only=False), file |
|
|
|
|
|
tasks.torch_safe_load = custom_torch_safe_load |
|
|
|
|
|
|
|
|
model_path = 'model/yolov8n.pt' |
|
|
|
|
|
|
|
|
if not os.path.exists(model_path): |
|
|
os.makedirs('model', exist_ok=True) |
|
|
url = 'https://github.com/ultralytics/assets/releases/download/v0.0.0/yolov8n.pt' |
|
|
urlretrieve(url, model_path) |
|
|
|
|
|
|
|
|
model = YOLO(model_path) |
|
|
|
|
|
def detect_empty_spots(img_width, img_height, bottle_bboxes, min_gap=50): |
|
|
empty_bboxes = [] |
|
|
x_coords = sorted([bbox[0] for bbox in bottle_bboxes] + [bbox[2] for bbox in bottle_bboxes]) |
|
|
x_coords = [0] + x_coords + [img_width] |
|
|
|
|
|
for i in range(len(x_coords) - 1): |
|
|
gap_start = x_coords[i] |
|
|
gap_end = x_coords[i + 1] |
|
|
if gap_end - gap_start > min_gap: |
|
|
empty_bboxes.append([gap_start, 0, gap_end, img_height]) |
|
|
|
|
|
return empty_bboxes |
|
|
|
|
|
def process_input(input_file): |
|
|
if input_file.lower().endswith(('.mp4', '.avi', '.mov')): |
|
|
cap = cv2.VideoCapture(input_file) |
|
|
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) |
|
|
cap.set(cv2.CAP_PROP_POS_FRAMES, total_frames // 2) |
|
|
ret, frame = cap.read() |
|
|
cap.release() |
|
|
if not ret: |
|
|
raise ValueError("Could not read video frame.") |
|
|
temp_frame_path = tempfile.NamedTemporaryFile(suffix='.jpg', delete=False).name |
|
|
cv2.imwrite(temp_frame_path, frame) |
|
|
process_img_path = temp_frame_path |
|
|
else: |
|
|
process_img_path = input_file |
|
|
|
|
|
|
|
|
results = model.predict(process_img_path, conf=0.25, iou=0.45, agnostic_nms=False, max_det=1000) |
|
|
boxes = results[0].boxes |
|
|
class_names = results[0].names |
|
|
|
|
|
num_present = 0 |
|
|
bottle_bboxes = [] |
|
|
for box in boxes: |
|
|
cls_id = int(box.cls.item()) |
|
|
cls_name = class_names[cls_id] |
|
|
if cls_name == 'bottle': |
|
|
num_present += 1 |
|
|
bottle_bboxes.append(box.xyxy[0].cpu().numpy()) |
|
|
|
|
|
img = Image.open(process_img_path) |
|
|
img_width, img_height = img.size |
|
|
draw = ImageDraw.Draw(img) |
|
|
|
|
|
for bbox in bottle_bboxes: |
|
|
draw.rectangle(((bbox[0], bbox[1]), (bbox[2], bbox[3])), outline="green", width=3) |
|
|
|
|
|
empty_bboxes = detect_empty_spots(img_width, img_height, bottle_bboxes) |
|
|
num_empty = len(empty_bboxes) |
|
|
for bbox in empty_bboxes: |
|
|
draw.rectangle(((bbox[0], bbox[1]), (bbox[2], bbox[3])), outline="red", width=3) |
|
|
|
|
|
pdf = FPDF() |
|
|
pdf.add_page() |
|
|
pdf.set_font("Arial", size=12) |
|
|
pdf.cell(200, 10, txt="Wine Shop Inventory Report", ln=1, align='C') |
|
|
pdf.ln(5) |
|
|
pdf.cell(200, 10, txt=f"Number of bottles detected: {num_present}", ln=1) |
|
|
pdf.cell(200, 10, txt=f"Number of empty spots (inferred): {num_empty}", ln=1) |
|
|
pdf.ln(10) |
|
|
|
|
|
with tempfile.NamedTemporaryFile(suffix='.png') as tmp_annotated: |
|
|
img.save(tmp_annotated.name) |
|
|
pdf.image(tmp_annotated.name, x=10, y=pdf.get_y(), w=180) |
|
|
|
|
|
pdf.add_page() |
|
|
pdf.cell(200, 10, txt="Detected Bottles and Empty Spots", ln=1, align='C') |
|
|
pdf.ln(5) |
|
|
y_pos = pdf.get_y() |
|
|
|
|
|
for i, bbox in enumerate(bottle_bboxes): |
|
|
cropped_img = img.crop((bbox[0], bbox[1], bbox[2], bbox[3])) |
|
|
with tempfile.NamedTemporaryFile(suffix='.png') as tmp_crop: |
|
|
cropped_img.save(tmp_crop.name) |
|
|
pdf.image(tmp_crop.name, x=10, y=y_pos, w=90) |
|
|
y_pos += 100 |
|
|
pdf.cell(200, 10, txt=f"Bottle {i+1} (Location: x1={int(bbox[0])}, y1={int(bbox[1])}, x2={int(bbox[2])}, y2={int(bbox[3])})", ln=1) |
|
|
pdf.ln(5) |
|
|
if y_pos > 200: |
|
|
pdf.add_page() |
|
|
y_pos = 10 |
|
|
|
|
|
for i, bbox in enumerate(empty_bboxes): |
|
|
cropped_img = img.crop((bbox[0], bbox[1], bbox[2], bbox[3])) |
|
|
with tempfile.NamedTemporaryFile(suffix='.png') as tmp_crop: |
|
|
cropped_img.save(tmp_crop.name) |
|
|
pdf.image(tmp_crop.name, x=10, y=y_pos, w=90) |
|
|
y_pos += 100 |
|
|
pdf.cell(200, 10, txt=f"Empty Spot {i+1} (Location: x1={int(bbox[0])}, y1={int(bbox[1])}, x2={int(bbox[2])}, y2={int(bbox[3])})", ln=1) |
|
|
pdf.ln(5) |
|
|
if y_pos > 200: |
|
|
pdf.add_page() |
|
|
y_pos = 10 |
|
|
|
|
|
pdf_bytes = io.BytesIO() |
|
|
pdf.output(pdf_bytes) |
|
|
pdf_bytes.seek(0) |
|
|
|
|
|
if 'temp_frame_path' in locals(): |
|
|
os.remove(temp_frame_path) |
|
|
|
|
|
return pdf_bytes.getvalue() |
|
|
|
|
|
with gr.Blocks() as demo: |
|
|
gr.Markdown("# Wine Shop CCTV Analyzer") |
|
|
gr.Markdown("Upload a CCTV image or video to analyze stock and generate a PDF report.") |
|
|
input_file = gr.File(label="Upload Image/Video (jpg, png, mp4, etc.)") |
|
|
output_pdf = gr.File(label="Download PDF Report") |
|
|
button = gr.Button("Generate Report") |
|
|
button.click(process_input, inputs=input_file, outputs=output_pdf) |
|
|
|
|
|
demo.launch() |