# MrA7A3's picture
# Initial modernized KAPO runtime upload
# 564b5ea verified
"""Common dependency utilities for Brain API."""
import logging
import logging.config
import os
import re
from typing import Any
import yaml
from dotenv import load_dotenv
from shared.remote_env import load_remote_env_if_configured
# Parsed configuration memoized by load_config(); stays None until the first load completes.
CONFIG_CACHE: dict[str, Any] | None = None
# Flipped to True by setup_logging() after logging has been configured, so repeat calls return early.
_LOGGING_READY = False
# Matches a string that consists solely of one ${NAME} placeholder
# (NAME limited to uppercase ASCII letters, digits and underscores).
PLACEHOLDER_RE = re.compile(r"^\$\{[A-Z0-9_]+\}$")
def _load_env_stack() -> None:
    """Load the known dotenv files, in order, on a best-effort basis.

    Every file is passed to ``load_dotenv`` with ``override=True``. Any
    exception raised while loading one file is ignored so the remaining
    files are still attempted.
    """
    for env_file in (".env", "kapo.env", ".env.runtime"):
        try:
            load_dotenv(env_file, override=True)
        except Exception:
            # Deliberate best effort: a broken or unreadable file must not
            # prevent the next candidate from being loaded.
            pass
def _normalize_config_paths(cfg: dict[str, Any]) -> dict[str, Any]:
    """Rewrite container-style absolute paths to local folders on Windows.

    On any platform other than Windows (``os.name != "nt"``) the mapping is
    returned unchanged (the very same object). On Windows a shallow copy is
    made, and for each known path key a string value rooted at ``/data`` or
    ``/models`` is re-anchored under ``<cwd>/data`` or ``<cwd>/models``.

    Args:
        cfg: Parsed configuration mapping.

    Returns:
        ``cfg`` itself on non-Windows systems, otherwise a shallow copy with
        the matching path values rewritten.
    """
    if os.name != "nt":
        return cfg

    root = os.getcwd()
    normalized = dict(cfg)
    path_keys = (
        "DB_PATH",
        "TOOLS_DB_PATH",
        "FAISS_INDEX_PATH",
        "BRAIN_LOG_PATH",
        "EXEC_LOG_PATH",
        "LOCAL_DATA_DIR",
    )
    # Container root prefix -> folder name under the current working directory.
    remaps = (("/data", "data"), ("/models", "models"))

    for key in path_keys:
        value = normalized.get(key)
        if not isinstance(value, str) or not value:
            continue
        for prefix, folder in remaps:
            if not value.startswith(prefix):
                continue
            remainder = value[len(prefix):]
            # Only remap when the prefix is a whole path component. A bare
            # startswith() would also capture siblings such as "/database"
            # or "/models_v2" and mangle them into "<root>/data/base".
            if remainder and remainder[0] not in "/\\":
                continue
            normalized[key] = os.path.join(root, folder, remainder.lstrip("/\\"))
            break
    return normalized
def _strip_unresolved_placeholders(value):
    """Recursively blank out strings that are still a bare ``${NAME}`` token.

    Dicts and lists are rebuilt with every contained value processed the
    same way (dict keys are left as they are). A string whose stripped form
    matches ``PLACEHOLDER_RE`` becomes ``""``. Anything else is returned
    unchanged.
    """
    if isinstance(value, str):
        unresolved = PLACEHOLDER_RE.match(value.strip())
        return "" if unresolved else value
    if isinstance(value, list):
        return [_strip_unresolved_placeholders(element) for element in value]
    if isinstance(value, dict):
        cleaned = {}
        for name, element in value.items():
            cleaned[name] = _strip_unresolved_placeholders(element)
        return cleaned
    return value
def load_config() -> dict:
    """Return the Brain configuration, loading and caching it on first use.

    On the first call this loads the dotenv stack and the optional remote
    environment, reads ``../config/config.yaml`` relative to this module,
    substitutes each ``${NAME}`` occurrence with the matching environment
    variable value in the raw text, parses the result as YAML, blanks any
    placeholders left unresolved and normalizes path entries. The result is
    stored in ``CONFIG_CACHE`` and returned directly on later calls.
    """
    global CONFIG_CACHE
    if CONFIG_CACHE is None:
        _load_env_stack()
        load_remote_env_if_configured(override=True, logger_name="kapo.brain.remote_env")

        module_dir = os.path.dirname(__file__)
        yaml_path = os.path.join(module_dir, "..", "config", "config.yaml")
        with open(yaml_path, "r", encoding="utf-8") as stream:
            text = stream.read()

        # Textual substitution happens before YAML parsing, one environment
        # variable at a time.
        for name, env_value in os.environ.items():
            text = text.replace(f"${{{name}}}", env_value)

        document = yaml.safe_load(text) or {}
        cleaned = _strip_unresolved_placeholders(document)
        CONFIG_CACHE = _normalize_config_paths(cleaned)
    return CONFIG_CACHE
def is_remote_brain_only() -> bool:
    """Report whether the ``REMOTE_BRAIN_ONLY`` switch is enabled.

    The value is taken from the loaded configuration when the key is
    present, otherwise from the environment variable of the same name
    (default ``"0"``). It counts as enabled when its stripped, lower-cased
    string form is one of ``1``, ``true``, ``yes`` or ``on``.
    """
    settings = load_config()
    # Read the environment only after load_config(), which may populate it.
    env_default = os.getenv("REMOTE_BRAIN_ONLY", "0")
    flag = settings.get("REMOTE_BRAIN_ONLY", env_default)
    normalized = str(flag).strip().lower()
    return normalized in {"1", "true", "yes", "on"}
def setup_logging() -> None:
    """Configure logging once per process.

    Looks for ``../config/logging.yaml`` relative to this module. When the
    file is missing, ``logging.basicConfig(level=logging.INFO)`` is used.
    When the file exists it is parsed and handed to
    ``logging.config.dictConfig``; if reading, parsing or applying it fails,
    basic logging is configured instead and a warning that includes the
    triggering exception is emitted on the ``kapo`` logger. Subsequent calls
    return immediately.
    """
    global _LOGGING_READY
    if _LOGGING_READY:
        return

    log_cfg_path = os.path.join(os.path.dirname(__file__), "..", "config", "logging.yaml")
    if not os.path.exists(log_cfg_path):
        logging.basicConfig(level=logging.INFO)
        _LOGGING_READY = True
        return

    try:
        with open(log_cfg_path, "r", encoding="utf-8") as handle:
            cfg = yaml.safe_load(handle) or {}
        logging.config.dictConfig(cfg)
    except Exception:
        # Broad catch is intentional: logging setup must never crash startup.
        logging.basicConfig(level=logging.INFO)
        # Attach the traceback so a broken logging.yaml can be diagnosed
        # instead of being silently replaced by the fallback.
        logging.getLogger("kapo").warning(
            "Falling back to basic logging configuration",
            exc_info=True,
        )
    _LOGGING_READY = True
def get_logger(name: str) -> logging.Logger:
    """Return the logger called ``name`` after ensuring logging is set up."""
    # setup_logging() returns early once logging is configured, so calling it here is cheap.
    setup_logging()
    logger = logging.getLogger(name)
    return logger
def _normalize_base_url(candidate: Any) -> str:
    """Turn a loosely specified base URL into a canonical string.

    ``None`` and blank values yield ``""``. Otherwise the value is converted
    to ``str`` and stripped, ``http://`` is prepended when no ``://`` is
    present, and trailing slashes are removed.
    """
    if candidate is None:
        return ""
    url = str(candidate).strip()
    if not url:
        return ""
    with_scheme = url if "://" in url else f"http://{url}"
    return with_scheme.rstrip("/")
def get_executor_url(cfg: dict) -> str:
    """Resolve the executor base URL from the environment and ``cfg``.

    Resolution order:
      1. ``EXECUTOR_URL`` environment variable (normalized).
      2. ``EXECUTOR_URL`` entry in ``cfg`` (normalized).
      3. A URL assembled from ``EXECUTOR_SCHEME``, ``EXECUTOR_HOST`` and
         ``EXECUTOR_PORT``; for each of these a truthy ``cfg`` entry is used,
         otherwise the environment variable, otherwise ``http``,
         ``localhost`` and ``9000`` respectively.

    In step 3 a host that already contains ``://`` is returned as-is, and a
    host that already contains ``:`` is used without appending the port.
    Trailing slashes are removed from the result.
    """
    explicit = _normalize_base_url(os.getenv("EXECUTOR_URL")) or _normalize_base_url(
        cfg.get("EXECUTOR_URL")
    )
    if explicit:
        return explicit

    def _setting(key: str, fallback: str) -> str:
        # A truthy cfg entry wins; the environment is consulted only otherwise.
        return str(cfg.get(key) or os.getenv(key, fallback)).strip()

    scheme = _setting("EXECUTOR_SCHEME", "http") or "http"
    host = _setting("EXECUTOR_HOST", "localhost")
    port = _setting("EXECUTOR_PORT", "9000")

    if "://" in host:
        return host.rstrip("/")
    authority = host if ":" in host else f"{host}:{port}"
    return f"{scheme}://{authority}".rstrip("/")
def get_executor_headers(cfg: dict) -> dict:
    """Build the optional bypass header sent to the executor.

    The header name and value come from ``EXECUTOR_BYPASS_HEADER`` and
    ``EXECUTOR_BYPASS_VALUE``; a truthy ``cfg`` entry is preferred over the
    environment variable of the same name. Returns a single-entry dict with
    both converted to ``str``, or an empty dict when either one is missing.
    """
    name = cfg.get("EXECUTOR_BYPASS_HEADER") or os.getenv("EXECUTOR_BYPASS_HEADER")
    token = cfg.get("EXECUTOR_BYPASS_VALUE") or os.getenv("EXECUTOR_BYPASS_VALUE")
    if not (name and token):
        return {}
    return {str(name): str(token)}
def get_brain_headers(cfg: dict) -> dict:
    """Build the optional bypass header sent to the brain service.

    The header name and value come from ``BRAIN_BYPASS_HEADER`` and
    ``BRAIN_BYPASS_VALUE``; a truthy ``cfg`` entry is preferred over the
    environment variable of the same name. Returns a single-entry dict with
    both converted to ``str``, or an empty dict when either one is missing.
    """
    name = cfg.get("BRAIN_BYPASS_HEADER") or os.getenv("BRAIN_BYPASS_HEADER")
    token = cfg.get("BRAIN_BYPASS_VALUE") or os.getenv("BRAIN_BYPASS_VALUE")
    if not (name and token):
        return {}
    return {str(name): str(token)}