"""
Keyframe Repository for DetectifAI Database Operations
This module provides MinIO storage and database operations for keyframes.
"""
import os
import io
import cv2
import numpy as np
from typing import List, Dict, Any, Optional
from datetime import datetime, timedelta
import logging
from minio.error import S3Error
# Module-level logger; records are emitted under this module's import name.
logger = logging.getLogger(__name__)
class KeyframeRepository:
    """Repository for keyframe images (MinIO/S3) and keyframe metadata (MongoDB).

    Keyframe images are uploaded as JPEG objects to the configured MinIO
    bucket (``save_keyframes_batch`` additionally mirrors them on local disk).
    Keyframe metadata documents are inserted into the ``keyframes`` MongoDB
    collection.
    """

    # Header prefix under which S3-compatible stores return user-defined
    # object metadata.
    _USER_METADATA_PREFIX = "x-amz-meta-"
    # Frame rate assumed when a timestamp must be estimated from a frame number.
    _ASSUMED_FPS = 30.0

    def __init__(self, db_manager):
        """Bind the repository to a database manager.

        Args:
            db_manager: Object exposing ``db`` (MongoDB database handle),
                ``config.minio_keyframe_bucket`` and ``minio_client``.
        """
        self._db_manager = db_manager
        self.db = db_manager.db
        self.bucket = db_manager.config.minio_keyframe_bucket  # Use dedicated keyframes bucket
        self.collection = self.db.keyframes  # MongoDB collection for keyframe metadata

    @property
    def minio(self):
        """Lazy access to S3 storage — tolerates unavailable storage.

        Returns whatever ``db_manager.minio_client`` currently is; methods in
        this class treat ``None`` as "storage unavailable".
        """
        return self._db_manager.minio_client

    def save_keyframe_to_minio(self, video_id: str, frame_data: bytes, frame_number: int, timestamp: float) -> Optional[str]:
        """Save a single keyframe directly to S3 storage.

        Args:
            video_id: Video identifier, used as the object-name prefix.
            frame_data: JPEG-encoded image bytes.
            frame_number: Frame index; zero-padded into the object name.
            timestamp: Frame timestamp; stored as object metadata.

        Returns:
            The object path ``{video_id}/frame_{frame_number:06d}.jpg`` on
            success, or ``None`` if storage is unavailable or the upload fails.
        """
        client = self.minio
        if client is None:
            return None
        try:
            minio_path = f"{video_id}/frame_{frame_number:06d}.jpg"  # Use consistent naming pattern
            client.put_object(
                self.bucket,
                minio_path,
                io.BytesIO(frame_data),
                length=len(frame_data),
                content_type='image/jpeg',
                # Persist the real values so get_video_keyframes_presigned_urls
                # can read them back instead of estimating from the filename.
                metadata={
                    'frame_number': str(frame_number),
                    'timestamp': str(timestamp),
                },
            )
            logger.info(f"✅ Uploaded keyframe to MinIO: {minio_path}")
            return minio_path
        except Exception as e:
            logger.error(f"❌ Failed to upload keyframe to MinIO: {e}")
            return None

    def _store_keyframe(self, video_id: str, keyframe: Any, local_dir: str) -> Optional[Dict]:
        """Encode one keyframe, write it to ``local_dir`` and upload it.

        Returns the storage-info dict, or ``None`` when the keyframe has no
        frame, cannot be JPEG-encoded, or the upload did not succeed.
        """
        # Handle KeyframeResult objects (which carry a ``frame_data`` dict)
        frame_data = keyframe.frame_data if hasattr(keyframe, 'frame_data') else keyframe
        frame = frame_data.get('frame')  # numpy array
        if frame is None:
            return None
        frame_number = frame_data.get('frame_number', 0)
        timestamp = frame_data.get('timestamp', 0.0)

        # Convert numpy array to jpg bytes
        is_success, buffer = cv2.imencode('.jpg', frame)
        if not is_success:
            logger.warning(f"Could not JPEG-encode frame {frame_number} of video {video_id}")
            return None
        frame_bytes = buffer.tobytes()

        # Save locally
        local_path = os.path.join(local_dir, f"frame_{frame_number:06d}.jpg")
        with open(local_path, 'wb') as f:
            f.write(frame_bytes)
        logger.info(f"✅ Keyframe saved locally: {local_path}")

        # Upload bytes directly to MinIO
        minio_path = self.save_keyframe_to_minio(video_id, frame_bytes, frame_number, timestamp)
        if not minio_path:
            return None
        return {
            'frame_number': frame_number,
            'timestamp': timestamp,
            'minio_path': minio_path,
            'local_path': local_path,
            'quality_score': frame_data.get('quality_score', 0.0),
            'enhancement_applied': frame_data.get('enhancement_applied', False),
        }

    def save_keyframes_batch(self, video_id: str, keyframes: List) -> List[Dict]:
        """Save multiple keyframes directly to MinIO and locally, return their storage info.

        Args:
            video_id: Video identifier; used for the local directory and the
                object-name prefix.
            keyframes: Items that are either dict-like (``frame``,
                ``frame_number``, ``timestamp``, ...) or objects with a
                ``frame_data`` attribute holding such a dict.

        Returns:
            One info dict per keyframe that was both written locally and
            uploaded. A failure on one keyframe is logged and does not stop
            the remaining keyframes from being processed.
        """
        keyframe_info: List[Dict] = []
        try:
            # Create local storage directory
            local_dir = os.path.join("video_processing_outputs", "keyframes", video_id)
            os.makedirs(local_dir, exist_ok=True)
            for keyframe in keyframes:
                try:
                    info = self._store_keyframe(video_id, keyframe, local_dir)
                except Exception as e:
                    # Best effort: skip this frame, keep going with the rest.
                    logger.error(f"❌ Failed to save keyframe for video {video_id}: {e}")
                    continue
                if info is not None:
                    keyframe_info.append(info)
            logger.info(f"✅ Uploaded {len(keyframe_info)} keyframes to MinIO and saved locally for video {video_id}")
            return keyframe_info
        except Exception as e:
            logger.error(f"❌ Failed to upload keyframes batch: {e}")
            return keyframe_info  # Return whatever was successful

    def get_keyframe_presigned_url(self, minio_path: str, expires: timedelta = timedelta(hours=1)) -> Optional[str]:
        """Generate presigned URL for keyframe access.

        Returns:
            The presigned URL, or ``None`` if storage is unavailable or the
            storage client raises ``S3Error``. Other exceptions propagate.
        """
        client = self.minio
        if client is None:
            return None
        try:
            return client.presigned_get_object(self.bucket, minio_path, expires=expires)
        except S3Error as e:
            logger.error(f"❌ Failed to generate presigned URL for keyframe: {e}")
            return None

    @staticmethod
    def _frame_number_from_filename(filename: str) -> Optional[int]:
        """Parse the number out of names like ``frame_000001.jpg``; ``None`` if absent."""
        if 'frame_' not in filename:
            return None
        try:
            return int(filename.split('_')[1].split('.')[0])
        except (ValueError, IndexError):
            return None

    @classmethod
    def _user_metadata_value(cls, metadata, key: str):
        """Look up ``key`` in object metadata, with or without the S3 user-metadata prefix.

        Matching is case-insensitive. Returns ``None`` when the key is absent
        or ``metadata`` is empty.
        """
        if not metadata:
            return None
        lowered = {str(name).lower(): value for name, value in metadata.items()}
        for candidate in (key, cls._USER_METADATA_PREFIX + key):
            if candidate in lowered:
                return lowered[candidate]
        return None

    def _frame_info_for_object(self, object_name: str):
        """Return ``(filename, frame_number, timestamp)`` for a stored keyframe object.

        The frame number is parsed from the filename and the timestamp is
        estimated from it; values found in the object's metadata take
        precedence over both.
        """
        filename = object_name.split('/')[-1]  # e.g., "frame_000001.jpg"
        frame_number = 0
        timestamp = 0.0
        parsed = self._frame_number_from_filename(filename)
        if parsed is not None:
            frame_number = parsed
            # Estimate timestamp from frame number (assuming 30 fps)
            timestamp = frame_number / self._ASSUMED_FPS

        # Metadata lookup is best effort: a failure keeps the filename-based values.
        try:
            metadata = self.minio.stat_object(self.bucket, object_name).metadata
            stored_timestamp = self._user_metadata_value(metadata, 'timestamp')
            stored_frame_number = self._user_metadata_value(metadata, 'frame_number')
        except Exception as e:
            logger.debug(f"Could not read metadata for {object_name}: {e}")
            stored_timestamp = stored_frame_number = None

        if stored_timestamp is not None:
            try:
                timestamp = float(stored_timestamp)
            except (TypeError, ValueError):
                pass  # unparseable metadata: keep the estimate
        if stored_frame_number is not None:
            try:
                frame_number = int(stored_frame_number)
            except (TypeError, ValueError):
                pass  # unparseable metadata: keep the filename value
        return filename, frame_number, timestamp

    def get_video_keyframes_presigned_urls(self, video_id: str, expires: timedelta = timedelta(hours=1)) -> List[Dict]:
        """Get presigned URLs for all keyframes of a video.

        Args:
            video_id: Video identifier used as the object-name prefix.
            expires: Lifetime of each presigned URL.

        Returns:
            Dicts with ``frame_number``, ``timestamp``, ``minio_path``,
            ``presigned_url``, ``url``, ``api_url`` and ``filename``, sorted by
            frame number. Objects for which no presigned URL could be
            generated are omitted. Returns ``[]`` on any error. When storage
            is unavailable, delegates to ``_get_keyframes_from_local`` if the
            instance provides it, otherwise returns ``[]``.
        """
        if self.minio is None:
            return self._get_keyframes_from_local(video_id) if hasattr(self, '_get_keyframes_from_local') else []
        try:
            # Try both storage patterns:
            # 1) {video_id}/keyframes/frame_*.jpg (legacy / some pipelines)
            # 2) {video_id}/frame_*.jpg (save_keyframe_to_minio pattern)
            logger.info(f"🔍 Looking for keyframes in bucket '{self.bucket}' for video '{video_id}'")
            objects = list(self.minio.list_objects(self.bucket, prefix=f"{video_id}/keyframes/", recursive=True))
            if not objects:
                # Fallback: flat storage path used by save_keyframe_to_minio
                objects = list(self.minio.list_objects(self.bucket, prefix=f"{video_id}/", recursive=True))
            logger.info(f"📦 Found {len(objects)} objects in MinIO for keyframes")

            keyframes_urls = []
            for obj in objects:
                object_name = obj.object_name
                if not object_name.endswith('.jpg'):
                    continue
                filename, frame_number, timestamp = self._frame_info_for_object(object_name)
                # Generate presigned URL and API URL
                presigned_url = self.get_keyframe_presigned_url(object_name, expires=expires)
                if not presigned_url:
                    continue
                # Also provide API endpoint URL for direct serving
                api_url = f"/api/minio/image/{self.bucket}/{object_name}"
                keyframes_urls.append({
                    'frame_number': frame_number,
                    'timestamp': timestamp,
                    'minio_path': object_name,
                    'presigned_url': presigned_url,
                    'url': api_url,  # Use API endpoint for better reliability
                    'api_url': api_url,
                    'filename': filename,
                })

            # Sort by frame number
            keyframes_urls.sort(key=lambda entry: entry['frame_number'])
            logger.info(f"✅ Generated {len(keyframes_urls)} presigned URLs for video {video_id} keyframes")
            return keyframes_urls
        except Exception as e:
            logger.error(f"❌ Failed to get keyframes presigned URLs for video {video_id}: {e}")
            return []

    def create_keyframe(self, keyframe_doc: Dict[str, Any]) -> Optional[str]:
        """
        Save keyframe metadata to MongoDB

        Args:
            keyframe_doc: Dictionary containing keyframe metadata:
                - camera_id: Camera identifier (for live streams)
                - video_id: Video identifier (for uploaded videos, optional)
                - timestamp: Frame timestamp in seconds
                - timestamp_ms: Frame timestamp in milliseconds
                - frame_index: Frame number/index
                - minio_path: Path to keyframe in MinIO
                - objects_detected: List of detected objects
                - behaviors_detected: List of detected behaviors
                - motion_detected: Whether motion was detected
                - motion_score: Motion detection score
                - created_at: Creation timestamp (filled in with the current
                  UTC time when missing; the passed dict is updated in place)

        Returns:
            MongoDB document ID (as a string) or None if the insert failed
        """
        try:
            # Ensure required fields
            if 'created_at' not in keyframe_doc:
                keyframe_doc['created_at'] = datetime.utcnow()
            # Convert numpy types if present
            try:
                from database.models import convert_numpy_types, prepare_for_mongodb
                keyframe_doc = convert_numpy_types(keyframe_doc)
                keyframe_doc = prepare_for_mongodb(keyframe_doc)
            except ImportError:
                # Fallback if models not available: insert the document as given
                pass
            # Insert into MongoDB
            result = self.collection.insert_one(keyframe_doc)
            logger.info(f"✅ Saved keyframe metadata to MongoDB: {keyframe_doc.get('minio_path', 'unknown')}")
            return str(result.inserted_id)
        except Exception as e:
            # logger.exception records the traceback together with the message.
            logger.exception(f"❌ Failed to save keyframe metadata to MongoDB: {e}")
            return None
|