File size: 3,800 Bytes
0baa056
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
"""
services/validators.py
Four-layer upload protection gate.

Call order enforced by the route handler:
  1. validate_file_type()       β€” reject non-images early (before reading bytes)
  2. validate_file_size()       β€” reject oversized uploads
  3. validate_image_dimensions() β€” reject resolution bombs
  4. validate_image_integrity()  β€” reject corrupted files
"""

from io import BytesIO
from typing import Tuple

from fastapi import HTTPException, UploadFile
from PIL import Image, UnidentifiedImageError

# ── Security: Protect against decompression bomb attacks ──────────────────────
# Limit image to 50 megapixels to prevent memory exhaustion
Image.MAX_IMAGE_PIXELS = 50_000_000

from config.constants import (
    ALLOWED_TYPES,
    INVALID_FILE_TYPE,
    FILE_TOO_LARGE,
    INVALID_DIMENSIONS,
    INVALID_IMAGE,
)
from config.settings import MAX_FILE_SIZE, MAX_WIDTH, MAX_HEIGHT


# ── 1. MIME type ───────────────────────────────────────────────────────────────

def validate_file_type(file: UploadFile) -> None:
    """Reject files whose Content-Type is not in the allowed list."""
    if file.content_type not in ALLOWED_TYPES:
        raise HTTPException(status_code=415, detail=INVALID_FILE_TYPE)


# ── 2. File size ───────────────────────────────────────────────────────────────

def validate_file_size(data: bytes) -> None:
    """Reject payloads larger than MAX_FILE_SIZE bytes."""
    if len(data) > MAX_FILE_SIZE:
        raise HTTPException(status_code=413, detail=FILE_TOO_LARGE)


# ── 3. Image dimensions ────────────────────────────────────────────────────────

def validate_image_dimensions(image: Image.Image) -> None:
    """Reject images wider or taller than the configured maximum."""
    width, height = image.size
    if width > MAX_WIDTH or height > MAX_HEIGHT:
        raise HTTPException(
            status_code=400,
            detail=f"{INVALID_DIMENSIONS} (received {width}Γ—{height})",
        )


# ── 4. Image integrity ─────────────────────────────────────────────────────────

def validate_image_integrity(data: bytes) -> Image.Image:
    """
    Attempt to fully decode the image data.
    Returns the PIL Image on success so the caller doesn't re-open it.
    Raises 400 if the file is corrupted or unrecognised.
    """
    try:
        image = Image.open(BytesIO(data))
        image.verify()                    # detects truncated / corrupted files
        # Re-open after verify() β€” Pillow closes the internal stream.
        image = Image.open(BytesIO(data)).convert("RGBA")
        return image
    except (UnidentifiedImageError, Exception):
        raise HTTPException(status_code=400, detail=INVALID_IMAGE)


# ── Convenience: run the full chain ───────────────────────────────────────────

async def run_all_validations(file: UploadFile) -> tuple[bytes, Image.Image]:
    """
    Execute every validation step and return (raw_bytes, pil_image).
    Raises HTTPException at the first failure.
    """
    validate_file_type(file)

    data: bytes = await file.read()

    validate_file_size(data)
    image = validate_image_integrity(data)
    validate_image_dimensions(image)

    return data, image