import os
import json
import hashlib
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import PyPDF2
from pdf2image import convert_from_path
from PIL import Image
import pytesseract

from config import DOCSTORE_PATH, PROCESSED_FILES_LOG


class PDFParser:
    """Extract text, OCR'd page images and simple tables from PDF files.

    Results are cached in ``DOCSTORE_PATH`` as ``<doc_id>_data.json`` (plus
    one PNG per rendered page), where ``doc_id`` is the PDF file name without
    its extension.  An MD5 of every processed file is recorded in
    ``PROCESSED_FILES_LOG`` so that an unchanged file is not parsed twice.

    NOTE: because ``doc_id`` is only the file stem, two PDFs with the same
    name in different directories share one cache entry.
    """

    # Suffix appended to ``doc_id`` to form the cached JSON file name.
    _DATA_SUFFIX = "_data"

    # Default install location of the Tesseract binary on Windows.
    _WINDOWS_TESSERACT = r'C:\Program Files\Tesseract-OCR\tesseract.exe'

    def __init__(self, debug: bool = True, *, ocr_lang: str = 'rus', dpi: int = 150):
        """Prepare the document store and configure Tesseract.

        Args:
            debug: When true, print verbose progress information.
            ocr_lang: Tesseract language code(s) used for page OCR.
            dpi: Resolution used when rendering PDF pages to images.
        """
        self.debug = debug
        self.ocr_lang = ocr_lang
        self.dpi = dpi
        self.docstore_path = Path(DOCSTORE_PATH)
        # parents=True so a nested, not-yet-existing store path also works.
        self.docstore_path.mkdir(parents=True, exist_ok=True)
        self.processed_files = self._load_processed_files()
        self._configure_tesseract()
        if self.debug:
            print("āœ… PDFParser initialized")

    def _configure_tesseract(self):
        """Point pytesseract at the Tesseract binary and verify it runs.

        Failures are reported as a warning instead of raised, so the parser
        can still be used for plain text extraction without OCR.
        """
        try:
            # Only override the command when the default Windows install is
            # really there; otherwise keep relying on PATH.
            if os.name == 'nt' and os.path.isfile(self._WINDOWS_TESSERACT):
                pytesseract.pytesseract.tesseract_cmd = self._WINDOWS_TESSERACT
            pytesseract.get_tesseract_version()
            if self.debug:
                print("āœ… Tesseract configured successfully")
        except Exception as e:
            print(f"āš ļø Tesseract configuration warning: {e}")

    def _debug_print(self, label: str, data: Any):
        """Print ``data`` under ``label`` when debug output is enabled.

        Dicts are printed one key per line; lists and tuples print their
        length and the first three items (each truncated to 100 characters).
        """
        if not self.debug:
            return
        print(f"\nšŸ” [PDF Parser] {label}")
        if isinstance(data, dict):
            for key, val in data.items():
                print(f" {key}: {val}")
        elif isinstance(data, (list, tuple)):
            print(f" Count: {len(data)}")
            for i, item in enumerate(data[:3]):
                print(f" [{i}]: {str(item)[:100]}")
        else:
            print(f" {data}")

    def _load_processed_files(self) -> Dict[str, str]:
        """Return the ``doc_id -> file hash`` map, or ``{}`` if unavailable.

        A missing, unreadable or malformed log is treated as "nothing
        processed yet" rather than as an error.
        """
        try:
            with open(PROCESSED_FILES_LOG, 'r', encoding='utf-8') as f:
                loaded = json.load(f)
        except (OSError, ValueError):
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            return {}
        return loaded if isinstance(loaded, dict) else {}

    def _save_processed_files(self):
        """Persist the ``doc_id -> file hash`` map to ``PROCESSED_FILES_LOG``."""
        with open(PROCESSED_FILES_LOG, 'w', encoding='utf-8') as f:
            json.dump(self.processed_files, f, indent=2)

    def _get_file_hash(self, file_path: str) -> str:
        """Return the hex MD5 digest of the file (used for change detection only)."""
        hash_md5 = hashlib.md5()
        with open(file_path, "rb") as f:
            for chunk in iter(lambda: f.read(65536), b""):
                hash_md5.update(chunk)
        return hash_md5.hexdigest()

    def _extract_text_from_pdf(self, pdf_path: str) -> str:
        """Return the text layer of the PDF, each page followed by a newline.

        Errors are logged, not raised: a page that cannot be extracted
        contributes an empty string, and a file that cannot be opened yields
        whatever was collected so far (possibly ``""``).
        """
        page_texts: List[str] = []
        try:
            with open(pdf_path, 'rb') as file:
                reader = PyPDF2.PdfReader(file)
                page_count = len(reader.pages)
                self._debug_print("PDF Text Extraction", f"Total pages: {page_count}")
                for page_num, page in enumerate(reader.pages, start=1):
                    try:
                        # extract_text() may yield None for pages without text.
                        page_text = page.extract_text() or ""
                    except Exception as page_error:
                        self._debug_print(f"ERROR extracting text from page {page_num}", str(page_error))
                        page_text = ""
                    page_texts.append(page_text)
                    self._debug_print(f"Page {page_num} Text Length", len(page_text))
        except Exception as e:
            self._debug_print("ERROR extracting text", str(e))
        text = "".join(page_text + "\n" for page_text in page_texts)
        self._debug_print("Total Text Extracted", len(text))
        return text

    def _ocr_image(self, image, idx: int) -> str:
        """Run Tesseract on one rendered page and return the stripped text.

        On failure a bracketed placeholder describing the error is returned.
        """
        self._debug_print(f"Image {idx} OCR", "Running Tesseract OCR...")
        try:
            ocr_text = pytesseract.image_to_string(image, lang=self.ocr_lang).strip()
        except Exception as ocr_error:
            self._debug_print(f"Image {idx} OCR ERROR", str(ocr_error))
            return f"[Image {idx}: OCR failed - {str(ocr_error)}]"
        if len(ocr_text) < 5:
            self._debug_print(f"Image {idx} OCR Result", f"āš ļø EMPTY or very short ({len(ocr_text)} chars)")
        else:
            self._debug_print(f"Image {idx} OCR Result", f"āœ… Success - {len(ocr_text)} chars: {ocr_text[:150]}")
        return ocr_text

    def _extract_images_from_pdf(self, pdf_path: str, doc_id: str) -> List[Dict]:
        """Render every page to a PNG in the doc store and OCR it.

        Returns:
            One dict per page with keys ``page`` (0-based index), ``path``,
            ``ocr_text`` and ``description``.  If rendering fails, the pages
            collected so far (possibly none) are returned.
        """
        images_data = []
        try:
            self._debug_print("Image Extraction Started", f"File: {pdf_path}")
            images = convert_from_path(pdf_path, dpi=self.dpi)
            self._debug_print("PDF to Images Conversion", f"Total images: {len(images)}")
            for idx, image in enumerate(images):
                self._debug_print(f"Processing Image {idx}", f"Size: {image.size}")
                image_path = self.docstore_path / f"{doc_id}_image_{idx}.png"
                image.save(image_path)
                self._debug_print(f"Image {idx} Saved", str(image_path))
                images_data.append({
                    'page': idx,
                    'path': str(image_path),
                    'ocr_text': self._ocr_image(image, idx),
                    'description': f"Image from page {idx + 1}"
                })
        except Exception as e:
            self._debug_print("ERROR extracting images", str(e))
        self._debug_print("Image Extraction Complete", f"Total: {len(images_data)}")
        return images_data

    def _extract_tables_from_pdf(self, pdf_path: str, doc_id: str,
                                 text: Optional[str] = None) -> List[Dict]:
        """Detect table-like runs of lines in the PDF text.

        A line containing ``|`` or a tab is a table row.  Blank lines do not
        end a run; any other non-empty line does.  Runs of fewer than two
        rows are discarded.

        Args:
            pdf_path: PDF to scan (read only when ``text`` is not given).
            doc_id: Document identifier (currently unused).
            text: Already extracted text, to avoid parsing the PDF again.

        Returns:
            Dicts with keys ``content`` (rows joined by newlines) and
            ``description``.
        """
        tables_data = []

        def flush(rows: List[str]) -> None:
            # A single matching line is not considered a table.
            if len(rows) > 1:
                tables_data.append({
                    'content': '\n'.join(rows),
                    'description': f"Table {len(tables_data) + 1}"
                })

        try:
            if text is None:
                text = self._extract_text_from_pdf(pdf_path)
            lines = text.split('\n')
            self._debug_print("Table Detection", f"Scanning {len(lines)} lines")
            current_table = []
            for line in lines:
                if '|' in line or '\t' in line:
                    current_table.append(line)
                elif current_table and line.strip():
                    flush(current_table)
                    current_table = []
            flush(current_table)
            self._debug_print("Tables Found", len(tables_data))
        except Exception as e:
            self._debug_print("ERROR extracting tables", str(e))
        return tables_data

    def parse_pdf(self, pdf_path: str) -> Tuple[str, List[Dict], List[Dict]]:
        """Parse a PDF, reusing cached results when the file is unchanged.

        Returns:
            ``(text, images, tables)`` as produced by the extraction helpers.

        Raises:
            OSError: If ``pdf_path`` cannot be read for hashing, or the
                results cannot be written to the document store.
        """
        file_hash = self._get_file_hash(pdf_path)
        doc_id = Path(pdf_path).stem
        self._debug_print("PDF Parsing Started", f"File: {doc_id}, Hash: {file_hash}")

        if self.processed_files.get(doc_id) == file_hash:
            cached = self._read_extracted_data(doc_id)
            if cached is not None:
                self._debug_print("Status", f"File {doc_id} already processed")
                return cached
            # The log says "done" but the stored data is gone or corrupt:
            # fall through and parse again instead of returning empty results.
            self._debug_print("Status", f"Cached data for {doc_id} is missing or unreadable; re-processing")

        print(f"\nšŸ“„ Processing PDF: {doc_id}")
        text = self._extract_text_from_pdf(pdf_path)
        images = self._extract_images_from_pdf(pdf_path, doc_id)
        # Reuse the text extracted above rather than parsing the PDF twice.
        tables = self._extract_tables_from_pdf(pdf_path, doc_id, text=text)
        self._debug_print("Extraction Summary", {
            'text_length': len(text),
            'images_count': len(images),
            'tables_count': len(tables),
            'images_with_ocr': sum(1 for img in images if img.get('ocr_text', '').strip())
        })

        self._save_extracted_data(doc_id, text, images, tables)
        self.processed_files[doc_id] = file_hash
        self._save_processed_files()
        return text, images, tables

    def _data_path(self, doc_id: str) -> Path:
        """Return the path of the cached JSON file for ``doc_id``."""
        return self.docstore_path / f"{doc_id}{self._DATA_SUFFIX}.json"

    def _save_extracted_data(self, doc_id: str, text: str, images: List[Dict], tables: List[Dict]):
        """Write the extraction results for ``doc_id`` to the document store."""
        data = {
            'text': text,
            'images': images,
            'tables': tables
        }
        data_path = self._data_path(doc_id)
        with open(data_path, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        self._debug_print("Data Saved", str(data_path))

    def _read_extracted_data(self, doc_id: str) -> Optional[Tuple[str, List[Dict], List[Dict]]]:
        """Return cached ``(text, images, tables)`` or ``None`` if unusable."""
        try:
            with open(self._data_path(doc_id), 'r', encoding='utf-8') as f:
                data = json.load(f)
            return data['text'], data['images'], data['tables']
        except (OSError, ValueError, KeyError, TypeError):
            # Missing file, invalid JSON/encoding, or unexpected structure.
            return None

    def _load_extracted_data(self, doc_id: str) -> Tuple[str, List[Dict], List[Dict]]:
        """Return cached ``(text, images, tables)``; ``("", [], [])`` on failure."""
        cached = self._read_extracted_data(doc_id)
        if cached is None:
            return "", [], []
        return cached

    def get_all_documents(self) -> Dict:
        """Return ``{doc_id: stored data}`` for every readable cached document.

        Files that cannot be read or parsed are skipped.
        """
        all_docs = {}
        for json_file in self.docstore_path.glob(f"*{self._DATA_SUFFIX}.json"):
            # Strip only the trailing suffix; the doc_id itself may contain it.
            doc_id = json_file.stem[:-len(self._DATA_SUFFIX)]
            try:
                with open(json_file, 'r', encoding='utf-8') as f:
                    all_docs[doc_id] = json.load(f)
            except (OSError, ValueError) as e:
                self._debug_print(f"Skipping unreadable document {json_file.name}", str(e))
        return all_docs