# project2/src/pdf_parser.py
import os
import json
import pdfplumber
import hashlib
from pathlib import Path
from typing import Dict, List, Tuple
from PIL import Image
import io
class PDFParser:
    """Parse PDF files with pdfplumber, caching parsed results on disk.

    A JSON cache (``processed_files.json`` inside ``cache_dir``) maps each
    PDF filename to an MD5 content hash plus its parsed data, so files that
    have not changed since the last run are skipped.
    """

    def __init__(self, pdf_dir: str, cache_dir: str = ".pdf_cache"):
        """
        Args:
            pdf_dir: Directory scanned for ``*.pdf`` files.
            cache_dir: Directory holding the JSON cache (created if missing).
        """
        self.pdf_dir = pdf_dir
        self.cache_dir = cache_dir
        self.cache_file = os.path.join(cache_dir, "processed_files.json")
        os.makedirs(cache_dir, exist_ok=True)
        # Cache of previously processed files: {filename: {"hash": ..., "data": ...}}
        self.processed_files = self._load_cache()

    def _load_cache(self) -> Dict:
        """Return the cache of processed files, or {} if no cache exists yet."""
        if os.path.exists(self.cache_file):
            with open(self.cache_file, 'r', encoding='utf-8') as f:
                return json.load(f)
        return {}

    def _save_cache(self):
        """Persist the processed-files cache to disk as pretty-printed JSON."""
        with open(self.cache_file, 'w', encoding='utf-8') as f:
            json.dump(self.processed_files, f, indent=2)

    def _get_file_hash(self, filepath: str) -> str:
        """Return the MD5 hex digest of *filepath*, read in 4 KiB chunks.

        MD5 is used here purely for change detection, not for security.
        """
        hash_md5 = hashlib.md5()
        with open(filepath, "rb") as f:
            for chunk in iter(lambda: f.read(4096), b""):
                hash_md5.update(chunk)
        return hash_md5.hexdigest()

    def _extract_tables(self, page) -> List[Dict]:
        """Extract tables from a pdfplumber page as pipe-separated text rows.

        Returns a list of ``{"type": "table", "index": i, "content": text}``
        dicts; extraction errors are swallowed (best effort).
        """
        tables = []
        try:
            for i, table in enumerate(page.extract_tables()):
                # Explicit None check so real-but-falsy cells (e.g. 0) are kept;
                # only truly empty cells become "".
                table_text = "\n".join(
                    " | ".join("" if cell is None else str(cell) for cell in row)
                    for row in table
                )
                tables.append({
                    "type": "table",
                    "index": i,
                    "content": table_text
                })
        except Exception:
            # Best-effort: a table-extraction failure must not abort the page.
            pass
        return tables

    def _extract_images(self, page, page_num: int, pdf_filename: str) -> List[Dict]:
        """Record metadata for images found on a pdfplumber page.

        NOTE(review): image bytes are never actually written to ``path`` —
        each entry only records where the image *would* be cached. Confirm
        before relying on that file existing on disk.
        """
        images = []
        try:
            for i, img_dict in enumerate(page.images):
                try:
                    img_name = f"{pdf_filename}_p{page_num}_img{i}.png"
                    img_path = os.path.join(self.cache_dir, img_name)
                    # "srcsize" is present in pdfplumber image dicts; used here
                    # only as a truthiness check that a real image stream exists.
                    if img_dict["srcsize"]:
                        images.append({
                            "type": "image",
                            "index": i,
                            "path": img_path,
                            "description": f"Image from page {page_num}"
                        })
                except Exception:
                    pass  # Skip images whose metadata is malformed.
        except Exception:
            pass  # Best-effort: image-extraction failures must not abort the page.
        return images

    def parse_pdf(self, pdf_path: str) -> Dict:
        """Parse a single PDF into per-page text, tables, and image metadata.

        Returns the cached result unchanged when the file's hash matches a
        previous run. On parse failure, prints the error and returns whatever
        partial content was gathered (the cache is not updated).
        """
        pdf_name = os.path.basename(pdf_path)
        file_hash = self._get_file_hash(pdf_path)
        # Serve from cache when the file content is unchanged.
        if pdf_name in self.processed_files:
            if self.processed_files[pdf_name]["hash"] == file_hash:
                print(f"✓ Skipping {pdf_name} (already processed)")
                return self.processed_files[pdf_name]["data"]
        print(f"→ Processing {pdf_name}...")
        content = {
            "filename": pdf_name,
            "pages": [],
            "total_pages": 0
        }
        try:
            with pdfplumber.open(pdf_path) as pdf:
                content["total_pages"] = len(pdf.pages)
                for page_num, page in enumerate(pdf.pages):
                    page_content = {
                        "page_num": page_num,
                        "text": page.extract_text() or "",
                        "tables": self._extract_tables(page),
                        "images": self._extract_images(page, page_num, pdf_name.replace('.pdf', ''))
                    }
                    content["pages"].append(page_content)
            # Only successfully parsed files are cached.
            self.processed_files[pdf_name] = {
                "hash": file_hash,
                "data": content
            }
            self._save_cache()
            print(f"✓ Successfully processed {pdf_name}")
        except Exception as e:
            print(f"✗ Error processing {pdf_name}: {str(e)}")
        return content

    def parse_all_pdfs(self) -> List[Dict]:
        """Parse every ``*.pdf`` in ``pdf_dir``; returns [] when none exist."""
        pdf_files = list(Path(self.pdf_dir).glob("*.pdf"))
        if not pdf_files:
            print(f"No PDF files found in {self.pdf_dir}")
            return []
        return [self.parse_pdf(str(pdf_path)) for pdf_path in pdf_files]
def extract_text_from_pdfs(pdf_dir: str) -> Tuple[List[str], List[Dict]]:
    """Parse every PDF in *pdf_dir* into text chunks with per-chunk metadata.

    Each page's text (with any tables appended as ``[TABLE]...[/TABLE]``
    sections) is split on '.' boundaries and greedily packed into chunks of
    fewer than 1000 characters.

    Args:
        pdf_dir: Directory containing the PDF files to process.

    Returns:
        ``(documents, metadatas)`` — parallel lists where ``metadatas[i]``
        is a dict with the source ``filename`` and ``page`` number of
        ``documents[i]``. (Annotation fixed: metadata entries are dicts,
        not strings.)
    """
    parser = PDFParser(pdf_dir)
    documents: List[str] = []
    metadatas: List[Dict] = []
    for pdf_content in parser.parse_all_pdfs():
        for page in pdf_content["pages"]:
            text = page["text"]
            # Inline table content so it is searchable alongside the page text.
            for table in page["tables"]:
                text += "\n\n[TABLE]\n" + table["content"] + "\n[/TABLE]\n"
            if not text.strip():
                continue
            meta = {
                "filename": pdf_content["filename"],
                "page": page["page_num"]
            }
            # Greedy sentence packing: grow the chunk until the next sentence
            # would push it past ~1000 characters, then flush and start over.
            chunk = ""
            for sentence in text.split('.'):
                if len(chunk) + len(sentence) < 1000:
                    chunk += sentence + "."
                else:
                    if chunk.strip():
                        documents.append(chunk)
                        metadatas.append(dict(meta))
                    chunk = sentence + "."
            if chunk.strip():
                documents.append(chunk)
                metadatas.append(dict(meta))
    return documents, metadatas