"""Common dependency utilities for Brain API."""
import logging
import logging.config
import os
import re
from typing import Any
import yaml
from dotenv import load_dotenv
from shared.remote_env import load_remote_env_if_configured
# Parsed configuration memoized by load_config(); stays None until the first
# successful load.
CONFIG_CACHE: dict[str, Any] | None = None
# Flipped to True by setup_logging() once logging has been configured, so that
# repeated calls return immediately.
_LOGGING_READY = False
# Matches a string that consists solely of a single ${NAME} placeholder, where
# NAME is made of uppercase letters, digits and underscores.
PLACEHOLDER_RE = re.compile(r"^\$\{[A-Z0-9_]+\}$")
def _load_env_stack() -> None:
    """Load the local dotenv files into the process environment.

    The files ``.env``, ``kapo.env`` and ``.env.runtime`` are passed to
    ``load_dotenv`` in that order, each with ``override=True``. Loading is
    best effort: any exception raised for one file is ignored and the next
    file is still attempted.
    """
    for env_file in (".env", "kapo.env", ".env.runtime"):
        try:
            load_dotenv(env_file, override=True)
        except Exception:
            # Deliberately tolerant: a broken or unreadable file must not
            # prevent the remaining files from being processed.
            pass
def _normalize_config_paths(cfg: dict[str, Any]) -> dict[str, Any]:
    """Re-root ``/data`` and ``/models`` path settings under the cwd on Windows.

    Args:
        cfg: Parsed configuration mapping.

    Returns:
        ``cfg`` itself when ``os.name`` is not ``"nt"``. On Windows, a shallow
        copy in which every known path setting located at or below ``/data``
        or ``/models`` is rewritten to ``<cwd>/data/...`` or
        ``<cwd>/models/...``. Other keys, empty values and non-string values
        are left untouched.
    """
    if os.name != "nt":
        return cfg

    root = os.getcwd()
    normalized = dict(cfg)
    path_keys = (
        "DB_PATH",
        "TOOLS_DB_PATH",
        "FAISS_INDEX_PATH",
        "BRAIN_LOG_PATH",
        "EXEC_LOG_PATH",
        "LOCAL_DATA_DIR",
    )
    remapped_roots = ("/data", "/models")

    for key in path_keys:
        value = normalized.get(key)
        if not isinstance(value, str) or not value:
            continue
        for prefix in remapped_roots:
            if not value.startswith(prefix):
                continue
            remainder = value[len(prefix):]
            # The prefix must be a complete path component: either the whole
            # value or followed by a separator. Without this check a sibling
            # directory such as "/database/x" would be mistaken for "/data"
            # and rewritten to "<cwd>/data/base/x".
            if remainder and remainder[0] not in "/\\":
                continue
            normalized[key] = os.path.join(root, prefix[1:], remainder.lstrip("/\\"))
            break
    return normalized
def _strip_unresolved_placeholders(value):
    """Recursively blank out strings that are still a bare placeholder.

    Dicts and lists are rebuilt with their items processed recursively (dict
    keys are kept as they are). A string whose stripped form matches
    ``PLACEHOLDER_RE`` is replaced by ``""``. Every other value is returned
    unchanged.
    """
    if isinstance(value, dict):
        cleaned = {}
        for name, entry in value.items():
            cleaned[name] = _strip_unresolved_placeholders(entry)
        return cleaned
    if isinstance(value, list):
        return list(map(_strip_unresolved_placeholders, value))
    if not isinstance(value, str):
        return value
    return "" if PLACEHOLDER_RE.match(value.strip()) else value
def load_config() -> dict:
    """Return the parsed configuration, loading and caching it on first use.

    On the first call this loads the dotenv stack and any configured remote
    environment, reads ``../config/config.yaml`` relative to this module,
    replaces every ``${NAME}`` occurrence for which ``NAME`` is an environment
    variable, parses the text as YAML, blanks unresolved placeholders and
    normalizes path settings. The result is stored in ``CONFIG_CACHE`` and
    returned as-is on later calls.
    """
    global CONFIG_CACHE
    if CONFIG_CACHE is None:
        _load_env_stack()
        load_remote_env_if_configured(override=True, logger_name="kapo.brain.remote_env")

        module_dir = os.path.dirname(__file__)
        config_path = os.path.join(module_dir, "..", "config", "config.yaml")
        with open(config_path, "r", encoding="utf-8") as stream:
            text = stream.read()

        # NOTE(review): substitution happens on the raw text before YAML
        # parsing, so an environment value containing YAML syntax would be
        # interpreted as YAML - confirm that this is intended.
        for env_name, env_value in os.environ.items():
            text = text.replace("${" + env_name + "}", env_value)

        document = yaml.safe_load(text) or {}
        without_placeholders = _strip_unresolved_placeholders(document)
        CONFIG_CACHE = _normalize_config_paths(without_placeholders)
    return CONFIG_CACHE
def is_remote_brain_only() -> bool:
    """Report whether the ``REMOTE_BRAIN_ONLY`` switch is enabled.

    The value is taken from the loaded configuration; when the key is absent
    there, the ``REMOTE_BRAIN_ONLY`` environment variable is used (default
    ``"0"``). The switch counts as enabled when its stripped, lower-cased
    string form is one of ``1``, ``true``, ``yes`` or ``on``.
    """
    cfg = load_config()
    # Read the environment only after load_config() has run, because loading
    # the configuration may itself populate environment variables.
    fallback = os.getenv("REMOTE_BRAIN_ONLY", "0")
    flag = cfg.get("REMOTE_BRAIN_ONLY", fallback)
    normalized = str(flag).strip().lower()
    return normalized in {"1", "true", "yes", "on"}
def setup_logging() -> None:
    """Configure logging once; later calls return immediately.

    If ``../config/logging.yaml`` (relative to this module) does not exist,
    logging is set up with ``logging.basicConfig(level=logging.INFO)``.
    Otherwise the file is parsed as YAML and handed to
    ``logging.config.dictConfig``. If reading, parsing or applying the file
    fails, the basic INFO configuration is used instead and a warning that
    includes the triggering exception is logged on the ``kapo`` logger.
    """
    global _LOGGING_READY
    if _LOGGING_READY:
        return

    log_cfg_path = os.path.join(os.path.dirname(__file__), "..", "config", "logging.yaml")
    if not os.path.exists(log_cfg_path):
        logging.basicConfig(level=logging.INFO)
        _LOGGING_READY = True
        return

    try:
        with open(log_cfg_path, "r", encoding="utf-8") as handle:
            cfg = yaml.safe_load(handle) or {}
        logging.config.dictConfig(cfg)
    except Exception:
        logging.basicConfig(level=logging.INFO)
        # Attach the exception so that a broken logging.yaml can be diagnosed
        # instead of silently degrading to the basic configuration.
        logging.getLogger("kapo").warning(
            "Falling back to basic logging configuration",
            exc_info=True,
        )
    _LOGGING_READY = True
def get_logger(name: str) -> logging.Logger:
    """Return the logger called ``name`` after making sure logging is set up."""
    setup_logging()
    logger = logging.getLogger(name)
    return logger
def _normalize_base_url(candidate: Any) -> str:
    """Turn ``candidate`` into a base URL string without trailing slashes.

    ``None`` and values whose string form is blank yield ``""``. When the text
    does not contain ``://``, ``http://`` is prepended. Trailing ``/``
    characters are removed from the result.
    """
    if candidate is None:
        return ""
    url = str(candidate).strip()
    if not url:
        return ""
    has_scheme = "://" in url
    if not has_scheme:
        url = "http://" + url
    return url.rstrip("/")
def _first_non_blank_text(*candidates: Any) -> str:
    """Return the stripped text of the first truthy, non-blank candidate, else ``""``."""
    for candidate in candidates:
        if not candidate:
            continue
        text = str(candidate).strip()
        if text:
            return text
    return ""


def get_executor_url(cfg: dict) -> str:
    """Resolve the executor base URL from the environment and ``cfg``.

    Resolution order:

    1. The ``EXECUTOR_URL`` environment variable, normalized.
    2. ``cfg["EXECUTOR_URL"]``, normalized.
    3. A URL assembled from ``EXECUTOR_SCHEME``, ``EXECUTOR_HOST`` and
       ``EXECUTOR_PORT``. Each part is taken from ``cfg`` first, then from the
       environment, then from the defaults ``http``, ``localhost`` and
       ``9000``.

    Args:
        cfg: Configuration mapping.

    Returns:
        The base URL without a trailing slash.
    """
    env_url = _normalize_base_url(os.getenv("EXECUTOR_URL"))
    if env_url:
        return env_url
    cfg_url = _normalize_base_url(cfg.get("EXECUTOR_URL"))
    if cfg_url:
        return cfg_url

    # Blank values (for example a variable that is set but empty) are skipped
    # for every part, so the result can never degrade to "http://:9000" or
    # "http://localhost:".
    scheme = _first_non_blank_text(cfg.get("EXECUTOR_SCHEME"), os.getenv("EXECUTOR_SCHEME"), "http")
    host = _first_non_blank_text(cfg.get("EXECUTOR_HOST"), os.getenv("EXECUTOR_HOST"), "localhost")
    port = _first_non_blank_text(cfg.get("EXECUTOR_PORT"), os.getenv("EXECUTOR_PORT"), "9000")

    if "://" in host:
        # The host setting already is a complete URL.
        return host.rstrip("/")
    if ":" in host:
        # The host setting already carries a port.
        return f"{scheme}://{host}".rstrip("/")
    return f"{scheme}://{host}:{port}".rstrip("/")
def get_executor_headers(cfg: dict) -> dict:
    """Build the optional bypass header for requests to the executor.

    ``EXECUTOR_BYPASS_HEADER`` and ``EXECUTOR_BYPASS_VALUE`` are each read
    from ``cfg`` first and from the environment when the ``cfg`` entry is
    missing or falsy. Returns a one-entry dict of their string forms when both
    are available, otherwise an empty dict.
    """
    header_name = cfg.get("EXECUTOR_BYPASS_HEADER") or os.getenv("EXECUTOR_BYPASS_HEADER")
    header_value = cfg.get("EXECUTOR_BYPASS_VALUE") or os.getenv("EXECUTOR_BYPASS_VALUE")
    if not (header_name and header_value):
        return {}
    return {str(header_name): str(header_value)}
def get_brain_headers(cfg: dict) -> dict:
    """Build the optional bypass header for requests to the brain service.

    ``BRAIN_BYPASS_HEADER`` and ``BRAIN_BYPASS_VALUE`` are each read from
    ``cfg`` first and from the environment when the ``cfg`` entry is missing
    or falsy. Returns a one-entry dict of their string forms when both are
    available, otherwise an empty dict.
    """
    header_name = cfg.get("BRAIN_BYPASS_HEADER") or os.getenv("BRAIN_BYPASS_HEADER")
    header_value = cfg.get("BRAIN_BYPASS_VALUE") or os.getenv("BRAIN_BYPASS_VALUE")
    if not (header_name and header_value):
        return {}
    return {str(header_name): str(header_value)}
|