# newcustomyolo / interfacetest2.py
# Myogyi's picture
# Update interfacetest2.py
# f6cf90e
import argparse
import time
from pathlib import Path
import os
import cv2
import torch
import torch.backends.cudnn as cudnn
from numpy import random
import numpy as np
import ffmpeg
import gradio as gr
from fastapi import FastAPI
import uvicorn
import shutil
from models.experimental import attempt_load
from utils.datasets import LoadStreams, LoadImages
from utils.general import check_img_size, check_requirements, check_imshow, non_max_suppression, \
scale_coords, strip_optimizer, set_logging, increment_path
from utils.plots import plot_one_box
from utils.torch_utils import select_device, time_synchronized, TracedModel
# Function to compute IoU between two boxes
def compute_iou(box1, box2):
    """Return the intersection-over-union of two axis-aligned boxes.

    Args:
        box1: First box as ``(x1, y1, x2, y2)``.
        box2: Second box as ``(x1, y1, x2, y2)``.

    Returns:
        Intersection area divided by union area, or ``0.0`` when the union
        area is zero.
    """
    left_a, top_a, right_a, bottom_a = box1
    left_b, top_b, right_b, bottom_b = box2

    # Overlap extents are clamped at zero so disjoint boxes contribute nothing.
    overlap_w = max(0, min(right_a, right_b) - max(left_a, left_b))
    overlap_h = max(0, min(bottom_a, bottom_b) - max(top_a, top_b))
    overlap = overlap_w * overlap_h

    area_a = (right_a - left_a) * (bottom_a - top_a)
    area_b = (right_b - left_b) * (bottom_b - top_b)
    union = area_a + area_b - overlap

    if union == 0:
        return 0.0
    return overlap / union
# Function to check if a scanner is moving based on centroid displacement
def is_scanner_moving(prev_centroids, curr_box, scanner_id, threshold=2.0):
    """Report whether a scanner's centroid moved more than ``threshold``.

    Args:
        prev_centroids: Mapping of scanner ID to its previous ``(x, y)``
            centroid.
        curr_box: Current bounding box as ``(x1, y1, x2, y2)``.
        scanner_id: ID used to look up the previous centroid.
        threshold: Displacement (in the same units as the box coordinates)
            that must be exceeded for the scanner to count as moving.

    Returns:
        bool: ``True`` if the centroid displacement is strictly greater than
        ``threshold``; ``False`` otherwise, including when ``scanner_id`` has
        no previous centroid.
    """
    x1, y1, x2, y2 = curr_box
    if scanner_id not in prev_centroids:
        # Nothing to compare against, so default to "not moving".
        return False

    prev_x, prev_y = prev_centroids[scanner_id]
    distance = np.hypot((x1 + x2) / 2 - prev_x, (y1 + y2) / 2 - prev_y)
    # Cast so both return paths yield a plain bool rather than numpy.bool_.
    return bool(distance > threshold)
# Function to convert video to H.264 format
def convert_to_h264(input_path, ffmpeg_cmd='/usr/bin/ffmpeg'):
    """Re-encode a video as H.264/AAC in an MP4 container.

    The result is written in the same directory as the input, named
    ``<input stem>_h264.mp4``; an existing file of that name is overwritten.

    Args:
        input_path: Path of the video file to convert.
        ffmpeg_cmd: FFmpeg executable to invoke.

    Returns:
        The output path as a string on success. On failure (FFmpeg reports an
        error or the executable cannot be found) the problem is printed and
        ``input_path`` is returned unchanged.
    """
    source = Path(input_path)
    output_path = str(source.with_name(f"{source.stem}_h264.mp4"))

    stream = ffmpeg.input(input_path)
    stream = ffmpeg.output(stream, output_path, vcodec='libx264', acodec='aac', format='mp4', pix_fmt='yuv420p')
    try:
        # stderr must be captured, otherwise ffmpeg.Error.stderr is None and
        # the real failure reason is never available to report.
        ffmpeg.run(stream, cmd=ffmpeg_cmd, overwrite_output=True, capture_stderr=True)
    except ffmpeg.Error as e:
        stderr = e.stderr.decode('utf-8', errors='replace') if e.stderr else "Unknown FFmpeg error"
        print(f"FFmpeg error: {stderr}")
        return input_path
    except FileNotFoundError:
        # The executable itself is missing; keep the best-effort contract.
        print(f"FFmpeg error: executable not found: {ffmpeg_cmd}")
        return input_path
    return output_path
# Detection function adapted from the second script
def detect_video(video_path, weights, conf_thres=0.25, iou_thres=0.45, img_size=640, device='', save_dir='runs/detect/exp', trace=False):
    """Detect objects in a video, annotate scanner activity and save the result.

    Every detection is drawn on its frame. Boxes whose class name is
    "scanner" (case-insensitive) are tracked between frames by centroid
    distance and labelled "Scanning" (centroid moved) or "Idle". The first
    time a "Scanning" scanner overlaps an "item" box (IoU > 0.02) the banner
    "Product scanning is finished" is drawn on that frame and every later
    one; the same happens for "phone" boxes with "Payment scanning is
    finished".

    Args:
        video_path: Source passed to ``LoadImages``.
        weights: Model weights passed to ``attempt_load``.
        conf_thres: Confidence threshold passed to ``non_max_suppression``.
        iou_thres: IoU threshold passed to ``non_max_suppression``.
        img_size: Requested inference size, validated against the model
            stride with ``check_img_size``.
        device: Device string passed to ``select_device``.
        save_dir: Directory the annotated video is written to.
        trace: When true, the model is wrapped in ``TracedModel``.

    Returns:
        Path (str) of the annotated video: the H.264 file when conversion
        succeeds, otherwise the intermediate mp4v file. ``None`` when the
        source produced no frames.
    """
    overlap_iou = 0.02  # minimum scanner/object IoU that counts as a scan

    save_dir = Path(increment_path(Path(save_dir), exist_ok=True))
    save_dir.mkdir(parents=True, exist_ok=True)

    # Initialize
    set_logging()
    device = select_device(device)
    half = device.type != 'cpu'  # half precision is only used off-CPU

    # Load model
    model = attempt_load(weights, map_location=device)
    stride = int(model.stride.max())
    imgsz = check_img_size(img_size, s=stride)
    if trace:
        model = TracedModel(model, device, img_size)
    if half:
        model.half()

    # Set Dataloader
    dataset = LoadImages(video_path, img_size=imgsz, stride=stride)

    # Get names and colors
    names = model.module.names if hasattr(model, 'module') else model.names
    colors = [[random.randint(0, 255) for _ in range(3)] for _ in names]
    # Look the scanner colour up once, ignoring case like the class matching
    # below does; names.index("scanner") would raise for e.g. "Scanner".
    scanner_color = next(
        (colors[idx] for idx, name in enumerate(names) if str(name).lower() == "scanner"),
        [0, 255, 0],
    )

    # Initialize variables
    vid_path, vid_writer = None, None
    save_path = None
    prev_centroids = {}
    scanner_id_counter = 0
    product_scanning_status_global = ""
    payment_scanning_status_global = ""
    old_img_b, old_img_h, old_img_w = 0, 0, 0

    try:
        for frame_idx, (path, img, im0s, vid_cap) in enumerate(dataset, start=1):
            img = torch.from_numpy(img).to(device)
            img = img.half() if half else img.float()
            img /= 255.0
            if img.ndimension() == 3:
                img = img.unsqueeze(0)

            with torch.no_grad():
                # Warmup: repeat the forward pass whenever the input shape
                # changes on a non-CPU device.
                shape_key = (img.shape[0], img.shape[2], img.shape[3])
                if device.type != 'cpu' and (old_img_b, old_img_h, old_img_w) != shape_key:
                    old_img_b, old_img_h, old_img_w = shape_key
                    for _ in range(3):
                        model(img)[0]

                # Inference
                pred = model(img, augment=False)[0]

            # Apply NMS
            pred = non_max_suppression(pred, conf_thres, iou_thres)

            # Process detections
            for det in pred:
                p = Path(path)
                # Always write an .mp4 name, whatever the input extension.
                save_path = str(save_dir / f"{p.stem}_output.mp4")
                im0 = im0s

                if len(det):
                    det[:, :4] = scale_coords(img.shape[2:], det[:, :4], im0.shape).round()
                    item_boxes, phone_boxes, curr_scanner_boxes = [], [], []

                    # Draw every detection and sort boxes by class
                    for *xyxy, conf, cls in det:
                        box = [int(v) for v in xyxy]
                        class_name = names[int(cls)]
                        kind = class_name.lower()
                        if kind == "item":
                            item_boxes.append(box)
                        elif kind == "phone":
                            phone_boxes.append(box)
                        elif kind == "scanner":
                            curr_scanner_boxes.append(box)
                        plot_one_box(xyxy, im0, label=class_name, color=colors[int(cls)], line_thickness=2)

                    # Match scanner boxes with the previous frame
                    scanner_data, prev_centroids, scanner_id_counter = _track_scanners(
                        prev_centroids, curr_scanner_boxes, scanner_id_counter)

                    # Redraw scanner boxes with movement status
                    for scanner_box, movement_status, scanner_id in scanner_data:
                        label = f"scanner {movement_status} (ID: {scanner_id})"
                        plot_one_box(scanner_box, im0, label=label, color=scanner_color, line_thickness=2)

                        # Only a moving scanner can complete a scan.
                        if movement_status != "Scanning":
                            continue
                        if not product_scanning_status_global and any(
                                compute_iou(scanner_box, item_box) > overlap_iou for item_box in item_boxes):
                            product_scanning_status_global = "Product scanning is finished"
                            print(f"Product scanning finished at frame {frame_idx}")
                        if not payment_scanning_status_global and any(
                                compute_iou(scanner_box, phone_box) > overlap_iou for phone_box in phone_boxes):
                            payment_scanning_status_global = "Payment scanning is finished"
                            print(f"Payment scanning finished at frame {frame_idx}")

                # Display persistent labels
                if product_scanning_status_global:
                    cv2.putText(im0, product_scanning_status_global, (10, 30),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.9, scanner_color, 2)
                if payment_scanning_status_global:
                    cv2.putText(im0, payment_scanning_status_global, (10, 60),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.9, scanner_color, 2)

                # Write frame to video
                if vid_path != save_path:
                    vid_path = save_path
                    if isinstance(vid_writer, cv2.VideoWriter):
                        vid_writer.release()
                    # Fall back to 30 fps when there is no capture or it
                    # reports 0.
                    fps = (vid_cap.get(cv2.CAP_PROP_FPS) if vid_cap else 0) or 30
                    h, w = im0.shape[:2]
                    vid_writer = cv2.VideoWriter(save_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (w, h))
                vid_writer.write(im0)
    finally:
        # Cleanup: release the writer even if processing failed part-way.
        if isinstance(vid_writer, cv2.VideoWriter):
            vid_writer.release()

    if save_path is None:
        print(f"No frames could be read from: {video_path}")
        return None

    # Convert to H.264; the helper returns its input unchanged on failure,
    # in which case the intermediate file is kept and returned.
    converted = convert_to_h264(save_path)
    if converted != save_path:
        os.remove(save_path)
    return converted


def _track_scanners(prev_centroids, scanner_boxes, next_id, max_match_dist=50, move_threshold=2.0):
    """Assign an ID and a movement status to each scanner box of one frame.

    A box inherits the ID of the nearest previous centroid that is closer
    than ``max_match_dist`` and not already taken by another box of this
    frame; otherwise it receives ``next_id``.

    Returns:
        ``(scanner_data, centroids, next_id)`` where ``scanner_data`` is a
        list of ``[box, "Scanning" | "Idle", scanner_id]``, ``centroids``
        maps each assigned ID to this frame's centroid, and ``next_id`` is
        the next unused ID.
    """
    scanner_data = []
    centroids = {}
    taken = set()
    for box in scanner_boxes:
        cx, cy = (box[0] + box[2]) / 2, (box[1] + box[3]) / 2

        def distance_to(prev_id):
            px, py = prev_centroids[prev_id]
            return np.hypot(cx - px, cy - py)

        # Each previous ID may be claimed once per frame so that two boxes
        # never share an ID (and overwrite each other's centroid).
        candidates = [prev_id for prev_id in prev_centroids if prev_id not in taken]
        best_match_id = min(candidates, key=distance_to, default=None)
        if best_match_id is not None and distance_to(best_match_id) < max_match_dist:
            scanner_id = best_match_id
            taken.add(best_match_id)
        else:
            scanner_id = next_id
            next_id += 1

        is_moving = is_scanner_moving(prev_centroids, box, scanner_id, threshold=move_threshold)
        movement_status = "Scanning" if is_moving else "Idle"
        scanner_data.append([box, movement_status, scanner_id])
        centroids[scanner_id] = (cx, cy)
    return scanner_data, centroids, next_id
# Gradio interface
def gradio_interface(video, conf_thres, iou_thres):
    """Gradio callback: run detection on an uploaded video.

    The upload is copied into a fixed working directory, converted with
    ``convert_to_h264`` and processed by ``detect_video``.

    Args:
        video: Path of the uploaded video as supplied by the ``gr.Video``
            input; falsy when nothing was uploaded.
        conf_thres: Confidence threshold forwarded to ``detect_video``.
        iou_thres: IoU threshold forwarded to ``detect_video``.

    Returns:
        Path of the processed video for the ``gr.Video`` output.

    Raises:
        gr.Error: If no video was uploaded or processing produced no output.
    """
    if not video:
        # Path(None) would raise a TypeError; report the problem in the UI.
        raise gr.Error("Please upload a video before submitting.")

    weights = "/home/myominhtet/Desktop/deepsortfromscratch/yolov7/best.pt"
    img_size = 640

    # Create a stable directory for video files
    stable_dir = "/home/myominhtet/Desktop/deepsortfromscratch/videos"
    os.makedirs(stable_dir, exist_ok=True)

    # Copy the uploaded video to a stable path
    stable_path = os.path.join(stable_dir, f"input_{Path(video).name}")
    shutil.copy(video, stable_path)
    print(f"Copied video to: {stable_path}")

    # Verify the copied file
    print(f"Stable path exists: {os.path.exists(stable_path)}")
    print(f"Stable path readable: {os.access(stable_path, os.R_OK)}")

    video = convert_to_h264(stable_path)
    output_video = detect_video(video, weights, conf_thres, iou_thres, img_size)
    if not output_video:
        # The output component is a video, so a plain message string cannot
        # be returned to it; raise so the message is shown as an error.
        raise gr.Error("Error processing video.")
    return output_video
# Set up Gradio interface
interface = gr.Interface(
    fn=gradio_interface,
    inputs=[
        gr.Video(label="Upload Video"),
        gr.Slider(minimum=0, maximum=1, value=0.25, step=0.05, label="Confidence Threshold"),
        gr.Slider(minimum=0, maximum=1, value=0.45, step=0.05, label="IoU Threshold"),
    ],
    outputs=gr.Video(label="Processed Video"),
    title="Retail Shop Monitoring",
)

# Serve the Gradio UI from the root path of a FastAPI application.
app = gr.mount_gradio_app(FastAPI(), interface, path="/")

# Start the server on port 8000, all interfaces, when run as a script.
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)