"""
Data Models for DetectifAI Database Integration
This module defines data models that map EXACTLY to the MongoDB collections
defined in DetectifAI_db/database_setup.py schema.
CRITICAL: Only use fields defined in the MongoDB schema validators.
Extra fields must go in meta_data for video_file or use related collections.
"""
from typing import List, Dict, Any, Optional
from datetime import datetime
from bson import ObjectId
from dataclasses import dataclass, asdict
import json
import numpy as np
@dataclass
class VideoFileModel:
    """Document model for the ``video_file`` MongoDB collection.

    Field names follow the collection's schema; anything that is not part
    of that schema is expected to travel inside ``meta_data``.
    """

    # --- fields the schema requires ---
    video_id: str
    user_id: str
    file_path: str  # MinIO path or local path

    # --- fields the schema treats as optional ---
    minio_object_key: Optional[str] = None
    minio_bucket: Optional[str] = None
    codec: Optional[str] = None
    fps: Optional[float] = 30.0  # bsonType double: emitted as float
    upload_date: Optional[datetime] = None
    duration_secs: Optional[int] = None  # bsonType int: whole number, not float
    file_size_bytes: Optional[int] = None  # bsonType long
    meta_data: Optional[Dict] = None  # extras live here (processing_status, resolution, ...)
    _id: Optional[ObjectId] = None

    def to_dict(self) -> Dict:
        """Build the insertion dictionary, coercing values to schema types.

        Returns:
            A dict with one entry per field. A missing ``upload_date``
            becomes the current UTC time, a missing ``fps`` becomes 30.0,
            ``duration_secs`` / ``file_size_bytes`` are cast to ``int``
            when set, and ``fps`` is always a ``float``.
        """
        doc = asdict(self)

        if doc['upload_date'] is None:
            doc['upload_date'] = datetime.utcnow()

        # Integer-typed schema fields: cast only when a value is present.
        for int_field in ('duration_secs', 'file_size_bytes'):
            raw = doc[int_field]
            if raw is not None:
                doc[int_field] = int(raw)

        # Default first, otherwise force the double representation.
        doc['fps'] = 30.0 if doc['fps'] is None else float(doc['fps'])
        return doc
@dataclass
class DetectedFaceModel:
    """Document model for the existing ``detected_faces`` collection."""

    video_id: str
    frame_timestamp: float
    face_bbox: List[float]  # [x1, y1, x2, y2]
    confidence: float
    face_encoding: Optional[List[float]] = None
    keyframe_minio_path: Optional[str] = None
    keyframe_id: Optional[ObjectId] = None
    person_id: Optional[str] = None
    is_suspicious: bool = False
    _id: Optional[ObjectId] = None

    def to_dict(self) -> Dict:
        """Return every field of this model as a plain dictionary."""
        snapshot = asdict(self)
        return snapshot
@dataclass
class EventModel:
    """Document model for the ``event`` MongoDB collection."""

    # --- fields the schema requires ---
    event_id: str
    video_id: str
    start_timestamp_ms: int  # bsonType long: integer milliseconds
    end_timestamp_ms: int  # bsonType long: integer milliseconds

    # --- fields the schema treats as optional ---
    event_type: Optional[str] = None  # 'object_detection', 'motion', 'fire', 'weapon', etc.
    confidence_score: Optional[float] = None  # bsonType double (the key is NOT 'confidence')
    is_verified: bool = False
    is_false_positive: bool = False
    verified_at: Optional[datetime] = None
    verified_by: Optional[str] = None
    visual_embedding: Optional[List[float]] = None  # reserved for future FAISS integration
    bounding_boxes: Optional[Dict] = None  # detection bboxes, stored as an object
    _id: Optional[ObjectId] = None

    def to_dict(self) -> Dict:
        """Build the insertion dictionary, coercing values to schema types.

        Returns:
            A dict with one entry per field. Both timestamps are cast to
            ``int``, ``confidence_score`` is cast to ``float`` when set,
            and a missing ``visual_embedding`` / ``bounding_boxes`` is
            replaced by an empty list / empty dict respectively.
        """
        doc = asdict(self)

        # Millisecond timestamps must reach MongoDB as integers.
        for ts_field in ('start_timestamp_ms', 'end_timestamp_ms'):
            doc[ts_field] = int(doc[ts_field])

        score = doc['confidence_score']
        if score is not None:
            doc['confidence_score'] = float(score)

        # Empty containers stand in for missing values.
        if doc['visual_embedding'] is None:
            doc['visual_embedding'] = []
        if doc['bounding_boxes'] is None:
            doc['bounding_boxes'] = {}
        return doc
@dataclass
class EventCaptionModel:
    """Document model for the existing ``event_caption`` collection."""

    event_id: ObjectId
    video_id: str
    caption_text: str
    generated_by: str = "system"  # one of: system, user, ai
    confidence: Optional[float] = None
    created_at: Optional[datetime] = None
    _id: Optional[ObjectId] = None

    def to_dict(self) -> Dict:
        """Return the fields as a dict, stamping ``created_at`` if unset.

        Returns:
            A dict with one entry per field; a ``created_at`` of ``None``
            is replaced by the current UTC time.
        """
        record = asdict(self)
        if record['created_at'] is None:
            record['created_at'] = datetime.utcnow()
        return record
@dataclass
class EventClipModel:
    """Document model for the existing ``event_clip`` collection."""

    event_id: ObjectId
    video_id: str
    clip_start_timestamp: float
    clip_end_timestamp: float
    minio_clip_path: str
    clip_duration: float
    frame_count: int
    created_at: Optional[datetime] = None
    _id: Optional[ObjectId] = None

    def to_dict(self) -> Dict:
        """Return the fields as a dict, stamping ``created_at`` if unset.

        Returns:
            A dict with one entry per field; a ``created_at`` of ``None``
            is replaced by the current UTC time.
        """
        record = asdict(self)
        if record['created_at'] is None:
            record['created_at'] = datetime.utcnow()
        return record
@dataclass
class EventDescriptionModel:
    """Document model for the existing ``event_description`` collection."""

    event_id: ObjectId
    video_id: str
    description_text: str
    description_type: str = "automatic"  # one of: automatic, manual, ai_generated
    tags: Optional[List[str]] = None
    created_at: Optional[datetime] = None
    _id: Optional[ObjectId] = None

    def to_dict(self) -> Dict:
        """Return the fields as a dict, stamping ``created_at`` if unset.

        Returns:
            A dict with one entry per field; a ``created_at`` of ``None``
            is replaced by the current UTC time.
        """
        record = asdict(self)
        if record['created_at'] is None:
            record['created_at'] = datetime.utcnow()
        return record
@dataclass
class FaceMatchModel:
    """Document model for the existing ``face_matches`` collection."""

    video_id: str
    face_1_id: ObjectId
    face_2_id: ObjectId
    similarity_score: float
    match_confidence: float
    is_match: bool
    person_id: Optional[str] = None
    created_at: Optional[datetime] = None
    _id: Optional[ObjectId] = None

    def to_dict(self) -> Dict:
        """Return the fields as a dict, stamping ``created_at`` if unset.

        Returns:
            A dict with one entry per field; a ``created_at`` of ``None``
            is replaced by the current UTC time.
        """
        record = asdict(self)
        if record['created_at'] is None:
            record['created_at'] = datetime.utcnow()
        return record
# New models for video processing pipeline
@dataclass
class KeyframeModel:
    """Document model for the new collection of extracted keyframes."""

    video_id: str
    frame_number: int
    timestamp: float
    quality_score: float
    motion_score: float
    minio_path: str
    enhancement_applied: bool = False
    face_count: int = 0
    object_detections: Optional[List[Dict]] = None
    processing_metadata: Optional[Dict] = None
    created_at: Optional[datetime] = None
    _id: Optional[ObjectId] = None

    def to_dict(self) -> Dict:
        """Return the fields as a dict with defaults filled in.

        Returns:
            A dict with one entry per field; a ``created_at`` of ``None``
            is replaced by the current UTC time and an ``object_detections``
            of ``None`` by an empty list.
        """
        record = asdict(self)
        if record['created_at'] is None:
            record['created_at'] = datetime.utcnow()
        if record['object_detections'] is None:
            record['object_detections'] = []
        return record
@dataclass
class VideoSegmentModel:
    """Document model for the new collection of video segments."""

    video_id: str
    segment_id: int
    start_timestamp: float
    end_timestamp: float
    duration: float
    start_frame: int
    end_frame: int
    keyframe_ids: List[ObjectId]
    activity_level: str  # one of: low, medium, high
    motion_statistics: Optional[Dict] = None
    segment_minio_path: Optional[str] = None
    created_at: Optional[datetime] = None
    _id: Optional[ObjectId] = None

    def to_dict(self) -> Dict:
        """Return the fields as a dict, stamping ``created_at`` if unset.

        Returns:
            A dict with one entry per field; a ``created_at`` of ``None``
            is replaced by the current UTC time.
        """
        record = asdict(self)
        if record['created_at'] is None:
            record['created_at'] = datetime.utcnow()
        return record
@dataclass
class ProcessingJobModel:
    """Document model for the new collection that tracks processing jobs."""

    video_id: str
    job_type: str = "complete_processing"  # complete_processing, keyframe_extraction, object_detection
    status: str = "queued"  # queued, processing, completed, failed
    progress: int = 0  # percentage, 0-100
    message: str = ""
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    processing_stats: Optional[Dict] = None
    error_details: Optional[Dict] = None
    created_at: Optional[datetime] = None
    _id: Optional[ObjectId] = None

    def to_dict(self) -> Dict:
        """Return the fields as a dict, stamping ``created_at`` if unset.

        Returns:
            A dict with one entry per field; a ``created_at`` of ``None``
            is replaced by the current UTC time.
        """
        record = asdict(self)
        if record['created_at'] is None:
            record['created_at'] = datetime.utcnow()
        return record
@dataclass
class ObjectDetectionModel:
    """Document model holding detailed object detection results."""

    video_id: str
    keyframe_id: ObjectId
    detection_id: str
    class_name: str  # fire, smoke, knife, gun
    confidence: float
    bbox: List[float]  # [x1, y1, x2, y2]
    center_point: List[float]  # [x, y]
    area: float
    frame_timestamp: float
    detection_model: str  # 'fire' for fire_YOLO11.pt, 'weapon' for weapon_YOLO11.pt
    threat_level: str = "low"
    created_at: Optional[datetime] = None
    _id: Optional[ObjectId] = None

    def to_dict(self) -> Dict:
        """Return the fields as a dict, stamping ``created_at`` if unset.

        Returns:
            A dict with one entry per field; a ``created_at`` of ``None``
            is replaced by the current UTC time.
        """
        record = asdict(self)
        if record['created_at'] is None:
            record['created_at'] = datetime.utcnow()
        return record
class ModelFactory:
    """Builds model instances out of documents read from the database.

    Each factory method forwards the document's keys as keyword arguments
    to the model constructor, so a key the model does not declare (or a
    missing required one) raises ``TypeError``.
    """

    @staticmethod
    def create_video_file(doc: Dict) -> VideoFileModel:
        """Build a ``VideoFileModel`` from a MongoDB document."""
        video_file = VideoFileModel(**doc)
        return video_file

    @staticmethod
    def create_keyframe(doc: Dict) -> KeyframeModel:
        """Build a ``KeyframeModel`` from a MongoDB document."""
        keyframe = KeyframeModel(**doc)
        return keyframe

    @staticmethod
    def create_event(doc: Dict) -> EventModel:
        """Build an ``EventModel`` from a MongoDB document."""
        event = EventModel(**doc)
        return event

    @staticmethod
    def create_processing_job(doc: Dict) -> ProcessingJobModel:
        """Build a ``ProcessingJobModel`` from a MongoDB document."""
        job = ProcessingJobModel(**doc)
        return job
# Helper functions for database operations
def prepare_for_mongodb(data: Dict) -> Dict:
    """Return a copy of *data* suitable for MongoDB insertion.

    An ``_id`` entry whose value is ``None`` is left out of the copy;
    every other key/value pair is carried over unchanged and in the same
    order. The input dictionary itself is not modified.
    """
    return {
        field: value
        for field, value in data.items()
        if field != '_id' or value is not None
    }
def convert_objectid_to_string(doc: Dict) -> Dict:
    """Convert ObjectId values inside *doc* to strings for JSON serialization.

    The document is walked recursively. Nested dicts are updated in place,
    while every list is replaced by a newly built list. ObjectIds are
    converted wherever they sit, including inside lists that are themselves
    nested in other lists.

    Args:
        doc: Document to convert. A value that is not a dict is returned
            untouched.

    Returns:
        The same ``doc`` object that was passed in, after conversion.
    """

    def _convert(value: Any) -> Any:
        # Containers are handled first, then the ObjectId leaf; any other
        # value passes through unchanged.
        if isinstance(value, dict):
            for key, item in value.items():
                # Re-assigning an existing key does not resize the dict,
                # so it is safe while iterating over its items.
                value[key] = _convert(item)
            return value
        if isinstance(value, list):
            return [_convert(item) for item in value]
        if isinstance(value, ObjectId):
            return str(value)
        return value

    if isinstance(doc, dict):
        _convert(doc)
    return doc