ai-rag / cv_module /src /api /routes.py
robrtt's picture
Clean rebuild: all features fixed
4d62ba6
"""
CV API routes.
Pola yang dipakai:
- Semua heavy endpoint pakai `def` (bukan `async def`) supaya FastAPI
jalankan di threadpool, bukan event loop. Ini prevent blocking event loop
yang menyebabkan health checks, readiness polling, dan semua request lain
ikut hang.
- Endpoint /ready dipakai UI buat polling.
- _MODEL_WAIT_TIMEOUT dikurangi ke 30s (HF edge proxy timeout ~60s,
jadi ada buffer 30s untuk compute aktual).
"""
from __future__ import annotations
import asyncio
import threading
from fastapi import APIRouter, HTTPException, UploadFile, File, status
from fastapi.responses import Response
from pydantic import BaseModel
from loguru import logger
from .schemas import (
AnalyzeURLRequest, FullAnalysisResponse,
ClassifyRequest, ClassificationResponse,
SimilarityRequest, SimilarityResponse,
VisualQARequest, VisualQAResponse,
CaptionResponse, DetectionResponse, OCRResponse,
)
from .readiness import get_readiness
from ..cv_pipeline import CVPipeline
router = APIRouter()

# Lazily created CVPipeline shared by all requests; built by get_pipeline().
_pipeline: CVPipeline | None = None
# Guards the one-time construction of _pipeline in get_pipeline().
_pipeline_lock = threading.Lock()
# Serializes the check-status / mark-loading step in _trigger_and_wait() so
# only one request spawns the loader thread for a given model.
_trigger_lock = threading.Lock()
def get_pipeline() -> CVPipeline:
    """Return the shared CVPipeline, constructing it on first use.

    The unlocked check is a fast path once the pipeline exists. The second
    check, made while holding ``_pipeline_lock``, ensures that only one
    thread ever calls ``CVPipeline()`` when several race on the first
    request.
    """
    global _pipeline
    if _pipeline is not None:
        return _pipeline
    with _pipeline_lock:
        if _pipeline is None:
            _pipeline = CVPipeline()
    return _pipeline
# Dikurangi dari 180s ke 30s.
# Semua model prewarmed dalam <2s. Kalau masih belum ready dalam 30s,
# ada masalah serius — lebih baik fail fast daripada block sampai HF proxy timeout.
_MODEL_WAIT_TIMEOUT = 90.0
def _model_failed_exc(model_name: str, message: str, hint: str) -> HTTPException:
    """Build the 503 ``model_failed_to_load`` error for ``model_name``."""
    return HTTPException(
        status_code=503,
        detail={
            "error": "model_failed_to_load",
            "model": model_name,
            "message": message,
            "hint": hint,
        },
    )


def _trigger_and_wait(model_name: str) -> None:
    """Ensure ``model_name`` is loaded, triggering a lazy load if needed.

    Accessing the matching CVPipeline property is what loads the model. That
    access runs on a background daemon thread while the caller blocks in
    ``readiness.wait_for``. The status check and the ``mark_loading`` call
    happen under ``_trigger_lock``, so only the first caller that sees
    ``not_loaded`` spawns a loader; concurrent callers just wait.

    Args:
        model_name: One of ``"captioner"``, ``"yolo"``, ``"clip"``, ``"ocr"``
            (the CVPipeline attributes this function knows how to load).

    Raises:
        ValueError: If ``model_name`` is not one of the names above.
        HTTPException: 503 ``model_failed_to_load`` if the model is (or ends
            up) in the error state, or 503 ``model_not_ready`` if it is still
            not ready after ``_MODEL_WAIT_TIMEOUT`` seconds.
    """
    # CVPipeline attributes whose access triggers the lazy load.
    loadable = ("captioner", "yolo", "clip", "ocr")
    if model_name not in loadable:
        # Programming error. Previously an unknown name was silently marked
        # "ready" without anything being loaded.
        raise ValueError(f"Unknown model name: {model_name!r}")

    readiness = get_readiness()
    with _trigger_lock:
        status_info = readiness.get_status(model_name)
        state = status_info.state.value
        if state == "error":
            raise _model_failed_exc(
                model_name,
                status_info.error_message or "Model gagal dimuat.",
                "Cek logs container untuk detail error.",
            )
        # Flip to "loading" while still holding the lock so concurrent
        # callers do not also spawn a loader thread.
        need_spawn = state == "not_loaded"
        if need_spawn:
            readiness.mark_loading(model_name)

    if state == "ready":
        return

    if need_spawn:
        def _do_load() -> None:
            try:
                # The property access itself performs the model load.
                getattr(get_pipeline(), model_name)
                readiness.mark_ready(model_name)
                logger.info(f"Model '{model_name}' lazy-loaded dan ready.")
            except Exception as e:
                readiness.mark_error(model_name, str(e))
                # logger.exception records the traceback that the 503 hint
                # tells operators to look for in the container logs.
                logger.exception("Lazy-load '{}' failed: {}", model_name, e)

        threading.Thread(
            target=_do_load, daemon=True, name=f"lazy-load-{model_name}"
        ).start()

    if readiness.wait_for(model_name, timeout=_MODEL_WAIT_TIMEOUT):
        return

    # Read the status once so the state and the error message are consistent.
    final = readiness.get_status(model_name)
    current = final.state.value
    if current == "error":
        raise _model_failed_exc(
            model_name,
            final.error_message or f"Model '{model_name}' gagal dimuat.",
            "Cek logs container untuk traceback lengkap.",
        )
    raise HTTPException(
        status_code=503,
        detail={
            "error": "model_not_ready",
            "model": model_name,
            "current_state": current,
            "message": f"Model '{model_name}' belum siap setelah {_MODEL_WAIT_TIMEOUT}s.",
            "hint": "Cek GET /api/v1/ready. Coba request lagi dalam beberapa saat.",
        },
    )
def _ensure_models_ready(*model_names: str):
"""Pastikan semua model yang dibutuhkan endpoint sudah ready."""
for name in model_names:
_trigger_and_wait(name)
# === HEALTH & READINESS ===
@router.get("/health", tags=["meta"])
def health():
    """Liveness probe that always returns ``{"status": "ok"}``; touches no models."""
    payload = {"status": "ok"}
    return payload
@router.get("/ready", tags=["meta"])
def ready():
    """Return the readiness tracker's snapshot; the UI polls this endpoint."""
    return get_readiness().snapshot()
# === ANALYSIS ENDPOINTS ===
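# IMPORTANT: heavy endpoints are declared with `def`, not `async def`.
# FastAPI runs sync `def` endpoints in its threadpool (anyio worker threads),
# so blocking code (httpx, ONNX, Tesseract) does not freeze the event loop.
# /analyze/upload is the exception: it is `async def` so it can await the
# upload body, and it hands its blocking work to a worker thread itself.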
@router.post("/analyze/url", response_model=FullAnalysisResponse, tags=["analysis"])
def analyze_from_url(req: AnalyzeURLRequest):
    """Analyze an image from a URL (caption plus optional detection/OCR/CLIP).

    This is a sync endpoint, so FastAPI runs it in its threadpool. The needed
    models are made ready first, which may raise 503. The analysis then runs
    on a dedicated worker thread with a hard deadline of ``ROUTE_TIMEOUT``
    seconds.

    Raises:
        HTTPException: 503 from the model readiness check, 504 when the
            analysis exceeds ``ROUTE_TIMEOUT``, 500 for any other failure.
    """
    import concurrent.futures as _cf

    needed = []
    if req.run_caption:
        needed.append("captioner")
    if req.run_detection:
        needed.append("yolo")
    if req.classification_labels:
        needed.append("clip")
    if req.run_ocr:
        needed.append("ocr")
    _ensure_models_ready(*needed)

    # Route-level deadline on top of any timeout CVPipeline.analyze enforces
    # internally. NOTE(review): intended to be slightly above
    # CVPipeline.TOTAL_TIMEOUT — confirm against that constant.
    ROUTE_TIMEOUT = 78.0

    def _run_analyze():
        return get_pipeline().analyze(
            source=req.url,
            run_caption=req.run_caption,
            run_detection=req.run_detection,
            run_ocr=req.run_ocr,
            classification_labels=req.classification_labels,
        )

    # Deliberately not a `with` block: ThreadPoolExecutor.__exit__ calls
    # shutdown(wait=True), which would block until the analysis finished and
    # make the deadline useless.
    executor = _cf.ThreadPoolExecutor(max_workers=1)
    try:
        result = executor.submit(_run_analyze).result(timeout=ROUTE_TIMEOUT)
    except _cf.TimeoutError:
        raise HTTPException(
            status_code=504,
            detail=(
                f"Analyze timeout setelah {ROUTE_TIMEOUT:.0f}s. "
                "Kemungkinan server gambar lambat atau memblok HF. "
                "Coba URL gambar lain (imgur, ibb.co, raw GitHub, dll)."
            ),
        ) from None
    except HTTPException:
        raise
    except Exception as e:
        logger.exception("Analyze error: {}", e)
        raise HTTPException(status_code=500, detail=str(e)) from e
    finally:
        # Return right away. A timed-out analysis keeps running on its worker
        # thread and the thread exits when the analysis completes.
        executor.shutdown(wait=False)
    return _to_response(result)
@router.post("/analyze/upload", response_model=FullAnalysisResponse, tags=["analysis"])
async def analyze_upload(
    file: UploadFile = File(...),
    run_caption: bool = True,
    run_detection: bool = False,
    run_ocr: bool = False,
):
    """Analyze an uploaded image (multipart) with caption/detection/OCR.

    Declared ``async`` so the upload body can be awaited. Every blocking step
    (the model readiness wait and the analysis itself) runs on a dedicated
    worker thread, so the event loop stays free.

    Raises:
        HTTPException: 400 for an unsupported content type or a file larger
            than 10MB, 503 from the model readiness check, 504 when the
            analysis exceeds ``UPLOAD_TIMEOUT``, 500 for any other failure.
    """
    import concurrent.futures as _cf

    MAX_UPLOAD_BYTES = 10 * 1024 * 1024
    UPLOAD_TIMEOUT = 78.0  # same hard deadline as /analyze/url

    allowed = {"image/jpeg", "image/png", "image/webp", "image/gif"}
    if file.content_type not in allowed:
        raise HTTPException(400, detail=f"Tipe file tidak didukung: {file.content_type}")
    # Read at most one byte past the limit. That is enough to detect an
    # oversized file without copying the whole upload into memory.
    data = await file.read(MAX_UPLOAD_BYTES + 1)
    if len(data) > MAX_UPLOAD_BYTES:
        raise HTTPException(400, detail="Ukuran file maksimum 10MB")

    needed = []
    if run_caption:
        needed.append("captioner")
    if run_detection:
        needed.append("yolo")
    if run_ocr:
        needed.append("ocr")

    def _run():
        return get_pipeline().analyze(
            source=data,
            run_caption=run_caption,
            run_detection=run_detection,
            run_ocr=run_ocr,
        )

    # get_running_loop(), not the deprecated get_event_loop().
    loop = asyncio.get_running_loop()
    # Deliberately not a `with` block: ThreadPoolExecutor.__exit__ calls
    # shutdown(wait=True). After a timeout, that call would block the *event
    # loop* until the abandoned analysis finished.
    executor = _cf.ThreadPoolExecutor(max_workers=1)
    try:
        # The readiness wait blocks, so it also runs off the event loop. As in
        # /analyze/url, it is not counted against UPLOAD_TIMEOUT, so its 503
        # detail reaches the client instead of being masked by a 504.
        await loop.run_in_executor(executor, _ensure_models_ready, *needed)
        result = await asyncio.wait_for(
            loop.run_in_executor(executor, _run), timeout=UPLOAD_TIMEOUT
        )
        return _to_response(result)
    except asyncio.TimeoutError:
        raise HTTPException(
            status_code=504,
            detail=(
                f"Upload analyze timeout setelah {UPLOAD_TIMEOUT:.0f}s. "
                "Coba lagi atau kurangi ukuran gambar."
            ),
        ) from None
    except HTTPException:
        raise
    except Exception as e:
        logger.exception("Upload analyze error: {}", e)
        raise HTTPException(status_code=500, detail=str(e)) from e
    finally:
        executor.shutdown(wait=False)
# === INDIVIDUAL TASKS ===
@router.post("/caption", response_model=CaptionResponse, tags=["tasks"])
def caption(url: str, prompt: str = None):
    """Generate a text description of the image at ``url``.

    Args:
        url: Image source passed to ``ImagePreprocessor.load``.
        prompt: Optional prompt forwarded to the captioner.

    Raises:
        HTTPException: 503 if the captioner fails to load or is not ready,
            500 on any other failure.
    """
    _ensure_models_ready("captioner")
    try:
        from ..processors.image_preprocessor import ImagePreprocessor

        image = ImagePreprocessor.load(url)
        result = get_pipeline().captioner.caption(image, prompt=prompt)
        return CaptionResponse(caption=result.caption, model=result.model)
    except HTTPException:
        raise
    except Exception as e:
        # logger.exception keeps the traceback that logger.error dropped.
        logger.exception("Caption error: {}", e)
        raise HTTPException(500, detail=f"Caption gagal: {e}") from e
@router.post("/detect", response_model=DetectionResponse, tags=["tasks"])
def detect(url: str, conf: float = None):
    """Detect objects in the image at ``url`` with YOLOv8.

    Args:
        url: Image source passed to ``ImagePreprocessor.load``.
        conf: Optional confidence threshold forwarded to the detector.

    Raises:
        HTTPException: 503 if YOLO fails to load or is not ready, 500 on any
            other failure.
    """
    _ensure_models_ready("yolo")
    try:
        from ..processors.image_preprocessor import ImagePreprocessor

        image = ImagePreprocessor.load(url)
        result = get_pipeline().yolo.detect(image, conf_threshold=conf)
        return DetectionResponse(
            detections=[_det_to_schema(d) for d in result.detections],
            count=result.count,
            labels_summary=result.labels_summary,
            image_width=result.image_width,
            image_height=result.image_height,
            inference_time_ms=result.inference_time_ms,
        )
    except HTTPException:
        raise
    except Exception as e:
        # logger.exception keeps the traceback that logger.error dropped.
        logger.exception("Detect error: {}", e)
        raise HTTPException(500, detail=f"Detection gagal: {e}") from e
@router.post("/classify", response_model=ClassificationResponse, tags=["tasks"])
def classify(req: ClassifyRequest):
    """Zero-shot classify the image at ``req.url`` against ``req.labels`` with CLIP.

    Raises:
        HTTPException: 503 if CLIP fails to load or is not ready, 500 on any
            other failure.
    """
    _ensure_models_ready("clip")
    try:
        from ..processors.image_preprocessor import ImagePreprocessor

        image = ImagePreprocessor.load(req.url)
        result = get_pipeline().clip.classify(image, req.labels)
        return ClassificationResponse(
            top_label=result.top_label,
            top_score=result.top_score,
            labels=result.labels,
            probabilities=result.probabilities,
        )
    except HTTPException:
        raise
    except Exception as e:
        # logger.exception keeps the traceback that logger.error dropped.
        logger.exception("Classify error: {}", e)
        raise HTTPException(500, detail=f"Classify gagal: {e}") from e
@router.post("/ocr", response_model=OCRResponse, tags=["tasks"])
def ocr(url: str):
    """Extract text from the image at ``url`` with the pipeline's OCR engine.

    Raises:
        HTTPException: 503 if OCR fails to load or is not ready, 500 on any
            other failure.
    """
    _ensure_models_ready("ocr")
    try:
        from ..processors.image_preprocessor import ImagePreprocessor

        image = ImagePreprocessor.load(url)
        result = get_pipeline().ocr.extract_text(image)
        return OCRResponse(
            full_text=result.full_text,
            boxes=[{"text": b.text, "confidence": b.confidence, "bbox": b.bbox}
                   for b in (result.boxes or [])],
            word_count=result.word_count,
            language=result.language,
            engine=result.engine,
        )
    except HTTPException:
        raise
    except Exception as e:
        # logger.exception keeps the traceback that logger.error dropped.
        logger.exception("OCR error: {}", e)
        raise HTTPException(500, detail=f"OCR gagal: {e}") from e
@router.post("/vqa", response_model=VisualQAResponse, tags=["tasks"])
def visual_qa(req: VisualQARequest):
    """Answer ``req.question`` about the image at ``req.url`` using the captioner.

    Raises:
        HTTPException: 503 if the captioner fails to load or is not ready,
            500 on any other failure.
    """
    _ensure_models_ready("captioner")
    try:
        from ..processors.image_preprocessor import ImagePreprocessor

        image = ImagePreprocessor.load(req.url)
        result = get_pipeline().captioner.visual_qa(image, req.question)
        return VisualQAResponse(question=req.question, answer=result.caption)
    except HTTPException:
        raise
    except Exception as e:
        # logger.exception keeps the traceback that logger.error dropped.
        logger.exception("VQA error: {}", e)
        raise HTTPException(500, detail=f"VQA gagal: {e}") from e
@router.post("/similarity", response_model=SimilarityResponse, tags=["tasks"])
def similarity(req: SimilarityRequest):
    """Score how well ``req.text`` matches the image at ``req.url`` using CLIP.

    The score from ``CVPipeline.image_text_similarity`` is bucketed into a
    human-readable interpretation (strictly above 0.3, 0.2, or 0.1, otherwise
    "Tidak relevan") and rounded to 4 decimals in the response.

    Raises:
        HTTPException: 503 if CLIP fails to load or is not ready, 500 on any
            other failure.
    """
    _ensure_models_ready("clip")
    try:
        score = get_pipeline().image_text_similarity(req.url, req.text)
        # Thresholds in descending order; the first band the score exceeds wins.
        bands = (
            (0.3, "Sangat relevan"),
            (0.2, "Cukup relevan"),
            (0.1, "Sedikit relevan"),
        )
        interpretation = next(
            (label for threshold, label in bands if score > threshold),
            "Tidak relevan",
        )
        return SimilarityResponse(
            similarity_score=round(score, 4),
            text=req.text,
            interpretation=interpretation,
        )
    except HTTPException:
        raise
    except Exception as e:
        # logger.exception keeps the traceback that logger.error dropped.
        logger.exception("Similarity error: {}", e)
        raise HTTPException(500, detail=f"Similarity gagal: {e}") from e
# === HELPERS ===
def _det_to_schema(d):
    """Convert one pipeline detection into the API ``DetectionSchema``.

    Copies ``label``, ``confidence`` and ``class_id`` unchanged, and rebuilds
    the bounding box as a ``BBoxSchema`` from the detection's ``bbox`` fields.
    """
    from .schemas import BBoxSchema, DetectionSchema

    box = d.bbox
    schema_box = BBoxSchema(
        x1=box.x1,
        y1=box.y1,
        x2=box.x2,
        y2=box.y2,
        width=box.width,
        height=box.height,
    )
    return DetectionSchema(
        label=d.label,
        confidence=d.confidence,
        bbox=schema_box,
        class_id=d.class_id,
    )
def _to_response(result) -> FullAnalysisResponse:
    """Map a ``CVPipeline.analyze`` result onto ``FullAnalysisResponse``.

    Each optional section (caption, detections, classification, OCR) is
    converted only when the matching attribute on ``result`` is truthy;
    otherwise that response field is ``None``. Missing classification
    labels/probabilities and OCR boxes become empty lists.
    """
    from .schemas import (
        CaptionResponse,
        ClassificationResponse,
        DetectionResponse,
        FullAnalysisResponse,
        OCRBoxSchema,
        OCRResponse,
    )

    caption_resp = None
    if result.caption:
        caption_resp = CaptionResponse(
            caption=result.caption.caption, model=result.caption.model
        )

    detection_resp = None
    if result.detections:
        dets = result.detections
        detection_resp = DetectionResponse(
            detections=[_det_to_schema(d) for d in dets.detections],
            count=dets.count,
            labels_summary=dets.labels_summary,
            image_width=dets.image_width,
            image_height=dets.image_height,
            inference_time_ms=dets.inference_time_ms,
        )

    classification_resp = None
    if result.classification:
        clf = result.classification
        classification_resp = ClassificationResponse(
            top_label=clf.top_label,
            top_score=clf.top_score,
            labels=clf.labels or [],
            probabilities=clf.probabilities or [],
        )

    # Named ocr_resp so it does not shadow the module-level `ocr` endpoint.
    ocr_resp = None
    if result.ocr:
        ocr_result = result.ocr
        ocr_resp = OCRResponse(
            full_text=ocr_result.full_text,
            boxes=[
                OCRBoxSchema(text=b.text, confidence=b.confidence, bbox=b.bbox)
                for b in (ocr_result.boxes or [])
            ],
            word_count=ocr_result.word_count,
            language=ocr_result.language,
            engine=ocr_result.engine,
        )

    return FullAnalysisResponse(
        image_width=result.image_width,
        image_height=result.image_height,
        source=result.source,
        caption=caption_resp,
        detections=detection_resp,
        classification=classification_resp,
        ocr=ocr_resp,
        summary_text=result.to_summary(),
        models_used=result.models_used,
        total_latency_ms=result.total_latency_ms,
    )