Spaces:
Sleeping
Sleeping
"""
Document Loading & Parsing
====================================================
Supports: PDF, DOCX, TXT, MD, PPTX, XLSX
"""
| import os | |
| from typing import List, Dict, Optional | |
| from dataclasses import dataclass | |
| from pathlib import Path | |
| import mimetypes | |
| # Document parsers | |
| from pypdf import PdfReader | |
| from docx import Document as DocxDocument | |
| from pptx import Presentation | |
| import openpyxl | |
| import markdown | |
| from bs4 import BeautifulSoup | |
@dataclass
class LoadedDocument:
    """Container for a loaded document's extracted text and file metadata.

    The ``@dataclass`` decorator generates the keyword-argument constructor
    that ``DocumentLoader.load`` relies on and triggers ``__post_init__``.
    """

    content: str                      # extracted plain text
    filename: str                     # file name without directory
    file_path: str                    # path the document was loaded from
    file_type: str                    # lower-cased extension, e.g. ".pdf"
    file_size: int                    # size on disk in bytes
    num_pages: Optional[int] = None   # pages/slides/sheets; None if not applicable
    metadata: Optional[Dict] = None   # replaced by a fresh dict when omitted

    def __post_init__(self):
        # Use None as the default and create the dict here so that instances
        # never share one mutable default dictionary.
        if self.metadata is None:
            self.metadata = {}
class DocumentLoader:
    """Universal document loader supporting multiple formats.

    Dispatches on the file extension to a format-specific parser and wraps
    the extracted text in a ``LoadedDocument``.
    """

    # Extension -> MIME type for every format ``load`` accepts.
    # NOTE(review): '.doc' is routed to the python-docx parser in ``load``;
    # python-docx is documented for .docx packages, so legacy binary .doc
    # files will presumably fail inside the parser - confirm and either
    # convert them upstream or drop the entry.
    SUPPORTED_EXTENSIONS = {
        '.pdf': 'application/pdf',
        '.docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
        '.doc': 'application/msword',
        '.txt': 'text/plain',
        '.md': 'text/markdown',
        '.pptx': 'application/vnd.openxmlformats-officedocument.presentationml.presentation',
        '.xlsx': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
    }

    def __init__(self, upload_dir: str = "./data/uploads"):
        """
        Initialize document loader

        Args:
            upload_dir: Directory where uploaded documents are stored.
                It is created (including parents) if it does not exist.
        """
        self.upload_dir = Path(upload_dir)
        self.upload_dir.mkdir(parents=True, exist_ok=True)

    def load(self, file_path: str) -> "LoadedDocument":
        """
        Load a document from file path

        Args:
            file_path: Path to the document (a ``str`` or ``Path``).

        Returns:
            LoadedDocument object

        Raises:
            ValueError: If file format is not supported
            FileNotFoundError: If file doesn't exist

        Errors raised by the underlying parser are not caught here.
        """
        file_path = Path(file_path)
        if not file_path.exists():
            raise FileNotFoundError(f"File not found: {file_path}")

        extension = file_path.suffix.lower()
        if extension not in self.SUPPORTED_EXTENSIONS:
            raise ValueError(
                f"Unsupported file format: {extension}. "
                f"Supported: {list(self.SUPPORTED_EXTENSIONS.keys())}"
            )

        # Get file info
        file_size = file_path.stat().st_size
        filename = file_path.name

        # Load based on file type
        if extension == '.pdf':
            content, num_pages = self._load_pdf(file_path)
        elif extension in ['.docx', '.doc']:
            content, num_pages = self._load_docx(file_path)
        elif extension == '.txt':
            content = self._load_txt(file_path)
            num_pages = None
        elif extension == '.md':
            content = self._load_markdown(file_path)
            num_pages = None
        elif extension == '.pptx':
            content, num_pages = self._load_pptx(file_path)
        elif extension == '.xlsx':
            content, num_pages = self._load_xlsx(file_path)
        else:
            # Defensive guard: only reachable if an extension is added to
            # SUPPORTED_EXTENSIONS without a matching branch above.
            raise ValueError(f"Unsupported extension: {extension}")

        return LoadedDocument(
            content=content,
            filename=filename,
            file_path=str(file_path),
            file_type=extension,
            file_size=file_size,
            num_pages=num_pages,
            metadata={
                'extension': extension,
                'size_bytes': file_size,
                'size_kb': round(file_size / 1024, 2),
            }
        )

    def _load_pdf(self, file_path: Path) -> tuple[str, int]:
        """Load PDF file.

        Returns:
            (text, page count). Pages with no extractable text are omitted
            from the text but still counted.
        """
        reader = PdfReader(str(file_path))
        num_pages = len(reader.pages)
        text_parts = []
        for page_num, page in enumerate(reader.pages, 1):
            # Guard against a None result so an empty page cannot crash
            # the whole load.
            text = page.extract_text() or ""
            if text.strip():
                text_parts.append(f"[Page {page_num}]\n{text}")
        return "\n\n".join(text_parts), num_pages

    def _load_docx(self, file_path: Path) -> tuple[str, int]:
        """Load DOCX file.

        Returns:
            (text, estimated page count). Only non-blank body paragraphs
            are collected.
        """
        doc = DocxDocument(str(file_path))
        paragraphs = [para.text for para in doc.paragraphs if para.text.strip()]

        # Rough page estimate (500 words per page), never less than 1
        word_count = sum(len(p.split()) for p in paragraphs)
        estimated_pages = max(1, word_count // 500)
        return "\n\n".join(paragraphs), estimated_pages

    def _load_txt(self, file_path: Path) -> str:
        """Load TXT file as UTF-8, dropping bytes that fail to decode."""
        with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
            return f.read()

    def _load_markdown(self, file_path: Path) -> str:
        """Load Markdown file and convert to plain text"""
        with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
            md_content = f.read()

        # Convert markdown to HTML then to plain text
        html = markdown.markdown(md_content)
        soup = BeautifulSoup(html, 'html.parser')
        return soup.get_text()

    def _load_pptx(self, file_path: Path) -> tuple[str, int]:
        """Load PowerPoint file.

        Returns:
            (text, slide count). Slides without any text are omitted from
            the text but still counted.
        """
        prs = Presentation(str(file_path))
        num_slides = len(prs.slides)
        slides_text = []
        for slide_num, slide in enumerate(prs.slides, 1):
            slide_text = [f"[Slide {slide_num}]"]
            for shape in slide.shapes:
                if hasattr(shape, "text") and shape.text.strip():
                    slide_text.append(shape.text)
            if len(slide_text) > 1:  # Has content beyond the slide marker
                slides_text.append("\n".join(slide_text))
        return "\n\n".join(slides_text), num_slides

    @staticmethod
    def _row_to_text(row) -> str:
        """Render one spreadsheet row as ' | '-separated text.

        Returns an empty string when every cell is None or blank, so the
        caller can skip rows that would otherwise be separators only.
        """
        cells = ["" if cell is None else str(cell) for cell in row]
        if not any(cell.strip() for cell in cells):
            return ""
        return " | ".join(cells)

    def _load_xlsx(self, file_path: Path) -> tuple[str, int]:
        """Load Excel file.

        Returns:
            (text, sheet count). Blank rows and sheets without any data
            rows are omitted from the text.
        """
        workbook = openpyxl.load_workbook(str(file_path), data_only=True)
        num_sheets = len(workbook.sheetnames)
        sheets_text = []
        for sheet_name in workbook.sheetnames:
            sheet = workbook[sheet_name]
            sheet_text = [f"[Sheet: {sheet_name}]"]
            for row in sheet.iter_rows(values_only=True):
                row_text = self._row_to_text(row)
                if row_text:
                    sheet_text.append(row_text)
            if len(sheet_text) > 1:  # Has rows beyond the sheet marker
                sheets_text.append("\n".join(sheet_text))
        return "\n\n".join(sheets_text), num_sheets

    def load_multiple(self, file_paths: List[str]) -> List["LoadedDocument"]:
        """
        Load multiple documents

        Files that fail to load are reported on stdout and skipped, so the
        result may be shorter than ``file_paths``.

        Args:
            file_paths: List of file paths

        Returns:
            List of LoadedDocument objects
        """
        documents = []
        for file_path in file_paths:
            try:
                documents.append(self.load(file_path))
            except Exception as e:
                # Deliberate best-effort: one bad file must not abort the batch.
                print(f"⚠️ Failed to load {file_path}: {e}")
        return documents

    def get_stats(self, doc: "LoadedDocument") -> Dict:
        """Get statistics about a document"""
        return {
            'filename': doc.filename,
            'type': doc.file_type,
            'size_kb': doc.metadata.get('size_kb', 0),
            # Only a missing count is shown as 'N/A'; a real 0 is kept.
            'num_pages': doc.num_pages if doc.num_pages is not None else 'N/A',
            'char_count': len(doc.content),
            'word_count': len(doc.content.split()),
            'line_count': len(doc.content.split('\n')),
        }
# ============================================================================
# USAGE EXAMPLE
# ============================================================================
if __name__ == "__main__":
    # Smoke test: write a small text file, load it back through
    # DocumentLoader and print what was extracted.
    loader = DocumentLoader()
    print("📄 Document Loader Test")
    print("=" * 80)

    # Create a test document
    test_file = Path("./data/uploads/test_document.txt")
    test_file.parent.mkdir(parents=True, exist_ok=True)
    # Explicit encoding so the file is written the same way on every
    # platform and matches the UTF-8 read in the loader.
    with open(test_file, 'w', encoding='utf-8') as f:
        f.write("""# RAG Pipeline Test Document
This is a test document for the RAG Pipeline Optimizer.
## Key Features
- Multi-model support
- Cost optimization
- Parallel evaluation
This document will be chunked and embedded for retrieval testing.
""")

    # Load the document
    doc = loader.load(test_file)
    print(f"✅ Loaded: {doc.filename}")
    print(f" Type: {doc.file_type}")
    print(f" Size: {doc.file_size} bytes")
    print(f" Content length: {len(doc.content)} chars")

    print("\n📊 Stats:")
    stats = loader.get_stats(doc)
    for key, value in stats.items():
        print(f" {key}: {value}")

    print("\n📝 Content preview:")
    print("-" * 80)
    # Show at most the first 200 characters, marking truncation with "...".
    preview = doc.content[:200] + "..." if len(doc.content) > 200 else doc.content
    print(preview)