| """Common dependency utilities for Brain API.""" |
| import logging |
| import logging.config |
| import os |
| import re |
| from typing import Any |
|
|
| import yaml |
| from dotenv import load_dotenv |
| from shared.remote_env import load_remote_env_if_configured |
|
|
# Parsed configuration memoized by load_config(); None until the first successful load.
CONFIG_CACHE: dict[str, Any] | None = None
# Flipped to True by setup_logging() so that later calls return immediately.
_LOGGING_READY = False
# Matches a string that consists solely of one ``${NAME}`` placeholder (uppercase
# letters, digits, underscores). Such values are left over when the environment
# did not define NAME, and _strip_unresolved_placeholders() blanks them.
PLACEHOLDER_RE = re.compile(r"^\$\{[A-Z0-9_]+\}$")
|
|
|
|
def _load_env_stack() -> None:
    """Load the layered dotenv files into ``os.environ``.

    The files ``.env``, ``kapo.env`` and ``.env.runtime`` are loaded in that
    order with ``override=True``. A later file therefore wins over an earlier
    one and over variables that are already set. Any exception raised while
    loading a single file is swallowed so that one bad file cannot block
    startup.
    """
    for env_file in (".env", "kapo.env", ".env.runtime"):
        try:
            load_dotenv(env_file, override=True)
        except Exception:
            # Best effort by design: skip this file and move on to the next layer.
            pass
|
|
|
|
def _normalize_config_paths(cfg: dict[str, Any]) -> dict[str, Any]:
    """Map container-style ``/data`` and ``/models`` paths to local folders on Windows.

    The configuration refers to POSIX container roots such as
    ``/data/brain.db``. On Windows (``os.name == "nt"``), each known path key
    whose value lives under one of those roots is rewritten to
    ``<cwd>/data/...`` or ``<cwd>/models/...`` using ``os.path.join``. Only
    whole path segments count: ``/data`` and ``/data/x`` are rewritten, while
    ``/database/x`` is left alone.

    Args:
        cfg: Parsed configuration mapping.

    Returns:
        On non-Windows hosts, *cfg* itself (same object, untouched). Otherwise,
        a shallow copy with the matching path keys rewritten.
    """
    if os.name != "nt":
        return cfg

    root = os.getcwd()
    normalized = dict(cfg)
    path_keys = (
        "DB_PATH",
        "TOOLS_DB_PATH",
        "FAISS_INDEX_PATH",
        "BRAIN_LOG_PATH",
        "EXEC_LOG_PATH",
        "LOCAL_DATA_DIR",
    )
    # (container prefix, local directory name under the working directory)
    container_roots = (("/data", "data"), ("/models", "models"))

    for key in path_keys:
        value = normalized.get(key)
        if not isinstance(value, str) or not value:
            continue
        for prefix, local_dir in container_roots:
            if not value.startswith(prefix):
                continue
            tail = value[len(prefix):]
            # Require a segment boundary so "/database" is not read as "/data" + "base".
            if tail[:1] not in ("", "/", "\\"):
                continue
            tail = tail.lstrip("/\\")
            # A bare root maps to the directory itself, without a trailing separator.
            normalized[key] = (
                os.path.join(root, local_dir, tail) if tail else os.path.join(root, local_dir)
            )
            break
    return normalized
|
|
|
|
def _strip_unresolved_placeholders(value):
    """Return a copy of *value* with leftover ``${NAME}`` placeholders blanked.

    Dicts and lists are rebuilt recursively; dict keys are kept as-is. A string
    that, once surrounding whitespace is ignored, matches ``PLACEHOLDER_RE`` is
    replaced by ``""``. Every other value is returned unchanged.
    """
    if isinstance(value, dict):
        cleaned = {}
        for key, item in value.items():
            cleaned[key] = _strip_unresolved_placeholders(item)
        return cleaned
    if isinstance(value, list):
        return list(map(_strip_unresolved_placeholders, value))
    is_placeholder = isinstance(value, str) and PLACEHOLDER_RE.match(value.strip())
    return "" if is_placeholder else value
|
|
|
|
# Matches any ``${NAME}`` reference; NAME is everything between the braces.
_ENV_REFERENCE_RE = re.compile(r"\$\{([^{}]+)\}")


def _expand_env_placeholders(text: str) -> str:
    """Replace each ``${NAME}`` in *text* with ``os.environ[NAME]`` in one pass.

    References to unset variables are left untouched. Substituted values are
    inserted verbatim and are not re-scanned, so the result no longer depends on
    the iteration order of ``os.environ``.
    """
    # A callable replacement inserts the value literally: no backslash-escape
    # processing, and the inserted text is never expanded a second time.
    return _ENV_REFERENCE_RE.sub(
        lambda match: os.environ.get(match.group(1), match.group(0)),
        text,
    )


def load_config() -> dict:
    """Load, expand and cache ``config/config.yaml``.

    On the first call this function:

    1. runs ``_load_env_stack`` and ``load_remote_env_if_configured``
       (the latter is defined outside this module and presumably populates
       ``os.environ`` from a remote source);
    2. reads ``../config/config.yaml`` relative to this module's directory;
    3. replaces every ``${NAME}`` whose NAME is set in the environment with
       that value, leaving unknown references as-is;
    4. parses the result with ``yaml.safe_load`` (an empty document becomes ``{}``);
    5. blanks string values that are still a bare ``${NAME}`` placeholder and
       applies ``_normalize_config_paths``.

    The result is stored in ``CONFIG_CACHE``. Every later call returns that same
    object, so callers should treat it as read-only.

    Raises:
        OSError: if the config file cannot be opened.
        yaml.YAMLError: if the expanded text is not valid YAML.
    """
    global CONFIG_CACHE
    if CONFIG_CACHE is not None:
        return CONFIG_CACHE

    _load_env_stack()
    load_remote_env_if_configured(override=True, logger_name="kapo.brain.remote_env")
    config_path = os.path.join(os.path.dirname(__file__), "..", "config", "config.yaml")
    with open(config_path, "r", encoding="utf-8") as handle:
        raw = handle.read()

    # NOTE(review): expansion happens on the raw text *before* YAML parsing, so an
    # environment value containing YAML syntax (": ", " #", quotes, newlines) can
    # change the document's structure. Quoting placeholders in config.yaml reduces,
    # but does not eliminate, this risk.
    parsed = yaml.safe_load(_expand_env_placeholders(raw)) or {}
    CONFIG_CACHE = _normalize_config_paths(_strip_unresolved_placeholders(parsed))
    return CONFIG_CACHE
|
|
|
|
def is_remote_brain_only() -> bool:
    """Report whether the Brain is configured for remote-only operation.

    The ``REMOTE_BRAIN_ONLY`` key from ``load_config()`` is used whenever it is
    present, even if blank. Otherwise the environment variable of the same name
    is read, defaulting to ``"0"``. The accepted truthy spellings are ``1``,
    ``true``, ``yes`` and ``on``; case and surrounding whitespace are ignored.
    """
    # load_config() must run before os.getenv: its first call loads .env files
    # into the environment.
    settings = load_config()
    raw_flag = settings.get("REMOTE_BRAIN_ONLY", os.getenv("REMOTE_BRAIN_ONLY", "0"))
    normalized = str(raw_flag).strip().lower()
    return normalized in ("1", "true", "yes", "on")
|
|
|
|
def setup_logging() -> None:
    """Configure logging once, from ``../config/logging.yaml`` when available.

    The path is resolved relative to this module's directory.

    - If the file does not exist, ``logging.basicConfig(level=logging.INFO)`` is used.
    - If reading, parsing or applying the file fails, the same basic
      configuration is used and a warning, with the traceback attached, is
      emitted on the ``kapo`` logger. An empty file parses to ``{}``, which
      ``dictConfig`` rejects for lacking a ``version`` key, so it also takes
      this path.

    ``_LOGGING_READY`` is set afterwards, so later calls return immediately.
    """
    global _LOGGING_READY
    if _LOGGING_READY:
        return

    log_cfg_path = os.path.join(os.path.dirname(__file__), "..", "config", "logging.yaml")
    if not os.path.exists(log_cfg_path):
        logging.basicConfig(level=logging.INFO)
        _LOGGING_READY = True
        return

    try:
        with open(log_cfg_path, "r", encoding="utf-8") as handle:
            cfg = yaml.safe_load(handle) or {}
        logging.config.dictConfig(cfg)
    except Exception:
        logging.basicConfig(level=logging.INFO)
        # exc_info keeps the message unchanged but attaches the cause, so a broken
        # logging.yaml can be diagnosed instead of being silently replaced.
        logging.getLogger("kapo").warning(
            "Falling back to basic logging configuration", exc_info=True
        )

    _LOGGING_READY = True
|
|
|
|
def get_logger(name: str) -> logging.Logger:
    """Return the standard-library logger *name*, configuring logging first if needed."""
    setup_logging()
    logger = logging.getLogger(name)
    return logger
|
|
|
|
def _normalize_base_url(candidate: Any) -> str:
    """Coerce *candidate* into a base URL without trailing slashes.

    ``None`` and blank values yield ``""``. Any other value is stringified and
    stripped. When it has no ``://`` it is prefixed with ``http://``. Trailing
    ``/`` characters are then removed.
    """
    if candidate is None:
        return ""
    url = str(candidate).strip()
    if url == "":
        return ""
    if "://" in url:
        normalized = url
    else:
        normalized = "http://" + url
    return normalized.rstrip("/")
|
|
|
|
def get_executor_url(cfg: dict) -> str:
    """Resolve the executor service's base URL.

    Precedence:

    1. the ``EXECUTOR_URL`` environment variable;
    2. ``EXECUTOR_URL`` in *cfg*;
    3. a URL assembled from ``EXECUTOR_SCHEME``, ``EXECUTOR_HOST`` and
       ``EXECUTOR_PORT``. Each part is taken from *cfg* first, then the
       environment, then a default (``http``, ``localhost``, ``9000``). Blank
       values also fall back to the default.

    If the host already contains ``://`` it is used as the whole URL. If it
    contains ``:`` it is treated as ``host:port`` and the port setting is
    ignored. The returned URL never ends with ``/``.
    """
    env_url = _normalize_base_url(os.getenv("EXECUTOR_URL"))
    if env_url:
        return env_url

    cfg_url = _normalize_base_url(cfg.get("EXECUTOR_URL"))
    if cfg_url:
        return cfg_url

    # Blank values (e.g. a variable exported as "") fall back to the defaults, as
    # the scheme already did; otherwise they yield "http://:9000" or "http://host:".
    scheme = str(cfg.get("EXECUTOR_SCHEME") or os.getenv("EXECUTOR_SCHEME", "http")).strip() or "http"
    host = str(cfg.get("EXECUTOR_HOST") or os.getenv("EXECUTOR_HOST", "localhost")).strip() or "localhost"
    port = str(cfg.get("EXECUTOR_PORT") or os.getenv("EXECUTOR_PORT", "9000")).strip() or "9000"

    if "://" in host:
        return host.rstrip("/")
    # NOTE(review): a bare IPv6 literal such as "::1" would also take this branch.
    if ":" in host:
        return f"{scheme}://{host}".rstrip("/")
    return f"{scheme}://{host}:{port}".rstrip("/")
|
|
|
|
def get_executor_headers(cfg: dict) -> dict:
    """Build the optional bypass header for requests to the executor.

    ``EXECUTOR_BYPASS_HEADER`` (the header name) and ``EXECUTOR_BYPASS_VALUE``
    are each read from *cfg*. When the config value is falsy, the environment
    variable of the same name is used instead. Returns ``{name: value}`` (both
    stringified) when both are non-empty, else ``{}``.
    """
    header, value = (
        cfg.get(setting) or os.getenv(setting)
        for setting in ("EXECUTOR_BYPASS_HEADER", "EXECUTOR_BYPASS_VALUE")
    )
    if not (header and value):
        return {}
    return {str(header): str(value)}
|
|
|
|
def get_brain_headers(cfg: dict) -> dict:
    """Build the optional bypass header for requests to the Brain service.

    ``BRAIN_BYPASS_HEADER`` (the header name) and ``BRAIN_BYPASS_VALUE`` are
    each read from *cfg*. When the config value is falsy, the environment
    variable of the same name is used instead. Returns ``{name: value}`` (both
    stringified) when both are non-empty, else ``{}``.
    """
    resolved = []
    for setting in ("BRAIN_BYPASS_HEADER", "BRAIN_BYPASS_VALUE"):
        resolved.append(cfg.get(setting) or os.getenv(setting))
    header, value = resolved
    return {str(header): str(value)} if header and value else {}
|
|