""" منسّق تشغيل المهام داخل process الـ FastAPI. المشكلة: BackgroundTasks لا يتحكم بالتوازي. لو وصلت 10 طلبات معاً سنحاول تشغيل 10 pipelines متوازية وكل واحد يستهلك ~1.5 GB. الحل: قائمة انتظار + مجموعة "قيد التنفيذ" + concurrency limit. - نسجل request_id قبل بدء التنفيذ في in_progress - يفحص poll_loop المجموعة قبل إرسال نفس الطلب مرتين - ننهي بإزالة الـ id من المجموعة سواء نجح أم فشل """ from __future__ import annotations import threading from typing import Any, Dict from loguru import logger # الحد الأقصى للتحليلات المتوازية. يستهلك كل تحليل ذاكرة كبيرة # (PySpark + CAMeL-BERT) لذلك واحد في المرة هو الأكثر أماناً على HF. _MAX_CONCURRENT = 1 _semaphore = threading.BoundedSemaphore(_MAX_CONCURRENT) _in_progress: set[str] = set() _lock = threading.Lock() def is_in_progress(request_id: str) -> bool: with _lock: return request_id in _in_progress def claim(request_id: str) -> bool: """يحاول تسجيل الطلب. يُرجع False إن كان قيد التنفيذ بالفعل.""" with _lock: if request_id in _in_progress: return False _in_progress.add(request_id) return True def release(request_id: str) -> None: with _lock: _in_progress.discard(request_id) def in_progress_count() -> int: with _lock: return len(_in_progress) def run_with_concurrency(request_id: str, request_data: Dict[str, Any]) -> Dict[str, Any]: """ Wrapper يستدعي run_analysis تحت قفل توازي. يُستدعى من poll_loop أو BackgroundTasks. ملاحظة: claim() يجب أن تكون قد استُدعيت مسبقاً من poll_loop أو api. هذه الدالة لا تُعيد claim — فقط تُنفّذ وتحرّر. """ from app.workers.analysis_worker import run_analysis try: logger.info(f"[runner] starting {request_id} (concurrent={in_progress_count()})") with _semaphore: return run_analysis(request_id, request_data) except Exception as e: logger.error(f"[runner] {request_id} crashed: {e}") return {"status": "failed", "request_id": request_id, "error": str(e)} finally: release(request_id) logger.info(f"[runner] released {request_id}")