Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| # paddle_ocr_standalone.py - Robust version with comprehensive error handling | |
| import sys | |
| import os | |
| import json | |
| import tempfile | |
| import traceback | |
def safe_print_stderr(message):
    """Write one diagnostic line to stderr without ever raising.

    stdout carries the JSON result of this script, so progress and debug
    output goes through this helper instead.

    Args:
        message: Text to emit; ``print`` appends the trailing newline.
    """
    stream = sys.stderr
    if stream is None:
        # print(file=None) falls back to sys.stdout, which would mix log
        # text into the JSON output. Drop the message instead.
        return
    try:
        print(message, file=stream, flush=True)
    except Exception:
        # Deliberate best effort: a closed or broken stderr must not abort
        # the run. KeyboardInterrupt/SystemExit still propagate.
        pass
def safe_print_json(data):
    """Emit ``data`` as a single JSON line on stdout.

    If ``data`` cannot be serialized, the problem is logged to stderr and a
    fixed failure object is emitted instead, so whatever reads stdout always
    receives one valid JSON document.

    Args:
        data: JSON-serializable object (the result or error dictionary).
    """
    # Serialize first so that only serialization problems trigger the
    # fallback; an output error on stdout cannot be fixed by printing again.
    try:
        payload = json.dumps(data)
    except Exception as e:
        safe_print_stderr(f"Error printing JSON: {e}")
        payload = '{"success": false, "error": "JSON serialization failed"}'
    # Both the normal and the fallback payload are flushed immediately.
    print(payload, flush=True)
# The document to OCR is the only required command-line argument; without it
# report the usage as a JSON error object and stop.
if len(sys.argv) < 2:
    usage_error = {
        "success": False,
        "error": "Usage: python paddle_ocr_standalone.py <file_path>",
    }
    safe_print_json(usage_error)
    sys.exit(1)

# Path of the input document, and the temporary images that must be removed
# once processing is over.
file_path = sys.argv[1]
temp_files = []
def cleanup_temp_files(file_paths):
    """Delete the given temporary files, logging instead of raising.

    Args:
        file_paths: Iterable of filesystem paths to remove. Paths that no
            longer exist are skipped silently.
    """
    for temp_path in file_paths:
        try:
            os.unlink(temp_path)
        except FileNotFoundError:
            continue
        except Exception as e:
            safe_print_stderr(f"Warning: Could not clean up {temp_path}: {e}")
        else:
            safe_print_stderr(f"β Cleaned up: {temp_path}")


def pdf_to_images(pdf_path, dpi=150):
    """Render every page of a PDF to its own temporary PNG file.

    Relies on the module-level name ``fitz`` (PyMuPDF), which the main script
    body imports before calling this function.

    Args:
        pdf_path: Path of the PDF document to render.
        dpi: Target resolution; pages are scaled by ``dpi / 72``.

    Returns:
        List of PNG paths for the pages that rendered successfully, in page
        order. Pages that fail individually are skipped. An empty list is
        returned if the conversion as a whole fails.
    """
    image_paths = []
    try:
        safe_print_stderr(f"Converting PDF to images (DPI: {dpi})...")
        doc = fitz.open(pdf_path)
        try:
            total_pages = len(doc)
            safe_print_stderr(f"PDF has {total_pages} pages")
            # 72 is the divisor the script uses to turn DPI into a zoom factor.
            zoom = dpi / 72
            mat = fitz.Matrix(zoom, zoom)
            for page_num in range(total_pages):
                try:
                    safe_print_stderr(f"Converting page {page_num + 1}...")
                    page = doc[page_num]
                    # Prefer the current API name and fall back to the legacy
                    # one. alpha=False is passed explicitly: with an alpha
                    # channel PyMuPDF leaves empty page areas transparent
                    # rather than white, which hurts OCR once alpha is dropped.
                    render = getattr(page, "get_pixmap", None) or page.getPixmap
                    pix = render(matrix=mat, alpha=False)
                    # mkstemp gives a unique file in the platform temp
                    # directory instead of a predictable /tmp name.
                    fd, temp_img_path = tempfile.mkstemp(
                        prefix=f"ocr_page_{page_num}_{os.getpid()}_",
                        suffix=".png",
                    )
                    os.close(fd)
                    try:
                        pix.save(temp_img_path)
                        file_size = os.path.getsize(temp_img_path)
                    except Exception:
                        # Do not leave the empty placeholder file behind.
                        cleanup_temp_files([temp_img_path])
                        raise
                    if file_size > 0:
                        safe_print_stderr(f"β Page {page_num + 1} converted: {temp_img_path} (size: {file_size} bytes, {pix.width}x{pix.height})")
                        image_paths.append(temp_img_path)
                    else:
                        safe_print_stderr(f"β Failed to create image: {temp_img_path}")
                        cleanup_temp_files([temp_img_path])
                except Exception as page_error:
                    safe_print_stderr(f"β Error converting page {page_num + 1}: {page_error}")
                    continue
        finally:
            # Release the document even when rendering fails part-way.
            doc.close()
        safe_print_stderr(f"β Successfully converted {len(image_paths)}/{total_pages} pages")
        return image_paths
    except Exception as e:
        safe_print_stderr(f"β PDF conversion failed: {e}")
        traceback.print_exc(file=sys.stderr)
        # The caller only receives [], so remove anything already written.
        cleanup_temp_files(image_paths)
        return []


def _extract_page_text(regions, min_confidence=0.3):
    """Join the recognized lines of one page whose confidence is high enough.

    Args:
        regions: Sequence of OCR entries; entry[1] is read as either a
            ``(text, confidence)`` pair or a bare value.
        min_confidence: Lines scoring at or below this value are dropped.

    Returns:
        The kept lines, each terminated by a newline, as one string.
    """
    kept = []
    for line_idx, line in enumerate(regions):
        try:
            if len(line) < 2:
                continue
            recognized = line[1]
            if isinstance(recognized, (list, tuple)):
                text_content = recognized[0]
                confidence = recognized[1] if len(recognized) > 1 else 1.0
            else:
                text_content = str(recognized)
                confidence = 1.0
            safe_print_stderr(f"Line {line_idx}: '{text_content}' (confidence: {confidence:.2f})")
            if confidence > min_confidence:
                kept.append(text_content + "\n")
        except Exception as line_error:
            safe_print_stderr(f"Error processing line {line_idx}: {line_error}")
            continue
    return "".join(kept)


# Main script body: validate the input, load the OCR stack, run OCR page by
# page and print exactly one JSON object on stdout (success or error).
try:
    safe_print_stderr(f"=== Starting OCR processing for: {os.path.basename(file_path)} ===")

    # Check if file exists and is readable
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"File does not exist: {file_path}")
    if not os.access(file_path, os.R_OK):
        raise PermissionError(f"File is not readable: {file_path}")
    file_size = os.path.getsize(file_path)
    safe_print_stderr(f"File size: {file_size} bytes")

    # Import dependencies one by one so a failure names the missing package.
    safe_print_stderr("Importing PyMuPDF...")
    try:
        import fitz
        safe_print_stderr("β PyMuPDF imported successfully")
    except Exception as e:
        raise ImportError(f"Failed to import PyMuPDF: {e}") from e

    # Add the legacy camelCase names when this PyMuPDF build lacks them.
    safe_print_stderr("Applying PyMuPDF compatibility patches...")
    try:
        if not hasattr(fitz.Document, 'pageCount'):
            fitz.Document.pageCount = property(lambda self: self.page_count)
            safe_print_stderr("β Added pageCount property")
        if not hasattr(fitz.Page, 'getPixmap'):
            # alpha defaults to False, matching get_pixmap's own default.
            def getPixmap(self, matrix=None, alpha=False):
                return self.get_pixmap(matrix=matrix, alpha=alpha)
            fitz.Page.getPixmap = getPixmap
            safe_print_stderr("β Added getPixmap method")
        if not hasattr(fitz.Page, 'getText'):
            def getText(self, option="text"):
                return self.get_text(option)
            fitz.Page.getText = getText
            safe_print_stderr("β Added getText method")
    except Exception as e:
        # The patches are optional; continue without them.
        safe_print_stderr(f"Warning: Monkey patch failed: {e}")

    # Open the input once up front so an unreadable document fails early.
    safe_print_stderr("Testing PDF opening...")
    try:
        test_doc = fitz.open(file_path)
        try:
            page_count = len(test_doc)
        finally:
            test_doc.close()
        safe_print_stderr(f"β PDF opened successfully, {page_count} pages detected")
    except Exception as e:
        raise RuntimeError(f"Failed to open PDF: {e}") from e

    # Import PaddleOCR
    safe_print_stderr("Importing PaddleOCR...")
    try:
        from paddleocr import PaddleOCR
        safe_print_stderr("β PaddleOCR imported successfully")
    except Exception as e:
        raise ImportError(f"Failed to import PaddleOCR: {e}") from e

    # Initialize PaddleOCR
    safe_print_stderr("Initializing PaddleOCR...")
    try:
        ocr = PaddleOCR(
            use_angle_cls=True,
            lang='en',
            show_log=False,
            use_gpu=False,
        )
        safe_print_stderr("β PaddleOCR initialized successfully")
    except Exception as e:
        raise RuntimeError(f"Failed to initialize PaddleOCR: {e}") from e

    is_pdf = file_path.lower().endswith('.pdf')
    text_parts = []
    pages_processed = 0
    try:
        # Determine file type and convert if needed
        if is_pdf:
            safe_print_stderr("Processing PDF file...")
            image_paths = pdf_to_images(file_path)
            temp_files = image_paths
            if not image_paths:
                raise RuntimeError("PDF conversion produced no images")
            total_pages = len(image_paths)
            safe_print_stderr(f"Will process {total_pages} images")
        else:
            safe_print_stderr("Processing image file...")
            image_paths = [file_path]
            total_pages = 1
        safe_print_stderr(f"TOTAL_PAGES:{total_pages}")

        # Process each image with OCR. NOTE: page numbers are positions in
        # image_paths, so they shift if a PDF page failed to render.
        safe_print_stderr("Starting OCR processing...")
        for index, img_path in enumerate(image_paths):
            current_page = index + 1
            try:
                safe_print_stderr(f"CURRENT_PAGE:{current_page}")
                safe_print_stderr(f"Processing image: {img_path}")

                if not os.path.exists(img_path):
                    safe_print_stderr(f"β Image file does not exist: {img_path}")
                    continue
                img_size = os.path.getsize(img_path)
                safe_print_stderr(f"Image size: {img_size} bytes")

                safe_print_stderr(f"Running OCR on page {current_page}...")
                result = ocr.ocr(img_path, cls=True)
                safe_print_stderr(f"OCR result type: {type(result)}")
                if not result:
                    safe_print_stderr(f"Page {current_page}: OCR returned None")
                    continue
                safe_print_stderr(f"OCR result length: {len(result)}")

                regions = result[0]
                if not regions:
                    safe_print_stderr(f"Page {current_page}: OCR returned empty result")
                    safe_print_stderr(f"Page {current_page}: No OCR results")
                    continue
                safe_print_stderr(f"Page {current_page} has {len(regions)} text regions detected")

                pages_processed += 1
                page_text = _extract_page_text(regions)
                if page_text.strip():
                    text_parts.append(f"\n--- Page {current_page} ---\n")
                    text_parts.append(page_text)
                    safe_print_stderr(f"β Page {current_page}: Added {len(page_text)} characters of text")
                else:
                    safe_print_stderr(f"Page {current_page}: No text above confidence threshold")
            except Exception as page_error:
                safe_print_stderr(f"β Error processing page {current_page}: {page_error}")
                traceback.print_exc(file=sys.stderr)
                continue
    finally:
        # One cleanup point for success, failure and interruption.
        if temp_files:
            safe_print_stderr("Cleaning up temporary files...")
            cleanup_temp_files(temp_files)

    extracted_text = "".join(text_parts)

    # Prepare final result
    result_data = {
        "success": True,
        "text": extracted_text,
        "total_pages": total_pages,
        "pages_processed": pages_processed,
        "method": "pdf_to_images" if is_pdf else "direct_image",
    }
    safe_print_stderr(f"=== OCR Complete: {pages_processed}/{total_pages} pages processed ===")
    safe_print_stderr(f"Total text length: {len(extracted_text)} characters")

    # Output final JSON result
    safe_print_json(result_data)
except Exception as e:
    # Top-level boundary: report the failure as JSON and exit non-zero.
    safe_print_stderr("=== FATAL ERROR ===")
    safe_print_stderr(f"Error: {e}")
    traceback.print_exc(file=sys.stderr)
    error_data = {
        "success": False,
        "error": str(e),
    }
    safe_print_json(error_data)
    sys.exit(1)