File size: 19,257 Bytes
fd50325 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 | """
Video Captioning Integrator for DetectifAI
This module integrates video captioning into the video processing pipeline.
It generates neutral, policy-safe captions from keyframes and stores them with semantic embeddings.
"""
import os
import sys
import logging
from typing import List, Dict, Any
from datetime import datetime
from PIL import Image
import cv2
# Import from video_captioning package
try:
# Try direct import from package
from video_captioning import CaptioningService, Frame, CaptioningConfig
except ImportError:
# Fallback to explicit path
from video_captioning.video_captioning.captioning_service import CaptioningService
from video_captioning.video_captioning.models import Frame
from video_captioning.video_captioning.config import CaptioningConfig
logger = logging.getLogger(__name__)
class VideoCaptioningIntegrator:
"""Integration layer between video captioning and DetectifAI pipeline"""
def __init__(self, config, db_manager=None):
self.config = config
self.db_manager = db_manager
self.enabled = getattr(config, 'enable_video_captioning', False)
logger.info(f"π¬ Initializing VideoCaptioningIntegrator - enabled: {self.enabled}")
# Initialize captioning service if enabled
self.captioning_service = None
if self.enabled:
try:
# Create captioning configuration from DetectifAI config
captioning_config = CaptioningConfig(
vision_model_name=getattr(config, 'captioning_vision_model', "Salesforce/blip-image-captioning-base"),
vision_device=getattr(config, 'captioning_device', "cpu"),
vision_batch_size=getattr(config, 'captioning_batch_size', 4),
embedding_model_name=getattr(config, 'captioning_embedding_model', "sentence-transformers/all-MiniLM-L6-v2"),
db_connection_string=getattr(config, 'captioning_db_path', None),
vector_db_path=getattr(config, 'captioning_vector_db_path', "./video_captioning_store"),
enable_async_processing=getattr(config, 'captioning_async', True),
log_rejected_captions=True
)
# Initialize with MongoDB support
self.captioning_service = CaptioningService(captioning_config, db_manager=db_manager)
logger.info("β
Video captioning service initialized successfully (MongoDB + FAISS)")
except Exception as e:
logger.error(f"β Failed to initialize video captioning service: {e}")
self.enabled = False
else:
logger.info("Video captioning disabled in config")
def _download_keyframe_from_minio(self, bucket, minio_path, local_path):
"""Download a keyframe from MinIO to local path"""
try:
if not self.db_manager or not self.db_manager.minio_client:
logger.error("MinIO client not available")
return False
# Add timeout to prevent hanging
import socket
original_timeout = socket.getdefaulttimeout()
socket.setdefaulttimeout(30) # 30 second timeout
try:
self.db_manager.minio_client.fget_object(bucket, minio_path, local_path)
logger.debug(f"β
Downloaded {minio_path} to {local_path}")
return True
finally:
socket.setdefaulttimeout(original_timeout)
except Exception as e:
logger.error(f"β Failed to download {minio_path} from MinIO: {e}")
return False
def process_keyframes_with_captioning(self, keyframes: List, video_id: str = None) -> Dict[str, Any]:
"""
Process keyframes to generate captions
Args:
keyframes: List of KeyframeResult objects
video_id: Optional video identifier
Returns:
Dictionary containing captioning results
"""
if not self.enabled or not self.captioning_service:
logger.info("π« Video captioning disabled, skipping...")
return {
'enabled': False,
'total_captions': 0,
'captions': []
}
logger.info(f"π¬ Starting video captioning on {len(keyframes)} keyframes")
# Add overall timeout for the entire captioning process
import signal
def timeout_handler(signum, frame):
raise TimeoutError("Video captioning exceeded maximum time limit")
# Set 5 minute timeout for entire captioning process
# Note: signal.alarm only works on Unix, so we'll use a different approach
start_time = datetime.now()
max_processing_time = 300 # 5 minutes in seconds
# Create temporary directory for downloaded keyframes
import tempfile
temp_dir = tempfile.mkdtemp(prefix="keyframes_")
logger.info(f"π Created temporary directory for keyframes: {temp_dir}")
try:
# Get keyframe bucket from db_manager
keyframe_bucket = None
if self.db_manager and hasattr(self.db_manager, 'keyframe_repo'):
keyframe_bucket = self.db_manager.keyframe_repo.bucket
elif self.db_manager:
# Fallback: try to get from config or use default
keyframe_bucket = getattr(self.db_manager, 'keyframe_bucket', 'detectifai-keyframes')
else:
keyframe_bucket = 'detectifai-keyframes' # Default bucket name
logger.info(f"πͺ£ Using MinIO bucket: {keyframe_bucket}")
# Convert keyframes to Frame objects
frames = []
downloaded_files = [] # Track files for cleanup
max_keyframes_to_process = 10 # Reduced limit for faster processing
logger.info(f"π Processing up to {min(len(keyframes), max_keyframes_to_process)} keyframes (limited for performance)")
for idx, keyframe in enumerate(keyframes[:max_keyframes_to_process]): # Limit processing
try:
# Debug: Log keyframe structure
logger.debug(f"Processing keyframe {idx}: type={type(keyframe)}")
# Try different keyframe structures
frame_path = None
timestamp = None
frame_index = idx
minio_path = None
minio_bucket_override = None
# Check for different attribute names
if hasattr(keyframe, 'frame_path'):
frame_path = keyframe.frame_path
elif hasattr(keyframe, 'path'):
frame_path = keyframe.path
elif hasattr(keyframe, 'frame_data') and hasattr(keyframe.frame_data, 'frame_path'):
frame_path = keyframe.frame_data.frame_path
# Check for MinIO metadata in keyframe object (added by database_video_service)
if hasattr(keyframe, 'minio_path'):
minio_path = keyframe.minio_path
minio_bucket_override = getattr(keyframe, 'minio_bucket', None)
elif hasattr(keyframe, 'frame_data'):
if hasattr(keyframe.frame_data, 'minio_path'):
minio_path = keyframe.frame_data.minio_path
minio_bucket_override = getattr(keyframe.frame_data, 'minio_bucket', None)
# Get timestamp
if hasattr(keyframe, 'timestamp'):
timestamp = keyframe.timestamp
elif hasattr(keyframe, 'frame_data') and hasattr(keyframe.frame_data, 'timestamp'):
timestamp = keyframe.frame_data.timestamp
else:
timestamp = 0.0
# Get frame index
if hasattr(keyframe, 'frame_index'):
frame_index = keyframe.frame_index
elif hasattr(keyframe, 'frame_number'):
frame_index = keyframe.frame_number
elif hasattr(keyframe, 'frame_data') and hasattr(keyframe.frame_data, 'frame_number'):
frame_index = keyframe.frame_data.frame_number
# Check if frame_path is a MinIO path (doesn't exist locally)
if frame_path and not os.path.exists(frame_path):
logger.debug(f"β οΈ Frame path doesn't exist locally: {frame_path}")
# Use MinIO path from keyframe metadata if available
if not minio_path and video_id:
# Fallback: construct MinIO path from video_id and frame_index
minio_path = f"{video_id}/keyframes/frame_{frame_index:06d}.jpg"
if minio_path:
logger.debug(f"π Attempting to download from MinIO: {minio_path}")
# Use bucket from keyframe metadata or default
bucket_to_use = minio_bucket_override or keyframe_bucket
# Download from MinIO to temp directory
local_temp_path = os.path.join(temp_dir, f"frame_{frame_index:06d}.jpg")
if self._download_keyframe_from_minio(bucket_to_use, minio_path, local_temp_path):
frame_path = local_temp_path
downloaded_files.append(local_temp_path)
logger.debug(f"β
Downloaded keyframe to: {frame_path}")
else:
logger.warning(f"β Failed to download keyframe from MinIO: {minio_path}")
continue
else:
logger.warning(f"β οΈ No MinIO path available and no video_id to construct path")
continue
# Load image from keyframe path
if frame_path and os.path.exists(frame_path):
logger.debug(f"πΈ Loading image from: {frame_path}")
pil_image = Image.open(frame_path)
# Create Frame object
frame = Frame(
frame_id=f"frame_{frame_index:06d}",
timestamp=datetime.fromtimestamp(timestamp) if timestamp else datetime.now(),
video_id=video_id or "unknown",
image=pil_image
)
frames.append(frame)
logger.debug(f"β
Successfully converted keyframe {idx}")
else:
logger.warning(f"β οΈ Keyframe {idx} has no valid frame_path or file doesn't exist: {frame_path}")
except Exception as e:
logger.error(f"β Error converting keyframe {idx}: {e}")
import traceback
logger.error(traceback.format_exc())
continue
if not frames:
logger.warning("β οΈ No frames could be converted for captioning")
logger.warning(f"Keyframe sample: {keyframes[0] if keyframes else 'No keyframes'}")
return {
'enabled': True,
'total_captions': 0,
'captions': [],
'errors': ['No frames could be converted - check keyframe structure or MinIO access']
}
logger.info(f"π Processing {len(frames)} frames for captioning...")
logger.info(f"β±οΈ Time elapsed: {(datetime.now() - start_time).total_seconds():.1f}s")
# Check if we've exceeded time limit before processing
elapsed = (datetime.now() - start_time).total_seconds()
if elapsed > max_processing_time:
logger.error(f"β Exceeded time limit before caption generation: {elapsed:.1f}s")
return {
'enabled': True,
'total_captions': 0,
'captions': [],
'errors': [f'Timeout: Exceeded {max_processing_time}s before caption generation']
}
# Process frames through captioning pipeline with error handling
try:
logger.info("π€ Calling captioning service to process frames...")
result = self.captioning_service.process_frames(frames)
logger.info(f"β
Captioning service completed in {(datetime.now() - start_time).total_seconds():.1f}s")
except Exception as caption_error:
logger.error(f"β Caption generation failed: {caption_error}")
import traceback
logger.error(traceback.format_exc())
return {
'enabled': True,
'total_captions': 0,
'captions': [],
'errors': [f'Caption generation error: {str(caption_error)}']
}
# Extract caption records and print debugging info
captions = []
logger.info("=" * 80)
logger.info("π¬ VIDEO CAPTIONING RESULTS - KEYFRAME CAPTIONS")
logger.info("=" * 80)
for idx, record in enumerate(result.caption_records, 1):
caption_data = {
'caption_id': record.caption_id,
'frame_id': record.frame_id,
'timestamp': record.timestamp.isoformat(),
'raw_caption': record.raw_caption,
'sanitized_caption': record.sanitized_caption,
'created_at': record.created_at.isoformat()
}
captions.append(caption_data)
# DEBUG: Print caption for each keyframe
logger.info(f"\nπΈ Keyframe #{idx} - {record.frame_id}")
logger.info(f" β±οΈ Timestamp: {record.timestamp}")
logger.info(f" π€ Raw Caption: {record.raw_caption}")
logger.info(f" β¨ Sanitized Caption: {record.sanitized_caption}")
logger.info(f" π Caption ID: {record.caption_id}")
# Also print to console for immediate visibility
print(f"\n{'='*60}")
print(f"πΈ Keyframe #{idx}: {record.frame_id}")
print(f"β±οΈ Time: {record.timestamp}")
print(f"π€ Caption: {record.sanitized_caption}")
print(f"{'='*60}")
logger.info("\n" + "=" * 80)
logger.info(f"β
Video captioning complete: {len(captions)} captions generated and saved to MongoDB")
logger.info(f"πΎ Embeddings saved to FAISS vector database")
logger.info("=" * 80)
return {
'enabled': True,
'total_captions': len(captions),
'captions': captions,
'processing_time': result.processing_time,
'errors': result.errors
}
except Exception as e:
logger.error(f"β Video captioning failed: {e}", exc_info=True)
return {
'enabled': True,
'total_captions': 0,
'captions': [],
'errors': [str(e)]
}
finally:
# Cleanup: Remove temporary directory and downloaded files
try:
import shutil
if os.path.exists(temp_dir):
shutil.rmtree(temp_dir)
logger.info(f"π§Ή Cleaned up temporary directory: {temp_dir}")
except Exception as e:
logger.warning(f"β οΈ Failed to cleanup temporary directory: {e}")
def search_captions(self, query: str, video_id: str = None, top_k: int = 5) -> List[Dict[str, Any]]:
"""
Search captions using semantic similarity
Args:
query: Search query text
video_id: Optional video ID to filter results
top_k: Number of results to return
Returns:
List of matching caption records with similarity scores
"""
if not self.enabled or not self.captioning_service:
return []
try:
results = self.captioning_service.search_captions(query, top_k=top_k)
# Filter by video_id if provided
if video_id:
results = [r for r in results if r.get('video_id') == video_id]
return results
except Exception as e:
logger.error(f"Caption search failed: {e}")
return []
def get_video_captions(self, video_id: str) -> List[Dict[str, Any]]:
"""
Get all captions for a specific video
Args:
video_id: Video identifier
Returns:
List of caption records
"""
if not self.enabled or not self.captioning_service:
return []
try:
return self.captioning_service.get_video_captions(video_id)
except Exception as e:
logger.error(f"Failed to get video captions: {e}")
return []
def get_statistics(self) -> Dict[str, Any]:
"""Get captioning service statistics"""
if not self.enabled or not self.captioning_service:
return {'enabled': False}
try:
stats = self.captioning_service.get_statistics()
stats['enabled'] = True
return stats
except Exception as e:
logger.error(f"Failed to get statistics: {e}")
return {'enabled': True, 'error': str(e)}
|