"""
GPU / provider detection and vision-engine mode selection for CEPHEUS.

CEPHEUS_CLOUD=1 without CEPHEUS_GPU_VISION uses the lightweight stub.
CEPHEUS_GPU_VISION=1 on cloud uses the full vision stack (for Cloud Run + L4).
CEPHEUS_FORCE_FULL_VISION=1 on cloud forces the full vision stack even on CPU.
Local dev (CEPHEUS_CLOUD unset/0) always uses the full vision stack.
"""

from __future__ import annotations

import logging
import os
import threading
from typing import Any

logger = logging.getLogger(__name__)

# Process-wide warmload bookkeeping; every read and write goes through
# _warmload_lock (see the mark_* / get_warmload_state helpers below).
_warmload_lock = threading.Lock()
_warmload_state: dict[str, Any] = {
    "started": False,
    "complete": False,
    "error": None,
    "result": None,
}


def _env_truthy(name: str) -> bool:
    """Return True when env var *name* is "1", "true" or "yes".

    The comparison ignores surrounding whitespace and letter case; an unset
    variable counts as false.
    """
    return os.getenv(name, "").strip().lower() in ("1", "true", "yes")


def use_full_vision_engine() -> bool:
    """Return True when YOLO + InsightFace should load (local or GPU cloud)."""
    if not _env_truthy("CEPHEUS_CLOUD"):
        # Local dev always runs the full stack.
        return True
    return _env_truthy("CEPHEUS_GPU_VISION") or _env_truthy("CEPHEUS_FORCE_FULL_VISION")


def detect_acceleration() -> dict[str, Any]:
    """Probe PyTorch CUDA and ONNX Runtime providers without crashing.

    Returns:
        A dict with the keys ``cuda_available``, ``torch_cuda``,
        ``onnx_providers``, ``provider`` ("cuda" or "cpu"), ``device_name``
        and ``fallback_reason`` (set only when CUDA is not available).
    """
    info: dict[str, Any] = {
        "cuda_available": False,
        "torch_cuda": False,
        "onnx_providers": [],
        "provider": "cpu",
        "device_name": None,
        "fallback_reason": None,
    }

    try:
        import onnxruntime as ort

        info["onnx_providers"] = list(ort.get_available_providers())
    except Exception as exc:
        logger.debug("ONNX Runtime probe skipped: %s", exc)

    # A broken torch install can fail with more than ImportError (e.g. OSError
    # while loading native libraries, RuntimeError from the CUDA query), so any
    # failure here is treated the same as "torch is not usable".
    torch_probed = False
    try:
        import torch

        info["torch_cuda"] = bool(torch.cuda.is_available())
        torch_probed = True
    except Exception as exc:
        logger.debug("PyTorch CUDA probe skipped: %s", exc)

    if info["torch_cuda"]:
        info["cuda_available"] = True
        info["provider"] = "cuda"
        try:
            info["device_name"] = torch.cuda.get_device_name(0)
        except Exception:
            info["device_name"] = "cuda:0"
    elif not torch_probed and "CUDAExecutionProvider" in info["onnx_providers"]:
        # ONNX Runtime's provider list is consulted only when torch could not
        # be probed; a working torch that reports no CUDA wins over it.
        info["cuda_available"] = True
        info["provider"] = "cuda"
        info["device_name"] = "onnxruntime-cuda"

    if not info["cuda_available"]:
        if _env_truthy("CEPHEUS_FORCE_CPU"):
            info["fallback_reason"] = "CEPHEUS_FORCE_CPU=1"
        elif not info["onnx_providers"]:
            info["fallback_reason"] = "ONNX Runtime unavailable"
        elif "CUDAExecutionProvider" not in info["onnx_providers"] and not info["torch_cuda"]:
            info["fallback_reason"] = (
                "CUDA not available (install torch+cuda or onnxruntime-gpu for GPU inference)"
            )
        else:
            info["fallback_reason"] = "CUDA not available"

    return info


def insightface_ctx_id() -> int:
    """InsightFace ctx_id: 0 = GPU, -1 = CPU."""
    if _env_truthy("CEPHEUS_FORCE_CPU"):
        return -1
    accel = detect_acceleration()
    return 0 if accel["cuda_available"] else -1


def get_warmload_state() -> dict[str, Any]:
    """Return a shallow copy of the warmload state taken under the lock."""
    with _warmload_lock:
        return dict(_warmload_state)


def mark_warmload_started() -> bool:
    """Return False if warmload was already started."""
    with _warmload_lock:
        if _warmload_state["started"]:
            return False
        _warmload_state["started"] = True
        return True


def mark_warmload_complete(result: dict[str, Any] | None = None) -> None:
    """Flag the warmload as complete and store its optional *result*."""
    with _warmload_lock:
        _warmload_state["complete"] = True
        _warmload_state["result"] = result


def mark_warmload_failed(error: str) -> None:
    """Record a warmload failure message in the shared state.

    NOTE(review): only ``error`` is written; ``complete`` stays False and
    ``live_status_payload`` does not read ``error``, so a failed warmload keeps
    reporting "warming" -- confirm that this is intended.
    """
    with _warmload_lock:
        _warmload_state["error"] = error


# Optional zero-argument callable; a truthy return means the face engine is up.
_face_ready_probe: Any = None


def register_face_ready_probe(probe: Any) -> None:
    """Optional callback from main after VisionEngine init — avoids import cycles."""
    global _face_ready_probe
    _face_ready_probe = probe


def live_status_payload() -> dict[str, str]:
    """Build the ``status`` / ``message`` payload describing instance readiness.

    Reports "ready" once the warmload is complete or the registered face probe
    returns a truthy value; otherwise reports "warming" with a message that
    depends on whether the warmload has been started.
    """
    state = get_warmload_state()
    if state.get("complete"):
        return {"status": "ready", "message": "Instance ready"}
    if _face_ready_probe:
        try:
            if _face_ready_probe():
                return {"status": "ready", "message": "Face engine loaded"}
        except Exception:
            # Best effort: a failing probe must not break the status call, but
            # leave a trace instead of swallowing the error silently.
            logger.debug("Face-ready probe raised; treating as not ready", exc_info=True)
    if state.get("started"):
        return {"status": "warming", "message": "Model warmload in progress"}
    return {"status": "warming", "message": "Instance initialization triggered"}