Spaces:
Running
Running
| """ | |
| HandwrittenOCR - محرك التعرف على النصوص v4.1 | |
| ================================================= | |
| المحسنات الرئيسية: | |
| - Batch TrOCR inference (3-6x تسريع) | |
| - Beam search (num_beams) لدقة أعلى | |
| - Smart Ensemble: تخطي TrOCR إذا ثقة EasyOCR > easy_conf_threshold | |
| - LoRA auto-loading | |
| - دعم cache_dir + HF_TOKEN | |
| - تسجيل مفصّل لكل عملية تعرف مع الوقت والنتائج | |
| """ | |
| import os | |
| import cv2 | |
| import io | |
| import time | |
| import traceback | |
| import numpy as np | |
| import torch | |
| import logging | |
| from typing import Tuple, Optional, List | |
| from PIL import Image | |
| from transformers import TrOCRProcessor, VisionEncoderDecoderModel | |
| import easyocr | |
| logger = logging.getLogger("HandwrittenOCR") | |
| # استيراد أدوات اللوق المفصّل | |
| try: | |
| from src.logger import log_step, log_error_full, log_result | |
| except ImportError: | |
| def log_step(lg, name, data=None): | |
| lg.info(f"STEP [{name}]") | |
| if data: | |
| for k, v in data.items(): | |
| lg.info(f" {k}: {v}") | |
| def log_error_full(lg, ctx, err, extra=None): | |
| lg.error(f"ERROR [{ctx}] {type(err).__name__}: {err}", exc_info=True) | |
| def log_result(lg, name, result): | |
| lg.info(f"RESULT [{name}] {result}") | |
| def normalize_text(x) -> str: | |
| """تنظيف النص من NaN وNone مع تسجيل القيم المشبوهة.""" | |
| import pandas as pd | |
| if x is None or (isinstance(x, float) and pd.isna(x)): | |
| return "" | |
| result = str(x).strip() | |
| return result | |
| def detect_lang(text: str) -> str: | |
| """كشف اللغة مع تسجيل النتائج.""" | |
| try: | |
| from langdetect import detect | |
| lang = detect(str(text)) if text and text.strip() else "unknown" | |
| logger.debug(f"كشف اللغة: '{text[:50]}...' => '{lang}'") | |
| return lang | |
| except Exception as e: | |
| logger.debug(f"فشل كشف اللغة: {e}") | |
| return "unknown" | |
| class OCREngine: | |
| """ | |
| محرك التعرف يدعم: | |
| - recognize_word_ensemble: TrOCR + EasyOCR مع اختيار الأفضل | |
| - batch_predict: استدلال دفعي لـ TrOCR | |
| - auto-load LoRA adapter | |
| """ | |
| def __init__( | |
| self, | |
| trocr_model_name: str = "David-Magdy/TR_OCR_LARGE", | |
| ocr_languages: Optional[list] = None, | |
| max_text_length: int = 64, | |
| device: Optional[str] = None, | |
| cache_dir: str = "", | |
| hf_token: str = "", | |
| trocr_default_confidence: float = 0.70, | |
| easy_conf_threshold: float = 0.80, | |
| num_beams: int = 5, | |
| trocr_batch_size: int = 16, | |
| lora_save_path: str = "", | |
| skip_trocr: bool = False, | |
| ): | |
| logger.info("=" * 55) | |
| logger.info("تهيئة محرك التعرف OCREngine") | |
| logger.info("=" * 55) | |
| self.lora_loaded = False | |
| self.max_text_length = max_text_length | |
| self.trocr_default_confidence = trocr_default_confidence | |
| self.easy_conf_threshold = easy_conf_threshold | |
| self.num_beams = num_beams | |
| self.trocr_batch_size = trocr_batch_size | |
| if device is None: | |
| self.device = torch.device( | |
| "cuda" if torch.cuda.is_available() else "cpu" | |
| ) | |
| else: | |
| self.device = torch.device(device) | |
| self.skip_trocr = skip_trocr | |
| self.trocr_processor = None | |
| self.trocr_model = None | |
| log_step(logger, "إعدادات محرك التعرف", { | |
| "device": str(self.device), | |
| "CUDA_available": str(torch.cuda.is_available()), | |
| "trocr_model": trocr_model_name, | |
| "languages": str(ocr_languages or ["en", "ar", "de"]), | |
| "skip_trocr": str(skip_trocr), | |
| "num_beams": str(num_beams), | |
| "batch_size": str(trocr_batch_size), | |
| "max_text_length": str(max_text_length), | |
| "easy_conf_threshold": str(easy_conf_threshold), | |
| "trocr_default_confidence": str(trocr_default_confidence), | |
| "cache_dir": cache_dir or "(default)", | |
| "hf_token_set": str(bool(hf_token)), | |
| "lora_save_path": lora_save_path or "(none)", | |
| }) | |
| # تحميل TrOCR (اختياري — يمكن تخطيه لتوفير ~600 MB) | |
| if not skip_trocr: | |
| hf_kwargs = {} | |
| if cache_dir: | |
| hf_kwargs["cache_dir"] = cache_dir | |
| if hf_token: | |
| hf_kwargs["token"] = hf_token | |
| logger.info(f"جاري تحميل TrOCR: {trocr_model_name}") | |
| logger.debug(f" HF kwargs: {list(hf_kwargs.keys())}") | |
| start = time.time() | |
| try: | |
| self.trocr_processor = TrOCRProcessor.from_pretrained( | |
| trocr_model_name, **hf_kwargs | |
| ) | |
| self.trocr_model = VisionEncoderDecoderModel.from_pretrained( | |
| trocr_model_name, **hf_kwargs | |
| ).to(self.device) | |
| elapsed = time.time() - start | |
| logger.info(f"تم تحميل TrOCR بنجاح في {elapsed:.2f}s") | |
| log_result(logger, "تحميل TrOCR", { | |
| "model": trocr_model_name, | |
| "device": str(self.device), | |
| "time_sec": round(elapsed, 2), | |
| "params_M": f"{sum(p.numel() for p in self.trocr_model.parameters()) / 1e6:.1f}", | |
| }) | |
| except Exception as e: | |
| log_error_full(logger, "تحميل TrOCR", e, extra={ | |
| "model": trocr_model_name, | |
| "kwargs_keys": list(hf_kwargs.keys()), | |
| }) | |
| raise | |
| # تحميل LoRA المُحسَّن إذا كان موجوداً | |
| if lora_save_path and os.path.exists(lora_save_path): | |
| self._load_lora_model(lora_save_path) | |
| elif lora_save_path: | |
| logger.info(f"مسار LoRA غير موجود: {lora_save_path} — يتجاوز") | |
| else: | |
| logger.info("تخطي TrOCR — وضع EasyOCR فقط (توفير ~600 MB)") | |
| # تحميل EasyOCR | |
| if ocr_languages is None: | |
| ocr_languages = ["en", "ar", "de"] | |
| logger.info(f"جاري تحميل EasyOCR بلغات: {ocr_languages}") | |
| start = time.time() | |
| try: | |
| self.easy_reader = easyocr.Reader( | |
| ocr_languages, gpu=torch.cuda.is_available() | |
| ) | |
| elapsed = time.time() - start | |
| logger.info(f"تم تحميل EasyOCR بنجاح في {elapsed:.2f}s (GPU={torch.cuda.is_available()})") | |
| except Exception as e: | |
| logger.warning(f"فشل تحميل EasyOCR بـ GPU — الانتقال لوضع CPU: {e}") | |
| start = time.time() | |
| self.easy_reader = easyocr.Reader(ocr_languages, gpu=False) | |
| elapsed = time.time() - start | |
| logger.info(f"تم تحميل EasyOCR بـ CPU في {elapsed:.2f}s") | |
| def _load_lora_model(self, lora_save_path: str) -> None: | |
| """تحميل نموذج LoRA المُحسَّن مع تسجيل مفصّل.""" | |
| log_step(logger, "تحميل LoRA", {"path": lora_save_path}) | |
| try: | |
| from peft import PeftModel | |
| start = time.time() | |
| self.trocr_model = PeftModel.from_pretrained( | |
| self.trocr_model, lora_save_path | |
| ).to(self.device) | |
| self.lora_loaded = True | |
| elapsed = time.time() - start | |
| logger.info(f"تم تحميل LoRA weights من: {lora_save_path} في {elapsed:.2f}s") | |
| except ImportError as e: | |
| logger.warning(f"peft غير مثبت - لن يتم تحميل LoRA: {e}") | |
| logger.debug(traceback.format_exc()) | |
| except Exception as e: | |
| log_error_full(logger, "تحميل LoRA", e, extra={"path": lora_save_path}) | |
| def batch_predict(self, crops: List[np.ndarray]) -> List[str]: | |
| """ | |
| Batch inference لـ TrOCR — التحسين الأهم: 3-6x تسريع | |
| مع beam search لدقة أعلى. | |
| """ | |
| if self.skip_trocr or not crops: | |
| logger.debug(f"batch_predict: تخطي (skip_trocr={self.skip_trocr}, crops={len(crops) if crops else 0})") | |
| return [""] * len(crops) if crops else [] | |
| log_step(logger, "TrOCR batch_predict", { | |
| "num_crops": len(crops), | |
| "batch_size": self.trocr_batch_size, | |
| "num_beams": self.num_beams, | |
| "max_length": self.max_text_length, | |
| }) | |
| pil_imgs = [] | |
| for i, crop in enumerate(crops): | |
| rgb = cv2.cvtColor(crop, cv2.COLOR_BGR2RGB) | |
| pil_imgs.append(Image.fromarray(rgb)) | |
| logger.debug(f" crop[{i}]: أبعاد={crop.shape[:2]}, نوع={crop.dtype}") | |
| start = time.time() | |
| try: | |
| pv = self.trocr_processor( | |
| images=pil_imgs, return_tensors="pt", padding=True | |
| ).pixel_values.to(self.device) | |
| logger.debug(f" tensor shape: {pv.shape}, dtype: {pv.dtype}, device: {pv.device}") | |
| with torch.no_grad(): | |
| ids = self.trocr_model.generate( | |
| pv, | |
| max_length=self.max_text_length, | |
| num_beams=self.num_beams, | |
| early_stopping=True, | |
| length_penalty=1.0, | |
| ) | |
| results = self.trocr_processor.batch_decode(ids, skip_special_tokens=True) | |
| elapsed = time.time() - start | |
| logger.debug(f" batch_predict اكتمل في {elapsed:.2f}s") | |
| for i, txt in enumerate(results): | |
| logger.debug(f" crop[{i}] => '{txt}'") | |
| return results | |
| except RuntimeError as e: | |
| if 'out of memory' in str(e).lower(): | |
| logger.warning(f"batch_predict: OOM — تقليل batch size تلقائياً (كان {len(crops)} crop)") | |
| if torch.cuda.is_available(): | |
| torch.cuda.empty_cache() | |
| # Fallback: واحد واحد | |
| results = [] | |
| for i, single_crop in enumerate(crops): | |
| try: | |
| rgb = cv2.cvtColor(single_crop, cv2.COLOR_BGR2RGB) | |
| pv = self.trocr_processor( | |
| images=[Image.fromarray(rgb)], return_tensors="pt", padding=True | |
| ).pixel_values.to(self.device) | |
| with torch.no_grad(): | |
| ids = self.trocr_model.generate( | |
| pv, max_length=self.max_text_length, num_beams=self.num_beams, | |
| early_stopping=True, length_penalty=1.0, | |
| ) | |
| txt = self.trocr_processor.batch_decode(ids, skip_special_tokens=True)[0] | |
| results.append(txt) | |
| except Exception as e2: | |
| logger.debug(f" OOM fallback crop[{i}] فشل: {e2}") | |
| results.append("") | |
| return results | |
| logger.warning(f"batch_predict failed: {e}") | |
| return [""] * len(crops) | |
| except Exception as e: | |
| log_error_full(logger, "TrOCR batch_predict", e, extra={ | |
| "num_crops": len(crops), | |
| "device": str(self.device), | |
| }) | |
| return [""] * len(crops) | |
| def recognize_word_ensemble( | |
| self, | |
| img_bgr: np.ndarray, | |
| easyocr_raw: Optional[list] = None, | |
| ) -> Tuple[str, float, str, bool]: | |
| """ | |
| Ensemble ذكي: يتخطى TrOCR إذا ثقة EasyOCR > easy_conf_threshold. | |
| Returns: | |
| tuple: (text, confidence, model_source, is_low_confidence) | |
| """ | |
| candidates = [] | |
| # EasyOCR أولاً | |
| if easyocr_raw is not None: | |
| _, txt, conf = easyocr_raw | |
| txt = normalize_text(txt) | |
| conf_val = float(conf) | |
| if txt: | |
| candidates.append(("easyocr", txt, conf_val)) | |
| logger.debug(f" EasyOCR: '{txt}' (ثقة={conf_val:.3f})") | |
| # تخطي TrOCR لو EasyOCR واثق | |
| if conf_val >= self.easy_conf_threshold: | |
| logger.debug(f" => اختيار EasyOCR مباشرة (ثقة >= {self.easy_conf_threshold})") | |
| return txt, conf_val, "easyocr", False | |
| # TrOCR (فقط عند الحاجة وتوفر النموذج) | |
| if not self.skip_trocr and self.trocr_processor is not None: | |
| try: | |
| start = time.time() | |
| rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB) | |
| pv = self.trocr_processor( | |
| images=Image.fromarray(rgb), return_tensors="pt" | |
| ).pixel_values.to(self.device) | |
| with torch.no_grad(): | |
| ids = self.trocr_model.generate( | |
| pv, | |
| max_length=self.max_text_length, | |
| num_beams=self.num_beams, | |
| early_stopping=True, | |
| ) | |
| txt = self.trocr_processor.batch_decode( | |
| ids, skip_special_tokens=True | |
| )[0].strip() | |
| elapsed = time.time() - start | |
| if txt: | |
| candidates.append(("trocr", txt, self.trocr_default_confidence)) | |
| logger.debug(f" TrOCR: '{txt}' (ثقة={self.trocr_default_confidence:.3f}, وقت={elapsed:.3f}s)") | |
| except Exception as e: | |
| logger.debug(f" TrOCR single فشل: {type(e).__name__}: {e}") | |
| candidates = [c for c in candidates if c[1]] | |
| if not candidates: | |
| logger.debug(" => لا نتائج من أي محرك") | |
| return "", 0.0, "none", True | |
| best = max(candidates, key=lambda x: x[2]) | |
| logger.debug(f" => الاختيار النهائي: '{best[1]}' من {best[0]} (ثقة={best[2]:.3f})") | |
| return ( | |
| best[1], | |
| float(best[2]), | |
| best[0], | |
| best[2] < 0.5, | |
| ) | |
| def recognize_word(self, img_bgr: np.ndarray) -> str: | |
| """التعرف على كلمة: TrOCR أولاً ثم EasyOCR كبديل مع تسجيل مفصّل.""" | |
| if img_bgr is None or img_bgr.size == 0: | |
| logger.debug("recognize_word: صورة فارغة") | |
| return "" | |
| logger.debug(f"recognize_word: أبعاد={img_bgr.shape[:2]}, dtype={img_bgr.dtype}") | |
| text = self._recognize_trocr(img_bgr) | |
| if len(text.strip()) > 1: | |
| logger.debug(f"recognize_word => TrOCR: '{text.strip()}'") | |
| return text.strip() | |
| text = self._recognize_easyocr(img_bgr) | |
| result = text.strip() if text.strip() else "" | |
| logger.debug(f"recognize_word => EasyOCR: '{result}'") | |
| return result | |
| def detect_words_full(self, img_bgr: np.ndarray) -> list: | |
| """كشف الكلمات مع الإحداثيات باستخدام EasyOCR مع تسجيل مفصّل.""" | |
| logger.debug(f"detect_words_full: أبعاد الصورة={img_bgr.shape[:2]}") | |
| start = time.time() | |
| try: | |
| results = self.easy_reader.readtext(img_bgr, detail=1) | |
| elapsed = time.time() - start | |
| logger.info(f" EasyOCR detect: {len(results)} كلمة في {elapsed:.2f}s") | |
| for i, det in enumerate(results[:5]): # أول 5 فقط | |
| logger.debug(f" det[{i}]: '{det[1][:30]}' ثقة={det[2]:.3f}") | |
| if len(results) > 5: | |
| logger.debug(f" ... و {len(results) - 5} كلمة أخرى") | |
| return results | |
| except Exception as e: | |
| log_error_full(logger, "EasyOCR detect_words_full", e) | |
| return [] | |
| def _recognize_trocr(self, img_bgr: np.ndarray) -> str: | |
| """TrOCR للتعرف على كلمة واحدة مع تسجيل.""" | |
| if self.skip_trocr or self.trocr_processor is None: | |
| logger.debug(" _recognize_trocr: متجاوز (skip_trocr أو لا يوجد نموذج)") | |
| return "" | |
| try: | |
| rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB) | |
| pil_img = Image.fromarray(rgb) | |
| pv = self.trocr_processor( | |
| images=pil_img, return_tensors="pt" | |
| ).pixel_values.to(self.device) | |
| with torch.no_grad(): | |
| ids = self.trocr_model.generate( | |
| pv, | |
| max_length=self.max_text_length, | |
| num_beams=self.num_beams, | |
| early_stopping=True, | |
| length_penalty=1.0, | |
| ) | |
| text = self.trocr_processor.batch_decode( | |
| ids, skip_special_tokens=True | |
| )[0] | |
| return text.strip() | |
| except Exception as e: | |
| logger.debug(f" _recognize_trocr فشل: {type(e).__name__}: {e}") | |
| logger.debug(traceback.format_exc()) | |
| return "" | |
| def _recognize_easyocr(self, img_bgr: np.ndarray) -> str: | |
| """EasyOCR: يختار النص بأعلى ثقة مع تسجيل مفصّل.""" | |
| try: | |
| results = self.easy_reader.readtext(img_bgr) | |
| if results: | |
| best = max(results, key=lambda r: r[2]) | |
| logger.debug(f" _recognize_easyocr: '{best[1]}' ثقة={best[2]:.3f} (من {len(results)} نتيجة)") | |
| return best[1] | |
| logger.debug(" _recognize_easyocr: لا نتائج") | |
| return "" | |
| except Exception as e: | |
| logger.debug(f" _recognize_easyocr فشل: {type(e).__name__}: {e}") | |
| return "" | |