Spaces:
Running
Running
Anish
[Feature Added] > Video Pipeline. Have to update the confidence and ai_analysis param for images, where they won't appear.
0dab55c | import os | |
| import magic | |
| import cv2 | |
| import tempfile | |
| from fastapi import UploadFile | |
| from typing import Tuple | |
| from app.core.logger import logger | |
| class VideoValidator: | |
| ALLOWED_MIMES = {"video/mp4", "video/quicktime", "video/x-msvideo", "video/webm"} | |
| ALLOWED_EXTENSIONS = {".mp4", ".mov", ".avi", ".webm"} | |
| MAX_FILE_BYTES = 100 * 1024 * 1024 # 100 MB Limit | |
| def validate_stream(cls, file: UploadFile) -> Tuple[bool, str]: | |
| try: | |
| ext = os.path.splitext(file.filename)[1].lower() | |
| if ext not in cls.ALLOWED_EXTENSIONS: | |
| logger.warning(f"Security Alert: Blocked illegal file extension '{ext}'") | |
| return False, f"File extension {ext} is not allowed." | |
| file_head = file.file.read(2048) | |
| file.file.seek(0) | |
| actual_mime = magic.from_buffer(file_head, mime=True) | |
| if actual_mime not in cls.ALLOWED_MIMES: | |
| logger.warning(f"Security Alert: Blocked illegal MIME type '{actual_mime}' masquerading as {ext}") | |
| return False, f"Invalid file signature detected: {actual_mime}" | |
| bytes_read = 0 | |
| temp_path = None | |
| try: | |
| with tempfile.NamedTemporaryFile(delete=False, suffix=ext) as temp_video: | |
| temp_path = temp_video.name | |
| while chunk := file.file.read(1024 * 1024): # Read 1 MB at a time | |
| bytes_read += len(chunk) | |
| if bytes_read > cls.MAX_FILE_BYTES: | |
| logger.warning(f"Security Alert: Blocked upload exceeding {cls.MAX_FILE_BYTES} bytes") | |
| return False, "File size exceeds the maximum allowed limit" | |
| temp_video.write(chunk) | |
| temp_video.flush() | |
| # File is safely closed by Python now, so OpenCV can gain read-access on Windows | |
| file.file.seek(0) | |
| cap = cv2.VideoCapture(temp_path) | |
| if not cap.isOpened(): | |
| logger.warning("Security Alert: OpenCV failed to open the video container. File may be corrupted or disguised.") | |
| cap.release() | |
| return False, "Video container is corrupted or unreadable." | |
| success, frame = cap.read() | |
| cap.release() | |
| if not success or frame is None: | |
| logger.warning("Security Alert: OpenCV opened container but failed to extract any visual frames.") | |
| return False, "Video contains no readable visual frames." | |
| finally: | |
| if temp_path and os.path.exists(temp_path): | |
| try: | |
| os.remove(temp_path) | |
| except: | |
| pass | |
| return True, "Valid" | |
| except Exception as e: | |
| logger.error(f"Critical error during video validation: {str(e)}") | |
| return False, "An unexpected error occured while processing the video stream." |