# champ-chatbot / helpers / file_helper.py
# qyle's picture
# deployment
# 8b9e569 verified
import asyncio
from dataclasses import dataclass
import os
import zipfile
import cv2
from fastapi import UploadFile
import fitz # PyMuPDF
import io
import magic
import numpy as np
import re
from docx import Document
from lxml.etree import XMLSyntaxError
import PIL
from PIL import Image
from classes.ocr_reader import OCRReader
from constants import (
FILE_CHUNK_SIZE,
MAX_FILE_NAME_LENGTH,
MAX_FILE_SIZE,
SUPPORTED_FILE_EXTENSIONS,
SUPPORTED_FILE_TYPES,
TEXT_EXTRACTION_TIMEOUT,
)
from exceptions import FileExtractionError, FileExtractionException, FileValidationError
from exceptions import FileValidationException
def clean_text(raw_text: str):
    """Normalize whitespace in extracted text.

    Every line is stripped, runs of two or more newlines become exactly one
    blank line, runs of spaces become a single space, and the overall result
    is stripped of leading/trailing whitespace.

    Args:
        raw_text: Text as produced by one of the extractors.

    Returns:
        The normalized text.
    """
    # Strip each line but keep the (now empty) blank lines so paragraph gaps
    # are still visible as consecutive newlines after joining.
    stripped_rows = (row.strip() for row in raw_text.splitlines())
    joined = "\n".join(stripped_rows)

    # "a\n\n\nb" -> "a\n\nb"; a single "\n" is not matched and stays as is.
    paragraph_gaps_collapsed = re.sub(r"\n{2,}", "\n\n", joined)

    # Collapse any remaining runs of spaces inside lines.
    spaces_collapsed = re.sub(r" {2,}", " ", paragraph_gaps_collapsed)
    return spaces_collapsed.strip()
async def extract_text_from_pdf(binary_content: bytes):
    """Extract and normalize the text layer of a PDF held in memory.

    NOTE(review): this coroutine contains no ``await``, so it runs to
    completion once started; a surrounding ``asyncio.wait_for`` cannot
    interrupt it part-way.

    Args:
        binary_content: Raw bytes of the PDF file.

    Returns:
        The text of all pages, concatenated and passed through ``clean_text``.

    Raises:
        FileExtractionException: With ``FileExtractionError.NO_TEXT`` when no
            page yields any non-whitespace text.
    """
    # Load the binary data into a stream
    stream = io.BytesIO(binary_content)

    # Use the document as a context manager so it is closed on every path,
    # including when a page fails to parse or NO_TEXT is raised below.
    with fitz.open(stream=stream, filetype="pdf") as doc:
        full_text = "".join(page.get_text() for page in doc)

    if not full_text.strip():
        raise FileExtractionException(FileExtractionError.NO_TEXT)

    return clean_text(full_text)
async def extract_text_from_txt(binary_content: bytes):
    """Decode a plain-text upload as UTF-8 and normalize its whitespace.

    Args:
        binary_content: Raw bytes of the text file.

    Returns:
        The decoded text passed through ``clean_text``.

    Raises:
        FileExtractionException: With ``FileExtractionError.MALFORMED_FILE``
            when the bytes are not valid UTF-8.
    """
    try:
        full_text = binary_content.decode("utf-8")
    except UnicodeDecodeError as err:
        # Surface undecodable input as a domain error instead of leaking a
        # raw UnicodeDecodeError to the caller.
        raise FileExtractionException(FileExtractionError.MALFORMED_FILE) from err
    return clean_text(full_text)
def safe_unzip_check(file_bytes: bytes) -> bool:
    """Check that a zip archive stays within the size limit when decompressed.

    Every entry is decompressed in ``FILE_CHUNK_SIZE`` pieces and the running
    total of decompressed bytes is compared against ``MAX_FILE_SIZE``, so the
    check aborts as soon as the limit is crossed.

    Args:
        file_bytes: Raw bytes of the zip container.

    Returns:
        ``True`` when the whole archive was read without exceeding the limit.

    Raises:
        FileExtractionException: ``FILE_TOO_LARGE`` when the decompressed size
            exceeds ``MAX_FILE_SIZE``; ``UNSAFE_ZIP`` when ``zipfile`` reports
            a bad archive.
    """
    try:
        with zipfile.ZipFile(io.BytesIO(file_bytes)) as archive:
            decompressed_total = 0
            for member in archive.infolist():
                with archive.open(member) as member_stream:
                    # Read until the entry is exhausted (empty chunk).
                    while piece := member_stream.read(FILE_CHUNK_SIZE):
                        decompressed_total += len(piece)
                        if decompressed_total > MAX_FILE_SIZE:
                            raise FileExtractionException(
                                FileExtractionError.FILE_TOO_LARGE
                            )
            return True
    except zipfile.BadZipFile:
        raise FileExtractionException(FileExtractionError.UNSAFE_ZIP)
def extract_text_from_docx(binary_content: bytes):
    """Extract and normalize the paragraph text of a .docx file held in memory.

    Only ``doc.paragraphs`` is read; the paragraph texts are joined with
    newlines and passed through ``clean_text``.

    Args:
        binary_content: Raw bytes of the .docx file.

    Returns:
        The normalized text, or ``None`` if ``safe_unzip_check`` returns a
        falsy value.

    Raises:
        FileExtractionException: ``UNSAFE_ZIP`` when the document XML cannot
            be parsed; anything raised by ``safe_unzip_check`` propagates.
    """
    # A .docx is a zip container: vet its decompressed size before parsing.
    if not safe_unzip_check(binary_content):
        return None

    try:
        document = Document(io.BytesIO(binary_content))
    except XMLSyntaxError:
        raise FileExtractionException(FileExtractionError.UNSAFE_ZIP)

    body_text = "\n".join(paragraph.text for paragraph in document.paragraphs)
    return clean_text(body_text)
def sanitize_image(binary_content: bytes):
    """Re-encode an uploaded image as an RGB PNG.

    The image is opened with Pillow, converted to RGB and saved to a fresh
    in-memory PNG (presumably so that only the decoded pixel data of the
    upload is passed on — confirm intent with the author).

    Args:
        binary_content: Raw bytes of the uploaded image.

    Returns:
        The bytes of the re-encoded PNG.
    """
    source_stream = io.BytesIO(binary_content)
    with Image.open(source_stream) as opened_image:
        rgb_image = opened_image.convert("RGB")
        png_buffer = io.BytesIO()
        rgb_image.save(png_buffer, format="PNG")
        return png_buffer.getvalue()
def extract_text_from_img(binary_content: bytes) -> str | None:
    """Run OCR on an encoded image held in memory.

    Args:
        binary_content: Encoded image bytes (e.g. PNG or JPEG).

    Returns:
        Whatever ``OCRReader().read_text`` returns for the decoded image.

    Raises:
        FileExtractionException: With ``FileExtractionError.MALFORMED_FILE``
            when OpenCV cannot decode the bytes into an image.
    """
    # 1. Convert bytes to a numpy array
    encoded = np.frombuffer(binary_content, np.uint8)

    # 2. Decode the array into an image (OpenCV format)
    img = cv2.imdecode(encoded, cv2.IMREAD_COLOR)

    # cv2.imdecode signals failure by returning None rather than raising;
    # do not hand that None to the OCR reader.
    if img is None:
        raise FileExtractionException(FileExtractionError.MALFORMED_FILE)

    # 3. Pass the image variable directly
    return OCRReader().read_text(img)
def replace_spaces_in_filename(filename: str) -> str:
    """Trim a file name and replace whitespace runs with underscores.

    Leading and trailing whitespace is removed first; then every run of one
    or more whitespace characters becomes a single ``_``.

    Args:
        filename: The file name to normalize.

    Returns:
        The normalized file name.
    """
    trimmed = filename.strip()
    # \s+ matches any run of whitespace (spaces, tabs, newlines, ...).
    return re.sub(r"\s+", "_", trimmed)
# Matches a name whose stem (the part before the first dot, or the whole name)
# is one of the listed Windows device names, case-insensitively.
WINDOWS_RESERVED_NAMES = re.compile(
    r"^(CON|PRN|AUX|NUL|COM[1-9¹²³]|LPT[1-9¹²³])(\.|$)",
    flags=re.IGNORECASE,
)


def is_reserved_windows_name(filename: str) -> bool:
    """Tell whether a file name starts with a reserved Windows device name.

    Args:
        filename: The file name to test.

    Returns:
        ``True`` if ``WINDOWS_RESERVED_NAMES`` matches at the start of
        ``filename``, ``False`` otherwise.
    """
    return WINDOWS_RESERVED_NAMES.match(filename) is not None
def is_valid_filename(filename: str) -> bool:
    """Validate a file name against a strict allow-list.

    A valid name is non-empty, at most 255 characters long, consists of ASCII
    letters, digits, ``_``, ``(``, ``)`` and ``-`` with at most one
    ``.extension`` part built from the same characters, and is not a reserved
    Windows device name.

    Args:
        filename: The file name to validate.

    Returns:
        ``True`` if the name satisfies every rule above, ``False`` otherwise.
    """
    if not filename or len(filename) > 255:
        return False

    # fullmatch instead of match + "$": "$" also matches just before a
    # trailing newline, which would let "name.txt\n" through.
    pattern = r"[a-zA-Z0-9_()\-]+(\.[a-zA-Z0-9_()\-]+)?"
    if re.fullmatch(pattern, filename) is None:
        return False

    return not is_reserved_windows_name(filename)
@dataclass
class ValidatedFile:
    """An upload that passed validation, as returned by ``validate_file``."""

    # Complete raw bytes read from the upload.
    content: bytes
    # File name after whitespace normalization and name validation.
    filename: str
    # MIME type detected from the file content (not the request header).
    mime_type: str
async def validate_file(file: UploadFile) -> ValidatedFile:
    """Validate an uploaded file and read its content into memory.

    Checks, in order: declared size, file name (presence, length, allowed
    characters), extension, the ``content-type`` header, the actual number of
    bytes read, and finally the MIME type sniffed from the content itself.

    Args:
        file: The upload to validate.

    Returns:
        A ``ValidatedFile`` holding the content, the normalized file name and
        the MIME type detected from the content.

    Raises:
        FileValidationException: With the ``FileValidationError`` member
            matching the first check that fails.
    """
    # Preliminary checks on the size announced by the client
    declared_size = file.size
    if declared_size is None:
        raise FileValidationException(FileValidationError.MISSING_SIZE)
    if declared_size > MAX_FILE_SIZE:
        raise FileValidationException(FileValidationError.FILE_TOO_LARGE)

    # Check filename and extension
    file_name = file.filename
    if file_name is None:
        raise FileValidationException(FileValidationError.MISSING_FILE_NAME)
    if len(file_name) > MAX_FILE_NAME_LENGTH:
        raise FileValidationException(FileValidationError.FILE_NAME_TOO_LARGE)

    file_name = replace_spaces_in_filename(file_name)
    if not is_valid_filename(file_name):
        raise FileValidationException(FileValidationError.INVALID_FILE_NAME)

    # NOTE(review): the comparison is case-sensitive, so ".PDF" only passes if
    # SUPPORTED_FILE_EXTENSIONS lists it in that exact case.
    _, extension = os.path.splitext(file_name)
    if extension not in SUPPORTED_FILE_EXTENSIONS:
        raise FileValidationException(FileValidationError.UNSUPPORTED_EXTENSION)

    # Check mime type from headers
    declared_mime = file.headers.get("content-type")
    if declared_mime is None or declared_mime not in SUPPORTED_FILE_TYPES:
        raise FileValidationException(FileValidationError.INVALID_MIME_TYPE)

    # Read in chunks and enforce the limit on the bytes actually received,
    # since the declared size is client-supplied. Chunks are collected and
    # joined once: `bytes +=` would re-copy the whole buffer on every chunk.
    chunks: list[bytes] = []
    actual_size = 0
    while chunk := await file.read(FILE_CHUNK_SIZE):
        actual_size += len(chunk)
        if actual_size > MAX_FILE_SIZE:
            raise FileValidationException(FileValidationError.FILE_TOO_LARGE)
        chunks.append(chunk)

    if actual_size == 0:
        raise FileValidationException(FileValidationError.EMPTY_FILE)
    file_content = b"".join(chunks)

    # Verify mime type from actual file content (first 2048 bytes)
    detected_mime = magic.from_buffer(file_content[:2048], mime=True)
    if detected_mime not in SUPPORTED_FILE_TYPES:
        raise FileValidationException(FileValidationError.INVALID_MIME_TYPE)

    return ValidatedFile(
        content=file_content,
        filename=file_name,
        mime_type=detected_mime,
    )
async def extract_text_from_file(file_content: bytes, file_mime: str) -> str:
    """Extract text from a file, dispatching on its MIME type.

    PDF and plain-text extraction are awaited directly; DOCX and image
    extraction run in the default executor. Every step is wrapped in
    ``asyncio.wait_for`` with ``TEXT_EXTRACTION_TIMEOUT``.

    Args:
        file_content: Raw bytes of the file.
        file_mime: MIME type used to select the extractor.

    Returns:
        The extracted text.

    Raises:
        FileExtractionException: ``INVALID_MIME_TYPE`` for an unsupported
            type, ``TEXT_EXTRACTION_TIMEOUT`` when a step times out,
            ``FILE_TOO_LARGE`` on a Pillow decompression-bomb error,
            ``MALFORMED_FILE`` on an unidentified image or ``OSError``, and
            ``NO_TEXT`` when the extractor returns ``None``.
    """
    file_text = None
    try:
        if file_mime == "application/pdf":
            file_text = await asyncio.wait_for(
                extract_text_from_pdf(file_content), timeout=TEXT_EXTRACTION_TIMEOUT
            )
        elif file_mime == "text/plain":
            file_text = await asyncio.wait_for(
                extract_text_from_txt(file_content), timeout=TEXT_EXTRACTION_TIMEOUT
            )
        elif (
            file_mime
            == "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
        ):
            file_text = await _run_blocking_with_timeout(
                extract_text_from_docx, file_content
            )
        elif file_mime in ["image/jpeg", "image/png"]:
            # Re-encode first, then OCR the re-encoded bytes; each step gets
            # its own timeout budget.
            sanitized_file_content = await _run_blocking_with_timeout(
                sanitize_image, file_content
            )
            file_text = await _run_blocking_with_timeout(
                extract_text_from_img, sanitized_file_content
            )
        else:
            raise FileExtractionException(FileExtractionError.INVALID_MIME_TYPE)
    # Keep this handler ahead of the OSError one: since Python 3.11
    # asyncio.TimeoutError is the builtin TimeoutError, an OSError subclass.
    except asyncio.TimeoutError as err:
        raise FileExtractionException(
            FileExtractionError.TEXT_EXTRACTION_TIMEOUT
        ) from err
    except Image.DecompressionBombError as err:
        # TODO: Log the decompression bomb DOS attack
        raise FileExtractionException(FileExtractionError.FILE_TOO_LARGE) from err
    except (PIL.UnidentifiedImageError, OSError) as err:
        raise FileExtractionException(FileExtractionError.MALFORMED_FILE) from err

    if file_text is None:
        raise FileExtractionException(FileExtractionError.NO_TEXT)
    return file_text


async def _run_blocking_with_timeout(func, payload: bytes):
    """Run ``func(payload)`` in the default executor under the extraction timeout."""
    # get_running_loop() is the supported way to obtain the loop from inside
    # a coroutine.
    loop = asyncio.get_running_loop()
    return await asyncio.wait_for(
        loop.run_in_executor(None, func, payload),
        timeout=TEXT_EXTRACTION_TIMEOUT,
    )