Spotix-API / backend /app /utils /file_handler.py
Anish
[Feature Added] > Video Pipeline. Have to update the confidence and ai_analysis param for images, where they won't appear.
0dab55c
from app.core.storage import active_storage
import os
import uuid
import magic
from PIL import Image, UnidentifiedImageError
from fastapi import UploadFile, HTTPException
from app.security.video_validator import VideoValidator
# Directory name for uploads; not referenced by the functions visible in this file.
UPLOAD_DIR = "uploads"
# Maximum accepted upload size in bytes (50 MiB); enforced by validate_file on the image path.
MAX_FILE_SIZE = 50 * 1024 * 1024
# Process-wide Pillow setting: pixel-count threshold used by Pillow's
# decompression-bomb protection (validate_file handles DecompressionBombError).
Image.MAX_IMAGE_PIXELS = 64_000_000
# MIME types considered acceptable.
# NOTE(review): validate_file does not consult this set — confirm where it is enforced.
ALLOWED_TYPES = {
    "image/png",
    "image/jpeg",
    "video/mp4"
}
def validate_file(file: UploadFile):
    """Validate an uploaded image or video and return its size in bytes.

    Uploads whose filename ends in a video extension (mp4, mov, avi, webm)
    are delegated to ``VideoValidator.validate_stream``. Every other upload
    is treated as an image: its size is capped at ``MAX_FILE_SIZE``, its MIME
    type is sniffed from the content, and it is opened and verified with
    Pillow, with each dimension limited to 8000 pixels.

    On success the underlying stream is rewound to position 0 so the caller
    can read the file again.

    Args:
        file: The incoming upload.

    Returns:
        For videos, ``file.size`` (0 when it is unknown); for images, the
        number of bytes read from the stream.

    Raises:
        HTTPException: Status 400 when the video validator rejects the file,
            the image is too large, the content is not an image, the image
            cannot be parsed, or it exceeds the resolution / pixel limits.
    """
    # The filename can be missing on an upload; treat that as "no extension"
    # instead of failing with a TypeError.
    filename = file.filename or ""
    _, dot, suffix = filename.rpartition(".")
    ext = suffix.lower() if dot else ""

    if ext in ("mp4", "mov", "avi", "webm"):
        is_valid, validation_reason = VideoValidator.validate_stream(file)
        if not is_valid:
            raise HTTPException(status_code=400, detail=validation_reason)
        # The validator reads from the stream; rewind so the caller can save it.
        file.file.seek(0)
        # NOTE(review): MAX_FILE_SIZE is not applied on this path — confirm
        # that VideoValidator enforces its own size limit.
        return file.size or 0

    file.file.seek(0)
    # Read at most one byte past the limit: enough to detect an oversized
    # upload without loading an arbitrarily large body into memory.
    file_bytes = file.file.read(MAX_FILE_SIZE + 1)
    size = len(file_bytes)
    if size > MAX_FILE_SIZE:
        raise HTTPException(status_code=400, detail="File size too large")

    actual_mime = magic.from_buffer(file_bytes, mime=True)
    # Prefix match, not substring match: types such as
    # "application/x-iso9660-image" contain "image" but are not images.
    # NOTE(review): ALLOWED_TYPES is not enforced here — any image/* passes.
    if not actual_mime.startswith("image/"):
        raise HTTPException(status_code=400, detail=f"Invalid image signature. Detected: {actual_mime}")

    try:
        file.file.seek(0)
        with Image.open(file.file) as img:
            # Dimensions come from the header; read them before verify(),
            # after which the image object should not be used further.
            width, height = img.size
            img.verify()
    except Image.DecompressionBombError as exc:
        raise HTTPException(status_code=400, detail="Decompression Bomb detected! Image rejected") from exc
    except (UnidentifiedImageError, OSError, SyntaxError, ValueError) as exc:
        # Unreadable or corrupt image data is a client error, not a server fault.
        raise HTTPException(status_code=400, detail="Invalid or corrupted image file.") from exc

    if width > 8000 or height > 8000:
        raise HTTPException(status_code=400, detail="Image resolution exceeds 8000x8000 limit.")

    file.file.seek(0)
    return size
def generate_unique_filename(filename: str) -> str:
    """Return a random UUID-based name that keeps the original file extension.

    Only the extension of the final path component is used, and it is reduced
    to ASCII letters and digits, so client-supplied path separators or ``..``
    segments can never end up in the generated name.

    Args:
        filename: The client-supplied file name; may be empty.

    Returns:
        ``"<uuid4>.<ext>"`` when a usable extension exists, otherwise just
        ``"<uuid4>"``.
    """
    # Drop any directory part, accepting both POSIX and Windows separators.
    base_name = (filename or "").replace("\\", "/").rsplit("/", 1)[-1]
    _, dot, raw_ext = base_name.rpartition(".")
    # No dot means no extension; do not reuse the whole name as the suffix.
    ext = "".join(ch for ch in raw_ext if ch.isascii() and ch.isalnum()) if dot else ""
    unique_id = str(uuid.uuid4())
    return f"{unique_id}.{ext}" if ext else unique_id
def save_file(file: UploadFile, filename: str):
    """Store ``file`` under ``filename`` using the configured storage backend.

    Delegates to ``active_storage.save`` and returns its result unchanged.
    """
    stored = active_storage.save(file, filename)
    return stored