Spaces:
Sleeping
Sleeping
File size: 6,024 Bytes
900df0b | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 | """
مهام الخلفية غير المتزامنة (Celery Tasks)
==============================================
معالجة الملفات بشكل غير متزامن باستخدام Celery + Redis.
الاستخدام:
# تشغيل Redis
redis-server
# تشغيل worker
celery -A tasks worker --loglevel=info
# من الكود
from tasks import process_ocr_async
result = process_ocr_async.delay(file_path, options)
"""
import logging
import os
import time
from typing import Optional
logger = logging.getLogger(__name__)
# تهيئة Celery (اختياري - يعمل فقط إذا كان Redis متاحاً)
_celery_app = None
CELERY_AVAILABLE = False
try:
from celery import Celery
_celery_app = Celery(
"omnifile_tasks",
broker=os.environ.get("CELERY_BROKER_URL", "redis://localhost:6379/0"),
backend=os.environ.get("CELERY_RESULT_BACKEND", "redis://localhost:6379/0"),
)
_celery_app.conf.update(
task_serializer="json",
accept_content=["json"],
result_serializer="json",
timezone="UTC",
enable_utc=True,
task_track_started=True,
task_time_limit=600, # 10 دقائق كحد أقصى
task_soft_time_limit=540, # 9 دقائق تحذير
)
CELERY_AVAILABLE = True
logger.info("تم تهيئة Celery بنجاح")
except ImportError:
logger.info("Celery غير مثبت. المهام غير المتزامنة غير متوفرة.")
except Exception as e:
logger.warning("فشل تهيئة Celery: %s", e)
def run_ocr_task(
file_path: str,
options: Optional[dict] = None,
) -> dict:
"""
معالجة OCR لملف (متزامنة - يمكن استدعاؤها مباشرة).
المعاملات:
file_path: مسار الملف
options: إعدادات إضافية
العائد:
نتيجة المعالجة
"""
start_time = time.time()
options = options or {}
try:
from config import OmniFileConfig
from modules.vision.ocr_engine import OCREngine
# إعداد الإعدادات
cfg = OmniFileConfig()
# تطبيق الخيارات
if options.get("enable_trocr") is not None:
cfg.enable_trocr = options["enable_trocr"]
if options.get("enable_easyocr") is not None:
cfg.enable_easyocr = options["enable_easyocr"]
if options.get("enable_tesseract") is not None:
cfg.enable_tesseract = options["enable_tesseract"]
if options.get("use_gpu") is not None:
cfg.use_gpu = options["use_gpu"]
# إنشاء محرك OCR
engine = OCREngine(
trocr_model_name=cfg.trocr_model_name,
easyocr_languages=cfg.easyocr_languages,
tesseract_langs=cfg.tesseract_langs,
use_gpu=cfg.use_gpu,
enable_trocr=cfg.enable_trocr,
enable_easyocr=cfg.enable_easyocr,
enable_tesseract=cfg.enable_tesseract,
)
# معالجة الملف
if file_path.lower().endswith(".pdf"):
results = engine.recognize_pdf(file_path)
else:
from PIL import Image
img = Image.open(file_path)
result = engine.recognize(img)
results = [result]
duration = time.time() - start_time
return {
"success": True,
"file_path": file_path,
"results": results,
"processing_time": duration,
"pages_processed": len(results),
}
except Exception as e:
duration = time.time() - start_time
logger.error("فشلت معالجة '%s': %s", file_path, e)
return {
"success": False,
"file_path": file_path,
"error": str(e),
"processing_time": duration,
"pages_processed": 0,
}
# تسجيل المهام في Celery إذا كان متاحاً
if CELERY_AVAILABLE and _celery_app is not None:
@_celery_app.task(bind=True, max_retries=3, default_retry_delay=30)
def process_ocr_async(self, file_path: str, options: Optional[dict] = None):
"""
معالجة OCR بشكل غير متزامن عبر Celery.
المعاملات:
file_path: مسار الملف
options: إعدادات إضافية
العائد:
نتيجة المعالجة
"""
try:
return run_ocr_task(file_path, options)
except Exception as exc:
logger.error("فشلت المهمة غير المتزامنة: %s", exc)
raise self.retry(exc=exc)
@_celery_app.task(bind=True, max_retries=2)
def process_batch_async(self, file_paths: list[str], options: Optional[dict] = None):
"""
معالجة دفعة ملفات بشكل غير متزامن.
المعاملات:
file_paths: قائمة مسارات الملفات
options: إعدادات إضافية
العائد:
قائمة نتائج المعالجة
"""
results = []
for fp in file_paths:
try:
result = run_ocr_task(fp, options)
results.append(result)
self.update_state(
state="PROGRESS",
meta={
"current": len(results),
"total": len(file_paths),
"file": fp,
},
)
except Exception as e:
results.append({
"success": False,
"file_path": fp,
"error": str(e),
})
return {
"total_files": len(file_paths),
"successful": sum(1 for r in results if r["success"]),
"failed": sum(1 for r in results if not r["success"]),
"results": results,
}
|