# File: ASTROIQ/app/document_loaders/pdf_loader.py
# Repository page header (not code): Ndg07's picture
# Last commit message: "Manual update from local script"
# Commit: ddffdb8
import os
import fitz # PyMuPDF
from typing import List, Optional
from langchain.schema import Document
class PDFLoader:
    """
    Load PDF files with PyMuPDF and split their text into Document objects.

    Every page whose text is not blank produces either a single Document
    (when the text fits in ``chunk_size`` characters) or several overlapping
    chunk Documents.
    """

    # Separators tried, in order of preference, when looking for a natural
    # place to end a chunk.
    _BREAK_CHARS = ('\n\n', '\n', '. ', '? ', '! ')

    def __init__(self, chunk_size: int = 4000, chunk_overlap: int = 200):
        """
        Initialize the PDF document loader

        Args:
            chunk_size: Maximum size of each chunk, in characters
            chunk_overlap: Number of characters shared by consecutive chunks

        Raises:
            ValueError: If chunk_size is not positive or chunk_overlap is
                negative (either value would make chunking hang or skip text)
        """
        if chunk_size <= 0:
            raise ValueError(f"chunk_size must be positive, got {chunk_size}")
        if chunk_overlap < 0:
            raise ValueError(
                f"chunk_overlap must not be negative, got {chunk_overlap}"
            )
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap

    def load_file(self, file_path: str) -> List[Document]:
        """
        Load a PDF file and convert it to a list of documents

        Args:
            file_path: Path to the PDF file

        Returns:
            List of Document objects, one per page or per page chunk. An
            empty list is returned when the file exists but reading it fails
            (the error is printed, not raised).

        Raises:
            FileNotFoundError: If file_path does not exist
        """
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"File not found: {file_path}")

        try:
            print(f"Loading PDF: {file_path}")
            file_name = os.path.basename(file_path)

            pdf = fitz.open(file_path)
            try:
                # Guard against a missing metadata mapping so the lookups
                # below cannot fail on None.
                pdf_metadata = pdf.metadata or {}

                # Metadata shared by every document produced from this file
                metadata = {
                    "source": file_path,
                    "title": pdf_metadata.get("title") or file_name,
                    "author": pdf_metadata.get("author", ""),
                    "creation_date": pdf_metadata.get("creationDate", ""),
                    "file_type": "pdf",
                    "page_count": len(pdf),
                }

                documents = []
                for page_num, page in enumerate(pdf):
                    text = page.get_text()
                    if not text.strip():
                        # Pages without any visible text produce no document
                        continue
                    page_metadata = {**metadata, "page_number": page_num + 1}
                    documents.extend(
                        self._page_documents(text, page_metadata)
                    )
            finally:
                # Always release the underlying file handle, even on errors
                pdf.close()

            print(f"Extracted {len(documents)} chunks from PDF")
            return documents
        except Exception as e:
            # Best-effort loader: report the problem and return nothing
            # instead of propagating parser errors to the caller.
            print(f"Error loading PDF {file_path}: {str(e)}")
            return []

    def _page_documents(self, text: str, page_metadata: dict) -> List[Document]:
        """
        Turn the text of one page into one or more Document objects

        Args:
            text: Text extracted from the page
            page_metadata: Metadata to attach to every resulting document

        Returns:
            A single Document when the text fits in chunk_size, otherwise one
            Document per chunk with an additional 1-based "chunk" metadata key
        """
        if len(text) <= self.chunk_size:
            return [Document(page_content=text, metadata=page_metadata)]

        return [
            Document(
                page_content=chunk,
                metadata={**page_metadata, "chunk": index},
            )
            for index, chunk in enumerate(self._chunk_text(text), start=1)
        ]

    def _chunk_text(self, text: str) -> List[str]:
        """
        Chunk text into smaller pieces

        Each chunk is at most chunk_size characters long. A chunk ends at the
        last paragraph, line or sentence break found in its second half when
        there is one; the next chunk then starts chunk_overlap characters
        before that end.

        Args:
            text: Text to chunk

        Returns:
            List of text chunks (empty for empty text)
        """
        chunks = []
        text_length = len(text)
        start = 0

        while start < text_length:
            end = min(start + self.chunk_size, text_length)

            # Try to find a good breaking point
            if end < text_length:
                for break_char in self._BREAK_CHARS:
                    last_break = text.rfind(break_char, start, end)
                    # Only accept a break in the second half of the window so
                    # chunks do not become needlessly small.
                    if last_break > start + self.chunk_size / 2:
                        end = last_break + len(break_char)
                        break

            chunks.append(text[start:end])

            if end >= text_length:
                break

            next_start = end - self.chunk_overlap
            # A large overlap could move the window backwards or leave it in
            # place, which would loop forever; in that case continue right
            # after the current chunk without overlap.
            start = next_start if next_start > start else end

        return chunks