File size: 11,471 Bytes
fd50325
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2278049
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
"""

Data Models for DetectifAI Database Integration



This module defines data models that map EXACTLY to the MongoDB collections

defined in DetectifAI_db/database_setup.py schema.



CRITICAL: Only use fields defined in the MongoDB schema validators.

Extra fields must go in meta_data for video_file or use related collections.

"""

import json
from dataclasses import asdict, dataclass, fields
from datetime import datetime
from typing import Any, Dict, List, Optional

import numpy as np
from bson import ObjectId

@dataclass
class VideoFileModel:
    """Maps EXACTLY to video_file collection schema in MongoDB Atlas.

    Anything not covered by the schema validator belongs in ``meta_data``.
    """
    # Required fields (from schema)
    video_id: str
    user_id: str
    file_path: str  # MinIO path or local path
    
    # Optional fields (from schema)
    minio_object_key: Optional[str] = None
    minio_bucket: Optional[str] = None
    codec: Optional[str] = None
    fps: Optional[float] = 30.0  # bsonType: double - must be float
    upload_date: Optional[datetime] = None
    duration_secs: Optional[int] = None  # bsonType: int - must be INTEGER not float
    file_size_bytes: Optional[int] = None  # bsonType: long
    meta_data: Optional[Dict] = None  # Store ALL extra fields here (processing_status, resolution, etc.)
    
    _id: Optional[ObjectId] = None  # MongoDB document id (None before first insert)
    
    def to_dict(self) -> Dict:
        """Serialize to a dict for MongoDB insertion.

        Fills in schema defaults (upload_date, fps) and coerces numeric
        fields to the exact BSON types the schema validator expects.
        """
        doc = asdict(self)

        # Defaults required by the schema
        if doc['upload_date'] is None:
            doc['upload_date'] = datetime.utcnow()
        if doc['fps'] is None:
            doc['fps'] = 30.0

        # Coerce numerics: duration_secs -> int, file_size_bytes -> long,
        # fps -> double, per the collection's validator.
        for key, cast in (('duration_secs', int),
                          ('file_size_bytes', int),
                          ('fps', float)):
            if doc[key] is not None:
                doc[key] = cast(doc[key])

        return doc

@dataclass 
class DetectedFaceModel:
    """Maps to existing detected_faces collection.

    One document per face detected in a single video frame.
    """
    video_id: str  # parent video identifier
    frame_timestamp: float  # timestamp of the frame (presumably seconds — TODO confirm unit)
    face_bbox: List[float]  # [x1, y1, x2, y2]
    confidence: float  # detector confidence for this face
    face_encoding: Optional[List[float]] = None  # embedding vector used for matching
    keyframe_minio_path: Optional[str] = None  # MinIO path of the source keyframe image
    keyframe_id: Optional[ObjectId] = None  # reference to the keyframe document
    person_id: Optional[str] = None  # assigned identity, if any
    is_suspicious: bool = False  # flagged by downstream analysis
    _id: Optional[ObjectId] = None  # MongoDB document id (None before insert)
    
    def to_dict(self) -> Dict:
        """Serialize all fields to a plain dict for MongoDB insertion."""
        return asdict(self)

@dataclass
class EventModel:
    """Maps EXACTLY to event collection schema in MongoDB Atlas."""
    # Required fields (from schema)
    event_id: str
    video_id: str
    start_timestamp_ms: int  # bsonType: long - MUST be milliseconds as INTEGER
    end_timestamp_ms: int    # bsonType: long - MUST be milliseconds as INTEGER
    
    # Optional fields (from schema)
    event_type: Optional[str] = None  # 'object_detection', 'motion', 'fire', 'weapon', etc.
    confidence_score: Optional[float] = None  # bsonType: double (NOT 'confidence')
    is_verified: bool = False
    is_false_positive: bool = False
    verified_at: Optional[datetime] = None
    verified_by: Optional[str] = None
    visual_embedding: Optional[List[float]] = None  # For future FAISS integration
    bounding_boxes: Optional[Dict] = None  # Store detection bboxes here as object
    
    _id: Optional[ObjectId] = None
    
    def to_dict(self) -> Dict:
        """Serialize to a dict for MongoDB insertion.

        Coerces values to the BSON types the schema validator requires and
        substitutes empty containers for missing array/object fields.
        """
        doc = asdict(self)

        # Schema declares both timestamps as BSON long: force integer ms.
        for ts_key in ('start_timestamp_ms', 'end_timestamp_ms'):
            doc[ts_key] = int(doc[ts_key])

        # Schema declares confidence_score as BSON double.
        if doc['confidence_score'] is not None:
            doc['confidence_score'] = float(doc['confidence_score'])

        # Schema compliance: missing containers become empty ones.
        if doc['visual_embedding'] is None:
            doc['visual_embedding'] = []
        if doc['bounding_boxes'] is None:
            doc['bounding_boxes'] = {}

        return doc

@dataclass
class EventCaptionModel:
    """Maps to existing event_caption collection.

    A caption attached to one event (system-, user-, or AI-generated).
    """
    event_id: ObjectId
    video_id: str
    caption_text: str
    generated_by: str = "system"  # system, user, ai
    confidence: Optional[float] = None
    created_at: Optional[datetime] = None
    _id: Optional[ObjectId] = None
    
    def to_dict(self) -> Dict:
        """Serialize for MongoDB, stamping created_at when it was not provided."""
        document = asdict(self)
        if document['created_at'] is None:
            document['created_at'] = datetime.utcnow()
        return document

@dataclass
class EventClipModel:
    """Maps to existing event_clip collection.

    Metadata for a video clip extracted around one event.
    """
    event_id: ObjectId
    video_id: str
    clip_start_timestamp: float
    clip_end_timestamp: float
    minio_clip_path: str
    clip_duration: float
    frame_count: int
    created_at: Optional[datetime] = None
    _id: Optional[ObjectId] = None
    
    def to_dict(self) -> Dict:
        """Serialize for MongoDB, stamping created_at when it was not provided."""
        payload = asdict(self)
        if payload['created_at'] is None:
            payload['created_at'] = datetime.utcnow()
        return payload

@dataclass
class EventDescriptionModel:
    """Maps to existing event_description collection.

    A textual description of one event, with optional tags.
    """
    event_id: ObjectId
    video_id: str
    description_text: str
    description_type: str = "automatic"  # automatic, manual, ai_generated
    tags: Optional[List[str]] = None
    created_at: Optional[datetime] = None
    _id: Optional[ObjectId] = None
    
    def to_dict(self) -> Dict:
        """Serialize for MongoDB, stamping created_at when it was not provided."""
        record = asdict(self)
        if record['created_at'] is None:
            record['created_at'] = datetime.utcnow()
        return record

@dataclass
class FaceMatchModel:
    """Maps to existing face_matches collection.

    A pairwise similarity result between two detected faces.
    """
    video_id: str
    face_1_id: ObjectId
    face_2_id: ObjectId
    similarity_score: float
    match_confidence: float
    is_match: bool
    person_id: Optional[str] = None
    created_at: Optional[datetime] = None
    _id: Optional[ObjectId] = None
    
    def to_dict(self) -> Dict:
        """Serialize for MongoDB, stamping created_at when it was not provided."""
        document = asdict(self)
        if document['created_at'] is None:
            document['created_at'] = datetime.utcnow()
        return document

# New models for video processing pipeline

@dataclass
class KeyframeModel:
    """New collection for extracted keyframes.

    One document per keyframe pulled out of a video during processing.
    """
    video_id: str
    frame_number: int
    timestamp: float
    quality_score: float
    motion_score: float
    minio_path: str
    enhancement_applied: bool = False
    face_count: int = 0
    object_detections: Optional[List[Dict]] = None
    processing_metadata: Optional[Dict] = None
    created_at: Optional[datetime] = None
    _id: Optional[ObjectId] = None
    
    def to_dict(self) -> Dict:
        """Serialize for MongoDB, filling in values the caller left unset."""
        doc = asdict(self)
        if doc['created_at'] is None:
            doc['created_at'] = datetime.utcnow()
        # Missing detections list becomes an empty array, not null.
        if doc['object_detections'] is None:
            doc['object_detections'] = []
        return doc

@dataclass
class VideoSegmentModel:
    """New collection for video segments.

    A contiguous slice of a video, linked to the keyframes it contains.
    """
    video_id: str
    segment_id: int
    start_timestamp: float
    end_timestamp: float
    duration: float
    start_frame: int
    end_frame: int
    keyframe_ids: List[ObjectId]
    activity_level: str  # low, medium, high
    motion_statistics: Optional[Dict] = None
    segment_minio_path: Optional[str] = None
    created_at: Optional[datetime] = None
    _id: Optional[ObjectId] = None
    
    def to_dict(self) -> Dict:
        """Serialize for MongoDB, stamping created_at when it was not provided."""
        payload = asdict(self)
        if payload['created_at'] is None:
            payload['created_at'] = datetime.utcnow()
        return payload

@dataclass
class ProcessingJobModel:
    """New collection for tracking processing jobs.

    Tracks status/progress of one pipeline run for a video.
    """
    video_id: str
    job_type: str = "complete_processing"  # complete_processing, keyframe_extraction, object_detection
    status: str = "queued"  # queued, processing, completed, failed
    progress: int = 0  # 0-100
    message: str = ""
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    processing_stats: Optional[Dict] = None
    error_details: Optional[Dict] = None
    created_at: Optional[datetime] = None
    _id: Optional[ObjectId] = None
    
    def to_dict(self) -> Dict:
        """Serialize for MongoDB, stamping created_at when it was not provided."""
        record = asdict(self)
        if record['created_at'] is None:
            record['created_at'] = datetime.utcnow()
        return record

@dataclass
class ObjectDetectionModel:
    """Detailed object detection results.

    One document per detected object instance on a keyframe.
    """
    video_id: str
    keyframe_id: ObjectId
    detection_id: str
    class_name: str  # fire, smoke, knife, gun
    confidence: float
    bbox: List[float]  # [x1, y1, x2, y2]
    center_point: List[float]  # [x, y]
    area: float
    frame_timestamp: float
    detection_model: str  # 'fire' for fire_YOLO11.pt, 'weapon' for weapon_YOLO11.pt
    threat_level: str = "low"
    created_at: Optional[datetime] = None
    _id: Optional[ObjectId] = None
    
    def to_dict(self) -> Dict:
        """Serialize for MongoDB, stamping created_at when it was not provided."""
        document = asdict(self)
        if document['created_at'] is None:
            document['created_at'] = datetime.utcnow()
        return document

class ModelFactory:
    """Factory class for creating model instances from database documents.

    Documents read back from MongoDB can carry keys that are not declared on
    the target dataclass (at minimum ``_id`` is always present, and newer
    schema versions may add fields). ``Model(**doc)`` raises ``TypeError``
    for any unknown keyword, so each constructor filters the document down
    to the dataclass's declared fields first.
    """

    @staticmethod
    def _known_fields(model_cls: type, doc: Dict) -> Dict:
        """Return a copy of *doc* restricted to the dataclass fields of *model_cls*."""
        allowed = {f.name for f in fields(model_cls)}
        return {key: value for key, value in doc.items() if key in allowed}

    @staticmethod
    def create_video_file(doc: Dict) -> VideoFileModel:
        """Create VideoFileModel from a MongoDB document."""
        return VideoFileModel(**ModelFactory._known_fields(VideoFileModel, doc))

    @staticmethod
    def create_keyframe(doc: Dict) -> KeyframeModel:
        """Create KeyframeModel from a MongoDB document."""
        return KeyframeModel(**ModelFactory._known_fields(KeyframeModel, doc))

    @staticmethod
    def create_event(doc: Dict) -> EventModel:
        """Create EventModel from a MongoDB document."""
        return EventModel(**ModelFactory._known_fields(EventModel, doc))

    @staticmethod
    def create_processing_job(doc: Dict) -> ProcessingJobModel:
        """Create ProcessingJobModel from a MongoDB document."""
        return ProcessingJobModel(**ModelFactory._known_fields(ProcessingJobModel, doc))

# Helper functions for database operations

def prepare_for_mongodb(data: Dict) -> Dict:
    """Return a copy of *data* ready for MongoDB insertion.

    Drops a ``_id`` key whose value is None so MongoDB can generate the
    ObjectId itself; every other key/value pair is passed through untouched.
    """
    return {key: value for key, value in data.items()
            if not (key == '_id' and value is None)}

def convert_objectid_to_string(doc: Dict) -> Dict:
    """Recursively convert ObjectId values to strings for JSON serialization.

    Mutates *doc* in place (and returns it for convenience). Unlike the
    previous version, ObjectIds and dicts nested inside lists-of-lists are
    also converted, not just those one list level deep.
    """
    if isinstance(doc, dict):
        for key, value in doc.items():
            doc[key] = _jsonable_value(value)
    return doc

def _jsonable_value(value: Any) -> Any:
    """Convert one value: ObjectId -> str, recurse into dicts and lists."""
    if isinstance(value, ObjectId):
        return str(value)
    if isinstance(value, dict):
        return convert_objectid_to_string(value)
    if isinstance(value, list):
        return [_jsonable_value(item) for item in value]
    return value