"""Document service — file upload, storage, and preview orchestration.""" from __future__ import annotations import io import logging import os import uuid from pdf2image import convert_from_bytes, pdfinfo_from_bytes from domain.models import Document from infra.settings import settings from persistence import analysis_repo, document_repo logger = logging.getLogger(__name__) UPLOAD_DIR = settings.upload_dir MAX_FILE_SIZE = 5 * 1024 * 1024 # 5 MB MAX_PAGE_COUNT = settings.max_page_count # 0 = unlimited # PDF magic bytes: %PDF _PDF_MAGIC = b"%PDF" _UPLOAD_CHUNK_SIZE = 64 * 1024 # 64 KB chunks for streaming writes async def upload(filename: str, content_type: str, file_content: bytes) -> Document: """Save uploaded file to disk and persist metadata. Writes the file in fixed-size chunks to keep peak memory usage low. """ if len(file_content) > MAX_FILE_SIZE: raise ValueError("File too large (max 5 MB)") if not file_content[:4].startswith(_PDF_MAGIC): raise ValueError("Invalid file: not a PDF document") os.makedirs(UPLOAD_DIR, exist_ok=True) ext = ".pdf" # Content already validated as PDF safe_name = f"{uuid.uuid4()}{ext}" file_path = os.path.join(UPLOAD_DIR, safe_name) # Write in chunks to avoid doubling memory usage for large files with open(file_path, "wb") as f: for offset in range(0, len(file_content), _UPLOAD_CHUNK_SIZE): f.write(file_content[offset : offset + _UPLOAD_CHUNK_SIZE]) # Count PDF pages page_count = _count_pages(file_content) if MAX_PAGE_COUNT > 0 and page_count is not None and page_count > MAX_PAGE_COUNT: os.unlink(file_path) raise ValueError(f"Too many pages ({page_count}). Maximum allowed: {MAX_PAGE_COUNT}") doc = Document( filename=filename, content_type=content_type, file_size=len(file_content), page_count=page_count, storage_path=os.path.abspath(file_path), ) await document_repo.insert(doc) return doc async def find_all() -> list[Document]: """Return all documents, newest first.""" return await document_repo.find_all() async def find_by_id(doc_id: str) -> Document | None: """Find a document by its ID, or return None.""" return await document_repo.find_by_id(doc_id) async def delete(doc_id: str) -> bool: """Delete document file, associated analyses, and database record.""" doc = await document_repo.find_by_id(doc_id) if not doc: return False # Delete associated analyses first (cascade) await analysis_repo.delete_by_document(doc_id) # Delete file from disk (only if inside UPLOAD_DIR) try: real_path = os.path.realpath(doc.storage_path) real_upload_dir = os.path.realpath(UPLOAD_DIR) if real_path.startswith(real_upload_dir + os.sep) and os.path.exists(real_path): os.unlink(real_path) elif os.path.exists(doc.storage_path): logger.warning("Refused to delete file outside upload dir: %s", doc.storage_path) except FileNotFoundError: logger.info("File already removed: %s", doc.storage_path) except PermissionError: logger.error("Permission denied deleting file: %s", doc.storage_path) except OSError: logger.warning("Could not delete file: %s", doc.storage_path, exc_info=True) return await document_repo.delete(doc_id) def generate_preview(file_content: bytes, page: int = 1, dpi: int = 150) -> bytes: """Generate a PNG preview of a specific PDF page.""" images = convert_from_bytes(file_content, first_page=page, last_page=page, dpi=dpi) if not images: raise ValueError(f"Page {page} not found") buf = io.BytesIO() images[0].save(buf, format="PNG") return buf.getvalue() def _count_pages(file_content: bytes) -> int | None: """Count PDF pages using poppler via pdf2image.""" try: info = pdfinfo_from_bytes(file_content) return info.get("Pages") except (FileNotFoundError, OSError) as exc: logger.warning("Could not count pages: %s", exc) return None except Exception: logger.warning("Unexpected error counting pages", exc_info=True) return None