"""FastAPI server exposing the suspicious-behavior detection pipeline.

Endpoints:
    GET  /api/health         -- loads the analyzer and reports model/device info.
    POST /api/detect         -- analyzes a single uploaded image.
    POST /api/analyze-video  -- annotates an uploaded video and returns it as MP4.
"""
import base64
import os
import shutil
import uuid
from typing import Any, Dict, List

import cv2
import numpy as np
from fastapi import BackgroundTasks, FastAPI, File, HTTPException, Query, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, JSONResponse
from pydantic import BaseModel

from suspicious_behavior.config import API_HOST, API_PORT
from suspicious_behavior.pipeline.frame_analyzer import FrameAnalyzer
from suspicious_behavior.pipeline.video_processor import VideoProcessor

# Initialize FastAPI application
app = FastAPI(
    title="SimShieldAI Suspicious Behavior API",
    description="Production-grade API for detecting fighting, violence, and running/sprinting in CCTV surveillance.",
    version="1.0.0",
)

# Add CORS Middleware
# NOTE(review): wildcard origins combined with allow_credentials=True lets any
# site make credentialed requests; restrict allow_origins before exposing this
# service beyond a trusted network.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Workspace directories for temporary assets
WORKSPACE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
TEMP_DIR = os.path.join(WORKSPACE_DIR, "temp")
os.makedirs(TEMP_DIR, exist_ok=True)

# Video container extensions accepted by /api/analyze-video.
ALLOWED_VIDEO_EXTENSIONS = (".mp4", ".avi", ".mov", ".mkv")

# Rate used for frame sampling, for the analyzer, and for the written output
# video; all three must agree so the output plays back at real-time speed.
ANALYSIS_FPS = 10.0

# Lazy loading of FrameAnalyzer to speed up initial server startup
analyzer = None


def get_analyzer():
    """Return the shared FrameAnalyzer, creating it on first use."""
    global analyzer
    if analyzer is None:
        analyzer = FrameAnalyzer(camera_id="api_camera_1")
    return analyzer


def _remove_quietly(path: str) -> None:
    """Delete *path* if possible; cleanup is best-effort and never raises."""
    try:
        os.remove(path)
    except OSError:
        # Already gone or not removable; a cleanup failure must not turn a
        # finished request into an error.
        pass


@app.get("/api/health")
async def health_check():
    """
    Checks system health and returns model status and execution device (CPU/GPU).

    Returns a JSON body with status "healthy" on success, or a 500 response
    with status "unhealthy" and the error text if the analyzer cannot be
    created or queried.
    """
    try:
        # Initialize analyzer to verify models load successfully
        inst = get_analyzer()
        return {
            "status": "healthy",
            "device": str(inst.violence_engine.device),
            "yolo_model": str(inst.yolo.model_name),
            "violence_model": inst.violence_engine.labels,
            "timestamp": str(np.datetime64('now')),
        }
    except Exception as e:
        return JSONResponse(
            status_code=500,
            content={"status": "unhealthy", "error": str(e)},
        )


@app.post("/api/detect")
async def detect_image(
    file: UploadFile = File(..., description="JPEG/PNG image file containing the scene to analyze"),
    return_image: bool = Query(True, description="If true, returns base64 annotated image in the JSON response"),
):
    """
    Analyzes a single image for suspicious behavior (such as running poses).

    Returns a JSON object with "metadata", "alerts_triggered" and, when
    return_image is true, "annotated_frame_base64".

    Raises:
        HTTPException 400: the upload is not declared as an image, is empty,
            or cannot be decoded.
        HTTPException 500: the analysis itself failed.
    """
    # content_type is None when the client sends no Content-Type for the part.
    if not (file.content_type or "").startswith("image/"):
        raise HTTPException(status_code=400, detail="Uploaded file must be an image.")

    try:
        contents = await file.read()
        if not contents:
            # cv2.imdecode raises on an empty buffer instead of returning None.
            raise HTTPException(status_code=400, detail="Invalid image data.")

        nparr = np.frombuffer(contents, np.uint8)
        img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
        if img is None:
            raise HTTPException(status_code=400, detail="Invalid image data.")

        # Process image using our frame analyzer
        # (VideoMAE is skipped for single frames if buffer is not full, which is normal)
        inst = get_analyzer()
        annotated_frame, alerts, metadata = inst.analyze(
            img, fps=ANALYSIS_FPS, output_base64=return_image
        )

        response_data = {
            "metadata": metadata,
            "alerts_triggered": [alert.model_dump() for alert in alerts],
        }

        if return_image:
            if len(alerts) > 0:
                response_data["annotated_frame_base64"] = alerts[0].frame_image
            else:
                # If no alerts, manually encode the frame to base64
                ok, buffer = cv2.imencode('.jpg', annotated_frame)
                if not ok:
                    raise RuntimeError("could not encode annotated frame as JPEG")
                response_data["annotated_frame_base64"] = base64.b64encode(buffer).decode('utf-8')

        return response_data
    except HTTPException:
        # Client errors raised above must keep their 4xx status rather than
        # being re-wrapped as a 500 by the generic handler below.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Image processing failed: {str(e)}") from e


@app.post("/api/analyze-video")
async def analyze_video(
    file: UploadFile = File(..., description="MP4, AVI, or MOV video file to process"),
):
    """
    Processes an entire video clip. Annotates the video with tracking skeletons
    and alert banners, and returns the fully processed video file for download.

    The uploaded file is stored under TEMP_DIR while it is processed and is
    always removed afterwards. The annotated MP4 is removed once the response
    has been sent.

    Raises:
        HTTPException 400: the filename is missing or has an unsupported extension.
        HTTPException 500: saving, analyzing, or writing the video failed.
    """
    # Check video extension. The filename is client-supplied and may be absent;
    # only its base name is used, and only for the download name.
    filename = os.path.basename(file.filename or "")
    stem, ext = os.path.splitext(filename)
    ext = ext.lower()
    if ext not in ALLOWED_VIDEO_EXTENSIONS:
        raise HTTPException(status_code=400, detail="File must be a valid video format (mp4, avi, mov, mkv)")

    # Define temporary file paths
    unique_id = str(uuid.uuid4())
    input_path = os.path.join(TEMP_DIR, f"input_{unique_id}{ext}")
    output_path = os.path.join(TEMP_DIR, f"output_{unique_id}.mp4")

    try:
        # Save uploaded file to disk
        with open(input_path, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

        # Initialize processor and frame analyzer
        processor = VideoProcessor(target_fps=ANALYSIS_FPS)
        metadata = processor.get_metadata(input_path)
        inst = get_analyzer()

        # Reset state (so frame indices start clean for this video)
        # NOTE(review): the analyzer is one shared instance and nothing here
        # guards it against concurrent use; confirm requests are serialized.
        inst.frame_idx = 0
        inst.violence_buffer.clear()
        inst.alert_manager.reset_cooldowns()

        print(f"[API] Processing video: {filename} ({metadata['frame_count']} frames at {metadata['fps']:.1f} FPS)...")

        # Iterate through sampled frames
        # NOTE(review): every annotated frame is held in memory until the
        # video is written; long clips may need a streaming writer.
        processed_frames = []
        for _idx, frame, _timestamp in processor.extract_frames_generator(input_path):
            annotated, _alerts, _frame_meta = inst.analyze(
                frame, fps=ANALYSIS_FPS, output_base64=False
            )
            processed_frames.append(annotated)

        # Compile processed frames back to output video
        # We process at ANALYSIS_FPS, so output video should play back at the same rate
        processor.write_frames_to_video(
            processed_frames,
            output_path,
            fps=ANALYSIS_FPS,
            frame_size=(metadata['width'], metadata['height']),
        )

        if not os.path.isfile(output_path):
            # Surface a missing output here so it becomes a clean 500 rather
            # than a failure while the response is being sent.
            raise RuntimeError("no output video was produced")
    except Exception as e:
        # Do not leave a partial output file behind on failure.
        _remove_quietly(output_path)
        raise HTTPException(status_code=500, detail=f"Video processing failed: {str(e)}") from e
    finally:
        # The uploaded copy is no longer needed whether processing succeeded or not.
        _remove_quietly(input_path)

    # Delete the annotated video after it has been sent so TEMP_DIR does not
    # grow without bound.
    cleanup = BackgroundTasks()
    cleanup.add_task(_remove_quietly, output_path)

    # The payload is always MP4, so the download name carries .mp4 regardless
    # of the uploaded container format.
    return FileResponse(
        path=output_path,
        filename=f"annotated_{stem}.mp4",
        media_type="video/mp4",
        background=cleanup,
    )


if __name__ == "__main__":
    import uvicorn

    print(f"[API] Starting FastAPI server on {API_HOST}:{API_PORT}...")
    uvicorn.run("server:app", host=API_HOST, port=API_PORT, reload=True)