"""Common dependency utilities for Brain API.""" import logging import logging.config import os import re from typing import Any import yaml from dotenv import load_dotenv from shared.remote_env import load_remote_env_if_configured CONFIG_CACHE: dict[str, Any] | None = None _LOGGING_READY = False PLACEHOLDER_RE = re.compile(r"^\$\{[A-Z0-9_]+\}$") def _load_env_stack() -> None: candidates = [ ".env", "kapo.env", ".env.runtime", ] for candidate in candidates: try: load_dotenv(candidate, override=True) except Exception: continue def _normalize_config_paths(cfg: dict[str, Any]) -> dict[str, Any]: if os.name != "nt": return cfg root = os.getcwd() normalized = dict(cfg) path_keys = { "DB_PATH", "TOOLS_DB_PATH", "FAISS_INDEX_PATH", "BRAIN_LOG_PATH", "EXEC_LOG_PATH", "LOCAL_DATA_DIR", } for key in path_keys: value = normalized.get(key) if not isinstance(value, str) or not value: continue if value.startswith("/data"): normalized[key] = os.path.join(root, "data", value[len("/data"):].lstrip("/\\")) elif value.startswith("/models"): normalized[key] = os.path.join(root, "models", value[len("/models"):].lstrip("/\\")) return normalized def _strip_unresolved_placeholders(value): if isinstance(value, dict): return {key: _strip_unresolved_placeholders(item) for key, item in value.items()} if isinstance(value, list): return [_strip_unresolved_placeholders(item) for item in value] if isinstance(value, str) and PLACEHOLDER_RE.match(value.strip()): return "" return value def load_config() -> dict: global CONFIG_CACHE if CONFIG_CACHE is not None: return CONFIG_CACHE _load_env_stack() load_remote_env_if_configured(override=True, logger_name="kapo.brain.remote_env") config_path = os.path.join(os.path.dirname(__file__), "..", "config", "config.yaml") with open(config_path, "r", encoding="utf-8") as handle: raw = handle.read() for key, value in os.environ.items(): raw = raw.replace(f"${{{key}}}", value) parsed = yaml.safe_load(raw) or {} CONFIG_CACHE = _normalize_config_paths(_strip_unresolved_placeholders(parsed)) return CONFIG_CACHE def is_remote_brain_only() -> bool: cfg = load_config() value = cfg.get("REMOTE_BRAIN_ONLY", os.getenv("REMOTE_BRAIN_ONLY", "0")) return str(value).strip().lower() in {"1", "true", "yes", "on"} def setup_logging() -> None: global _LOGGING_READY if _LOGGING_READY: return log_cfg_path = os.path.join(os.path.dirname(__file__), "..", "config", "logging.yaml") if not os.path.exists(log_cfg_path): logging.basicConfig(level=logging.INFO) _LOGGING_READY = True return try: with open(log_cfg_path, "r", encoding="utf-8") as handle: cfg = yaml.safe_load(handle) or {} logging.config.dictConfig(cfg) except Exception: logging.basicConfig(level=logging.INFO) logging.getLogger("kapo").warning("Falling back to basic logging configuration") _LOGGING_READY = True def get_logger(name: str) -> logging.Logger: setup_logging() return logging.getLogger(name) def _normalize_base_url(candidate: Any) -> str: text = "" if candidate is None else str(candidate).strip() if not text: return "" if "://" not in text: text = f"http://{text}" return text.rstrip("/") def get_executor_url(cfg: dict) -> str: env_url = _normalize_base_url(os.getenv("EXECUTOR_URL")) if env_url: return env_url cfg_url = _normalize_base_url(cfg.get("EXECUTOR_URL")) if cfg_url: return cfg_url scheme = str(cfg.get("EXECUTOR_SCHEME") or os.getenv("EXECUTOR_SCHEME", "http")).strip() or "http" host = str(cfg.get("EXECUTOR_HOST") or os.getenv("EXECUTOR_HOST", "localhost")).strip() port = str(cfg.get("EXECUTOR_PORT") or os.getenv("EXECUTOR_PORT", "9000")).strip() if "://" in host: return host.rstrip("/") if ":" in host: return f"{scheme}://{host}".rstrip("/") return f"{scheme}://{host}:{port}".rstrip("/") def get_executor_headers(cfg: dict) -> dict: header = cfg.get("EXECUTOR_BYPASS_HEADER") or os.getenv("EXECUTOR_BYPASS_HEADER") value = cfg.get("EXECUTOR_BYPASS_VALUE") or os.getenv("EXECUTOR_BYPASS_VALUE") if header and value: return {str(header): str(value)} return {} def get_brain_headers(cfg: dict) -> dict: header = cfg.get("BRAIN_BYPASS_HEADER") or os.getenv("BRAIN_BYPASS_HEADER") value = cfg.get("BRAIN_BYPASS_VALUE") or os.getenv("BRAIN_BYPASS_VALUE") if header and value: return {str(header): str(value)} return {}