DetectifAI-Backend / video_compression.py
blacksinisterx's picture
fix: keyframe images, video clips, evidence images, live stream webcam+URL, remove demo mode
fd50325 verified
"""
Video Compression Module
This module handles:
- Video compression with configurable quality settings
- Resolution scaling
- Format conversion
- Compression statistics and reporting
"""
import os
import subprocess
import json
import cv2
import logging
from typing import Dict, Any, Tuple, Optional
from datetime import datetime
logger = logging.getLogger(__name__)
class VideoCompressor:
"""Handle video compression and format conversion"""
def __init__(self, config):
self.config = config
self.compressed_dir = os.path.join(config.output_base_dir, "compressed")
os.makedirs(self.compressed_dir, exist_ok=True)
# Verify FFmpeg availability
self.ffmpeg_available = self._check_ffmpeg()
def _check_ffmpeg(self) -> bool:
"""Check if FFmpeg is available"""
try:
result = subprocess.run(
['ffmpeg', '-version'],
capture_output=True,
text=True,
timeout=10
)
available = result.returncode == 0
logger.info(f"FFmpeg available: {available}")
return available
except (subprocess.TimeoutExpired, FileNotFoundError, Exception) as e:
logger.warning(f"FFmpeg not available: {e}")
return False
def compress_video(self, input_path: str, output_filename: str = None) -> str:
"""
Compress video with configured settings
Args:
input_path: Path to input video
output_filename: Optional custom output filename
Returns:
Path to compressed video
"""
if not os.path.exists(input_path):
raise FileNotFoundError(f"Input video not found: {input_path}")
# Generate output path
if output_filename is None:
base_name = os.path.splitext(os.path.basename(input_path))[0]
output_filename = f"{base_name}_compressed.{self.config.video_output_format}"
output_path = os.path.join(self.compressed_dir, output_filename)
logger.info(f"Compressing video: {input_path} -> {output_path}")
if self.ffmpeg_available:
return self._compress_with_ffmpeg(input_path, output_path)
else:
return self._compress_with_opencv(input_path, output_path)
def _compress_with_ffmpeg(self, input_path: str, output_path: str) -> str:
"""Compress video using FFmpeg"""
try:
# Build FFmpeg command
cmd = self._build_ffmpeg_command(input_path, output_path)
logger.info(f"Running FFmpeg command: {' '.join(cmd)}")
# Run compression
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=300 # 5 minute timeout
)
if result.returncode == 0:
if os.path.exists(output_path):
# Get compression statistics
stats = self._get_compression_stats(input_path, output_path)
logger.info(f"✅ Compression successful: {stats}")
return output_path
else:
logger.error("FFmpeg completed but output file not found")
return ""
else:
logger.error(f"FFmpeg error: {result.stderr}")
return ""
except subprocess.TimeoutExpired:
logger.error("FFmpeg compression timed out")
return ""
except Exception as e:
logger.error(f"FFmpeg compression failed: {e}")
return ""
def _build_ffmpeg_command(self, input_path: str, output_path: str) -> list:
"""Build FFmpeg command with configured parameters"""
cmd = ['ffmpeg', '-y', '-i', input_path]
# Video codec and quality settings
cmd.extend(['-c:v', 'libx264'])
cmd.extend(['-preset', self.config.compression_preset])
cmd.extend(['-crf', str(self.config.compression_crf)])
# Resolution scaling
if self.config.output_resolution != "original":
scale_filter = self._get_scale_filter()
if scale_filter:
cmd.extend(['-vf', scale_filter])
# Audio settings (copy or remove)
cmd.extend(['-c:a', 'aac', '-b:a', '128k'])
# Output optimizations
cmd.extend(['-movflags', '+faststart'])
cmd.append(output_path)
return cmd
def _get_scale_filter(self) -> str:
"""Get FFmpeg scale filter for resolution"""
resolution_map = {
"720p": "scale=1280:720:force_original_aspect_ratio=decrease,pad=1280:720:(ow-iw)/2:(oh-ih)/2",
"1080p": "scale=1920:1080:force_original_aspect_ratio=decrease,pad=1920:1080:(ow-iw)/2:(oh-ih)/2",
"480p": "scale=854:480:force_original_aspect_ratio=decrease,pad=854:480:(ow-iw)/2:(oh-ih)/2"
}
return resolution_map.get(self.config.output_resolution, "")
def _compress_with_opencv(self, input_path: str, output_path: str) -> str:
"""Fallback compression using OpenCV"""
logger.info("Using OpenCV for video compression (fallback)")
try:
cap = cv2.VideoCapture(input_path)
if not cap.isOpened():
logger.error(f"Cannot open input video: {input_path}")
return ""
# Get video properties
fps = cap.get(cv2.CAP_PROP_FPS)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
# Adjust resolution if needed
output_width, output_height = self._get_output_dimensions(width, height)
# Set up video writer with H.264 codec for better browser compatibility
# Try multiple codecs in order of preference
codec_options = [
'avc1', # H.264 (best browser support)
'H264', # H.264 alternative
'X264', # H.264 alternative
'mp4v' # MPEG-4 fallback
]
out = None
for codec in codec_options:
try:
fourcc = cv2.VideoWriter_fourcc(*codec)
out = cv2.VideoWriter(output_path, fourcc, fps, (output_width, output_height))
if out.isOpened():
logger.info(f"Using codec: {codec}")
break
out.release()
except Exception as e:
logger.warning(f"Codec {codec} failed: {e}")
continue
if not out or not out.isOpened():
logger.error("Cannot create output video writer with any codec")
cap.release()
return ""
# Process frames
frame_count = 0
while True:
ret, frame = cap.read()
if not ret:
break
# Resize frame if needed
if (output_width, output_height) != (width, height):
frame = cv2.resize(frame, (output_width, output_height))
out.write(frame)
frame_count += 1
# Progress logging
if frame_count % 100 == 0:
progress = (frame_count / total_frames) * 100
logger.info(f"Compression progress: {progress:.1f}%")
cap.release()
out.release()
if os.path.exists(output_path):
stats = self._get_compression_stats(input_path, output_path)
logger.info(f"✅ OpenCV compression successful: {stats}")
return output_path
else:
logger.error("OpenCV compression failed - output file not created")
return ""
except Exception as e:
logger.error(f"OpenCV compression failed: {e}")
return ""
def _get_output_dimensions(self, input_width: int, input_height: int) -> Tuple[int, int]:
"""Calculate output dimensions based on configuration"""
if self.config.output_resolution == "original":
return input_width, input_height
resolution_map = {
"720p": (1280, 720),
"1080p": (1920, 1080),
"480p": (854, 480)
}
target_width, target_height = resolution_map.get(
self.config.output_resolution,
(input_width, input_height)
)
# Maintain aspect ratio
aspect_ratio = input_width / input_height
if aspect_ratio > target_width / target_height:
# Width-constrained
output_width = target_width
output_height = int(target_width / aspect_ratio)
else:
# Height-constrained
output_height = target_height
output_width = int(target_height * aspect_ratio)
# Ensure even dimensions (required for some codecs)
output_width = (output_width // 2) * 2
output_height = (output_height // 2) * 2
return output_width, output_height
def _get_compression_stats(self, input_path: str, output_path: str) -> Dict[str, Any]:
"""Get compression statistics"""
try:
input_size = os.path.getsize(input_path)
output_size = os.path.getsize(output_path)
compression_ratio = input_size / output_size if output_size > 0 else 0
size_reduction = ((input_size - output_size) / input_size) * 100
# Get video properties
input_cap = cv2.VideoCapture(input_path)
output_cap = cv2.VideoCapture(output_path)
stats = {
'input_size_mb': round(input_size / (1024*1024), 2),
'output_size_mb': round(output_size / (1024*1024), 2),
'compression_ratio': round(compression_ratio, 2),
'size_reduction_percent': round(size_reduction, 1),
'input_resolution': f"{int(input_cap.get(cv2.CAP_PROP_FRAME_WIDTH))}x{int(input_cap.get(cv2.CAP_PROP_FRAME_HEIGHT))}",
'output_resolution': f"{int(output_cap.get(cv2.CAP_PROP_FRAME_WIDTH))}x{int(output_cap.get(cv2.CAP_PROP_FRAME_HEIGHT))}",
'input_fps': round(input_cap.get(cv2.CAP_PROP_FPS), 2),
'output_fps': round(output_cap.get(cv2.CAP_PROP_FPS), 2)
}
input_cap.release()
output_cap.release()
return stats
except Exception as e:
logger.error(f"Failed to get compression stats: {e}")
return {}
def batch_compress(self, input_directory: str, output_directory: str = None) -> Dict[str, str]:
"""
Compress multiple videos in a directory
Args:
input_directory: Directory containing videos to compress
output_directory: Optional output directory (uses compressed_dir by default)
Returns:
Dictionary mapping input paths to output paths
"""
if output_directory is None:
output_directory = self.compressed_dir
os.makedirs(output_directory, exist_ok=True)
# Find video files
video_extensions = {'.mp4', '.avi', '.mov', '.mkv', '.wmv', '.flv'}
video_files = []
for filename in os.listdir(input_directory):
if any(filename.lower().endswith(ext) for ext in video_extensions):
video_files.append(os.path.join(input_directory, filename))
logger.info(f"Found {len(video_files)} videos to compress")
results = {}
for video_path in video_files:
try:
base_name = os.path.splitext(os.path.basename(video_path))[0]
output_filename = f"{base_name}_compressed.{self.config.video_output_format}"
output_path = os.path.join(output_directory, output_filename)
compressed_path = self._compress_with_ffmpeg(video_path, output_path) if self.ffmpeg_available else self._compress_with_opencv(video_path, output_path)
if compressed_path:
results[video_path] = compressed_path
logger.info(f"✅ Compressed: {os.path.basename(video_path)}")
else:
logger.error(f"❌ Failed to compress: {os.path.basename(video_path)}")
except Exception as e:
logger.error(f"Error compressing {video_path}: {e}")
logger.info(f"Batch compression complete: {len(results)}/{len(video_files)} successful")
return results
def save_compression_report(self, compression_results: Dict[str, Any],
output_path: str) -> bool:
"""Save compression report to JSON file"""
try:
report = {
'compression_info': {
'timestamp': datetime.now().isoformat(),
'config': {
'output_resolution': self.config.output_resolution,
'compression_crf': self.config.compression_crf,
'compression_preset': self.config.compression_preset,
'video_output_format': self.config.video_output_format
}
},
'results': compression_results
}
with open(output_path, 'w') as f:
json.dump(report, f, indent=2)
logger.info(f"Compression report saved: {output_path}")
return True
except Exception as e:
logger.error(f"Failed to save compression report: {e}")
return False
def estimate_compression_time(self, input_path: str) -> Optional[float]:
"""Estimate compression time based on video properties"""
try:
cap = cv2.VideoCapture(input_path)
if not cap.isOpened():
return None
fps = cap.get(cv2.CAP_PROP_FPS)
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
duration = total_frames / fps
cap.release()
# Rough estimation: 0.1-0.5x realtime depending on preset
preset_multipliers = {
'ultrafast': 0.1,
'fast': 0.2,
'medium': 0.3,
'slow': 0.5
}
multiplier = preset_multipliers.get(self.config.compression_preset, 0.3)
estimated_time = duration * multiplier
return estimated_time
except Exception as e:
logger.error(f"Failed to estimate compression time: {e}")
return None