# NOTE: extraction artifact removed here (file-size line, blob hash, and a
# line-number index were fused into the source during export).
import os
import json
import pdfplumber
import hashlib
from pathlib import Path
from typing import Dict, List, Tuple
from PIL import Image
import io
class PDFParser:
    """Parse PDF files with pdfplumber, caching results keyed on file content hash.

    Parsed output (text, tables, image references) is cached in a JSON file so
    unchanged PDFs are skipped on subsequent runs.
    """

    def __init__(self, pdf_dir: str, cache_dir: str = ".pdf_cache"):
        """Set up directories and load any existing processed-files cache.

        Args:
            pdf_dir: Directory containing the PDF files to parse.
            cache_dir: Directory for the JSON cache and extracted image files.
        """
        self.pdf_dir = pdf_dir
        self.cache_dir = cache_dir
        self.cache_file = os.path.join(cache_dir, "processed_files.json")
        # Cache directory also holds extracted image files.
        os.makedirs(cache_dir, exist_ok=True)
        self.processed_files = self._load_cache()

    def _load_cache(self) -> Dict:
        """Load the processed-files cache; an unreadable cache counts as empty."""
        if os.path.exists(self.cache_file):
            try:
                with open(self.cache_file, 'r') as f:
                    return json.load(f)
            except (json.JSONDecodeError, OSError):
                # A corrupt or unreadable cache should not abort parsing;
                # fall through and rebuild from scratch.
                return {}
        return {}

    def _save_cache(self):
        """Persist the processed-files cache as pretty-printed JSON."""
        with open(self.cache_file, 'w') as f:
            json.dump(self.processed_files, f, indent=2)

    def _get_file_hash(self, filepath: str) -> str:
        """Return the MD5 hex digest of a file (change detection only, not security)."""
        hash_md5 = hashlib.md5()
        with open(filepath, "rb") as f:
            # Stream in 4 KiB chunks so large PDFs are not read into memory at once.
            for chunk in iter(lambda: f.read(4096), b""):
                hash_md5.update(chunk)
        return hash_md5.hexdigest()

    def _extract_tables(self, page) -> List[Dict]:
        """Extract tables from a pdfplumber page as pipe-delimited text.

        Returns:
            A list of {"type": "table", "index": i, "content": text} dicts;
            empty on extraction failure (best-effort by design).
        """
        tables = []
        try:
            page_tables = page.extract_tables()
            for i, table in enumerate(page_tables):
                # None cells become empty strings; rows are joined with " | ".
                table_text = "\n".join(
                    " | ".join(str(cell) if cell else "" for cell in row)
                    for row in table
                )
                tables.append({
                    "type": "table",
                    "index": i,
                    "content": table_text
                })
        except Exception:
            # Table extraction is best-effort; a bad page yields no tables.
            pass
        return tables

    def _extract_images(self, page, page_num: int, pdf_filename: str) -> List[Dict]:
        """Record references to images found on a pdfplumber page.

        NOTE(review): this only records metadata — no image bytes are ever
        written to ``img_path``, so the path refers to a file that does not
        exist. Confirm whether actual extraction is intended.

        Returns:
            A list of {"type": "image", "index", "path", "description"} dicts;
            empty on failure (best-effort by design).
        """
        images = []
        try:
            page_images = page.images
            for i, img_dict in enumerate(page_images):
                try:
                    img_name = f"{pdf_filename}_p{page_num}_img{i}.png"
                    img_path = os.path.join(self.cache_dir, img_name)
                    # Only record images that report a source size; .get avoids
                    # a KeyError when the key is missing from the image dict.
                    if img_dict.get("srcsize"):
                        images.append({
                            "type": "image",
                            "index": i,
                            "path": img_path,
                            "description": f"Image from page {page_num}"
                        })
                except Exception:
                    # Skip a single malformed image entry, keep the rest.
                    pass
        except Exception:
            # Image listing is best-effort; a bad page yields no images.
            pass
        return images

    def parse_pdf(self, pdf_path: str) -> Dict:
        """Parse one PDF into per-page text/tables/images, using the cache.

        Args:
            pdf_path: Path to the PDF file.

        Returns:
            {"filename", "pages": [...], "total_pages"} — pages may be partial
            (or empty) if parsing fails; errors are reported, not raised.
        """
        pdf_name = os.path.basename(pdf_path)
        file_hash = self._get_file_hash(pdf_path)

        # Serve from cache when the file content is unchanged.
        if pdf_name in self.processed_files:
            if self.processed_files[pdf_name]["hash"] == file_hash:
                print(f"✓ Skipping {pdf_name} (already processed)")
                return self.processed_files[pdf_name]["data"]

        print(f"→ Processing {pdf_name}...")
        content = {
            "filename": pdf_name,
            "pages": [],
            "total_pages": 0
        }
        try:
            with pdfplumber.open(pdf_path) as pdf:
                content["total_pages"] = len(pdf.pages)
                for page_num, page in enumerate(pdf.pages):
                    page_content = {
                        "page_num": page_num,
                        "text": page.extract_text() or "",
                        "tables": self._extract_tables(page),
                        "images": self._extract_images(
                            page, page_num, pdf_name.replace('.pdf', '')
                        )
                    }
                    content["pages"].append(page_content)
            # Cache only after a fully successful parse.
            self.processed_files[pdf_name] = {
                "hash": file_hash,
                "data": content
            }
            self._save_cache()
            print(f"✓ Successfully processed {pdf_name}")
        except Exception as e:
            print(f"✗ Error processing {pdf_name}: {str(e)}")
        return content

    def parse_all_pdfs(self) -> List[Dict]:
        """Parse every ``*.pdf`` in ``self.pdf_dir``; returns [] when none exist."""
        pdf_files = list(Path(self.pdf_dir).glob("*.pdf"))
        if not pdf_files:
            print(f"No PDF files found in {self.pdf_dir}")
            return []
        all_content = []
        for pdf_path in pdf_files:
            content = self.parse_pdf(str(pdf_path))
            all_content.append(content)
        return all_content
def extract_text_from_pdfs(pdf_dir: str) -> Tuple[List[str], List[Dict]]:
    """Extract chunked text (with inlined table content) from all PDFs in a directory.

    Args:
        pdf_dir: Directory containing the PDF files.

    Returns:
        A (documents, metadatas) pair: text chunks of roughly <=1000 characters
        split on sentence boundaries, and, for each chunk, a dict with the
        source ``"filename"`` and ``"page"`` number.
        (Note: the original annotation said ``List[str]`` for metadatas, but the
        values have always been dicts.)
    """
    parser = PDFParser(pdf_dir)
    all_pdfs = parser.parse_all_pdfs()

    documents: List[str] = []
    metadatas: List[Dict] = []

    for pdf_content in all_pdfs:
        for page in pdf_content["pages"]:

            def _flush(chunk: str) -> None:
                # Record a finished chunk along with its provenance;
                # whitespace-only chunks are dropped.
                if chunk.strip():
                    documents.append(chunk)
                    metadatas.append({
                        "filename": pdf_content["filename"],
                        "page": page["page_num"],
                    })

            text = page["text"]
            # Inline table content so it is searchable alongside body text.
            for table in page["tables"]:
                text += "\n\n[TABLE]\n" + table["content"] + "\n[/TABLE]\n"

            if not text.strip():
                continue

            # Naive sentence-based chunking: accumulate '.'-delimited sentences
            # until the next one would push the chunk past ~1000 characters.
            chunk = ""
            for sentence in text.split('.'):
                if len(chunk) + len(sentence) < 1000:
                    chunk += sentence + "."
                else:
                    _flush(chunk)
                    chunk = sentence + "."
            _flush(chunk)

    return documents, metadatas