DetectifAI-Backend / behavior_analysis /action_recognition.py
blacksinisterx's picture
deploy: batch update 5 file(s)
7838616 verified
# ============================================================
# FULLY FIXED ACTION RECOGNITION PIPELINE
# Supports:
# - fight_detection.pt (3D ResNet18, state_dict)
# - road_accident.pt (3D ResNet18, state_dict)
# - wallclimb.pt (YOLO, Ultralytics)
# ============================================================
from dataclasses import dataclass, asdict
import multiprocessing as mp
import torch
import cv2
import numpy as np
import os
import time
import json
import logging
from typing import List, Optional, Dict, Any
from torchvision.models.video import r3d_18
import torch.nn as nn
# --- YOLO + PyTorch 2.6 compatibility ---
from ultralytics import YOLO
import ultralytics
torch.serialization.add_safe_globals([ultralytics.nn.tasks.DetectionModel])
# --- Logging ---
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
# ============================================================
# FIXED MODEL PATHS
# ============================================================
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
MODEL_PATHS = {
"fight_detection": os.path.join(BASE_DIR, "fight_detection.pt"),
"road_accident": os.path.join(BASE_DIR, "accident_detection.pt"),
"wallclimb": os.path.join(BASE_DIR, "wallclimb.pt"),
}
# Define which models are 3D-ResNet (run separately) vs YOLO
RESNET_MODELS = {"fight_detection", "road_accident"}
YOLO_MODELS = {"wallclimb"}
# ============================================================
# Dataclasses
# ============================================================
@dataclass
class ActionPrediction:
timestamp: float
frame_index: int
label: str
confidence: float
# ============================================================
# MODEL LOADER (YOLO or 3D-ResNet)
# ============================================================
def load_model(model_path: str, device: torch.device):
name = os.path.basename(model_path).lower()
# -------- YOLO MODEL (wallclimb) --------
if "wall" in name or "yolo" in name:
logger.info(f"Loading YOLO model: {model_path}")
return YOLO(model_path)
# -------- TRY TorchScript --------
try:
model = torch.jit.load(model_path, map_location=device)
logger.info(f"Loaded TorchScript model")
model.eval()
return model
except:
pass
# -------- 3D-ResNet --------
try:
ckpt = torch.load(model_path, map_location=device)
if isinstance(ckpt, dict):
logger.info(f"Loading 3D-ResNet model: {model_path}")
model = r3d_18(weights=None)
model.fc = nn.Linear(512, 2)
state = ckpt.get("state_dict", ckpt)
model.load_state_dict(state)
model.to(device)
model.eval()
return model
except Exception as e:
logger.error(f"3D-ResNet load failed: {e}")
raise RuntimeError(f"Unsupported model format: {model_path}")
# ============================================================
# FRAME PREPROCESSING FOR 3D-ResNet
# ============================================================
def preprocess_clip(frames: List[np.ndarray], device: torch.device, target_size=None):
"""
frames = list of 16 RGB frames
output: tensor (1, 3, 16, H, W)
"""
processed = []
# default target size used in your training/preprocessing
if not target_size:
target_size = (112, 112)
for f in frames:
img = cv2.cvtColor(f, cv2.COLOR_BGR2RGB)
if target_size:
img = cv2.resize(img, (target_size[1], target_size[0]))
img = img / 255.0
img = img.transpose(2, 0, 1)
processed.append(img)
clip = np.stack(processed, axis=1)
tensor = torch.from_numpy(clip).float().unsqueeze(0).to(device)
return tensor
# ============================================================
# INTERPRET MODEL OUTPUT
# ============================================================
# Map class indices to action labels
ACTION_LABELS = {
0: "fighting",
1: "accident",
2: "climbing"
}
# Per-action confidence thresholds
ACTION_CONFIDENCE_THRESHOLDS = {
"fighting": 0.5,
"accident": 0.65,
"climbing": 0.8
}
def interpret_prediction(model, output, model_name, confidence_threshold=None):
"""
Interpret model output and return one of three actions: "fighting", "accident", or "climbing".
If confidence is below 0.5, suppress the prediction and return ("no_action", 0.0).
Model-specific handling:
- fight_detection: returns "fighting" if class 1, "no_action" for class 0
- road_accident: returns "accident" if class 1, "no_action" for class 0
- wallclimb (YOLO): returns "climbing" for class 2
"""
# -------- YOLO (wallclimb) --------
if hasattr(model, "predict") and isinstance(output, list):
logger.info(f"🔍 YOLO prediction for {model_name}")
boxes = output[0].boxes
if boxes is None or len(boxes) == 0:
logger.info("🚫 No boxes detected by YOLO")
return ("no_action", 0.0)
best = boxes[0]
cls_idx = int(best.cls)
conf = float(best.conf)
# YOLO returns climbing detections
label = "climbing" if cls_idx == 0 else "no_action"
# Use per-action threshold or provided threshold
threshold = confidence_threshold if confidence_threshold is not None else ACTION_CONFIDENCE_THRESHOLDS.get(label, 0.5)
logger.info(f"🎯 YOLO detection: class_idx={cls_idx}, confidence={conf:.3f}, threshold={threshold}")
# Suppress if confidence < threshold
if conf < threshold:
logger.info(f"🚫 Confidence {conf:.3f} below threshold {threshold}")
return ("no_action", 0.0)
logger.info(f"✅ YOLO final result: {label} (conf: {conf:.3f})")
return (label, conf)
# -------- 3D-ResNet (fight_detection or road_accident) --------
if isinstance(output, torch.Tensor):
logger.info(f"🔍 3D-ResNet prediction for {model_name}")
probs = torch.softmax(output, dim=1)[0]
cls_idx = int(torch.argmax(probs).item())
conf = float(probs[cls_idx])
logger.info(f"📊 Raw probabilities: {probs.tolist()}")
# Model-specific mapping (class 0 = negative, class 1 = positive)
if "fight" in model_name.lower():
label = "fighting" if cls_idx == 1 else "no_action"
logger.info(f"🥊 Fight detection: class {cls_idx} -> {label}")
elif "accident" in model_name.lower() or "road" in model_name.lower():
# match user's naming and capitalization for saved frames
label = "Accident" if cls_idx == 1 else "no_action"
else:
label = "no_action"
logger.info(f"❓ Unknown model type, defaulting to no_action")
# Use per-action threshold or provided threshold
threshold = confidence_threshold if confidence_threshold is not None else ACTION_CONFIDENCE_THRESHOLDS.get(label.lower(), 0.5)
logger.info(f"🎯 Predicted class: {cls_idx}, confidence: {conf:.3f}, threshold: {threshold}")
# Suppress if confidence < threshold
if conf < threshold:
logger.info(f"🚫 Confidence {conf:.3f} below threshold {threshold}")
return ("no_action", 0.0)
logger.info(f"✅ 3D-ResNet final result: {label} (conf: {conf:.3f})")
return (label, conf)
return ("no_action", 0.0)
# ============================================================
# VIDEO PROCESSING
# ============================================================
def process_video_with_model(
video_path,
model_path,
output_dir,
model_name=None,
use_gpu=True,
frame_skip=1,
target_size=None,
annotate=True):
device = torch.device("cuda" if (use_gpu and torch.cuda.is_available()) else "cpu")
model_name = model_name or os.path.splitext(os.path.basename(model_path))[0]
logger.info(f"[{model_name}] Loading model...")
model = load_model(model_path, device)
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
logger.error(f"[{model_name}] Could not open video")
return
fps = cap.get(cv2.CAP_PROP_FPS) or 25
frame_buffer = []
idx = 0
frames_processed = 0
predictions = []
# annotation folder
anno_dir = os.path.join(output_dir, f"{model_name}_annotated")
if annotate:
os.makedirs(anno_dir, exist_ok=True)
start = time.time()
while True:
ret, frame = cap.read()
if not ret:
break
if idx % frame_skip != 0:
idx += 1
continue
timestamp = idx / fps
try:
# -------- YOLO --------
if hasattr(model, "predict"):
output = model.predict(frame, verbose=False)
label, conf = interpret_prediction(model, output, model_name)
# -------- 3D-ResNet uses CLIPS of 16 frames --------
else:
frame_buffer.append(frame)
if len(frame_buffer) < 16:
idx += 1
continue
clip = preprocess_clip(frame_buffer[-16:], device, target_size)
with torch.no_grad():
output = model(clip)
label, conf = interpret_prediction(model, output, model_name)
# Only record and annotate positive detections
if label != "no_action":
predictions.append(ActionPrediction(timestamp, idx, label, conf))
frames_processed += 1
# -------- Annotate output --------
if annotate:
anno = frame.copy()
cv2.putText(
anno,
f"{label} {conf:.2f}",
(10, 35),
cv2.FONT_HERSHEY_SIMPLEX,
1.0,
(0, 255, 0),
2,
)
cv2.imwrite(os.path.join(anno_dir, f"{idx:06}.jpg"), anno)
except Exception as e:
logger.error(f"[{model_name}] Error on frame {idx}: {e}")
idx += 1
cap.release()
# Save results
os.makedirs(output_dir, exist_ok=True)
json_path = os.path.join(output_dir, f"{os.path.basename(video_path)}__{model_name}.json")
with open(json_path, "w") as f:
json.dump({
"video": video_path,
"model": model_path,
"frames_processed": frames_processed,
"processing_time": time.time() - start,
"predictions": [asdict(p) for p in predictions]
}, f, indent=2)
logger.info(f"[{model_name}] Finished. Saved: {json_path}")
# ============================================================
# MULTI-MODEL EXECUTOR (Windows-safe)
# ============================================================
def run_models_on_videos(video_paths, model_paths,
output_dir="./action_recognition_outputs",
use_gpu=True, frame_skip=5,
target_size=None, annotate=True):
os.makedirs(output_dir, exist_ok=True)
processes = []
for model_path in model_paths:
model_name = os.path.splitext(os.path.basename(model_path))[0]
for video in video_paths:
p = mp.Process(target=process_video_with_model,
args=(video, model_path, output_dir, model_name,
use_gpu, frame_skip, target_size, annotate))
p.start()
processes.append(p)
logger.info(f"Started PID={p.pid}{model_name}")
for p in processes:
p.join()
logger.info(f"PID={p.pid} finished with code {p.exitcode}")
# ============================================================
# MAIN
# ============================================================
if __name__ == "__main__":
mp.set_start_method("spawn", force=True) # IMPORTANT FIX ON WINDOWS
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--videos", "-v", nargs="+", required=True)
parser.add_argument("--models", "-m", nargs="*", default=list(MODEL_PATHS.values()))
parser.add_argument("--output", "-o", default="./action_recognition_outputs")
parser.add_argument("--no-gpu", action="store_true")
parser.add_argument("--frame-skip", type=int, default=5)
parser.add_argument("--no-annotate", action="store_true")
args = parser.parse_args()
run_models_on_videos(
video_paths=args.videos,
model_paths=args.models,
output_dir=args.output,
use_gpu=not args.no_gpu,
frame_skip=max(1, args.frame_skip),
annotate=not args.no_annotate
)