Spaces:
Running
Running
| import os | |
| import cv2 | |
| import numpy as np | |
| import uuid | |
| import shutil | |
| from fastapi import FastAPI, File, UploadFile, Query, HTTPException | |
| from fastapi.responses import FileResponse, JSONResponse | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from pydantic import BaseModel | |
| from typing import List, Dict, Any | |
| from suspicious_behavior.config import API_HOST, API_PORT | |
| from suspicious_behavior.pipeline.frame_analyzer import FrameAnalyzer | |
| from suspicious_behavior.pipeline.video_processor import VideoProcessor | |
# FastAPI application instance, carrying the title, description, and version
# passed to the FastAPI constructor.
app = FastAPI(
    title="SimShieldAI Suspicious Behavior API",
    description=(
        "Production-grade API for detecting fighting, violence, "
        "and running/sprinting in CCTV surveillance."
    ),
    version="1.0.0",
)
# CORS: accept requests from any origin, with any method and any header.
#
# Credentials (cookies / Authorization headers) are deliberately NOT enabled.
# FastAPI's CORS documentation states that allow_origins, allow_methods and
# allow_headers cannot be ["*"] when allow_credentials is True; they would all
# have to be listed explicitly. Nothing visible in this module relies on
# cookies or auth headers, so the wildcard policy is kept and credentials are
# turned off. If credentialed cross-origin calls are required, replace the
# wildcards with explicit lists and set allow_credentials=True.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Temporary assets (uploaded inputs and rendered outputs) live in a "temp"
# folder placed in the parent of the directory that contains this file.
_THIS_FILE = os.path.abspath(__file__)
WORKSPACE_DIR = os.path.dirname(os.path.dirname(_THIS_FILE))
TEMP_DIR = os.path.join(WORKSPACE_DIR, "temp")

# Create the folder at import time; an already-existing folder is fine.
os.makedirs(TEMP_DIR, exist_ok=True)
# The FrameAnalyzer is created on first use instead of at import time, so the
# server can start without waiting for it to be constructed.
analyzer = None


def get_analyzer():
    """Return the module-wide FrameAnalyzer, building it on the first call.

    Later calls return the same instance that was stored in the module-level
    ``analyzer`` variable.
    """
    global analyzer
    if analyzer is not None:
        return analyzer
    analyzer = FrameAnalyzer(camera_id="api_camera_1")
    return analyzer
# NOTE(review): no route decorator is visible above this handler in this view;
# confirm it is registered on `app`.
async def health_check():
    """
    Report whether the analyzer can be obtained and describe its models.

    Calling this forces the shared analyzer to be created if it does not exist
    yet. On success a dict is returned with the keys ``status``, ``device``,
    ``yolo_model``, ``violence_model`` (the violence engine's labels) and
    ``timestamp``. If anything raises, a JSON response with status code 500
    and the error text is returned instead of propagating the exception.
    """
    try:
        # Obtaining the analyzer doubles as the "do the models load?" probe.
        shared = get_analyzer()
        engine = shared.violence_engine
        report = {
            "status": "healthy",
            "device": str(engine.device),
            "yolo_model": str(shared.yolo.model_name),
            "violence_model": engine.labels,
            "timestamp": str(np.datetime64('now')),
        }
    except Exception as e:
        failure = {"status": "unhealthy", "error": str(e)}
        return JSONResponse(status_code=500, content=failure)
    return report
# NOTE(review): no route decorator is visible above this handler in this view;
# confirm it is registered on `app`.
async def detect_image(
    file: UploadFile = File(..., description="JPEG/PNG image file containing the scene to analyze"),
    return_image: bool = Query(True, description="If true, returns base64 annotated image in the JSON response")
):
    """
    Analyzes a single uploaded image for suspicious behavior.

    Args:
        file: The uploaded image. Its declared content type must start with
            ``image/``.
        return_image: When true, the response also carries the annotated frame
            as a base64 string under ``annotated_frame_base64``.

    Returns:
        A dict with ``metadata`` and ``alerts_triggered`` (each alert dumped
        via ``model_dump()``), plus ``annotated_frame_base64`` when
        ``return_image`` is true.

    Raises:
        HTTPException: 400 when the upload is not declared as an image, is
            empty, or cannot be decoded; 500 for any other processing failure.
    """
    import base64

    # content_type is None when the client sends no Content-Type for the part;
    # treat that as "not an image" instead of crashing on None.startswith.
    if not (file.content_type or "").startswith("image/"):
        raise HTTPException(status_code=400, detail="Uploaded file must be an image.")
    try:
        contents = await file.read()
        # An empty buffer cannot be decoded; reject it up front as a client error.
        if not contents:
            raise HTTPException(status_code=400, detail="Invalid image data.")
        nparr = np.frombuffer(contents, np.uint8)
        img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
        if img is None:
            raise HTTPException(status_code=400, detail="Invalid image data.")
        # Process image using our frame analyzer
        # (VideoMAE is skipped for single frames if buffer is not full, which is normal)
        inst = get_analyzer()
        annotated_frame, alerts, metadata = inst.analyze(img, fps=10.0, output_base64=return_image)
        response_data = {
            "metadata": metadata,
            "alerts_triggered": [alert.model_dump() for alert in alerts]
        }
        if return_image:
            if len(alerts) > 0:
                # Reuse the frame image already attached to the first alert.
                response_data["annotated_frame_base64"] = alerts[0].frame_image
            else:
                # If no alerts, manually encode the frame to base64
                encoded_ok, buffer = cv2.imencode('.jpg', annotated_frame)
                if not encoded_ok:
                    raise RuntimeError("Failed to encode annotated frame as JPEG.")
                response_data["annotated_frame_base64"] = base64.b64encode(buffer).decode('utf-8')
        return response_data
    except HTTPException:
        # Keep deliberate 4xx responses intact instead of rewrapping them as 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Image processing failed: {str(e)}") from e
# NOTE(review): no route decorator is visible above this handler in this view;
# confirm it is registered on `app`.
async def analyze_video(
    file: UploadFile = File(..., description="MP4, AVI, or MOV video file to process")
):
    """
    Processes an entire uploaded video clip and returns the annotated result.

    The upload is saved under TEMP_DIR, every frame yielded by the video
    processor is run through the shared analyzer, and the annotated frames are
    written to an MP4 file that is returned for download.

    Args:
        file: The uploaded video; its filename must end in .mp4, .avi, .mov
            or .mkv (case-insensitive).

    Returns:
        A FileResponse serving the annotated MP4.

    Raises:
        HTTPException: 400 for a missing/unsupported extension or a video that
            yields no frames; 500 for any other processing failure.
    """
    # filename may be None when the client omits it; treat that as "no extension".
    filename = file.filename or ""
    stem, ext = os.path.splitext(filename)
    ext = ext.lower()
    if ext not in (".mp4", ".avi", ".mov", ".mkv"):
        raise HTTPException(status_code=400, detail="File must be a valid video format (mp4, avi, mov, mkv)")

    # Temporary file paths; the random id keeps concurrent uploads apart and
    # keeps the client-supplied name out of the filesystem path.
    unique_id = str(uuid.uuid4())
    input_path = os.path.join(TEMP_DIR, f"input_{unique_id}{ext}")
    output_path = os.path.join(TEMP_DIR, f"output_{unique_id}.mp4")

    succeeded = False
    try:
        # Save uploaded file to disk
        with open(input_path, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

        processor = VideoProcessor(target_fps=10.0)
        metadata = processor.get_metadata(input_path)
        inst = get_analyzer()

        # Reset state (so frame indices start clean for this video)
        inst.frame_idx = 0
        inst.violence_buffer.clear()
        inst.alert_manager.reset_cooldowns()

        print(f"[API] Processing video: {filename} ({metadata['frame_count']} frames at {metadata['fps']:.1f} FPS)...")

        # Only the annotated frame is needed for the output video; the alerts
        # and per-frame metadata returned by analyze() are not part of the
        # response, so they are not accumulated.
        processed_frames = [
            inst.analyze(frame, fps=10.0, output_base64=False)[0]
            for _f_idx, frame, _timestamp in processor.extract_frames_generator(input_path)
        ]
        if not processed_frames:
            raise HTTPException(status_code=400, detail="No frames could be read from the uploaded video.")

        # We process at 10.0 FPS, so output video should play back at 10.0 FPS
        processor.write_frames_to_video(
            processed_frames,
            output_path,
            fps=10.0,
            frame_size=(metadata['width'], metadata['height'])
        )

        # The output is always MP4, so the download name gets an .mp4 suffix
        # regardless of the upload's extension.
        # NOTE(review): the output file is intentionally left in TEMP_DIR after
        # a successful response (as in the original); nothing here deletes it.
        response = FileResponse(
            path=output_path,
            filename=f"annotated_{stem}.mp4",
            media_type="video/mp4"
        )
        succeeded = True
        return response
    except HTTPException:
        # Keep deliberate 4xx responses intact instead of rewrapping them as 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Video processing failed: {str(e)}") from e
    finally:
        # The input copy is never needed after processing; a partially written
        # output is removed only when the request failed. Cleanup is best-effort
        # so it can never mask the real error.
        stale_paths = [input_path] if succeeded else [input_path, output_path]
        for stale_path in stale_paths:
            try:
                os.remove(stale_path)
            except FileNotFoundError:
                pass
            except OSError as cleanup_error:
                print(f"[API] Warning: could not remove temporary file {stale_path}: {cleanup_error}")
if __name__ == "__main__":
    import uvicorn

    print(f"[API] Starting FastAPI server on {API_HOST}:{API_PORT}...")
    # The app is passed as the import string "server:app" rather than as an
    # object, so this module has to be importable under the name `server`.
    # NOTE(review): reload=True turns on uvicorn's auto-reloader; confirm that
    # is wanted outside development.
    server_options = {
        "host": API_HOST,
        "port": API_PORT,
        "reload": True,
    }
    uvicorn.run("server:app", **server_options)