""" CV API routes. Pola yang dipakai: - Semua heavy endpoint pakai `def` (bukan `async def`) supaya FastAPI jalankan di threadpool, bukan event loop. Ini prevent blocking event loop yang menyebabkan health checks, readiness polling, dan semua request lain ikut hang. - Endpoint /ready dipakai UI buat polling. - _MODEL_WAIT_TIMEOUT dikurangi ke 30s (HF edge proxy timeout ~60s, jadi ada buffer 30s untuk compute aktual). """ from __future__ import annotations import asyncio import threading from fastapi import APIRouter, HTTPException, UploadFile, File, status from fastapi.responses import Response from pydantic import BaseModel from loguru import logger from .schemas import ( AnalyzeURLRequest, FullAnalysisResponse, ClassifyRequest, ClassificationResponse, SimilarityRequest, SimilarityResponse, VisualQARequest, VisualQAResponse, CaptionResponse, DetectionResponse, OCRResponse, ) from .readiness import get_readiness from ..cv_pipeline import CVPipeline router = APIRouter() _pipeline: CVPipeline = None _pipeline_lock = threading.Lock() _trigger_lock = threading.Lock() def get_pipeline() -> CVPipeline: global _pipeline if _pipeline is None: with _pipeline_lock: if _pipeline is None: _pipeline = CVPipeline() return _pipeline # Dikurangi dari 180s ke 30s. # Semua model prewarmed dalam <2s. Kalau masih belum ready dalam 30s, # ada masalah serius — lebih baik fail fast daripada block sampai HF proxy timeout. _MODEL_WAIT_TIMEOUT = 90.0 def _trigger_and_wait(model_name: str): """ Trigger lazy load model (akses pipeline property), lalu tunggu ReadinessTracker konfirmasi ready. Thread-safe: hanya satu thread yang load, sisanya tunggu. """ readiness = get_readiness() with _trigger_lock: status_info = readiness.get_status(model_name) if status_info.state.value == "error": raise HTTPException( status_code=503, detail={ "error": "model_failed_to_load", "model": model_name, "message": status_info.error_message or "Model gagal dimuat.", "hint": "Cek logs container untuk detail error.", }, ) need_spawn = status_info.state.value in ("not_loaded",) if need_spawn: readiness.mark_loading(model_name) if status_info.state.value == "ready": return if need_spawn: def _do_load(): try: p = get_pipeline() if model_name == "captioner": _ = p.captioner elif model_name == "yolo": _ = p.yolo elif model_name == "clip": _ = p.clip elif model_name == "ocr": _ = p.ocr readiness.mark_ready(model_name) logger.info(f"Model '{model_name}' lazy-loaded dan ready.") except Exception as e: readiness.mark_error(model_name, str(e)) logger.error(f"Lazy-load '{model_name}' failed: {e}") t = threading.Thread(target=_do_load, daemon=True, name=f"lazy-load-{model_name}") t.start() ok = readiness.wait_for(model_name, timeout=_MODEL_WAIT_TIMEOUT) if not ok: current = readiness.get_status(model_name).state.value if current == "error": err_msg = readiness.get_status(model_name).error_message raise HTTPException( status_code=503, detail={ "error": "model_failed_to_load", "model": model_name, "message": err_msg or f"Model '{model_name}' gagal dimuat.", "hint": "Cek logs container untuk traceback lengkap.", }, ) raise HTTPException( status_code=503, detail={ "error": "model_not_ready", "model": model_name, "current_state": current, "message": f"Model '{model_name}' belum siap setelah {_MODEL_WAIT_TIMEOUT}s.", "hint": "Cek GET /api/v1/ready. Coba request lagi dalam beberapa saat.", }, ) def _ensure_models_ready(*model_names: str): """Pastikan semua model yang dibutuhkan endpoint sudah ready.""" for name in model_names: _trigger_and_wait(name) # === HEALTH & READINESS === @router.get("/health", tags=["meta"]) def health(): return {"status": "ok"} @router.get("/ready", tags=["meta"]) def ready(): readiness = get_readiness() snap = readiness.snapshot() return snap # === ANALYSIS ENDPOINTS === # PENTING: semua heavy endpoint pakai `def` bukan `async def`. # FastAPI otomatis jalankan sync def di threadpool (anyio worker thread), # sehingga blocking code (httpx, ONNX, Tesseract) tidak freeze event loop. @router.post("/analyze/url", response_model=FullAnalysisResponse, tags=["analysis"]) def analyze_from_url(req: AnalyzeURLRequest): """Analisis gambar dari URL (caption + opsional detection/OCR/CLIP).""" import concurrent.futures as _cf needed = [] if req.run_caption: needed.append("captioner") if req.run_detection: needed.append("yolo") if req.classification_labels: needed.append("clip") if req.run_ocr: needed.append("ocr") _ensure_models_ready(*needed) # Hard outer deadline — TOTAL_TIMEOUT (40s) di cv_pipeline sudah handle ini, # tapi kita tambah satu lapis lagi di route untuk jaga-jaga. ROUTE_TIMEOUT = 78.0 # sedikit lebih dari CVPipeline.TOTAL_TIMEOUT def _run_analyze(): return get_pipeline().analyze( source=req.url, run_caption=req.run_caption, run_detection=req.run_detection, run_ocr=req.run_ocr, classification_labels=req.classification_labels, ) with _cf.ThreadPoolExecutor(max_workers=1) as exc: fut = exc.submit(_run_analyze) try: result = fut.result(timeout=ROUTE_TIMEOUT) except _cf.TimeoutError: raise HTTPException( status_code=504, detail=( "Analyze timeout setelah 42s. " "Kemungkinan server gambar lambat atau memblok HF. " "Coba URL gambar lain (imgur, ibb.co, raw GitHub, dll)." ), ) except HTTPException: raise except Exception as e: logger.error(f"Analyze error: {e}") raise HTTPException(status_code=500, detail=str(e)) return _to_response(result) @router.post("/analyze/upload", response_model=FullAnalysisResponse, tags=["analysis"]) async def analyze_upload( file: UploadFile = File(...), run_caption: bool = True, run_detection: bool = False, run_ocr: bool = False, ): """Upload dan analisis gambar langsung (multipart).""" import concurrent.futures as _cf allowed = {"image/jpeg", "image/png", "image/webp", "image/gif"} if file.content_type not in allowed: raise HTTPException(400, detail=f"Tipe file tidak didukung: {file.content_type}") data = await file.read() if len(data) > 10 * 1024 * 1024: raise HTTPException(400, detail="Ukuran file maksimum 10MB") needed = [] if run_caption: needed.append("captioner") if run_detection: needed.append("yolo") if run_ocr: needed.append("ocr") # Run blocking work in threadpool so we don't block event loop. # Pakai get_running_loop() (bukan get_event_loop() yang deprecated di Py3.10+). def _run(): _ensure_models_ready(*needed) return get_pipeline().analyze( source=data, run_caption=run_caption, run_detection=run_detection, run_ocr=run_ocr, ) UPLOAD_TIMEOUT = 78.0 # sama dengan analyze/url — hard deadline try: loop = asyncio.get_running_loop() with _cf.ThreadPoolExecutor(max_workers=1) as exc: fut = loop.run_in_executor(exc, _run) result = await asyncio.wait_for(fut, timeout=UPLOAD_TIMEOUT) return _to_response(result) except asyncio.TimeoutError: raise HTTPException( status_code=504, detail="Upload analyze timeout setelah 42s. Coba lagi atau kurangi ukuran gambar.", ) except HTTPException: raise except Exception as e: logger.error(f"Upload analyze error: {e}") raise HTTPException(status_code=500, detail=str(e)) # === INDIVIDUAL TASKS === @router.post("/caption", response_model=CaptionResponse, tags=["tasks"]) def caption(url: str, prompt: str = None): """Generate deskripsi teks dari gambar.""" _ensure_models_ready("captioner") try: from ..processors.image_preprocessor import ImagePreprocessor image = ImagePreprocessor.load(url) result = get_pipeline().captioner.caption(image, prompt=prompt) return CaptionResponse(caption=result.caption, model=result.model) except HTTPException: raise except Exception as e: logger.error(f"Caption error: {e}") raise HTTPException(500, detail=f"Caption gagal: {e}") @router.post("/detect", response_model=DetectionResponse, tags=["tasks"]) def detect(url: str, conf: float = None): """Deteksi objek dalam gambar dengan YOLOv8.""" _ensure_models_ready("yolo") try: from ..processors.image_preprocessor import ImagePreprocessor image = ImagePreprocessor.load(url) result = get_pipeline().yolo.detect(image, conf_threshold=conf) return DetectionResponse( detections=[_det_to_schema(d) for d in result.detections], count=result.count, labels_summary=result.labels_summary, image_width=result.image_width, image_height=result.image_height, inference_time_ms=result.inference_time_ms, ) except HTTPException: raise except Exception as e: logger.error(f"Detect error: {e}") raise HTTPException(500, detail=f"Detection gagal: {e}") @router.post("/classify", response_model=ClassificationResponse, tags=["tasks"]) def classify(req: ClassifyRequest): """Zero-shot classification dengan CLIP.""" _ensure_models_ready("clip") try: from ..processors.image_preprocessor import ImagePreprocessor image = ImagePreprocessor.load(req.url) result = get_pipeline().clip.classify(image, req.labels) return ClassificationResponse( top_label=result.top_label, top_score=result.top_score, labels=result.labels, probabilities=result.probabilities, ) except HTTPException: raise except Exception as e: logger.error(f"Classify error: {e}") raise HTTPException(500, detail=f"Classify gagal: {e}") @router.post("/ocr", response_model=OCRResponse, tags=["tasks"]) def ocr(url: str): """Ekstrak teks dari gambar dengan Tesseract OCR.""" _ensure_models_ready("ocr") try: from ..processors.image_preprocessor import ImagePreprocessor image = ImagePreprocessor.load(url) result = get_pipeline().ocr.extract_text(image) return OCRResponse( full_text=result.full_text, boxes=[{"text": b.text, "confidence": b.confidence, "bbox": b.bbox} for b in (result.boxes or [])], word_count=result.word_count, language=result.language, engine=result.engine, ) except HTTPException: raise except Exception as e: logger.error(f"OCR error: {e}") raise HTTPException(500, detail=f"OCR gagal: {e}") @router.post("/vqa", response_model=VisualQAResponse, tags=["tasks"]) def visual_qa(req: VisualQARequest): """Visual Question Answering.""" _ensure_models_ready("captioner") try: from ..processors.image_preprocessor import ImagePreprocessor image = ImagePreprocessor.load(req.url) result = get_pipeline().captioner.visual_qa(image, req.question) return VisualQAResponse(question=req.question, answer=result.caption) except HTTPException: raise except Exception as e: logger.error(f"VQA error: {e}") raise HTTPException(500, detail=f"VQA gagal: {e}") @router.post("/similarity", response_model=SimilarityResponse, tags=["tasks"]) def similarity(req: SimilarityRequest): """Hitung similarity antara gambar dan teks dengan CLIP.""" _ensure_models_ready("clip") try: score = get_pipeline().image_text_similarity(req.url, req.text) if score > 0.3: interpretation = "Sangat relevan" elif score > 0.2: interpretation = "Cukup relevan" elif score > 0.1: interpretation = "Sedikit relevan" else: interpretation = "Tidak relevan" return SimilarityResponse( similarity_score=round(score, 4), text=req.text, interpretation=interpretation, ) except HTTPException: raise except Exception as e: logger.error(f"Similarity error: {e}") raise HTTPException(500, detail=f"Similarity gagal: {e}") # === HELPERS === def _det_to_schema(d): from .schemas import DetectionSchema, BBoxSchema return DetectionSchema( label=d.label, confidence=d.confidence, bbox=BBoxSchema( x1=d.bbox.x1, y1=d.bbox.y1, x2=d.bbox.x2, y2=d.bbox.y2, width=d.bbox.width, height=d.bbox.height, ), class_id=d.class_id, ) def _to_response(result) -> FullAnalysisResponse: from .schemas import ( FullAnalysisResponse, CaptionResponse, DetectionResponse, ClassificationResponse, OCRResponse, BBoxSchema, DetectionSchema, OCRBoxSchema, ) cap = None if result.caption: cap = CaptionResponse(caption=result.caption.caption, model=result.caption.model) det = None if result.detections: det = DetectionResponse( detections=[_det_to_schema(d) for d in result.detections.detections], count=result.detections.count, labels_summary=result.detections.labels_summary, image_width=result.detections.image_width, image_height=result.detections.image_height, inference_time_ms=result.detections.inference_time_ms, ) cls = None if result.classification: cls = ClassificationResponse( top_label=result.classification.top_label, top_score=result.classification.top_score, labels=result.classification.labels or [], probabilities=result.classification.probabilities or [], ) ocr = None if result.ocr: ocr = OCRResponse( full_text=result.ocr.full_text, boxes=[ OCRBoxSchema(text=b.text, confidence=b.confidence, bbox=b.bbox) for b in (result.ocr.boxes or []) ], word_count=result.ocr.word_count, language=result.ocr.language, engine=result.ocr.engine, ) return FullAnalysisResponse( image_width=result.image_width, image_height=result.image_height, source=result.source, caption=cap, detections=det, classification=cls, ocr=ocr, summary_text=result.to_summary(), models_used=result.models_used, total_latency_ms=result.total_latency_ms, )