# Spaces: Sleeping
import hashlib
import json
import os
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import PyPDF2
import pytesseract
from pdf2image import convert_from_path
from PIL import Image

from config import DOCSTORE_PATH, PROCESSED_FILES_LOG
class PDFParser:
    """Extract text, page images (with OCR) and simple tables from PDF files.

    Results for each document are stored as ``<doc_id>_data.json`` inside
    ``DOCSTORE_PATH``; rendered pages are stored next to it as PNG files.
    ``PROCESSED_FILES_LOG`` is a JSON file mapping each document id (the PDF
    file stem) to the MD5 of the file that was last processed, so an
    unchanged PDF is served from the stored JSON instead of being re-parsed.
    """

    def __init__(self, debug: bool = True, ocr_lang: str = 'rus'):
        """Create the document store directory and load the processed-files log.

        Args:
            debug: When true, ``_debug_print`` writes progress to stdout.
            ocr_lang: Language code passed to Tesseract as ``lang``.
                Defaults to ``'rus'``.
        """
        self.docstore_path = Path(DOCSTORE_PATH)
        # parents=True so a nested store path can be created on first run.
        self.docstore_path.mkdir(parents=True, exist_ok=True)
        self.processed_files = self._load_processed_files()
        self.debug = debug
        self.ocr_lang = ocr_lang
        self._configure_tesseract()
        if self.debug:
            print("✅ PDFParser initialized")

    def _configure_tesseract(self):
        """Point pytesseract at the Tesseract binary and check that it runs.

        Failures are reported on stdout and never raised; OCR errors are
        handled per page in ``_extract_images_from_pdf``.
        """
        try:
            if os.name == 'nt':
                windows_default = r'C:\Program Files\Tesseract-OCR\tesseract.exe'
                # Override only when the default install location exists, so
                # a binary that is reachable through PATH keeps working.
                if os.path.isfile(windows_default):
                    pytesseract.pytesseract.tesseract_cmd = windows_default
            pytesseract.get_tesseract_version()
            print("✅ Tesseract configured successfully")
        except Exception as e:
            print(f"⚠️ Tesseract configuration warning: {e}")

    def _debug_print(self, label: str, data: Any):
        """Print ``label`` followed by a short rendering of ``data``.

        Does nothing unless ``self.debug`` is true. Dicts are printed one
        ``key: value`` per line; lists and tuples print their length and at
        most the first three items, each cut to 100 characters; anything
        else is printed as-is.
        """
        if not self.debug:
            return
        print(f"\n🔍 [PDF Parser] {label}")
        if isinstance(data, dict):
            for key, val in data.items():
                print(f" {key}: {val}")
        elif isinstance(data, (list, tuple)):
            print(f" Count: {len(data)}")
            for i, item in enumerate(data[:3]):
                print(f" [{i}]: {str(item)[:100]}")
        else:
            print(f" {data}")

    def _load_processed_files(self) -> Dict[str, str]:
        """Return the ``doc_id -> file hash`` mapping from ``PROCESSED_FILES_LOG``.

        A missing, unreadable or malformed log yields an empty mapping, which
        simply causes documents to be processed again.
        """
        try:
            with open(PROCESSED_FILES_LOG, 'r', encoding='utf-8') as f:
                loaded = json.load(f)
        except (OSError, ValueError):
            # OSError: file missing/unreadable. ValueError: invalid JSON or
            # undecodable bytes.
            return {}
        # Valid JSON that is not an object cannot serve as the mapping.
        return loaded if isinstance(loaded, dict) else {}

    def _save_processed_files(self):
        """Write ``self.processed_files`` to ``PROCESSED_FILES_LOG`` as JSON."""
        with open(PROCESSED_FILES_LOG, 'w', encoding='utf-8') as f:
            json.dump(self.processed_files, f, indent=2)

    def _get_file_hash(self, file_path: str) -> str:
        """Return the hexadecimal MD5 digest of the file at ``file_path``.

        The digest is used only to detect changed files. The file is read in
        chunks so large PDFs are not loaded into memory at once.
        """
        hash_md5 = hashlib.md5()
        with open(file_path, "rb") as f:
            for chunk in iter(lambda: f.read(65536), b""):
                hash_md5.update(chunk)
        return hash_md5.hexdigest()

    def _extract_text_from_pdf(self, pdf_path: str) -> str:
        """Return the text of every page, each page followed by a newline.

        Errors are reported through ``_debug_print`` and not raised; the text
        of the pages read before the error is still returned.
        """
        page_texts: List[str] = []
        try:
            with open(pdf_path, 'rb') as file:
                reader = PyPDF2.PdfReader(file)
                page_count = len(reader.pages)
                self._debug_print("PDF Text Extraction", f"Total pages: {page_count}")
                for page_num, page in enumerate(reader.pages, start=1):
                    # Treat a falsy result as "no text on this page".
                    page_text = page.extract_text() or ""
                    page_texts.append(page_text + "\n")
                    self._debug_print(f"Page {page_num} Text Length", len(page_text))
        except Exception as e:
            self._debug_print("ERROR extracting text", str(e))
        text = "".join(page_texts)
        self._debug_print("Total Text Extracted", len(text))
        return text

    def _extract_images_from_pdf(self, pdf_path: str, doc_id: str) -> List[Dict]:
        """Render each page to a PNG in the document store and OCR it.

        Returns:
            One dict per rendered page with the keys ``page`` (zero-based
            index), ``path`` (saved PNG), ``ocr_text`` and ``description``.
            When OCR fails for a page, ``ocr_text`` holds a bracketed failure
            note instead of recognised text. If rendering itself fails, the
            entries collected so far (possibly none) are returned.
        """
        images_data: List[Dict] = []
        try:
            self._debug_print("Image Extraction Started", f"File: {pdf_path}")
            images = convert_from_path(pdf_path, dpi=150)
            self._debug_print("PDF to Images Conversion", f"Total images: {len(images)}")
            for idx, image in enumerate(images):
                self._debug_print(f"Processing Image {idx}", f"Size: {image.size}")
                image_path = self.docstore_path / f"{doc_id}_image_{idx}.png"
                image.save(image_path)
                self._debug_print(f"Image {idx} Saved", str(image_path))
                self._debug_print(f"Image {idx} OCR", "Running Tesseract OCR...")
                try:
                    ocr_text = pytesseract.image_to_string(image, lang=self.ocr_lang)
                    ocr_text = ocr_text.strip()
                    if len(ocr_text) < 5:
                        self._debug_print(f"Image {idx} OCR Result", f"⚠️ EMPTY or very short ({len(ocr_text)} chars)")
                    else:
                        self._debug_print(f"Image {idx} OCR Result", f"✅ Success - {len(ocr_text)} chars: {ocr_text[:150]}")
                except Exception as ocr_error:
                    # A failed page must not abort the remaining pages.
                    self._debug_print(f"Image {idx} OCR ERROR", str(ocr_error))
                    ocr_text = f"[Image {idx}: OCR failed - {str(ocr_error)}]"
                images_data.append({
                    'page': idx,
                    'path': str(image_path),
                    'ocr_text': ocr_text,
                    'description': f"Image from page {idx + 1}"
                })
        except Exception as e:
            self._debug_print("ERROR extracting images", str(e))
        self._debug_print("Image Extraction Complete", f"Total: {len(images_data)}")
        return images_data

    def _extract_tables_from_pdf(self, pdf_path: str, doc_id: str,
                                 text: Optional[str] = None) -> List[Dict]:
        """Find table-like runs of lines in the document text.

        A line belongs to a table when it contains ``|`` or a tab. Blank
        lines are skipped without ending a run; any other line ends it. Runs
        of a single line are discarded.

        Args:
            pdf_path: PDF to read when ``text`` is not supplied.
            doc_id: Document id; accepted for interface symmetry, not used.
            text: Already extracted document text. When given, the PDF is
                not parsed a second time.

        Returns:
            One dict per table with ``content`` (the lines joined by
            newlines) and ``description`` (``"Table N"``).
        """
        tables_data: List[Dict] = []

        def flush(rows: List[str]) -> None:
            # Keep only runs of at least two table-like lines.
            if len(rows) > 1:
                tables_data.append({
                    'content': '\n'.join(rows),
                    'description': f"Table {len(tables_data) + 1}"
                })

        try:
            if text is None:
                text = self._extract_text_from_pdf(pdf_path)
            lines = text.split('\n')
            self._debug_print("Table Detection", f"Scanning {len(lines)} lines")
            current_table: List[str] = []
            for line in lines:
                if '|' in line or '\t' in line:
                    current_table.append(line)
                elif current_table and line.strip():
                    flush(current_table)
                    current_table = []
            # The text may end while a table is still open.
            flush(current_table)
            self._debug_print("Tables Found", len(tables_data))
        except Exception as e:
            self._debug_print("ERROR extracting tables", str(e))
        return tables_data

    def parse_pdf(self, pdf_path: str) -> Tuple[str, List[Dict], List[Dict]]:
        """Return ``(text, images, tables)`` for the PDF at ``pdf_path``.

        The document id is the file stem. If the log already holds the same
        hash for that id and the stored JSON can be read, the stored result
        is returned. Otherwise the PDF is processed, the result is stored and
        the log is updated.
        """
        file_hash = self._get_file_hash(pdf_path)
        doc_id = Path(pdf_path).stem
        self._debug_print("PDF Parsing Started", f"File: {doc_id}, Hash: {file_hash}")
        if self.processed_files.get(doc_id) == file_hash:
            cached = self._read_cached_data(doc_id)
            if cached is not None:
                self._debug_print("Status", f"File {doc_id} already processed")
                return cached
            # The log says "done" but the stored result is gone or corrupt:
            # rebuild it instead of returning empty data.
            self._debug_print("Status", f"Stored data for {doc_id} is missing or unreadable, re-processing")
        print(f"\n📄 Processing PDF: {doc_id}")
        text = self._extract_text_from_pdf(pdf_path)
        images = self._extract_images_from_pdf(pdf_path, doc_id)
        # Reuse the text extracted above rather than parsing the PDF again.
        tables = self._extract_tables_from_pdf(pdf_path, doc_id, text=text)
        self._debug_print("Extraction Summary", {
            'text_length': len(text),
            'images_count': len(images),
            'tables_count': len(tables),
            # Counts every non-empty ocr_text, including OCR failure notes.
            'images_with_ocr': sum(1 for img in images if img.get('ocr_text', '').strip())
        })
        self._save_extracted_data(doc_id, text, images, tables)
        self.processed_files[doc_id] = file_hash
        self._save_processed_files()
        return text, images, tables

    def _save_extracted_data(self, doc_id: str, text: str, images: List[Dict], tables: List[Dict]):
        """Write the extraction result to ``<doc_id>_data.json`` in the store."""
        data = {
            'text': text,
            'images': images,
            'tables': tables
        }
        data_path = self.docstore_path / f"{doc_id}_data.json"
        with open(data_path, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        self._debug_print("Data Saved", str(data_path))

    def _read_cached_data(self, doc_id: str) -> Optional[Tuple[str, List[Dict], List[Dict]]]:
        """Return the stored ``(text, images, tables)``, or ``None`` if unusable."""
        data_path = self.docstore_path / f"{doc_id}_data.json"
        try:
            with open(data_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
            return data['text'], data['images'], data['tables']
        except (OSError, ValueError, KeyError, TypeError):
            # Missing file, invalid JSON, missing key, or JSON that is not
            # an object.
            return None

    def _load_extracted_data(self, doc_id: str) -> Tuple[str, List[Dict], List[Dict]]:
        """Return the stored result for ``doc_id``.

        Returns ``("", [], [])`` when the stored file is missing or cannot
        be used.
        """
        cached = self._read_cached_data(doc_id)
        if cached is None:
            return "", [], []
        return cached

    def get_all_documents(self) -> Dict:
        """Return ``{doc_id: stored data}`` for every ``*_data.json`` in the store.

        Files that cannot be read or parsed are skipped and reported through
        ``_debug_print``.
        """
        suffix = "_data"
        all_docs = {}
        for json_file in sorted(self.docstore_path.glob("*_data.json")):
            # Drop only the trailing "_data"; the id itself may contain it.
            doc_id = json_file.stem[:-len(suffix)]
            try:
                with open(json_file, 'r', encoding='utf-8') as f:
                    all_docs[doc_id] = json.load(f)
            except (OSError, ValueError) as e:
                self._debug_print("ERROR loading document", f"{json_file}: {e}")
        return all_docs