Spaces:
Paused
Paused
| import asyncio | |
| from dataclasses import dataclass | |
| import os | |
| import zipfile | |
| import cv2 | |
| from fastapi import UploadFile | |
| import fitz # PyMuPDF | |
| import io | |
| import magic | |
| import numpy as np | |
| import re | |
| from docx import Document | |
| from lxml.etree import XMLSyntaxError | |
| import PIL | |
| from PIL import Image | |
| from classes.ocr_reader import OCRReader | |
| from constants import ( | |
| FILE_CHUNK_SIZE, | |
| MAX_FILE_NAME_LENGTH, | |
| MAX_FILE_SIZE, | |
| SUPPORTED_FILE_EXTENSIONS, | |
| SUPPORTED_FILE_TYPES, | |
| TEXT_EXTRACTION_TIMEOUT, | |
| ) | |
| from exceptions import FileExtractionError, FileExtractionException, FileValidationError | |
| from exceptions import FileValidationException | |
def clean_text(raw_text: str):
    """Normalize whitespace in extracted text.

    Each line is trimmed, any run of blank lines is reduced to a single
    blank line, runs of spaces are reduced to one space, and the result is
    stripped of leading and trailing whitespace.
    """
    # Trim every line. Blank lines survive as empty strings, so paragraph
    # gaps are still visible as consecutive newlines after re-joining.
    trimmed = "\n".join(map(str.strip, raw_text.splitlines()))
    # Two or more consecutive newlines become exactly one blank line:
    #   "Hello\n\n\nWorld" -> "Hello\n\nWorld"
    #   "Hello\n\nWorld"   -> unchanged
    #   "Hello\nWorld"     -> not matched, unchanged
    paragraphs_normalized = re.sub(r"\n{2,}", "\n\n", trimmed)
    # Squeeze any remaining run of spaces inside a line down to one space.
    return re.sub(r" {2,}", " ", paragraphs_normalized).strip()
async def extract_text_from_pdf(binary_content: bytes) -> str:
    """Extract and normalize the text of a PDF held in memory.

    Args:
        binary_content: Raw bytes of the PDF file.

    Returns:
        The text of all pages, concatenated in page order and passed
        through ``clean_text``.

    Raises:
        FileExtractionException: With ``FileExtractionError.NO_TEXT`` when
            the pages yield no non-whitespace text.
    """
    # NOTE(review): this coroutine contains no ``await``, so the parsing
    # below runs synchronously; presumably a surrounding
    # ``asyncio.wait_for`` timeout cannot interrupt it - confirm.
    doc = fitz.open(stream=io.BytesIO(binary_content), filetype="pdf")
    try:
        full_text = "".join(page.get_text() for page in doc)
    finally:
        # Release the document on every path, including a page that fails
        # to parse and the NO_TEXT rejection raised below.
        doc.close()
    if not full_text.strip():
        raise FileExtractionException(FileExtractionError.NO_TEXT)
    return clean_text(full_text)
async def extract_text_from_txt(binary_content: bytes) -> str:
    """Decode a plain-text upload as UTF-8 and normalize its whitespace.

    Args:
        binary_content: Raw bytes of the text file.

    Returns:
        The decoded text passed through ``clean_text``.

    Raises:
        FileExtractionException: With ``FileExtractionError.MALFORMED_FILE``
            when the bytes are not valid UTF-8.
    """
    try:
        full_text = binary_content.decode("utf-8")
    except UnicodeDecodeError as err:
        # Report a non-UTF-8 file as an extraction failure instead of
        # letting a bare UnicodeDecodeError escape to the caller.
        raise FileExtractionException(FileExtractionError.MALFORMED_FILE) from err
    return clean_text(full_text)
def safe_unzip_check(file_bytes: bytes) -> bool:
    """Check that a zip archive decompresses to at most ``MAX_FILE_SIZE`` bytes.

    Every entry is decompressed in ``FILE_CHUNK_SIZE`` pieces and the sizes
    are summed across the whole archive.

    Returns:
        ``True`` when the whole archive was read without exceeding the limit.

    Raises:
        FileExtractionException: ``FILE_TOO_LARGE`` as soon as the running
            total exceeds ``MAX_FILE_SIZE``; ``UNSAFE_ZIP`` when ``zipfile``
            reports ``BadZipFile``.
    """
    archive_stream = io.BytesIO(file_bytes)
    try:
        with zipfile.ZipFile(archive_stream) as archive:
            decompressed_bytes = 0
            for member in archive.infolist():
                with archive.open(member) as member_stream:
                    # Bounded reads: stop as soon as the limit is crossed
                    # instead of inflating a whole entry first.
                    while chunk := member_stream.read(FILE_CHUNK_SIZE):
                        decompressed_bytes += len(chunk)
                        if decompressed_bytes > MAX_FILE_SIZE:
                            raise FileExtractionException(
                                FileExtractionError.FILE_TOO_LARGE
                            )
    except zipfile.BadZipFile:
        raise FileExtractionException(FileExtractionError.UNSAFE_ZIP)
    return True
def extract_text_from_docx(binary_content: bytes):
    """Extract the paragraph text of a .docx file held in memory.

    The archive is first passed to ``safe_unzip_check``. The text of every
    entry in ``doc.paragraphs`` is then joined with newlines and normalized
    with ``clean_text``.

    Returns:
        The cleaned text, or ``None`` if ``safe_unzip_check`` returns a
        falsy value.

    Raises:
        FileExtractionException: ``UNSAFE_ZIP`` when the document XML cannot
            be parsed, plus whatever ``safe_unzip_check`` raises.
    """
    if not safe_unzip_check(binary_content):
        return None

    try:
        document = Document(io.BytesIO(binary_content))
    except XMLSyntaxError:
        raise FileExtractionException(FileExtractionError.UNSAFE_ZIP)

    joined_paragraphs = "\n".join(
        paragraph.text for paragraph in document.paragraphs
    )
    return clean_text(joined_paragraphs)
def sanitize_image(binary_content: bytes):
    """Re-encode an image as an RGB PNG and return the new bytes.

    The input is opened with Pillow, converted to RGB mode and saved into an
    in-memory buffer in PNG format.
    """
    png_buffer = io.BytesIO()
    with Image.open(io.BytesIO(binary_content)) as source_image:
        source_image.convert("RGB").save(png_buffer, format="PNG")
    return png_buffer.getvalue()
def extract_text_from_img(binary_content: bytes) -> str | None:
    """Decode an in-memory image with OpenCV and run it through ``OCRReader``.

    Returns whatever ``OCRReader().read_text`` returns for the decoded image.
    """
    # OpenCV decodes from a flat uint8 array rather than from a bytes object.
    encoded_pixels = np.frombuffer(binary_content, np.uint8)
    decoded_image = cv2.imdecode(encoded_pixels, cv2.IMREAD_COLOR)
    # The decoded array is handed straight to the OCR reader.
    reader = OCRReader()
    return reader.read_text(decoded_image)
def replace_spaces_in_filename(filename: str) -> str:
    """Return *filename* trimmed, with inner whitespace runs turned into "_".

    Leading and trailing whitespace is removed first, then every run of one
    or more whitespace characters becomes a single underscore.
    """
    # Trim before substituting so padding never becomes a leading or
    # trailing underscore.
    trimmed = filename.strip()
    # \s+ matches one or more consecutive whitespace characters.
    return re.sub(r"\s+", "_", trimmed)
# Matches a name that is, or starts with followed by a dot, one of
# CON, PRN, AUX, NUL, COM1-COM9 or LPT1-LPT9 (the superscript digits ¹²³ are
# accepted in the digit position too), ignoring case.
WINDOWS_RESERVED_NAMES = re.compile(
    r"^(CON|PRN|AUX|NUL|COM[1-9¹²³]|LPT[1-9¹²³])(\.|$)",
    flags=re.IGNORECASE,
)


def is_reserved_windows_name(filename: str) -> bool:
    """Return True when *filename* matches ``WINDOWS_RESERVED_NAMES``."""
    return WINDOWS_RESERVED_NAMES.match(filename) is not None
# Allowed shape: a stem of ASCII letters, digits, "_", "(", ")" and "-",
# optionally followed by a single ".extension" made of the same characters.
_VALID_FILENAME_PATTERN = re.compile(r"[a-zA-Z0-9_()\-]+(\.[a-zA-Z0-9_()\-]+)?")


def is_valid_filename(filename: str) -> bool:
    """Return True when *filename* is acceptable as an upload name.

    A name is rejected when it is empty, longer than 255 characters,
    contains any character outside the allowed set, has more than one dot,
    or is a reserved Windows device name.

    The whole string must match the pattern: ``fullmatch`` is used because
    ``re.match`` with a ``$`` anchor also accepts a trailing newline, so a
    name such as ``"a.txt\\n"`` used to pass.
    """
    if not filename or len(filename) > 255:
        return False
    if _VALID_FILENAME_PATTERN.fullmatch(filename) is None:
        return False
    return not is_reserved_windows_name(filename)
@dataclass
class ValidatedFile:
    """Result of a successful upload validation.

    Declared as a dataclass so it can be built with keyword arguments, as
    ``validate_file`` does; a plain class with only annotations would
    reject them with ``TypeError``.
    """

    # Full bytes read from the upload.
    content: bytes
    # Upload name after whitespace has been replaced by underscores.
    filename: str
    # MIME type detected from the file content.
    mime_type: str
async def validate_file(file: UploadFile) -> ValidatedFile:
    """Validate an uploaded file and read its content into memory.

    Checks run in this order: declared size, file name (length, characters,
    extension), MIME type from the part headers, actual size while reading,
    and finally the MIME type detected from the first 2048 content bytes.

    Args:
        file: The upload received by the endpoint.

    Returns:
        A ``ValidatedFile`` holding the content, the whitespace-normalized
        file name and the MIME type detected from the content.

    Raises:
        FileValidationException: With the ``FileValidationError`` member
            matching the first check that fails.
    """
    # Preliminary checks on the size declared by the client.
    declared_size = file.size
    if declared_size is None:
        raise FileValidationException(FileValidationError.MISSING_SIZE)
    if declared_size > MAX_FILE_SIZE:
        raise FileValidationException(FileValidationError.FILE_TOO_LARGE)

    # Check filename and extension.
    file_name = file.filename
    if file_name is None:
        raise FileValidationException(FileValidationError.MISSING_FILE_NAME)
    if len(file_name) > MAX_FILE_NAME_LENGTH:
        raise FileValidationException(FileValidationError.FILE_NAME_TOO_LARGE)
    file_name = replace_spaces_in_filename(file_name)
    if not is_valid_filename(file_name):
        raise FileValidationException(FileValidationError.INVALID_FILE_NAME)
    _, extension = os.path.splitext(file_name)
    if extension not in SUPPORTED_FILE_EXTENSIONS:
        raise FileValidationException(FileValidationError.UNSUPPORTED_EXTENSION)

    # Check mime type from headers.
    declared_mime = file.headers.get("content-type")
    if declared_mime is None or declared_mime not in SUPPORTED_FILE_TYPES:
        raise FileValidationException(FileValidationError.INVALID_MIME_TYPE)

    # Read in chunks and enforce the limit on the bytes actually received.
    # Chunks are collected and joined once at the end: repeated ``bytes +=``
    # would copy the accumulated content on every iteration.
    chunks: list[bytes] = []
    actual_size = 0
    while chunk := await file.read(FILE_CHUNK_SIZE):
        actual_size += len(chunk)
        if actual_size > MAX_FILE_SIZE:
            raise FileValidationException(FileValidationError.FILE_TOO_LARGE)
        chunks.append(chunk)
    if actual_size == 0:
        raise FileValidationException(FileValidationError.EMPTY_FILE)
    file_content = b"".join(chunks)

    # Verify mime type from actual file content.
    detected_mime = magic.from_buffer(file_content[:2048], mime=True)
    if detected_mime not in SUPPORTED_FILE_TYPES:
        raise FileValidationException(FileValidationError.INVALID_MIME_TYPE)

    return ValidatedFile(
        content=file_content,
        filename=file_name,
        mime_type=detected_mime,
    )
_DOCX_MIME_TYPE = (
    "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
)
_IMAGE_MIME_TYPES = ("image/jpeg", "image/png")


async def _run_blocking_with_timeout(func, *args):
    """Run a blocking callable in the default executor under the extraction timeout."""
    # NOTE(review): on timeout only the await is abandoned; presumably the
    # executor thread keeps running until ``func`` returns - confirm this
    # is acceptable.
    loop = asyncio.get_running_loop()
    return await asyncio.wait_for(
        loop.run_in_executor(None, func, *args),
        timeout=TEXT_EXTRACTION_TIMEOUT,
    )


async def extract_text_from_file(file_content: bytes, file_mime: str) -> str:
    """Extract text from a file, dispatching on its MIME type.

    PDF and plain-text extractors are awaited directly; DOCX and image
    extraction run in the default executor. Each step is bounded by
    ``TEXT_EXTRACTION_TIMEOUT``. Images are re-encoded with
    ``sanitize_image`` before OCR.

    Args:
        file_content: Raw bytes of the file.
        file_mime: MIME type used to pick the extractor.

    Returns:
        The extracted text.

    Raises:
        FileExtractionException: ``INVALID_MIME_TYPE`` for an unsupported
            type, ``TEXT_EXTRACTION_TIMEOUT`` when a step times out,
            ``FILE_TOO_LARGE`` on a Pillow decompression-bomb error,
            ``MALFORMED_FILE`` on ``UnidentifiedImageError`` or ``OSError``,
            and ``NO_TEXT`` when the extractor returns ``None``.
    """
    try:
        if file_mime == "application/pdf":
            file_text = await asyncio.wait_for(
                extract_text_from_pdf(file_content),
                timeout=TEXT_EXTRACTION_TIMEOUT,
            )
        elif file_mime == "text/plain":
            file_text = await asyncio.wait_for(
                extract_text_from_txt(file_content),
                timeout=TEXT_EXTRACTION_TIMEOUT,
            )
        elif file_mime == _DOCX_MIME_TYPE:
            file_text = await _run_blocking_with_timeout(
                extract_text_from_docx, file_content
            )
        elif file_mime in _IMAGE_MIME_TYPES:
            # Sanitizing and OCR each get their own timeout budget.
            sanitized_file_content = await _run_blocking_with_timeout(
                sanitize_image, file_content
            )
            file_text = await _run_blocking_with_timeout(
                extract_text_from_img, sanitized_file_content
            )
        else:
            raise FileExtractionException(FileExtractionError.INVALID_MIME_TYPE)
    except asyncio.TimeoutError as err:
        # Must stay ahead of the OSError handler below: on Python 3.11+
        # asyncio.TimeoutError is the builtin TimeoutError, an OSError.
        raise FileExtractionException(
            FileExtractionError.TEXT_EXTRACTION_TIMEOUT
        ) from err
    except Image.DecompressionBombError as err:
        # TODO: Log the decompression bomb DOS attack
        raise FileExtractionException(FileExtractionError.FILE_TOO_LARGE) from err
    except (PIL.UnidentifiedImageError, OSError) as err:
        raise FileExtractionException(FileExtractionError.MALFORMED_FILE) from err

    if file_text is None:
        raise FileExtractionException(FileExtractionError.NO_TEXT)
    return file_text