Spaces:
Running
Running
File size: 3,138 Bytes
96d596c 0dab55c 96d596c 0dab55c 96d596c 0dab55c 96d596c 0dab55c 96d596c | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 | import os
import magic
import cv2
import tempfile
from fastapi import UploadFile
from typing import Tuple
from app.core.logger import logger
class VideoValidator:
ALLOWED_MIMES = {"video/mp4", "video/quicktime", "video/x-msvideo", "video/webm"}
ALLOWED_EXTENSIONS = {".mp4", ".mov", ".avi", ".webm"}
MAX_FILE_BYTES = 100 * 1024 * 1024 # 100 MB Limit
@classmethod
def validate_stream(cls, file: UploadFile) -> Tuple[bool, str]:
try:
ext = os.path.splitext(file.filename)[1].lower()
if ext not in cls.ALLOWED_EXTENSIONS:
logger.warning(f"Security Alert: Blocked illegal file extension '{ext}'")
return False, f"File extension {ext} is not allowed."
file_head = file.file.read(2048)
file.file.seek(0)
actual_mime = magic.from_buffer(file_head, mime=True)
if actual_mime not in cls.ALLOWED_MIMES:
logger.warning(f"Security Alert: Blocked illegal MIME type '{actual_mime}' masquerading as {ext}")
return False, f"Invalid file signature detected: {actual_mime}"
bytes_read = 0
temp_path = None
try:
with tempfile.NamedTemporaryFile(delete=False, suffix=ext) as temp_video:
temp_path = temp_video.name
while chunk := file.file.read(1024 * 1024): # Read 1 MB at a time
bytes_read += len(chunk)
if bytes_read > cls.MAX_FILE_BYTES:
logger.warning(f"Security Alert: Blocked upload exceeding {cls.MAX_FILE_BYTES} bytes")
return False, "File size exceeds the maximum allowed limit"
temp_video.write(chunk)
temp_video.flush()
# File is safely closed by Python now, so OpenCV can gain read-access on Windows
file.file.seek(0)
cap = cv2.VideoCapture(temp_path)
if not cap.isOpened():
logger.warning("Security Alert: OpenCV failed to open the video container. File may be corrupted or disguised.")
cap.release()
return False, "Video container is corrupted or unreadable."
success, frame = cap.read()
cap.release()
if not success or frame is None:
logger.warning("Security Alert: OpenCV opened container but failed to extract any visual frames.")
return False, "Video contains no readable visual frames."
finally:
if temp_path and os.path.exists(temp_path):
try:
os.remove(temp_path)
except:
pass
return True, "Valid"
except Exception as e:
logger.error(f"Critical error during video validation: {str(e)}")
return False, "An unexpected error occured while processing the video stream." |