Spaces:
Runtime error
Runtime error
| import os | |
| import fitz # PyMuPDF | |
| import cv2 | |
| import numpy as np | |
| from pathlib import Path | |
| import logging | |
| from storage import StorageInterface | |
| import shutil | |
| logger = logging.getLogger(__name__) | |
class DocumentProcessor:
    """Rasterize a PDF or a single image into PNG pages saved through a storage backend.

    PDFs are rendered page by page at ``target_dpi``; PNG/JPG inputs are
    upscaled by ``target_dpi / 72`` (the input is assumed to be 72 DPI, no
    DPI metadata is read). Every result is PNG-encoded and handed to
    ``storage.save_file(path, data)``.
    """

    # Extensions routed to the single-image path.
    _IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg')
    # Base resolution: divisor for the PDF render matrix and the DPI assumed
    # for raster inputs.
    _BASE_DPI = 72

    def __init__(self, storage: "StorageInterface", target_dpi: int = 600):
        """Store the backend and the output resolution.

        Args:
            storage: Object exposing ``save_file(path, data)``; used for every
                page written.
            target_dpi: Output resolution in DPI. Defaults to 600, the value
                that used to be hard-coded.

        Raises:
            ValueError: If ``target_dpi`` is not positive.
        """
        if target_dpi <= 0:
            raise ValueError(f"target_dpi must be positive, got {target_dpi}")
        self.storage = storage
        self.target_dpi = target_dpi

    def clean_results_folder(self, output_dir: str):
        """Delete ``output_dir`` (if present) and recreate it empty.

        Raises:
            Exception: Whatever ``shutil.rmtree`` raised, after logging it.
        """
        if os.path.exists(output_dir):
            try:
                shutil.rmtree(output_dir)
                logger.info(f"Cleaned results directory: {output_dir}")
            except Exception as e:
                logger.error(f"Error cleaning results directory: {str(e)}")
                raise
        os.makedirs(output_dir, exist_ok=True)

    def process_document(self, file_path: str, output_dir: str) -> list:
        """Process a document (PDF/PNG/JPG) and return paths to the processed pages.

        The extension is validated *before* ``output_dir`` is wiped, so an
        unsupported input no longer destroys the results of a previous run.

        Args:
            file_path: Input document; the handler is chosen from its suffix
                (case-insensitive).
            output_dir: Directory that is emptied and then filled with the
                generated pages.

        Returns:
            List of output paths, one per page.

        Raises:
            ValueError: If the suffix is not .pdf, .png, .jpg or .jpeg.
        """
        file_ext = Path(file_path).suffix.lower()
        if file_ext == '.pdf':
            handler = self._process_pdf
        elif file_ext in self._IMAGE_EXTENSIONS:
            handler = self._process_image
        else:
            raise ValueError(f"Unsupported file format: {file_ext}")

        # Destructive step: only reached once we know the input can be handled.
        self.clean_results_folder(output_dir)
        return handler(file_path, output_dir)

    def _process_pdf(self, pdf_path: str, output_dir: str) -> list:
        """Render each PDF page at ``target_dpi`` and save it as a PNG.

        Output files are named ``<pdf stem>_page_<n>.png`` with ``n`` starting
        at 1.

        Returns:
            List of output paths in page order.

        Raises:
            Exception: Any error from opening, rendering or saving is logged
                and re-raised unchanged.
        """
        processed_pages = []
        base_name = Path(pdf_path).stem
        zoom = self.target_dpi / self._BASE_DPI
        try:
            # The context manager closes the document even when a page fails.
            with fitz.open(pdf_path) as doc:
                matrix = fitz.Matrix(zoom, zoom)
                for page_num, page in enumerate(doc, start=1):
                    pix = page.get_pixmap(matrix=matrix)
                    # View the raw sample bytes as (height, width, channels).
                    img = np.frombuffer(pix.samples, dtype=np.uint8).reshape(
                        pix.height, pix.width, pix.n
                    )
                    if pix.n == 4:  # RGBA
                        img = cv2.cvtColor(img, cv2.COLOR_RGBA2RGB)
                    output_path = os.path.join(
                        output_dir, f"{base_name}_page_{page_num}.png"
                    )
                    self._save_image(img, output_path)
                    processed_pages.append(output_path)
            return processed_pages
        except Exception as e:
            logger.error(f"Error processing PDF: {str(e)}")
            raise

    def _process_image(self, image_path: str, output_dir: str) -> list:
        """Upscale a single image to ``target_dpi`` and save it as a PNG.

        The input is assumed to be 72 DPI, so both axes are scaled by
        ``target_dpi / 72`` with bicubic interpolation. The output is named
        ``<image stem>_page_1.png``.

        Returns:
            One-element list holding the output path.

        Raises:
            ValueError: If OpenCV cannot read the image.
            Exception: Any other error is logged and re-raised unchanged.
        """
        try:
            img = cv2.imread(image_path)
            if img is None:
                raise ValueError(f"Could not read image: {image_path}")
            # OpenCV loads BGR; the rest of the pipeline works in RGB.
            img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
            scale = self.target_dpi / self._BASE_DPI
            img = cv2.resize(img, None, fx=scale, fy=scale, interpolation=cv2.INTER_CUBIC)
            base_name = Path(image_path).stem
            output_path = os.path.join(output_dir, f"{base_name}_page_1.png")
            self._save_image(img, output_path)
            return [output_path]
        except Exception as e:
            logger.error(f"Error processing image: {str(e)}")
            raise

    def _save_image(self, img: np.ndarray, output_path: str):
        """PNG-encode an RGB image and pass the bytes to the storage backend.

        Raises:
            ValueError: If OpenCV reports that encoding failed.
        """
        # imencode expects BGR channel order, hence the conversion back.
        ok, buffer = cv2.imencode('.png', cv2.cvtColor(img, cv2.COLOR_RGB2BGR))
        if not ok:
            # Previously the flag was ignored and a bad buffer could be stored.
            raise ValueError(f"Could not encode image as PNG: {output_path}")
        self.storage.save_file(output_path, buffer.tobytes())
if __name__ == "__main__":
    # Manual smoke run: rasterize one sample PDF and report the output sizes.
    from storage import StorageFactory

    # Without a handler the processor's INFO records would be dropped; this is
    # a no-op if logging was already configured elsewhere.
    logging.basicConfig(level=logging.INFO)

    # Initialize storage and processor
    storage = StorageFactory.get_storage()
    processor = DocumentProcessor(storage)

    # Process PDF
    pdf_path = "samples/001.pdf"
    output_dir = "results"
    try:
        # Ensure output directory exists
        os.makedirs(output_dir, exist_ok=True)
        results = processor.process_document(
            file_path=pdf_path,
            output_dir=output_dir,
        )

        # Print detailed results
        # NOTE(review): the size lookups below read the local filesystem, so
        # they assume the storage backend wrote the pages to these paths on
        # disk — confirm for non-local backends.
        print("\nProcessing Results:")
        print(f"Output Directory: {os.path.abspath(output_dir)}")
        for page_path in results:
            file_size = os.path.getsize(page_path) / (1024 * 1024)  # Convert to MB
            print(f"- {os.path.basename(page_path)} ({file_size:.2f} MB)")

        # Calculate total size of output
        total_size = sum(
            os.path.getsize(os.path.join(output_dir, f))
            for f in os.listdir(output_dir)
        ) / (1024 * 1024)
        print(f"\nTotal output size: {total_size:.2f} MB")
    except Exception as e:
        logger.error(f"Error processing PDF: {str(e)}")
        raise