# RAG-Pipeline-Optimizer / core / document_loader.py
# puji4ml's picture
# Upload 30 files
# 2b22a59 verified
"""
Document Loading & Parsing
====================================================
Supports: PDF, DOCX, TXT, MD, PPTX, XLSX
"""
import mimetypes
import os
import zipfile
from dataclasses import dataclass
from pathlib import Path
from typing import List, Dict, Optional

# Document parsers
import markdown
import openpyxl
from bs4 import BeautifulSoup
from docx import Document as DocxDocument
from pptx import Presentation
from pypdf import PdfReader
@dataclass
class LoadedDocument:
    """Container for a loaded document's text and its file metadata."""

    # Extracted plain-text content of the document.
    content: str
    # Base name of the source file (no directory part).
    filename: str
    # Path of the source file, as a string.
    file_path: str
    # Lower-cased file extension including the dot, e.g. ".pdf".
    file_type: str
    # Size of the source file in bytes.
    file_size: int
    # Page-like unit count reported by the loader (pages, slides or sheets);
    # None when the format has no such notion.
    num_pages: Optional[int] = None
    # Free-form extra metadata; None is replaced by a fresh dict per instance.
    metadata: Optional[Dict] = None

    def __post_init__(self):
        # A mutable default cannot be shared between instances, so the empty
        # dict is created here rather than in the field declaration.
        if self.metadata is None:
            self.metadata = {}
class DocumentLoader:
    """Universal document loader supporting multiple formats.

    Every supported file is converted to plain text and returned as a
    ``LoadedDocument`` together with basic file metadata.
    """

    # Maps each accepted (lower-case) file extension to its MIME type.
    SUPPORTED_EXTENSIONS = {
        '.pdf': 'application/pdf',
        '.docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
        # Routed to the DOCX parser; legacy binary .doc files are rejected
        # with a ValueError in _load_docx.
        '.doc': 'application/msword',
        '.txt': 'text/plain',
        '.md': 'text/markdown',
        '.pptx': 'application/vnd.openxmlformats-officedocument.presentationml.presentation',
        '.xlsx': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
    }

    def __init__(self, upload_dir: str = "./data/uploads"):
        """
        Initialize document loader

        Args:
            upload_dir: Directory where uploaded documents are stored.
                It is created (including parents) if it does not exist.
        """
        self.upload_dir = Path(upload_dir)
        self.upload_dir.mkdir(parents=True, exist_ok=True)

    def load(self, file_path: str) -> "LoadedDocument":
        """
        Load a document from file path

        Args:
            file_path: Path to the document (anything ``Path()`` accepts)

        Returns:
            LoadedDocument object

        Raises:
            ValueError: If file format is not supported, or a ``.doc`` file
                is a legacy binary Word document
            FileNotFoundError: If file doesn't exist
        """
        path = Path(file_path)
        if not path.exists():
            raise FileNotFoundError(f"File not found: {path}")

        extension = path.suffix.lower()
        if extension not in self.SUPPORTED_EXTENSIONS:
            raise ValueError(
                f"Unsupported file format: {extension}. "
                f"Supported: {list(self.SUPPORTED_EXTENSIONS.keys())}"
            )

        # Get file info
        file_size = path.stat().st_size
        filename = path.name

        # Load based on file type; plain-text formats have no page count.
        num_pages = None
        if extension == '.pdf':
            content, num_pages = self._load_pdf(path)
        elif extension in ('.docx', '.doc'):
            content, num_pages = self._load_docx(path)
        elif extension == '.txt':
            content = self._load_txt(path)
        elif extension == '.md':
            content = self._load_markdown(path)
        elif extension == '.pptx':
            content, num_pages = self._load_pptx(path)
        elif extension == '.xlsx':
            content, num_pages = self._load_xlsx(path)
        else:
            # Only reachable if SUPPORTED_EXTENSIONS gains an entry that has
            # no loader branch above.
            raise ValueError(f"Unsupported extension: {extension}")

        return LoadedDocument(
            content=content,
            filename=filename,
            file_path=str(path),
            file_type=extension,
            file_size=file_size,
            num_pages=num_pages,
            metadata={
                'extension': extension,
                'size_bytes': file_size,
                'size_kb': round(file_size / 1024, 2),
            }
        )

    def _load_pdf(self, file_path: Path) -> tuple[str, int]:
        """Load PDF file.

        Returns:
            (text, page count). Each page with text is prefixed by a
            ``[Page N]`` marker; pages without extractable text are skipped
            but still counted.
        """
        reader = PdfReader(str(file_path))
        num_pages = len(reader.pages)
        text_parts = []
        for page_num, page in enumerate(reader.pages, 1):
            # Guard against a parser returning None for a text-less page.
            text = page.extract_text() or ""
            if text.strip():
                text_parts.append(f"[Page {page_num}]\n{text}")
        return "\n\n".join(text_parts), num_pages

    def _load_docx(self, file_path: Path) -> tuple[str, int]:
        """Load DOCX file.

        Returns:
            (text, estimated page count). Non-empty paragraphs are joined by
            blank lines; the page count is a rough estimate, not a real one.

        Raises:
            ValueError: If a ``.doc`` file is not a zip-based package, i.e.
                it is a legacy binary Word document.
        """
        # DOCX is a zip package; a ".doc" that is not a zip is the old binary
        # format, which the DOCX parser cannot read. Fail with a clear message.
        if file_path.suffix.lower() == '.doc' and not zipfile.is_zipfile(str(file_path)):
            raise ValueError(
                f"Legacy binary .doc files are not supported: {file_path}. "
                "Convert the document to .docx first."
            )

        doc = DocxDocument(str(file_path))
        # NOTE(review): only doc.paragraphs is read; text stored elsewhere
        # (e.g. inside tables) is presumably not captured - confirm.
        paragraphs = [para.text for para in doc.paragraphs if para.text.strip()]

        # Rough page estimate (500 words per page), never less than one page
        word_count = sum(len(p.split()) for p in paragraphs)
        estimated_pages = max(1, word_count // 500)
        return "\n\n".join(paragraphs), estimated_pages

    def _load_txt(self, file_path: Path) -> str:
        """Load TXT file as UTF-8, dropping undecodable bytes."""
        # utf-8-sig strips a leading byte-order mark if present, so it does
        # not leak into the content; files without a BOM decode as plain UTF-8.
        with open(file_path, 'r', encoding='utf-8-sig', errors='ignore') as f:
            return f.read()

    def _load_markdown(self, file_path: Path) -> str:
        """Load Markdown file and convert to plain text"""
        # utf-8-sig: a BOM in front of the first line would otherwise end up
        # in the text (and in front of a leading "#" heading marker).
        with open(file_path, 'r', encoding='utf-8-sig', errors='ignore') as f:
            md_content = f.read()

        # Convert markdown to HTML then to plain text
        html = markdown.markdown(md_content)
        soup = BeautifulSoup(html, 'html.parser')
        return soup.get_text()

    def _load_pptx(self, file_path: Path) -> tuple[str, int]:
        """Load PowerPoint file.

        Returns:
            (text, slide count). Each slide with text is prefixed by a
            ``[Slide N]`` marker; slides without text are skipped but counted.
        """
        prs = Presentation(str(file_path))
        num_slides = len(prs.slides)
        slides_text = []
        for slide_num, slide in enumerate(prs.slides, 1):
            slide_text = [f"[Slide {slide_num}]"]
            for shape in slide.shapes:
                # Not every shape exposes text (pictures, for instance).
                if hasattr(shape, "text") and shape.text.strip():
                    slide_text.append(shape.text)
            if len(slide_text) > 1:  # Has content beyond the slide marker
                slides_text.append("\n".join(slide_text))
        return "\n\n".join(slides_text), num_slides

    @staticmethod
    def _row_to_text(row) -> Optional[str]:
        """Render one spreadsheet row as ``a | b | c``; None if the row is empty."""
        cells = ["" if cell is None else str(cell) for cell in row]
        # Test the cells, not the joined string: joining empty cells still
        # leaves the " | " separators, which would look like content.
        if not any(cell.strip() for cell in cells):
            return None
        return " | ".join(cells)

    def _load_xlsx(self, file_path: Path) -> tuple[str, int]:
        """Load Excel file.

        Returns:
            (text, sheet count). Each sheet with data is prefixed by a
            ``[Sheet: name]`` marker, one line per non-empty row; empty
            sheets are skipped but still counted.
        """
        # data_only=True asks openpyxl for stored cell values rather than
        # formula text.
        workbook = openpyxl.load_workbook(str(file_path), data_only=True)
        num_sheets = len(workbook.sheetnames)
        sheets_text = []
        for sheet_name in workbook.sheetnames:
            sheet = workbook[sheet_name]
            sheet_text = [f"[Sheet: {sheet_name}]"]
            for row in sheet.iter_rows(values_only=True):
                row_text = self._row_to_text(row)
                if row_text is not None:
                    sheet_text.append(row_text)
            if len(sheet_text) > 1:  # Has content beyond the sheet marker
                sheets_text.append("\n".join(sheet_text))
        return "\n\n".join(sheets_text), num_sheets

    def load_multiple(self, file_paths: List[str]) -> List["LoadedDocument"]:
        """
        Load multiple documents

        Files that fail to load are reported on stdout and skipped, so the
        result may be shorter than ``file_paths``.

        Args:
            file_paths: List of file paths

        Returns:
            List of LoadedDocument objects
        """
        documents = []
        for file_path in file_paths:
            try:
                documents.append(self.load(file_path))
            except Exception as e:
                # Deliberate best-effort: one bad file must not abort the batch.
                print(f"⚠️ Failed to load {file_path}: {e}")
        return documents

    def get_stats(self, doc: "LoadedDocument") -> Dict:
        """Get statistics about a document

        Args:
            doc: A loaded document

        Returns:
            Dict with filename, type, size in KB, page count ('N/A' when the
            document has none) and character/word/line counts of the content
        """
        return {
            'filename': doc.filename,
            'type': doc.file_type,
            'size_kb': doc.metadata.get('size_kb', 0),
            # Explicit None test so a genuine count of 0 is reported as 0.
            'num_pages': doc.num_pages if doc.num_pages is not None else 'N/A',
            'char_count': len(doc.content),
            'word_count': len(doc.content.split()),
            'line_count': len(doc.content.split('\n')),
        }
# ============================================================================
# USAGE EXAMPLE
# ============================================================================
if __name__ == "__main__":
    # Manual smoke test: writes a small text file under ./data/uploads,
    # loads it back through DocumentLoader and prints what was read.
    loader = DocumentLoader()
    print("📄 Document Loader Test")
    print("=" * 80)

    # Create a test document (explicit encoding so the result does not
    # depend on the platform's default).
    test_file = Path("./data/uploads/test_document.txt")
    test_file.parent.mkdir(parents=True, exist_ok=True)
    with open(test_file, 'w', encoding='utf-8') as f:
        f.write("""# RAG Pipeline Test Document
This is a test document for the RAG Pipeline Optimizer.
## Key Features
- Multi-model support
- Cost optimization
- Parallel evaluation
This document will be chunked and embedded for retrieval testing.
""")

    # Load the document
    doc = loader.load(test_file)
    print(f"✅ Loaded: {doc.filename}")
    print(f" Type: {doc.file_type}")
    print(f" Size: {doc.file_size} bytes")
    print(f" Content length: {len(doc.content)} chars")

    print("\n📊 Stats:")
    stats = loader.get_stats(doc)
    for key, value in stats.items():
        print(f" {key}: {value}")

    print("\n📝 Content preview:")
    print("-" * 80)
    # Truncate long content to its first 200 characters.
    if len(doc.content) > 200:
        print(doc.content[:200] + "...")
    else:
        print(doc.content)