# Captured from a Hugging Face Space file view (Space status at capture: "Build error").
# File size: 10,944 bytes; ref 9847531 as shown on the page. The viewer's line-number
# gutter (1-288) was extraction residue and has been dropped.
import fitz  # PyMuPDF
import os
import logging
from pathlib import Path
import numpy as np
from PIL import Image
import io
import cv2 # Add this import
from storage import StorageInterface
from typing import List, Dict, Tuple, Any
import json
from text_detection_combined import process_drawing
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class DocumentProcessor:
    """Turn a PDF or raster image into detection-ready PNG files.

    PDF pages are rendered at ``target_dpi`` and written out in three
    variants per page (``text``, ``symbol``, ``line``), each produced by a
    dedicated OpenCV pre-processing chain. Single images go through one
    generic chain (``_optimize_image``). Encoded images are handed to the
    injected storage object via ``save_file``.
    """

    # Extensions routed to the single-image pipeline.
    _IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg')

    def __init__(self, storage: "StorageInterface"):
        """Store the storage backend and the processing parameters.

        Args:
            storage: Backend used for ``load_file`` / ``save_file`` calls.
        """
        self.storage = storage
        self.logger = logging.getLogger(__name__)
        # Configure optimal processing parameters
        self.target_dpi = 600  # Render resolution for PDF pages
        self.min_dimension = 2000  # Minimum width/height targeted by _optimize_image
        self.max_dimension = 8000  # Maximum width/height targeted by _optimize_image
        # NOTE(review): not referenced by any method of this class; outputs
        # are saved as PNG, so this JPEG quality currently has no effect.
        self.quality = 95

    def process_document(self, file_path: str, output_dir: str) -> List[str]:
        """Process a document (PDF/PNG/JPG) and return paths to processed pages.

        Args:
            file_path: Path of the input document; the extension selects the
                pipeline (case-insensitive).
            output_dir: Directory the output file names are built under.

        Returns:
            The list of output image paths produced by the chosen pipeline.

        Raises:
            ValueError: If the file extension is not supported.
        """
        file_ext = Path(file_path).suffix.lower()
        if file_ext == '.pdf':
            return self._process_pdf(file_path, output_dir)
        if file_ext in self._IMAGE_EXTENSIONS:
            return self._process_image(file_path, output_dir)
        raise ValueError(f"Unsupported file format: {file_ext}")

    def _remove_stale_outputs(self, output_dir: str, base_name: str, keep: str) -> None:
        """Delete files in ``output_dir`` whose name starts with ``base_name``.

        The entry named ``keep`` (the source PDF's own file name) is never
        removed. Deletion failures are logged and do not abort processing.
        """
        # NOTE(review): this is a plain prefix match, so a document named
        # "001" also removes files of a document named "0010" that live in
        # the same directory. Kept as-is because later pipeline stages may
        # rely on this broad cleanup — confirm before tightening.
        for name in os.listdir(output_dir):
            if not name.startswith(base_name) or name == keep:
                continue
            candidate = os.path.join(output_dir, name)
            try:
                if os.path.isfile(candidate):
                    os.unlink(candidate)
            except OSError as e:
                self.logger.error(f"Error deleting file {candidate}: {e}")

    def _process_pdf(self, pdf_path: str, output_dir: str) -> List[str]:
        """Render every page of a PDF and save text/symbol/line variants.

        Besides the PNG files, a ``<stem>_processing_results.json`` file with
        per-page dimensions, output paths, DPI and zoom factor is written to
        ``output_dir``.

        Args:
            pdf_path: Path of the PDF on the local file system.
            output_dir: Directory for outputs; created if missing.

        Returns:
            Output paths in page order, three per page (text, symbol, line).

        Raises:
            ValueError: If a rendered page cannot be decoded into an image.
            Exception: Any other failure is logged and re-raised unchanged.
        """
        processed_pages: List[str] = []
        processing_results: Dict[str, Any] = {}
        try:
            # Create output directory if it doesn't exist
            os.makedirs(output_dir, exist_ok=True)

            # Clean up any existing files for this document
            base_name = Path(pdf_path).stem
            self._remove_stale_outputs(
                output_dir, base_name, keep=os.path.basename(pdf_path)
            )

            # Read the PDF into memory and open it from the byte stream
            with open(pdf_path, 'rb') as f:
                pdf_data = f.read()

            # PDF user space is 72 units per inch, so this zoom yields
            # target_dpi; it is identical for every page, hence hoisted.
            zoom = self.target_dpi / 72
            matrix = fitz.Matrix(zoom, zoom)

            # Variant name -> pre-processing routine; the order defines the
            # order of the returned paths.
            optimizers = (
                ('text', self._optimize_for_text),
                ('symbol', self._optimize_for_symbols),
                ('line', self._optimize_for_lines),
            )

            doc = fitz.open(stream=pdf_data, filetype="pdf")
            try:
                for page_number, page in enumerate(doc, start=1):
                    # Render the page and decode the encoded bytes with OpenCV
                    pix = page.get_pixmap(matrix=matrix)
                    nparr = np.frombuffer(pix.tobytes(), np.uint8)
                    img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
                    if img is None:
                        raise ValueError(
                            f"Failed to decode rendered page {page_number} of {pdf_path}"
                        )

                    base_filename = f"{base_name}_page_{page_number}"
                    paths: Dict[str, str] = {}
                    # Produce and save one variant at a time so that only a
                    # single derived full-resolution image is alive at once.
                    # The optimizers start with cvtColor, which returns a new
                    # array, so the rendered page needs no defensive copy.
                    for version_type, optimize in optimizers:
                        out_path = os.path.join(
                            output_dir, f"{base_filename}_{version_type}.png"
                        )
                        self._save_image(optimize(img), out_path)
                        paths[version_type] = out_path
                        processed_pages.append(out_path)

                    # Store processing results
                    processing_results[str(page_number)] = {
                        "page_number": page_number,
                        "dimensions": {
                            "width": img.shape[1],
                            "height": img.shape[0]
                        },
                        "paths": paths,
                        "dpi": self.target_dpi,
                        "zoom_factor": zoom
                    }
            finally:
                # Release the document even when a page fails
                doc.close()

            # Save processing results JSON
            results_json_path = os.path.join(
                output_dir,
                f"{base_name}_processing_results.json"
            )
            with open(results_json_path, 'w', encoding='utf-8') as f:
                json.dump(processing_results, f, indent=4)
            return processed_pages
        except Exception as e:
            self.logger.exception("Error processing PDF: %s", e)
            raise

    def _process_image(self, image_path: str, output_dir: str) -> List[str]:
        """Process a single image file and save it as ``<stem>_text.png``.

        Args:
            image_path: Path passed to ``storage.load_file``.
            output_dir: Directory the output path is built under.

        Returns:
            A one-element list holding the output path.

        Raises:
            ValueError: If the loaded bytes cannot be decoded as an image.
            Exception: Any other failure is logged and re-raised unchanged.
        """
        try:
            # Load image
            image_data = self.storage.load_file(image_path)
            nparr = np.frombuffer(image_data, np.uint8)
            img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
            if img is None:
                raise ValueError(f"Failed to decode image: {image_path}")

            # Process the image
            processed_img = self._optimize_image(img)

            # Save processed image
            output_path = os.path.join(
                output_dir,
                f"{Path(image_path).stem}_text.png"
            )
            self._save_image(processed_img, output_path)
            return [output_path]
        except Exception as e:
            self.logger.exception("Error processing image: %s", e)
            raise

    def _target_size(self, width: int, height: int) -> Tuple[int, int]:
        """Return the ``(width, height)`` an image should be resized to.

        The scale factor enlarges the image until its shorter side reaches
        ``min_dimension`` (never shrinking for that reason) and is then
        capped so the longer side does not exceed ``max_dimension``. The
        aspect ratio is kept; each side is clamped to at least one pixel so
        extreme aspect ratios cannot truncate to zero.

        Raises:
            ValueError: If either dimension is not positive.
        """
        if width <= 0 or height <= 0:
            raise ValueError(f"Invalid image dimensions: {width}x{height}")
        scale = min(self.max_dimension / max(width, height),
                    max(self.min_dimension / min(width, height), 1.0))
        return max(1, int(width * scale)), max(1, int(height * scale))

    def _optimize_image(self, img: np.ndarray) -> np.ndarray:
        """Optimize a BGR image: CLAHE, denoise, Otsu binarize, then resize.

        Returns:
            A 3-channel BGR image, resized according to ``_target_size``.
        """
        # Convert to grayscale for processing
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        # Enhance contrast
        clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
        enhanced = clahe.apply(gray)
        # Denoise
        denoised = cv2.fastNlMeansDenoising(enhanced)
        # Binarize (threshold value chosen by Otsu's method)
        _, binary = cv2.threshold(denoised, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
        # Resize while maintaining aspect ratio
        height, width = binary.shape
        new_width, new_height = self._target_size(width, height)
        if (new_width, new_height) != (width, height):
            resized = cv2.resize(binary, (new_width, new_height),
                                 interpolation=cv2.INTER_LANCZOS4)
        else:
            resized = binary
        # Convert back to BGR for compatibility
        return cv2.cvtColor(resized, cv2.COLOR_GRAY2BGR)

    def _optimize_for_text(self, img: np.ndarray) -> np.ndarray:
        """Optimize a BGR image for text detection.

        Pipeline: grayscale, CLAHE contrast enhancement, non-local-means
        denoising, Gaussian adaptive threshold. The input is not modified.

        Returns:
            A 3-channel BGR image holding the binarized result.
        """
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
        enhanced = clahe.apply(gray)
        denoised = cv2.fastNlMeansDenoising(enhanced)
        # Adaptive thresholding (block size 11, constant 2) for text separation
        binary = cv2.adaptiveThreshold(denoised, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
                                       cv2.THRESH_BINARY, 11, 2)
        return cv2.cvtColor(binary, cv2.COLOR_GRAY2BGR)

    def _optimize_for_symbols(self, img: np.ndarray) -> np.ndarray:
        """Optimize a BGR image for symbol detection.

        Pipeline: grayscale, bilateral filter, CLAHE contrast enhancement,
        3x3 sharpening kernel. The input is not modified.

        Returns:
            A 3-channel BGR image holding the sharpened grayscale result.
        """
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        # Bilateral filter to preserve edges while reducing noise
        bilateral = cv2.bilateralFilter(gray, 9, 75, 75)
        clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8))
        enhanced = clahe.apply(bilateral)
        # Sharpening kernel: coefficients sum to 1
        kernel = np.array([[-1, -1, -1],
                           [-1, 9, -1],
                           [-1, -1, -1]])
        sharpened = cv2.filter2D(enhanced, -1, kernel)
        return cv2.cvtColor(sharpened, cv2.COLOR_GRAY2BGR)

    def _optimize_for_lines(self, img: np.ndarray) -> np.ndarray:
        """Optimize a BGR image for line detection.

        Pipeline: grayscale, 3x3 Gaussian blur, Canny edge detection
        (thresholds 50/150), one dilation pass with a 2x2 kernel. The input
        is not modified.

        Returns:
            A 3-channel BGR image holding the dilated edge map.
        """
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        denoised = cv2.GaussianBlur(gray, (3, 3), 0)
        edges = cv2.Canny(denoised, 50, 150)
        # Dilate edges to connect broken lines
        kernel = np.ones((2, 2), np.uint8)
        dilated = cv2.dilate(edges, kernel, iterations=1)
        return cv2.cvtColor(dilated, cv2.COLOR_GRAY2BGR)

    def _save_image(self, img: np.ndarray, output_path: str):
        """Encode ``img`` as PNG and pass the bytes to ``storage.save_file``.

        PNG compression level 0 is requested, i.e. no compression effort.

        Raises:
            ValueError: If OpenCV reports that encoding failed.
        """
        ok, buffer = cv2.imencode('.png', img, [
            cv2.IMWRITE_PNG_COMPRESSION, 0
        ])
        if not ok:
            raise ValueError(f"Failed to encode image for {output_path}")
        # Save to storage
        self.storage.save_file(output_path, buffer.tobytes())
if __name__ == "__main__":
    # Manual smoke run: process one sample PDF and report the size of each
    # produced file plus the total size of the files in the output directory.
    from storage import StorageFactory

    # Initialize storage and processor
    storage = StorageFactory.get_storage()
    processor = DocumentProcessor(storage)

    # Process PDF
    pdf_path = "samples/001.pdf"
    output_dir = "results"

    try:
        # Ensure output directory exists
        os.makedirs(output_dir, exist_ok=True)
        results = processor.process_document(
            file_path=pdf_path,
            output_dir=output_dir
        )

        # Print detailed results
        print("\nProcessing Results:")
        print(f"Output Directory: {os.path.abspath(output_dir)}")
        bytes_per_mb = 1024 * 1024
        for page_path in results:
            file_size = os.path.getsize(page_path) / bytes_per_mb
            print(f"- {os.path.basename(page_path)} ({file_size:.2f} MB)")

        # Calculate total size of output; only regular files are counted so
        # sub-directories do not contribute their directory-entry size.
        entry_paths = (
            os.path.join(output_dir, entry) for entry in os.listdir(output_dir)
        )
        total_size = sum(
            os.path.getsize(path) for path in entry_paths if os.path.isfile(path)
        ) / bytes_per_mb
        print(f"\nTotal output size: {total_size:.2f} MB")
    except Exception as e:
        logger.error(f"Error processing PDF: {str(e)}")
        raise