|
|
import os
|
|
|
import json
|
|
|
import pdfplumber
|
|
|
import hashlib
|
|
|
from pathlib import Path
|
|
|
from typing import Dict, List, Tuple
|
|
|
from PIL import Image
|
|
|
import io
|
|
|
|
|
|
class PDFParser:
    """Parse PDF files into per-page text/table/image records, with an on-disk cache.

    Parsed results are cached as JSON in ``cache_dir`` keyed by filename; an
    MD5 content hash detects modified files so unchanged PDFs are skipped on
    subsequent runs.
    """

    def __init__(self, pdf_dir: str, cache_dir: str = ".pdf_cache"):
        """Create a parser for ``pdf_dir``, ensuring ``cache_dir`` exists.

        Args:
            pdf_dir: Directory scanned for ``*.pdf`` files.
            cache_dir: Directory holding the JSON cache (created if missing).
        """
        self.pdf_dir = pdf_dir
        self.cache_dir = cache_dir
        # JSON file mapping pdf filename -> {"hash": <md5>, "data": <parsed content>}
        self.cache_file = os.path.join(cache_dir, "processed_files.json")
        os.makedirs(cache_dir, exist_ok=True)
        self.processed_files = self._load_cache()

    def _load_cache(self) -> Dict:
        """Load the cache of previously processed files (empty dict if none)."""
        if os.path.exists(self.cache_file):
            with open(self.cache_file, 'r') as f:
                return json.load(f)
        return {}

    def _save_cache(self):
        """Persist the processed-files cache to disk as pretty-printed JSON."""
        with open(self.cache_file, 'w') as f:
            json.dump(self.processed_files, f, indent=2)

    def _get_file_hash(self, filepath: str) -> str:
        """Return the MD5 hex digest of ``filepath``.

        Used only for change detection, not for security.
        """
        hash_md5 = hashlib.md5()
        with open(filepath, "rb") as f:
            # Read in 4 KiB chunks so large PDFs are never loaded whole.
            for chunk in iter(lambda: f.read(4096), b""):
                hash_md5.update(chunk)
        return hash_md5.hexdigest()

    def _extract_tables(self, page) -> List[Dict]:
        """Extract tables from a pdfplumber page as pipe-delimited text records.

        Returns a list of ``{"type": "table", "index": i, "content": text}``
        dicts; extraction is best-effort and a failing page yields ``[]``.
        """
        tables = []
        try:
            for i, table in enumerate(page.extract_tables()):
                # Render each row as "cell | cell | ..."; None cells become "".
                table_text = "\n".join(
                    " | ".join(str(cell) if cell else "" for cell in row)
                    for row in table
                )
                tables.append({
                    "type": "table",
                    "index": i,
                    "content": table_text,
                })
        except Exception:
            # Best-effort: a malformed page simply contributes no tables.
            # (Was a bare `except:`, which also swallowed KeyboardInterrupt.)
            pass
        return tables

    def _extract_images(self, page, page_num: int, pdf_filename: str) -> List[Dict]:
        """Record metadata for images found on a pdfplumber page.

        NOTE(review): no image bytes are ever written to ``path`` — only a
        would-be cache path is recorded. Confirm whether downstream consumers
        expect a real file at that location.
        """
        images = []
        try:
            for i, img_dict in enumerate(page.images):
                try:
                    img_name = f"{pdf_filename}_p{page_num}_img{i}.png"
                    img_path = os.path.join(self.cache_dir, img_name)
                    # Only record images that report a source size. Using
                    # .get() skips entries lacking "srcsize" instead of
                    # raising KeyError (previously eaten by a bare except).
                    if img_dict.get("srcsize"):
                        images.append({
                            "type": "image",
                            "index": i,
                            "path": img_path,
                            "description": f"Image from page {page_num}",
                        })
                except Exception:
                    # Skip any single image that cannot be processed.
                    pass
        except Exception:
            # page.images itself can fail on malformed PDFs; treat as none.
            pass
        return images

    def parse_pdf(self, pdf_path: str) -> Dict:
        """Parse a single PDF into per-page text, tables, and image records.

        Results are served from the cache when the file's content hash is
        unchanged; otherwise the file is re-parsed and the cache updated.
        On a parse error the (possibly partial) content dict is returned
        and nothing is cached.
        """
        pdf_name = os.path.basename(pdf_path)
        file_hash = self._get_file_hash(pdf_path)

        # Cache hit: same name and same content hash -> reuse parsed data.
        cached = self.processed_files.get(pdf_name)
        if cached and cached["hash"] == file_hash:
            print(f"β Skipping {pdf_name} (already processed)")
            return cached["data"]

        print(f"β Processing {pdf_name}...")
        content = {
            "filename": pdf_name,
            "pages": [],
            "total_pages": 0,
        }

        try:
            with pdfplumber.open(pdf_path) as pdf:
                content["total_pages"] = len(pdf.pages)
                for page_num, page in enumerate(pdf.pages):
                    content["pages"].append({
                        "page_num": page_num,
                        "text": page.extract_text() or "",
                        "tables": self._extract_tables(page),
                        "images": self._extract_images(
                            page, page_num, pdf_name.replace('.pdf', '')
                        ),
                    })

            # Cache only fully-parsed results (errors above skip this).
            self.processed_files[pdf_name] = {
                "hash": file_hash,
                "data": content,
            }
            self._save_cache()
            print(f"β Successfully processed {pdf_name}")
        except Exception as e:
            print(f"β Error processing {pdf_name}: {str(e)}")

        return content

    def parse_all_pdfs(self) -> List[Dict]:
        """Parse every ``*.pdf`` in ``pdf_dir``; returns ``[]`` when none exist."""
        pdf_files = list(Path(self.pdf_dir).glob("*.pdf"))

        if not pdf_files:
            print(f"No PDF files found in {self.pdf_dir}")
            return []

        return [self.parse_pdf(str(pdf_path)) for pdf_path in pdf_files]
|
|
|
|
|
|
|
|
|
def extract_text_from_pdfs(pdf_dir: str) -> Tuple[List[str], List[Dict]]:
    """Extract chunked text (with inlined tables) and metadata from all PDFs.

    Args:
        pdf_dir: Directory containing the PDF files to parse.

    Returns:
        ``(documents, metadatas)``: parallel lists where ``documents[i]`` is
        a text chunk (packed under ~1000 characters at sentence boundaries)
        and ``metadatas[i]`` holds its source ``filename`` and ``page``.
        (Fixed return annotation: metadatas is a list of dicts, not strings.)
    """
    parser = PDFParser(pdf_dir)
    all_pdfs = parser.parse_all_pdfs()

    documents: List[str] = []
    metadatas: List[Dict] = []

    for pdf_content in all_pdfs:
        for page in pdf_content["pages"]:
            meta = {
                "filename": pdf_content["filename"],
                "page": page["page_num"],
            }

            def flush(chunk: str) -> None:
                # Emit a non-empty chunk with its page metadata (dedupes the
                # previously copy-pasted mid-loop and final flush blocks).
                if chunk.strip():
                    documents.append(chunk)
                    metadatas.append(dict(meta))

            text = page["text"]

            # Inline extracted tables so they are searchable alongside text.
            for table in page["tables"]:
                text += "\n\n[TABLE]\n" + table["content"] + "\n[/TABLE]\n"

            if not text.strip():
                continue

            # Naive sentence chunking: split on '.', pack sentences into
            # chunks under 1000 characters. NOTE(review): this breaks on
            # abbreviations and decimals — confirm acceptable for the corpus.
            sentences = text.split('.')
            chunk = ""
            for sentence in sentences:
                if len(chunk) + len(sentence) < 1000:
                    chunk += sentence + "."
                else:
                    flush(chunk)
                    chunk = sentence + "."
            flush(chunk)

    return documents, metadatas