Spaces:
Sleeping
Sleeping
| import os | |
| import uuid | |
| import cv2 | |
| import torch | |
| import torch.nn as nn | |
| import numpy as np | |
| import datetime | |
| import threading | |
| import httpx | |
| import time | |
| import asyncio | |
| from collections import deque | |
| from fastapi import FastAPI, UploadFile, File, BackgroundTasks, HTTPException, Query, Request | |
| from fastapi.responses import FileResponse, RedirectResponse, StreamingResponse | |
| from pydantic import BaseModel | |
| from pytorchvideo.models.hub import slowfast_r50 | |
| from ultralytics import YOLO | |
| from dotenv import load_dotenv | |
| load_dotenv(override=True) | |
# --- DOCUMENTATION METADATA ---
# Tag groups for the generated API docs; handed to FastAPI as `openapi_tags`.
tags_metadata = [
    {
        "name": "Live Stream Monitoring",
        "description": "Real-time AI surveillance for RTSP cameras.",
    },
    {
        "name": "Recorded Video Analysis",
        "description": "Upload and process video files for theft detection.",
    },
]
# Markdown/HTML user guide handed to FastAPI as the API `description`.
# It is the only place in this file where the endpoint paths are spelled out.
description = """
<details>
<summary><b> USER GUIDE & ENDPOINT MANUAL (Click to Expand)</b></summary>
<br>
### Section 1: Live Stream Monitoring
*Use these endpoints to manage and watch live AI security feeds.*
1. **`POST /stream/create` (Register Camera)**
- **What it does:** Saves your camera's RTSP address into the system.
- **How to use:** Enter a name, the RTSP link, and the location.
2. **`POST /stream/start/{id}` (Activate AI)**
- **What it does:** Starts the AI processing.
- **How to use:** Returns stream URL to view the stream.
3. **`PUT /stream/update/{id}` (Change Settings)**
- **What it does:** Updates the name or RTSP link of an existing camera.
4. **`DELETE /stream/delete/{id}` (Remove Camera)**
- **What it does:** Stops the stream and removes the camera from the list.
5. **`GET /cameras/{id}/frame` (Main Video Feed)**
- **What it does:** The actual live video stream.
6. **`GET /stream/list_cameras` (Camera Directory)**
- **What it does:** Displays all added cameras and their status.
7. **`POST /stream/stop/{id}` (Deactivate AI)**
- **What it does:** Shuts down the AI for a camera.
<br>
### Section 2: Recorded Video Analysis
*Use these endpoints to scan uploaded files for theft incidents.*
1. **`POST /video/detect` (Upload for Analysis)**
- **What it does:** Uploads a video file for a full AI scan. Returns a **job_id**.
2. **`GET /video/status/{job_id}` (Check Progress)**
- **What it does:** Provides the scan percentage (0% to 100%).
3. **`GET /video/jobs` (Task History)**
- **What it does:** Lists every video ever uploaded.
4. **`GET /video/download/{job_id}` (Get Result)**
- **What it does:** Download the final processed video once status is 100%.
</details>
---
"""
# --- CONFIGURATION ---
# Run inference on the GPU when CUDA is available, otherwise on the CPU.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
# Checkpoint for the SlowFast classifier; loaded further down only if the file exists.
MODEL_PATH = "best_slowfast_theft.pth"
# Discord webhook for alerts, read from the environment (alerts are skipped when unset).
DISCORD_WEBHOOK_URL = os.getenv("DISCORD_WEBHOOK_URL")
UPLOAD_DIR = "uploads"  # uploaded videos are written here
OUTPUT_DIR = "outputs"  # annotated result videos are written here
CLIP_LEN = 32  # number of person crops buffered before SlowFast scores a clip
IMG_SIZE = 224  # crops are resized to IMG_SIZE x IMG_SIZE before inference
THEFT_THRESHOLD = 0.6  # averaged class-1 probability above this is flagged as theft
os.makedirs(UPLOAD_DIR, exist_ok=True)
os.makedirs(OUTPUT_DIR, exist_ok=True)
app = FastAPI(title="AI Theft Detection Backend",
              description=description,
              openapi_tags=tags_metadata,
              version="1.0.0")
# --- DATABASE & STATE ---
# In-memory only: both registries are lost when the process restarts.
MOCK_DB = {"cameras": {}}  # camera records keyed by camera id
jobs = {}  # recorded-video jobs keyed by job id
# --- LOAD MODELS ---
print(f"Initializing Models on {DEVICE}...")
# Object detector; the processing loops keep only detections with class index 0.
yolo = YOLO("yolov8n.pt")
# SlowFast-R50 without pretrained weights, with its projection head replaced
# by dropout + a 2-way linear classifier (index 1 is read as the theft score).
slowfast_model = slowfast_r50(pretrained=False)
slowfast_model.blocks[-1].proj = nn.Sequential(
    nn.Dropout(p=0.5),
    nn.Linear(slowfast_model.blocks[-1].proj.in_features, 2),
)
if os.path.exists(MODEL_PATH):
    # NOTE(review): torch.load unpickles the file, so MODEL_PATH must only ever
    # point at a trusted checkpoint. Consider weights_only=True once the
    # checkpoint format is confirmed to contain tensors only.
    ckpt = torch.load(MODEL_PATH, map_location=DEVICE)
    # Accept either a bare state dict or one wrapped under a "model" key.
    slowfast_model.load_state_dict(ckpt["model"] if "model" in ckpt else ckpt)
else:
    # Without the checkpoint the classifier head is randomly initialised, so
    # every theft score would be noise; make that visible instead of silent.
    print(
        f"WARNING: checkpoint '{MODEL_PATH}' not found - SlowFast is running "
        "with untrained weights and its theft scores are not meaningful."
    )
slowfast_model = slowfast_model.to(DEVICE).eval()
| # --- SCHEMAS --- | |
# Request body for registering a camera; all three fields are required.
class CameraCreate(BaseModel):
    name: str
    rtspUrl: str
    location: str
# Request body for a partial camera update; every field defaults to None so
# callers may send any subset of them.
class CameraUpdate(BaseModel):
    name: str = None
    rtspUrl: str = None
    location: str = None
| # --- NOTIFICATION LOGIC --- | |
| import json | |
async def send_discord_alert(source_name, score, crop_frame=None, mode="live"):
    """Post a "theft detected" embed to the configured Discord webhook.

    Args:
        source_name: Camera or video name shown in the embed.
        score: Theft confidence in [0, 1]; rendered as a whole percentage.
        crop_frame: Optional image array attached to the alert as a JPEG.
        mode: "recorded" selects the recorded-video footer; anything else
            uses the live-monitoring footer.

    Best effort: does nothing when no webhook is configured (or it still
    holds the "YOUR_DISCORD" placeholder), and any failure is printed
    rather than raised.
    """
    if not DISCORD_WEBHOOK_URL or "YOUR_DISCORD" in DISCORD_WEBHOOK_URL:
        return
    now = datetime.datetime.now().strftime("%Y-%m-%d %I:%M:%S %p")
    filename = "theft_crop.jpg"
    footer_text = "Security AI Alert System • Recorded video" if mode == "recorded" else "Security AI Alert System • Live Monitoring"
    embed = {
        "title": "🚨 THEFT DETECTED",
        "description": f"AI System has flagged suspicious activity at **{source_name}**.",
        "color": 15548997,
        "fields": [
            {"name": "📊 Confidence Score", "value": f"**{int(score*100)}%**", "inline": True},
            {"name": "📍 Location", "value": f"**{source_name}**", "inline": True},
            {"name": "⏱️ Timestamp", "value": f"**{now}**", "inline": False},
        ],
        "footer": {"text": footer_text},
    }
    payload = {
        "username": "Surveillance Monitoring",
        "avatar_url": "https://cdn-icons-png.flaticon.com/512/2564/2564388.png",
        "embeds": [embed],
    }
    try:
        image_bytes = None
        if crop_frame is not None and getattr(crop_frame, "size", 0) > 0:
            encoded_ok, buffer = cv2.imencode(".jpg", crop_frame)
            if encoded_ok:
                image_bytes = buffer.tobytes()
        async with httpx.AsyncClient() as client:
            if image_bytes is not None:
                # Only reference the attachment when one is really uploaded;
                # otherwise the embed would point at a file that does not exist.
                embed["image"] = {"url": f"attachment://{filename}"}
                response = await client.post(
                    DISCORD_WEBHOOK_URL,
                    data={"payload_json": json.dumps(payload)},
                    files={"file": (filename, image_bytes, "image/jpeg")},
                )
            else:
                response = await client.post(DISCORD_WEBHOOK_URL, json=payload)
            # Surface 4xx/5xx replies (bad webhook, rate limit) in the log below.
            response.raise_for_status()
    except Exception as e:
        print(f"Discord Alert Error: {e}")
| # --- VISUALIZATION HELPERS --- | |
def draw_corner_rect(img, pt1, pt2, color, thickness=2, r=15, d=25):
    """Draw rounded corner brackets for the box spanned by pt1 and pt2.

    At each of the four corners this draws, in place on ``img``, one
    horizontal arm and one vertical arm of length ``d`` plus a quarter
    ellipse of radius ``r`` joining them.
    """
    left, top = pt1
    right, bottom = pt2
    # (corner x, corner y, x step toward the box, y step toward the box, arc rotation)
    corners = (
        (left, top, 1, 1, 180),
        (right, top, -1, 1, 270),
        (left, bottom, 1, -1, 90),
        (right, bottom, -1, -1, 0),
    )
    for corner_x, corner_y, step_x, step_y, rotation in corners:
        inner_x = corner_x + step_x * r
        inner_y = corner_y + step_y * r
        cv2.line(img, (inner_x, corner_y), (inner_x + step_x * d, corner_y), color, thickness)
        cv2.line(img, (corner_x, inner_y), (corner_x, inner_y + step_y * d), color, thickness)
        cv2.ellipse(img, (inner_x, inner_y), (r, r), rotation, 0, 90, color, thickness)
def draw_security_card(frame, avg_prob, theft_flag, title="AI ANALYZER"):
    """Overlay the status card onto ``frame`` in place.

    The card is a fixed 620x260 panel at (35, 35) holding a centred header,
    the source title, an alert/normal status line, the confidence as a whole
    percentage and the current local time.
    """
    card_x, card_y = 35, 35
    card_w, card_h = 620, 260
    padding, line_spacing = 30, 45
    top_left = (card_x, card_y)
    bottom_right = (card_x + card_w, card_y + card_h)

    # Dark panel blended over the frame, then an orange outline on top.
    shaded = frame.copy()
    cv2.rectangle(shaded, top_left, bottom_right, (25, 25, 25), -1)
    cv2.addWeighted(shaded, 0.85, frame, 0.15, 0, frame)
    cv2.rectangle(frame, top_left, bottom_right, (0, 165, 255), 3)

    if theft_flag:
        status_label, status_color = "ALERT: THEFT DETECTED", (0, 0, 255)
    else:
        status_label, status_color = "STATUS: SYSTEM NORMAL", (0, 255, 0)
    now = datetime.datetime.now().strftime("%b %d, %Y | %I:%M:%S %p")
    confidence = f"AI CONFIDENCE: {int(avg_prob * 100)}%"

    # Header: horizontally centred inside the card.
    baseline = card_y + padding + 15
    header_text = "THEFT DETECTION LIVE MONITORING"
    (header_w, _), _ = cv2.getTextSize(header_text, cv2.FONT_HERSHEY_DUPLEX, 0.9, 2)
    cv2.putText(frame, header_text, (card_x + (card_w - header_w) // 2, baseline),
                cv2.FONT_HERSHEY_DUPLEX, 0.9, (255, 255, 255), 2, cv2.LINE_AA)

    # Remaining rows: (vertical advance, text, font, scale, colour, stroke).
    text_x = card_x + padding
    rows = (
        (line_spacing + 5, f"SOURCE: {title.upper()}", cv2.FONT_HERSHEY_DUPLEX, 0.8, (200, 200, 200), 1),
        (line_spacing, status_label, cv2.FONT_HERSHEY_DUPLEX, 1.0, status_color, 2),
        (line_spacing, confidence, cv2.FONT_HERSHEY_DUPLEX, 0.8, (255, 255, 255), 1),
        (line_spacing, now, cv2.FONT_HERSHEY_SIMPLEX, 0.6, (150, 150, 150), 1),
    )
    for advance, text, font, scale, colour, stroke in rows:
        baseline += advance
        cv2.putText(frame, text, (text_x, baseline), font, scale, colour, stroke, cv2.LINE_AA)
| # --- PROCESSING CORE --- | |
def preprocess(frames):
    """Turn a sequence of image crops into one model-ready clip tensor.

    Each crop is resized to IMG_SIZE x IMG_SIZE, its channel axis is reversed
    and its values are divided by 255. The stacked crops are then reordered
    so the channel axis comes first, a leading batch axis is added, and the
    float tensor is moved to DEVICE.
    """
    normalized = []
    for crop in frames:
        resized = cv2.resize(crop, (IMG_SIZE, IMG_SIZE))
        normalized.append(resized[:, :, ::-1] / 255.0)
    stacked = np.array(normalized)
    clip = np.transpose(stacked, (3, 0, 1, 2))
    batch = torch.tensor(clip).float().unsqueeze(0)
    return batch.to(DEVICE)
| # --- BACKGROUND TASK: VIDEO FILE --- | |
async def process_video_file(job_id, in_p, out_p):
    """Scan a recorded video for theft and write an annotated copy.

    Every frame goes through YOLO; class-0 boxes are cropped and buffered,
    and once CLIP_LEN crops are buffered SlowFast scores the clip. The mean
    of the last 10 scores is compared with THEFT_THRESHOLD to colour the box,
    to send at most one Discord alert per 30 s of wall-clock time, and to
    fill in the on-frame status card.

    Args:
        job_id: Key into the module-level ``jobs`` dict. Its ``progress`` and
            ``status`` entries are updated here and ``stop_requested`` is
            polled once per frame.
        in_p: Path of the uploaded video to read.
        out_p: Path of the annotated ``mp4v`` video to write.

    Failures are not raised; they are recorded in the job's status as
    ``"failed: <message>"``.
    """
    try:
        cap = cv2.VideoCapture(in_p)
        out = None
        try:
            if not cap.isOpened():
                # Otherwise the loop would be skipped and the job reported as
                # "completed" with an empty output file.
                raise RuntimeError(f"could not open video file: {in_p}")
            w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            # Keep the fractional rate (e.g. 29.97) so the output duration
            # matches the input; fall back when the container reports none.
            fps = cap.get(cv2.CAP_PROP_FPS)
            if not fps or fps <= 0:
                fps = 25.0
            # May legitimately be 0 for containers without a frame count.
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            out = cv2.VideoWriter(out_p, cv2.VideoWriter_fourcc(*"mp4v"), fps, (w, h))
            frame_buffer = deque(maxlen=CLIP_LEN)
            prediction_buffer = deque(maxlen=10)
            last_alert_time = 0
            frame_counter = 0
            source_name = jobs.get(job_id, {}).get("filename") or f"Recorded Video ({job_id})"
            while cap.isOpened():
                if jobs.get(job_id, {}).get("stop_requested"):
                    break
                ret, frame = cap.read()
                if not ret:
                    break
                frame_counter += 1
                theft_flag, avg_prob, current_crop = False, 0.0, None
                results = yolo(frame, verbose=False)
                for r in results:
                    for box in r.boxes:
                        if int(box.cls[0]) != 0:
                            continue
                        x1, y1, x2, y2 = map(int, box.xyxy[0])
                        current_crop = frame[y1:y2, x1:x2]
                        if current_crop.size > 0:
                            frame_buffer.append(current_crop)
                            if len(frame_buffer) == CLIP_LEN:
                                clip_ts = preprocess(frame_buffer)
                                with torch.no_grad():
                                    # Slow pathway gets every 4th frame, fast pathway all of them.
                                    probs = torch.softmax(slowfast_model([clip_ts[:, :, ::4, :, :], clip_ts]), dim=1)
                                prediction_buffer.append(probs[0][1].item())
                                avg_prob = np.mean(prediction_buffer)
                        if avg_prob > THEFT_THRESHOLD:
                            theft_flag = True
                            draw_corner_rect(frame, (x1, y1), (x2, y2), (0, 0, 255))
                            current_time = time.time()
                            if current_time - last_alert_time > 30 and current_crop.size > 0:
                                await send_discord_alert(source_name, avg_prob, current_crop, mode="recorded")
                                last_alert_time = current_time
                        else:
                            draw_corner_rect(frame, (x1, y1), (x2, y2), (0, 255, 0))
                draw_security_card(frame, avg_prob, theft_flag, "VIDEO ANALYSIS")
                out.write(frame)
                if total_frames > 0:
                    # Guarded: a zero frame count used to raise ZeroDivisionError
                    # here and fail the whole job on its first frame.
                    jobs[job_id]["progress"] = min(100, int((frame_counter / total_frames) * 100))
        finally:
            # Always finalise the files, and do it before the status flips so
            # a "completed" job never points at a half-written video.
            cap.release()
            if out is not None:
                out.release()
        if jobs[job_id].get("stop_requested"):
            jobs[job_id]["status"] = "stopped"
        else:
            jobs[job_id]["progress"] = 100
            jobs[job_id]["status"] = "completed"
    except Exception as e:
        jobs[job_id]["status"] = f"failed: {str(e)}"
| # --- RTSP CAMERA PIPELINE --- | |
class CameraPipeline:
    """Capture and analyse one RTSP camera on two daemon threads.

    The capture thread keeps ``raw_frame`` pointing at the newest decoded
    frame. The AI thread copies that frame, runs YOLO + SlowFast on it, draws
    the overlays and publishes the result as ``latest_frame``. Both threads
    start in the constructor and exit once ``running`` is set to False.
    """

    def __init__(self, cam_id, name, url):
        self.cam_id = cam_id
        self.name = name
        self.url = url
        self.running = True  # set to False by StreamManager.stop_camera
        self.latest_frame = None  # newest annotated frame
        self.frame_buffer = deque(maxlen=CLIP_LEN)  # preprocessed person crops
        self.prediction_buffer = deque(maxlen=10)  # recent theft probabilities
        self.last_alert_time = 0
        self.raw_frame = None  # newest frame straight from the camera
        self.capture_thread = threading.Thread(target=self._capture_loop, daemon=True)
        self.ai_thread = threading.Thread(target=self._ai_loop, daemon=True)
        self.capture_thread.start()
        self.ai_thread.start()

    def _open_capture(self):
        """Open the RTSP stream with a one-frame buffer."""
        cap = cv2.VideoCapture(self.url, cv2.CAP_FFMPEG)
        cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
        return cap

    def _capture_loop(self):
        """Read frames until stopped, reopening the stream after a failed read."""
        os.environ["OPENCV_FFMPEG_CAPTURE_OPTIONS"] = "rtsp_transport;tcp"
        cap = self._open_capture()
        try:
            while self.running:
                ret, frame = cap.read()
                if not ret:
                    cap.release()
                    time.sleep(2)
                    # Reopen through the same helper so the buffer-size
                    # setting is not lost after a reconnect.
                    cap = self._open_capture()
                    continue
                self.raw_frame = frame
        finally:
            cap.release()

    def _ai_loop(self):
        """Analyse each newly captured frame and publish the annotated result."""
        last_seen = None
        while self.running:
            raw = self.raw_frame
            # Skip frames already analysed: re-processing the same frame would
            # push duplicate crops into the clip and waste inference time.
            if raw is None or raw is last_seen:
                time.sleep(0.01)
                continue
            last_seen = raw
            frame = raw.copy()
            theft_flag, avg_prob = False, 0.0
            current_crop = None
            results = yolo(frame, verbose=False, conf=0.4)
            for r in results:
                for box in r.boxes:
                    if int(box.cls[0]) != 0:
                        continue
                    x1, y1, x2, y2 = map(int, box.xyxy[0])
                    current_crop = frame[y1:y2, x1:x2]
                    if current_crop.size > 0:
                        p_frame = cv2.resize(current_crop, (IMG_SIZE, IMG_SIZE))[:, :, ::-1] / 255.0
                        self.frame_buffer.append(p_frame)
                        if len(self.frame_buffer) == CLIP_LEN:
                            clip = np.transpose(np.array(self.frame_buffer), (3, 0, 1, 2))
                            clip_ts = torch.tensor(clip).float().unsqueeze(0).to(DEVICE)
                            with torch.no_grad():
                                # Slow pathway gets every 4th frame, fast pathway all of them.
                                probs = torch.softmax(slowfast_model([clip_ts[:, :, ::4, :, :], clip_ts]), dim=1)
                            self.prediction_buffer.append(probs[0][1].item())
                            # Was `np.mean(prediction_buffer)`: an undefined
                            # name that killed this thread with NameError.
                            avg_prob = np.mean(self.prediction_buffer)
                    is_theft = avg_prob > THEFT_THRESHOLD
                    draw_corner_rect(frame, (x1, y1), (x2, y2), (0, 0, 255) if is_theft else (0, 255, 0))
                    if is_theft:
                        theft_flag = True
            if theft_flag and (time.time() - self.last_alert_time > 30) and current_crop is not None:
                # This thread never has a running event loop, so asyncio.run
                # is always valid here and needs no fallback.
                asyncio.run(send_discord_alert(self.name, avg_prob, current_crop, mode="live"))
                self.last_alert_time = time.time()
            draw_security_card(frame, avg_prob, theft_flag, self.name)
            self.latest_frame = frame
class StreamManager:
    """Registry of running camera pipelines, keyed by camera id."""

    def __init__(self):
        self.active_pipelines = {}

    def start_camera(self, cam_id, name, url):
        """Start a pipeline for ``cam_id``; return False if one is already running."""
        already_running = cam_id in self.active_pipelines
        if not already_running:
            self.active_pipelines[cam_id] = CameraPipeline(cam_id, name, url)
        return not already_running

    def stop_camera(self, cam_id):
        """Signal the pipeline for ``cam_id`` to stop and forget it.

        Returns False when no pipeline is registered under that id.
        """
        pipeline = self.active_pipelines.pop(cam_id, None)
        if pipeline is None:
            return False
        pipeline.running = False
        return True


# Single shared registry used by the camera endpoints.
manager = StreamManager()
| # --- ENDPOINTS --- | |
# The function was defined but never registered on `app`, so "/" returned 404.
@app.get("/", include_in_schema=False)
async def root():
    """Redirect the bare root URL to the interactive API docs."""
    return RedirectResponse(url="/docs")
| # --- CAMERA ENDPOINTS --- | |
# Route restored from the user guide (`GET /stream/list_cameras`); without the
# decorator the handler was never reachable.
@app.get("/stream/list_cameras", tags=["Live Stream Monitoring"])
def get_cameras():
    """Return every registered camera record."""
    return {"cameras": list(MOCK_DB["cameras"].values())}
# Route restored from the user guide (`POST /stream/create`).
@app.post("/stream/create", tags=["Live Stream Monitoring"])
def create_camera(cam: CameraCreate):
    """Register a camera and return its stored record.

    New cameras start with status "offline" and ``isStreaming`` False.
    """
    cameras = MOCK_DB["cameras"]
    # `len + 1` alone reuses an id that is still taken after a delete
    # (create cam-1 and cam-2, delete cam-1, and the next camera would
    # overwrite cam-2), so advance until the id is free.
    next_index = len(cameras) + 1
    while f"cam-{next_index}" in cameras:
        next_index += 1
    new_id = f"cam-{next_index}"
    camera = {**cam.dict(), "id": new_id, "status": "offline", "isStreaming": False}
    cameras[new_id] = camera
    return {"camera": camera}
# Route restored from the user guide (`POST /stream/start/{id}`).
@app.post("/stream/start/{id}", tags=["Live Stream Monitoring"])
async def start_camera(id: str, request: Request):
    """Start AI processing for a registered camera.

    Returns ``success`` (False when the camera was already running), the
    camera id as ``job_id`` and the URL of its live frame stream.

    Raises:
        HTTPException: 404 when the camera id is not registered.
    """
    if id not in MOCK_DB["cameras"]:
        raise HTTPException(status_code=404, detail="Camera not found")
    cam = MOCK_DB["cameras"][id]
    started = manager.start_camera(id, cam["name"], cam["rtspUrl"])
    if started:
        cam["status"] = "online"
        cam["isStreaming"] = True
    # Absolute URL of the frame endpoint on whatever host served this request.
    stream_url = f"{request.base_url}cameras/{id}/frame"
    return {
        "success": started,
        "job_id": id,
        "view_urls": {
            "stream_url": stream_url
        }
    }
# Route restored from the user guide (`PUT /stream/update/{id}`).
@app.put("/stream/update/{id}", tags=["Live Stream Monitoring"])
def update_camera(id: str, cam_data: CameraUpdate):
    """Apply the fields present in the request to a stored camera record.

    Raises:
        HTTPException: 404 when the camera id is not registered.
    """
    if id not in MOCK_DB["cameras"]:
        raise HTTPException(status_code=404, detail="Camera not found")
    current_cam = MOCK_DB["cameras"][id]
    # Only fields the client actually sent are applied.
    # NOTE(review): a pipeline that is already running keeps the name and URL
    # it was started with; it must be stopped and started again to pick up
    # the new values.
    current_cam.update(cam_data.dict(exclude_unset=True))
    return {"message": "Camera updated successfully", "camera": current_cam}
# Route restored from the user guide (`DELETE /stream/delete/{id}`).
@app.delete("/stream/delete/{id}", tags=["Live Stream Monitoring"])
def delete_camera(id: str):
    """Stop the camera's pipeline, if any, and remove its record.

    Raises:
        HTTPException: 404 when the camera id is not registered.
    """
    if id not in MOCK_DB["cameras"]:
        raise HTTPException(status_code=404, detail="Camera not found")
    # Returns False when nothing was running, which is fine for a delete.
    manager.stop_camera(id)
    del MOCK_DB["cameras"][id]
    return {"message": f"Camera {id} deleted successfully"}
# Route restored from the user guide (`POST /stream/stop/{id}`).
@app.post("/stream/stop/{id}", tags=["Live Stream Monitoring"])
def stop_camera(id: str):
    """Stop AI processing for a camera and mark its record offline.

    Returns ``success`` False when the camera was not running.

    Raises:
        HTTPException: 404 when the camera id is not registered.
    """
    if id not in MOCK_DB["cameras"]:
        raise HTTPException(status_code=404, detail="Camera not found")
    stopped = manager.stop_camera(id)
    if stopped:
        MOCK_DB["cameras"][id]["status"] = "offline"
        MOCK_DB["cameras"][id]["isStreaming"] = False
    return {"success": stopped}
# Route restored from the user guide (`GET /cameras/{id}/frame`); this is also
# the URL that the start endpoint hands back to clients.
@app.get("/cameras/{id}/frame", tags=["Live Stream Monitoring"])
def get_frame_legacy(id: str):
    """Stream a running camera's annotated frames as multipart JPEG.

    The stream ends when the camera's pipeline is stopped.

    Raises:
        HTTPException: 404 when the camera is not currently running.
    """
    if id not in manager.active_pipelines:
        raise HTTPException(404)

    def generate():
        while True:
            # One lookup per iteration: checking membership and then indexing
            # could raise KeyError if the camera was stopped in between.
            pipeline = manager.active_pipelines.get(id)
            if pipeline is None:
                break
            frame = pipeline.latest_frame
            if frame is not None:
                encoded_ok, buffer = cv2.imencode('.jpg', frame)
                if encoded_ok:
                    yield (b'--frame\r\nContent-Type: image/jpeg\r\n\r\n' + buffer.tobytes() + b'\r\n')
            time.sleep(0.05)

    return StreamingResponse(generate(), media_type="multipart/x-mixed-replace; boundary=frame")
| # --- VIDEO PROCESSING ENDPOINTS --- | |
# Route restored from the user guide (`POST /video/detect`).
@app.post("/video/detect", tags=["Recorded Video Analysis"])
async def detect(background_tasks: BackgroundTasks, file: UploadFile = File(...)):
    """Store an uploaded video and queue it for theft analysis.

    Returns the ``job_id`` used by the status, jobs and download endpoints.
    """
    jid = str(uuid.uuid4())
    # The filename is client-controlled: keep only its last path component so
    # separators or ".." cannot steer the write outside UPLOAD_DIR.
    safe_name = os.path.basename((file.filename or "").replace("\\", "/")) or "upload"
    in_p = os.path.join(UPLOAD_DIR, f"{jid}_{safe_name}")
    out_p = os.path.join(OUTPUT_DIR, f"result_{jid}.mp4")
    # Copy in 1 MiB chunks instead of holding the whole video in memory.
    with open(in_p, "wb") as f:
        while True:
            chunk = await file.read(1024 * 1024)
            if not chunk:
                break
            f.write(chunk)
    jobs[jid] = {"status": "processing", "progress": 0, "output_path": out_p, "stop_requested": False, "filename": file.filename}

    def run_job():
        # The task is a plain function, so the async worker gets its own event loop.
        asyncio.run(process_video_file(jid, in_p, out_p))

    background_tasks.add_task(run_job)
    return {"job_id": jid}
# Route restored from the user guide (`GET /video/jobs`).
@app.get("/video/jobs", tags=["Recorded Video Analysis"])
async def list_jobs():
    """List every known job with its status and progress as a percent string."""
    return [{"job_id": j, "status": d["status"], "progress": f"{d['progress']}%"} for j, d in jobs.items()]
# Route restored from the user guide (`GET /video/status/{job_id}`).
@app.get("/video/status/{job_id}", tags=["Recorded Video Analysis"])
async def get_status(job_id: str):
    """Return the stored job record, or ``{"error": "Not found"}`` for an unknown id.

    An unknown id is answered with a normal 200 response rather than an HTTP
    error; that existing contract is kept for current clients.
    """
    return jobs.get(job_id, {"error": "Not found"})
# Route restored from the user guide (`GET /video/download/{job_id}`).
@app.get("/video/download/{job_id}", tags=["Recorded Video Analysis"])
async def download(job_id: str):
    """Send the annotated video of a completed job.

    Raises:
        HTTPException: 400 when the job is unknown or its status is not
            "completed".
    """
    job = jobs.get(job_id)
    if job is not None and job["status"] == "completed":
        return FileResponse(job["output_path"])
    raise HTTPException(400, "File not ready")
if __name__ == "__main__":
    # Imported here so uvicorn is only needed when the file is run as a script.
    import uvicorn
    # Serve the app on all interfaces, port 7860.
    uvicorn.run(app, host="0.0.0.0", port=7860)