# Document processor: renders PDF/image drawings into high-resolution PNG
# variants optimized for text, symbol, and line detection.
import io
import json
import logging
import os
from pathlib import Path
from typing import Any, Dict, List, Tuple

import cv2
import fitz  # PyMuPDF
import numpy as np
from PIL import Image

from storage import StorageInterface
from text_detection_combined import process_drawing
# Module-wide logging: INFO level, logger named after this module.
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
class DocumentProcessor:
    """Rasterizes documents (PDF/PNG/JPG) into high-resolution PNG variants.

    For PDFs, each page is rendered at ``target_dpi`` and saved three times,
    each optimized for a different downstream detector: text, symbols, lines.
    A ``<stem>_processing_results.json`` sidecar records per-page metadata.
    """

    def __init__(self, storage: "StorageInterface"):
        """
        Args:
            storage: backend used to load source images and save outputs.
        """
        self.storage = storage
        self.logger = logging.getLogger(__name__)
        # Rendering / output parameters.
        self.target_dpi = 600      # render resolution; PDFs are natively 72 dpi
        self.min_dimension = 2000  # minimum width/height enforced when resizing
        self.max_dimension = 8000  # cap to keep memory bounded at high DPI
        self.quality = 95          # JPEG quality (outputs are PNG; kept for API compat)

    def process_document(self, file_path: str, output_dir: str) -> list:
        """Process a document (PDF/PNG/JPG) and return paths to processed pages.

        Args:
            file_path: path to the source document.
            output_dir: directory that receives the processed images.

        Returns:
            List of paths to the written PNG files.

        Raises:
            ValueError: if the file extension is not supported.
        """
        file_ext = Path(file_path).suffix.lower()
        if file_ext == '.pdf':
            return self._process_pdf(file_path, output_dir)
        if file_ext in ('.png', '.jpg', '.jpeg'):
            return self._process_image(file_path, output_dir)
        raise ValueError(f"Unsupported file format: {file_ext}")

    def _remove_stale_outputs(self, pdf_path: str, output_dir: str) -> None:
        """Best-effort deletion of files previously generated for this document."""
        base_name = Path(pdf_path).stem
        for file in os.listdir(output_dir):
            if file.startswith(base_name) and file != os.path.basename(pdf_path):
                # Local name chosen so it cannot shadow a caller's parameter.
                candidate = os.path.join(output_dir, file)
                try:
                    if os.path.isfile(candidate):
                        os.unlink(candidate)
                except OSError as e:
                    # Deletion is best-effort: log and continue with the rest.
                    self.logger.error(f"Error deleting file {candidate}: {e}")

    def _render_page(self, page, matrix) -> np.ndarray:
        """Rasterize one PDF page with the given transform to a BGR image.

        Raises:
            ValueError: if the rendered bytes cannot be decoded by OpenCV.
        """
        pix = page.get_pixmap(matrix=matrix)
        nparr = np.frombuffer(pix.tobytes(), np.uint8)
        img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
        if img is None:  # BUGFIX: imdecode signals failure by returning None
            raise ValueError("Failed to decode rendered PDF page image")
        return img

    def _process_pdf(self, pdf_path: str, output_dir: str) -> list:
        """Process a PDF document page by page.

        Returns the list of written image paths (three variants per page) and
        writes a JSON sidecar with per-page dimensions, paths, and DPI info.
        """
        processed_pages = []
        processing_results = {}
        try:
            os.makedirs(output_dir, exist_ok=True)
            self._remove_stale_outputs(pdf_path, output_dir)

            # Read the PDF directly since it is already in the results directory.
            with open(pdf_path, 'rb') as f:
                pdf_data = f.read()

            doc = fitz.open(stream=pdf_data, filetype="pdf")
            try:
                # Zoom factor mapping the PDF's native 72 dpi to target_dpi.
                zoom = self.target_dpi / 72
                matrix = fitz.Matrix(zoom, zoom)

                for page_num in range(len(doc)):
                    img = self._render_page(doc[page_num], matrix)
                    base_filename = f"{Path(pdf_path).stem}_page_{page_num + 1}"

                    # Produce and persist the three detector-specific variants.
                    optimized_versions = {
                        'text': self._optimize_for_text(img.copy()),
                        'symbol': self._optimize_for_symbols(img.copy()),
                        'line': self._optimize_for_lines(img.copy()),
                    }
                    paths = {
                        kind: os.path.join(output_dir, f"{base_filename}_{kind}.png")
                        for kind in optimized_versions
                    }
                    for version_type, optimized_img in optimized_versions.items():
                        self._save_image(optimized_img, paths[version_type])
                        processed_pages.append(paths[version_type])

                    processing_results[str(page_num + 1)] = {
                        "page_number": page_num + 1,
                        "dimensions": {
                            "width": img.shape[1],
                            "height": img.shape[0],
                        },
                        "paths": paths,
                        "dpi": self.target_dpi,
                        "zoom_factor": zoom,
                    }
            finally:
                doc.close()  # BUGFIX: document handle was previously never released

            # Persist per-page metadata alongside the images.
            results_json_path = os.path.join(
                output_dir,
                f"{Path(pdf_path).stem}_processing_results.json"
            )
            with open(results_json_path, 'w') as f:
                json.dump(processing_results, f, indent=4)

            return processed_pages
        except Exception as e:
            self.logger.error(f"Error processing PDF: {str(e)}")
            raise

    def _process_image(self, image_path: str, output_dir: str) -> list:
        """Process a single image file; returns a one-element list of the output path."""
        try:
            # Loaded via the storage backend (unlike PDFs, which are read directly).
            image_data = self.storage.load_file(image_path)
            nparr = np.frombuffer(image_data, np.uint8)
            img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
            if img is None:  # BUGFIX: guard against undecodable input
                raise ValueError(f"Failed to decode image: {image_path}")

            processed_img = self._optimize_image(img)

            output_path = os.path.join(
                output_dir,
                f"{Path(image_path).stem}_text.png"
            )
            self._save_image(processed_img, output_path)
            return [output_path]
        except Exception as e:
            self.logger.error(f"Error processing image: {str(e)}")
            raise

    def _optimize_image(self, img: np.ndarray) -> np.ndarray:
        """General-purpose optimization: contrast, denoise, binarize, resize.

        The result is rescaled so the larger side is at most ``max_dimension``
        and the smaller side is at least ``min_dimension`` where possible.
        """
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        # Contrast enhancement via CLAHE (local histogram equalization).
        clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
        enhanced = clahe.apply(gray)
        denoised = cv2.fastNlMeansDenoising(enhanced)
        # Otsu picks the binarization threshold automatically.
        _, binary = cv2.threshold(denoised, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)

        # Resize while maintaining aspect ratio within [min, max] dimension bounds.
        height, width = binary.shape
        scale = min(self.max_dimension / max(width, height),
                    max(self.min_dimension / min(width, height), 1.0))
        if scale != 1.0:
            new_width = int(width * scale)
            new_height = int(height * scale)
            resized = cv2.resize(binary, (new_width, new_height),
                                 interpolation=cv2.INTER_LANCZOS4)
        else:
            resized = binary

        # Convert back to BGR so downstream consumers get a 3-channel image.
        return cv2.cvtColor(resized, cv2.COLOR_GRAY2BGR)

    def _optimize_for_text(self, img: np.ndarray) -> np.ndarray:
        """Optimize image for text detection (adaptive binarization)."""
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
        enhanced = clahe.apply(gray)
        denoised = cv2.fastNlMeansDenoising(enhanced)
        # Adaptive thresholding copes with uneven illumination across the page.
        binary = cv2.adaptiveThreshold(denoised, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
                                       cv2.THRESH_BINARY, 11, 2)
        return cv2.cvtColor(binary, cv2.COLOR_GRAY2BGR)

    def _optimize_for_symbols(self, img: np.ndarray) -> np.ndarray:
        """Optimize image for symbol detection (edge-preserving smooth + sharpen)."""
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        # Bilateral filter reduces noise while preserving symbol edges.
        bilateral = cv2.bilateralFilter(gray, 9, 75, 75)
        clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8))
        enhanced = clahe.apply(bilateral)
        # Standard 3x3 sharpening kernel.
        kernel = np.array([[-1, -1, -1],
                           [-1,  9, -1],
                           [-1, -1, -1]])
        sharpened = cv2.filter2D(enhanced, -1, kernel)
        return cv2.cvtColor(sharpened, cv2.COLOR_GRAY2BGR)

    def _optimize_for_lines(self, img: np.ndarray) -> np.ndarray:
        """Optimize image for line detection (Canny edges, dilated)."""
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        denoised = cv2.GaussianBlur(gray, (3, 3), 0)
        edges = cv2.Canny(denoised, 50, 150)
        # Dilation reconnects lines broken by rasterization noise.
        kernel = np.ones((2, 2), np.uint8)
        dilated = cv2.dilate(edges, kernel, iterations=1)
        return cv2.cvtColor(dilated, cv2.COLOR_GRAY2BGR)

    def _save_image(self, img: np.ndarray, output_path: str):
        """Encode the image as lossless PNG and persist via the storage backend."""
        _, buffer = cv2.imencode('.png', img, [
            cv2.IMWRITE_PNG_COMPRESSION, 0  # no compression: fastest, lossless
        ])
        self.storage.save_file(output_path, buffer.tobytes())
if __name__ == "__main__":
    # Demo entry point: process one sample PDF and report the output sizes.
    from storage import StorageFactory

    storage = StorageFactory.get_storage()
    processor = DocumentProcessor(storage)

    pdf_path = "samples/001.pdf"
    output_dir = "results"

    try:
        os.makedirs(output_dir, exist_ok=True)
        results = processor.process_document(
            file_path=pdf_path,
            output_dir=output_dir
        )

        # Print detailed results.
        print("\nProcessing Results:")
        print(f"Output Directory: {os.path.abspath(output_dir)}")
        for page_path in results:
            file_size = os.path.getsize(page_path) / (1024 * 1024)  # bytes -> MB
            print(f"- {os.path.basename(page_path)} ({file_size:.2f} MB)")

        # Total size of everything in the output directory, in MB.
        total_size = sum(os.path.getsize(os.path.join(output_dir, f))
                         for f in os.listdir(output_dir)) / (1024 * 1024)
        print(f"\nTotal output size: {total_size:.2f} MB")
    except Exception as e:
        logger.error(f"Error processing PDF: {str(e)}")
        raise