import os
from typing import List, Optional

import fitz  # PyMuPDF
from langchain.schema import Document


class PDFLoader:
    """Load a PDF with PyMuPDF and turn its pages into ``Document`` objects.

    Each non-empty page yields one document, or several when the page text
    is longer than ``chunk_size`` characters.
    """

    def __init__(self, chunk_size: int = 4000, chunk_overlap: int = 200):
        """
        Initialize the PDF document loader

        Args:
            chunk_size: Maximum size of each chunk, in characters
            chunk_overlap: Number of characters shared between consecutive chunks

        Raises:
            ValueError: If chunk_size is not positive or chunk_overlap is negative
        """
        # A non-positive chunk size can never consume any text, and a negative
        # overlap would silently skip text between chunks.
        if chunk_size <= 0:
            raise ValueError(f"chunk_size must be positive, got {chunk_size}")
        if chunk_overlap < 0:
            raise ValueError(f"chunk_overlap must be non-negative, got {chunk_overlap}")

        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap

    def load_file(self, file_path: str) -> List[Document]:
        """
        Load a PDF file and convert it to a list of documents

        Pages whose text is empty or whitespace-only are skipped. Every
        document carries the file-level metadata plus "page_number"
        (1-based); documents produced by splitting a long page also carry
        "chunk" (1-based index within that page).

        Args:
            file_path: Path to the PDF file

        Returns:
            List of Document objects. An empty list is returned when the
            PDF cannot be opened or read; the error is printed to stdout.

        Raises:
            FileNotFoundError: If file_path does not exist
        """
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"File not found: {file_path}")

        try:
            print(f"Loading PDF: {file_path}")

            file_name = os.path.basename(file_path)
            documents = []

            # The context manager closes the document even if extraction fails.
            with fitz.open(file_path) as pdf:
                # NOTE(review): metadata (or individual values) may be None for
                # some documents / PyMuPDF versions; `or` normalizes those.
                pdf_meta = pdf.metadata or {}

                # Metadata shared by every document created from this file
                metadata = {
                    "source": file_path,
                    "title": pdf_meta.get("title") or file_name,
                    "author": pdf_meta.get("author") or "",
                    "creation_date": pdf_meta.get("creationDate") or "",
                    "file_type": "pdf",
                    "page_count": len(pdf),
                }

                for page_number, page in enumerate(pdf, start=1):
                    text = page.get_text()
                    if not text.strip():
                        continue

                    page_metadata = {**metadata, "page_number": page_number}

                    # Short pages become a single document without a "chunk" key
                    if len(text) <= self.chunk_size:
                        documents.append(
                            Document(page_content=text, metadata=page_metadata)
                        )
                        continue

                    # Simple chunking strategy - can be improved
                    for chunk_index, chunk in enumerate(self._chunk_text(text), start=1):
                        documents.append(
                            Document(
                                page_content=chunk,
                                metadata={**page_metadata, "chunk": chunk_index},
                            )
                        )

            print(f"Extracted {len(documents)} chunks from PDF")
            return documents

        except Exception as e:
            # Deliberate best-effort: report the failure and return no documents.
            print(f"Error loading PDF {file_path}: {e}")
            return []

    def _chunk_text(self, text: str) -> List[str]:
        """
        Chunk text into smaller pieces

        Each chunk is at most chunk_size characters. When a chunk would end
        mid-text, the cut is moved back to just after the last paragraph
        break, line break or sentence end found in the second half of the
        window, if there is one. The next chunk starts chunk_overlap
        characters before the previous cut.

        Args:
            text: Text to chunk

        Returns:
            List of text chunks
        """
        chunks = []
        text_length = len(text)
        start = 0

        while start < text_length:
            end = min(start + self.chunk_size, text_length)

            if end < text_length:
                # Only accept a break past the midpoint so chunks do not get too small.
                earliest_break = start + self.chunk_size / 2
                for break_char in ("\n\n", "\n", ". ", "? ", "! "):
                    last_break = text.rfind(break_char, start, end)
                    if last_break > earliest_break:
                        end = last_break + len(break_char)
                        break

            chunks.append(text[start:end])

            if end >= text_length:
                break

            # Step back by the overlap, but never to or before the current
            # start: a large overlap relative to this chunk would otherwise
            # stall or rewind the loop forever. In that case drop the overlap.
            next_start = end - self.chunk_overlap
            start = next_start if next_start > start else end

        return chunks