Spaces:
Running
Running
File size: 11,314 Bytes
900df0b | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 | """
HandwrittenOCR - معالجة الصور المسبقة v5.1
==============================================
تحسين الصور قبل التعرف مع تسجيل مفصّل لكل خطوة:
- تسوية الميل (Deskewing) مع borderMode=BORDER_REPLICATE
- CLAHE + Denoising + Thresholding (OTSU + Adaptive)
- تجزئة ذكية: EasyOCR أولاً + IoU matching + الكنتورات كبديل
- مُحصّن: crops من img_bgr الأصلية (ليس من الصورة الثنائية)
- detect_columns(): كشف الأعمدة في الصفحة
- column_aware_sort(): ترتيب الصناديق عمودياً داخل كل عمود
- smart_segmentation() محسّن لدعم التقسيم المتقدم
"""
import cv2
import numpy as np
import logging
import time
from typing import Tuple, Optional, List
from config import Config
logger = logging.getLogger("HandwrittenOCR")
# استيراد أدوات اللوق المفصّل
try:
from src.logger import log_step, log_error_full, log_result
except ImportError:
def log_step(lg, name, data=None):
lg.info(f"STEP [{name}]")
if data:
for k, v in data.items():
lg.info(f" {k}: {v}")
def log_error_full(lg, ctx, err, extra=None):
lg.error(f"ERROR [{ctx}] {type(err).__name__}: {err}", exc_info=True)
def log_result(lg, name, result):
lg.info(f"RESULT [{name}] {result}")
def deskew(gray: np.ndarray, config: Config = None) -> np.ndarray:
"""تسوية ميل الصورة مع تسجيل مفصّل للزاوية والنتيجة."""
coords = np.column_stack(np.where(gray < 250))
if len(coords) < 50:
logger.debug("deskew: نقاط قليلة جداً (<50) — يتجاوز التسوية")
return gray
angle = cv2.minAreaRect(coords)[-1]
angle = -(90 + angle) if angle < -45 else -angle
if abs(angle) < 0.3:
logger.debug(f"deskew: زاوية صغيرة جداً ({angle:.3f}°) — لا حاجة للتسوية")
return gray
h, w = gray.shape[:2]
M = cv2.getRotationMatrix2D((w // 2, h // 2), angle, 1.0)
result = cv2.warpAffine(gray, M, (w, h), flags=cv2.INTER_CUBIC, borderMode=cv2.BORDER_REPLICATE)
logger.info(f"deskew: زاوية={angle:.2f}°, حجم الصورة={w}x{h}")
return result
def preprocess_image(img_bgr: np.ndarray, config: Config = None, adaptive: bool = False) -> Tuple[np.ndarray, np.ndarray]:
"""
معالجة الصورة المسبقة مع تسجيل مفصّل لكل خطوة.
الخطوات:
1. تحويل لرمادي
2. تسوية الميل (Deskewing)
3. CLAHE (تحسين التباين)
4. Denoising (إزالة الضوضاء)
5. Thresholding (ثنائية)
Returns:
tuple: (binary, gray)
"""
if config is None:
config = Config()
logger.debug(f"preprocess_image: أبعاد={img_bgr.shape[:2]}, dtype={img_bgr.dtype}, adaptive={adaptive}")
start = time.time()
# 1. تحويل لرمادي
if img_bgr.ndim == 3:
gray = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2GRAY)
logger.debug(f" تحويل لرمادي: {img_bgr.shape} => {gray.shape}")
else:
gray = img_bgr.copy()
# 2. تسوية الميل
if config.enable_deskew:
gray = deskew(gray, config)
logger.debug(" تم تسوية الميل")
else:
logger.debug(" تخطي تسوية الميل (enable_deskew=False)")
# 3. CLAHE
clahe = cv2.createCLAHE(clipLimit=config.clahe_clip, tileGridSize=config.clahe_tile)
gray = clahe.apply(gray)
logger.debug(f" CLAHE: clipLimit={config.clahe_clip}, tile={config.clahe_tile}")
# 4. Denoising
gray = cv2.fastNlMeansDenoising(gray, h=config.denoise_h)
logger.debug(f" Denoising: h={config.denoise_h}")
# 5. Thresholding
if adaptive:
binary = cv2.adaptiveThreshold(gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY_INV, 31, 11)
logger.debug(" Thresholding: Adaptive (GAUSSIAN_C, block=31, C=11)")
else:
_, binary = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)
logger.debug(" Thresholding: OTSU (تلقائي)")
elapsed = time.time() - start
logger.info(f"preprocess_image اكتمل في {elapsed:.3f}s | أبعاد الصورة={gray.shape}")
return binary, gray
def compute_iou(b1, b2) -> float:
"""حساب IoU بين صندوقين."""
x1, y1, w1, h1 = b1
x2, y2, w2, h2 = b2
xi1, yi1 = max(x1, x2), max(y1, y2)
xi2, yi2 = min(x1 + w1, x2 + w2), min(y1 + h1, y2 + h2)
inter = max(0, xi2 - xi1) * max(0, yi2 - yi1)
union = w1 * h1 + w2 * h2 - inter
return inter / union if union > 0 else 0
def detect_columns(boxes, img_width, gap_threshold=0.15, min_column_words=3):
"""كشف الأعمدة في الصفحة مع تسجيل مفصّل."""
if not boxes or len(boxes) < min_column_words * 2:
logger.debug(f"detect_columns: صناديق قليلة ({len(boxes) if boxes else 0}) — عمود واحد")
return [sorted(boxes, key=lambda b: (b[1], b[0]))]
gap_px = img_width * gap_threshold
box_centers = [(x + w / 2, x, y, w, h) for x, y, w, h in boxes]
sorted_by_x = sorted(box_centers, key=lambda c: c[0])
column_breaks = []
for i in range(1, len(sorted_by_x)):
prev_cx = sorted_by_x[i - 1][0]
curr_cx = sorted_by_x[i][0]
gap = abs(curr_cx - prev_cx)
if gap > gap_px:
column_breaks.append(i)
logger.debug(f" فاصل عمود عند الفهرس {i}: gap={gap:.0f}px > threshold={gap_px:.0f}px")
if not column_breaks:
logger.debug("detect_columns: لا فواصل عمود — عمود واحد")
return [sorted(boxes, key=lambda b: (b[1], b[0]))]
columns = []
prev_break = 0
for brk in column_breaks + [len(sorted_by_x)]:
col_boxes = [(x, y, w, h) for _, x, y, w, h in sorted_by_x[prev_break:brk]]
if col_boxes:
col_boxes_sorted = sorted(col_boxes, key=lambda b: (b[1], b[0]))
if len(col_boxes_sorted) >= min_column_words:
columns.append(col_boxes_sorted)
else:
if columns:
columns[-1].extend(col_boxes_sorted)
columns[-1].sort(key=lambda b: (b[1], b[0]))
else:
columns.append(col_boxes_sorted)
prev_break = brk
logger.info(f"detect_columns: {len(columns)} عمود (من {len(boxes)} صندوق)")
for i, col in enumerate(columns):
logger.debug(f" عمود {i}: {len(col)} كلمة, x_range=[{col[0][0]}, {col[-1][0] + col[-1][2]}]")
return columns if columns else [sorted(boxes, key=lambda b: (b[1], b[0]))]
def column_aware_sort(boxes, img_width, lang="en"):
"""ترتيب الصناديق مع مراعاة الأعمدة والاتجاه."""
if not boxes:
return []
columns = detect_columns(boxes, img_width)
if len(columns) == 1:
return columns[0]
# ترتيب الأعمدة حسب اللغة
if lang == "ar":
columns.sort(key=lambda col: -col[0][0]) # RTL
logger.debug("column_aware_sort: ترتيب RTL (عربي)")
else:
columns.sort(key=lambda col: col[0][0]) # LTR
logger.debug("column_aware_sort: ترتيب LTR")
result = []
for col in columns:
result.extend(col)
return result
def smart_segmentation(img_bgr, binary, easyocr_detections=None, config=None):
"""
تجزئة ذكية للنص مع تسجيل مفصّل.
يفضل EasyOCR detections أولاً، ثم يلجأ للكنتورات.
"""
if config is None:
config = Config()
log_step(logger, "smart_segmentation", {
"img_size": f"{img_bgr.shape[:2]}",
"detections_provided": str(easyocr_detections is not None),
"num_detections": str(len(easyocr_detections) if easyocr_detections else 0),
"min_word_w": config.min_word_w,
"min_word_h": config.min_word_h,
})
# === الطريقة 1: EasyOCR detections ===
if easyocr_detections:
boxes = []
skipped = 0
for det in easyocr_detections:
pts = np.array(det[0], dtype=np.int32)
x, y, w, h = cv2.boundingRect(pts)
if w > config.min_word_w and h > config.min_word_h:
boxes.append((x, y, w, h))
else:
skipped += 1
logger.info(f" EasyOCR boxes: {len(boxes)} صالح, {skipped} متجاوز (صغير)")
if boxes:
sorted_boxes = sorted(boxes, key=lambda b: (b[1], b[0]))
logger.debug(f" smart_segmentation => {len(sorted_boxes)} صندوق من EasyOCR")
return sorted_boxes
logger.info(" لم ينتج عن EasyOCR صناديق صالحة — الانتقال للكنتورات")
# === الطريقة 2: الكنتورات ===
logger.debug(" استخدام طريقة الكنتورات")
kernel = cv2.getStructuringElement(cv2.MORPH_RECT, config.dilation_kernel)
dilated = cv2.dilate(binary, kernel, iterations=1)
contours, _ = cv2.findContours(dilated, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
boxes = [
(x, y, w, h) for c in contours
for x, y, w, h in [cv2.boundingRect(c)]
if w > config.min_word_w and h > config.min_word_h
]
sorted_boxes = sorted(boxes, key=lambda b: (b[1], b[0]))
logger.info(f" الكنتورات: {len(contours)} كانتور => {len(boxes)} صندوق صالح")
return sorted_boxes
def match_boxes_with_detections(boxes, detections, iou_threshold=0.3):
"""مطابقة الصناديق مع detections EasyOCR باستخدام IoU مع تسجيل مفصّل."""
if not detections:
logger.debug(f"match_boxes: لا detections — {len(boxes)} صندوق بدون مطابقة")
return [(b, None) for b in boxes]
det_boxes = []
for det in detections:
pts = np.array(det[0], dtype=np.int32)
det_boxes.append((cv2.boundingRect(pts), det))
result, used = [], set()
matched_count = 0
unmatched = 0
for i, box in enumerate(boxes):
best_det, best_iou = None, 0
for j, (db, det) in enumerate(det_boxes):
if j in used:
continue
iou = compute_iou(box, db)
if iou > best_iou and iou > iou_threshold:
best_iou, best_det = iou, det
used.add(j)
if best_det:
matched_count += 1
else:
unmatched += 1
result.append((box, best_det))
logger.info(f"match_boxes: {matched_count} مطابق, {unmatched} غير مطابق (IoU>{iou_threshold})")
return result
def crop_safe(img, x, y, w, h):
"""قص آمن مع تسجيل الحالات الشاذة."""
H, W = img.shape[:2]
crop = img[max(0, y): min(H, y + h), max(0, x): min(W, x + w)]
if crop.size == 0:
logger.warning(f"crop_safe: صورة فارغة! img={img.shape[:2]}, box=({x},{y},{w},{h})")
elif crop.shape[0] < 3 or crop.shape[1] < 3:
logger.debug(f"crop_safe: صندوق صغير جداً: {crop.shape[:2]}")
return crop
|