"""
Real-Time Alert Engine for DetectifAI

This module provides the core alert engine for processing live stream
detections and generating real-time alerts with:
- Threat classification (critical, high, medium, low)
- Suspicious person re-appearance tracking via MinIO face store
- Alert deduplication and cooldown management
- Alert queue for SSE broadcast to frontend clients
- False positive feedback loop for improving accuracy

Alert Types:
- Object Detection: gun, knife, fire
- Behavior Detection: fight, accident, wall_climb
- Suspicious Person Re-appearance: previously flagged face detected again
"""

import json
import logging
import queue
import threading
import time
import uuid
from collections import deque
from dataclasses import dataclass, asdict, field
from datetime import datetime, timedelta
from enum import Enum
from typing import Dict, List, Any, Optional, Tuple

import numpy as np

logger = logging.getLogger(__name__)


# ========================================
# Alert Enums & Data Models
# ========================================

class AlertSeverity(Enum):
    """Severity levels attached to every alert."""

    CRITICAL = "critical"  # Immediate danger: fire, gun
    HIGH = "high"          # Serious threat: knife, fight
    MEDIUM = "medium"      # Suspicious: wall_climb, accident
    LOW = "low"            # Informational: suspicious person re-appearance


class AlertType(Enum):
    """Broad category of the detection that produced an alert."""

    OBJECT_DETECTION = "object_detection"
    BEHAVIOR_DETECTION = "behavior_detection"
    SUSPICIOUS_PERSON = "suspicious_person"


class AlertStatus(Enum):
    """Lifecycle state of an alert."""

    PENDING = "pending"            # Awaiting user confirmation
    CONFIRMED = "confirmed"        # User confirmed as real threat
    DISMISSED = "dismissed"        # User dismissed as false positive
    AUTO_EXPIRED = "auto_expired"  # No response within timeout


# Threat classification mapping: normalized detection label -> alert metadata.
THREAT_CLASSIFICATION = {
    # Object detections
    "fire": {
        "severity": AlertSeverity.CRITICAL,
        "type": AlertType.OBJECT_DETECTION,
        "display_name": "🔥 Fire Detected",
        "description": "Fire/flames detected in camera feed",
        "requires_confirmation": True,
    },
    "gun": {
        "severity": AlertSeverity.CRITICAL,
        "type": AlertType.OBJECT_DETECTION,
        "display_name": "🔫 Weapon (Gun) Detected",
        "description": "Firearm detected in camera feed",
        "requires_confirmation": True,
    },
    "knife": {
        "severity": AlertSeverity.HIGH,
        "type": AlertType.OBJECT_DETECTION,
        "display_name": "🔪 Weapon (Knife) Detected",
        "description": "Knife/blade detected in camera feed",
        "requires_confirmation": True,
    },
    # Behavior detections
    "fighting": {
        "severity": AlertSeverity.HIGH,
        "type": AlertType.BEHAVIOR_DETECTION,
        "display_name": "👊 Fight Detected",
        "description": "Physical altercation detected",
        "requires_confirmation": True,
    },
    "road_accident": {
        "severity": AlertSeverity.MEDIUM,
        "type": AlertType.BEHAVIOR_DETECTION,
        "display_name": "🚗 Accident Detected",
        "description": "Vehicle/road accident detected",
        "requires_confirmation": True,
    },
    "wallclimb": {
        "severity": AlertSeverity.MEDIUM,
        "type": AlertType.BEHAVIOR_DETECTION,
        "display_name": "🧗 Wall Climbing Detected",
        "description": "Unauthorized climbing/trespassing detected",
        "requires_confirmation": True,
    },
    # Aliases: behavior model returns these labels (lowercase of model output)
    "accident": {
        "severity": AlertSeverity.MEDIUM,
        "type": AlertType.BEHAVIOR_DETECTION,
        "display_name": "🚗 Accident Detected",
        "description": "Vehicle/road accident detected",
        "requires_confirmation": True,
    },
    "climbing": {
        "severity": AlertSeverity.MEDIUM,
        "type": AlertType.BEHAVIOR_DETECTION,
        "display_name": "🧗 Wall Climbing Detected",
        "description": "Unauthorized climbing/trespassing detected",
        "requires_confirmation": True,
    },
    # Suspicious person re-appearance
    "suspicious_reappearance": {
        "severity": AlertSeverity.LOW,
        "type": AlertType.SUSPICIOUS_PERSON,
        "display_name": "👤 Suspicious Person Re-appeared",
        "description": "A previously flagged person has been detected again",
        "requires_confirmation": True,
    },
}

# Maps an alert status value to the counter in ``RealTimeAlertEngine.stats``
# that tracks how many alerts are currently in that status.
_STATUS_STAT_KEYS = {
    AlertStatus.PENDING.value: "pending_alerts",
    AlertStatus.CONFIRMED.value: "confirmed_alerts",
    AlertStatus.DISMISSED.value: "dismissed_alerts",
}


class _NumpyEncoder(json.JSONEncoder):
    """JSON encoder that converts NumPy scalars and arrays to Python natives."""

    def default(self, obj):
        # np.generic is the base class of every NumPy scalar (bool_, integer,
        # floating, str_, ...); .item() yields the matching Python object.
        if isinstance(obj, np.generic):
            return obj.item()
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        return super().default(obj)


def _to_native(obj: Any) -> Any:
    """Round-trip ``obj`` through JSON so only plain Python types remain."""
    return json.loads(json.dumps(obj, cls=_NumpyEncoder))


@dataclass
class RealTimeAlert:
    """Single real-time alert with all metadata"""

    alert_id: str
    camera_id: str
    alert_type: str        # From AlertType enum value
    detection_class: str   # e.g., 'fire', 'gun', 'fighting'
    severity: str          # From AlertSeverity enum value
    display_name: str
    description: str
    confidence: float
    timestamp: float       # Unix timestamp
    timestamp_iso: str     # ISO formatted datetime string
    status: str = "pending"  # From AlertStatus enum value

    # Detection details
    bounding_boxes: List[Dict] = field(default_factory=list)
    frame_snapshot_path: Optional[str] = None  # MinIO path to frame snapshot
    frame_snapshot_url: Optional[str] = None   # Presigned URL for frontend

    # Suspicious person tracking
    face_id: Optional[str] = None
    face_match_score: Optional[float] = None
    previous_events: List[str] = field(default_factory=list)  # Previous event IDs involving this person

    # User feedback
    confirmed_by: Optional[str] = None
    confirmed_at: Optional[str] = None
    feedback_note: Optional[str] = None

    # Linked event in MongoDB
    event_id: Optional[str] = None
    video_id: Optional[str] = None

    requires_confirmation: bool = True

    def to_dict(self) -> Dict[str, Any]:
        """Return every field as a JSON-safe dict (for JSON and MongoDB).

        The dict is round-tripped through JSON so NumPy scalars/arrays that
        leaked in (e.g. inside ``bounding_boxes``) become Python natives.
        """
        return _to_native(asdict(self))

    def to_sse_payload(self) -> Dict[str, Any]:
        """Return the lightweight, JSON-safe payload sent to SSE clients.

        Contains a subset of the fields; ``confidence`` is rounded to three
        decimals. Like ``to_dict``, the result is round-tripped through JSON
        to force all NumPy types to Python natives.
        """
        score = self.face_match_score
        raw = {
            "alert_id": self.alert_id,
            "camera_id": self.camera_id,
            "alert_type": self.alert_type,
            "detection_class": self.detection_class,
            "severity": self.severity,
            "display_name": self.display_name,
            "description": self.description,
            "confidence": round(float(self.confidence), 3),
            "timestamp": float(self.timestamp),
            "timestamp_iso": self.timestamp_iso,
            "status": self.status,
            "bounding_boxes": self.bounding_boxes,
            "frame_snapshot_url": self.frame_snapshot_url,
            "face_id": self.face_id,
            "face_match_score": float(score) if score is not None else None,
            "requires_confirmation": self.requires_confirmation,
            "event_id": self.event_id,
        }
        return _to_native(raw)


# ========================================
# Alert Engine (Singleton)
# ========================================

class RealTimeAlertEngine:
    """
    Central alert engine that processes detections from the live stream
    pipeline and manages the alert lifecycle:

    1. Detection comes in from LiveStreamProcessor
    2. Engine classifies threat severity
    3. Checks for suspicious person re-appearance
    4. Deduplicates against recent alerts (cooldown)
    5. Stores snapshot frame in MinIO
    6. Pushes alert to SSE broadcast queue
    7. Persists alert to MongoDB
    8. Handles user confirmation/dismissal feedback
    """

    _instance = None
    _lock = threading.Lock()

    def __new__(cls, *args, **kwargs):
        """Singleton pattern — one alert engine for the whole app"""
        with cls._lock:
            if cls._instance is None:
                cls._instance = super().__new__(cls)
                cls._instance._initialized = False
        return cls._instance

    def __init__(self):
        # __init__ runs on every RealTimeAlertEngine() call; only the first
        # call may set up state, later calls must leave it untouched.
        if self._initialized:
            return
        self._initialized = True

        # Alert queue for SSE broadcast (thread-safe deque)
        self._alert_queue: deque = deque(maxlen=500)
        self._alert_subscribers: List[Any] = []  # SSE subscriber queues
        self._subscriber_lock = threading.Lock()

        # Active alerts (pending user confirmation)
        self._active_alerts: Dict[str, RealTimeAlert] = {}
        self._alert_history: deque = deque(maxlen=1000)

        # Cooldown tracking to prevent duplicate alerts
        # Key: (camera_id, detection_class), Value: last_alert_timestamp
        self._cooldown_tracker: Dict[Tuple[str, str], float] = {}
        self._cooldown_seconds = {
            AlertSeverity.CRITICAL.value: 10,  # 10s cooldown for critical (fire, gun)
            AlertSeverity.HIGH.value: 15,      # 15s for high
            AlertSeverity.MEDIUM.value: 20,    # 20s for medium
            AlertSeverity.LOW.value: 30,       # 30s for low
        }

        # Suspicious person tracking
        self._flagged_faces: Dict[str, Dict] = {}  # face_id -> metadata

        # Database connections (lazy loaded)
        self._db_manager = None
        self._minio_client = None

        # Statistics
        self.stats = {
            "total_alerts": 0,
            "confirmed_alerts": 0,
            "dismissed_alerts": 0,
            "pending_alerts": 0,
            "alerts_by_type": {},
            "alerts_by_severity": {},
        }

        logger.info("✅ Real-Time Alert Engine initialized")

    @property
    def db_manager(self):
        """Lazy-load database manager"""
        if self._db_manager is None:
            from database.config import DatabaseManager
            self._db_manager = DatabaseManager()
        return self._db_manager

    @property
    def alerts_collection(self):
        """Get MongoDB alerts collection"""
        return self.db_manager.db.real_time_alerts

    @property
    def minio_client(self):
        """Lazy-load MinIO client"""
        if self._minio_client is None:
            self._minio_client = self.db_manager.minio_client
        return self._minio_client

    # ========================================
    # SSE Subscription Management
    # ========================================

    def subscribe(self):
        """
        Create a new SSE subscriber queue.
        Returns a queue that the SSE endpoint will read from.
        """
        q = queue.Queue(maxsize=100)
        with self._subscriber_lock:
            self._alert_subscribers.append(q)
        logger.info(f"📡 New SSE subscriber connected (total: {len(self._alert_subscribers)})")
        return q

    def unsubscribe(self, q):
        """Remove an SSE subscriber queue"""
        with self._subscriber_lock:
            if q in self._alert_subscribers:
                self._alert_subscribers.remove(q)
        logger.info(f"📡 SSE subscriber disconnected (total: {len(self._alert_subscribers)})")

    def _publish(self, payload: Dict[str, Any]) -> None:
        """Put ``payload`` on every subscriber queue, dropping full ones.

        A subscriber whose queue is full is not draining it, so it is
        removed from the subscriber list instead of blocking the producer.
        """
        with self._subscriber_lock:
            stalled = []
            for q in self._alert_subscribers:
                try:
                    q.put_nowait(payload)
                except queue.Full:
                    stalled.append(q)
            # Removal happens after the loop so the list is not mutated
            # while it is being iterated.
            for q in stalled:
                self._alert_subscribers.remove(q)
        if stalled:
            logger.warning(f"Dropped {len(stalled)} SSE subscriber(s) with full queues")

    def _broadcast_alert(self, alert: RealTimeAlert):
        """Push alert to all SSE subscribers"""
        self._publish(alert.to_sse_payload())

    def _broadcast_update(self, alert_id: str, update_data: Dict):
        """Broadcast alert status update to all subscribers"""
        self._publish({"type": "alert_update", "alert_id": alert_id, **update_data})

    # ========================================
    # Core Alert Processing
    # ========================================

    def process_detection(
        self,
        camera_id: str,
        detection_class: str,
        confidence: float,
        bounding_boxes: List[Dict] = None,
        frame: Any = None,
        timestamp: float = None,
        face_id: str = None,
        face_match_score: float = None,
        video_id: str = None,
    ) -> Optional[RealTimeAlert]:
        """
        Process a detection from the live stream and potentially create an alert.

        Args:
            camera_id: Camera identifier
            detection_class: Type of detection (e.g., 'fire', 'gun', 'fighting')
            confidence: Detection confidence (0.0 - 1.0)
            bounding_boxes: List of bounding box dicts
            frame: OpenCV frame (numpy array) for snapshot
            timestamp: Detection timestamp (defaults to the current time)
            face_id: Face ID if facial recognition matched
            face_match_score: Face match similarity score
            video_id: Associated video ID (defaults to ``live_<camera_id>``)

        Returns:
            RealTimeAlert if alert was created, None if the class is unknown
            or the alert was suppressed by cooldown or low confidence
        """
        if timestamp is None:
            timestamp = time.time()

        # Normalize detection class
        detection_key = detection_class.lower().strip()

        # Look up threat classification
        threat_info = THREAT_CLASSIFICATION.get(detection_key)
        if threat_info is None:
            logger.info(f"Unknown detection class '{detection_key}', skipping alert")
            return None
        severity = threat_info["severity"].value

        # Check cooldown against the detection's own timestamp: the tracker
        # stores detection timestamps, so both sides must use the same clock.
        if self._is_on_cooldown(camera_id, detection_key, severity, now=timestamp):
            logger.info(f"Alert suppressed (cooldown): {detection_key} on {camera_id}")
            return None

        # Check confidence threshold
        min_confidence = self._get_min_confidence(detection_key)
        if confidence < min_confidence:
            logger.info(f"Alert suppressed (low confidence {confidence:.2f} < {min_confidence}): {detection_key}")
            return None

        # A score of 0.0 is a real value, so test against None rather than
        # relying on truthiness.
        has_score = face_match_score is not None

        # Create alert
        now = datetime.utcnow()
        alert = RealTimeAlert(
            alert_id=f"alert_{uuid.uuid4().hex[:12]}",
            camera_id=camera_id,
            alert_type=threat_info["type"].value,
            detection_class=detection_key,
            severity=severity,
            display_name=threat_info["display_name"],
            description=threat_info["description"],
            confidence=float(confidence),
            timestamp=timestamp,
            timestamp_iso=now.isoformat() + "Z",
            status=AlertStatus.PENDING.value,
            bounding_boxes=bounding_boxes or [],
            requires_confirmation=threat_info["requires_confirmation"],
            video_id=video_id or f"live_{camera_id}",
            face_id=face_id,
            face_match_score=float(face_match_score) if has_score else None,
        )

        # Save frame snapshot to MinIO
        if frame is not None:
            snapshot_path = self._save_frame_snapshot(camera_id, alert.alert_id, frame)
            if snapshot_path:
                alert.frame_snapshot_path = snapshot_path
                alert.frame_snapshot_url = self._get_snapshot_url(snapshot_path)

        # Check suspicious person re-appearance
        if face_id and has_score:
            previous = self._check_suspicious_person(face_id)
            if previous:
                prior_events = previous.get("event_ids", [])
                alert.previous_events = prior_events
                # Upgrade alert info for re-appearance
                alert.description = (
                    f"{threat_info['description']}. "
                    f"⚠️ This person was previously involved in {len(prior_events)} incident(s)."
                )

        # Store in active alerts and history
        self._active_alerts[alert.alert_id] = alert
        self._alert_history.appendleft(alert)

        # Update cooldown
        self._cooldown_tracker[(camera_id, detection_key)] = timestamp

        # Update stats
        stats = self.stats
        stats["total_alerts"] += 1
        stats["pending_alerts"] += 1
        by_type = stats["alerts_by_type"]
        by_type[detection_key] = by_type.get(detection_key, 0) + 1
        by_severity = stats["alerts_by_severity"]
        by_severity[alert.severity] = by_severity.get(alert.severity, 0) + 1

        # Persist to MongoDB (async)
        threading.Thread(target=self._persist_alert, args=(alert,), daemon=True).start()

        # Broadcast to SSE subscribers
        self._broadcast_alert(alert)

        logger.info(
            f"🚨 ALERT: [{alert.severity.upper()}] {alert.display_name} "
            f"(confidence: {confidence:.2f}) on camera {camera_id}"
        )

        return alert

    def process_suspicious_person(
        self,
        camera_id: str,
        face_id: str,
        face_match_score: float,
        frame: Any = None,
        timestamp: float = None,
        matched_person_info: Dict = None,
    ) -> Optional[RealTimeAlert]:
        """
        Process a suspicious person re-appearance detection.
        Called when facial recognition matches a previously flagged face.

        Args:
            camera_id: Camera identifier
            face_id: Matched face ID
            face_match_score: Similarity score (0.0-1.0)
            frame: Current frame
            timestamp: Detection timestamp (defaults to the current time)
            matched_person_info: Previous incident info for this person
                (accepted for API compatibility; not used here)

        Returns:
            RealTimeAlert if alert was created, None if the match is too weak
            or the alert was suppressed by a cooldown
        """
        if timestamp is None:
            timestamp = time.time()

        # Only alert if we have a meaningful match
        if face_match_score is None or face_match_score < 0.6:
            return None

        # Check cooldown for this specific face
        cooldown_key = (camera_id, f"face_{face_id}")
        last_alert_time = self._cooldown_tracker.get(cooldown_key, 0)
        if (timestamp - last_alert_time) < 60:  # 60s cooldown per face
            return None

        # Create alert. process_detection additionally applies the per-camera
        # cooldown of the 'suspicious_reappearance' class.
        alert = self.process_detection(
            camera_id=camera_id,
            detection_class="suspicious_reappearance",
            confidence=face_match_score,
            frame=frame,
            timestamp=timestamp,
            face_id=face_id,
            face_match_score=face_match_score,
        )

        # Record the per-face cooldown; without this write the check above
        # would never find an entry and the 60s window would never apply.
        if alert is not None:
            self._cooldown_tracker[cooldown_key] = timestamp
        return alert

    # ========================================
    # User Feedback (Confirm / Dismiss)
    # ========================================

    def confirm_alert(self, alert_id: str, user_id: str = None, note: str = None) -> Optional[Dict]:
        """
        User confirms alert as real threat.
        Updates MongoDB, stats, and broadcasts update.

        Returns the updated alert as a dict, or None if the alert is unknown.
        """
        return self._resolve_alert(alert_id, AlertStatus.CONFIRMED, user_id, note)

    def dismiss_alert(self, alert_id: str, user_id: str = None, note: str = None) -> Optional[Dict]:
        """
        User dismisses alert as false positive.
        Updates MongoDB, stats, and broadcasts update.

        Returns the updated alert as a dict, or None if the alert is unknown.
        """
        return self._resolve_alert(alert_id, AlertStatus.DISMISSED, user_id, note)

    def _resolve_alert(
        self,
        alert_id: str,
        new_status: AlertStatus,
        user_id: Optional[str],
        note: Optional[str],
    ) -> Optional[Dict]:
        """Apply user feedback (confirm or dismiss) to an alert."""
        alert = self._active_alerts.get(alert_id)
        if alert is None:
            # Try loading from DB
            alert = self._load_alert_from_db(alert_id)
        if alert is None:
            logger.warning(f"Alert not found: {alert_id}")
            return None

        is_confirmation = new_status is AlertStatus.CONFIRMED
        previous_status = alert.status
        resolved_at = datetime.utcnow()

        alert.status = new_status.value
        alert.confirmed_by = user_id
        alert.confirmed_at = resolved_at.isoformat() + "Z"
        alert.feedback_note = note

        # Only a real status change moves the counters; repeating the same
        # feedback must not count (or flag the person) twice.
        if previous_status != new_status.value:
            self._record_status_transition(previous_status, new_status.value)
            # Flag the person as suspicious for future tracking
            if is_confirmation and alert.face_id:
                self._flag_suspicious_person(alert.face_id, alert)

        # Update in MongoDB
        threading.Thread(
            target=self._update_alert_in_db,
            args=(alert_id, {
                "status": alert.status,
                "confirmed_by": user_id,
                "confirmed_at": resolved_at,
                "feedback_note": note,
                "is_verified": True,
                "is_false_positive": not is_confirmation,
            }),
            daemon=True
        ).start()

        # Also update the linked event in the event collection
        if alert.event_id:
            threading.Thread(
                target=self._update_linked_event,
                args=(alert.event_id, True, not is_confirmation),
                daemon=True
            ).start()

        # Broadcast update
        self._broadcast_update(alert_id, {
            "status": new_status.value,
            "confirmed_by": user_id,
            "confirmed_at": alert.confirmed_at,
        })

        if is_confirmation:
            logger.info(f"✅ Alert CONFIRMED: {alert_id} ({alert.display_name}) by {user_id}")
        else:
            logger.info(f"❌ Alert DISMISSED: {alert_id} ({alert.display_name}) by {user_id}")
        return alert.to_dict()

    def _record_status_transition(self, old_status: str, new_status: str) -> None:
        """Move one alert between the per-status counters in ``self.stats``."""
        old_key = _STATUS_STAT_KEYS.get(old_status)
        if old_key is not None:
            # Clamp at zero: alerts loaded from the DB were never counted by
            # this process.
            self.stats[old_key] = max(0, self.stats[old_key] - 1)
        new_key = _STATUS_STAT_KEYS.get(new_status)
        if new_key is not None:
            self.stats[new_key] += 1

    # ========================================
    # Alert Queries
    # ========================================

    def get_active_alerts(self, camera_id: str = None) -> List[Dict]:
        """Get all pending (unconfirmed) alerts, optionally filtered by camera"""
        pending = AlertStatus.PENDING.value
        alerts = [
            alert.to_sse_payload()
            for alert in self._active_alerts.values()
            if alert.status == pending
            and (camera_id is None or alert.camera_id == camera_id)
        ]
        # Newest first
        alerts.sort(key=lambda a: a["timestamp"], reverse=True)
        return alerts

    def get_alert_history(self, limit: int = 50, camera_id: str = None,
                          severity: str = None, status: str = None) -> List[Dict]:
        """Get alert history (newest first) with optional filters.

        At most ``limit`` alerts are returned; a non-positive ``limit``
        yields an empty list.
        """
        alerts: List[Dict] = []
        if limit <= 0:
            return alerts
        for alert in self._alert_history:
            if camera_id and alert.camera_id != camera_id:
                continue
            if severity and alert.severity != severity:
                continue
            if status and alert.status != status:
                continue
            alerts.append(alert.to_dict())
            if len(alerts) >= limit:
                break
        return alerts

    def get_alert_by_id(self, alert_id: str) -> Optional[Dict]:
        """Get a single alert by ID"""
        alert = self._active_alerts.get(alert_id)
        if alert is None:
            # Try DB
            alert = self._load_alert_from_db(alert_id)
        return alert.to_dict() if alert is not None else None

    def get_stats(self) -> Dict:
        """Get alert statistics"""
        pending = AlertStatus.PENDING.value
        return {
            **self.stats,
            "active_subscribers": len(self._alert_subscribers),
            "active_pending_count": sum(
                1 for a in self._active_alerts.values() if a.status == pending
            ),
        }

    # ========================================
    # Suspicious Person Tracking
    # ========================================

    def _flag_suspicious_person(self, face_id: str, alert: RealTimeAlert):
        """Flag a person as suspicious for future re-appearance tracking"""
        if face_id not in self._flagged_faces:
            self._flagged_faces[face_id] = {
                "face_id": face_id,
                "flagged_at": datetime.utcnow().isoformat(),
                "event_ids": [],
                "alert_ids": [],
                "incident_count": 0,
            }

        entry = self._flagged_faces[face_id]
        # Fall back to the alert ID when the alert has no linked event.
        entry["event_ids"].append(alert.event_id or alert.alert_id)
        entry["alert_ids"].append(alert.alert_id)
        entry["incident_count"] += 1
        entry["last_seen"] = datetime.utcnow().isoformat()

        # Also persist to MongoDB for cross-session tracking
        threading.Thread(
            target=self._persist_flagged_person,
            args=(face_id, entry),
            daemon=True
        ).start()

        logger.info(f"🏷️ Person {face_id[:8]}... flagged as suspicious (incidents: {entry['incident_count']})")

    def _check_suspicious_person(self, face_id: str) -> Optional[Dict]:
        """Check if a face belongs to a previously flagged person"""
        # Check in-memory cache first
        if face_id in self._flagged_faces:
            return self._flagged_faces[face_id]

        # Check MongoDB
        try:
            doc = self.alerts_collection.find_one(
                {"face_id": face_id, "status": "confirmed"},
                sort=[("timestamp", -1)]
            )
            if doc:
                return {
                    "face_id": face_id,
                    "event_ids": [doc.get("event_id", "")],
                    "incident_count": 1,
                }
        except Exception as e:
            # Best effort: a DB failure must not block alert creation.
            logger.warning(f"Error checking suspicious person: {e}")

        return None

    def _persist_flagged_person(self, face_id: str, entry: Dict):
        """Persist flagged person to MongoDB"""
        try:
            self.db_manager.db.flagged_persons.update_one(
                {"face_id": face_id},
                {"$set": entry, "$setOnInsert": {"created_at": datetime.utcnow()}},
                upsert=True
            )
        except Exception as e:
            logger.error(f"Error persisting flagged person: {e}")

    def load_flagged_persons(self):
        """Load flagged persons from MongoDB on startup"""
        try:
            docs = self.db_manager.db.flagged_persons.find({})
            for doc in docs:
                face_id = doc.get("face_id")
                if face_id:
                    self._flagged_faces[face_id] = {
                        "face_id": face_id,
                        "flagged_at": doc.get("flagged_at", ""),
                        "event_ids": doc.get("event_ids", []),
                        "alert_ids": doc.get("alert_ids", []),
                        "incident_count": doc.get("incident_count", 0),
                        "last_seen": doc.get("last_seen", ""),
                    }
            logger.info(f"📋 Loaded {len(self._flagged_faces)} flagged persons from database")
        except Exception as e:
            logger.warning(f"Could not load flagged persons: {e}")

    # ========================================
    # Internal Helpers
    # ========================================

    def _is_on_cooldown(self, camera_id: str, detection_class: str, severity: str,
                        now: Optional[float] = None) -> bool:
        """Check if a detection is within cooldown period.

        Args:
            camera_id: Camera identifier
            detection_class: Normalized detection label
            severity: AlertSeverity value selecting the cooldown length
                (15s when the severity is not configured)
            now: Time to compare against the last alert; defaults to the
                current wall-clock time
        """
        if now is None:
            now = time.time()
        last_time = self._cooldown_tracker.get((camera_id, detection_class), 0)
        cooldown = self._cooldown_seconds.get(severity, 15)
        return (now - last_time) < cooldown

    def _get_min_confidence(self, detection_class: str) -> float:
        """Get minimum confidence threshold for alerting (0.50 if unlisted)"""
        thresholds = {
            "fire": 0.65,
            "gun": 0.60,
            "knife": 0.60,
            "fighting": 0.55,
            "road_accident": 0.50,
            "accident": 0.50,
            "wallclimb": 0.50,
            "climbing": 0.50,
            "suspicious_reappearance": 0.55,
        }
        return thresholds.get(detection_class, 0.50)

    def _save_frame_snapshot(self, camera_id: str, alert_id: str, frame) -> Optional[str]:
        """Save alert frame snapshot to MinIO.

        Returns ``"<bucket>/<object_name>"`` on success, None on any failure.
        """
        try:
            import cv2
            from io import BytesIO

            # Encode frame
            is_success, buffer = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 85])
            if not is_success:
                return None

            frame_bytes = buffer.tobytes()
            timestamp_str = datetime.utcnow().strftime("%Y%m%d_%H%M%S_%f")
            object_name = f"alerts/{camera_id}/{alert_id}_{timestamp_str}.jpg"
            bucket = self.db_manager.config.minio_keyframe_bucket

            self.minio_client.put_object(
                bucket,
                object_name,
                BytesIO(frame_bytes),
                length=len(frame_bytes),
                content_type="image/jpeg",
                metadata={"alert_id": alert_id, "camera_id": camera_id}
            )

            return f"{bucket}/{object_name}"

        except Exception as e:
            # Best effort: the alert is still raised without a snapshot.
            logger.warning(f"Failed to save alert snapshot: {e}")
            return None

    def _get_snapshot_url(self, snapshot_path: str) -> Optional[str]:
        """Generate presigned URL (valid for 2 hours) for alert snapshot"""
        try:
            # snapshot_path is "<bucket>/<object_name>"; the object name may
            # itself contain slashes, so split only once.
            parts = snapshot_path.split("/", 1)
            if len(parts) != 2:
                return None
            bucket, object_name = parts

            return self.minio_client.presigned_get_object(
                bucket, object_name, expires=timedelta(hours=2)
            )
        except Exception as e:
            logger.warning(f"Failed to generate snapshot URL: {e}")
            return None

    def _persist_alert(self, alert: RealTimeAlert):
        """Persist alert to MongoDB"""
        try:
            # to_dict() already returns only Python-native values; created_at
            # is added afterwards so it is stored as a datetime.
            doc = alert.to_dict()
            doc["created_at"] = datetime.utcnow()
            self.alerts_collection.insert_one(doc)
            logger.debug(f"Persisted alert to MongoDB: {alert.alert_id}")
        except Exception as e:
            logger.error(f"Failed to persist alert: {e}")

    def _update_alert_in_db(self, alert_id: str, update_data: Dict):
        """Update alert in MongoDB"""
        try:
            # Build a new dict so the caller's update_data is not mutated.
            changes = {**update_data, "updated_at": datetime.utcnow()}
            self.alerts_collection.update_one(
                {"alert_id": alert_id},
                {"$set": changes}
            )
        except Exception as e:
            logger.error(f"Failed to update alert in DB: {e}")

    def _update_linked_event(self, event_id: str, is_verified: bool, is_false_positive: bool):
        """Update the linked event in the main event collection"""
        try:
            self.db_manager.db.event.update_one(
                {"event_id": event_id},
                {"$set": {
                    "is_verified": is_verified,
                    "is_false_positive": is_false_positive,
                    "verified_at": datetime.utcnow(),
                }}
            )
        except Exception as e:
            logger.error(f"Failed to update linked event: {e}")

    def _load_alert_from_db(self, alert_id: str) -> Optional[RealTimeAlert]:
        """Load alert from MongoDB; returns None if missing or on error"""
        try:
            doc = self.alerts_collection.find_one({"alert_id": alert_id})
            if doc:
                # Keep only dataclass fields; this drops MongoDB's _id and
                # bookkeeping keys such as created_at / updated_at.
                known = RealTimeAlert.__dataclass_fields__
                return RealTimeAlert(**{k: v for k, v in doc.items() if k in known})
        except Exception as e:
            logger.error(f"Failed to load alert from DB: {e}")
        return None


# ========================================
# Module-level convenience functions
# ========================================

def get_alert_engine() -> RealTimeAlertEngine:
    """Get the singleton alert engine instance"""
    return RealTimeAlertEngine()


def process_live_detection(camera_id: str, detection_class: str, confidence: float,
                           frame=None, **kwargs) -> Optional[RealTimeAlert]:
    """Convenience function to process a detection and potentially generate an alert"""
    engine = get_alert_engine()
    return engine.process_detection(
        camera_id=camera_id,
        detection_class=detection_class,
        confidence=confidence,
        frame=frame,
        **kwargs
    )