"""
Model readiness tracker for the CV Pipeline.

Tracks the state of each model (NOT_LOADED / LOADING / READY / ERROR) so that:
  1. The frontend can poll the /ready endpoint and know when it may send requests
  2. Inference endpoints can wait (or reject quickly) if a model is not ready yet
  3. Internal clients (RAG OCR fallback) know whether the CV API is safe to call

Thread-safe because it is used from a background thread (prewarmer) and from request handlers.
"""

from __future__ import annotations

import threading
import time
from enum import Enum
from dataclasses import dataclass, field
from typing import Dict, Optional


class ModelState(str, Enum):
    NOT_LOADED = "not_loaded"
    LOADING = "loading"
    READY = "ready"
    ERROR = "error"


@dataclass
class ModelStatus:
    state: ModelState = ModelState.NOT_LOADED
    error_message: str = ""
    started_at: float = 0.0
    ready_at: float = 0.0

    @property
    def load_seconds(self) -> Optional[float]:
        if self.ready_at and self.started_at:
            return round(self.ready_at - self.started_at, 2)
        return None

    def to_dict(self) -> dict:
        d = {"state": self.state.value}
        if self.error_message:
            d["error"] = self.error_message
        if self.load_seconds is not None:
            d["load_seconds"] = self.load_seconds
        return d


class ReadinessTracker:
    """Singleton state tracker, usable from anywhere in the CV API."""

    _instance: Optional["ReadinessTracker"] = None
    _lock = threading.Lock()

    def __new__(cls):
        with cls._lock:
            if cls._instance is None:
                cls._instance = super().__new__(cls)
                cls._instance._init()
            return cls._instance

    def _init(self):
        self._models: Dict[str, ModelStatus] = {
            "captioner": ModelStatus(),
            "yolo": ModelStatus(),
            "clip": ModelStatus(),
            "ocr": ModelStatus(),
        }
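        self._ready_event = threading.Event()
        # Reentrant lock: snapshot() calls overall_state() while already holding it,
        # which would deadlock with a plain threading.Lock.
        self._state_lock = threading.RLock()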

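    def mark_loading(self, model_name: str):
        with self._state_lock:
            status = self._models.setdefault(model_name, ModelStatus())
            status.state = ModelState.LOADING
            status.started_at = time.time()
            status.ready_at = 0.0  # reset so a reload never reports a stale load time
            status.error_message = ""
            if model_name in self._REQUIRED_MODELS:
                # A required model left READY, so waiters must block again.
                self._ready_event.clear()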

    def mark_ready(self, model_name: str):
        with self._state_lock:
            status = self._models.setdefault(model_name, ModelStatus())
            status.state = ModelState.READY
            status.ready_at = time.time()
            if self._all_ready_unlocked():
                self._ready_event.set()

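    def mark_error(self, model_name: str, message: str):
        with self._state_lock:
            status = self._models.setdefault(model_name, ModelStatus())
            status.state = ModelState.ERROR
            status.error_message = str(message)[:500]  # cap the stored message length
            if model_name in self._REQUIRED_MODELS:
                # A required model is no longer READY, so the event must not stay set.
                self._ready_event.clear()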

    def get_status(self, model_name: str) -> ModelStatus:
        with self._state_lock:
            return self._models.get(model_name, ModelStatus())

    def is_ready(self, model_name: str) -> bool:
        with self._state_lock:
            return self._models.get(model_name, ModelStatus()).state == ModelState.READY

    def all_ready(self) -> bool:
        with self._state_lock:
            return self._all_ready_unlocked()

    def _all_ready_unlocked(self) -> bool:
        # Caller must already hold _state_lock.
        # Only required models are checked; CLIP is excluded (lazy, on demand).
        required = {k: v for k, v in self._models.items() if k in self._REQUIRED_MODELS}
        return all(s.state == ModelState.READY for s in required.values())

    # Models that must be READY for the CV API to count as ready.
    # CLIP is excluded: it is purely lazy (loaded only when a classify request arrives).
    _REQUIRED_MODELS = {"captioner", "yolo", "ocr"}

    def overall_state(self) -> str:
        with self._state_lock:
            required = {k: v for k, v in self._models.items() if k in self._REQUIRED_MODELS}
            req_states = [s.state for s in required.values()]
            # CV is ready when every required model is READY
            if all(s == ModelState.READY for s in req_states):
                return "ready"
            if any(s == ModelState.ERROR for s in req_states):
                if any(s == ModelState.LOADING for s in req_states):
                    return "loading"
                return "degraded"
            if any(s == ModelState.LOADING for s in req_states):
                return "loading"
            return "not_started"

    def wait_until_ready(self, timeout: float = 120.0) -> bool:
        """Block until all required models are READY or the timeout expires. Returns True if ready."""
        return self._ready_event.wait(timeout=timeout)

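    def wait_for(self, model_name: str, timeout: float = 120.0) -> bool:
        """Block until a specific model is READY. Polling-based so it works per model.

        Returns False on timeout, or as soon as the model enters ERROR.
        """
        # Monotonic clock: the deadline is unaffected by wall-clock adjustments.
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline: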
            status = self.get_status(model_name)
            if status.state == ModelState.READY:
                return True
            if status.state == ModelState.ERROR:
                return False
            time.sleep(0.2)
        return False

    def snapshot(self) -> dict:
        with self._state_lock:
            return {
                "overall": self.overall_state(),
                "models": {name: status.to_dict() for name, status in self._models.items()},
                "all_ready": self._all_ready_unlocked(),
            }


def get_readiness() -> ReadinessTracker:
    """Access the singleton readiness tracker."""
    return ReadinessTracker()