final_project2 / src /pdf_parser.py
dnj0's picture
Update src/pdf_parser.py
ed4a4a3 verified
import os
import json
import hashlib
from pathlib import Path
from typing import List, Dict, Tuple
import PyPDF2
from pdf2image import convert_from_path
from PIL import Image
import pytesseract
from config import DOCSTORE_PATH, PROCESSED_FILES_LOG
class PDFParser:
    """Extract text, rendered page images (with OCR) and simple tables from PDFs.

    Extraction results are stored as ``<doc_id>_data.json`` files inside the
    docstore directory (``DOCSTORE_PATH``). A JSON log (``PROCESSED_FILES_LOG``)
    maps each document id to the MD5 hash of the file it was built from, so an
    unchanged PDF is served from the stored JSON instead of being parsed again.

    Note: the document id is the file stem, so two PDFs with the same file
    name in different directories share one cache entry.
    """

    # Suffix appended to the document id to form the cached JSON file stem.
    _DATA_SUFFIX = "_data"

    # Errors that mean "cached JSON is missing, unreadable or malformed".
    # json.JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
    _DATA_ERRORS = (OSError, ValueError, KeyError, TypeError)

    def __init__(self, debug: bool = True, ocr_lang: str = 'rus', dpi: int = 150):
        """Prepare the docstore directory, load the hash log and set up OCR.

        Args:
            debug: When true, progress details are printed via ``_debug_print``.
            ocr_lang: Tesseract language code passed to ``image_to_string``.
            dpi: Resolution used when rendering PDF pages to images.
        """
        self.docstore_path = Path(DOCSTORE_PATH)
        # parents=True so a nested docstore path can be created on first run.
        self.docstore_path.mkdir(parents=True, exist_ok=True)
        # Set before loading the log: the loader reports problems via debug output.
        self.debug = debug
        self.ocr_lang = ocr_lang
        self.dpi = dpi
        self.processed_files = self._load_processed_files()
        self._configure_tesseract()
        if self.debug:
            print("✅ PDFParser initialized")

    def _configure_tesseract(self):
        """Point pytesseract at the Windows install (if present) and probe it.

        Failures are reported as a printed warning and never raised, so the
        parser can still be constructed when Tesseract is unavailable.
        """
        try:
            if os.name == 'nt':
                windows_cmd = r'C:\Program Files\Tesseract-OCR\tesseract.exe'
                # Only override when the default install location exists;
                # otherwise keep pytesseract's default (tesseract on PATH).
                if os.path.exists(windows_cmd):
                    # The attribute pytesseract reads is `tesseract_cmd`.
                    pytesseract.pytesseract.tesseract_cmd = windows_cmd
            pytesseract.get_tesseract_version()
            print("✅ Tesseract configured successfully")
        except Exception as e:
            print(f"⚠️ Tesseract configuration warning: {e}")

    def _debug_print(self, label: str, data: object):
        """Print a labelled debug record when ``self.debug`` is enabled.

        Dicts are printed one key per line; lists and tuples are printed as a
        count plus their first three items (each truncated to 100 characters);
        anything else is printed as-is.
        """
        if not self.debug:
            return
        print(f"\n🔍 [PDF Parser] {label}")
        if isinstance(data, dict):
            for key, val in data.items():
                print(f" {key}: {val}")
        elif isinstance(data, (list, tuple)):
            print(f" Count: {len(data)}")
            for i, item in enumerate(data[:3]):
                print(f" [{i}]: {str(item)[:100]}")
        else:
            print(f" {data}")

    def _load_processed_files(self) -> Dict[str, str]:
        """Return the ``doc_id -> file hash`` log, or ``{}`` if it cannot be read."""
        try:
            with open(PROCESSED_FILES_LOG, 'r', encoding='utf-8') as f:
                loaded = json.load(f)
        except FileNotFoundError:
            # First run: nothing has been processed yet.
            return {}
        except (OSError, ValueError) as e:
            self._debug_print("WARNING unreadable processed-files log", str(e))
            return {}
        # Anything other than a JSON object cannot serve as the hash log.
        return loaded if isinstance(loaded, dict) else {}

    def _save_processed_files(self):
        """Write the ``doc_id -> file hash`` log to ``PROCESSED_FILES_LOG``."""
        with open(PROCESSED_FILES_LOG, 'w', encoding='utf-8') as f:
            json.dump(self.processed_files, f, indent=2)

    def _get_file_hash(self, file_path: str) -> str:
        """Return the hex MD5 digest of the file, read in 4 KiB chunks.

        The digest is used only to detect content changes between runs.
        """
        hash_md5 = hashlib.md5()
        with open(file_path, "rb") as f:
            for chunk in iter(lambda: f.read(4096), b""):
                hash_md5.update(chunk)
        return hash_md5.hexdigest()

    def _extract_text_from_pdf(self, pdf_path: str) -> str:
        """Return the text of every page, each page followed by a newline.

        Errors are reported through debug output rather than raised. A page
        that fails to extract is skipped so later pages are still processed;
        if the file cannot be opened at all, an empty string is returned.
        """
        page_texts = []
        try:
            with open(pdf_path, 'rb') as file:
                reader = PyPDF2.PdfReader(file)
                page_count = len(reader.pages)
                self._debug_print("PDF Text Extraction", f"Total pages: {page_count}")
                for page_num, page in enumerate(reader.pages):
                    try:
                        # Some PyPDF2 versions return None for pages without text.
                        page_text = page.extract_text() or ""
                    except Exception as page_error:
                        self._debug_print(
                            f"ERROR extracting text from page {page_num+1}",
                            str(page_error),
                        )
                        continue
                    page_texts.append(page_text)
                    self._debug_print(f"Page {page_num+1} Text Length", len(page_text))
        except Exception as e:
            self._debug_print("ERROR extracting text", str(e))
        text = "".join(page_text + "\n" for page_text in page_texts)
        self._debug_print("Total Text Extracted", len(text))
        return text

    def _extract_images_from_pdf(self, pdf_path: str, doc_id: str) -> List[Dict]:
        """Render each page to a PNG in the docstore and OCR it.

        Returns:
            One dict per rendered page with keys ``page`` (0-based index),
            ``path`` (saved PNG), ``ocr_text`` (stripped OCR output, or a
            bracketed failure note when OCR raised) and ``description``.
            Rendering errors are reported via debug output; the entries
            collected up to that point are returned.
        """
        images_data = []
        try:
            self._debug_print("Image Extraction Started", f"File: {pdf_path}")
            images = convert_from_path(pdf_path, dpi=self.dpi)
            self._debug_print("PDF to Images Conversion", f"Total images: {len(images)}")
            for idx, image in enumerate(images):
                self._debug_print(f"Processing Image {idx}", f"Size: {image.size}")
                image_path = self.docstore_path / f"{doc_id}_image_{idx}.png"
                image.save(image_path)
                self._debug_print(f"Image {idx} Saved", str(image_path))
                self._debug_print(f"Image {idx} OCR", "Running Tesseract OCR...")
                try:
                    ocr_text = pytesseract.image_to_string(image, lang=self.ocr_lang).strip()
                    if len(ocr_text) < 5:
                        self._debug_print(f"Image {idx} OCR Result", f"⚠️ EMPTY or very short ({len(ocr_text)} chars)")
                    else:
                        self._debug_print(f"Image {idx} OCR Result", f"✅ Success - {len(ocr_text)} chars: {ocr_text[:150]}")
                except Exception as ocr_error:
                    # Keep the page entry; record the failure in place of the text.
                    self._debug_print(f"Image {idx} OCR ERROR", str(ocr_error))
                    ocr_text = f"[Image {idx}: OCR failed - {str(ocr_error)}]"
                images_data.append({
                    'page': idx,
                    'path': str(image_path),
                    'ocr_text': ocr_text,
                    'description': f"Image from page {idx + 1}"
                })
        except Exception as e:
            self._debug_print("ERROR extracting images", str(e))
        self._debug_print("Image Extraction Complete", f"Total: {len(images_data)}")
        return images_data

    def _detect_tables(self, text: str) -> List[Dict]:
        """Group consecutive lines containing ``|`` or a tab into tables.

        A run is closed by the first non-blank line without a separator;
        blank lines do not close a run. Runs of a single line are discarded.
        """
        tables_data = []
        lines = text.split('\n')
        self._debug_print("Table Detection", f"Scanning {len(lines)} lines")

        def flush(rows):
            # Only runs of two or more separator lines count as a table.
            if len(rows) > 1:
                tables_data.append({
                    'content': '\n'.join(rows),
                    'description': f"Table {len(tables_data) + 1}"
                })

        current_table = []
        for line in lines:
            if '|' in line or '\t' in line:
                current_table.append(line)
            elif current_table and line.strip():
                flush(current_table)
                current_table = []
        flush(current_table)
        self._debug_print("Tables Found", len(tables_data))
        return tables_data

    def _extract_tables_from_pdf(self, pdf_path: str, doc_id: str) -> List[Dict]:
        """Extract the PDF text and return the tables detected in it.

        ``doc_id`` is accepted for signature symmetry with the other
        extractors and is not used. Errors are reported via debug output and
        result in an empty list.
        """
        try:
            return self._detect_tables(self._extract_text_from_pdf(pdf_path))
        except Exception as e:
            self._debug_print("ERROR extracting tables", str(e))
            return []

    def parse_pdf(self, pdf_path: str) -> Tuple[str, List[Dict], List[Dict]]:
        """Return ``(text, images, tables)`` for a PDF, using the cache when valid.

        The cached JSON is used only when the logged hash matches the file's
        current hash and the JSON can actually be read; otherwise the PDF is
        processed again and the cache and hash log are rewritten.
        """
        file_hash = self._get_file_hash(pdf_path)
        doc_id = Path(pdf_path).stem
        self._debug_print("PDF Parsing Started", f"File: {doc_id}, Hash: {file_hash}")

        if self.processed_files.get(doc_id) == file_hash:
            try:
                cached = self._read_extracted_data(doc_id)
            except self._DATA_ERRORS as e:
                # The log says "done" but the stored result is gone or broken:
                # fall through and rebuild it rather than returning empty data.
                self._debug_print("Cache unreadable, reprocessing", str(e))
            else:
                self._debug_print("Status", f"File {doc_id} already processed")
                return cached

        print(f"\n📄 Processing PDF: {doc_id}")
        text = self._extract_text_from_pdf(pdf_path)
        images = self._extract_images_from_pdf(pdf_path, doc_id)
        # Reuse the text extracted above instead of parsing the PDF a second time.
        tables = self._detect_tables(text)
        self._debug_print("Extraction Summary", {
            'text_length': len(text),
            'images_count': len(images),
            'tables_count': len(tables),
            'images_with_ocr': sum(1 for img in images if img.get('ocr_text', '').strip())
        })
        self._save_extracted_data(doc_id, text, images, tables)
        self.processed_files[doc_id] = file_hash
        self._save_processed_files()
        return text, images, tables

    def _data_path(self, doc_id: str) -> Path:
        """Return the path of the cached JSON file for ``doc_id``."""
        return self.docstore_path / f"{doc_id}{self._DATA_SUFFIX}.json"

    def _save_extracted_data(self, doc_id: str, text: str, images: List[Dict], tables: List[Dict]):
        """Write the extraction result for ``doc_id`` to its JSON cache file."""
        data = {
            'text': text,
            'images': images,
            'tables': tables
        }
        data_path = self._data_path(doc_id)
        with open(data_path, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        self._debug_print("Data Saved", str(data_path))

    def _read_extracted_data(self, doc_id: str) -> Tuple[str, List[Dict], List[Dict]]:
        """Read the cached result for ``doc_id``; raises one of ``_DATA_ERRORS`` on failure."""
        with open(self._data_path(doc_id), 'r', encoding='utf-8') as f:
            data = json.load(f)
        return data['text'], data['images'], data['tables']

    def _load_extracted_data(self, doc_id: str) -> Tuple[str, List[Dict], List[Dict]]:
        """Return the cached ``(text, images, tables)``, or ``("", [], [])`` if unavailable."""
        try:
            return self._read_extracted_data(doc_id)
        except self._DATA_ERRORS:
            return "", [], []

    def get_all_documents(self) -> Dict:
        """Return ``{doc_id: cached data}`` for every readable ``*_data.json`` file.

        Files that cannot be read or parsed are skipped (reported via debug
        output).
        """
        all_docs = {}
        for json_file in self.docstore_path.glob(f"*{self._DATA_SUFFIX}.json"):
            # Strip only the trailing suffix: a doc id may itself contain "_data".
            doc_id = json_file.stem[:-len(self._DATA_SUFFIX)]
            try:
                with open(json_file, 'r', encoding='utf-8') as f:
                    all_docs[doc_id] = json.load(f)
            except (OSError, ValueError) as e:
                self._debug_print(f"Skipping unreadable cache file {json_file.name}", str(e))
        return all_docs