Spaces:
Paused
Paused
| import io | |
| import os | |
| from pathlib import Path | |
| from fastapi import UploadFile | |
| from fastapi.datastructures import Headers | |
| from PIL import Image | |
| import pytest | |
| from constants import MAX_FILE_NAME_LENGTH, MAX_FILE_SIZE | |
| from exceptions import ( | |
| FileExtractionError, | |
| FileExtractionException, | |
| FileValidationError, | |
| FileValidationException, | |
| ) | |
| from helpers.file_helper import ( | |
| ValidatedFile, | |
| clean_text, | |
| extract_text_from_file, | |
| sanitize_image, | |
| validate_file, | |
| ) | |
| from tests.file_factory import ( | |
| create_empty_txt, | |
| create_fake_large_jpeg, | |
| create_fake_large_png, | |
| create_fake_small_png, | |
| create_jpeg_cpu_scan_bomb, | |
| create_jpeg_pixel_bomb, | |
| create_jpeg_with_excessive_markers, | |
| create_malformed_jpeg, | |
| create_malformed_pdf, | |
| create_malformed_png, | |
| create_png_decompression_bomb, | |
| create_simple_txt, | |
| create_simple_pdf, | |
| create_simple_docx, | |
| create_simple_png, | |
| create_simple_jpeg, | |
| create_deeply_nested_docx, | |
| create_deeply_nested_docx_bomb, | |
| create_zip_bomb_docx, | |
| create_xxe_docx, | |
| ) | |
# Directory named "data" located next to this test module.
DATA_DIR = os.path.join(os.path.dirname(__file__), "data")
# File names that ``validate_file`` is expected to reject with
# ``FileValidationError.INVALID_FILE_NAME`` (see ``test_invalid_file_names``).
INVALID_FILE_NAMES = [
    # Path traversal attacks
    "../etc/passwd",
    "../../secret.txt",
    # Null bytes
    "file\x00.txt",
    # Special characters
    "file;rm -rf.txt",
    "file|cmd.txt",
    "file&cmd.txt",
    "file>redirect.txt",
    "<script>.txt",
    # Hidden files
    ".hidden.txt",
    # Just an extension
    ".txt",
    # Empty name
    "",
    # Only spaces
    " ",
    # Double extensions
    "malware.exe.txt",
    "script.php.pdf",
    # Windows reserved names
    "CON.txt",
    "PRN.txt",
    "AUX.txt",
    "NUL.txt",
    "COM1.txt",
    "COM2.txt",
    "COM3.txt",
    "COM4.txt",
    "LPT1.txt",
    "LPT2.txt",
    "LPT3.txt",
    "LPT4.txt",
    "CON",
    "NUL",
    "AUX",
]
# ``(input_text, expected_output)`` pairs checked against ``clean_text``.
CLEAN_TEXT_SCENARIOS = [
    ("Hello\n\n\nWorld", "Hello\n\nWorld"),
    ("Hello\n\nWorld", "Hello\n\nWorld"),
    ("Hello\nWorld", "Hello\nWorld"),
    ("Line 1\n \n\nLine 2", "Line 1\n\nLine 2"),
    ("Hello World", "Hello World"),
    ("\n Hello World \n", "Hello World"),
    ("", ""),
    (" ", ""),
]
@pytest.fixture
def large_txt_file(tmp_path: Path) -> Path:
    """Return the path of a text file exactly one byte over ``MAX_FILE_SIZE``."""
    filepath = tmp_path / "large.txt"
    # Written as bytes so the on-disk size does not depend on the locale
    # encoding used by ``write_text``.
    filepath.write_bytes(b"a" * (MAX_FILE_SIZE + 1))
    return filepath
@pytest.fixture
def exe_disguised_as_txt(tmp_path: Path) -> Path:
    """Return the path of a ``.txt`` file whose bytes start with ``MZ``."""
    filepath = tmp_path / "innocent.txt"
    # MZ is the magic header for Windows executables
    filepath.write_bytes(b"MZ" + b"\x00" * 100)
    return filepath
@pytest.fixture
def empty_txt(tmp_path: Path) -> Path:
    """Return the path of ``empty.txt`` written by ``create_empty_txt``."""
    filepath = tmp_path / "empty.txt"
    create_empty_txt(str(filepath))
    return filepath
@pytest.fixture
def simple_txt(tmp_path: Path) -> Path:
    """Return the path of ``simple.txt`` written by ``create_simple_txt``."""
    filepath = tmp_path / "simple.txt"
    create_simple_txt(str(filepath))
    return filepath
@pytest.fixture
def simple_pdf(tmp_path: Path) -> Path:
    """Return the path of ``simple.pdf`` written by ``create_simple_pdf``."""
    filepath = tmp_path / "simple.pdf"
    create_simple_pdf(str(filepath))
    return filepath
@pytest.fixture
def simple_docx(tmp_path: Path) -> Path:
    """Return the path of ``simple.docx`` written by ``create_simple_docx``."""
    filepath = tmp_path / "simple.docx"
    create_simple_docx(str(filepath))
    return filepath
@pytest.fixture
def simple_png(tmp_path: Path) -> Path:
    """Return the path of ``simple.png`` written by ``create_simple_png``."""
    filepath = tmp_path / "simple.png"
    create_simple_png(str(filepath))
    return filepath
@pytest.fixture
def simple_jpeg(tmp_path: Path) -> Path:
    """Return the path of ``simple.jpeg`` written by ``create_simple_jpeg``."""
    filepath = tmp_path / "simple.jpeg"
    create_simple_jpeg(str(filepath))
    return filepath
@pytest.fixture
def deeply_nested_docx(tmp_path: Path) -> Path:
    """Return the path of a DOCX written by ``create_deeply_nested_docx``.

    The tests expect text extraction of this file to time out.
    """
    filepath = tmp_path / "deeply_nested.docx"
    create_deeply_nested_docx(str(filepath))
    return filepath
@pytest.fixture
def deeply_nested_docx_bomb(tmp_path: Path) -> Path:
    """Return the path of a DOCX written by ``create_deeply_nested_docx_bomb``.

    The tests expect text extraction to reject this file as an unsafe zip.
    """
    filepath = tmp_path / "deeply_nested_bomb.docx"
    create_deeply_nested_docx_bomb(str(filepath))
    return filepath
@pytest.fixture
def zip_bomb_docx(tmp_path: Path) -> Path:
    """Return the path of a zip-bomb DOCX written by ``create_zip_bomb_docx``."""
    filepath = tmp_path / "zip_bomb.docx"
    create_zip_bomb_docx(str(filepath))
    return filepath
@pytest.fixture
def xxe_docx(tmp_path: Path) -> Path:
    """Return the path of an XXE-attack DOCX written by ``create_xxe_docx``."""
    filepath = tmp_path / "xxe.docx"
    create_xxe_docx(str(filepath))
    return filepath
# PDF fixture
@pytest.fixture
def malformed_pdf(tmp_path: Path) -> Path:
    """Return the path of a malformed PDF written by ``create_malformed_pdf``."""
    filepath = tmp_path / "malformed.pdf"
    create_malformed_pdf(str(filepath))
    return filepath
# PNG fixtures
@pytest.fixture
def fake_large_png(tmp_path: Path) -> Path:
    """Return the path of a PNG written by ``create_fake_large_png``.

    The tests expect text extraction to reject this file as too large.
    """
    filepath = tmp_path / "fake_large.png"
    create_fake_large_png(str(filepath))
    return filepath
@pytest.fixture
def fake_small_png(tmp_path: Path) -> Path:
    """Return the path of a PNG written by ``create_fake_small_png``.

    The tests expect text extraction to reject this file as malformed.
    """
    filepath = tmp_path / "fake_small.png"
    create_fake_small_png(str(filepath))
    return filepath
@pytest.fixture
def png_decompression_bomb(tmp_path: Path) -> Path:
    """Return the path of a PNG written by ``create_png_decompression_bomb``."""
    filepath = tmp_path / "decompression_bomb.png"
    create_png_decompression_bomb(str(filepath))
    return filepath
@pytest.fixture
def malformed_png(tmp_path: Path) -> Path:
    """Return the path of a malformed PNG written by ``create_malformed_png``."""
    filepath = tmp_path / "malformed.png"
    create_malformed_png(str(filepath))
    return filepath
# JPEG fixtures
@pytest.fixture
def fake_large_jpeg(tmp_path: Path) -> Path:
    """Return the path of a JPEG written by ``create_fake_large_jpeg``.

    The tests expect text extraction to reject this file as too large.
    """
    filepath = tmp_path / "fake_large.jpeg"
    create_fake_large_jpeg(str(filepath))
    return filepath
@pytest.fixture
def malformed_jpeg(tmp_path: Path) -> Path:
    """Return the path of a malformed JPEG written by ``create_malformed_jpeg``."""
    filepath = tmp_path / "malformed.jpeg"
    create_malformed_jpeg(str(filepath))
    return filepath
@pytest.fixture
def jpeg_with_excessive_markers(tmp_path: Path) -> Path:
    """Return the path of a JPEG written by ``create_jpeg_with_excessive_markers``."""
    filepath = tmp_path / "excessive_markers.jpeg"
    create_jpeg_with_excessive_markers(str(filepath))
    return filepath
@pytest.fixture
def jpeg_pixel_bomb(tmp_path: Path) -> Path:
    """Return the path of a JPEG written by ``create_jpeg_pixel_bomb``.

    The tests expect text extraction to reject this file as too large.
    """
    filepath = tmp_path / "pixel_bomb.jpeg"
    create_jpeg_pixel_bomb(str(filepath))
    return filepath
@pytest.fixture
def jpeg_cpu_scan_bomb(tmp_path: Path) -> Path:
    """Return the path of a JPEG written by ``create_jpeg_cpu_scan_bomb``."""
    filepath = tmp_path / "cpu_scan_bomb.jpeg"
    create_jpeg_cpu_scan_bomb(str(filepath))
    return filepath
class TestUploadFileGeneric:
    """Rejection paths of ``validate_file``.

    Each test builds an ``UploadFile`` that is deficient in one respect and
    checks the ``FileValidationError`` carried by the raised exception.

    NOTE(review): these are coroutine tests; they rely on an async pytest
    plugin/mode that is configured outside this file - confirm.
    """

    async def test_missing_size(self, simple_txt):
        """An upload created without a size is rejected with MISSING_SIZE."""
        with open(simple_txt, "rb") as f:
            file = UploadFile(f)
            with pytest.raises(FileValidationException) as exc_info:
                await validate_file(file)
        assert exc_info.value.error == FileValidationError.MISSING_SIZE

    async def test_courtesy_size_too_large(self, simple_txt):
        """A declared size above MAX_FILE_SIZE is rejected with FILE_TOO_LARGE."""
        with open(simple_txt, "rb") as f:
            file = UploadFile(f, size=MAX_FILE_SIZE + 1)
            with pytest.raises(FileValidationException) as exc_info:
                await validate_file(file)
        assert exc_info.value.error == FileValidationError.FILE_TOO_LARGE

    async def test_missing_name(self, simple_txt):
        """An upload created without a filename is rejected."""
        with open(simple_txt, "rb") as f:
            file = UploadFile(f, size=1)
            with pytest.raises(FileValidationException) as exc_info:
                await validate_file(file)
        assert exc_info.value.error == FileValidationError.MISSING_FILE_NAME

    async def test_name_too_large(self, simple_txt):
        """A filename one character over MAX_FILE_NAME_LENGTH is rejected."""
        with open(simple_txt, "rb") as f:
            file = UploadFile(f, size=1, filename="a" * (MAX_FILE_NAME_LENGTH + 1))
            with pytest.raises(FileValidationException) as exc_info:
                await validate_file(file)
        assert exc_info.value.error == FileValidationError.FILE_NAME_TOO_LARGE

    @pytest.mark.parametrize("filename", INVALID_FILE_NAMES)
    async def test_invalid_file_names(self, filename, simple_txt):
        """Every entry of INVALID_FILE_NAMES is rejected with INVALID_FILE_NAME."""
        with open(simple_txt, "rb") as f:
            file = UploadFile(f, size=1, filename=filename)
            with pytest.raises(FileValidationException) as exc_info:
                await validate_file(file)
        assert exc_info.value.error == FileValidationError.INVALID_FILE_NAME

    async def test_unsupported_extension(self, simple_txt):
        """A ``.zip`` filename is rejected with UNSUPPORTED_EXTENSION."""
        with open(simple_txt, "rb") as f:
            file = UploadFile(f, size=1, filename="my_file.zip")
            with pytest.raises(FileValidationException) as exc_info:
                await validate_file(file)
        assert exc_info.value.error == FileValidationError.UNSUPPORTED_EXTENSION

    async def test_missing_mime(self, simple_txt):
        """An upload without a content-type header is rejected."""
        with open(simple_txt, "rb") as f:
            file = UploadFile(f, size=1, filename="simple.txt")
            with pytest.raises(FileValidationException) as exc_info:
                await validate_file(file)
        assert exc_info.value.error == FileValidationError.INVALID_MIME_TYPE

    async def test_unsupported_mime_type(self, simple_txt):
        """A declared ``image/gif`` content-type is rejected."""
        mock_headers = Headers(headers={"content-type": "image/gif"})
        with open(simple_txt, "rb") as f:
            file = UploadFile(f, size=1, filename="simple.txt", headers=mock_headers)
            with pytest.raises(FileValidationException) as exc_info:
                await validate_file(file)
        assert exc_info.value.error == FileValidationError.INVALID_MIME_TYPE

    async def test_file_too_large(self, large_txt_file):
        """Oversized content is rejected even though the declared size is 1."""
        mock_headers = Headers(headers={"content-type": "text/plain"})
        with open(large_txt_file, "rb") as f:
            file = UploadFile(f, size=1, filename="simple.txt", headers=mock_headers)
            with pytest.raises(FileValidationException) as exc_info:
                await validate_file(file)
        assert exc_info.value.error == FileValidationError.FILE_TOO_LARGE

    async def test_invalid_mime_type(self, exe_disguised_as_txt):
        """``MZ``-prefixed content declared as text/plain is rejected."""
        mock_headers = Headers(headers={"content-type": "text/plain"})
        with open(exe_disguised_as_txt, "rb") as f:
            file = UploadFile(f, size=1, filename="simple.txt", headers=mock_headers)
            with pytest.raises(FileValidationException) as exc_info:
                await validate_file(file)
        assert exc_info.value.error == FileValidationError.INVALID_MIME_TYPE

    async def test_empty_file(self, empty_txt):
        """The empty text fixture is rejected with EMPTY_FILE."""
        mock_headers = Headers(headers={"content-type": "text/plain"})
        with open(empty_txt, "rb") as f:
            file = UploadFile(f, size=1, filename="simple.txt", headers=mock_headers)
            with pytest.raises(FileValidationException) as exc_info:
                await validate_file(file)
        assert exc_info.value.error == FileValidationError.EMPTY_FILE
class TestValidFileUpload:
    """Happy-path checks: ``validate_file`` returns the expected ``ValidatedFile``.

    Every upload is declared with ``size=1`` and a ``text/plain`` content-type
    header; the expected ``ValidatedFile`` nevertheless carries the real byte
    length and a per-format ``mime_type``.

    NOTE(review): these are coroutine tests; they rely on an async pytest
    plugin/mode that is configured outside this file - confirm.
    """

    @staticmethod
    async def _assert_validates(path, filename, mime_type):
        """Validate the file at *path* and compare against the expected result.

        Not collected by pytest (no ``test_`` prefix); shared by the tests below.
        """
        content = path.read_bytes()
        expected_validated_file = ValidatedFile(
            content=content,
            filename=filename,
            size=len(content),
            mime_type=mime_type,
        )
        mock_headers = Headers(headers={"content-type": "text/plain"})
        with open(path, "rb") as f:
            file = UploadFile(f, size=1, filename=filename, headers=mock_headers)
            validated_file = await validate_file(file)
        assert validated_file == expected_validated_file

    async def test_txt_file(self, simple_txt):
        """A plain text file validates as ``text/plain``."""
        await self._assert_validates(simple_txt, "simple.txt", "text/plain")

    async def test_pdf_file(self, simple_pdf):
        """A PDF file validates as ``application/pdf``."""
        await self._assert_validates(simple_pdf, "simple.pdf", "application/pdf")

    async def test_docx_file(self, simple_docx):
        """A DOCX file validates with the wordprocessingml MIME type."""
        await self._assert_validates(
            simple_docx,
            "simple.docx",
            "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
        )

    async def test_png_file(self, simple_png):
        """A PNG file validates as ``image/png``."""
        await self._assert_validates(simple_png, "simple.png", "image/png")

    async def test_jpeg_file(self, simple_jpeg):
        """A JPEG file validates as ``image/jpeg``."""
        await self._assert_validates(simple_jpeg, "simple.jpeg", "image/jpeg")
class TestDocxSecurity:
    """Security tests for DOCX text extraction.

    Each hostile DOCX is first passed through ``validate_file`` (which is
    expected to succeed) and then handed to ``extract_text_from_file``.

    NOTE(review): these are coroutine tests; they rely on an async pytest
    plugin/mode that is configured outside this file - confirm.
    """

    # MIME type passed to extract_text_from_file for every DOCX in this class.
    _DOCX_MIME = (
        "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
    )

    async def test_deeply_nested_docx(self, deeply_nested_docx):
        """Extraction of deeply nested XML fails with TEXT_EXTRACTION_TIMEOUT."""
        with open(deeply_nested_docx, "rb") as f:
            file = UploadFile(
                f,
                size=os.path.getsize(deeply_nested_docx),
                filename="deeply_nested.docx",
                headers=Headers({"content-type": "text/plain"}),
            )
            validated_file = await validate_file(file)
        with pytest.raises(FileExtractionException) as exc_info:
            await extract_text_from_file(validated_file.content, self._DOCX_MIME)
        assert exc_info.value.error == FileExtractionError.TEXT_EXTRACTION_TIMEOUT

    async def test_deeply_nested_docx_bomb(self, deeply_nested_docx_bomb):
        """Extraction of the nested-DOCX bomb fails with UNSAFE_ZIP."""
        with open(deeply_nested_docx_bomb, "rb") as f:
            file = UploadFile(
                f,
                size=os.path.getsize(deeply_nested_docx_bomb),
                filename="deeply_nested_bomb.docx",
                headers=Headers({"content-type": "text/plain"}),
            )
            validated_file = await validate_file(file)
        with pytest.raises(FileExtractionException) as exc_info:
            await extract_text_from_file(validated_file.content, self._DOCX_MIME)
        assert exc_info.value.error == FileExtractionError.UNSAFE_ZIP

    async def test_zip_bomb_as_docx(self, zip_bomb_docx):
        """A zip bomb passes validation but extraction fails with UNSAFE_ZIP."""
        with open(zip_bomb_docx, "rb") as f:
            file = UploadFile(
                f,
                size=os.path.getsize(zip_bomb_docx),
                filename="zip_bomb.docx",
                headers=Headers({"content-type": "text/plain"}),
            )
            validated_file = await validate_file(file)
        with pytest.raises(FileExtractionException) as exc_info:
            await extract_text_from_file(validated_file.content, self._DOCX_MIME)
        assert exc_info.value.error == FileExtractionError.UNSAFE_ZIP

    async def test_xxe_docx_does_not_leak_system_files(self, xxe_docx):
        """XXE attack should not result in local file content being extracted."""
        with open(xxe_docx, "rb") as f:
            file = UploadFile(
                f,
                size=os.path.getsize(xxe_docx),
                filename="xxe.docx",
                headers=Headers({"content-type": "text/plain"}),
            )
            validated_file = await validate_file(file)
        file_text = await extract_text_from_file(
            validated_file.content, self._DOCX_MIME
        )
        # If XXE worked, the extracted text would contain contents of /etc/passwd
        assert "root:" not in file_text
        assert "/bin/bash" not in file_text
class TestPDFSecurity:
    """Security tests for PDF text extraction.

    NOTE(review): this is a coroutine test; it relies on an async pytest
    plugin/mode that is configured outside this file - confirm.
    """

    async def test_malformed_pdf(self, malformed_pdf):
        """Malformed PDFs should be handled gracefully."""
        with open(malformed_pdf, "rb") as f:
            # NOTE(review): the upload is named "zip_bomb.docx" although the
            # fixture is a PDF; looks copy-pasted - confirm it is intentional.
            file = UploadFile(
                f,
                size=os.path.getsize(malformed_pdf),
                filename="zip_bomb.docx",
                headers=Headers({"content-type": "text/plain"}),
            )
            validated_file = await validate_file(file)
        with pytest.raises(FileExtractionException) as exc_info:
            await extract_text_from_file(
                validated_file.content, validated_file.mime_type
            )
        # There is no easy way to identify if a PDF file is malformed.
        # MuPDF runs in C and writes in stderr instead of raising an
        # exception when it fails to parse a document. In that case,
        # the document will simply be empty and contain no text.
        assert exc_info.value.error == FileExtractionError.NO_TEXT
class TestPNGSecurity:
    """Security tests for PNG file extraction.

    NOTE(review): these are coroutine tests; they rely on an async pytest
    plugin/mode that is configured outside this file - confirm.
    """

    async def test_fake_large_png(self, fake_large_png):
        """The fake-large PNG passes validation; extraction fails with FILE_TOO_LARGE."""
        with open(fake_large_png, "rb") as f:
            # NOTE(review): the upload is named "zip_bomb.docx" although the
            # fixture is a PNG; looks copy-pasted - confirm it is intentional.
            file = UploadFile(
                f,
                size=os.path.getsize(fake_large_png),
                filename="zip_bomb.docx",
                headers=Headers({"content-type": "text/plain"}),
            )
            validated_file = await validate_file(file)
        with pytest.raises(FileExtractionException) as exc_info:
            await extract_text_from_file(
                validated_file.content,
                validated_file.mime_type,
            )
        assert exc_info.value.error == FileExtractionError.FILE_TOO_LARGE

    async def test_fake_small_png(self, fake_small_png):
        """Extraction of the fake-small PNG fails with MALFORMED_FILE."""
        with open(fake_small_png, "rb") as f:
            content = f.read()
        with pytest.raises(FileExtractionException) as exc_info:
            await extract_text_from_file(content, "image/png")
        assert exc_info.value.error == FileExtractionError.MALFORMED_FILE

    async def test_png_decompression_bomb(self, png_decompression_bomb):
        """PNG decompression bombs should be caught."""
        with open(png_decompression_bomb, "rb") as f:
            content = f.read()
        with pytest.raises(FileExtractionException) as exc_info:
            await extract_text_from_file(content, "image/png")
        assert exc_info.value.error == FileExtractionError.FILE_TOO_LARGE

    async def test_malformed_png_handled(self, malformed_png):
        """Malformed PNGs should be handled gracefully."""
        with open(malformed_png, "rb") as f:
            # NOTE(review): filename "zip_bomb.docx" does not match the PNG
            # fixture; looks copy-pasted - confirm it is intentional.
            file = UploadFile(
                f,
                size=os.path.getsize(malformed_png),
                filename="zip_bomb.docx",
                headers=Headers({"content-type": "text/plain"}),
            )
            validated_file = await validate_file(file)
        with pytest.raises(FileExtractionException) as exc_info:
            await extract_text_from_file(
                validated_file.content, validated_file.mime_type
            )
        assert exc_info.value.error == FileExtractionError.MALFORMED_FILE
class TestJPEGSecurity:
    """Security tests for JPEG file extraction.

    NOTE(review): these are coroutine tests; they rely on an async pytest
    plugin/mode that is configured outside this file - confirm.
    NOTE(review): several uploads below are named "zip_bomb.docx" although
    the fixtures are JPEGs; looks copy-pasted - confirm it is intentional.
    """

    async def test_fake_large_jpeg(self, fake_large_jpeg):
        """Extraction of the fake-large JPEG fails with FILE_TOO_LARGE."""
        with open(fake_large_jpeg, "rb") as f:
            content = f.read()
        with pytest.raises(FileExtractionException) as exc_info:
            await extract_text_from_file(content, "image/jpeg")
        assert exc_info.value.error == FileExtractionError.FILE_TOO_LARGE

    async def test_malformed_jpeg_handled(self, malformed_jpeg):
        """Malformed JPEGs should be handled gracefully."""
        with open(malformed_jpeg, "rb") as f:
            file = UploadFile(
                f,
                size=os.path.getsize(malformed_jpeg),
                filename="zip_bomb.docx",
                headers=Headers({"content-type": "text/plain"}),
            )
            validated_file = await validate_file(file)
        with pytest.raises(FileExtractionException) as exc_info:
            await extract_text_from_file(
                validated_file.content, validated_file.mime_type
            )
        assert exc_info.value.error == FileExtractionError.MALFORMED_FILE

    async def test_jpeg_excessive_markers_timeout(self, jpeg_with_excessive_markers):
        """JPEGs with excessive markers should not timeout."""
        with open(jpeg_with_excessive_markers, "rb") as f:
            content = f.read()
        file_text = await extract_text_from_file(content, "image/jpeg")
        # A jpeg file of 9MB with excessive markers is not big enough to cause
        # timeouts or memory overflows.
        assert file_text is not None

    async def test_jpeg_pixel_bomb(self, jpeg_pixel_bomb):
        """The JPEG pixel bomb passes validation; extraction fails with FILE_TOO_LARGE."""
        with open(jpeg_pixel_bomb, "rb") as f:
            file = UploadFile(
                f,
                size=os.path.getsize(jpeg_pixel_bomb),
                filename="zip_bomb.docx",
                headers=Headers({"content-type": "text/plain"}),
            )
            validated_file = await validate_file(file)
        with pytest.raises(FileExtractionException) as exc_info:
            await extract_text_from_file(
                validated_file.content, validated_file.mime_type
            )
        assert exc_info.value.error == FileExtractionError.FILE_TOO_LARGE

    async def test_jpeg_cpu_scan_bomb(self, jpeg_cpu_scan_bomb):
        """The JPEG scan bomb is validated and extracted without raising."""
        with open(jpeg_cpu_scan_bomb, "rb") as f:
            file = UploadFile(
                f,
                size=os.path.getsize(jpeg_cpu_scan_bomb),
                filename="zip_bomb.docx",
                headers=Headers({"content-type": "text/plain"}),
            )
            validated_file = await validate_file(file)
        file_text = await extract_text_from_file(
            validated_file.content, validated_file.mime_type
        )
        # A jpeg file of 9MB with many scans is not big enough to cause
        # timeouts or memory overflows.
        assert file_text is not None
class TestSanitizeImage:
    """Tests for ``sanitize_image``."""

    def test_sanitize_image_removes_metadata(self):
        """A JPEG carrying EXIF data comes back as a same-size PNG without EXIF."""
        # 1. Create a dummy image with EXIF metadata
        original = Image.new("RGB", (100, 100), color="red")
        exif_data = original.getexif()
        # 0x010E is the EXIF ImageDescription tag.
        exif_data[0x010E] = "Secret Malware Instruction or GPS"
        buf = io.BytesIO()
        original.save(buf, format="JPEG", exif=exif_data)
        dirty_content = buf.getvalue()

        # 2. Run Sanity Check
        sanitized_content = sanitize_image(dirty_content)

        # 3. Verify Results (context manager closes the reopened image)
        with Image.open(io.BytesIO(sanitized_content)) as sanitized_img:
            # Assert metadata is empty
            assert len(sanitized_img.getexif()) == 0
            # Assert format changed to PNG (as per your function)
            assert sanitized_img.format == "PNG"
            # Assert dimensions are the same
            assert sanitized_img.size == (100, 100)
class TestCleanTest:
    """Table-driven checks of ``clean_text`` against CLEAN_TEXT_SCENARIOS."""

    @pytest.mark.parametrize("input_text, expected_output", CLEAN_TEXT_SCENARIOS)
    def test_clean_text_logic(self, input_text, expected_output):
        """``clean_text(input_text)`` equals the paired expected output."""
        assert clean_text(input_text) == expected_output