""" Document Loading & Parsing ==================================================== Supports: PDF, DOCX, TXT, MD, PPTX, XLSX """ import os from typing import List, Dict, Optional from dataclasses import dataclass from pathlib import Path import mimetypes # Document parsers from pypdf import PdfReader from docx import Document as DocxDocument from pptx import Presentation import openpyxl import markdown from bs4 import BeautifulSoup @dataclass class LoadedDocument: """Container for loaded document with metadata""" content: str filename: str file_path: str file_type: str file_size: int num_pages: Optional[int] = None metadata: Dict = None def __post_init__(self): if self.metadata is None: self.metadata = {} class DocumentLoader: """Universal document loader supporting multiple formats""" SUPPORTED_EXTENSIONS = { '.pdf': 'application/pdf', '.docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document', '.doc': 'application/msword', '.txt': 'text/plain', '.md': 'text/markdown', '.pptx': 'application/vnd.openxmlformats-officedocument.presentationml.presentation', '.xlsx': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet', } def __init__(self, upload_dir: str = "./data/uploads"): """ Initialize document loader Args: upload_dir: Directory where uploaded documents are stored """ self.upload_dir = Path(upload_dir) self.upload_dir.mkdir(parents=True, exist_ok=True) def load(self, file_path: str) -> LoadedDocument: """ Load a document from file path Args: file_path: Path to the document Returns: LoadedDocument object Raises: ValueError: If file format is not supported FileNotFoundError: If file doesn't exist """ file_path = Path(file_path) if not file_path.exists(): raise FileNotFoundError(f"File not found: {file_path}") extension = file_path.suffix.lower() if extension not in self.SUPPORTED_EXTENSIONS: raise ValueError( f"Unsupported file format: {extension}. " f"Supported: {list(self.SUPPORTED_EXTENSIONS.keys())}" ) # Get file info file_size = file_path.stat().st_size filename = file_path.name # Load based on file type if extension == '.pdf': content, num_pages = self._load_pdf(file_path) elif extension in ['.docx', '.doc']: content, num_pages = self._load_docx(file_path) elif extension == '.txt': content = self._load_txt(file_path) num_pages = None elif extension == '.md': content = self._load_markdown(file_path) num_pages = None elif extension == '.pptx': content, num_pages = self._load_pptx(file_path) elif extension == '.xlsx': content, num_pages = self._load_xlsx(file_path) else: raise ValueError(f"Unsupported extension: {extension}") return LoadedDocument( content=content, filename=filename, file_path=str(file_path), file_type=extension, file_size=file_size, num_pages=num_pages, metadata={ 'extension': extension, 'size_bytes': file_size, 'size_kb': round(file_size / 1024, 2), } ) def _load_pdf(self, file_path: Path) -> tuple[str, int]: """Load PDF file""" reader = PdfReader(str(file_path)) num_pages = len(reader.pages) text_parts = [] for page_num, page in enumerate(reader.pages, 1): text = page.extract_text() if text.strip(): text_parts.append(f"[Page {page_num}]\n{text}") return "\n\n".join(text_parts), num_pages def _load_docx(self, file_path: Path) -> tuple[str, int]: """Load DOCX file""" doc = DocxDocument(str(file_path)) paragraphs = [] for para in doc.paragraphs: if para.text.strip(): paragraphs.append(para.text) # Rough page estimate (500 words per page) word_count = sum(len(p.split()) for p in paragraphs) estimated_pages = max(1, word_count // 500) return "\n\n".join(paragraphs), estimated_pages def _load_txt(self, file_path: Path) -> str: """Load TXT file""" with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: return f.read() def _load_markdown(self, file_path: Path) -> str: """Load Markdown file and convert to plain text""" with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: md_content = f.read() # Convert markdown to HTML then to plain text html = markdown.markdown(md_content) soup = BeautifulSoup(html, 'html.parser') return soup.get_text() def _load_pptx(self, file_path: Path) -> tuple[str, int]: """Load PowerPoint file""" prs = Presentation(str(file_path)) num_slides = len(prs.slides) slides_text = [] for slide_num, slide in enumerate(prs.slides, 1): slide_text = [f"[Slide {slide_num}]"] for shape in slide.shapes: if hasattr(shape, "text") and shape.text.strip(): slide_text.append(shape.text) if len(slide_text) > 1: # Has content beyond title slides_text.append("\n".join(slide_text)) return "\n\n".join(slides_text), num_slides def _load_xlsx(self, file_path: Path) -> tuple[str, int]: """Load Excel file""" workbook = openpyxl.load_workbook(str(file_path), data_only=True) num_sheets = len(workbook.sheetnames) sheets_text = [] for sheet_name in workbook.sheetnames: sheet = workbook[sheet_name] sheet_text = [f"[Sheet: {sheet_name}]"] for row in sheet.iter_rows(values_only=True): row_text = " | ".join(str(cell) if cell is not None else "" for cell in row) if row_text.strip(): sheet_text.append(row_text) if len(sheet_text) > 1: sheets_text.append("\n".join(sheet_text)) return "\n\n".join(sheets_text), num_sheets def load_multiple(self, file_paths: List[str]) -> List[LoadedDocument]: """ Load multiple documents Args: file_paths: List of file paths Returns: List of LoadedDocument objects """ documents = [] for file_path in file_paths: try: doc = self.load(file_path) documents.append(doc) except Exception as e: print(f"āš ļø Failed to load {file_path}: {e}") return documents def get_stats(self, doc: LoadedDocument) -> Dict: """Get statistics about a document""" return { 'filename': doc.filename, 'type': doc.file_type, 'size_kb': doc.metadata.get('size_kb', 0), 'num_pages': doc.num_pages or 'N/A', 'char_count': len(doc.content), 'word_count': len(doc.content.split()), 'line_count': len(doc.content.split('\n')), } # ============================================================================ # USAGE EXAMPLE # ============================================================================ if __name__ == "__main__": loader = DocumentLoader() print("šŸ“„ Document Loader Test") print("=" * 80) # Create a test document test_file = Path("./data/uploads/test_document.txt") test_file.parent.mkdir(parents=True, exist_ok=True) with open(test_file, 'w') as f: f.write("""# RAG Pipeline Test Document This is a test document for the RAG Pipeline Optimizer. ## Key Features - Multi-model support - Cost optimization - Parallel evaluation This document will be chunked and embedded for retrieval testing. """) # Load the document doc = loader.load(test_file) print(f"āœ… Loaded: {doc.filename}") print(f" Type: {doc.file_type}") print(f" Size: {doc.file_size} bytes") print(f" Content length: {len(doc.content)} chars") print(f"\nšŸ“Š Stats:") stats = loader.get_stats(doc) for key, value in stats.items(): print(f" {key}: {value}") print(f"\nšŸ“ Content preview:") print("-" * 80) print(doc.content[:200] + "..." if len(doc.content) > 200 else doc.content)