Spaces:
Sleeping
Sleeping
| """Common dependency utilities for Brain API.""" | |
| import logging | |
| import logging.config | |
| import os | |
| import re | |
| from typing import Any | |
| import yaml | |
| from dotenv import load_dotenv | |
| from shared.remote_env import load_remote_env_if_configured | |
# Process-wide cache of the parsed configuration; populated on the first
# load_config() call and returned as-is afterwards.
CONFIG_CACHE: dict[str, Any] | None = None
# Guards setup_logging() so logging is configured at most once per process.
_LOGGING_READY = False
# Matches a string that is *exactly* one unresolved "${ENV_VAR}" placeholder
# (uppercase letters, digits, underscores) left over after env substitution.
PLACEHOLDER_RE = re.compile(r"^\$\{[A-Z0-9_]+\}$")
def _load_env_stack() -> None:
    """Layer local dotenv files into the process environment.

    Files are applied in order with ``override=True``, so values from a
    later file win over earlier ones. Loading is best-effort: a missing
    or unreadable file is silently skipped so startup never blocks on it.
    """
    for env_file in (".env", "kapo.env", ".env.runtime"):
        try:
            load_dotenv(env_file, override=True)
        except Exception:
            # Best-effort loading: ignore any failure and move on.
            pass
| def _normalize_config_paths(cfg: dict[str, Any]) -> dict[str, Any]: | |
| if os.name != "nt": | |
| return cfg | |
| root = os.getcwd() | |
| normalized = dict(cfg) | |
| path_keys = { | |
| "DB_PATH", | |
| "TOOLS_DB_PATH", | |
| "FAISS_INDEX_PATH", | |
| "BRAIN_LOG_PATH", | |
| "EXEC_LOG_PATH", | |
| "LOCAL_DATA_DIR", | |
| } | |
| for key in path_keys: | |
| value = normalized.get(key) | |
| if not isinstance(value, str) or not value: | |
| continue | |
| if value.startswith("/data"): | |
| normalized[key] = os.path.join(root, "data", value[len("/data"):].lstrip("/\\")) | |
| elif value.startswith("/models"): | |
| normalized[key] = os.path.join(root, "models", value[len("/models"):].lstrip("/\\")) | |
| return normalized | |
| def _strip_unresolved_placeholders(value): | |
| if isinstance(value, dict): | |
| return {key: _strip_unresolved_placeholders(item) for key, item in value.items()} | |
| if isinstance(value, list): | |
| return [_strip_unresolved_placeholders(item) for item in value] | |
| if isinstance(value, str) and PLACEHOLDER_RE.match(value.strip()): | |
| return "" | |
| return value | |
def load_config() -> dict:
    """Load, substitute, and cache the Brain configuration.

    On the first call this layers dotenv files and any remote env into the
    process environment, reads ``config/config.yaml``, textually replaces
    every ``${KEY}`` occurrence with the matching environment value, parses
    the result as YAML, blanks remaining placeholders, and normalizes paths
    for Windows. Subsequent calls return the cached mapping.
    """
    global CONFIG_CACHE
    if CONFIG_CACHE is None:
        _load_env_stack()
        load_remote_env_if_configured(override=True, logger_name="kapo.brain.remote_env")
        config_path = os.path.join(os.path.dirname(__file__), "..", "config", "config.yaml")
        with open(config_path, "r", encoding="utf-8") as handle:
            text = handle.read()
        # Textual env substitution: every ${KEY} present in the environment
        # is replaced before YAML parsing.
        for env_key, env_value in os.environ.items():
            text = text.replace(f"${{{env_key}}}", env_value)
        parsed = yaml.safe_load(text) or {}
        CONFIG_CACHE = _normalize_config_paths(_strip_unresolved_placeholders(parsed))
    return CONFIG_CACHE
def is_remote_brain_only() -> bool:
    """Report whether remote-brain-only mode is enabled.

    The flag is read from config (falling back to the ``REMOTE_BRAIN_ONLY``
    environment variable, default ``"0"``) and treated as truthy for the
    usual string spellings: 1 / true / yes / on (case-insensitive).
    """
    raw = load_config().get("REMOTE_BRAIN_ONLY", os.getenv("REMOTE_BRAIN_ONLY", "0"))
    return str(raw).strip().lower() in {"1", "true", "yes", "on"}
def setup_logging() -> None:
    """Configure process logging exactly once.

    Applies ``config/logging.yaml`` via ``logging.config.dictConfig`` when
    the file exists; if it is missing or fails to apply, falls back to
    ``logging.basicConfig(level=INFO)`` (logging a warning in the failure
    case). Idempotent: repeated calls are no-ops.
    """
    global _LOGGING_READY
    if _LOGGING_READY:
        return
    cfg_path = os.path.join(os.path.dirname(__file__), "..", "config", "logging.yaml")
    if os.path.exists(cfg_path):
        try:
            with open(cfg_path, "r", encoding="utf-8") as handle:
                logging.config.dictConfig(yaml.safe_load(handle) or {})
        except Exception:
            # Bad/unparseable logging config must never break startup.
            logging.basicConfig(level=logging.INFO)
            logging.getLogger("kapo").warning("Falling back to basic logging configuration")
    else:
        logging.basicConfig(level=logging.INFO)
    _LOGGING_READY = True
def get_logger(name: str) -> logging.Logger:
    """Return the named logger, making sure logging is configured first."""
    setup_logging()
    logger = logging.getLogger(name)
    return logger
| def _normalize_base_url(candidate: Any) -> str: | |
| text = "" if candidate is None else str(candidate).strip() | |
| if not text: | |
| return "" | |
| if "://" not in text: | |
| text = f"http://{text}" | |
| return text.rstrip("/") | |
def get_executor_url(cfg: dict) -> str:
    """Resolve the executor service base URL.

    Precedence: ``EXECUTOR_URL`` from the environment, then from config;
    otherwise the URL is assembled from EXECUTOR_SCHEME/HOST/PORT (config
    first, environment second, defaults http/localhost/9000). A host that
    already carries a scheme is returned as-is; a host with an embedded
    port skips the separate port component.
    """
    for candidate in (os.getenv("EXECUTOR_URL"), cfg.get("EXECUTOR_URL")):
        resolved = _normalize_base_url(candidate)
        if resolved:
            return resolved
    scheme = str(cfg.get("EXECUTOR_SCHEME") or os.getenv("EXECUTOR_SCHEME", "http")).strip() or "http"
    host = str(cfg.get("EXECUTOR_HOST") or os.getenv("EXECUTOR_HOST", "localhost")).strip()
    port = str(cfg.get("EXECUTOR_PORT") or os.getenv("EXECUTOR_PORT", "9000")).strip()
    if "://" in host:
        return host.rstrip("/")
    if ":" in host:
        # Host already includes a port (e.g. "executor:9000").
        return f"{scheme}://{host}".rstrip("/")
    return f"{scheme}://{host}:{port}".rstrip("/")
def get_executor_headers(cfg: dict) -> dict:
    """Build the optional bypass header for executor requests.

    Header name and value each come from config first, then from the
    environment. Both must be present (and truthy) for a header to be
    emitted; otherwise an empty dict is returned.
    """
    name = cfg.get("EXECUTOR_BYPASS_HEADER") or os.getenv("EXECUTOR_BYPASS_HEADER")
    secret = cfg.get("EXECUTOR_BYPASS_VALUE") or os.getenv("EXECUTOR_BYPASS_VALUE")
    if not (name and secret):
        return {}
    return {str(name): str(secret)}
def get_brain_headers(cfg: dict) -> dict:
    """Build the optional bypass header for brain requests.

    Mirrors ``get_executor_headers``: both the header name and its value
    must resolve (config first, then environment) or an empty dict is
    returned.
    """
    name = cfg.get("BRAIN_BYPASS_HEADER") or os.getenv("BRAIN_BYPASS_HEADER")
    secret = cfg.get("BRAIN_BYPASS_VALUE") or os.getenv("BRAIN_BYPASS_VALUE")
    if not (name and secret):
        return {}
    return {str(name): str(secret)}