"""
Video Compression and Storage Service for DetectifAI

This module handles video compression and MinIO storage for compressed videos.
"""

import logging
import os
import subprocess
import tempfile
from datetime import timedelta
from io import BytesIO
from typing import Dict, Optional

import cv2
from minio.error import S3Error

# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)

class VideoCompressionService:
    """Compress videos and store them locally and in S3-compatible storage.

    FFmpeg is used when the ``ffmpeg`` binary can be executed; when it is not
    available, or when an FFmpeg run fails, OpenCV is used as a fallback
    encoder.
    """

    # Frame height targeted by the OpenCV encoder for each resolution preset.
    _TARGET_HEIGHTS = {"720p": 720, "480p": 480}

    # (width, height) box the FFmpeg scale filter fits the video into.
    _FFMPEG_BOXES = {"720p": (1280, 720), "480p": (854, 480)}

    # Upper bound, in seconds, for one FFmpeg run of the buffer-based encoder.
    _FFMPEG_TIMEOUT_SECONDS = 300

    # (FourCC, display name) pairs tried in order by the buffer-based OpenCV
    # encoder; the first one whose writer opens is used.
    _OPENCV_CODECS = (
        ('avc1', 'H.264'),
        ('h264', 'H.264'),
        ('X264', 'H.264'),
        ('mp4v', 'MPEG-4'),
    )

    # Frame rate used when the source does not report a positive one.
    _DEFAULT_FPS = 25.0

    def __init__(self, db_manager, config=None):
        """Store collaborators and compression settings.

        Args:
            db_manager: Object exposing ``config.minio_video_bucket`` and
                ``minio_client``.
            config: Optional configuration object; stored as-is.
        """
        self._db_manager = db_manager
        self.bucket = db_manager.config.minio_video_bucket
        self.config = config

        # Compression settings.
        self.output_resolution = "720p"
        self.compression_crf = 23
        self.compression_preset = "medium"

        # Probe once so every compression call does not spawn ``ffmpeg``.
        self.ffmpeg_available = self._check_ffmpeg_available()

    @property
    def minio(self):
        """Lazy access to S3 storage — tolerates unavailable storage."""
        return self._db_manager.minio_client

    def _check_ffmpeg_available(self) -> bool:
        """Return True when ``ffmpeg -version`` runs and exits with code 0."""
        try:
            result = subprocess.run(
                ['ffmpeg', '-version'],
                capture_output=True,
                text=True,
                errors='replace',
                timeout=5,
            )
        except (subprocess.SubprocessError, OSError):
            # OSError covers a missing binary (FileNotFoundError) as well as
            # a binary that exists but cannot be executed (PermissionError).
            return False
        return result.returncode == 0

    def compress_and_store(self, input_path: str, video_id: str) -> Optional[Dict]:
        """Compress a video, save it locally and upload it when storage is up.

        Args:
            input_path: Path of the source video file.
            video_id: Identifier used to build the local directory and the
                object key (``compressed/<video_id>/video.mp4``).

        Returns:
            A dict with ``success``, ``minio_path`` (None when the upload was
            skipped or failed), ``local_path``, ``original_size``,
            ``compressed_size``, ``compression_ratio`` and
            ``output_resolution``; or None when compression or local storage
            failed.
        """
        try:
            local_dir = os.path.join("video_processing_outputs", "compressed", video_id)
            os.makedirs(local_dir, exist_ok=True)
            local_path = os.path.join(local_dir, "video.mp4")

            compressed_buffer = BytesIO()
            if not self._compress_to_buffer(input_path, compressed_buffer):
                logger.error("Both compression methods failed")
                return None

            compressed_data = compressed_buffer.getvalue()
            compressed_size = len(compressed_data)
            if compressed_size == 0:
                # An encoder reported success but wrote nothing usable.
                logger.error("Compression produced an empty output")
                return None

            with open(local_path, 'wb') as f:
                f.write(compressed_data)
            logger.info(f"✅ Video saved locally: {local_path}")

            original_size = os.path.getsize(input_path)
            compression_ratio = self._compression_ratio(original_size, compressed_size)

            minio_path = self._upload_compressed(video_id, compressed_buffer, compressed_size)

            result = {
                'success': True,
                'minio_path': minio_path,
                'local_path': local_path,
                'original_size': original_size,
                'compressed_size': compressed_size,
                'compression_ratio': round(compression_ratio, 2),
                'output_resolution': self.output_resolution,
            }

            logger.info(f"✅ Video compressed and stored: {compression_ratio:.1f}% reduction")
            return result

        except Exception as e:
            logger.error(f"❌ Compression and storage failed: {e}")
            return None

    @staticmethod
    def _compression_ratio(original_size: int, compressed_size: int) -> float:
        """Return the size reduction in percent; 0.0 for an empty original."""
        if original_size <= 0:
            return 0.0
        return ((original_size - compressed_size) / original_size) * 100

    def _compress_to_buffer(self, input_path: str, output_buffer: BytesIO) -> bool:
        """Fill ``output_buffer`` using FFmpeg, falling back to OpenCV."""
        if self.ffmpeg_available:
            if self._compress_with_ffmpeg_to_buffer(input_path, output_buffer):
                return True
            logger.warning("FFmpeg compression failed, falling back to OpenCV")
            # Drop anything a failed attempt may have written so the fallback
            # output is not appended to, or mixed with, stale bytes.
            output_buffer.seek(0)
            output_buffer.truncate()
        else:
            logger.info("FFmpeg not available, using OpenCV compression")
        return self._compress_with_opencv_to_buffer(input_path, output_buffer)

    def _upload_compressed(self, video_id: str, data_buffer: BytesIO, size: int) -> Optional[str]:
        """Upload the compressed video; return its object key or None.

        Upload problems are logged and swallowed on purpose: the local copy
        has already been written, so the overall operation still succeeds.
        """
        # Read the property once so the None check and the call use the same
        # client object.
        client = self.minio
        if client is None:
            logger.info("S3 storage unavailable — compressed video stored locally only")
            return None

        minio_path = f"compressed/{video_id}/video.mp4"
        try:
            data_buffer.seek(0)
            client.put_object(
                self.bucket,
                minio_path,
                data_buffer,
                length=size,
                content_type='video/mp4',
            )
        except Exception as s3_err:
            logger.warning(f"⚠️ S3 upload skipped for compressed video: {s3_err}")
            return None
        return minio_path

    def get_compressed_video_presigned_url(
        self, video_id: str, expires: timedelta = timedelta(hours=1)
    ) -> Optional[str]:
        """Generate a presigned URL for the compressed video.

        Args:
            video_id: Identifier used in the object key.
            expires: Validity period passed to ``presigned_get_object``.

        Returns:
            The presigned URL, or None when storage is unavailable or the
            client raises ``S3Error``.
        """
        client = self.minio
        if client is None:
            return None
        try:
            minio_path = f"compressed/{video_id}/video.mp4"
            return client.presigned_get_object(self.bucket, minio_path, expires=expires)
        except S3Error as e:
            logger.error(f"❌ Failed to generate presigned URL for compressed video: {e}")
            return None

    def _build_ffmpeg_command(self, input_path: str, output_path: str) -> list:
        """Return the FFmpeg argument list for the current settings.

        A scale/pad filter is added only for the known presets; any other
        ``output_resolution`` value keeps the source resolution.
        """
        cmd = [
            'ffmpeg',
            '-i', input_path,
            '-c:v', 'libx264',
            '-crf', str(self.compression_crf),
            '-preset', self.compression_preset,
            '-movflags', '+faststart',
            '-y',
        ]

        box = self._FFMPEG_BOXES.get(self.output_resolution)
        if box is not None:
            box_width, box_height = box
            # min(...) keeps small sources from being upscaled; the pad step
            # rounds both dimensions up to even values.
            cmd.extend([
                '-vf',
                f"scale='min({box_width},iw)':'min({box_height},ih)'"
                ":force_original_aspect_ratio=decrease"
                ",pad=ceil(iw/2)*2:ceil(ih/2)*2",
            ])

        cmd.append(output_path)
        return cmd

    def _compress_with_ffmpeg(self, input_path: str, output_path: str) -> bool:
        """Compress ``input_path`` into ``output_path`` using FFmpeg.

        No timeout is applied here, so the call blocks until FFmpeg exits.

        Returns:
            True when FFmpeg exits with code 0 and the output file exists.
        """
        try:
            result = subprocess.run(
                self._build_ffmpeg_command(input_path, output_path),
                capture_output=True,
                text=True,
                errors='replace',  # FFmpeg output is not guaranteed to decode
            )

            if result.returncode == 0 and os.path.exists(output_path):
                logger.info("✅ FFmpeg compression successful")
                return True

            logger.error(f"FFmpeg error: {result.stderr}")
            return False

        except Exception as e:
            logger.error(f"FFmpeg compression failed: {e}")
            return False

    @staticmethod
    def _create_temp_mp4() -> str:
        """Create an empty ``.mp4`` temp file and return its path.

        The file is created with ``delete=False``; the caller must remove it.
        """
        with tempfile.NamedTemporaryFile(suffix='.mp4', delete=False) as temp_file:
            return temp_file.name

    @staticmethod
    def _remove_file(path: str) -> None:
        """Delete ``path``; a missing file or OS error is logged, not raised."""
        try:
            os.unlink(path)
        except OSError as e:
            logger.debug(f"Could not remove temporary file {path}: {e}")

    def _compress_with_ffmpeg_to_buffer(self, input_path: str, output_buffer: BytesIO) -> bool:
        """Compress with FFmpeg via a temporary file and copy it to a buffer.

        Returns:
            True when FFmpeg succeeded and its output was written to
            ``output_buffer``; False otherwise (including on timeout).
        """
        temp_path = None
        try:
            temp_path = self._create_temp_mp4()

            result = subprocess.run(
                self._build_ffmpeg_command(input_path, temp_path),
                capture_output=True,
                text=True,
                errors='replace',  # FFmpeg output is not guaranteed to decode
                timeout=self._FFMPEG_TIMEOUT_SECONDS,
            )

            if result.returncode != 0 or not os.path.exists(temp_path):
                logger.error(f"FFmpeg error: {result.stderr}")
                return False

            with open(temp_path, 'rb') as f:
                output_buffer.write(f.read())

            logger.info("✅ FFmpeg compression to buffer successful")
            return True

        except Exception as e:
            logger.error(f"FFmpeg compression to buffer failed: {e}")
            return False

        finally:
            # Runs on every path, so a timeout or read error no longer leaves
            # the temporary file behind.
            if temp_path is not None:
                self._remove_file(temp_path)

    def _target_dimensions(self, width: int, height: int) -> tuple:
        """Return the ``(width, height)`` the OpenCV encoder should write.

        For a known preset the frame is scaled down to the preset height with
        the aspect ratio preserved; sources at or below that height are not
        upscaled, and unknown presets keep the source size. Both values are
        rounded down to even numbers (minimum 2).
        """
        target_height = self._TARGET_HEIGHTS.get(self.output_resolution)
        if target_height is None or height <= target_height:
            new_width, new_height = width, height
        else:
            new_height = target_height
            # Integer arithmetic avoids float truncation (e.g. 1279 for 1280).
            new_width = width * new_height // height

        return max(2, new_width - new_width % 2), max(2, new_height - new_height % 2)

    def _open_opencv_writer(self, output_path: str, fps: float, frame_size: tuple, codecs):
        """Return the first ``cv2.VideoWriter`` that opens, or None."""
        for fourcc_code, name in codecs:
            writer = None
            try:
                fourcc = cv2.VideoWriter_fourcc(*fourcc_code)
                writer = cv2.VideoWriter(output_path, fourcc, fps, frame_size)
                if writer.isOpened():
                    logger.info(f"✅ Using codec: {name} ({fourcc_code})")
                    return writer
            except Exception as e:
                logger.debug(f"Codec {fourcc_code} failed: {e}")
            if writer is not None:
                writer.release()
        return None

    def _transcode_with_opencv(self, input_path: str, output_path: str, codecs) -> bool:
        """Re-encode ``input_path`` into ``output_path`` frame by frame.

        Returns:
            True when at least one frame was written. The capture and the
            writer are released before returning, on every path.
        """
        cap = cv2.VideoCapture(input_path)
        out = None
        try:
            if not cap.isOpened():
                logger.error(f"Cannot open input video: {input_path}")
                return False

            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            if width <= 0 or height <= 0:
                logger.error(f"Invalid frame size {width}x{height} for input video: {input_path}")
                return False

            fps = cap.get(cv2.CAP_PROP_FPS)
            if not (fps > 0):  # also true when fps is NaN
                logger.warning(f"Input video reports no valid FPS, using {self._DEFAULT_FPS}")
                fps = self._DEFAULT_FPS

            frame_size = self._target_dimensions(width, height)
            needs_resize = frame_size != (width, height)

            out = self._open_opencv_writer(output_path, fps, frame_size, codecs)
            if out is None:
                logger.error("❌ No suitable video codec found")
                return False

            frames_written = 0
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                if needs_resize:
                    frame = cv2.resize(frame, frame_size)
                out.write(frame)
                frames_written += 1

            if frames_written == 0:
                logger.error("OpenCV compression failed - no frames were written")
                return False
            return True

        finally:
            cap.release()
            if out is not None:
                out.release()

    def _compress_with_opencv_to_buffer(self, input_path: str, output_buffer: BytesIO) -> bool:
        """Fallback compression using OpenCV, copied into a buffer.

        OpenCV writes to a temporary file, which is then read into
        ``output_buffer`` and removed.

        Returns:
            True when the video was encoded and copied; False otherwise.
        """
        temp_path = None
        try:
            temp_path = self._create_temp_mp4()

            if not self._transcode_with_opencv(input_path, temp_path, self._OPENCV_CODECS):
                return False

            with open(temp_path, 'rb') as f:
                output_buffer.write(f.read())

            logger.info("✅ OpenCV compression to buffer successful")
            return True

        except Exception as e:
            logger.error(f"OpenCV compression to buffer failed: {e}")
            return False

        finally:
            if temp_path is not None:
                self._remove_file(temp_path)

    def _compress_with_opencv(self, input_path: str, output_path: str) -> bool:
        """Fallback compression using OpenCV with the ``mp4v`` codec.

        Returns:
            True when frames were written and ``output_path`` exists.
        """
        try:
            if not self._transcode_with_opencv(input_path, output_path, (('mp4v', 'MPEG-4'),)):
                return False

            if os.path.exists(output_path):
                logger.info("✅ OpenCV compression successful")
                return True

            logger.error("OpenCV compression failed - output file not created")
            return False

        except Exception as e:
            logger.error(f"OpenCV compression failed: {e}")
            return False