import io
import json
import logging
import os
from pathlib import Path
from typing import Any, Dict, List, Tuple

import cv2
import fitz  # PyMuPDF
import numpy as np
from PIL import Image

from storage import StorageInterface
from text_detection_combined import process_drawing

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class DocumentProcessor:
    """Render documents to images and write detection-oriented variants.

    PDFs are rasterised page by page and three variants of each page are
    written ("text", "symbol" and "line"). Standalone PNG/JPEG images are
    written as a single binarised variant.
    """

    def __init__(self, storage: StorageInterface):
        """Store the storage backend and set the processing parameters.

        Args:
            storage: Backend used to load input images and save output images.
        """
        self.storage = storage
        self.logger = logging.getLogger(__name__)

        # Processing parameters
        self.target_dpi = 600  # Render resolution for PDF pages
        self.min_dimension = 2000  # Minimum width/height used by _optimize_image
        self.max_dimension = 8000  # Maximum width/height used by _optimize_image
        # NOTE(review): not referenced by any method in this file; output is
        # always written as PNG.
        self.quality = 95

    def process_document(self, file_path: str, output_dir: str) -> list:
        """Process a document (PDF/PNG/JPG) and return the output image paths.

        Args:
            file_path: Path of the input document; its suffix selects the
                processing route (case-insensitive).
            output_dir: Directory that receives the processed images.

        Returns:
            Paths of the images that were written.

        Raises:
            ValueError: If the file suffix is not .pdf, .png, .jpg or .jpeg.
        """
        file_ext = Path(file_path).suffix.lower()

        if file_ext == '.pdf':
            return self._process_pdf(file_path, output_dir)
        if file_ext in ('.png', '.jpg', '.jpeg'):
            return self._process_image(file_path, output_dir)
        raise ValueError(f"Unsupported file format: {file_ext}")

    @staticmethod
    def _decode_image(data: bytes, source: str) -> np.ndarray:
        """Decode encoded image bytes into a BGR array.

        Args:
            data: Encoded image bytes (e.g. PNG or JPEG).
            source: Human-readable origin of the data, used in error messages.

        Raises:
            ValueError: If the data is empty or cannot be decoded.
        """
        if len(data) == 0:
            raise ValueError(f"No image data available from {source}")
        img = cv2.imdecode(np.frombuffer(data, np.uint8), cv2.IMREAD_COLOR)
        # cv2.imdecode signals failure by returning None instead of raising.
        if img is None:
            raise ValueError(f"Could not decode image data from {source}")
        return img

    def _remove_stale_outputs(self, pdf_path: str, output_dir: str) -> None:
        """Delete files in output_dir left over from earlier runs on this PDF.

        Every regular file whose name starts with the PDF's stem is removed,
        except a file named like the PDF itself. Failures to delete a single
        file are logged and otherwise ignored (best effort).
        """
        base_name = Path(pdf_path).stem
        pdf_name = os.path.basename(pdf_path)
        # NOTE(review): a plain prefix match also hits other documents whose
        # stem merely starts with this one (e.g. "001" vs "0010") — confirm
        # this is acceptable for the naming scheme in use.
        for name in os.listdir(output_dir):
            if not name.startswith(base_name) or name == pdf_name:
                continue
            file_path = os.path.join(output_dir, name)
            try:
                if os.path.isfile(file_path):
                    os.unlink(file_path)
            except OSError as e:
                self.logger.error(f"Error deleting file {file_path}: {e}")

    def _process_pdf(self, pdf_path: str, output_dir: str) -> list:
        """Render every page of a PDF and save its three optimised variants.

        Besides the images, a ``<stem>_processing_results.json`` file is
        written to output_dir with the dimensions, output paths, DPI and zoom
        factor of every page.

        Args:
            pdf_path: Path of the PDF on the local file system.
            output_dir: Directory that receives the images and the JSON file.

        Returns:
            Output image paths, three per page in the order text, symbol,
            line.

        Raises:
            Exception: Any error is logged and re-raised unchanged.
        """
        processed_pages: List[str] = []
        processing_results: Dict[str, Dict[str, Any]] = {}

        try:
            # Create output directory if it doesn't exist
            os.makedirs(output_dir, exist_ok=True)

            # Clean up any existing files for this document
            self._remove_stale_outputs(pdf_path, output_dir)

            stem = Path(pdf_path).stem
            optimizers = {
                'text': self._optimize_for_text,
                'symbol': self._optimize_for_symbols,
                'line': self._optimize_for_lines,
            }

            # Read PDF file directly since it's already in the results directory
            with open(pdf_path, 'rb') as f:
                pdf_data = f.read()

            # PDF user space is 72 units per inch, so this zoom yields target_dpi.
            zoom = self.target_dpi / 72
            matrix = fitz.Matrix(zoom, zoom)

            doc = fitz.open(stream=pdf_data, filetype="pdf")
            try:
                for page_number, page in enumerate(doc, start=1):
                    # Get high-resolution image and decode it to a BGR array
                    pix = page.get_pixmap(matrix=matrix)
                    img = self._decode_image(
                        pix.tobytes("png"), f"{pdf_path} page {page_number}"
                    )

                    base_filename = f"{stem}_page_{page_number}"

                    # The optimisers never modify their input, so the page
                    # image is shared; variants are produced and saved one at
                    # a time to keep peak memory down.
                    paths = {}
                    for version_type, optimize in optimizers.items():
                        out_path = os.path.join(
                            output_dir, f"{base_filename}_{version_type}.png"
                        )
                        self._save_image(optimize(img), out_path)
                        paths[version_type] = out_path
                        processed_pages.append(out_path)

                    # Store processing results
                    processing_results[str(page_number)] = {
                        "page_number": page_number,
                        "dimensions": {
                            "width": img.shape[1],
                            "height": img.shape[0]
                        },
                        "paths": paths,
                        "dpi": self.target_dpi,
                        "zoom_factor": zoom
                    }
            finally:
                # Release the document even when a page fails to process.
                doc.close()

            # Save processing results JSON
            results_json_path = os.path.join(
                output_dir,
                f"{stem}_processing_results.json"
            )
            with open(results_json_path, 'w') as f:
                json.dump(processing_results, f, indent=4)

            return processed_pages

        except Exception as e:
            self.logger.error(f"Error processing PDF: {str(e)}")
            raise

    def _process_image(self, image_path: str, output_dir: str) -> list:
        """Process a single image file and save one optimised version.

        Args:
            image_path: Path handed to the storage backend to load the image.
            output_dir: Directory that receives ``<stem>_text.png``.

        Returns:
            A one-element list holding the output path.

        Raises:
            Exception: Any error is logged and re-raised unchanged.
        """
        try:
            # Load image
            image_data = self.storage.load_file(image_path)
            img = self._decode_image(image_data, image_path)

            # Process the image
            processed_img = self._optimize_image(img)

            # Save processed image
            output_path = os.path.join(
                output_dir,
                f"{Path(image_path).stem}_text.png"
            )
            self._save_image(processed_img, output_path)

            return [output_path]

        except Exception as e:
            self.logger.error(f"Error processing image: {str(e)}")
            raise

    def _optimize_image(self, img: np.ndarray) -> np.ndarray:
        """Binarise a BGR image and rescale it into the configured size range.

        Steps: grayscale, CLAHE contrast enhancement, non-local-means
        denoising, Otsu thresholding, then an aspect-preserving resize.

        Returns:
            The processed image converted back to three-channel BGR.
        """
        # Convert to grayscale for processing
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

        # Enhance contrast
        clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
        enhanced = clahe.apply(gray)

        # Denoise
        denoised = cv2.fastNlMeansDenoising(enhanced)

        # Binarize
        _, binary = cv2.threshold(
            denoised, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU
        )

        # Resize while maintaining aspect ratio: enlarge until the shorter
        # side reaches min_dimension (never shrink for that reason), then cap
        # the factor so the longer side does not exceed max_dimension.
        height, width = binary.shape
        upscale = max(self.min_dimension / min(width, height), 1.0)
        scale = min(self.max_dimension / max(width, height), upscale)

        if scale != 1.0:
            new_width = int(width * scale)
            new_height = int(height * scale)
            resized = cv2.resize(
                binary, (new_width, new_height),
                interpolation=cv2.INTER_LANCZOS4
            )
        else:
            resized = binary

        # Convert back to BGR for compatibility
        return cv2.cvtColor(resized, cv2.COLOR_GRAY2BGR)

    def _optimize_for_text(self, img: np.ndarray) -> np.ndarray:
        """Return a binarised BGR copy of img intended for text detection.

        Steps: grayscale, CLAHE, non-local-means denoising, Gaussian adaptive
        thresholding (block size 11, constant 2).
        """
        # Convert to grayscale
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

        # Enhance contrast using CLAHE
        clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
        enhanced = clahe.apply(gray)

        # Denoise
        denoised = cv2.fastNlMeansDenoising(enhanced)

        # Adaptive thresholding for better text separation
        binary = cv2.adaptiveThreshold(
            denoised, 255,
            cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
            cv2.THRESH_BINARY, 11, 2
        )

        # Convert back to BGR
        return cv2.cvtColor(binary, cv2.COLOR_GRAY2BGR)

    def _optimize_for_symbols(self, img: np.ndarray) -> np.ndarray:
        """Return a sharpened BGR copy of img intended for symbol detection.

        Steps: grayscale, bilateral filtering, CLAHE, 3x3 sharpening kernel.
        """
        # Convert to grayscale
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

        # Bilateral filter to preserve edges while reducing noise
        bilateral = cv2.bilateralFilter(gray, 9, 75, 75)

        # Enhance contrast
        clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8))
        enhanced = clahe.apply(bilateral)

        # Sharpen image
        kernel = np.array([[-1, -1, -1],
                           [-1, 9, -1],
                           [-1, -1, -1]])
        sharpened = cv2.filter2D(enhanced, -1, kernel)

        # Convert back to BGR
        return cv2.cvtColor(sharpened, cv2.COLOR_GRAY2BGR)

    def _optimize_for_lines(self, img: np.ndarray) -> np.ndarray:
        """Return a dilated edge map of img (as BGR) intended for line detection.

        Steps: grayscale, 3x3 Gaussian blur, Canny edges (thresholds 50/150),
        one dilation pass with a 2x2 kernel.
        """
        # Convert to grayscale
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

        # Reduce noise while preserving edges
        denoised = cv2.GaussianBlur(gray, (3, 3), 0)

        # Edge enhancement
        edges = cv2.Canny(denoised, 50, 150)

        # Dilate edges to connect broken lines
        kernel = np.ones((2, 2), np.uint8)
        dilated = cv2.dilate(edges, kernel, iterations=1)

        # Convert back to BGR
        return cv2.cvtColor(dilated, cv2.COLOR_GRAY2BGR)

    def _save_image(self, img: np.ndarray, output_path: str):
        """Encode img as an uncompressed PNG and hand it to the storage backend.

        Args:
            img: Image array to encode.
            output_path: Destination path passed to ``storage.save_file``.

        Raises:
            RuntimeError: If OpenCV reports that encoding failed.
        """
        # Compression level 0: no compression
        ok, buffer = cv2.imencode('.png', img, [
            cv2.IMWRITE_PNG_COMPRESSION, 0
        ])
        if not ok:
            raise RuntimeError(f"Failed to encode image for {output_path}")

        # Save to storage
        self.storage.save_file(output_path, buffer.tobytes())


def _main() -> None:
    """Process the bundled sample PDF and print the size of each output."""
    from storage import StorageFactory

    # Initialize storage and processor
    storage = StorageFactory.get_storage()
    processor = DocumentProcessor(storage)

    # Process PDF
    pdf_path = "samples/001.pdf"
    output_dir = "results"

    try:
        # Ensure output directory exists
        os.makedirs(output_dir, exist_ok=True)

        results = processor.process_document(
            file_path=pdf_path,
            output_dir=output_dir
        )

        # Print detailed results
        print("\nProcessing Results:")
        print(f"Output Directory: {os.path.abspath(output_dir)}")

        for page_path in results:
            file_size = os.path.getsize(page_path) / (1024 * 1024)  # Convert to MB
            print(f"- {os.path.basename(page_path)} ({file_size:.2f} MB)")

        # Calculate total size of output
        total_size = sum(
            os.path.getsize(os.path.join(output_dir, f))
            for f in os.listdir(output_dir)
        ) / (1024 * 1024)
        print(f"\nTotal output size: {total_size:.2f} MB")

    except Exception as e:
        logger.error(f"Error processing PDF: {str(e)}")
        raise


if __name__ == "__main__":
    _main()