File size: 4,880 Bytes
564b5ea
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
"""Common dependency utilities for Brain API."""
import logging
import logging.config
import os
import re
from typing import Any

import yaml
from dotenv import load_dotenv
from shared.remote_env import load_remote_env_if_configured

# Process-wide cache for the parsed config; populated lazily by load_config().
CONFIG_CACHE: dict[str, Any] | None = None
# One-shot guard so setup_logging() configures logging only once per process.
_LOGGING_READY = False
# Matches a string that is exactly one unresolved ${UPPER_SNAKE} placeholder.
PLACEHOLDER_RE = re.compile(r"^\$\{[A-Z0-9_]+\}$")


def _load_env_stack() -> None:
    """Layer local dotenv files onto the process environment.

    Files are applied in order with ``override=True``, so later files win.
    Loading is best-effort: a missing or unreadable file is skipped silently.
    """
    for env_file in (".env", "kapo.env", ".env.runtime"):
        try:
            load_dotenv(env_file, override=True)
        except Exception:
            # A broken or absent dotenv file must never block startup.
            continue


def _normalize_config_paths(cfg: dict[str, Any]) -> dict[str, Any]:
    if os.name != "nt":
        return cfg

    root = os.getcwd()
    normalized = dict(cfg)
    path_keys = {
        "DB_PATH",
        "TOOLS_DB_PATH",
        "FAISS_INDEX_PATH",
        "BRAIN_LOG_PATH",
        "EXEC_LOG_PATH",
        "LOCAL_DATA_DIR",
    }
    for key in path_keys:
        value = normalized.get(key)
        if not isinstance(value, str) or not value:
            continue
        if value.startswith("/data"):
            normalized[key] = os.path.join(root, "data", value[len("/data"):].lstrip("/\\"))
        elif value.startswith("/models"):
            normalized[key] = os.path.join(root, "models", value[len("/models"):].lstrip("/\\"))
    return normalized


def _strip_unresolved_placeholders(value):
    if isinstance(value, dict):
        return {key: _strip_unresolved_placeholders(item) for key, item in value.items()}
    if isinstance(value, list):
        return [_strip_unresolved_placeholders(item) for item in value]
    if isinstance(value, str) and PLACEHOLDER_RE.match(value.strip()):
        return ""
    return value


def load_config() -> dict[str, Any]:
    """Load, env-substitute, and cache the application configuration.

    On first call: layers local dotenv files and any configured remote env
    onto the process environment, reads ``../config/config.yaml`` relative
    to this module, textually substitutes every ``${VAR}`` occurrence with
    the current environment value, then blanks leftover placeholders and
    applies Windows path remapping. The result is cached for the life of
    the process; subsequent calls return the cached dict.
    """
    global CONFIG_CACHE
    if CONFIG_CACHE is not None:
        return CONFIG_CACHE

    # Environment must be fully populated BEFORE the raw YAML is read, so
    # that the ${VAR} substitution below sees every value.
    _load_env_stack()
    load_remote_env_if_configured(override=True, logger_name="kapo.brain.remote_env")
    config_path = os.path.join(os.path.dirname(__file__), "..", "config", "config.yaml")
    with open(config_path, "r", encoding="utf-8") as handle:
        raw = handle.read()

    # Plain textual substitution, not a templating engine: each ${KEY}
    # present in the environment is replaced everywhere in the raw text.
    for key, value in os.environ.items():
        raw = raw.replace(f"${{{key}}}", value)

    parsed = yaml.safe_load(raw) or {}
    # Any ${...} tokens still present were not in the environment; strip
    # them to "" first, then normalize container paths on Windows.
    CONFIG_CACHE = _normalize_config_paths(_strip_unresolved_placeholders(parsed))
    return CONFIG_CACHE


def is_remote_brain_only() -> bool:
    """Return True when the REMOTE_BRAIN_ONLY flag is enabled.

    The config value takes precedence; the REMOTE_BRAIN_ONLY environment
    variable (default "0") is the fallback. Any of "1", "true", "yes",
    "on" (case-insensitive, whitespace ignored) counts as enabled.
    """
    flag = load_config().get("REMOTE_BRAIN_ONLY", os.getenv("REMOTE_BRAIN_ONLY", "0"))
    normalized = str(flag).strip().lower()
    return normalized in ("1", "true", "yes", "on")


def setup_logging() -> None:
    """Configure process-wide logging exactly once.

    Prefers the dictConfig YAML at ``../config/logging.yaml`` relative to
    this module; when the file is absent or fails to apply, falls back to
    ``logging.basicConfig(level=INFO)``. Subsequent calls are no-ops.
    """
    global _LOGGING_READY
    if _LOGGING_READY:
        return

    cfg_file = os.path.join(os.path.dirname(__file__), "..", "config", "logging.yaml")
    if os.path.exists(cfg_file):
        try:
            with open(cfg_file, "r", encoding="utf-8") as fh:
                logging.config.dictConfig(yaml.safe_load(fh) or {})
        except Exception:
            # Malformed YAML or bad dictConfig schema: degrade gracefully.
            logging.basicConfig(level=logging.INFO)
            logging.getLogger("kapo").warning("Falling back to basic logging configuration")
    else:
        logging.basicConfig(level=logging.INFO)

    _LOGGING_READY = True


def get_logger(name: str) -> logging.Logger:
    """Return the named logger, configuring logging on first use."""
    setup_logging()
    logger = logging.getLogger(name)
    return logger


def _normalize_base_url(candidate: Any) -> str:
    text = "" if candidate is None else str(candidate).strip()
    if not text:
        return ""
    if "://" not in text:
        text = f"http://{text}"
    return text.rstrip("/")


def get_executor_url(cfg: dict) -> str:
    """Resolve the executor service base URL.

    Precedence: the EXECUTOR_URL environment variable, then
    ``cfg["EXECUTOR_URL"]``, then a URL assembled from scheme/host/port
    (config value first, environment fallback, defaults http/localhost/9000).
    """
    for candidate in (os.getenv("EXECUTOR_URL"), cfg.get("EXECUTOR_URL")):
        resolved = _normalize_base_url(candidate)
        if resolved:
            return resolved

    scheme = str(cfg.get("EXECUTOR_SCHEME") or os.getenv("EXECUTOR_SCHEME", "http")).strip() or "http"
    host = str(cfg.get("EXECUTOR_HOST") or os.getenv("EXECUTOR_HOST", "localhost")).strip()
    port = str(cfg.get("EXECUTOR_PORT") or os.getenv("EXECUTOR_PORT", "9000")).strip()

    # A host that already embeds a scheme or an explicit port overrides the
    # separately-configured scheme/port pieces.
    if "://" in host:
        return host.rstrip("/")
    if ":" in host:
        return f"{scheme}://{host}".rstrip("/")
    return f"{scheme}://{host}:{port}".rstrip("/")


def get_executor_headers(cfg: dict) -> dict:
    """Build the optional bypass header for executor requests.

    Both the header name and its value must be present (config preferred,
    environment fallback); otherwise an empty dict is returned.
    """
    name = cfg.get("EXECUTOR_BYPASS_HEADER") or os.getenv("EXECUTOR_BYPASS_HEADER")
    secret = cfg.get("EXECUTOR_BYPASS_VALUE") or os.getenv("EXECUTOR_BYPASS_VALUE")
    if not (name and secret):
        return {}
    return {str(name): str(secret)}


def get_brain_headers(cfg: dict) -> dict:
    """Build the optional bypass header for brain requests.

    Both the header name and its value must be present (config preferred,
    environment fallback); otherwise an empty dict is returned.
    """
    name = cfg.get("BRAIN_BYPASS_HEADER") or os.getenv("BRAIN_BYPASS_HEADER")
    secret = cfg.get("BRAIN_BYPASS_VALUE") or os.getenv("BRAIN_BYPASS_VALUE")
    if not (name and secret):
        return {}
    return {str(name): str(secret)}