# champ-chatbot / tests / test_upload_file.py
# (repository page header: uploader "qyle", commit "deployment", f80f41e, verified)
import io
import os
from pathlib import Path
from fastapi import UploadFile
from fastapi.datastructures import Headers
from PIL import Image
import pytest
from constants import MAX_FILE_NAME_LENGTH, MAX_FILE_SIZE
from exceptions import (
FileExtractionError,
FileExtractionException,
FileValidationError,
FileValidationException,
)
from helpers.file_helper import (
ValidatedFile,
clean_text,
extract_text_from_file,
sanitize_image,
validate_file,
)
from tests.file_factory import (
create_empty_txt,
create_fake_large_jpeg,
create_fake_large_png,
create_fake_small_png,
create_jpeg_cpu_scan_bomb,
create_jpeg_pixel_bomb,
create_jpeg_with_excessive_markers,
create_malformed_jpeg,
create_malformed_pdf,
create_malformed_png,
create_png_decompression_bomb,
create_simple_txt,
create_simple_pdf,
create_simple_docx,
create_simple_png,
create_simple_jpeg,
create_deeply_nested_docx,
create_deeply_nested_docx_bomb,
create_zip_bomb_docx,
create_xxe_docx,
)
# Directory named "data" located next to this test module.
DATA_DIR = os.path.join(os.path.dirname(__file__), "data")

# File names that validate_file is expected to reject with
# FileValidationError.INVALID_FILE_NAME; parametrized into
# TestUploadFileGeneric.test_invalid_file_names.
INVALID_FILE_NAMES = [
    # Path traversal attacks
    "../etc/passwd",
    "../../secret.txt",
    # Null bytes
    "file\x00.txt",
    # Special characters
    "file;rm -rf.txt",
    "file|cmd.txt",
    "file&cmd.txt",
    "file>redirect.txt",
    "<script>.txt",
    # Hidden files
    ".hidden.txt",
    # Just an extension
    ".txt",
    # Empty name
    "",
    # Only spaces
    " ",
    # Double extensions
    "malware.exe.txt",
    "script.php.pdf",
    # Windows reserved names
    "CON.txt",
    "PRN.txt",
    "AUX.txt",
    "NUL.txt",
    "COM1.txt",
    "COM2.txt",
    "COM3.txt",
    "COM4.txt",
    "LPT1.txt",
    "LPT2.txt",
    "LPT3.txt",
    "LPT4.txt",
    # Windows reserved names without any extension
    "CON",
    "NUL",
    "AUX",
]

# (input text, expected output) pairs for clean_text; parametrized into
# TestCleanTest.test_clean_text_logic.
CLEAN_TEXT_SCENARIOS = [
    ("Hello\n\n\nWorld", "Hello\n\nWorld"),
    ("Hello\n\nWorld", "Hello\n\nWorld"),
    ("Hello\nWorld", "Hello\nWorld"),
    ("Line 1\n \n\nLine 2", "Line 1\n\nLine 2"),
    ("Hello World", "Hello World"),
    ("\n Hello World \n", "Hello World"),
    ("", ""),
    (" ", ""),
]
@pytest.fixture
def large_txt_file(tmp_path: Path):
    """Write a text file one character longer than ``MAX_FILE_SIZE``; return its path."""
    oversized = tmp_path / "large.txt"
    payload = "a" * (MAX_FILE_SIZE + 1)
    oversized.write_text(payload)
    return oversized
@pytest.fixture
def exe_disguised_as_txt(tmp_path: Path):
    """Write ``innocent.txt`` whose bytes start with an executable signature."""
    disguised = tmp_path / "innocent.txt"
    # "MZ" is the magic header for Windows executables; 100 NUL bytes follow.
    magic, padding = b"MZ", b"\x00" * 100
    disguised.write_bytes(magic + padding)
    return disguised
@pytest.fixture
def empty_txt(tmp_path):
    """Return the path of an empty text file created under ``tmp_path``."""
    target = tmp_path.joinpath("empty.txt")
    create_empty_txt(str(target))
    return target
@pytest.fixture
def simple_txt(tmp_path):
    """Return the path of a simple text file created under ``tmp_path``."""
    target = tmp_path.joinpath("simple.txt")
    create_simple_txt(str(target))
    return target
@pytest.fixture
def simple_pdf(tmp_path):
    """Return the path of a simple PDF file created under ``tmp_path``."""
    target = tmp_path.joinpath("simple.pdf")
    create_simple_pdf(str(target))
    return target
@pytest.fixture
def simple_docx(tmp_path):
    """Return the path of a simple DOCX file created under ``tmp_path``."""
    target = tmp_path.joinpath("simple.docx")
    create_simple_docx(str(target))
    return target
@pytest.fixture
def simple_png(tmp_path):
    """Return the path of a simple PNG file created under ``tmp_path``."""
    target = tmp_path.joinpath("simple.png")
    create_simple_png(str(target))
    return target
@pytest.fixture
def simple_jpeg(tmp_path):
    """Return the path of a simple JPEG file created under ``tmp_path``."""
    target = tmp_path.joinpath("simple.jpeg")
    create_simple_jpeg(str(target))
    return target
@pytest.fixture
def deeply_nested_docx(tmp_path):
    """Return the path of a deeply nested DOCX that is slow to parse."""
    target = tmp_path.joinpath("deeply_nested.docx")
    create_deeply_nested_docx(str(target))
    return target
@pytest.fixture
def deeply_nested_docx_bomb(tmp_path):
    """Return the path of a 100MB+ deeply nested DOCX."""
    target = tmp_path.joinpath("deeply_nested_bomb.docx")
    create_deeply_nested_docx_bomb(str(target))
    return target
@pytest.fixture
def zip_bomb_docx(tmp_path):
    """Return the path of a zip-bomb DOCX created under ``tmp_path``."""
    target = tmp_path.joinpath("zip_bomb.docx")
    create_zip_bomb_docx(str(target))
    return target
@pytest.fixture
def xxe_docx(tmp_path):
    """Return the path of a DOCX carrying an XXE attack payload."""
    target = tmp_path.joinpath("xxe.docx")
    create_xxe_docx(str(target))
    return target
# PDF fixture
@pytest.fixture
def malformed_pdf(tmp_path):
    """Return the path of a malformed PDF created under ``tmp_path``."""
    target = tmp_path.joinpath("malformed.pdf")
    create_malformed_pdf(str(target))
    return target
# PNG fixtures
@pytest.fixture
def fake_large_png(tmp_path):
    """Return the path of the PNG written by ``create_fake_large_png``.

    NOTE(review): the previous docstring duplicated the one on the
    ``png_decompression_bomb`` fixture; confirm in ``tests.file_factory`` what
    distinguishes this file (presumably oversized declared dimensions).
    """
    target = tmp_path.joinpath("fake_large.png")
    create_fake_large_png(str(target))
    return target
@pytest.fixture
def fake_small_png(tmp_path):
    """Return the path of the PNG written by ``create_fake_small_png``."""
    target = tmp_path.joinpath("fake_small.png")
    create_fake_small_png(str(target))
    return target
@pytest.fixture
def png_decompression_bomb(tmp_path):
    """Return the path of a PNG decompression bomb created under ``tmp_path``."""
    target = tmp_path.joinpath("decompression_bomb.png")
    create_png_decompression_bomb(str(target))
    return target
@pytest.fixture
def malformed_png(tmp_path):
    """Return the path of a malformed PNG created under ``tmp_path``."""
    target = tmp_path.joinpath("malformed.png")
    create_malformed_png(str(target))
    return target
# JPEG fixtures
@pytest.fixture
def fake_large_jpeg(tmp_path):
    """Return the path of the JPEG written by ``create_fake_large_jpeg``.

    NOTE(review): the previous docstring called this a decompression bomb;
    confirm in ``tests.file_factory`` what the file actually contains
    (presumably oversized declared dimensions).
    """
    target = tmp_path.joinpath("fake_large.jpeg")
    create_fake_large_jpeg(str(target))
    return target
@pytest.fixture
def malformed_jpeg(tmp_path):
    """Return the path of a malformed JPEG created under ``tmp_path``."""
    target = tmp_path.joinpath("malformed.jpeg")
    create_malformed_jpeg(str(target))
    return target
@pytest.fixture
def jpeg_with_excessive_markers(tmp_path):
    """Return the path of a JPEG containing an excessive number of markers."""
    target = tmp_path.joinpath("excessive_markers.jpeg")
    create_jpeg_with_excessive_markers(str(target))
    return target
@pytest.fixture
def jpeg_pixel_bomb(tmp_path):
    """Return the path of the JPEG written by ``create_jpeg_pixel_bomb``."""
    target = tmp_path.joinpath("pixel_bomb.jpeg")
    create_jpeg_pixel_bomb(str(target))
    return target
@pytest.fixture
def jpeg_cpu_scan_bomb(tmp_path):
    """Return the path of the JPEG written by ``create_jpeg_cpu_scan_bomb``."""
    target = tmp_path.joinpath("cpu_scan_bomb.jpeg")
    create_jpeg_cpu_scan_bomb(str(target))
    return target
class TestUploadFileGeneric:
    """Rejection paths of ``validate_file``, each pinned to one error code."""

    @staticmethod
    async def _assert_rejected(path, expected_error, **upload_kwargs):
        """Wrap ``path`` in an ``UploadFile`` and expect validation to fail.

        ``upload_kwargs`` are forwarded unchanged to ``UploadFile`` so each
        test controls exactly which metadata the upload carries.
        """
        with open(path, "rb") as stream:
            upload = UploadFile(stream, **upload_kwargs)
            with pytest.raises(FileValidationException) as caught:
                await validate_file(upload)
        assert caught.value.error == expected_error

    async def test_missing_size(self, simple_txt):
        """An upload built without a size yields MISSING_SIZE."""
        await self._assert_rejected(simple_txt, FileValidationError.MISSING_SIZE)

    async def test_courtesy_size_too_large(self, simple_txt):
        """A declared size above ``MAX_FILE_SIZE`` yields FILE_TOO_LARGE."""
        await self._assert_rejected(
            simple_txt,
            FileValidationError.FILE_TOO_LARGE,
            size=MAX_FILE_SIZE + 1,
        )

    async def test_missing_name(self, simple_txt):
        """An upload built without a filename yields MISSING_FILE_NAME."""
        await self._assert_rejected(
            simple_txt,
            FileValidationError.MISSING_FILE_NAME,
            size=1,
        )

    async def test_name_too_large(self, simple_txt):
        """A filename longer than ``MAX_FILE_NAME_LENGTH`` yields FILE_NAME_TOO_LARGE."""
        overlong_name = "a" * (MAX_FILE_NAME_LENGTH + 1)
        await self._assert_rejected(
            simple_txt,
            FileValidationError.FILE_NAME_TOO_LARGE,
            size=1,
            filename=overlong_name,
        )

    @pytest.mark.parametrize("filename", INVALID_FILE_NAMES)
    async def test_invalid_file_names(self, filename, simple_txt):
        """Every entry of ``INVALID_FILE_NAMES`` yields INVALID_FILE_NAME."""
        await self._assert_rejected(
            simple_txt,
            FileValidationError.INVALID_FILE_NAME,
            size=1,
            filename=filename,
        )

    async def test_unsupported_extension(self, simple_txt):
        """A ``.zip`` filename yields UNSUPPORTED_EXTENSION."""
        await self._assert_rejected(
            simple_txt,
            FileValidationError.UNSUPPORTED_EXTENSION,
            size=1,
            filename="my_file.zip",
        )

    async def test_missing_mime(self, simple_txt):
        """An upload built without headers yields INVALID_MIME_TYPE."""
        await self._assert_rejected(
            simple_txt,
            FileValidationError.INVALID_MIME_TYPE,
            size=1,
            filename="simple.txt",
        )

    async def test_unsupported_mime_type(self, simple_txt):
        """An ``image/gif`` content-type header yields INVALID_MIME_TYPE."""
        gif_headers = Headers(headers={"content-type": "image/gif"})
        await self._assert_rejected(
            simple_txt,
            FileValidationError.INVALID_MIME_TYPE,
            size=1,
            filename="simple.txt",
            headers=gif_headers,
        )

    async def test_file_too_large(self, large_txt_file):
        """Oversized content yields FILE_TOO_LARGE even though the declared size is 1."""
        text_headers = Headers(headers={"content-type": "text/plain"})
        await self._assert_rejected(
            large_txt_file,
            FileValidationError.FILE_TOO_LARGE,
            size=1,
            filename="simple.txt",
            headers=text_headers,
        )

    async def test_invalid_mime_type(self, exe_disguised_as_txt):
        """Executable bytes behind a ``.txt`` name and text header yield INVALID_MIME_TYPE."""
        text_headers = Headers(headers={"content-type": "text/plain"})
        await self._assert_rejected(
            exe_disguised_as_txt,
            FileValidationError.INVALID_MIME_TYPE,
            size=1,
            filename="simple.txt",
            headers=text_headers,
        )

    async def test_empty_file(self, empty_txt):
        """The empty-file fixture yields EMPTY_FILE."""
        text_headers = Headers(headers={"content-type": "text/plain"})
        await self._assert_rejected(
            empty_txt,
            FileValidationError.EMPTY_FILE,
            size=1,
            filename="simple.txt",
            headers=text_headers,
        )
class TestValidFileUpload:
    """Accepted uploads: one simple file per format, compared field by field."""

    @staticmethod
    async def _assert_accepted(path, filename, mime_type):
        """Validate ``path`` and compare the result with the expected ``ValidatedFile``.

        The expected object is built from the file's actual bytes, while the
        upload itself declares ``size=1`` and a ``text/plain`` header.
        """
        with open(path, "rb") as stream:
            raw = stream.read()
        expected = ValidatedFile(
            content=raw,
            filename=filename,
            size=len(raw),
            mime_type=mime_type,
        )

        text_headers = Headers(headers={"content-type": "text/plain"})
        with open(path, "rb") as stream:
            upload = UploadFile(
                stream, size=1, filename=filename, headers=text_headers
            )
            actual = await validate_file(upload)
        assert actual == expected

    async def test_txt_file(self, simple_txt):
        """The simple text file validates as ``text/plain``."""
        await self._assert_accepted(simple_txt, "simple.txt", "text/plain")

    async def test_pdf_file(self, simple_pdf):
        """The simple PDF validates as ``application/pdf``."""
        await self._assert_accepted(simple_pdf, "simple.pdf", "application/pdf")

    async def test_docx_file(self, simple_docx):
        """The simple DOCX validates with the wordprocessingml MIME type."""
        await self._assert_accepted(
            simple_docx,
            "simple.docx",
            "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
        )

    async def test_png_file(self, simple_png):
        """The simple PNG validates as ``image/png``."""
        await self._assert_accepted(simple_png, "simple.png", "image/png")

    async def test_jpeg_file(self, simple_jpeg):
        """The simple JPEG validates as ``image/jpeg``."""
        await self._assert_accepted(simple_jpeg, "simple.jpeg", "image/jpeg")
class TestDocxSecurity:
    """Hostile DOCX inputs pushed through validation and then text extraction."""

    # MIME type handed to extract_text_from_file by every test in this class.
    _DOCX_MIME = (
        "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
    )

    @pytest.mark.resource_intensive
    async def test_deeply_nested_docx(self, deeply_nested_docx):
        """Validation passes; extraction is expected to raise TEXT_EXTRACTION_TIMEOUT."""
        with open(deeply_nested_docx, "rb") as stream:
            upload = UploadFile(
                stream,
                size=os.path.getsize(deeply_nested_docx),
                filename="deeply_nested.docx",
                headers=Headers({"content-type": "text/plain"}),
            )
            validated = await validate_file(upload)

        with pytest.raises(FileExtractionException) as caught:
            await extract_text_from_file(validated.content, self._DOCX_MIME)
        assert caught.value.error == FileExtractionError.TEXT_EXTRACTION_TIMEOUT

    @pytest.mark.resource_intensive
    async def test_deeply_nested_docx_bomb(self, deeply_nested_docx_bomb):
        """Validation passes; extraction is expected to raise UNSAFE_ZIP."""
        with open(deeply_nested_docx_bomb, "rb") as stream:
            upload = UploadFile(
                stream,
                size=os.path.getsize(deeply_nested_docx_bomb),
                filename="deeply_nested_bomb.docx",
                headers=Headers({"content-type": "text/plain"}),
            )
            validated = await validate_file(upload)

        with pytest.raises(FileExtractionException) as caught:
            await extract_text_from_file(validated.content, self._DOCX_MIME)
        assert caught.value.error == FileExtractionError.UNSAFE_ZIP

    @pytest.mark.resource_intensive
    async def test_zip_bomb_as_docx(self, zip_bomb_docx):
        """Validation passes; the zip bomb is rejected at extraction with UNSAFE_ZIP."""
        with open(zip_bomb_docx, "rb") as stream:
            upload = UploadFile(
                stream,
                size=os.path.getsize(zip_bomb_docx),
                filename="zip_bomb.docx",
                headers=Headers({"content-type": "text/plain"}),
            )
            validated = await validate_file(upload)

        with pytest.raises(FileExtractionException) as caught:
            await extract_text_from_file(validated.content, self._DOCX_MIME)
        assert caught.value.error == FileExtractionError.UNSAFE_ZIP

    async def test_xxe_docx_does_not_leak_system_files(self, xxe_docx):
        """Text extracted from the XXE DOCX must not contain local file content."""
        with open(xxe_docx, "rb") as stream:
            upload = UploadFile(
                stream,
                size=os.path.getsize(xxe_docx),
                filename="xxe.docx",
                headers=Headers({"content-type": "text/plain"}),
            )
            validated = await validate_file(upload)

        extracted = await extract_text_from_file(validated.content, self._DOCX_MIME)

        # If XXE worked, the extracted text would contain contents of /etc/passwd
        assert "root:" not in extracted
        assert "/bin/bash" not in extracted
class TestPDFSecurity:
    """Security tests for PDF file extraction."""

    async def test_malformed_pdf(self, malformed_pdf):
        """A malformed PDF passes validation but extraction reports NO_TEXT."""
        with open(malformed_pdf, "rb") as f:
            file = UploadFile(
                f,
                size=os.path.getsize(malformed_pdf),
                # Was "zip_bomb.docx" (copy-paste from the DOCX tests); the
                # upload name now matches the PDF content being sent.
                filename="malformed.pdf",
                headers=Headers({"content-type": "text/plain"}),
            )
            validated_file = await validate_file(file)

        with pytest.raises(FileExtractionException) as exc_info:
            await extract_text_from_file(
                validated_file.content, validated_file.mime_type
            )

        # There is no easy way to identify if a PDF file is malformed.
        # MuPDF runs in C and writes in stderr instead of raising an
        # exception when it fails to parse a document. In that case,
        # the document will simply be empty and contain no text.
        assert exc_info.value.error == FileExtractionError.NO_TEXT
class TestPNGSecurity:
    """Security tests for PNG file extraction."""

    async def test_fake_large_png(self, fake_large_png):
        """The fake-large PNG passes validation; extraction reports FILE_TOO_LARGE."""
        with open(fake_large_png, "rb") as f:
            file = UploadFile(
                f,
                size=os.path.getsize(fake_large_png),
                # Was "zip_bomb.docx" (copy-paste); name now matches the PNG sent.
                filename="fake_large.png",
                headers=Headers({"content-type": "text/plain"}),
            )
            validated_file = await validate_file(file)

        with pytest.raises(FileExtractionException) as exc_info:
            await extract_text_from_file(
                validated_file.content,
                validated_file.mime_type,
            )
        assert exc_info.value.error == FileExtractionError.FILE_TOO_LARGE

    @pytest.mark.resource_intensive
    async def test_fake_small_png(self, fake_small_png):
        """Extracting the fake-small PNG directly reports MALFORMED_FILE."""
        with open(fake_small_png, "rb") as f:
            content = f.read()

        with pytest.raises(FileExtractionException) as exc_info:
            await extract_text_from_file(content, "image/png")
        assert exc_info.value.error == FileExtractionError.MALFORMED_FILE

    @pytest.mark.resource_intensive
    async def test_png_decompression_bomb(self, png_decompression_bomb):
        """PNG decompression bombs should be caught with FILE_TOO_LARGE."""
        with open(png_decompression_bomb, "rb") as f:
            content = f.read()

        with pytest.raises(FileExtractionException) as exc_info:
            await extract_text_from_file(content, "image/png")
        assert exc_info.value.error == FileExtractionError.FILE_TOO_LARGE

    async def test_malformed_png_handled(self, malformed_png):
        """A malformed PNG passes validation; extraction reports MALFORMED_FILE."""
        with open(malformed_png, "rb") as f:
            file = UploadFile(
                f,
                size=os.path.getsize(malformed_png),
                # Was "zip_bomb.docx" (copy-paste); name now matches the PNG sent.
                filename="malformed.png",
                headers=Headers({"content-type": "text/plain"}),
            )
            validated_file = await validate_file(file)

        with pytest.raises(FileExtractionException) as exc_info:
            await extract_text_from_file(
                validated_file.content, validated_file.mime_type
            )
        assert exc_info.value.error == FileExtractionError.MALFORMED_FILE
class TestJPEGSecurity:
    """Security tests for JPEG file extraction."""

    async def test_fake_large_jpeg(self, fake_large_jpeg):
        """Extracting the fake-large JPEG directly reports FILE_TOO_LARGE."""
        with open(fake_large_jpeg, "rb") as f:
            content = f.read()

        with pytest.raises(FileExtractionException) as exc_info:
            await extract_text_from_file(content, "image/jpeg")
        assert exc_info.value.error == FileExtractionError.FILE_TOO_LARGE

    async def test_malformed_jpeg_handled(self, malformed_jpeg):
        """A malformed JPEG passes validation; extraction reports MALFORMED_FILE."""
        with open(malformed_jpeg, "rb") as f:
            file = UploadFile(
                f,
                size=os.path.getsize(malformed_jpeg),
                # Was "zip_bomb.docx" (copy-paste); name now matches the JPEG sent.
                filename="malformed.jpeg",
                headers=Headers({"content-type": "text/plain"}),
            )
            validated_file = await validate_file(file)

        with pytest.raises(FileExtractionException) as exc_info:
            await extract_text_from_file(
                validated_file.content, validated_file.mime_type
            )
        assert exc_info.value.error == FileExtractionError.MALFORMED_FILE

    async def test_jpeg_excessive_markers_timeout(self, jpeg_with_excessive_markers):
        """JPEGs with excessive markers should not timeout."""
        with open(jpeg_with_excessive_markers, "rb") as f:
            content = f.read()

        file_text = await extract_text_from_file(content, "image/jpeg")

        # A jpeg file of 9MB with excessive markers is not big enough to cause
        # timeouts or memory overflows.
        assert file_text is not None

    async def test_jpeg_pixel_bomb(self, jpeg_pixel_bomb):
        """The pixel-bomb JPEG passes validation; extraction reports FILE_TOO_LARGE."""
        with open(jpeg_pixel_bomb, "rb") as f:
            file = UploadFile(
                f,
                size=os.path.getsize(jpeg_pixel_bomb),
                # Was "zip_bomb.docx" (copy-paste); name now matches the JPEG sent.
                filename="pixel_bomb.jpeg",
                headers=Headers({"content-type": "text/plain"}),
            )
            validated_file = await validate_file(file)

        with pytest.raises(FileExtractionException) as exc_info:
            await extract_text_from_file(
                validated_file.content, validated_file.mime_type
            )
        assert exc_info.value.error == FileExtractionError.FILE_TOO_LARGE

    async def test_jpeg_cpu_scan_bomb(self, jpeg_cpu_scan_bomb):
        """The many-scan JPEG passes validation and extraction returns a value."""
        with open(jpeg_cpu_scan_bomb, "rb") as f:
            file = UploadFile(
                f,
                size=os.path.getsize(jpeg_cpu_scan_bomb),
                # Was "zip_bomb.docx" (copy-paste); name now matches the JPEG sent.
                filename="cpu_scan_bomb.jpeg",
                headers=Headers({"content-type": "text/plain"}),
            )
            validated_file = await validate_file(file)

        file_text = await extract_text_from_file(
            validated_file.content, validated_file.mime_type
        )

        # A jpeg file of 9MB with many scans is not big enough to cause
        # timeouts or memory overflows.
        assert file_text is not None
class TestSanitizeImage:
    """Checks for ``sanitize_image``."""

    def test_sanitize_image_removes_metadata(self):
        """A JPEG carrying an EXIF tag comes back as a same-sized PNG without EXIF."""
        dimensions = (100, 100)

        # Arrange: in-memory red JPEG with EXIF tag 0x010E populated.
        source_img = Image.new("RGB", dimensions, color="red")
        exif = source_img.getexif()
        exif[0x010E] = "Secret Malware Instruction or GPS"
        encoded = io.BytesIO()
        source_img.save(encoded, format="JPEG", exif=exif)

        # Act: sanitize the raw JPEG bytes.
        cleaned_bytes = sanitize_image(encoded.getvalue())

        # Assert: no EXIF entries, re-encoded as PNG, dimensions untouched.
        cleaned_img = Image.open(io.BytesIO(cleaned_bytes))
        assert len(cleaned_img.getexif()) == 0
        assert cleaned_img.format == "PNG"
        assert cleaned_img.size == dimensions
class TestCleanTest:
    """Table-driven checks for ``clean_text``.

    NOTE(review): the class name looks like a typo for ``TestCleanText``; it is
    kept as-is so existing test IDs do not change.
    """

    @pytest.mark.parametrize("input_text, expected_output", CLEAN_TEXT_SCENARIOS)
    def test_clean_text_logic(self, input_text, expected_output):
        """Each scenario's input must clean to exactly its expected output."""
        cleaned = clean_text(input_text)
        assert cleaned == expected_output