| """ |
| GPU / provider detection and vision-engine mode selection for CEPHEUS. |
| |
| CEPHEUS_CLOUD=1 without CEPHEUS_GPU_VISION uses the lightweight stub. |
| CEPHEUS_GPU_VISION=1 on cloud uses the full vision stack (for Cloud Run + L4). |
| CEPHEUS_FORCE_FULL_VISION=1 on cloud forces the full vision stack even on CPU. |
| Local dev (CEPHEUS_CLOUD unset/0) always uses the full vision stack. |
| """ |
| from __future__ import annotations |
|
|
| import logging |
| import os |
| import threading |
| from typing import Any |
|
|
# Module-scoped logger; handlers and levels are left to the application.
logger = logging.getLogger(__name__)

# Serializes every read and write of ``_warmload_state`` below.
_warmload_lock = threading.Lock()

# Progress flags for the one-shot model warmload:
#   started  - a caller has claimed the warmload
#   complete - the warmload finished
#   error    - failure description, or None
#   result   - optional payload recorded on completion, or None
_warmload_state: dict[str, Any] = {
    "started": False,
    "complete": False,
    "error": None,
    "result": None,
}
|
|
|
|
def _env_truthy(name: str) -> bool:
    """Report whether environment variable *name* holds an affirmative value.

    The value is stripped of surrounding whitespace and lower-cased before
    being compared with "1", "true" and "yes". An unset variable counts as
    not truthy.
    """
    raw_value = os.environ.get(name, "")
    normalized = raw_value.strip().lower()
    return normalized in {"1", "true", "yes"}
|
|
|
|
def use_full_vision_engine() -> bool:
    """Decide whether the full vision stack (YOLO + InsightFace) should load.

    Outside cloud mode (CEPHEUS_CLOUD not truthy) the answer is always True.
    In cloud mode it is True only when CEPHEUS_GPU_VISION or
    CEPHEUS_FORCE_FULL_VISION is truthy.
    """
    running_on_cloud = _env_truthy("CEPHEUS_CLOUD")
    if running_on_cloud:
        # Either opt-in flag is enough; evaluation stops at the first hit.
        opt_in_flags = ("CEPHEUS_GPU_VISION", "CEPHEUS_FORCE_FULL_VISION")
        return any(_env_truthy(flag) for flag in opt_in_flags)
    return True
|
|
|
|
def detect_acceleration() -> dict[str, Any]:
    """Probe PyTorch CUDA and ONNX Runtime providers without crashing.

    Both libraries are imported lazily and every probe failure is logged at
    debug level instead of being raised.

    Returns:
        A dict with these keys:
            cuda_available: True when CUDA was detected through torch, or
                through ONNX Runtime when the torch probe failed.
            torch_cuda: result of ``torch.cuda.is_available()``; False when
                torch could not be probed.
            onnx_providers: ONNX Runtime's available execution providers, or
                an empty list when ONNX Runtime could not be probed.
            provider: "cuda" or "cpu".
            device_name: the torch device name ("cuda:0" if the name lookup
                fails), "onnxruntime-cuda" for ONNX-only detection, or None.
            fallback_reason: None when CUDA was detected, otherwise a short
                human-readable explanation.

    Note:
        CEPHEUS_FORCE_CPU does not alter the detection itself; it only
        changes the reported ``fallback_reason`` when no CUDA was found.
    """
    info: dict[str, Any] = {
        "cuda_available": False,
        "torch_cuda": False,
        "onnx_providers": [],
        "provider": "cpu",
        "device_name": None,
        "fallback_reason": None,
    }

    try:
        import onnxruntime as ort

        info["onnx_providers"] = list(ort.get_available_providers())
    except Exception as exc:
        logger.debug("ONNX Runtime probe skipped: %s", exc)

    try:
        import torch

        info["torch_cuda"] = bool(torch.cuda.is_available())
    except Exception as exc:
        # A missing torch raises ImportError, but a broken install (missing
        # shared libraries, driver mismatch) can raise OSError/RuntimeError.
        # Either way torch is unusable, so rely on ONNX Runtime's provider
        # list rather than letting the probe crash.
        logger.debug("PyTorch probe skipped: %s", exc)
        if "CUDAExecutionProvider" in info["onnx_providers"]:
            info["cuda_available"] = True
            info["provider"] = "cuda"
            info["device_name"] = "onnxruntime-cuda"
    else:
        if info["torch_cuda"]:
            info["cuda_available"] = True
            info["provider"] = "cuda"
            try:
                info["device_name"] = torch.cuda.get_device_name(0)
            except Exception:
                # The name is informational only; fall back to a generic label.
                info["device_name"] = "cuda:0"

    if not info["cuda_available"]:
        # torch_cuda is necessarily False here, so only the ONNX provider
        # list distinguishes the remaining cases.
        if _env_truthy("CEPHEUS_FORCE_CPU"):
            info["fallback_reason"] = "CEPHEUS_FORCE_CPU=1"
        elif not info["onnx_providers"]:
            info["fallback_reason"] = "ONNX Runtime unavailable"
        elif "CUDAExecutionProvider" not in info["onnx_providers"]:
            info["fallback_reason"] = (
                "CUDA not available (install torch+cuda or onnxruntime-gpu for GPU inference)"
            )
        else:
            info["fallback_reason"] = "CUDA not available"

    return info
|
|
|
|
def insightface_ctx_id() -> int:
    """Pick the InsightFace ``ctx_id``: 0 selects the GPU, -1 selects the CPU.

    CEPHEUS_FORCE_CPU wins outright and skips the acceleration probe;
    otherwise the GPU is chosen only when ``detect_acceleration`` reports
    CUDA as available.
    """
    gpu_ctx, cpu_ctx = 0, -1
    if _env_truthy("CEPHEUS_FORCE_CPU"):
        return cpu_ctx
    if detect_acceleration()["cuda_available"]:
        return gpu_ctx
    return cpu_ctx
|
|
|
|
def get_warmload_state() -> dict[str, Any]:
    """Return a shallow copy of the warmload state, taken under the lock."""
    with _warmload_lock:
        snapshot = _warmload_state.copy()
    return snapshot
|
|
|
|
def mark_warmload_started() -> bool:
    """Flag the warmload as started.

    Returns True for the caller that flips the flag and False for any caller
    that finds it already set.
    """
    with _warmload_lock:
        first_claim = not _warmload_state["started"]
        if first_claim:
            _warmload_state["started"] = True
    return first_claim
|
|
|
|
def mark_warmload_complete(result: dict[str, Any] | None = None) -> None:
    """Record that the warmload finished, storing the optional *result*."""
    with _warmload_lock:
        _warmload_state.update({"complete": True, "result": result})
|
|
|
|
def mark_warmload_failed(error: str) -> None:
    """Store *error* as the warmload failure description.

    Only the "error" entry is written; the other state flags are untouched.
    """
    with _warmload_lock:
        _warmload_state.update({"error": error})
|
|
|
|
# Readiness callback installed via ``register_face_ready_probe``; None until
# one has been registered.
_face_ready_probe: Any = None


def register_face_ready_probe(probe: Any) -> None:
    """Install *probe* as the module-wide face-readiness callback.

    Meant to be called from main once VisionEngine is initialised, so this
    module never has to import it (avoids an import cycle). Each call
    replaces the previously registered probe.
    """
    global _face_ready_probe
    _face_ready_probe = probe
|
|
|
|
def live_status_payload() -> dict[str, str]:
    """Build the ``status``/``message`` payload describing instance readiness.

    Resolution order:
        1. Warmload marked complete -> "ready" / "Instance ready".
        2. A registered face-ready probe returns truthy -> "ready" /
           "Face engine loaded".
        3. Warmload started -> "warming" / "Model warmload in progress".
        4. Otherwise -> "warming" / "Instance initialization triggered".

    The probe is best-effort: an exception raised by it is logged at debug
    level and treated as "not ready" rather than propagated.
    """
    state = get_warmload_state()
    if state.get("complete"):
        return {"status": "ready", "message": "Instance ready"}

    # Snapshot the global once so a concurrent re-registration cannot swap it
    # out between the check and the call.
    probe = _face_ready_probe
    if callable(probe):
        try:
            if probe():
                return {"status": "ready", "message": "Face engine loaded"}
        except Exception:
            # Debug level keeps a frequently polled status check quiet while
            # still leaving a trace of why the probe was ignored.
            logger.debug("Face readiness probe failed", exc_info=True)

    # NOTE(review): state["error"] is never consulted here, so a warmload
    # that recorded an error keeps reporting "warming" - confirm that is
    # intended.
    if state.get("started"):
        return {"status": "warming", "message": "Model warmload in progress"}
    return {"status": "warming", "message": "Instance initialization triggered"}
|
|