import os import magic import cv2 import tempfile from fastapi import UploadFile from typing import Tuple from app.core.logger import logger class VideoValidator: ALLOWED_MIMES = {"video/mp4", "video/quicktime", "video/x-msvideo", "video/webm"} ALLOWED_EXTENSIONS = {".mp4", ".mov", ".avi", ".webm"} MAX_FILE_BYTES = 100 * 1024 * 1024 # 100 MB Limit @classmethod def validate_stream(cls, file: UploadFile) -> Tuple[bool, str]: try: ext = os.path.splitext(file.filename)[1].lower() if ext not in cls.ALLOWED_EXTENSIONS: logger.warning(f"Security Alert: Blocked illegal file extension '{ext}'") return False, f"File extension {ext} is not allowed." file_head = file.file.read(2048) file.file.seek(0) actual_mime = magic.from_buffer(file_head, mime=True) if actual_mime not in cls.ALLOWED_MIMES: logger.warning(f"Security Alert: Blocked illegal MIME type '{actual_mime}' masquerading as {ext}") return False, f"Invalid file signature detected: {actual_mime}" bytes_read = 0 temp_path = None try: with tempfile.NamedTemporaryFile(delete=False, suffix=ext) as temp_video: temp_path = temp_video.name while chunk := file.file.read(1024 * 1024): # Read 1 MB at a time bytes_read += len(chunk) if bytes_read > cls.MAX_FILE_BYTES: logger.warning(f"Security Alert: Blocked upload exceeding {cls.MAX_FILE_BYTES} bytes") return False, "File size exceeds the maximum allowed limit" temp_video.write(chunk) temp_video.flush() # File is safely closed by Python now, so OpenCV can gain read-access on Windows file.file.seek(0) cap = cv2.VideoCapture(temp_path) if not cap.isOpened(): logger.warning("Security Alert: OpenCV failed to open the video container. File may be corrupted or disguised.") cap.release() return False, "Video container is corrupted or unreadable." success, frame = cap.read() cap.release() if not success or frame is None: logger.warning("Security Alert: OpenCV opened container but failed to extract any visual frames.") return False, "Video contains no readable visual frames." finally: if temp_path and os.path.exists(temp_path): try: os.remove(temp_path) except: pass return True, "Valid" except Exception as e: logger.error(f"Critical error during video validation: {str(e)}") return False, "An unexpected error occured while processing the video stream."