| """
|
| Behavior Analysis Integrator for DetectifAI
|
|
|
| This module integrates behavior analysis (action recognition) into the video processing pipeline.
|
| It processes video segments/keyframes to detect suspicious behaviors like fighting, accidents, and climbing.
|
| Similar to ObjectDetectionIntegrator, it creates behavior-based events and identifies suspicious frames
|
| for facial recognition processing.
|
| """

import os
import cv2
import time
import logging
import json
import math
from typing import List, Dict, Any, Tuple
from dataclasses import dataclass, asdict

import numpy as np

from behavior_analysis.action_recognition import (
    load_model, preprocess_clip, interpret_prediction,
    MODEL_PATHS, RESNET_MODELS, YOLO_MODELS, ActionPrediction
)

logger = logging.getLogger(__name__)


@dataclass
class BehaviorDetectionResult:
    """Result of behavior detection on a single frame or segment."""
    frame_path: str
    timestamp: float
    frame_index: int
    behavior_detected: str
    confidence: float
    model_used: str
    processing_time: float


@dataclass
class BehaviorEvent:
    """Behavior-based event built from grouped detections."""
    event_id: str
    behavior_type: str
    start_timestamp: float
    end_timestamp: float
    confidence: float
    frame_indices: List[int]
    keyframes: List[str]
    model_used: str
    importance_score: float


class BehaviorAnalysisIntegrator:
    """Integration layer between behavior analysis and the video processing pipeline."""

    def __init__(self, config):
        self.config = config
        self.enabled = getattr(config, 'enable_behavior_analysis', False)
        logger.info(f"Initializing BehaviorAnalysisIntegrator - enabled: {self.enabled}")

        self.models = {}
        self.device = None

        if self.enabled:
            try:
                import torch
                self.device = torch.device(
                    "cuda" if (torch.cuda.is_available() and getattr(config, 'use_gpu_acceleration', True))
                    else "cpu"
                )

                logger.info(f"Attempting to load models from: {MODEL_PATHS}")
                for model_name, model_path in MODEL_PATHS.items():
                    logger.info(f"Checking model {model_name} at: {model_path}")
                    if os.path.exists(model_path):
                        try:
                            logger.info(f"Loading {model_name}...")
                            self.models[model_name] = load_model(model_path, self.device)
                            logger.info(f"Loaded behavior analysis model: {model_name}")
                        except Exception as e:
                            logger.error(f"Failed to load {model_name}: {e}")
                    else:
                        logger.error(f"Model file not found: {model_path}")

                if not self.models:
                    logger.warning("No behavior analysis models loaded, disabling behavior analysis")
                    self.enabled = False
                else:
                    logger.info(f"Behavior analysis initialized with {len(self.models)} models")

            except ImportError:
                logger.warning("PyTorch not available, disabling behavior analysis")
                self.enabled = False
        else:
            logger.info("Behavior analysis disabled in config")

        # General-purpose COCO detector used to validate fight/accident predictions
        # ("Option B"): a fight needs >= 2 persons close together, an accident
        # needs >= 2 vehicles close together.
        self._general_detector = None
        if self.enabled:
            try:
                from ultralytics import YOLO as _YOLO
                self._general_detector = _YOLO("yolov8n.pt")
                logger.info("General YOLO (yolov8n) loaded for fight/accident validation")
            except Exception as e:
                logger.warning(f"General YOLO not available - fight/accident validation disabled: {e}")

    def detect_behavior_in_frame(self, frame_path: str, timestamp: float, frame_index: int = 0) -> List[BehaviorDetectionResult]:
        """
        Detect behaviors in a single frame.

        Only single-frame (YOLO) models run here; 3D-ResNet models need temporal
        context and are handled by the segment-based methods below.

        Args:
            frame_path: Path to frame image
            timestamp: Timestamp in seconds
            frame_index: Frame index number

        Returns:
            List of BehaviorDetectionResult objects (one per model that fired)
        """
        if not self.enabled or not self.models:
            return []

        if not os.path.exists(frame_path):
            logger.warning(f"Frame not found: {frame_path}")
            return []

        frame = cv2.imread(frame_path)
        if frame is None:
            logger.warning(f"Failed to read frame: {frame_path}")
            return []

        results = []
        for model_name, model in self.models.items():
            try:
                start_time = time.time()

                if model_name in YOLO_MODELS:
                    output = model.predict(frame, verbose=False)
                    label, conf = interpret_prediction(model, output, model_name)
                    logger.info(f"YOLO model {model_name} prediction: {label} (confidence: {conf:.3f})")

                    if label != "no_action":
                        results.append(BehaviorDetectionResult(
                            frame_path=frame_path,
                            timestamp=timestamp,
                            frame_index=frame_index,
                            behavior_detected=label,
                            confidence=conf,
                            model_used=model_name,
                            processing_time=time.time() - start_time
                        ))

            except Exception as e:
                logger.error(f"Error detecting behavior with {model_name}: {e}")
                continue

        return results
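
    # A minimal usage sketch (illustrative; the config object and frame path
    # below are assumptions, not part of this module):
    #
    #   integrator = BehaviorAnalysisIntegrator(config)
    #   detections = integrator.detect_behavior_in_frame("keyframes/kf_0001.jpg",
    #                                                    timestamp=12.5, frame_index=42)
    #   for d in detections:
    #       print(d.behavior_detected, f"{d.confidence:.2f}", d.model_used)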

    def detect_behavior_in_segment(self, video_path: str, start_time: float, end_time: float,
                                   frame_indices: List[int] = None) -> List[BehaviorDetectionResult]:
        """
        Detect behaviors in a video segment (for 3D-ResNet models that need temporal context).

        Args:
            video_path: Path to video file
            start_time: Start timestamp in seconds
            end_time: End timestamp in seconds
            frame_indices: Optional list of frame indices to process

        Returns:
            List of BehaviorDetectionResult objects
        """
        if not self.enabled or not self.models:
            return []

        if not os.path.exists(video_path):
            logger.warning(f"Video not found: {video_path}")
            return []

        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            logger.error(f"Could not open video: {video_path}")
            return []

        fps = cap.get(cv2.CAP_PROP_FPS) or 25  # fall back to 25 fps if metadata is missing
        start_frame = int(start_time * fps)
        end_frame = int(end_time * fps)

        # Read the segment into memory so it can be preprocessed as a clip.
        frame_buffer = []
        cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)
        for _ in range(start_frame, min(end_frame, int(cap.get(cv2.CAP_PROP_FRAME_COUNT)))):
            ret, frame = cap.read()
            if not ret:
                break
            frame_buffer.append(frame)
        cap.release()

        mid_frame_idx = (start_frame + end_frame) // 2 if end_frame > start_frame else start_frame
        return self._process_frame_buffer(frame_buffer, start_time, end_time, mid_frame_idx, video_path)
    def detect_behavior_in_segment_from_buffer(self, frame_buffer: List[np.ndarray],
                                               start_time: float, end_time: float,
                                               frame_indices: List[int] = None) -> List[BehaviorDetectionResult]:
        """
        Detect behaviors in a frame buffer (for live streams).

        Args:
            frame_buffer: List of frames (numpy arrays)
            start_time: Start timestamp in seconds
            end_time: End timestamp in seconds
            frame_indices: Optional list of frame indices

        Returns:
            List of BehaviorDetectionResult objects
        """
        if not self.enabled or not self.models:
            return []

        if len(frame_buffer) < 16:
            logger.debug(f"Frame buffer too short ({len(frame_buffer)} frames), skipping 3D-ResNet models")
            return []

        # The guard above guarantees at least 16 frames, so take the most recent 16.
        frames_to_process = frame_buffer[-16:]
        if frame_indices:
            mid_frame_idx = frame_indices[len(frame_indices) // 2]
        else:
            mid_frame_idx = len(frame_buffer) // 2

        return self._process_frame_buffer(frames_to_process, start_time, end_time, mid_frame_idx, "live_stream")
    def _process_frame_buffer(self, frame_buffer: List[np.ndarray], start_time: float,
                              end_time: float, frame_index: int, video_path: str = "live_stream") -> List[BehaviorDetectionResult]:
        """
        Run the 3D-ResNet behavior models over a frame buffer.

        Args:
            frame_buffer: List of frames (numpy arrays)
            start_time: Start timestamp
            end_time: End timestamp
            frame_index: Frame index for result
            video_path: Path to video file, or "live_stream" for live streams

        Returns:
            List of BehaviorDetectionResult objects
        """
        if len(frame_buffer) < 16:
            return []

        results = []
        for model_name, model in self.models.items():
            if model_name not in RESNET_MODELS:
                continue

            try:
                start_time_proc = time.time()

                # 3D-ResNet models expect a fixed-length clip of 16 frames.
                clip = preprocess_clip(frame_buffer[-16:], self.device)

                import torch
                model.eval()
                with torch.no_grad():
                    output = model(clip)

                label, conf = interpret_prediction(model, output, model_name)
                logger.info(f"Model {model_name} prediction: {label} (confidence: {conf:.3f})")

                if label != "no_action":
                    # Option B validation: cross-check fight/accident predictions
                    # against the general object detector on the middle frame.
                    mid_frame = frame_buffer[len(frame_buffer) // 2]
                    label_lower = label.lower()
                    if label_lower in ("fighting", "fight"):
                        valid, ctx = self._validate_persons(mid_frame, min_count=2)
                        if not valid:
                            logger.info(f"Fight suppressed by Option B: <2 persons close together ({label})")
                            continue
                        logger.info("Fight validated by Option B: persons close together")
                    elif label_lower in ("accident", "road_accident"):
                        valid, ctx = self._validate_vehicles(mid_frame, min_count=2)
                        if not valid:
                            logger.info(f"Accident suppressed by Option B: <2 vehicles close together ({label})")
                            continue
                        logger.info("Accident validated by Option B: vehicles close together")

                    mid_timestamp = (start_time + end_time) / 2
                    results.append(BehaviorDetectionResult(
                        frame_path=video_path,  # video path, or "live_stream" for live buffers
                        timestamp=mid_timestamp,
                        frame_index=frame_index,
                        behavior_detected=label,
                        confidence=conf,
                        model_used=model_name,
                        processing_time=time.time() - start_time_proc
                    ))

            except Exception as e:
                logger.error(f"Error detecting behavior with {model_name} in segment: {e}")
                continue

        return results
    def detect_behavior_in_keyframes(self, keyframes: List, video_path: str = None) -> List[BehaviorDetectionResult]:
        """
        Detect behaviors in keyframes.

        YOLO models run on each keyframe image; if a video path is given,
        3D-ResNet models additionally run on short segments around each keyframe.

        Args:
            keyframes: List of KeyframeResult objects
            video_path: Optional path to video file (needed for 3D-ResNet models)

        Returns:
            List of BehaviorDetectionResult objects
        """
        if not self.enabled:
            logger.info("Behavior analysis disabled, skipping")
            return []

        logger.info(f"Starting behavior detection on {len(keyframes)} keyframes")
        logger.info(f"Video path provided: {video_path}")
        logger.info(f"Available models: {list(self.models.keys())}")

        all_results = []

        # Pass 1: single-frame YOLO models on each keyframe image.
        yolo_models_available = [m for m in self.models.keys() if m in YOLO_MODELS]
        logger.info(f"Processing YOLO models (single frame): {yolo_models_available}")

        for i, keyframe in enumerate(keyframes):
            # Keyframe objects come in two shapes: with a nested frame_data
            # attribute, or with frame_path/timestamp directly on the object.
            frame_path = None
            timestamp = 0.0
            frame_index = i

            if hasattr(keyframe, 'frame_data'):
                frame_path = getattr(keyframe.frame_data, 'frame_path', None)
                timestamp = getattr(keyframe.frame_data, 'timestamp', 0.0)
            elif hasattr(keyframe, 'frame_path'):
                frame_path = keyframe.frame_path
                timestamp = getattr(keyframe, 'timestamp', 0.0)

            if frame_path and os.path.exists(frame_path):
                all_results.extend(self.detect_behavior_in_frame(frame_path, timestamp, frame_index))

        # Pass 2: 3D-ResNet models on 1-second segments centered on each keyframe.
        if video_path and os.path.exists(video_path) and RESNET_MODELS:
            resnet_models_available = [m for m in self.models.keys() if m in RESNET_MODELS]
            logger.info("Processing 3D-ResNet models using video segments...")
            logger.info(f"Available ResNet models: {resnet_models_available}")

            segment_window = 1.0
            processed_segments = set()

            for keyframe in keyframes:
                timestamp = 0.0
                if hasattr(keyframe, 'frame_data'):
                    timestamp = getattr(keyframe.frame_data, 'timestamp', 0.0)
                elif hasattr(keyframe, 'timestamp'):
                    timestamp = getattr(keyframe, 'timestamp', 0.0)

                if timestamp > 0:
                    start_time = max(0, timestamp - segment_window / 2)
                    end_time = timestamp + segment_window / 2

                    # Deduplicate segments at 0.1s resolution so overlapping
                    # keyframes don't trigger redundant model runs.
                    segment_key = (int(start_time * 10), int(end_time * 10))
                    if segment_key in processed_segments:
                        continue
                    processed_segments.add(segment_key)

                    try:
                        logger.info(f"Processing video segment: {start_time:.1f}s - {end_time:.1f}s")
                        segment_results = self.detect_behavior_in_segment(
                            video_path=video_path,
                            start_time=start_time,
                            end_time=end_time,
                            frame_indices=None
                        )
                        logger.info(f"Segment results: {len(segment_results)} detections")
                        for result in segment_results:
                            logger.info(f"Detected: {result.behavior_detected} (conf: {result.confidence:.3f})")
                        all_results.extend(segment_results)
                    except Exception as e:
                        logger.error(f"Error processing segment {start_time:.1f}s-{end_time:.1f}s: {e}")
                        continue

        logger.info(f"Behavior analysis complete: {len(all_results)} behaviors detected")
        return all_results

    def create_behavior_events(self, detection_results: List[BehaviorDetectionResult],
                               temporal_window: float = 5.0) -> List[BehaviorEvent]:
        """
        Create behavior-based events from detection results.

        Detections are sorted by timestamp; consecutive detections of the same
        behavior within `temporal_window` seconds are merged into one event.

        Args:
            detection_results: List of BehaviorDetectionResult objects
            temporal_window: Time window in seconds for grouping detections

        Returns:
            List of BehaviorEvent objects
        """
        if not detection_results:
            return []

        def finalize(event: Dict[str, Any]) -> BehaviorEvent:
            # Average confidence over the event; importance grows with both
            # confidence and duration (the +1 keeps instantaneous events nonzero).
            avg_confidence = sum(event['confidences']) / len(event['confidences'])
            importance = avg_confidence * (event['end_timestamp'] - event['start_timestamp'] + 1)
            return BehaviorEvent(
                event_id=event['event_id'],
                behavior_type=event['behavior_type'],
                start_timestamp=event['start_timestamp'],
                end_timestamp=event['end_timestamp'],
                confidence=avg_confidence,
                frame_indices=event['frame_indices'],
                keyframes=event['keyframes'],
                model_used=event['model_used'],
                importance_score=importance
            )

        events = []
        sorted_results = sorted(detection_results, key=lambda x: x.timestamp)
        current_event = None
        event_id_counter = 0

        for result in sorted_results:
            if result.behavior_detected == "no_action":
                continue

            if (current_event is not None and
                    result.behavior_detected == current_event['behavior_type'] and
                    result.timestamp - current_event['end_timestamp'] <= temporal_window):
                # Extend the current event.
                current_event['end_timestamp'] = result.timestamp
                current_event['confidences'].append(result.confidence)
                current_event['frame_indices'].append(result.frame_index)
                current_event['keyframes'].append(result.frame_path)
            else:
                # Close the previous event (if any) and start a new one.
                if current_event is not None:
                    events.append(finalize(current_event))
                event_id_counter += 1
                current_event = {
                    'event_id': f"behavior_{result.behavior_detected}_{event_id_counter}",
                    'behavior_type': result.behavior_detected,
                    'start_timestamp': result.timestamp,
                    'end_timestamp': result.timestamp,
                    'confidences': [result.confidence],
                    'frame_indices': [result.frame_index],
                    'keyframes': [result.frame_path],
                    'model_used': result.model_used
                }

        if current_event:
            events.append(finalize(current_event))

        logger.info(f"Created {len(events)} behavior-based events")
        return events
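
    # Worked example of the grouping rule above (values illustrative): with
    # temporal_window=5.0, "fighting" detections at t=10.2s (conf 0.81),
    # t=12.9s (0.74) and t=16.0s (0.90) merge into one event spanning
    # 10.2s-16.0s with confidence (0.81+0.74+0.90)/3 = 0.8167 and
    # importance_score 0.8167 * (16.0 - 10.2 + 1) = 5.55; a fourth detection
    # at t=25.0s would start a new event, since the 9.0s gap exceeds the window.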

    def process_keyframes_with_behavior_analysis(self, keyframes: List, video_path: str = None) -> Tuple[List[BehaviorDetectionResult], List[BehaviorEvent]]:
        """
        Process keyframes with behavior analysis and create behavior-based events.

        Args:
            keyframes: List of KeyframeResult objects
            video_path: Optional path to video file (needed for 3D-ResNet models)

        Returns:
            Tuple of (detection_results, behavior_events)
        """
        if not self.enabled:
            logger.info("Behavior analysis disabled, skipping...")
            return [], []

        logger.info("===== STARTING BEHAVIOR ANALYSIS INTEGRATION =====")
        logger.info(f"Input: {len(keyframes)} keyframes, video_path: {video_path}")
        logger.info(f"Loaded models: {list(self.models.keys())}")
        logger.info(
            f"Confidence thresholds: "
            f"fighting={getattr(self.config, 'fighting_detection_confidence', 0.5)}, "
            f"accident={getattr(self.config, 'accident_detection_confidence', 0.6)}, "
            f"climbing={getattr(self.config, 'climbing_detection_confidence', 0.7)}"
        )

        detection_results = self.detect_behavior_in_keyframes(keyframes, video_path=video_path)

        temporal_window = getattr(self.config, 'behavior_event_temporal_window', 5.0)
        logger.info(f"Creating behavior events with temporal window: {temporal_window}s")
        logger.info(f"Total detections to process: {len(detection_results)}")

        positive_detections = [r for r in detection_results if r.behavior_detected != "no_action"]
        logger.info(f"Positive detections: {len(positive_detections)}")
        for detection in positive_detections:
            logger.info(f"  {detection.behavior_detected} at {detection.timestamp:.1f}s (conf: {detection.confidence:.3f})")

        behavior_events = self.create_behavior_events(detection_results, temporal_window)

        # Persist a small JSON summary alongside the pipeline's other outputs.
        if hasattr(self.config, 'output_base_dir') and detection_results:
            detection_metadata = {
                'total_keyframes': len(keyframes),
                'frames_with_behaviors': len(positive_detections),
                'behaviors_detected': {
                    'fighting': len([r for r in detection_results if r.behavior_detected == "fighting"]),
                    'accident': len([r for r in detection_results if r.behavior_detected == "accident"]),
                    'climbing': len([r for r in detection_results if r.behavior_detected == "climbing"])
                },
                'total_events': len(behavior_events),
                'detection_summary': [asdict(r) for r in detection_results[:10]]
            }

            metadata_path = os.path.join(self.config.output_base_dir, 'behavior_analysis_metadata.json')
            os.makedirs(os.path.dirname(metadata_path), exist_ok=True)
            with open(metadata_path, 'w') as f:
                json.dump(detection_metadata, f, indent=2, default=str)
            logger.info(f"Behavior analysis metadata saved: {metadata_path}")

        logger.info("===== BEHAVIOR ANALYSIS INTEGRATION COMPLETE =====")
        logger.info("Summary:")
        logger.info(f"  Total detections: {len(detection_results)}")
        logger.info(f"  Positive detections: {len(positive_detections)}")
        logger.info(f"  Events created: {len(behavior_events)}")
        for event in behavior_events:
            logger.info(f"  Event: {event.behavior_type} ({event.start_timestamp:.1f}s-{event.end_timestamp:.1f}s, conf: {event.confidence:.3f})")

        return detection_results, behavior_events

    def get_suspicious_frames(self, detection_results: List[BehaviorDetectionResult]) -> List[BehaviorDetectionResult]:
        """
        Get frames with suspicious behaviors (for facial recognition processing).

        Args:
            detection_results: List of BehaviorDetectionResult objects

        Returns:
            List of suspicious BehaviorDetectionResult objects
        """
        suspicious = [r for r in detection_results if r.behavior_detected != "no_action"]
        logger.info(f"Identified {len(suspicious)} suspicious frames from behavior analysis")
        return suspicious

    def _validate_persons(self, frame: np.ndarray, min_count: int = 2) -> Tuple[bool, list]:
        """Return (True, detections) if >= min_count persons are close together."""
        if self._general_detector is None:
            # Fail open: without the validator, accept the model's prediction.
            return True, []
        try:
            preds = self._general_detector(frame, conf=0.35, verbose=False)
            persons = []
            for r in preds:
                for box in r.boxes:
                    if int(box.cls) == 0:  # COCO class 0 = person
                        x1, y1, x2, y2 = box.xyxy[0].tolist()
                        persons.append({"class": "person", "confidence": float(box.conf),
                                        "bbox": [x1, y1, x2, y2]})
            if len(persons) < min_count:
                logger.info(f"Person check: only {len(persons)} person(s) - need {min_count}+")
                return False, persons
            for i in range(len(persons)):
                for j in range(i + 1, len(persons)):
                    if self._boxes_are_close(persons[i]["bbox"], persons[j]["bbox"], frame.shape):
                        logger.info(f"Person check passed: {len(persons)} persons, pair ({i},{j}) close")
                        return True, persons
            logger.info(f"Person check: {len(persons)} persons but none close together")
            return False, persons
        except Exception as e:
            logger.warning(f"Person validation error: {e}")
            return True, []

    def _validate_vehicles(self, frame: np.ndarray, min_count: int = 2) -> Tuple[bool, list]:
        """Return (True, detections) if >= min_count vehicles are close together."""
        if self._general_detector is None:
            # Fail open: without the validator, accept the model's prediction.
            return True, []
        try:
            preds = self._general_detector(frame, conf=0.35, verbose=False)
            vehicle_classes = {2: "car", 3: "motorcycle", 5: "bus", 7: "truck"}  # COCO ids
            vehicles = []
            for r in preds:
                for box in r.boxes:
                    cid = int(box.cls)
                    if cid in vehicle_classes:
                        x1, y1, x2, y2 = box.xyxy[0].tolist()
                        vehicles.append({"class": vehicle_classes[cid],
                                         "confidence": float(box.conf),
                                         "bbox": [x1, y1, x2, y2]})
            if len(vehicles) < min_count:
                logger.info(f"Vehicle check: only {len(vehicles)} vehicle(s) - need {min_count}+")
                return False, vehicles
            for i in range(len(vehicles)):
                for j in range(i + 1, len(vehicles)):
                    if self._boxes_are_close(vehicles[i]["bbox"], vehicles[j]["bbox"], frame.shape):
                        logger.info(f"Vehicle check passed: {len(vehicles)} vehicles, pair ({i},{j}) close")
                        return True, vehicles
            logger.info(f"Vehicle check: {len(vehicles)} vehicles but none close together")
            return False, vehicles
        except Exception as e:
            logger.warning(f"Vehicle validation error: {e}")
            return True, []

    @staticmethod
    def _boxes_are_close(bbox1, bbox2, frame_shape, distance_ratio: float = 0.15) -> bool:
        """True if two boxes overlap or their gap is within distance_ratio of the frame diagonal."""
        x1a, y1a, x2a, y2a = bbox1
        x1b, y1b, x2b, y2b = bbox2

        # Any overlap counts as close.
        if min(x2a, x2b) > max(x1a, x1b) and min(y2a, y2b) > max(y1a, y1b):
            return True

        # Otherwise measure the axis-aligned gap between the boxes and compare
        # it to a fraction of the frame diagonal.
        dx = max(0, max(x1a, x1b) - min(x2a, x2b))
        dy = max(0, max(y1a, y1b) - min(y2a, y2b))
        dist = math.hypot(dx, dy)
        diag = math.hypot(frame_shape[0], frame_shape[1])
        return dist < diag * distance_ratio
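
    # Worked example of the proximity rule (illustrative numbers): in a
    # 720x1280 frame (diagonal ~1468.6 px), boxes [100, 100, 200, 300] and
    # [250, 120, 350, 320] do not overlap; the gap is dx=50, dy=0, so
    # dist=50.0 < 1468.6 * 0.15 = 220.3 and the boxes count as close.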

    def get_behavior_analysis_summary(self) -> Dict[str, Any]:
        """Get summary statistics of behavior analysis."""
        return {
            'enabled': self.enabled,
            'models_loaded': list(self.models.keys()) if self.models else [],
            'device': str(self.device) if self.device else None
        }
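

if __name__ == "__main__":
    # Minimal smoke-test sketch, not part of the pipeline. The config below is
    # a bare namespace and the paths are placeholders - model weights from
    # MODEL_PATHS must exist on disk for any models to load.
    import argparse

    logging.basicConfig(level=logging.INFO)

    demo_config = argparse.Namespace(
        enable_behavior_analysis=True,
        use_gpu_acceleration=True,
        behavior_event_temporal_window=5.0,
        output_base_dir="output",
    )

    integrator = BehaviorAnalysisIntegrator(demo_config)
    print(integrator.get_behavior_analysis_summary())

    # With a real video on disk, a 2-second segment could be analyzed like so:
    # detections = integrator.detect_behavior_in_segment("sample.mp4", 10.0, 12.0)
    # events = integrator.create_behavior_events(detections)
    # for e in events:
    #     print(e.event_id, e.behavior_type, e.importance_score)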