MuhammadTayyab0143's picture
Initial commit: set up project skeleton and deploy workflows
59bcdd1
Raw
History Blame Contribute Delete
7.05 kB
import os
import cv2
import numpy as np
import uuid
import shutil
from fastapi import FastAPI, File, UploadFile, Query, HTTPException
from fastapi.responses import FileResponse, JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import List, Dict, Any
from suspicious_behavior.config import API_HOST, API_PORT
from suspicious_behavior.pipeline.frame_analyzer import FrameAnalyzer
from suspicious_behavior.pipeline.video_processor import VideoProcessor
# Application object; the title, description and version are passed to the FastAPI constructor
app = FastAPI(
    title="SimShieldAI Suspicious Behavior API",
    description="Production-grade API for detecting fighting, violence, and running/sprinting in CCTV surveillance.",
    version="1.0.0",
)

# Cross-origin policy: every origin, method and header is accepted.
# NOTE(review): wildcard origins together with allow_credentials=True is discouraged
# by the FastAPI CORS documentation -- confirm whether credentialed requests are needed.
_CORS_SETTINGS = {
    "allow_origins": ["*"],
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_CORS_SETTINGS)

# Temporary assets live in "<directory two levels above this file>/temp"
_MODULE_FILE = os.path.abspath(__file__)
WORKSPACE_DIR = os.path.dirname(os.path.dirname(_MODULE_FILE))
TEMP_DIR = os.path.join(WORKSPACE_DIR, "temp")
# Created at import time; exist_ok makes repeated imports harmless
os.makedirs(TEMP_DIR, exist_ok=True)
# The FrameAnalyzer is built on first use rather than at import time,
# which keeps initial server startup fast
analyzer = None


def get_analyzer():
    """
    Returns the module-wide FrameAnalyzer, constructing it on the first call.

    Every later call returns the same instance that the first call created.
    """
    global analyzer
    if analyzer is not None:
        return analyzer
    analyzer = FrameAnalyzer(camera_id="api_camera_1")
    return analyzer
@app.get("/api/health")
async def health_check():
    """
    Reports whether the analyzer can be obtained and describes its models.

    On success returns a dict with "status", "device", "yolo_model",
    "violence_model" and "timestamp". If obtaining the analyzer or reading
    any of its attributes raises, returns a 500 JSON response with
    "status": "unhealthy" and the error text.
    """
    try:
        # Obtaining the analyzer forces the lazy construction, so a failure surfaces here
        frame_analyzer = get_analyzer()
        report = {
            "status": "healthy",
            "device": str(frame_analyzer.violence_engine.device),
            "yolo_model": str(frame_analyzer.yolo.model_name),
            "violence_model": frame_analyzer.violence_engine.labels,
            "timestamp": str(np.datetime64('now')),
        }
    except Exception as e:
        failure = {"status": "unhealthy", "error": str(e)}
        return JSONResponse(status_code=500, content=failure)
    return report
@app.post("/api/detect")
async def detect_image(
    file: UploadFile = File(..., description="JPEG/PNG image file containing the scene to analyze"),
    return_image: bool = Query(True, description="If true, returns base64 annotated image in the JSON response"),
):
    """
    Analyzes a single image for suspicious behavior (such as running poses).

    Args:
        file: Uploaded image; its declared content type must start with "image/".
        return_image: When true, the response also carries the annotated frame
            as a base64 string under "annotated_frame_base64".

    Returns:
        A dict with "metadata" (as returned by the analyzer), "alerts_triggered"
        (each alert dumped to a dict) and, if requested, "annotated_frame_base64".

    Raises:
        HTTPException: 400 if the upload is not declared as an image, is empty,
            or cannot be decoded; 500 if analysis or encoding fails.
    """
    import base64

    # content_type is None when the client sends no Content-Type for the part
    if not (file.content_type or "").startswith("image/"):
        raise HTTPException(status_code=400, detail="Uploaded file must be an image.")

    try:
        contents = await file.read()
        # cv2.imdecode raises on an empty buffer instead of returning None,
        # so reject empty uploads explicitly as a client error
        if not contents:
            raise HTTPException(status_code=400, detail="Invalid image data.")

        nparr = np.frombuffer(contents, np.uint8)
        img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
        if img is None:
            raise HTTPException(status_code=400, detail="Invalid image data.")

        # Process image using our frame analyzer
        # (VideoMAE is skipped for single frames if buffer is not full, which is normal)
        inst = get_analyzer()
        annotated_frame, alerts, metadata = inst.analyze(img, fps=10.0, output_base64=return_image)

        response_data = {
            "metadata": metadata,
            "alerts_triggered": [alert.model_dump() for alert in alerts],
        }

        if return_image:
            if alerts:
                # Reuse the frame image already attached to the first alert
                response_data["annotated_frame_base64"] = alerts[0].frame_image
            else:
                # No alerts, so encode the annotated frame ourselves
                encoded_ok, buffer = cv2.imencode('.jpg', annotated_frame)
                if not encoded_ok:
                    raise RuntimeError("JPEG encoding of the annotated frame failed")
                response_data["annotated_frame_base64"] = base64.b64encode(buffer).decode('utf-8')

        return response_data
    except HTTPException:
        # Keep deliberate client errors (400) from being rewritten as 500 below
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Image processing failed: {str(e)}") from e
def _remove_quietly(path):
    """Deletes the file at `path`; a missing or undeletable file is ignored."""
    try:
        os.remove(path)
    except OSError:
        # Best-effort temp-file cleanup: a failure here must not mask the
        # response or the original error
        pass


@app.post("/api/analyze-video")
async def analyze_video(
    file: UploadFile = File(..., description="MP4, AVI, or MOV video file to process")
):
    """
    Processes an entire video clip. Annotates the video with tracking skeletons
    and alert banners, and returns the fully processed video file for download.

    The upload is saved under TEMP_DIR, analyzed frame by frame with the shared
    analyzer, and re-encoded at 10 FPS. The saved upload is deleted before the
    function returns; the annotated output is deleted after the response is sent.

    NOTE(review): the analyzer is a module-level singleton whose state is reset
    here, so overlapping requests would share that state -- confirm this endpoint
    is only used by one client at a time.

    Args:
        file: Uploaded video; the filename extension must be mp4, avi, mov or mkv.

    Returns:
        A FileResponse streaming the annotated MP4, named "annotated_<original name>".

    Raises:
        HTTPException: 400 for a missing or unsupported extension, or when no
            frame could be read; 500 if processing fails.
    """
    # Imported locally so this endpoint is self-contained
    from fastapi import BackgroundTasks

    # Check video extension (filename is None when the client omits it)
    filename = file.filename or ""
    ext = os.path.splitext(filename)[1].lower()
    if ext not in (".mp4", ".avi", ".mov", ".mkv"):
        raise HTTPException(status_code=400, detail="File must be a valid video format (mp4, avi, mov, mkv)")

    # Define temporary file paths
    unique_id = str(uuid.uuid4())
    input_path = os.path.join(TEMP_DIR, f"input_{unique_id}{ext}")
    output_path = os.path.join(TEMP_DIR, f"output_{unique_id}.mp4")

    try:
        # Save uploaded file to disk
        with open(input_path, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

        # Initialize processor and frame analyzer
        processor = VideoProcessor(target_fps=10.0)
        metadata = processor.get_metadata(input_path)
        inst = get_analyzer()

        # Reset state (so frame indices start clean for this video)
        inst.frame_idx = 0
        inst.violence_buffer.clear()
        inst.alert_manager.reset_cooldowns()

        print(f"[API] Processing video: {filename} ({metadata['frame_count']} frames at {metadata['fps']:.1f} FPS)...")

        # Iterate through sampled frames; only the annotated frame is needed for the output
        processed_frames = []
        for _, frame, _ in processor.extract_frames_generator(input_path):
            annotated, _, _ = inst.analyze(frame, fps=10.0, output_base64=False)
            processed_frames.append(annotated)

        # Nothing to encode: report it instead of serving a file that may not exist
        if not processed_frames:
            raise HTTPException(status_code=400, detail="No frames could be read from the uploaded video.")

        # Compile processed frames back to output video
        # We process at 10.0 FPS, so output video should play back at 10.0 FPS
        processor.write_frames_to_video(
            processed_frames,
            output_path,
            fps=10.0,
            frame_size=(metadata['width'], metadata['height']),
        )

        # Delete the annotated file once the response has been sent,
        # so TEMP_DIR does not grow with every request
        cleanup = BackgroundTasks()
        cleanup.add_task(_remove_quietly, output_path)

        return FileResponse(
            path=output_path,
            filename=f"annotated_{filename}",
            media_type="video/mp4",
            background=cleanup,
        )
    except HTTPException:
        # Keep deliberate client errors (400) from being rewritten as 500 below
        raise
    except Exception as e:
        # A partially written output is useless; remove it before reporting
        _remove_quietly(output_path)
        raise HTTPException(status_code=500, detail=f"Video processing failed: {str(e)}") from e
    finally:
        # The saved upload is no longer needed on success or failure
        _remove_quietly(input_path)
if __name__ == "__main__":
    # Script entry point: uvicorn is imported only when the file is run directly
    import uvicorn

    startup_banner = f"[API] Starting FastAPI server on {API_HOST}:{API_PORT}..."
    print(startup_banner)

    # The app is given as an import string ("server:app") and served with reload enabled
    server_options = {"host": API_HOST, "port": API_PORT, "reload": True}
    uvicorn.run("server:app", **server_options)