| """
|
| Live Stream Processor for DetectifAI
|
|
|
| Processes live webcam/CCTV footage through the same pipeline as uploaded videos:
|
| - Object detection (fire, weapons)
|
| - Behavior analysis (fighting, accidents, climbing)
|
| - Facial recognition on suspicious frames
|
| - Real-time event detection
|
| - Storage in MongoDB and MinIO
|
| """

import cv2
import numpy as np
import io
import os
import time
import threading
import logging
import uuid
from datetime import datetime
from typing import Optional, Dict, Any, List, Tuple
from pathlib import Path

from config import VideoProcessingConfig, get_security_focused_config
from object_detection import ObjectDetector
from behavior_analysis_integrator import BehaviorAnalysisIntegrator
from database.config import DatabaseManager
from database.repositories import VideoRepository, EventRepository
from database.keyframe_repository import KeyframeRepository

# Real-time alerts are optional; live streaming still works without them.
try:
    from real_time_alerts import get_alert_engine, RealTimeAlertEngine
    ALERTS_AVAILABLE = True
except ImportError:
    ALERTS_AVAILABLE = False
    logging.warning("Real-time alerts module not available")

logger = logging.getLogger(__name__)


class LiveStreamProcessor:
    """Process live video streams with the DetectifAI pipeline."""

    def __init__(self, config: Optional[VideoProcessingConfig] = None, camera_id: str = "webcam_01",
                 object_detector=None, behavior_analyzer=None):
        """
        Initialize the live stream processor.

        Args:
            config: VideoProcessingConfig object (defaults to the security-focused config)
            camera_id: Unique identifier for the camera/stream
            object_detector: Pre-loaded ObjectDetector (shared from main pipeline)
            behavior_analyzer: Pre-loaded BehaviorAnalysisIntegrator (shared)
        """
        self.config = config or get_security_focused_config()
        self.camera_id = camera_id
        self.is_processing = False
        self.cap = None
        self.camera_index = 0
        self.frame_count = 0
        self.last_keyframe_time = 0
        self.keyframe_interval = 1.0  # seconds between periodic keyframes

        # Database connections are optional; the stream keeps running without them
        try:
            self.db_manager = DatabaseManager()
            self.video_repo = VideoRepository(self.db_manager)
            self.event_repo = EventRepository(self.db_manager)
            self.keyframe_repo = KeyframeRepository(self.db_manager)
        except Exception as e:
            logger.warning(f"⚠️ Database connection failed (live stream continues without DB): {e}")
            self.db_manager = None
            self.video_repo = None
            self.event_repo = None
            self.keyframe_repo = None

        # Object detection: reuse a shared detector when provided, otherwise load one
        self.object_detector = object_detector
        if self.object_detector is None and self.config.enable_object_detection:
            try:
                self.object_detector = ObjectDetector(self.config)
                logger.info("✅ Object detection enabled for live stream (new instance)")
            except Exception as e:
                logger.warning(f"⚠️ Object detection initialization failed: {e}")
                self.config.enable_object_detection = False
        elif self.object_detector is not None:
            logger.info("✅ Object detection enabled for live stream (shared instance)")

        # Behavior analysis: reuse a shared analyzer when provided, otherwise load one
        self.behavior_analyzer = behavior_analyzer
        if self.behavior_analyzer is None and getattr(self.config, 'enable_behavior_analysis', False):
            try:
                self.behavior_analyzer = BehaviorAnalysisIntegrator(self.config)
                logger.info("✅ Behavior analysis enabled for live stream (new instance)")
            except Exception as e:
                logger.warning(f"⚠️ Behavior analysis initialization failed: {e}")
                self.config.enable_behavior_analysis = False
        elif self.behavior_analyzer is not None:
            logger.info("✅ Behavior analysis enabled for live stream (shared instance)")

        # General-purpose YOLO model used to validate fight/accident detections
        self.general_detector = None
        try:
            from ultralytics import YOLO as _YOLO
            self.general_detector = _YOLO("yolov8n.pt")
            logger.info("✅ General YOLO detector (yolov8n) loaded for person/car validation")
        except Exception as e:
            logger.warning(f"⚠️ General YOLO detector not available: {e}")

        # Facial recognition (optional)
        self.face_recognizer = None
        if getattr(self.config, 'enable_facial_recognition', False):
            try:
                from facial_recognition import FacialRecognitionIntegrated
                self.face_recognizer = FacialRecognitionIntegrated(self.config)
                logger.info("✅ Facial recognition enabled for live stream")
            except Exception as e:
                logger.warning(f"⚠️ Facial recognition initialization failed: {e}")

        # Rolling frame buffer used for behavior analysis
        self.frame_buffer = []
        self.buffer_size = 16

        # Throttle behavior analysis to at most one run per interval
        self.last_behavior_analysis_time = 0
        self.behavior_analysis_interval = 7.0  # seconds

        # Frame-differencing motion detection state
        self.prev_frame_gray = None
        self.motion_threshold = 25  # per-pixel intensity difference

        # Real-time alert engine (optional)
        self.alert_engine = None
        if ALERTS_AVAILABLE:
            try:
                self.alert_engine = get_alert_engine()
                self.alert_engine.load_flagged_persons()
                logger.info("✅ Real-time alert engine connected for live stream")
            except Exception as e:
                logger.warning(f"⚠️ Alert engine initialization failed: {e}")

        # Processing statistics
        self.stats = {
            'frames_processed': 0,
            'keyframes_extracted': 0,
            'objects_detected': 0,
            'behaviors_detected': 0,
            'events_created': 0,
            'alerts_generated': 0,
            'start_time': None
        }

        logger.info(f"✅ Live stream processor initialized for camera: {camera_id}")

    def preprocess_frame(self, frame: np.ndarray) -> Optional[np.ndarray]:
        """
        Preprocess frame: resize and reject blurry frames.

        Args:
            frame: Input frame from camera

        Returns:
            Preprocessed frame, or None if the frame is missing or too blurry
        """
        if frame is None:
            return None

        # Resize to the detector input size
        target_size = (640, 640)
        processed = cv2.resize(frame, target_size)

        # Blur check: variance of the Laplacian (low variance = blurry frame)
        gray = cv2.cvtColor(processed, cv2.COLOR_BGR2GRAY)
        laplacian_var = cv2.Laplacian(gray, cv2.CV_64F).var()
        if laplacian_var < 100:
            return None

        return processed

    def detect_motion(self, frame_gray: np.ndarray) -> Tuple[bool, float]:
        """
        Detect motion in the frame via frame differencing.

        Args:
            frame_gray: Grayscale frame

        Returns:
            (motion_detected, motion_score)
        """
        if self.prev_frame_gray is None:
            self.prev_frame_gray = frame_gray
            return False, 0.0

        diff = cv2.absdiff(self.prev_frame_gray, frame_gray)
        self.prev_frame_gray = frame_gray

        # Motion score = number of pixels whose change exceeds the threshold
        motion_score = np.sum(diff > self.motion_threshold)
        motion_detected = motion_score > 5000

        return motion_detected, float(motion_score)

    def process_frame(self, frame: np.ndarray, timestamp: float) -> Dict[str, Any]:
        """
        Process a single frame through the pipeline.

        Args:
            frame: Input frame
            timestamp: Frame timestamp in seconds

        Returns:
            Processing results dictionary
        """
        results = {
            'timestamp': timestamp,
            'frame_count': self.frame_count,
            'objects_detected': [],
            'behaviors_detected': [],
            'motion_detected': False,
            'motion_score': 0.0,
            'events': []
        }

        processed_frame = self.preprocess_frame(frame)
        if processed_frame is None:
            return results

        # Motion detection
        gray = cv2.cvtColor(processed_frame, cv2.COLOR_BGR2GRAY)
        motion_detected, motion_score = self.detect_motion(gray)
        results['motion_detected'] = motion_detected
        results['motion_score'] = motion_score

        # Maintain a rolling buffer of recent frames for behavior analysis
        self.frame_buffer.append(processed_frame.copy())
        if len(self.frame_buffer) > self.buffer_size:
            self.frame_buffer.pop(0)

        # Run object detection when motion is seen, or periodically every 30 frames
        should_run_detection = motion_detected or (self.frame_count % 30 == 0)

        if self.object_detector and should_run_detection:
            try:
                # Wrap the live frame in the same structures the upload pipeline uses
                from core.video_processing import KeyframeResult, FrameData
                frame_data = FrameData(
                    frame_path=None,
                    timestamp=timestamp,
                    frame_number=self.frame_count,
                    quality_score=0.8,
                    motion_score=1.0 if motion_detected else 0.0,
                    burst_active=False,
                    enhancement_applied=False
                )
                keyframe = KeyframeResult(
                    frame_data=frame_data,
                    keyframe_score=0.8,
                    selection_reason='live_frame'
                )

                # The detector expects a frame on disk; write a temporary JPEG
                import tempfile
                temp_dir = tempfile.gettempdir()
                temp_frame_path = os.path.join(temp_dir, f"live_frame_{self.camera_id}_{self.frame_count}.jpg")
                cv2.imwrite(temp_frame_path, processed_frame)
                keyframe.frame_data.frame_path = temp_frame_path

                detection_result = self.object_detector.detect_objects_in_keyframes([keyframe])
                if detection_result and len(detection_result) > 0:
                    detections = detection_result[0]
                    if hasattr(detections, 'total_detections') and detections.total_detections > 0:
                        results['objects_detected'] = [
                            {
                                'class': det.class_name,
                                'confidence': float(det.confidence),
                                'bbox': det.bbox
                            }
                            for det in detections.detected_objects
                        ]
                        self.stats['objects_detected'] += len(results['objects_detected'])

                        obj_classes = [obj['class'] for obj in results['objects_detected']]
                        logger.info(f"🎯 REAL-TIME DETECTION: {len(results['objects_detected'])} object(s) detected: {', '.join(obj_classes)} (frame {self.frame_count})")

                        # Forward each detection to the alert engine
                        if self.alert_engine:
                            for det in results['objects_detected']:
                                alert = self.alert_engine.process_detection(
                                    camera_id=self.camera_id,
                                    detection_class=det['class'],
                                    confidence=det['confidence'],
                                    bounding_boxes=[det],
                                    frame=processed_frame,
                                    timestamp=timestamp,
                                    video_id=f"live_{self.camera_id}",
                                )
                                if alert:
                                    self.stats['alerts_generated'] = self.stats.get('alerts_generated', 0) + 1

                # Clean up the temporary frame file
                try:
                    os.remove(temp_frame_path)
                except OSError:
                    pass

            except Exception as e:
                logger.warning(f"Error in object detection: {e}")

        # Behavior analysis: needs a full buffer and respects the analysis interval
        behavior_ready = (
            self.behavior_analyzer
            and len(self.frame_buffer) >= 16
            and (time.time() - self.last_behavior_analysis_time >= self.behavior_analysis_interval)
        )
        if behavior_ready:
            try:
                behavior_results = self.behavior_analyzer.detect_behavior_in_segment_from_buffer(
                    frame_buffer=self.frame_buffer,
                    start_time=timestamp - (len(self.frame_buffer) / 30.0),
                    end_time=timestamp,
                    frame_indices=list(range(max(0, self.frame_count - len(self.frame_buffer) + 1), self.frame_count + 1))
                )

                self.last_behavior_analysis_time = time.time()

                if behavior_results:
                    results['behaviors_detected'] = [
                        {
                            'behavior_type': r.behavior_detected,
                            'behavior': r.behavior_detected,
                            'confidence': float(r.confidence),
                            'model': r.model_used
                        }
                        for r in behavior_results
                    ]
                    self.stats['behaviors_detected'] += len(results['behaviors_detected'])

                    behavior_types = [b['behavior_type'] for b in results['behaviors_detected']]
                    logger.info(f"🎭 REAL-TIME BEHAVIOR: {len(results['behaviors_detected'])} behavior(s) detected: {', '.join(behavior_types)} (frame {self.frame_count})")

                    # Forward behaviors to the alert engine after YOLO cross-checks
                    if self.alert_engine:
                        for beh in results['behaviors_detected']:
                            btype = beh['behavior_type'].lower()

                            # Fighting requires 2+ persons close together in the frame
                            if btype in ('fighting', 'fight'):
                                valid, person_dets = self._validate_fight_with_yolo(processed_frame)
                                if not valid:
                                    logger.info("🚫 Skipping fight alert: YOLO person check failed")
                                    continue
                                beh['yolo_context'] = person_dets

                            # Accidents require 2+ vehicles close together in the frame
                            elif btype in ('accident', 'road_accident'):
                                valid, vehicle_dets = self._validate_accident_with_yolo(processed_frame)
                                if not valid:
                                    logger.info("🚫 Skipping accident alert: YOLO vehicle check failed")
                                    continue
                                beh['yolo_context'] = vehicle_dets

                            alert = self.alert_engine.process_detection(
                                camera_id=self.camera_id,
                                detection_class=beh['behavior_type'],
                                confidence=beh['confidence'],
                                frame=processed_frame,
                                timestamp=timestamp,
                                video_id=f"live_{self.camera_id}",
                            )
                            if alert:
                                self.stats['alerts_generated'] = self.stats.get('alerts_generated', 0) + 1
            except Exception as e:
                logger.warning(f"Error in behavior analysis: {e}")

        # Facial recognition only on frames that already triggered a detection
        if self.face_recognizer and (results['objects_detected'] or results['behaviors_detected']):
            try:
                face_results = self.face_recognizer.detect_faces_in_frame(
                    processed_frame,
                    timestamp=timestamp,
                    event_id=f"live_{self.camera_id}_{int(timestamp)}"
                )
                if face_results:
                    results['faces_detected'] = len(face_results)

                    # Check matches against flagged persons via the alert engine
                    if self.alert_engine:
                        for face in face_results:
                            face_id = face.get('face_id') if isinstance(face, dict) else getattr(face, 'face_id', None)
                            match_score = face.get('confidence', 0.0) if isinstance(face, dict) else getattr(face, 'confidence_score', 0.0)
                            if face_id and match_score:
                                alert = self.alert_engine.process_suspicious_person(
                                    camera_id=self.camera_id,
                                    face_id=str(face_id),
                                    face_match_score=float(match_score),
                                    frame=processed_frame,
                                    timestamp=timestamp,
                                )
                                if alert:
                                    self.stats['alerts_generated'] = self.stats.get('alerts_generated', 0) + 1
            except Exception as e:
                logger.warning(f"Error in facial recognition: {e}")

        return results

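    # Illustrative shape of the dictionary returned by process_frame (values shown
    # here are made up; 'faces_detected' is only added when facial recognition ran):
    #
    #   {
    #       'timestamp': 12.4, 'frame_count': 372,
    #       'objects_detected': [{'class': 'fire', 'confidence': 0.87, 'bbox': [120, 80, 310, 260]}],
    #       'behaviors_detected': [{'behavior_type': 'fighting', 'behavior': 'fighting',
    #                               'confidence': 0.74, 'model': 'timesformer'}],
    #       'motion_detected': True, 'motion_score': 18234.0,
    #       'events': [],
    #       'faces_detected': 1,
    #   }
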
    def save_keyframe(self, frame: np.ndarray, results: Dict[str, Any], timestamp: float) -> Optional[str]:
        """
        Save keyframe to MinIO and MongoDB (matches the uploaded-video pipeline).

        Args:
            frame: Frame to save
            results: Processing results
            timestamp: Frame timestamp

        Returns:
            MinIO object path or None
        """
        if self.keyframe_repo is None:
            # No database/storage connection; skip persistence
            return None

        try:
            # Encode the frame as JPEG
            is_success, buffer = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 85])
            if not is_success:
                logger.warning(f"⚠️ Failed to encode frame {self.frame_count} as JPEG")
                return None

            frame_bytes = buffer.tobytes()
            frame_size = len(frame_bytes)

            # Object name: live/<camera_id>/<utc timestamp>.jpg
            timestamp_str = datetime.utcnow().strftime("%Y%m%d_%H%M%S_%f")
            object_name = f"live/{self.camera_id}/{timestamp_str}.jpg"

            minio_client = self.keyframe_repo.minio
            bucket = self.keyframe_repo.bucket

            logger.info(f"📤 Uploading keyframe to MinIO: {bucket}/{object_name} ({frame_size} bytes)")

            from io import BytesIO
            frame_buffer = BytesIO(frame_bytes)

            metadata = {
                "frame_index": str(self.frame_count),
                "timestamp": str(timestamp),
                "camera_id": self.camera_id,
                "motion_detected": str(results.get('motion_detected', False)),
                "motion_score": str(results.get('motion_score', 0.0))
            }

            minio_client.put_object(
                bucket,
                object_name,
                frame_buffer,
                length=frame_size,
                content_type="image/jpeg",
                metadata=metadata
            )

            logger.info(f"✅ Uploaded keyframe to MinIO: {bucket}/{object_name}")

            # Persist keyframe metadata in MongoDB
            keyframe_doc = {
                "camera_id": self.camera_id,
                "video_id": f"live_{self.camera_id}",
                "timestamp": timestamp,
                "timestamp_ms": int(timestamp * 1000),
                "frame_index": self.frame_count,
                "frame_number": self.frame_count,
                "minio_path": object_name,
                "minio_bucket": bucket,
                "objects_detected": results.get('objects_detected', []),
                "behaviors_detected": results.get('behaviors_detected', []),
                "motion_detected": results.get('motion_detected', False),
                "motion_score": results.get('motion_score', 0.0),
                "created_at": datetime.utcnow()
            }

            keyframe_id = self.keyframe_repo.create_keyframe(keyframe_doc)
            if keyframe_id:
                logger.info(f"✅ Saved keyframe metadata to MongoDB: {object_name} (ID: {keyframe_id})")
            else:
                logger.warning(f"⚠️ Failed to save keyframe metadata to MongoDB: {object_name}")

            self.stats['keyframes_extracted'] += 1

            return f"{bucket}/{object_name}"

        except Exception as e:
            logger.error(f"❌ Error saving keyframe: {e}")
            import traceback
            logger.error(traceback.format_exc())
            return None

    def create_event(self, results: Dict[str, Any], start_time: float, end_time: float) -> Optional[str]:
        """
        Create an event from processing results (matches the uploaded-video pipeline).

        Args:
            results: Processing results
            start_time: Event start time
            end_time: Event end time

        Returns:
            Event ID or None
        """
        if self.event_repo is None:
            # No database connection; skip event persistence
            return None

        try:
            # Derive the event type from the strongest available signal
            event_type = "motion"
            if results.get('objects_detected'):
                primary_object = results['objects_detected'][0].get('class', 'object')
                event_type = f"object_detection_{primary_object}"
            elif results.get('behaviors_detected'):
                primary_behavior = results['behaviors_detected'][0].get('behavior_type', 'behavior')
                event_type = f"behavior_detection_{primary_behavior}"

            # Use the highest confidence across objects and behaviors
            confidences = []
            if results.get('objects_detected'):
                confidences.extend([float(r.get('confidence', 0.0)) for r in results['objects_detected']])
            if results.get('behaviors_detected'):
                confidences.extend([float(r.get('confidence', 0.0)) for r in results['behaviors_detected']])
            max_confidence = max(confidences) if confidences else 0.0

            # Collect bounding boxes for object detections
            bounding_boxes = {}
            if results.get('objects_detected'):
                bounding_boxes["detections"] = [
                    {
                        "class": det.get('class', 'unknown'),
                        "confidence": float(det.get('confidence', 0.0)),
                        "bbox": [float(x) for x in det.get('bbox', [0, 0, 0, 0])],
                        "timestamp": float(start_time),
                        "model": det.get('detection_model', 'fire' if det.get('class') == 'fire' else 'weapon')
                    }
                    for det in results['objects_detected']
                ]

            event_doc = {
                "event_id": f"live_{self.camera_id}_{int(start_time)}_{uuid.uuid4().hex[:8]}",
                "camera_id": self.camera_id,
                "video_id": f"live_{self.camera_id}",
                "event_type": event_type,
                "start_timestamp": start_time,
                "end_timestamp": end_time,
                "start_timestamp_ms": int(start_time * 1000),
                "end_timestamp_ms": int(end_time * 1000),
                "confidence": max_confidence,
                "confidence_score": max_confidence,
                "description": f"Live stream event: {event_type} detected",
                "bounding_boxes": bounding_boxes,
                "metadata": {
                    "camera_id": self.camera_id,
                    "objects_detected": results.get('objects_detected', []),
                    "behaviors_detected": results.get('behaviors_detected', []),
                    "motion_score": results.get('motion_score', 0.0),
                    "source": "live_stream"
                }
            }

            logger.info(f"📝 Creating event: {event_type} (confidence: {max_confidence:.2f})")
            event_id = self.event_repo.create_event(event_doc)

            if event_id:
                logger.info(f"✅ Created event in MongoDB: {event_doc['event_id']} (MongoDB ID: {event_id})")
                self.stats['events_created'] += 1
            else:
                logger.warning(f"⚠️ Failed to create event in MongoDB: {event_doc['event_id']}")

            return event_id

        except Exception as e:
            logger.error(f"❌ Error creating event: {e}")
            import traceback
            logger.error(traceback.format_exc())
            return None

    def _validate_fight_with_yolo(self, frame: np.ndarray) -> Tuple[bool, List[Dict]]:
        """
        Option B: Validate a fighting detection by checking for 2+ persons close together.
        Uses the general YOLO model (yolov8n) to detect persons (COCO class 0).

        Returns:
            (is_valid, person_detections): True if 2+ persons are near each other
        """
        if self.general_detector is None:
            # No general detector available; accept the detection unvalidated
            return True, []
        try:
            results = self.general_detector(frame, conf=0.35, verbose=False)
            persons = []
            for r in results:
                for box in r.boxes:
                    cls_id = int(box.cls)
                    if cls_id == 0:  # COCO class 0 = person
                        x1, y1, x2, y2 = box.xyxy[0].tolist()
                        persons.append({
                            'class': 'person', 'confidence': float(box.conf),
                            'bbox': [x1, y1, x2, y2],
                            'center': ((x1 + x2) / 2, (y1 + y2) / 2)
                        })
            if len(persons) < 2:
                logger.info(f"🚫 Fight validation failed: only {len(persons)} person(s) detected, need 2+")
                return False, persons

            # Require at least one pair of persons that overlap or are very close
            for i in range(len(persons)):
                for j in range(i + 1, len(persons)):
                    if self._boxes_are_close(persons[i]['bbox'], persons[j]['bbox'], frame.shape):
                        logger.info(f"✅ Fight validation passed: {len(persons)} persons, pair ({i},{j}) close together")
                        return True, persons
            logger.info(f"🚫 Fight validation failed: {len(persons)} persons but none close together")
            return False, persons
        except Exception as e:
            logger.warning(f"⚠️ Fight YOLO validation error: {e}")
            return True, []

    def _validate_accident_with_yolo(self, frame: np.ndarray) -> Tuple[bool, List[Dict]]:
        """
        Option B: Validate an accident detection by checking for 2+ vehicles close together.
        Uses general YOLO (yolov8n) to detect cars/motorcycles/buses/trucks (COCO classes 2, 3, 5, 7).

        Returns:
            (is_valid, vehicle_detections): True if 2+ vehicles are near each other
        """
        if self.general_detector is None:
            return True, []
        try:
            results = self.general_detector(frame, conf=0.35, verbose=False)
            vehicles = []
            vehicle_classes = {2: 'car', 3: 'motorcycle', 5: 'bus', 7: 'truck'}
            for r in results:
                for box in r.boxes:
                    cls_id = int(box.cls)
                    if cls_id in vehicle_classes:
                        x1, y1, x2, y2 = box.xyxy[0].tolist()
                        vehicles.append({
                            'class': vehicle_classes[cls_id], 'confidence': float(box.conf),
                            'bbox': [x1, y1, x2, y2],
                            'center': ((x1 + x2) / 2, (y1 + y2) / 2)
                        })
            if len(vehicles) < 2:
                logger.info(f"🚫 Accident validation failed: only {len(vehicles)} vehicle(s) detected, need 2+")
                return False, vehicles
            for i in range(len(vehicles)):
                for j in range(i + 1, len(vehicles)):
                    if self._boxes_are_close(vehicles[i]['bbox'], vehicles[j]['bbox'], frame.shape):
                        logger.info(f"✅ Accident validation passed: {len(vehicles)} vehicles, pair ({i},{j}) close together")
                        return True, vehicles
            logger.info(f"🚫 Accident validation failed: {len(vehicles)} vehicles but none close together")
            return False, vehicles
        except Exception as e:
            logger.warning(f"⚠️ Accident YOLO validation error: {e}")
            return True, []

    @staticmethod
    def _boxes_are_close(bbox1: List[float], bbox2: List[float], frame_shape: tuple,
                         overlap_threshold: float = 0.0, distance_ratio: float = 0.15) -> bool:
        """
        Check whether two bounding boxes overlap or are close to each other.

        Args:
            bbox1, bbox2: [x1, y1, x2, y2]
            frame_shape: (height, width, channels)
            overlap_threshold: reserved for an IoU threshold; currently any overlap counts
            distance_ratio: maximum gap, as a fraction of the frame diagonal, to count as 'close'

        Returns:
            True if the boxes overlap or are very close
        """
        x1a, y1a, x2a, y2a = bbox1
        x1b, y1b, x2b, y2b = bbox2

        # Any positive intersection area counts as overlap
        ix1 = max(x1a, x1b); iy1 = max(y1a, y1b)
        ix2 = min(x2a, x2b); iy2 = min(y2a, y2b)
        if ix2 > ix1 and iy2 > iy1:
            return True

        # Otherwise compare the axis-aligned gap against a fraction of the frame diagonal
        dx = max(0, max(x1a, x1b) - min(x2a, x2b))
        dy = max(0, max(y1a, y1b) - min(y2a, y2b))
        distance = (dx ** 2 + dy ** 2) ** 0.5
        diag = (frame_shape[0] ** 2 + frame_shape[1] ** 2) ** 0.5
        return distance < (diag * distance_ratio)

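    # Worked example (illustrative, using the 640x640 frames produced by
    # preprocess_frame): the frame diagonal is ~905 px, so the "close" cutoff is
    # ~905 * 0.15 ≈ 136 px.
    #
    #   frame_shape = (640, 640, 3)
    #   a = [100, 100, 200, 300]
    #   b = [260, 120, 360, 320]                         # 60 px horizontal gap, no overlap
    #   LiveStreamProcessor._boxes_are_close(a, b, frame_shape)                   # -> True
    #   LiveStreamProcessor._boxes_are_close(a, [400, 120, 500, 320], frame_shape)  # -> False (200 px gap)
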
    def process_single_frame(self, frame: np.ndarray) -> Dict[str, Any]:
        """
        Process a single frame sent from the browser webcam.
        The elapsed timestamp is computed automatically.
        """
        if self.stats.get('start_time'):
            timestamp = time.time() - self.stats['start_time']
        else:
            timestamp = 0.0
        return self.process_frame(frame, timestamp)

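    # Illustrative sketch of feeding a browser-posted frame into this method. The
    # HTTP layer is not part of this module, and `jpeg_bytes` is an assumed request
    # payload, not an existing variable:
    #
    #   processor = get_live_processor("webcam_01")
    #   arr = np.frombuffer(jpeg_bytes, dtype=np.uint8)
    #   frame = cv2.imdecode(arr, cv2.IMREAD_COLOR)
    #   if frame is not None:
    #       results = processor.process_single_frame(frame)
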
    def generate_frames(self, camera_index: int = 0, stream_url: Optional[str] = None):
        """
        Generator of processed video frames for streaming.

        Args:
            camera_index: Camera device index (0 for the default webcam)
            stream_url: Optional RTSP/HTTP stream URL (takes priority over camera_index)

        Yields:
            Multipart JPEG frame bytes for streaming
        """
        # Release any previously opened capture
        if self.cap is not None:
            try:
                self.cap.release()
            except Exception:
                pass

        source = stream_url if stream_url else camera_index
        source_label = stream_url if stream_url else f"camera {camera_index}"

        # Try to open the source a few times before giving up
        max_retries = 3
        self.cap = None

        for attempt in range(max_retries):
            try:
                logger.info(f"Attempting to open {source_label} (attempt {attempt + 1}/{max_retries})")
                self.cap = cv2.VideoCapture(source)

                # Give the capture a moment to initialize
                time.sleep(0.5)

                if self.cap.isOpened():
                    # Confirm we can actually read a frame
                    ret, test_frame = self.cap.read()
                    if ret and test_frame is not None:
                        logger.info(f"✅ Successfully opened {source_label}")
                        break
                    else:
                        logger.warning(f"{source_label} opened but cannot read frames")
                        self.cap.release()
                        self.cap = None
                else:
                    logger.warning(f"{source_label} failed to open")
                    if self.cap:
                        self.cap.release()
                    self.cap = None
            except Exception as e:
                logger.error(f"Error opening {source_label}: {e}")
                if self.cap:
                    try:
                        self.cap.release()
                    except Exception:
                        pass
                self.cap = None

        if self.cap is None or not self.cap.isOpened():
            if stream_url:
                error_msg = f"❌ Could not connect to stream: {stream_url}"
            else:
                error_msg = (
                    f"❌ No camera available (index {camera_index}).\n"
                    "The server has no physical webcam.\n"
                    "Use an RTSP/HTTP stream URL instead."
                )
            logger.error(error_msg)

            # Yield a single error frame so the client shows a useful message
            error_frame = self._create_error_frame(error_msg)
            ret, buffer = cv2.imencode('.jpg', error_frame)
            if ret:
                yield (b'--frame\r\n'
                       b'Content-Type: image/jpeg\r\n\r\n' + buffer.tobytes() + b'\r\n')
            return

        # Configure capture properties (best effort; not all sources support them)
        try:
            self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
            self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
            self.cap.set(cv2.CAP_PROP_FPS, 30)
            # Keep the buffer small so we always process recent frames
            self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
        except Exception as e:
            logger.warning(f"Could not set camera properties: {e}")

        self.is_processing = True
        self.stats['start_time'] = time.time()
        self.frame_count = 0
        self.last_keyframe_time = time.time()

        logger.info(f"🎥 Started live stream processing for {source_label}")
        logger.info(f"📊 Camera properties: {self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)}x{self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)} @ {self.cap.get(cv2.CAP_PROP_FPS)} FPS")
        logger.info("🔄 Entering frame generation loop...")

        current_event_start = None
        event_results = None

        try:
            consecutive_failures = 0
            max_failures = 10
            while self.is_processing:
                ret, frame = self.cap.read()
                if not ret or frame is None:
                    consecutive_failures += 1
                    if consecutive_failures >= max_failures:
                        logger.error(f"❌ Failed to read {max_failures} consecutive frames from camera")
                        break
                    logger.warning(f"⚠️ Failed to read frame from camera (failure {consecutive_failures}/{max_failures})")
                    time.sleep(0.1)
                    continue

                consecutive_failures = 0
                self.frame_count += 1
                self.stats['frames_processed'] += 1

                if self.frame_count == 1:
                    logger.info(f"✅ Successfully read first frame! Frame shape: {frame.shape}")
                current_time = time.time()
                timestamp = current_time - self.stats['start_time']

                # Run the detection pipeline on this frame
                results = self.process_frame(frame, timestamp)

                # Extract a keyframe periodically, or whenever something was detected
                should_extract_keyframe = (
                    (current_time - self.last_keyframe_time >= self.keyframe_interval) or
                    results.get('objects_detected') or
                    results.get('behaviors_detected')
                )

                if should_extract_keyframe:
                    self.save_keyframe(frame, results, timestamp)
                    self.last_keyframe_time = current_time

                # Aggregate consecutive detections into a single event
                if results.get('objects_detected') or results.get('behaviors_detected'):
                    if current_event_start is None:
                        current_event_start = timestamp
                        event_results = results
                    else:
                        # Extend the ongoing event with new detections
                        event_results['objects_detected'].extend(results.get('objects_detected', []))
                        event_results['behaviors_detected'].extend(results.get('behaviors_detected', []))
                else:
                    # Detections stopped: close the ongoing event, if any
                    if current_event_start is not None:
                        self.create_event(event_results, current_event_start, timestamp)
                        current_event_start = None
                        event_results = None

                # Draw detections/behaviors/stats onto the frame
                annotated_frame = self.annotate_frame(frame, results)

                # Encode and yield the annotated frame as a multipart JPEG chunk
                ret, buffer = cv2.imencode('.jpg', annotated_frame, [cv2.IMWRITE_JPEG_QUALITY, 85])
                if ret:
                    frame_bytes = buffer.tobytes()
                    if self.frame_count % 30 == 0:
                        logger.debug(f"📹 Yielding frame {self.frame_count} ({len(frame_bytes)} bytes)")
                    yield (b'--frame\r\n'
                           b'Content-Type: image/jpeg\r\n\r\n' + frame_bytes + b'\r\n')
                else:
                    logger.warning(f"⚠️ Failed to encode frame {self.frame_count}")

                # Throttle to roughly 30 FPS
                time.sleep(0.033)

        except Exception as e:
            logger.error(f"Error in frame generation: {e}")
            import traceback
            logger.error(traceback.format_exc())
        finally:
            self.stop()

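    # Illustrative sketch of serving this generator over HTTP. The web framework is
    # not part of this module; Flask is only an assumed example here, chosen because
    # the yielded chunks already use the multipart/x-mixed-replace "frame" boundary:
    #
    #   from flask import Flask, Response
    #   app = Flask(__name__)
    #
    #   @app.route("/live/<camera_id>")
    #   def live_feed(camera_id):
    #       processor = get_live_processor(camera_id)
    #       return Response(processor.generate_frames(camera_index=0),
    #                       mimetype="multipart/x-mixed-replace; boundary=frame")
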
    def _create_error_frame(self, error_message: str) -> np.ndarray:
        """Create an error frame to display when the camera fails"""
        frame = np.zeros((480, 640, 3), dtype=np.uint8)
        frame.fill(20)

        # Title
        font = cv2.FONT_HERSHEY_SIMPLEX
        text = "Camera Error"
        text_size = cv2.getTextSize(text, font, 1, 2)[0]
        text_x = (640 - text_size[0]) // 2
        text_y = 200
        cv2.putText(frame, text, (text_x, text_y), font, 1, (0, 0, 255), 2)

        # Word-wrap the error message to fit the frame width
        words = error_message.split(' ')
        line = ""
        y_offset = 250
        for word in words:
            test_line = line + word + " "
            test_size = cv2.getTextSize(test_line, font, 0.6, 1)[0]
            if test_size[0] > 600:
                cv2.putText(frame, line, (20, y_offset), font, 0.6, (255, 255, 255), 1)
                line = word + " "
                y_offset += 30
            else:
                line = test_line
        if line:
            cv2.putText(frame, line, (20, y_offset), font, 0.6, (255, 255, 255), 1)

        return frame

    def annotate_frame(self, frame: np.ndarray, results: Dict[str, Any]) -> np.ndarray:
        """
        Draw annotations (detections, behaviors, motion, faces, stats) on the frame,
        matching the uploaded-video pipeline.

        Args:
            frame: Input frame
            results: Processing results

        Returns:
            Annotated frame
        """
        annotated = frame.copy()

        # Object detections: bounding boxes with class labels
        for obj in results.get('objects_detected', []):
            bbox = obj.get('bbox', [0, 0, 100, 100])
            class_name = obj.get('class', 'object')
            confidence = float(obj.get('confidence', 0.0))

            x1, y1, x2, y2 = map(int, bbox)

            # Per-class colors (BGR)
            color_map = {
                'fire': (255, 255, 0),
                'knife': (0, 255, 255),
                'gun': (0, 255, 0),
                'smoke': (128, 128, 128)
            }
            color = color_map.get(class_name.lower(), (0, 0, 255))

            cv2.rectangle(annotated, (x1, y1), (x2, y2), color, 3)

            # Label with class name and confidence
            label = f"{class_name}: {confidence:.2f}"
            font = cv2.FONT_HERSHEY_SIMPLEX
            font_scale = 0.6
            thickness = 2
            label_size, baseline = cv2.getTextSize(label, font, font_scale, thickness)

            # Filled background behind the label
            cv2.rectangle(annotated,
                          (x1, y1 - label_size[1] - 10),
                          (x1 + label_size[0], y1),
                          color, -1)

            cv2.putText(annotated, label, (x1, y1 - 5),
                        font, font_scale, (255, 255, 255), thickness)

        # Behavior detections: stacked labels in the top-left corner
        behavior_y_offset = 30
        for behavior in results.get('behaviors_detected', []):
            behavior_type = behavior.get('behavior_type', behavior.get('behavior', 'unknown'))
            confidence = float(behavior.get('confidence', 0.0))
            label = f"{behavior_type.upper()}: {confidence:.2f}"

            # Per-behavior colors (BGR)
            behavior_colors = {
                'fighting': (0, 0, 255),
                'road_accident': (0, 165, 255),
                'wallclimb': (255, 0, 255)
            }
            behavior_color = behavior_colors.get(behavior_type.lower(), (0, 255, 0))

            font = cv2.FONT_HERSHEY_SIMPLEX
            font_scale = 0.7
            thickness = 2
            label_size, baseline = cv2.getTextSize(label, font, font_scale, thickness)

            cv2.rectangle(annotated,
                          (10, behavior_y_offset - label_size[1] - 5),
                          (10 + label_size[0], behavior_y_offset + 5),
                          behavior_color, -1)

            cv2.putText(annotated, label, (10, behavior_y_offset),
                        font, font_scale, (255, 255, 255), thickness)
            behavior_y_offset += 35

        # Motion indicator
        if results.get('motion_detected'):
            motion_label = f"MOTION: {results.get('motion_score', 0.0):.0f}"
            cv2.putText(annotated, motion_label, (10, behavior_y_offset),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 0), 2)
            behavior_y_offset += 30

        # Face count indicator
        if results.get('faces_detected', 0) > 0:
            face_label = f"FACES: {results['faces_detected']}"
            cv2.putText(annotated, face_label, (10, behavior_y_offset),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 192, 203), 2)
            behavior_y_offset += 30

        # Running statistics along the bottom edge
        stats_text = f"Frame: {self.frame_count} | Objects: {len(results.get('objects_detected', []))} | Events: {self.stats['events_created']}"
        cv2.putText(annotated, stats_text, (10, annotated.shape[0] - 10),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)

        return annotated

    def stop(self):
        """Stop processing and release resources"""
        self.is_processing = False
        if self.cap:
            self.cap.release()
        logger.info("🛑 Live stream processing stopped")

    def get_stats(self) -> Dict[str, Any]:
        """Get processing statistics"""
        runtime = time.time() - self.stats['start_time'] if self.stats['start_time'] else 0
        return {
            **self.stats,
            'runtime_seconds': runtime,
            'fps': self.stats['frames_processed'] / runtime if runtime > 0 else 0,
            'is_processing': self.is_processing
        }


# Registry of active live stream processors, keyed by camera_id
_live_processors: Dict[str, LiveStreamProcessor] = {}


def get_live_processor(camera_id: str = "webcam_01", config: Optional[VideoProcessingConfig] = None,
                       object_detector=None, behavior_analyzer=None) -> LiveStreamProcessor:
    """Get or create a live stream processor for a camera.

    If object_detector / behavior_analyzer are provided they will be
    shared with the processor (avoids loading the models twice).
    """
    if camera_id not in _live_processors:
        _live_processors[camera_id] = LiveStreamProcessor(
            config, camera_id,
            object_detector=object_detector,
            behavior_analyzer=behavior_analyzer
        )
    else:
        # Late-inject shared models into an already-created processor
        proc = _live_processors[camera_id]
        if proc.object_detector is None and object_detector is not None:
            proc.object_detector = object_detector
            logger.info("✅ Injected shared object detector into existing live processor")
        if proc.behavior_analyzer is None and behavior_analyzer is not None:
            proc.behavior_analyzer = behavior_analyzer
            logger.info("✅ Injected shared behavior analyzer into existing live processor")
    return _live_processors[camera_id]


def stop_live_processor(camera_id: str):
    """Stop and remove a live stream processor"""
    if camera_id in _live_processors:
        _live_processors[camera_id].stop()
        del _live_processors[camera_id]
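

# Minimal usage sketch (illustrative only; assumes a local webcam at index 0 and that
# the detection models / databases referenced above are configured in this environment).
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    demo_processor = get_live_processor("webcam_01")
    try:
        # Each yielded chunk is a multipart JPEG frame; a web server would normally
        # forward these to the client (see the Flask sketch after generate_frames).
        for _ in demo_processor.generate_frames(camera_index=0):
            pass
    except KeyboardInterrupt:
        pass
    finally:
        stop_live_processor("webcam_01")
        logger.info(f"Final stats: {demo_processor.get_stats()}")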