# Page header captured with this file -- Spaces status: Runtime error;
# file size: 5,295 bytes; revision 910e0d4.
import os
import fitz # PyMuPDF
import cv2
import numpy as np
from pathlib import Path
import logging
from storage import StorageInterface
import shutil
logger = logging.getLogger(__name__)
class DocumentProcessor:
    """Render documents to PNG page images and persist them through a storage backend.

    PDFs are rasterised page by page with PyMuPDF; PNG/JPEG inputs are read
    and upscaled with OpenCV. Each resulting page is PNG-encoded and passed
    to ``storage.save_file(path, data)``.
    """

    # PyMuPDF renders at 72 DPI when no zoom is applied, and raster inputs are
    # *assumed* to be 72 DPI because cv2.imread does not report DPI metadata.
    BASE_DPI = 72

    SUPPORTED_IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg')

    def __init__(self, storage: "StorageInterface", target_dpi: int = 600):
        """Create a processor.

        Args:
            storage: Backend whose ``save_file(path, data)`` receives every
                encoded page.
            target_dpi: Resolution pages are rendered/upscaled to. Defaults to
                600, the value that used to be hard-coded.

        Raises:
            ValueError: If ``target_dpi`` is not positive.
        """
        if target_dpi <= 0:
            raise ValueError(f"target_dpi must be positive, got {target_dpi}")
        self.storage = storage
        self.target_dpi = target_dpi

    def clean_results_folder(self, output_dir: str):
        """Remove ``output_dir`` with all its contents, then recreate it empty.

        NOTE: this works on the local filesystem directly (``shutil``/``os``),
        not through ``self.storage``.

        Raises:
            Exception: Whatever ``shutil.rmtree`` raised, after logging it.
        """
        if os.path.exists(output_dir):
            try:
                shutil.rmtree(output_dir)
                logger.info("Cleaned results directory: %s", output_dir)
            except Exception as e:
                logger.error("Error cleaning results directory: %s", e)
                raise
        os.makedirs(output_dir, exist_ok=True)

    def process_document(self, file_path: str, output_dir: str) -> list:
        """Process a document (PDF/PNG/JPG) and return the paths of its pages.

        ``output_dir`` is wiped before processing, so all validation happens
        first: a rejected request must not destroy earlier results.

        Args:
            file_path: Path to a ``.pdf``, ``.png``, ``.jpg`` or ``.jpeg`` file
                (extension matched case-insensitively).
            output_dir: Directory that is cleared and then receives the pages.

        Returns:
            List of output paths, one per page.

        Raises:
            ValueError: If the extension is unsupported, or if the input file
                lives inside ``output_dir`` (cleaning would delete it).
        """
        file_ext = Path(file_path).suffix.lower()
        if file_ext == '.pdf':
            handler = self._process_pdf
        elif file_ext in self.SUPPORTED_IMAGE_EXTENSIONS:
            handler = self._process_image
        else:
            raise ValueError(f"Unsupported file format: {file_ext}")

        # Cleaning the output directory would remove the input before it is read.
        source = Path(file_path).resolve()
        target = Path(output_dir).resolve()
        if target == source or target in source.parents:
            raise ValueError(
                f"Input file {file_path} is inside output directory "
                f"{output_dir}, which is cleared before processing"
            )

        self.clean_results_folder(output_dir)
        return handler(file_path, output_dir)

    def _process_pdf(self, pdf_path: str, output_dir: str) -> list:
        """Rasterise every page of a PDF and save each one as a PNG.

        Output files are named ``<pdf stem>_page_<n>.png`` with ``n`` starting
        at 1.

        Returns:
            List of output paths in page order.

        Raises:
            Exception: Any error from opening, rendering or saving, after
                logging it.
        """
        processed_pages = []
        base_name = Path(pdf_path).stem
        zoom = self.target_dpi / self.BASE_DPI
        try:
            # The context manager closes the document even if a page fails.
            with fitz.open(pdf_path) as doc:
                matrix = fitz.Matrix(zoom, zoom)
                for page_number, page in enumerate(doc, start=1):
                    pix = page.get_pixmap(matrix=matrix)
                    # View the raw samples as a (height, width, channels) array.
                    img = np.frombuffer(pix.samples, dtype=np.uint8).reshape(
                        pix.height, pix.width, pix.n
                    )
                    if pix.n == 4:  # RGBA: drop alpha so _save_image gets RGB
                        img = cv2.cvtColor(img, cv2.COLOR_RGBA2RGB)
                    output_path = os.path.join(
                        output_dir, f"{base_name}_page_{page_number}.png"
                    )
                    self._save_image(img, output_path)
                    processed_pages.append(output_path)
            return processed_pages
        except Exception as e:
            logger.error("Error processing PDF: %s", e)
            raise

    def _process_image(self, image_path: str, output_dir: str) -> list:
        """Upscale a single image to the target DPI and save it as a PNG.

        The source is assumed to be ``BASE_DPI`` (72), so both dimensions are
        multiplied by ``target_dpi / 72`` using bicubic interpolation. At the
        default 600 DPI that is roughly 8.3x per side, so large inputs produce
        very large outputs.

        Returns:
            One-element list holding ``<image stem>_page_1.png``'s path.

        Raises:
            ValueError: If OpenCV cannot read the image.
            Exception: Any other error from resizing or saving, after logging.
        """
        try:
            img = cv2.imread(image_path)
            if img is None:
                raise ValueError(f"Could not read image: {image_path}")
            # OpenCV loads BGR; the rest of the pipeline works in RGB.
            img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
            scale = self.target_dpi / self.BASE_DPI
            img = cv2.resize(
                img, None, fx=scale, fy=scale, interpolation=cv2.INTER_CUBIC
            )
            base_name = Path(image_path).stem
            output_path = os.path.join(output_dir, f"{base_name}_page_1.png")
            self._save_image(img, output_path)
            return [output_path]
        except Exception as e:
            logger.error("Error processing image: %s", e)
            raise

    def _save_image(self, img: np.ndarray, output_path: str):
        """PNG-encode an RGB image and hand the bytes to the storage backend.

        Args:
            img: RGB image array; converted to BGR because OpenCV encodes BGR.
            output_path: Destination path passed to ``storage.save_file``.

        Raises:
            RuntimeError: If OpenCV reports that encoding failed.
        """
        ok, buffer = cv2.imencode('.png', cv2.cvtColor(img, cv2.COLOR_RGB2BGR))
        if not ok:
            raise RuntimeError(f"Could not encode image as PNG: {output_path}")
        self.storage.save_file(output_path, buffer.tobytes())
def main():
    """Render the sample PDF into ``results`` and print a per-page size report."""
    from storage import StorageFactory

    # Without a handler the processor's INFO-level messages are dropped.
    logging.basicConfig(level=logging.INFO)

    # Initialize storage and processor
    storage = StorageFactory.get_storage()
    processor = DocumentProcessor(storage)

    pdf_path = "samples/001.pdf"
    output_dir = "results"

    try:
        # process_document clears and recreates output_dir itself, so there is
        # no need to create it up front.
        results = processor.process_document(
            file_path=pdf_path,
            output_dir=output_dir,
        )

        # NOTE(review): the size report reads the local filesystem, so it
        # assumes the storage backend wrote the pages there -- confirm.
        print("\nProcessing Results:")
        print(f"Output Directory: {os.path.abspath(output_dir)}")
        for page_path in results:
            file_size = os.path.getsize(page_path) / (1024 * 1024)  # bytes -> MB
            print(f"- {os.path.basename(page_path)} ({file_size:.2f} MB)")

        # Total size of everything now in the output directory.
        total_size = sum(
            os.path.getsize(os.path.join(output_dir, f))
            for f in os.listdir(output_dir)
        ) / (1024 * 1024)
        print(f"\nTotal output size: {total_size:.2f} MB")
    except Exception as e:
        logger.error(f"Error processing PDF: {str(e)}")
        raise


if __name__ == "__main__":
    main()