File size: 3,138 Bytes
96d596c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0dab55c
 
 
 
 
 
 
 
 
 
 
 
 
96d596c
0dab55c
96d596c
0dab55c
96d596c
 
 
 
 
 
 
 
 
 
 
 
 
0dab55c
 
 
 
 
 
 
96d596c
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import os
import magic
import cv2
import tempfile
from fastapi import UploadFile
from typing import Tuple
from app.core.logger import logger

class VideoValidator:
    """Multi-stage validation of an uploaded video file.

    The checks run from cheapest to most expensive: file-name extension,
    content signature sniffed from the leading bytes, total size while
    copying to a temporary file, and finally an attempt to decode one frame
    with OpenCV.
    """

    # MIME types accepted from content sniffing of the leading bytes.
    ALLOWED_MIMES = {"video/mp4", "video/quicktime", "video/x-msvideo", "video/webm"}
    # Accepted file-name extensions (lowercase, leading dot included).
    ALLOWED_EXTENSIONS = {".mp4", ".mov", ".avi", ".webm"}
    MAX_FILE_BYTES = 100 * 1024 * 1024  # 100 MB Limit

    # Number of leading bytes passed to `magic` for signature detection.
    _SNIFF_BYTES = 2048
    # Chunk size used while copying the upload to disk (1 MB).
    _CHUNK_BYTES = 1024 * 1024

    @classmethod
    def validate_stream(cls, file: UploadFile) -> Tuple[bool, str]:
        """Check that *file* looks like, and decodes as, an allowed video.

        Args:
            file: The upload to inspect. Its ``filename`` and the synchronous
                file object ``file.file`` are read.

        Returns:
            ``(True, "Valid")`` when every check passes, otherwise
            ``(False, reason)`` where *reason* is a human-readable message.
            Any unexpected exception is logged and reported as a generic
            failure tuple instead of propagating to the caller.
        """
        try:
            # A missing filename is treated like an empty one so it is
            # rejected as a disallowed extension rather than as a crash.
            ext = os.path.splitext(file.filename or "")[1].lower()
            if ext not in cls.ALLOWED_EXTENSIONS:
                # `!r` escapes control characters from the user-supplied name
                # so it cannot forge extra log lines; for ordinary extensions
                # the output is unchanged.
                logger.warning(f"Security Alert: Blocked illegal file extension {ext!r}")
                return False, f"File extension {ext} is not allowed."

            # Sniff the real content type from the head, then rewind so the
            # size check below sees the whole stream.
            file_head = file.file.read(cls._SNIFF_BYTES)
            file.file.seek(0)

            actual_mime = magic.from_buffer(file_head, mime=True)
            if actual_mime not in cls.ALLOWED_MIMES:
                logger.warning(f"Security Alert: Blocked illegal MIME type '{actual_mime}' masquerading as {ext}")
                return False, f"Invalid file signature detected: {actual_mime}"

            temp_path = None
            try:
                # delete=False: the file must outlive the `with` so OpenCV can
                # open it by path; it is removed in the `finally` below.
                with tempfile.NamedTemporaryFile(delete=False, suffix=ext) as temp_video:
                    temp_path = temp_video.name
                    if not cls._copy_within_limit(file.file, temp_video):
                        logger.warning(f"Security Alert: Blocked upload exceeding {cls.MAX_FILE_BYTES} bytes")
                        return False, "File size exceeds the maximum allowed limit"

                # File is safely closed by Python now, so OpenCV can gain read-access on Windows
                return cls._probe_first_frame(temp_path)
            finally:
                # Remove the temp file first (never raises), then rewind the
                # upload on every exit path so later readers start at byte 0.
                cls._remove_quietly(temp_path)
                file.file.seek(0)

        except Exception as e:
            logger.error(f"Critical error during video validation: {str(e)}")
            return False, "An unexpected error occured while processing the video stream."

    @classmethod
    def _copy_within_limit(cls, source, destination) -> bool:
        """Copy *source* to *destination* in chunks; return False once the
        running total exceeds ``MAX_FILE_BYTES`` (the offending chunk is not
        written)."""
        bytes_read = 0
        while chunk := source.read(cls._CHUNK_BYTES):
            bytes_read += len(chunk)
            if bytes_read > cls.MAX_FILE_BYTES:
                return False
            destination.write(chunk)
        return True

    @staticmethod
    def _probe_first_frame(path) -> Tuple[bool, str]:
        """Open *path* with OpenCV and try to decode a single frame."""
        cap = cv2.VideoCapture(path)
        try:
            if not cap.isOpened():
                logger.warning("Security Alert: OpenCV failed to open the video container. File may be corrupted or disguised.")
                return False, "Video container is corrupted or unreadable."
            success, frame = cap.read()
        finally:
            # Always release the capture handle, even if OpenCV raises.
            cap.release()

        if not success or frame is None:
            logger.warning("Security Alert: OpenCV opened container but failed to extract any visual frames.")
            return False, "Video contains no readable visual frames."
        return True, "Valid"

    @staticmethod
    def _remove_quietly(path) -> None:
        """Best-effort removal of the temporary file at *path* (may be None)."""
        if not path:
            return
        try:
            os.remove(path)
        except FileNotFoundError:
            pass
        except OSError as err:
            # Cleanup stays best-effort, but a leaked temp file is worth a log line.
            logger.warning(f"Could not remove temporary video file {path!r}: {err}")