Spaces:
Running
Running
| # import os | |
| # import cv2 | |
| # import torch | |
| # import numpy as np | |
| # import uuid | |
| # import threading | |
| # import gradio as gr | |
| # from fastapi import FastAPI, UploadFile, File, BackgroundTasks, HTTPException | |
| # from fastapi.responses import FileResponse | |
| # from collections import deque | |
| # from pytorchvideo.models.hub import slowfast_r50 | |
| # from ultralytics import YOLO | |
| # import torch.nn as nn | |
| # # --- SETUP & DIRECTORIES --- | |
| # UPLOAD_DIR = "uploads" | |
| # OUTPUT_DIR = "outputs" | |
| # MODEL_PATH = "models/best_slowfast_theft.pth" | |
| # os.makedirs(UPLOAD_DIR, exist_ok=True) | |
| # os.makedirs(OUTPUT_DIR, exist_ok=True) | |
| # JOBS = {} # Track progress | |
| # # --- MODEL LOADING --- | |
| # DEVICE = "cuda" if torch.cuda.is_available() else "cpu" | |
| # yolo = YOLO("yolov8n.pt") | |
| # def load_slowfast(): | |
| # model = slowfast_r50(pretrained=False) | |
| # in_features = model.blocks[-1].proj.in_features | |
| # model.blocks[-1].proj = nn.Sequential( | |
| # nn.Dropout(p=0.5), | |
| # nn.Linear(in_features, 2) | |
| # ) | |
| # if os.path.exists(MODEL_PATH): | |
| # ckpt = torch.load(MODEL_PATH, map_location=DEVICE) | |
| # model.load_state_dict(ckpt["model"] if "model" in ckpt else ckpt) | |
| # model.to(DEVICE).eval() | |
| # return model | |
| # detector_model = load_slowfast() | |
| # # --- DETECTION LOGIC --- | |
| # def process_video_logic(job_id, input_path, output_path): | |
| # cap = cv2.VideoCapture(input_path) | |
| # total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) | |
| # fps = int(cap.get(cv2.CAP_PROP_FPS)) | |
| # w, h = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) | |
| # out = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*"mp4v"), fps, (w, h)) | |
| # frame_buffer = deque(maxlen=32) | |
| # curr = 0 | |
| # while cap.isOpened(): | |
| # ret, frame = cap.read() | |
| # if not ret: break | |
| # curr += 1 | |
| # JOBS[job_id]["progress"] = int((curr/total_frames)*100) | |
| # # Basic YOLO logic (Simplified for speed) | |
| # results = yolo(frame, verbose=False) | |
| # for r in results: | |
| # for box in r.boxes: | |
| # if int(box.cls[0]) == 0: | |
| # x1, y1, x2, y2 = map(int, box.xyxy[0]) | |
| # cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2) | |
| # out.write(frame) | |
| # cap.release() | |
| # out.release() | |
| # JOBS[job_id]["status"] = "completed" | |
| # # --- FASTAPI ENDPOINTS --- | |
| # app = FastAPI() | |
| # @app.post("/api/detect") | |
| # async def api_detect(background_tasks: BackgroundTasks, file: UploadFile = File(...)): | |
| # job_id = str(uuid.uuid4()) | |
| # in_p = os.path.join(UPLOAD_DIR, f"{job_id}.mp4") | |
| # out_p = os.path.join(OUTPUT_DIR, f"{job_id}.mp4") | |
| # with open(in_p, "wb") as f: f.write(await file.read()) | |
| # JOBS[job_id] = {"progress": 0, "status": "processing", "file": out_p} | |
| # background_tasks.add_task(process_video_logic, job_id, in_p, out_p) | |
| # return {"job_id": job_id} | |
| # @app.get("/api/progress/{job_id}") | |
| # async def api_progress(job_id: str): | |
| # return JOBS.get(job_id, {"error": "not found"}) | |
| # # --- GRADIO FRONTEND --- | |
| # def web_ui_process(video_input): | |
| # if video_input is None: return None | |
| # job_id = str(uuid.uuid4()) | |
| # out_p = os.path.join(OUTPUT_DIR, f"{job_id}.mp4") | |
| # # Run the processing (Sync for Gradio UI to show progress) | |
| # JOBS[job_id] = {"progress": 0, "status": "processing"} | |
| # process_video_logic(job_id, video_input, out_p) | |
| # return out_p | |
| # with gr.Blocks(title="Theft Detection System") as demo: | |
| # gr.Markdown("# 🛡️ AI Theft Detection System") | |
| # with gr.Row(): | |
| # video_in = gr.Video(label="Upload Video") | |
| # video_out = gr.Video(label="Processed Result") | |
| # btn = gr.Button("Detect Theft") | |
| # btn.click(web_ui_process, inputs=video_in, outputs=video_out) | |
| # # --- MOUNT FASTAPI TO GRADIO --- | |
| # # This allows both to run on the same port on Hugging Face | |
| # app = gr.mount_gradio_app(app, demo, path="/") | |
| # if __name__ == "__main__": | |
| # import uvicorn | |
| # uvicorn.run(app, host="0.0.0.0", port=7860) | |
| import os | |
| import cv2 | |
| import torch | |
| import numpy as np | |
| import uuid | |
| import torch.nn as nn | |
| import gradio as gr | |
| from collections import deque | |
| from pytorchvideo.models.hub import slowfast_r50 | |
| from ultralytics import YOLO | |
| # --- CONFIG & DIRECTORIES --- | |
| DEVICE = "cuda" if torch.cuda.is_available() else "cpu" | |
| MODEL_PATH = "models/best_slowfast_theft.pth" | |
| os.makedirs("uploads", exist_ok=True) | |
| os.makedirs("outputs", exist_ok=True) | |
| os.makedirs("models", exist_ok=True) | |
| # --- HEATMAP CLASS --- | |
| class Heatmap: | |
| def __init__(self, h, w, decay=0.92): | |
| self.m = np.zeros((h, w), np.float32) | |
| self.decay = decay | |
| self.h, self.w = h, w | |
| def add(self, bbox, intensity=1.0, poly_mask=None): | |
| x1, y1, x2, y2 = [int(v) for v in bbox] | |
| x1, y1 = max(0, x1), max(0, y1) | |
| x2, y2 = min(self.w-1, x2), min(self.h-1, y2) | |
| if x2 <= x1 or y2 <= y1: return | |
| cx, cy = (x1 + x2) // 2, (y1 + y2) // 2 | |
| rx, ry = max(1, (x2 - x1) // 2), max(1, (y2 - y1) // 2) | |
| blob = np.zeros((self.h, self.w), np.float32) | |
| cv2.ellipse(blob, (cx, cy), (rx, ry), 0, 0, 360, intensity, -1) | |
| blob = cv2.GaussianBlur(blob, (0, 0), rx * 0.6, sigmaY=ry * 0.6) | |
| if poly_mask is not None: blob *= poly_mask | |
| self.m = np.clip(self.m + blob, 0, 10.0) | |
| def step(self): self.m *= self.decay | |
| def overlay(self, frame, alpha=0.45, poly_mask=None): | |
| norm = np.clip(self.m / 10.0, 0, 1) | |
| coloured = cv2.applyColorMap((norm * 255).astype(np.uint8), cv2.COLORMAP_JET) | |
| mask3 = np.stack([(norm > 0.05).astype(np.float32)] * 3, -1) | |
| if poly_mask is not None: mask3 *= np.stack([poly_mask] * 3, -1) | |
| return (coloured * mask3 * alpha + frame * (1 - mask3 * alpha)).astype(np.uint8) | |
| # --- LOAD MODELS --- | |
| print("Loading Models...") | |
| yolo = YOLO("yolov8n.pt") | |
| sf_model = slowfast_r50(pretrained=False) | |
| sf_model.blocks[-1].proj = nn.Sequential(nn.Dropout(p=0.5), nn.Linear(sf_model.blocks[-1].proj.in_features, 2)) | |
| if os.path.exists(MODEL_PATH): | |
| ckpt = torch.load(MODEL_PATH, map_location=DEVICE) | |
| sf_model.load_state_dict(ckpt["model"] if "model" in ckpt else ckpt) | |
| sf_model.to(DEVICE).eval() | |
| # --- CORE LOGIC --- | |
| def process_video(video_path, roi_image): | |
| if not video_path: return None | |
| cap = cv2.VideoCapture(video_path) | |
| w, h = int(cap.get(3)), int(cap.get(4)) | |
| fps = int(cap.get(5)) | |
| output_path = f"outputs/out_{uuid.uuid4()}.mp4" | |
| out = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (w, h)) | |
| heatmap = Heatmap(h, w) | |
| # Process ROI Mask from Sketch | |
| poly_mask = None | |
| if roi_image is not None and 'layers' in roi_image: | |
| # Use the sketch layer to create a mask | |
| mask_layer = roi_image['layers'][0] | |
| mask_layer = cv2.resize(mask_layer, (w, h)) | |
| gray = cv2.cvtColor(mask_layer, cv2.COLOR_BGR2GRAY) | |
| _, poly_mask = cv2.threshold(gray, 10, 1.0, cv2.THRESH_BINARY) | |
| poly_mask = poly_mask.astype(np.float32) | |
| person_buffers = {} | |
| prediction_buffers = {} | |
| while cap.isOpened(): | |
| ret, frame = cap.read() | |
| if not ret: break | |
| heatmap.step() | |
| results = yolo.track(frame, persist=True, verbose=False, classes=[0]) | |
| global_theft = False | |
| if results[0].boxes.id is not None: | |
| boxes = results[0].boxes.xyxy.cpu().numpy() | |
| ids = results[0].boxes.id.cpu().numpy().astype(int) | |
| for box, track_id in zip(boxes, ids): | |
| x1, y1, x2, y2 = map(int, box) | |
| cx, cy = (x1 + x2) // 2, (y1 + y2) // 2 | |
| # ROI Check | |
| if poly_mask is not None and poly_mask[cy, cx] == 0: continue | |
| heatmap.add(box, poly_mask=poly_mask) | |
| if track_id not in person_buffers: | |
| person_buffers[track_id] = deque(maxlen=32) | |
| prediction_buffers[track_id] = deque(maxlen=10) | |
| crop = frame[y1:y2, x1:x2] | |
| if crop.size > 0: person_buffers[track_id].append(crop) | |
| current_score = 0.0 | |
| if len(person_buffers[track_id]) == 32: | |
| processed = [cv2.resize(f, (224, 224))[:,:,::-1]/255.0 for f in person_buffers[track_id]] | |
| clip = torch.tensor(np.transpose(np.array(processed), (3,0,1,2))).float().unsqueeze(0).to(DEVICE) | |
| with torch.no_grad(): | |
| out_sf = sf_model([clip[:, :, ::4, :, :], clip]) | |
| current_score = torch.softmax(out_sf, dim=1)[0][1].item() | |
| prediction_buffers[track_id].append(current_score) | |
| current_score = np.mean(prediction_buffers[track_id]) | |
| if current_score > 0.6: global_theft = True | |
| color = (0, 0, 255) if current_score > 0.6 else (0, 255, 0) | |
| cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2) | |
| cv2.putText(frame, f"ID:{track_id} {current_score:.2f}", (x1, y1-10), 0, 0.5, color, 2) | |
| # Fancy Overlays | |
| frame = heatmap.overlay(frame, poly_mask=poly_mask) | |
| overlay = frame.copy() | |
| cv2.rectangle(overlay, (0,0), (w, 80), (0,0,0), -1) | |
| cv2.addWeighted(overlay, 0.5, frame, 0.5, 0, frame) | |
| status = "!!! THEFT DETECTED !!!" if global_theft else "Monitoring Area..." | |
| scolor = (0,0,255) if global_theft else (0,255,0) | |
| cv2.putText(frame, status, (20, 50), 0, 1.0, scolor, 3) | |
| out.write(frame) | |
| cap.release() | |
| out.release() | |
| return output_path | |
| # --- GRADIO UI --- | |
| with gr.Blocks(theme=gr.themes.Soft(), title="Theft Detection Pro") as demo: | |
| gr.Markdown("# 🛡️ AI Theft Detection & Heatmap System") | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| video_input = gr.Video(label="1. Upload Video", height=400) | |
| gr.Markdown("### 2. Draw ROI (Optional)\nDraw on the image below to monitor a specific area.") | |
| # This handles getting the first frame automatically | |
| roi_input = gr.ImageMask(label="Draw Region of Interest", height=400) | |
| def get_first_frame(vid): | |
| if vid is None: return None | |
| cap = cv2.VideoCapture(vid) | |
| ret, frame = cap.read() | |
| cap.release() | |
| if ret: return cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) | |
| return None | |
| video_input.change(get_first_frame, inputs=video_input, outputs=roi_input) | |
| submit_btn = gr.Button("🚀 Start Processing", variant="primary") | |
| with gr.Column(scale=1): | |
| video_output = gr.Video(label="3. Detection Result", height=800) | |
| submit_btn.click( | |
| fn=process_video, | |
| inputs=[video_input, roi_input], | |
| outputs=video_output | |
| ) | |
| if __name__ == "__main__": | |
| demo.launch() |