# NOTE(review): "Spaces: Sleeping" is Hugging Face Space status text captured
# with this source during extraction — it is not part of the program.
| import argparse | |
| import time | |
| from pathlib import Path | |
| import os | |
| import cv2 | |
| import torch | |
| import torch.backends.cudnn as cudnn | |
| from numpy import random | |
| import numpy as np | |
| import ffmpeg | |
| import gradio as gr | |
| from fastapi import FastAPI | |
| import uvicorn | |
| import shutil | |
| from models.experimental import attempt_load | |
| from utils.datasets import LoadStreams, LoadImages | |
| from utils.general import check_img_size, check_requirements, check_imshow, non_max_suppression, \ | |
| scale_coords, strip_optimizer, set_logging, increment_path | |
| from utils.plots import plot_one_box | |
| from utils.torch_utils import select_device, time_synchronized, TracedModel | |
def compute_iou(box1, box2):
    """Return the intersection-over-union of two axis-aligned boxes.

    Each box is an ``(x1, y1, x2, y2)`` corner-coordinate sequence.
    Returns 0.0 when the union area is zero (degenerate boxes).
    """
    ax1, ay1, ax2, ay2 = box1
    bx1, by1, bx2, by2 = box2
    # Overlap rectangle, clamped to zero extent when the boxes are disjoint.
    overlap_w = max(0, min(ax2, bx2) - max(ax1, bx1))
    overlap_h = max(0, min(ay2, by2) - max(ay1, by1))
    intersection = overlap_w * overlap_h
    union = (ax2 - ax1) * (ay2 - ay1) + (bx2 - bx1) * (by2 - by1) - intersection
    if union == 0:
        return 0.0
    return intersection / union
def is_scanner_moving(prev_centroids, curr_box, scanner_id, threshold=2.0):
    """Report whether a tracked scanner has moved more than *threshold* pixels.

    Compares the centroid of ``curr_box`` (an ``(x1, y1, x2, y2)`` box) with
    the centroid recorded for ``scanner_id`` in ``prev_centroids``. A scanner
    with no recorded history is treated as stationary.
    """
    x1, y1, x2, y2 = curr_box
    cx = (x1 + x2) / 2
    cy = (y1 + y2) / 2
    previous = prev_centroids.get(scanner_id)
    if previous is None:
        # Default to "not moving" when no previous centroid exists.
        return False
    dx = cx - previous[0]
    dy = cy - previous[1]
    return np.sqrt(dx ** 2 + dy ** 2) > threshold
# Function to convert video to H.264 format
def convert_to_h264(input_path, ffmpeg_cmd='/usr/bin/ffmpeg'):
    """Re-encode a video to H.264/AAC in an MP4 container.

    Args:
        input_path: path of the source video.
        ffmpeg_cmd: path of the ffmpeg binary. The default preserves the
            original hard-coded location; pass 'ffmpeg' to resolve via PATH.

    Returns:
        Path of the new "<stem>_h264.mp4" file, or *input_path* unchanged
        if the conversion fails (best-effort fallback).
    """
    output_path = str(Path(input_path).with_suffix('')) + "_h264.mp4"
    try:
        stream = ffmpeg.input(input_path)
        # yuv420p keeps the output playable in browsers / most decoders.
        stream = ffmpeg.output(stream, output_path, vcodec='libx264',
                               acodec='aac', format='mp4', pix_fmt='yuv420p')
        ffmpeg.run(stream, cmd=ffmpeg_cmd, overwrite_output=True)
        return output_path
    except ffmpeg.Error as e:
        stderr = e.stderr.decode('utf-8') if e.stderr else "Unknown FFmpeg error"
        print(f"FFmpeg error: {stderr}")
        return input_path
# Detection function adapted from the second script
def detect_video(video_path, weights, conf_thres=0.25, iou_thres=0.45, img_size=640, device='', save_dir='runs/detect/exp', trace=False):
    """Run YOLOv7 detection on a video, annotate scanner activity, save the result.

    Tracks "scanner" detections across frames by nearest-centroid matching,
    labels each one "Scanning" (moving) or "Idle", and latches two one-shot
    status banners when a moving scanner overlaps an "item" or a "phone" box.

    Args:
        video_path: path to the input video (read via LoadImages).
        weights: YOLOv7 checkpoint path passed to attempt_load.
        conf_thres: NMS confidence threshold.
        iou_thres: NMS IoU threshold.
        img_size: inference resolution (rounded to the model stride).
        device: torch device spec ('' lets select_device choose).
        save_dir: output directory (reused as-is since exist_ok=True).
        trace: wrap the model in TracedModel when True.

    Returns:
        Path of the H.264-converted output video, or the raw mp4v file when
        the ffmpeg conversion fails.
    """
    save_dir = Path(increment_path(Path(save_dir), exist_ok=True))
    save_dir.mkdir(parents=True, exist_ok=True)
    # Initialize
    set_logging()
    device = select_device(device)
    half = device.type != 'cpu'  # FP16 inference only off-CPU
    # Load model
    model = attempt_load(weights, map_location=device)
    stride = int(model.stride.max())
    imgsz = check_img_size(img_size, s=stride)
    if trace:
        model = TracedModel(model, device, img_size)
    if half:
        model.half()
    # Set Dataloader
    dataset = LoadImages(video_path, img_size=imgsz, stride=stride)
    # Get names and colors (one random BGR colour per class)
    names = model.module.names if hasattr(model, 'module') else model.names
    colors = [[random.randint(0, 255) for _ in range(3)] for _ in names]
    # Initialize tracking / output state
    vid_path, vid_writer = None, None
    prev_centroids = {}           # scanner_id -> (cx, cy) from the previous frame
    scanner_id_counter = 0        # next id to assign to an unmatched scanner
    product_scanning_status_global = ""   # banner text; latched at most once
    payment_scanning_status_global = ""   # banner text; latched at most once
    old_img_b, old_img_h, old_img_w = 0, 0, 0
    for path, img, im0s, vid_cap in dataset:
        img = torch.from_numpy(img).to(device)
        img = img.half() if half else img.float()
        img /= 255.0  # 0-255 -> 0.0-1.0
        if img.ndimension() == 3:
            img = img.unsqueeze(0)  # add batch dimension
        # Warmup: re-run the model a few times whenever the input shape changes
        if device.type != 'cpu' and (old_img_b != img.shape[0] or old_img_h != img.shape[2] or old_img_w != img.shape[3]):
            old_img_b = img.shape[0]
            old_img_h = img.shape[2]
            old_img_w = img.shape[3]
            for _ in range(3):
                model(img)[0]
        # Inference
        with torch.no_grad():
            pred = model(img, augment=False)[0]
        # Apply NMS
        pred = non_max_suppression(pred, conf_thres, iou_thres)
        # Process detections (one entry per image in the batch)
        for i, det in enumerate(pred):
            p = Path(path)
            save_path = str(save_dir / p.name.replace('.mp4', '_output.mp4'))
            im0 = im0s  # NOTE(review): no .copy() — boxes are drawn onto the loader's frame
            if len(det):
                # Rescale boxes from the letterboxed inference size to the original frame
                det[:, :4] = scale_coords(img.shape[2:], det[:, :4], im0.shape).round()
                item_boxes, scanner_data, phone_boxes = [], [], []
                curr_scanner_boxes = []
                # Process each detection: bucket by class, draw every box once
                for *xyxy, conf, cls in det:
                    x1, y1, x2, y2 = map(int, xyxy)
                    class_name = names[int(cls)]
                    color = colors[int(cls)]
                    if class_name.lower() == "item":
                        item_boxes.append([x1, y1, x2, y2])
                    elif class_name.lower() == "phone":
                        phone_boxes.append([x1, y1, x2, y2])
                    elif class_name.lower() == "scanner":
                        curr_scanner_boxes.append([x1, y1, x2, y2])
                    plot_one_box(xyxy, im0, label=class_name, color=color, line_thickness=2)
                # Match scanner boxes with previous frames (nearest centroid, 50 px gate)
                new_prev_centroids = {}
                if prev_centroids and curr_scanner_boxes:
                    for curr_box in curr_scanner_boxes:
                        curr_centroid = ((curr_box[0] + curr_box[2]) / 2, (curr_box[1] + curr_box[3]) / 2)
                        best_match_id = min(prev_centroids.keys(),
                                            key=lambda k: np.sqrt((curr_centroid[0] - prev_centroids[k][0])**2 +
                                                                  (curr_centroid[1] - prev_centroids[k][1])**2),
                                            default=None)
                        if best_match_id is not None:
                            distance = np.sqrt((curr_centroid[0] - prev_centroids[best_match_id][0])**2 +
                                               (curr_centroid[1] - prev_centroids[best_match_id][1])**2)
                            if distance < 50:
                                scanner_id = best_match_id  # same physical scanner
                            else:
                                # Too far from every known track: start a new one
                                scanner_id = scanner_id_counter
                                scanner_id_counter += 1
                        else:
                            scanner_id = scanner_id_counter
                            scanner_id_counter += 1
                        is_moving = is_scanner_moving(prev_centroids, curr_box, scanner_id, threshold=2.0)
                        movement_status = "Scanning" if is_moving else "Idle"
                        scanner_data.append([curr_box, movement_status, scanner_id])
                        new_prev_centroids[scanner_id] = curr_centroid
                elif curr_scanner_boxes:
                    # No history yet: every scanner gets a fresh id and starts Idle
                    for curr_box in curr_scanner_boxes:
                        scanner_id = scanner_id_counter
                        scanner_id_counter += 1
                        movement_status = "Idle"
                        curr_centroid = ((curr_box[0] + curr_box[2]) / 2, (curr_box[1] + curr_box[3]) / 2)
                        scanner_data.append([curr_box, movement_status, scanner_id])
                        new_prev_centroids[scanner_id] = curr_centroid
                prev_centroids = new_prev_centroids  # tracks unseen this frame are dropped
                # Redraw scanner boxes with movement status
                for scanner_box, movement_status, scanner_id in scanner_data:
                    x1, y1, x2, y2 = scanner_box
                    label = f"scanner {movement_status} (ID: {scanner_id})"
                    plot_one_box([x1, y1, x2, y2], im0, label=label, color=colors[names.index("scanner")], line_thickness=2)
                    # Check for overlaps only if scanning status hasn't been set
                    if not product_scanning_status_global:
                        for item_box in item_boxes:
                            iou = compute_iou(scanner_box, item_box)
                            if movement_status == "Scanning" and iou > 0.02:
                                product_scanning_status_global = "Product scanning is finished"
                                # NOTE(review): i is the batch index from enumerate(pred), not a frame number
                                print(f"Product scanning finished at frame {i}")
                    if not payment_scanning_status_global:
                        for phone_box in phone_boxes:
                            iou = compute_iou(scanner_box, phone_box)
                            if movement_status == "Scanning" and iou > 0.02:
                                payment_scanning_status_global = "Payment scanning is finished"
                                print(f"Payment scanning finished at frame {i}")
            # Display persistent labels (once latched, drawn on every later frame)
            if product_scanning_status_global:
                cv2.putText(im0, product_scanning_status_global, (10, 30),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.9, colors[names.index("scanner")], 2)
            if payment_scanning_status_global:
                cv2.putText(im0, payment_scanning_status_global, (10, 60),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.9, colors[names.index("scanner")], 2)
            # Write frame to video (open a writer whenever the output path changes)
            if vid_path != save_path:
                vid_path = save_path
                if isinstance(vid_writer, cv2.VideoWriter):
                    vid_writer.release()
                fps = vid_cap.get(cv2.CAP_PROP_FPS) if vid_cap else 30
                w, h = im0.shape[1], im0.shape[0]
                vid_writer = cv2.VideoWriter(save_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (w, h))
            vid_writer.write(im0)
    # Cleanup
    if isinstance(vid_writer, cv2.VideoWriter):
        vid_writer.release()
    # Convert to H.264 so browsers / Gradio can play the result
    output_h264 = str(Path(save_path).with_name(f"{Path(save_path).stem}_h264.mp4"))
    try:
        stream = ffmpeg.input(save_path)
        stream = ffmpeg.output(stream, output_h264, vcodec='libx264', acodec='aac', format='mp4', pix_fmt='yuv420p')
        ffmpeg.run(stream, cmd='/usr/bin/ffmpeg', overwrite_output=True)
        os.remove(save_path)
        return output_h264
    except ffmpeg.Error as e:
        stderr = e.stderr.decode('utf-8') if e.stderr else "Unknown FFmpeg error"
        print(f"FFmpeg error: {stderr}")
        return save_path
# Gradio interface
def gradio_interface(video, conf_thres, iou_thres,
                     weights="/home/myominhtet/Desktop/deepsortfromscratch/yolov7/best.pt",
                     img_size=640):
    """Gradio callback: stage the upload, normalise it to H.264, run detection.

    Args:
        video: path of the uploaded video file (supplied by gr.Video).
        conf_thres: NMS confidence threshold from the slider.
        iou_thres: NMS IoU threshold from the slider.
        weights: YOLOv7 checkpoint path (default preserves the original
            hard-coded location; now overridable without editing the body).
        img_size: inference image size (default preserves the original 640).

    Returns:
        Path of the processed video, or an error message string.
    """
    # Gradio stores uploads in a temp dir that can vanish; copy to a stable path first.
    stable_dir = "/home/myominhtet/Desktop/deepsortfromscratch/videos"
    os.makedirs(stable_dir, exist_ok=True)
    # Copy the uploaded video to a stable path
    stable_path = os.path.join(stable_dir, f"input_{Path(video).name}")
    shutil.copy(video, stable_path)
    print(f"Copied video to: {stable_path}")
    # Verify the copied file is present and readable before handing it on
    print(f"Stable path exists: {os.path.exists(stable_path)}")
    print(f"Stable path readable: {os.access(stable_path, os.R_OK)}")
    # Normalise container/codec so the downstream OpenCV reader can decode it
    video = convert_to_h264(stable_path)
    output_video = detect_video(video, weights, conf_thres, iou_thres, img_size)
    return output_video if output_video else "Error processing video."
# Set up Gradio interface: a video upload plus the two NMS threshold sliders,
# returning the annotated video produced by gradio_interface().
interface = gr.Interface(
    fn=gradio_interface,
    inputs=[
        gr.Video(label="Upload Video"),
        gr.Slider(0, 1, value=0.25, step=0.05, label="Confidence Threshold"),
        gr.Slider(0, 1, value=0.45, step=0.05, label="IoU Threshold"),
    ],
    outputs=gr.Video(label="Processed Video"),
    title="Retail Shop Monitoring",
    # description="Upload a video to check "
)
# Set up FastAPI app and mount the Gradio UI at the root path.
app = FastAPI()
app = gr.mount_gradio_app(app, interface, path="/")
if __name__ == "__main__":
    # Serve on all interfaces, port 8000.
    uvicorn.run(app, host="0.0.0.0", port=8000)