MrA7A1 commited on
Commit
b3f1931
·
verified ·
1 Parent(s): a7442af

Sync modernized KAPO runtime from control plane

Browse files
Dockerfile CHANGED
@@ -1,10 +1,10 @@
1
- FROM python:3.11-slim
2
- WORKDIR /app
3
- ENV PYTHONUNBUFFERED=1
4
- ENV PYTHONUTF8=1
5
- COPY requirements.txt /app/requirements.txt
6
- RUN pip install --no-cache-dir -r /app/requirements.txt
7
- RUN mkdir -p /data/kapo_runtime/current /data/kapo_runtime/overlay
8
- COPY brain_server /app/brain_server
9
- COPY bootstrap_space_runtime.py /app/bootstrap_space_runtime.py
10
- CMD ["python", "/app/bootstrap_space_runtime.py"]
 
1
+ FROM python:3.11-slim
2
+ WORKDIR /app
3
+ ENV PYTHONUNBUFFERED=1
4
+ ENV PYTHONUTF8=1
5
+ COPY requirements.txt /app/requirements.txt
6
+ RUN pip install --no-cache-dir -r /app/requirements.txt
7
+ RUN mkdir -p /data/kapo_runtime/current /data/kapo_runtime/overlay
8
+ COPY brain_server /app/brain_server
9
+ COPY bootstrap_space_runtime.py /app/bootstrap_space_runtime.py
10
+ CMD ["python", "/app/bootstrap_space_runtime.py"]
README.md CHANGED
@@ -1,5 +1,5 @@
1
  ---
2
- title: hf_debugger_main
3
  emoji: 🤖
4
  colorFrom: blue
5
  colorTo: indigo
@@ -7,14 +7,14 @@ sdk: docker
7
  pinned: false
8
  ---
9
 
10
- # hf_debugger_main
11
 
12
  Generated Hugging Face deployment package from KAPO Control Center.
13
 
14
- Model profile: hf-debugger-qwen25-7b-instruct
15
- Model repo: Qwen/Qwen2.5-1.5B-Instruct
16
  Model file: not set
17
- Roles: fallback
18
  Languages: ar, en
19
 
20
  This is a Docker-oriented Hugging Face Space package.
 
1
  ---
2
+ title: ai_coder_main
3
  emoji: 🤖
4
  colorFrom: blue
5
  colorTo: indigo
 
7
  pinned: false
8
  ---
9
 
10
+ # ai_coder_main
11
 
12
  Generated Hugging Face deployment package from KAPO Control Center.
13
 
14
+ Model profile: hf-coder-qwen25-coder-7b-instruct
15
+ Model repo: Qwen/Qwen2.5-Coder-1.5B-Instruct
16
  Model file: not set
17
+ Roles: coding, planner, fallback
18
  Languages: ar, en
19
 
20
  This is a Docker-oriented Hugging Face Space package.
bootstrap_space_runtime.py CHANGED
@@ -6,28 +6,40 @@ import sys
6
  from pathlib import Path
7
 
8
  DEFAULT_ENV = {
9
- "REMOTE_BRAIN_ONLY": "1",
10
- "KAGGLE_AUTO_BOOTSTRAP": "0",
11
- "BRAIN_AUTO_NGROK": "0",
12
- "BRAIN_AUTO_PUBLISH_URL_ON_STARTUP": "0",
13
- "BRAIN_REUSE_PUBLIC_URL_ON_RESTART": "0",
14
- "HF_SPACE_DOCKER": "1",
15
- "KAPO_COMPUTE_PROFILE": "cpu",
16
- "HF_ACCELERATOR": "cpu",
17
- "KAPO_HF_TRANSFORMERS_RUNTIME": "1",
18
- "KAPO_LAZY_MODEL_STARTUP": "1",
19
- "KAPO_LAZY_EMBED_STARTUP": "1",
20
- "MODEL_PROFILE_ID": "hf-debugger-qwen25-7b-instruct",
21
- "MODEL_REPO": "Qwen/Qwen2.5-1.5B-Instruct",
22
- "BRAIN_ROLES": "fallback",
23
- "BRAIN_LANGUAGES": "ar,en",
24
- "BRAIN_PLATFORM_NAME": "hf_debugger_main",
25
- "BRAIN_TEMPLATE": "hf-space-cpu",
26
- "BRAIN_PROVIDER": "huggingface",
27
- "FIREBASE_ENABLED": "1",
28
- "FIREBASE_PROJECT_ID": "citadel4travels",
29
- "FIREBASE_NAMESPACE": "kapo",
30
- "BRAIN_REUSE_PUBLIC_URL_ON_RESTART": "0",
 
 
 
 
 
 
 
 
 
 
 
 
31
  }
32
 
33
 
@@ -64,8 +76,13 @@ def main() -> None:
64
  for key, value in DEFAULT_ENV.items():
65
  os.environ.setdefault(str(key), str(value))
66
  space_host = str(os.getenv('SPACE_HOST', '')).strip().rstrip('/')
 
 
67
  if space_host:
68
  public_url = space_host if space_host.startswith('http://') or space_host.startswith('https://') else f'https://{space_host}'
 
 
 
69
  os.environ['BRAIN_PUBLIC_URL'] = public_url.rstrip('/')
70
  runtime_root.mkdir(parents=True, exist_ok=True)
71
  overlay_root.mkdir(parents=True, exist_ok=True)
 
6
  from pathlib import Path
7
 
8
  DEFAULT_ENV = {
9
+ "BRAIN_AUTO_NGROK": "0",
10
+ "BRAIN_AUTO_PUBLISH_URL_ON_STARTUP": "0",
11
+ "BRAIN_LANGUAGES": "ar,en",
12
+ "BRAIN_PLATFORM_NAME": "ai_coder_main",
13
+ "BRAIN_PROVIDER": "huggingface",
14
+ "BRAIN_REUSE_PUBLIC_URL_ON_RESTART": "0",
15
+ "BRAIN_ROLES": "coding,planner,fallback",
16
+ "BRAIN_TEMPLATE": "hf-space-cpu",
17
+ "BRAIN_TUNNEL_PROVIDER": "none",
18
+ "EXECUTOR_URL": "https://hartford-resorts-resulted-dis.trycloudflare.com",
19
+ "FIREBASE_ENABLED": "0",
20
+ "FIREBASE_NAMESPACE": "kapo",
21
+ "FIREBASE_PROJECT_ID": "citadel4travels",
22
+ "GOOGLE_DRIVE_BOOTSTRAP_URL": "https://drive.google.com/uc?export=download&id=19jyBWsQ9ciJVPi2PUigu5ti3gJ24A6TG",
23
+ "HF_ACCELERATOR": "cpu",
24
+ "HF_SPACE_DOCKER": "1",
25
+ "KAGGLE_AUTO_BOOTSTRAP": "0",
26
+ "KAPO_BOOTSTRAP_URL": "https://drive.google.com/uc?export=download&id=19jyBWsQ9ciJVPi2PUigu5ti3gJ24A6TG",
27
+ "KAPO_CLOUDFLARE_QUEUE_NAME": "kapo-task-events",
28
+ "KAPO_COMPUTE_PROFILE": "cpu",
29
+ "KAPO_CONTROL_PLANE_URL": "https://kapo-control-plane.kaboalby2015.workers.dev",
30
+ "KAPO_HF_INFERENCE_API": "1",
31
+ "KAPO_HF_TRANSFORMERS_RUNTIME": "0",
32
+ "KAPO_LAZY_EMBED_STARTUP": "1",
33
+ "KAPO_LAZY_MODEL_STARTUP": "1",
34
+ "KAPO_PATCH_BUNDLE_URL": "https://drive.google.com/uc?export=download&id=16rIe05GZihhAz7ba8E-WibKaJKbh9eu1",
35
+ "KAPO_PATCH_MANIFEST_URL": "https://drive.google.com/uc?export=download&id=1jLuPMCA3hp9qstZZtpBNzTK0XOmLrV8b",
36
+ "KAPO_REMOTE_ENV_PASSWORD_B64": "Nmk4ZWItQXdNQzgzTU10Rndoem5XYTFXaG41OGdxQW0",
37
+ "KAPO_REMOTE_ENV_URL_B64": "aHR0cHM6Ly9kcml2ZS5nb29nbGUuY29tL3VjP2V4cG9ydD1kb3dubG9hZCZpZD0xb3Itd2NFbjNmYzBpekg1NGVVeEpFRXljOEFLQ2swcHo",
38
+ "KAPO_SHARED_STATE_BACKEND": "google_drive",
39
+ "MODEL_PROFILE_ID": "hf-coder-qwen25-coder-7b-instruct",
40
+ "MODEL_REPO": "Qwen/Qwen2.5-Coder-1.5B-Instruct",
41
+ "REMOTE_BRAIN_ONLY": "1",
42
+ "SPACE_PUBLIC_URL": "https://MrA7A1-AiCoder.hf.space"
43
  }
44
 
45
 
 
76
  for key, value in DEFAULT_ENV.items():
77
  os.environ.setdefault(str(key), str(value))
78
  space_host = str(os.getenv('SPACE_HOST', '')).strip().rstrip('/')
79
+ space_id = str(os.getenv('SPACE_ID', '')).strip()
80
+ public_url = ''
81
  if space_host:
82
  public_url = space_host if space_host.startswith('http://') or space_host.startswith('https://') else f'https://{space_host}'
83
+ elif space_id and '/' in space_id:
84
+ public_url = 'https://' + space_id.replace('/', '-').lower() + '.hf.space'
85
+ if public_url:
86
  os.environ['BRAIN_PUBLIC_URL'] = public_url.rstrip('/')
87
  runtime_root.mkdir(parents=True, exist_ok=True)
88
  overlay_root.mkdir(parents=True, exist_ok=True)
brain_server/api/deps.py CHANGED
@@ -7,12 +7,26 @@ from typing import Any
7
 
8
  import yaml
9
  from dotenv import load_dotenv
 
10
 
11
  CONFIG_CACHE: dict[str, Any] | None = None
12
  _LOGGING_READY = False
13
  PLACEHOLDER_RE = re.compile(r"^\$\{[A-Z0-9_]+\}$")
14
 
15
 
 
 
 
 
 
 
 
 
 
 
 
 
 
16
  def _normalize_config_paths(cfg: dict[str, Any]) -> dict[str, Any]:
17
  if os.name != "nt":
18
  return cfg
@@ -53,7 +67,8 @@ def load_config() -> dict:
53
  if CONFIG_CACHE is not None:
54
  return CONFIG_CACHE
55
 
56
- load_dotenv()
 
57
  config_path = os.path.join(os.path.dirname(__file__), "..", "config", "config.yaml")
58
  with open(config_path, "r", encoding="utf-8") as handle:
59
  raw = handle.read()
 
7
 
8
  import yaml
9
  from dotenv import load_dotenv
10
+ from shared.remote_env import load_remote_env_if_configured
11
 
12
  CONFIG_CACHE: dict[str, Any] | None = None
13
  _LOGGING_READY = False
14
  PLACEHOLDER_RE = re.compile(r"^\$\{[A-Z0-9_]+\}$")
15
 
16
 
17
+ def _load_env_stack() -> None:
18
+ candidates = [
19
+ ".env",
20
+ "kapo.env",
21
+ ".env.runtime",
22
+ ]
23
+ for candidate in candidates:
24
+ try:
25
+ load_dotenv(candidate, override=True)
26
+ except Exception:
27
+ continue
28
+
29
+
30
  def _normalize_config_paths(cfg: dict[str, Any]) -> dict[str, Any]:
31
  if os.name != "nt":
32
  return cfg
 
67
  if CONFIG_CACHE is not None:
68
  return CONFIG_CACHE
69
 
70
+ _load_env_stack()
71
+ load_remote_env_if_configured(override=True, logger_name="kapo.brain.remote_env")
72
  config_path = os.path.join(os.path.dirname(__file__), "..", "config", "config.yaml")
73
  with open(config_path, "r", encoding="utf-8") as handle:
74
  raw = handle.read()
brain_server/api/firebase_store.py CHANGED
@@ -1,28 +1,63 @@
1
- """Optional Firebase mirror for brain runtime state."""
2
 
3
  from __future__ import annotations
4
 
5
  import json
6
  import logging
7
  import os
 
8
  import time
9
  from pathlib import Path
10
  from typing import Any
11
 
 
 
12
 
13
  class FirebaseStore:
14
  def __init__(self, component: str, logger_name: str = "kapo.brain.firebase") -> None:
15
  self.component = component
16
  self.logger = logging.getLogger(logger_name)
17
  self._db = None
18
- self._cache: dict[str, tuple[float, Any]] = {}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
19
 
20
  def enabled(self) -> bool:
21
- return str(os.getenv("FIREBASE_ENABLED", "0")).strip().lower() in {"1", "true", "yes", "on"}
22
 
23
  def namespace(self) -> str:
24
  return str(os.getenv("FIREBASE_NAMESPACE", "kapo")).strip() or "kapo"
25
 
 
 
 
 
 
 
 
 
 
 
 
26
  def _service_payload(self) -> dict[str, Any] | None:
27
  raw = str(os.getenv("FIREBASE_SERVICE_ACCOUNT_JSON", "")).strip()
28
  if not raw:
@@ -38,40 +73,41 @@ class FirebaseStore:
38
  return str(os.getenv("FIREBASE_SERVICE_ACCOUNT_PATH", "")).strip()
39
 
40
  def _client(self):
41
- if not self.enabled():
42
- return None
43
- if self._db is not None:
44
- return self._db
45
- try:
46
- import firebase_admin
47
- from firebase_admin import credentials, firestore
48
-
49
- if not firebase_admin._apps:
50
- payload = self._service_payload()
51
- if payload:
52
- cred = credentials.Certificate(payload)
53
- else:
54
- service_path = self._service_path()
55
- if not service_path:
56
- return None
57
- path_obj = Path(service_path).expanduser()
58
- if not path_obj.exists() or not path_obj.is_file():
59
- self.logger.warning(
60
- "Firebase service account path is unavailable on this runtime: %s",
61
- service_path,
62
- )
63
- return None
64
- cred = credentials.Certificate(str(path_obj.resolve()))
65
- options = {}
66
- project_id = str(os.getenv("FIREBASE_PROJECT_ID", "")).strip()
67
- if project_id:
68
- options["projectId"] = project_id
69
- firebase_admin.initialize_app(cred, options or None)
70
- self._db = firestore.client()
71
- return self._db
72
- except Exception:
73
- self.logger.exception("Failed to initialize Firebase client")
74
  return None
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
75
 
76
  def _collection(self, name: str) -> str:
77
  return f"{self.namespace()}_{name}"
@@ -81,45 +117,177 @@ class FirebaseStore:
81
  text = str(value or "").strip() or default
82
  return "".join(ch if ch.isalnum() or ch in {"-", "_", "."} else "_" for ch in text)[:180]
83
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
84
  def get_document(self, collection: str, doc_id: str, ttl_sec: float = 12.0) -> dict[str, Any]:
85
- db = self._client()
86
- if db is None:
87
- return {}
88
  safe_doc = self._safe_id(doc_id)
89
  cache_key = f"{collection}:{safe_doc}"
90
  now = time.time()
91
- cached = self._cache.get(cache_key)
92
  if cached and (now - cached[0]) < ttl_sec:
93
  return dict(cached[1] or {})
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
94
  try:
95
  snapshot = db.collection(self._collection(collection)).document(safe_doc).get()
96
  payload = snapshot.to_dict() if snapshot.exists else {}
97
- self._cache[cache_key] = (now, payload)
98
  return dict(payload or {})
99
- except Exception:
100
- self.logger.exception("Failed to read Firebase document %s/%s", collection, safe_doc)
101
- return {}
 
 
 
 
102
 
103
- def set_document(self, collection: str, doc_id: str, payload: dict[str, Any], merge: bool = True) -> bool:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
104
  db = self._client()
105
  if db is None:
106
  return False
107
- safe_doc = self._safe_id(doc_id)
108
  try:
109
- body = dict(payload or {})
110
- body["component"] = self.component
111
- body["updated_at"] = time.time()
112
  db.collection(self._collection(collection)).document(safe_doc).set(body, merge=merge)
113
- self._cache.pop(f"{collection}:{safe_doc}", None)
 
 
 
 
114
  return True
115
- except Exception:
116
- self.logger.exception("Failed to write Firebase document %s/%s", collection, safe_doc)
 
 
 
 
117
  return False
118
 
119
- def list_documents(self, collection: str, limit: int = 200) -> list[dict[str, Any]]:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
120
  db = self._client()
121
  if db is None:
122
- return []
123
  try:
124
  docs = db.collection(self._collection(collection)).limit(max(1, int(limit))).stream()
125
  items: list[dict[str, Any]] = []
@@ -127,20 +295,62 @@ class FirebaseStore:
127
  payload = doc.to_dict() or {}
128
  payload.setdefault("id", doc.id)
129
  items.append(payload)
 
130
  return items
131
- except Exception:
132
- self.logger.exception("Failed to list Firebase collection %s", collection)
133
- return []
 
 
 
 
134
 
135
  def delete_document(self, collection: str, doc_id: str) -> bool:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
136
  db = self._client()
137
  if db is None:
138
  return False
139
- safe_doc = self._safe_id(doc_id)
140
  try:
141
  db.collection(self._collection(collection)).document(safe_doc).delete()
142
- self._cache.pop(f"{collection}:{safe_doc}", None)
 
 
 
 
 
143
  return True
144
- except Exception:
145
- self.logger.exception("Failed to delete Firebase document %s/%s", collection, safe_doc)
 
 
 
 
146
  return False
 
1
+ """Shared-state store for brain runtime via Google Drive, local files, or Firebase."""
2
 
3
  from __future__ import annotations
4
 
5
  import json
6
  import logging
7
  import os
8
+ import threading
9
  import time
10
  from pathlib import Path
11
  from typing import Any
12
 
13
+ from shared.google_drive_state import GoogleDriveStateClient
14
+
15
 
16
  class FirebaseStore:
17
  def __init__(self, component: str, logger_name: str = "kapo.brain.firebase") -> None:
18
  self.component = component
19
  self.logger = logging.getLogger(logger_name)
20
  self._db = None
21
+ self._lock = threading.Lock()
22
+ self._read_cache: dict[str, tuple[float, Any]] = {}
23
+ self._list_cache: dict[str, tuple[float, list[dict[str, Any]]]] = {}
24
+ self._write_cache: dict[str, tuple[float, str]] = {}
25
+ self._quota_backoff_until: float = 0.0
26
+ self._drive = GoogleDriveStateClient(self.logger)
27
+
28
+ def backend(self) -> str:
29
+ configured = str(os.getenv("KAPO_SHARED_STATE_BACKEND", "")).strip().lower()
30
+ if configured in {"google_drive", "drive", "gdrive"}:
31
+ return "google_drive"
32
+ if configured in {"file", "files"}:
33
+ return "file"
34
+ if configured in {"firebase", "firestore"}:
35
+ return "firebase"
36
+ if configured in {"disabled", "off", "none"}:
37
+ return "disabled"
38
+ if self._drive.enabled():
39
+ return "google_drive"
40
+ if str(os.getenv("FIREBASE_ENABLED", "0")).strip().lower() in {"1", "true", "yes", "on"}:
41
+ return "firebase"
42
+ return "file"
43
 
44
  def enabled(self) -> bool:
45
+ return self.backend() != "disabled"
46
 
47
  def namespace(self) -> str:
48
  return str(os.getenv("FIREBASE_NAMESPACE", "kapo")).strip() or "kapo"
49
 
50
+ def storage_root(self) -> Path:
51
+ configured = str(os.getenv("KAPO_SHARED_STATE_DIR", "")).strip()
52
+ if configured:
53
+ root = Path(configured).expanduser()
54
+ if not root.is_absolute():
55
+ root = Path.cwd().resolve() / root
56
+ else:
57
+ root = (Path.cwd().resolve() / "data" / "local" / "shared_state").resolve()
58
+ root.mkdir(parents=True, exist_ok=True)
59
+ return root
60
+
61
  def _service_payload(self) -> dict[str, Any] | None:
62
  raw = str(os.getenv("FIREBASE_SERVICE_ACCOUNT_JSON", "")).strip()
63
  if not raw:
 
73
  return str(os.getenv("FIREBASE_SERVICE_ACCOUNT_PATH", "")).strip()
74
 
75
  def _client(self):
76
+ if self.backend() != "firebase":
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
77
  return None
78
+ with self._lock:
79
+ if self._db is not None:
80
+ return self._db
81
+ try:
82
+ import firebase_admin
83
+ from firebase_admin import credentials, firestore
84
+
85
+ if not firebase_admin._apps:
86
+ payload = self._service_payload()
87
+ if payload:
88
+ cred = credentials.Certificate(payload)
89
+ else:
90
+ service_path = self._service_path()
91
+ if not service_path:
92
+ return None
93
+ path_obj = Path(service_path).expanduser()
94
+ if not path_obj.exists() or not path_obj.is_file():
95
+ self.logger.warning(
96
+ "Firebase service account path is unavailable on this runtime: %s",
97
+ service_path,
98
+ )
99
+ return None
100
+ cred = credentials.Certificate(str(path_obj.resolve()))
101
+ options = {}
102
+ project_id = str(os.getenv("FIREBASE_PROJECT_ID", "")).strip()
103
+ if project_id:
104
+ options["projectId"] = project_id
105
+ firebase_admin.initialize_app(cred, options or None)
106
+ self._db = firestore.client()
107
+ return self._db
108
+ except Exception:
109
+ self.logger.exception("Failed to initialize Firebase client")
110
+ return None
111
 
112
  def _collection(self, name: str) -> str:
113
  return f"{self.namespace()}_{name}"
 
117
  text = str(value or "").strip() or default
118
  return "".join(ch if ch.isalnum() or ch in {"-", "_", "."} else "_" for ch in text)[:180]
119
 
120
+ @staticmethod
121
+ def _payload_hash(payload: Any) -> str:
122
+ return json.dumps(payload, ensure_ascii=False, sort_keys=True, default=str)
123
+
124
+ @staticmethod
125
+ def _is_quota_error(exc: Exception) -> bool:
126
+ text = str(exc or "").lower()
127
+ return "resourceexhausted" in text or "quota exceeded" in text or "429" in text
128
+
129
+ def _quota_backoff_active(self) -> bool:
130
+ return time.time() < self._quota_backoff_until
131
+
132
+ def _activate_quota_backoff(self, seconds: float | None = None) -> None:
133
+ delay = float(seconds or os.getenv("FIREBASE_QUOTA_BACKOFF_SEC", "120") or 120)
134
+ self._quota_backoff_until = max(self._quota_backoff_until, time.time() + max(5.0, delay))
135
+
136
+ def _should_skip_write(self, key: str, payload: Any, min_interval_sec: float) -> bool:
137
+ now = time.time()
138
+ payload_hash = self._payload_hash(payload)
139
+ last = self._write_cache.get(key)
140
+ if last and last[1] == payload_hash and (now - last[0]) < min_interval_sec:
141
+ return True
142
+ self._write_cache[key] = (now, payload_hash)
143
+ return False
144
+
145
+ def _file_collection_dir(self, collection: str) -> Path:
146
+ path = self.storage_root() / self._safe_id(collection, "collection")
147
+ path.mkdir(parents=True, exist_ok=True)
148
+ return path
149
+
150
+ def _file_doc_path(self, collection: str, doc_id: str) -> Path:
151
+ return self._file_collection_dir(collection) / f"{self._safe_id(doc_id)}.json"
152
+
153
+ def _write_json_atomic(self, path: Path, payload: dict[str, Any]) -> None:
154
+ path.parent.mkdir(parents=True, exist_ok=True)
155
+ tmp = path.with_suffix(f"{path.suffix}.tmp")
156
+ tmp.write_text(json.dumps(payload, ensure_ascii=False, indent=2, sort_keys=True), encoding="utf-8")
157
+ tmp.replace(path)
158
+
159
  def get_document(self, collection: str, doc_id: str, ttl_sec: float = 12.0) -> dict[str, Any]:
 
 
 
160
  safe_doc = self._safe_id(doc_id)
161
  cache_key = f"{collection}:{safe_doc}"
162
  now = time.time()
163
+ cached = self._read_cache.get(cache_key)
164
  if cached and (now - cached[0]) < ttl_sec:
165
  return dict(cached[1] or {})
166
+ backend = self.backend()
167
+ if backend == "google_drive":
168
+ payload = self._drive.get_document(collection, safe_doc)
169
+ self._read_cache[cache_key] = (now, payload)
170
+ return dict(payload or {})
171
+ if backend == "file":
172
+ path = self._file_doc_path(collection, safe_doc)
173
+ if not path.exists():
174
+ return {}
175
+ try:
176
+ payload = json.loads(path.read_text(encoding="utf-8"))
177
+ except Exception:
178
+ self.logger.warning("Failed to read shared-state file %s", path, exc_info=True)
179
+ return {}
180
+ self._read_cache[cache_key] = (now, payload)
181
+ return dict(payload or {})
182
+ if self._quota_backoff_active():
183
+ return dict(cached[1] or {}) if cached else {}
184
+ db = self._client()
185
+ if db is None:
186
+ return dict(cached[1] or {}) if cached else {}
187
  try:
188
  snapshot = db.collection(self._collection(collection)).document(safe_doc).get()
189
  payload = snapshot.to_dict() if snapshot.exists else {}
190
+ self._read_cache[cache_key] = (now, payload)
191
  return dict(payload or {})
192
+ except Exception as exc:
193
+ if self._is_quota_error(exc):
194
+ self._activate_quota_backoff()
195
+ self.logger.warning("Firebase quota exceeded while reading %s/%s; using cache/backoff", collection, safe_doc)
196
+ else:
197
+ self.logger.exception("Failed to read Firebase document %s/%s", collection, safe_doc)
198
+ return dict(cached[1] or {}) if cached else {}
199
 
200
+ def set_document(self, collection: str, doc_id: str, payload: dict[str, Any], merge: bool = True, min_interval_sec: float = 5.0) -> bool:
201
+ safe_doc = self._safe_id(doc_id)
202
+ cache_key = f"{collection}:{safe_doc}"
203
+ body = dict(payload or {})
204
+ body["component"] = self.component
205
+ body["updated_at"] = time.time()
206
+ if self._should_skip_write(cache_key, body, min_interval_sec):
207
+ return True
208
+ backend = self.backend()
209
+ if backend == "google_drive":
210
+ stored = self._drive.set_document(collection, safe_doc, body, merge=merge)
211
+ if stored:
212
+ self._read_cache.pop(cache_key, None)
213
+ stale_prefix = f"{collection}:list:"
214
+ for key in list(self._list_cache.keys()):
215
+ if key.startswith(stale_prefix):
216
+ self._list_cache.pop(key, None)
217
+ return stored
218
+ if backend == "file":
219
+ try:
220
+ path = self._file_doc_path(collection, safe_doc)
221
+ existing = {}
222
+ if merge and path.exists():
223
+ existing = json.loads(path.read_text(encoding="utf-8"))
224
+ combined = {**existing, **body} if merge else body
225
+ combined.setdefault("id", safe_doc)
226
+ self._write_json_atomic(path, combined)
227
+ self._read_cache.pop(cache_key, None)
228
+ stale_prefix = f"{collection}:list:"
229
+ for key in list(self._list_cache.keys()):
230
+ if key.startswith(stale_prefix):
231
+ self._list_cache.pop(key, None)
232
+ return True
233
+ except Exception:
234
+ self.logger.warning("Failed to write shared-state file %s/%s", collection, safe_doc, exc_info=True)
235
+ return False
236
+ if self._quota_backoff_active():
237
+ return False
238
  db = self._client()
239
  if db is None:
240
  return False
 
241
  try:
242
+ if self._should_skip_write(cache_key, body, min_interval_sec):
243
+ return True
 
244
  db.collection(self._collection(collection)).document(safe_doc).set(body, merge=merge)
245
+ self._read_cache.pop(cache_key, None)
246
+ stale_prefix = f"{collection}:list:"
247
+ for key in list(self._list_cache.keys()):
248
+ if key.startswith(stale_prefix):
249
+ self._list_cache.pop(key, None)
250
  return True
251
+ except Exception as exc:
252
+ if self._is_quota_error(exc):
253
+ self._activate_quota_backoff()
254
+ self.logger.warning("Firebase quota exceeded while writing %s/%s; write skipped", collection, safe_doc)
255
+ else:
256
+ self.logger.exception("Failed to write Firebase document %s/%s", collection, safe_doc)
257
  return False
258
 
259
+ def list_documents(self, collection: str, limit: int = 200, ttl_sec: float = 30.0) -> list[dict[str, Any]]:
260
+ cache_key = f"{collection}:list:{max(1, int(limit))}"
261
+ now = time.time()
262
+ cached = self._list_cache.get(cache_key)
263
+ if cached and (now - cached[0]) < ttl_sec:
264
+ return [dict(item) for item in (cached[1] or [])]
265
+ backend = self.backend()
266
+ if backend == "google_drive":
267
+ items = self._drive.list_documents(collection, limit=max(1, int(limit)))
268
+ self._list_cache[cache_key] = (now, [dict(item) for item in items])
269
+ return items
270
+ if backend == "file":
271
+ items: list[dict[str, Any]] = []
272
+ try:
273
+ paths = sorted(
274
+ self._file_collection_dir(collection).glob("*.json"),
275
+ key=lambda path: path.stat().st_mtime,
276
+ reverse=True,
277
+ )
278
+ for path in paths[: max(1, int(limit))]:
279
+ payload = json.loads(path.read_text(encoding="utf-8"))
280
+ payload.setdefault("id", path.stem)
281
+ items.append(payload)
282
+ except Exception:
283
+ self.logger.warning("Failed to list shared-state files for %s", collection, exc_info=True)
284
+ self._list_cache[cache_key] = (now, [dict(item) for item in items])
285
+ return items
286
+ if self._quota_backoff_active():
287
+ return [dict(item) for item in ((cached[1] if cached else []) or [])]
288
  db = self._client()
289
  if db is None:
290
+ return [dict(item) for item in ((cached[1] if cached else []) or [])]
291
  try:
292
  docs = db.collection(self._collection(collection)).limit(max(1, int(limit))).stream()
293
  items: list[dict[str, Any]] = []
 
295
  payload = doc.to_dict() or {}
296
  payload.setdefault("id", doc.id)
297
  items.append(payload)
298
+ self._list_cache[cache_key] = (now, [dict(item) for item in items])
299
  return items
300
+ except Exception as exc:
301
+ if self._is_quota_error(exc):
302
+ self._activate_quota_backoff()
303
+ self.logger.warning("Firebase quota exceeded while listing %s; using cache/backoff", collection)
304
+ else:
305
+ self.logger.exception("Failed to list Firebase collection %s", collection)
306
+ return [dict(item) for item in ((cached[1] if cached else []) or [])]
307
 
308
  def delete_document(self, collection: str, doc_id: str) -> bool:
309
+ safe_doc = self._safe_id(doc_id)
310
+ backend = self.backend()
311
+ if backend == "google_drive":
312
+ deleted = self._drive.delete_document(collection, safe_doc)
313
+ if deleted:
314
+ self._read_cache.pop(f"{collection}:{safe_doc}", None)
315
+ self._write_cache.pop(f"{collection}:{safe_doc}", None)
316
+ stale_prefix = f"{collection}:list:"
317
+ for key in list(self._list_cache.keys()):
318
+ if key.startswith(stale_prefix):
319
+ self._list_cache.pop(key, None)
320
+ return deleted
321
+ if backend == "file":
322
+ try:
323
+ path = self._file_doc_path(collection, safe_doc)
324
+ if path.exists():
325
+ path.unlink()
326
+ self._read_cache.pop(f"{collection}:{safe_doc}", None)
327
+ self._write_cache.pop(f"{collection}:{safe_doc}", None)
328
+ stale_prefix = f"{collection}:list:"
329
+ for key in list(self._list_cache.keys()):
330
+ if key.startswith(stale_prefix):
331
+ self._list_cache.pop(key, None)
332
+ return True
333
+ except Exception:
334
+ self.logger.warning("Failed to delete shared-state file %s/%s", collection, safe_doc, exc_info=True)
335
+ return False
336
+ if self._quota_backoff_active():
337
+ return False
338
  db = self._client()
339
  if db is None:
340
  return False
 
341
  try:
342
  db.collection(self._collection(collection)).document(safe_doc).delete()
343
+ self._read_cache.pop(f"{collection}:{safe_doc}", None)
344
+ self._write_cache.pop(f"{collection}:{safe_doc}", None)
345
+ stale_prefix = f"{collection}:list:"
346
+ for key in list(self._list_cache.keys()):
347
+ if key.startswith(stale_prefix):
348
+ self._list_cache.pop(key, None)
349
  return True
350
+ except Exception as exc:
351
+ if self._is_quota_error(exc):
352
+ self._activate_quota_backoff()
353
+ self.logger.warning("Firebase quota exceeded while deleting %s/%s; delete skipped", collection, safe_doc)
354
+ else:
355
+ self.logger.exception("Failed to delete Firebase document %s/%s", collection, safe_doc)
356
  return False
brain_server/api/main.py CHANGED
@@ -1,4 +1,5 @@
1
  """FastAPI entrypoint for the Brain Server."""
 
2
  import gc
3
  import logging
4
  import os
@@ -28,6 +29,7 @@ try:
28
  from api.routes_analyze import router as analyze_router
29
  from api.routes_execute import router as execute_router
30
  from api.routes_plan import router as plan_router
 
31
  except ImportError:
32
  from . import deps as deps_module
33
  from .deps import get_executor_headers, get_logger, load_config
@@ -35,6 +37,7 @@ except ImportError:
35
  from .routes_analyze import router as analyze_router
36
  from .routes_execute import router as execute_router
37
  from .routes_plan import router as plan_router
 
38
 
39
  logger = get_logger("kapo.brain.main")
40
 
@@ -72,9 +75,11 @@ MODEL_ERROR = None
72
  MODEL_META = {"repo_id": None, "filename": None, "path": None}
73
  EMBED_MODEL = None
74
  FIREBASE = FirebaseStore("brain", logger_name="kapo.brain.firebase")
 
75
  FIREBASE_RUNTIME_CACHE: dict[str, tuple[float, Any]] = {}
76
  RUNTIME_LOG_BUFFER: deque[dict[str, Any]] = deque(maxlen=200)
77
  LAST_BRAIN_URL_REPORT: dict[str, Any] = {"url": "", "ts": 0.0}
 
78
 
79
  DEFAULT_MODEL_REPO = "QuantFactory/aya-expanse-8b-GGUF"
80
  DEFAULT_MODEL_FILE = "aya-expanse-8b.Q4_K_M.gguf"
@@ -125,6 +130,13 @@ def _configured_public_url() -> str:
125
  return str(os.getenv("BRAIN_PUBLIC_URL", "")).strip().rstrip("/")
126
 
127
 
 
 
 
 
 
 
 
128
  def _reuse_public_url_on_restart() -> bool:
129
  return _feature_enabled("BRAIN_REUSE_PUBLIC_URL_ON_RESTART", default=True)
130
 
@@ -137,6 +149,10 @@ def _internal_restart_in_progress() -> bool:
137
  return _feature_enabled("KAPO_INTERNAL_RESTART", default=False)
138
 
139
 
 
 
 
 
140
  def _executor_connect_timeout() -> float:
141
  return float(os.getenv("EXECUTOR_CONNECT_TIMEOUT_SEC", "3.0") or 3.0)
142
 
@@ -158,6 +174,29 @@ def _executor_roundtrip_allowed(feature_name: str, default: bool = True) -> bool
158
  return _feature_enabled(feature_name, default=effective_default)
159
 
160
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
161
  def _should_report_brain_url(public_url: str) -> bool:
162
  normalized = str(public_url or "").strip().rstrip("/")
163
  if not normalized:
@@ -291,6 +330,19 @@ def _apply_executor_settings(settings: dict[str, Any]) -> None:
291
  "FIREBASE_SERVICE_ACCOUNT_PATH",
292
  "FIREBASE_SERVICE_ACCOUNT_JSON",
293
  "FIREBASE_NAMESPACE",
 
 
 
 
 
 
 
 
 
 
 
 
 
294
  ):
295
  value = settings.get(key)
296
  if value not in (None, ""):
@@ -299,6 +351,8 @@ def _apply_executor_settings(settings: dict[str, Any]) -> None:
299
 
300
 
301
  def _apply_firebase_runtime_settings() -> None:
 
 
302
  if not FIREBASE.enabled():
303
  return
304
  shared = FIREBASE.get_document("settings", "global")
@@ -310,7 +364,6 @@ def _apply_firebase_runtime_settings() -> None:
310
  mappings = {
311
  "executor_public_url": "EXECUTOR_PUBLIC_URL",
312
  "executor_url": "EXECUTOR_URL",
313
- "current_brain_url": "BRAIN_PUBLIC_URL",
314
  "model_repo": "MODEL_REPO",
315
  "model_file": "MODEL_FILE",
316
  "model_profile_id": "MODEL_PROFILE_ID",
@@ -318,6 +371,9 @@ def _apply_firebase_runtime_settings() -> None:
318
  "brain_roles": "BRAIN_ROLES",
319
  "brain_languages": "BRAIN_LANGUAGES",
320
  }
 
 
 
321
  for key, env_name in mappings.items():
322
  value = merged.get(key)
323
  if value not in (None, ""):
@@ -339,6 +395,8 @@ def _firebase_collection_cache_key(name: str) -> str:
339
 
340
 
341
  def _firebase_list_documents_cached(collection: str, ttl_sec: float = 30.0, limit: int = 200) -> list[dict[str, Any]]:
 
 
342
  if not FIREBASE.enabled():
343
  return []
344
  key = _firebase_collection_cache_key(collection)
@@ -409,6 +467,10 @@ def _firebase_prompt_body(role_name: str, language: str = "en") -> str:
409
 
410
 
411
  def _prepare_runtime_environment() -> None:
 
 
 
 
412
  if not _is_kaggle_runtime():
413
  return
414
 
@@ -444,6 +506,10 @@ def _prepare_runtime_environment() -> None:
444
  os.environ["TOOLS_DB_PATH"] = str(data_dir / "tools.db")
445
  os.environ["FAISS_INDEX_PATH"] = str(data_dir / "faiss.index")
446
  os.environ["REMOTE_BRAIN_ONLY"] = str(os.getenv("REMOTE_BRAIN_ONLY", "1") or "1")
 
 
 
 
447
  deps_module.CONFIG_CACHE = None
448
 
449
 
@@ -494,9 +560,6 @@ def _remember_public_url(public_url: str) -> None:
494
 
495
 
496
  def _load_saved_public_url() -> str:
497
- configured = _configured_public_url()
498
- if configured:
499
- return configured
500
  try:
501
  value = _public_url_state_path().read_text(encoding="utf-8").strip().rstrip("/")
502
  return value
@@ -511,6 +574,140 @@ def _ngrok_api_state_path() -> Path:
511
  return state_dir / "ngrok_api_url.txt"
512
 
513
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
514
  def _remember_ngrok_api_url(api_url: str) -> None:
515
  value = str(api_url or "").strip().rstrip("/")
516
  if not value:
@@ -661,15 +858,57 @@ def _create_ngrok_tunnel(api_url: str, port: int) -> str | None:
661
  return str(payload.get("public_url") or "").strip().rstrip("/") or None
662
 
663
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
664
  def _report_brain_url(public_url: str) -> None:
 
665
  executor_url = os.getenv("EXECUTOR_URL", "").strip().rstrip("/")
666
  if not executor_url:
667
  return
668
  if not _should_report_brain_url(public_url):
669
  return
670
  last_error: Exception | None = None
671
- connect_timeout = max(1.0, float(os.getenv("BRAIN_REPORT_CONNECT_TIMEOUT_SEC", "3.0") or 3.0))
672
- read_timeout = max(2.0, float(os.getenv("BRAIN_REPORT_READ_TIMEOUT_SEC", "5.0") or 5.0))
673
  retries = max(1, int(os.getenv("BRAIN_REPORT_RETRIES", "2") or 2))
674
  for _ in range(retries):
675
  try:
@@ -689,6 +928,7 @@ def _report_brain_url(public_url: str) -> None:
689
  timeout=(connect_timeout, read_timeout),
690
  )
691
  response.raise_for_status()
 
692
  return
693
  except Exception as exc:
694
  last_error = exc
@@ -700,20 +940,25 @@ def _report_brain_url(public_url: str) -> None:
700
 
701
 
702
  def _pull_executor_settings() -> dict[str, Any]:
703
- executor_url = os.getenv("EXECUTOR_URL", "").strip().rstrip("/")
 
 
704
  if not executor_url:
705
  return {}
706
  try:
707
  response = requests.get(
708
  f"{executor_url}/share/settings",
709
  headers=_brain_headers(),
710
- timeout=15,
 
 
 
711
  )
712
  if response.status_code == 200:
713
  return response.json()
714
- logger.warning("Executor settings request failed: %s", response.text)
715
- except Exception:
716
- logger.warning("Failed to pull executor settings", exc_info=True)
717
  return {}
718
 
719
 
@@ -731,7 +976,7 @@ def start_ngrok(token: str | None = None) -> str | None:
731
  return saved_public_url
732
 
733
  configured_public_url = _configured_public_url()
734
- if configured_public_url:
735
  _remember_public_url(configured_public_url)
736
  _report_brain_url(configured_public_url)
737
  FIREBASE.set_document("brains", configured_public_url, {"url": configured_public_url, "status": "healthy", "source": "configured_public_url"})
@@ -790,15 +1035,26 @@ def _report_known_public_url() -> str | None:
790
  def _bootstrap_executor_handshake(start_tunnel: bool = False) -> None:
791
  executor_url = os.getenv("EXECUTOR_URL", "").strip()
792
  if not executor_url:
 
 
 
 
 
 
 
793
  logger.info("Skipping executor handshake: EXECUTOR_URL not configured")
794
  return
795
 
796
  settings = _pull_executor_settings()
797
  _apply_executor_settings(settings)
798
 
799
- public_url = _report_known_public_url()
800
- if not public_url and start_tunnel:
801
  public_url = start_ngrok(os.getenv("NGROK_AUTHTOKEN") or None)
 
 
 
 
802
  if public_url:
803
  logger.info("Brain public URL reported to executor: %s", public_url)
804
  else:
@@ -807,48 +1063,51 @@ def _bootstrap_executor_handshake(start_tunnel: bool = False) -> None:
807
 
808
  @app.on_event("startup")
809
  async def startup_event():
 
810
  try:
811
- settings = _pull_executor_settings()
812
- _apply_executor_settings(settings)
813
- except Exception:
814
- logger.exception("Executor settings bootstrap failed")
815
- try:
816
- _apply_firebase_runtime_settings()
817
  except Exception:
818
- logger.exception("Firebase runtime bootstrap failed")
819
  try:
820
  _prepare_runtime_environment()
821
  except Exception:
822
  logger.exception("Runtime environment bootstrap failed")
823
- if str(os.getenv("KAPO_LAZY_MODEL_STARTUP", "1")).strip().lower() not in {"1", "true", "yes", "on"}:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
824
  _load_default_model()
825
  try:
826
- if str(os.getenv("KAPO_LAZY_EMBED_STARTUP", "1")).strip().lower() not in {"1", "true", "yes", "on"}:
827
  _load_embed_model()
828
  except Exception:
829
  logger.exception("Embedding model startup failed")
830
  try:
831
- start_tunnel = _auto_publish_public_url_on_startup() and not _internal_restart_in_progress()
832
  _bootstrap_executor_handshake(start_tunnel=start_tunnel)
833
  except Exception:
834
  logger.exception("Executor handshake startup failed")
835
  finally:
836
  os.environ["KAPO_INTERNAL_RESTART"] = "0"
837
- FIREBASE.set_document(
838
- "brains",
839
- os.getenv("BRAIN_PUBLIC_URL") or os.getenv("KAPO_RUNTIME_ROOT") or "brain_runtime",
840
- {
841
- "url": os.getenv("BRAIN_PUBLIC_URL", ""),
842
- "runtime_root": os.getenv("KAPO_RUNTIME_ROOT", ""),
843
- "sync_root": os.getenv("KAPO_SYNC_ROOT", ""),
844
- "model_repo": os.getenv("MODEL_REPO", DEFAULT_MODEL_REPO),
845
- "model_file": os.getenv("MODEL_FILE", DEFAULT_MODEL_FILE),
846
- "model_profile_id": os.getenv("MODEL_PROFILE_ID", DEFAULT_MODEL_PROFILE_ID),
847
- "roles": os.getenv("BRAIN_ROLES", "supervisor,chat,coding,planner,arabic,fallback"),
848
- "languages": os.getenv("BRAIN_LANGUAGES", "ar,en"),
849
- "status": "starting",
850
- },
851
- )
852
 
853
 
854
  class ModelLoadRequest(BaseModel):
@@ -1461,6 +1720,7 @@ def _restart_process(delay_sec: float = 1.0) -> None:
1461
  target_root = _sync_target_root()
1462
  os.chdir(target_root)
1463
  os.environ["KAPO_INTERNAL_RESTART"] = "1"
 
1464
  if _reuse_public_url_on_restart():
1465
  current_public_url = _load_saved_public_url()
1466
  if current_public_url:
@@ -1515,6 +1775,15 @@ async def runtime_errors(limit: int = 50, level: str = "WARNING"):
1515
  return {"status": "ok", "count": len(items[-limit:]), "items": items[-limit:]}
1516
 
1517
 
 
 
 
 
 
 
 
 
 
1518
  @app.get("/model/status")
1519
  async def model_status():
1520
  return {
@@ -1668,7 +1937,7 @@ async def chat(req: ChatRequest):
1668
 
1669
  @app.post("/init-connection")
1670
  async def init_connection(payload: ConnectionInit):
1671
- os.environ["EXECUTOR_URL"] = payload.executor_url
1672
  FIREBASE.set_document("runtime", "executor", {"executor_url": payload.executor_url})
1673
  public_url = _report_known_public_url()
1674
  if not public_url:
@@ -1686,15 +1955,17 @@ async def system_publish_url(req: PublishUrlRequest | None = None):
1686
  FIREBASE.set_document("brains", explicit_public_url, {"url": explicit_public_url, "status": "healthy", "source": "explicit_publish"})
1687
  return {"status": "published", "brain_public_url": explicit_public_url, "mode": "explicit"}
1688
 
1689
- public_url = _report_known_public_url()
1690
- if public_url:
1691
- return {"status": "published", "brain_public_url": public_url, "mode": "saved"}
1692
-
1693
  if not payload.start_tunnel:
 
 
 
1694
  return {"status": "skipped", "brain_public_url": None, "mode": "none"}
1695
 
1696
  public_url = start_ngrok(payload.ngrok_token)
1697
- return {"status": "published" if public_url else "error", "brain_public_url": public_url, "mode": "ngrok"}
 
 
 
1698
 
1699
 
1700
  @app.get("/system/files")
@@ -1825,8 +2096,7 @@ async def system_restart(req: RestartRequest | None = None):
1825
  }
1826
 
1827
 
1828
- @app.get("/health")
1829
- async def health(executor_url: str | None = None, check_executor: bool = False):
1830
  cfg = load_config()
1831
  base_exec_url = (executor_url or os.getenv("EXECUTOR_URL", "")).strip().rstrip("/")
1832
  exec_ok = False
@@ -1853,7 +2123,7 @@ async def health(executor_url: str | None = None, check_executor: bool = False):
1853
  exec_error = str(exc)
1854
 
1855
  faiss_path = cfg.get("FAISS_INDEX_PATH")
1856
- payload = {
1857
  "status": "ok",
1858
  "model_loaded": MODEL is not None,
1859
  "model_error": MODEL_ERROR,
@@ -1867,11 +2137,88 @@ async def health(executor_url: str | None = None, check_executor: bool = False):
1867
  "sync_root": _sync_target_root(),
1868
  "timestamp": time.time(),
1869
  }
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1870
  FIREBASE.set_document(
1871
  "brains",
1872
- os.getenv("BRAIN_PUBLIC_URL") or _load_saved_public_url() or os.getenv("KAPO_RUNTIME_ROOT") or "brain_runtime",
1873
  {
1874
- "url": os.getenv("BRAIN_PUBLIC_URL") or _load_saved_public_url() or "",
1875
  "health": payload,
1876
  "status": "healthy" if payload["model_loaded"] else "degraded",
1877
  "model_repo": os.getenv("MODEL_REPO", DEFAULT_MODEL_REPO),
@@ -1879,8 +2226,40 @@ async def health(executor_url: str | None = None, check_executor: bool = False):
1879
  "model_profile_id": os.getenv("MODEL_PROFILE_ID", DEFAULT_MODEL_PROFILE_ID),
1880
  "roles": os.getenv("BRAIN_ROLES", "supervisor,chat,coding,planner,arabic,fallback"),
1881
  "languages": os.getenv("BRAIN_LANGUAGES", "ar,en"),
 
 
1882
  },
 
1883
  )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1884
  return payload
1885
 
1886
 
 
1
  """FastAPI entrypoint for the Brain Server."""
2
+ import base64
3
  import gc
4
  import logging
5
  import os
 
29
  from api.routes_analyze import router as analyze_router
30
  from api.routes_execute import router as execute_router
31
  from api.routes_plan import router as plan_router
32
+ from shared.google_drive_state import GoogleDriveStateClient
33
  except ImportError:
34
  from . import deps as deps_module
35
  from .deps import get_executor_headers, get_logger, load_config
 
37
  from .routes_analyze import router as analyze_router
38
  from .routes_execute import router as execute_router
39
  from .routes_plan import router as plan_router
40
+ from shared.google_drive_state import GoogleDriveStateClient
41
 
42
  logger = get_logger("kapo.brain.main")
43
 
 
75
  MODEL_META = {"repo_id": None, "filename": None, "path": None}
76
  EMBED_MODEL = None
77
  FIREBASE = FirebaseStore("brain", logger_name="kapo.brain.firebase")
78
+ DRIVE_STATE = GoogleDriveStateClient(logger)
79
  FIREBASE_RUNTIME_CACHE: dict[str, tuple[float, Any]] = {}
80
  RUNTIME_LOG_BUFFER: deque[dict[str, Any]] = deque(maxlen=200)
81
  LAST_BRAIN_URL_REPORT: dict[str, Any] = {"url": "", "ts": 0.0}
82
+ RUNTIME_STATE_THREAD_STARTED = False
83
 
84
  DEFAULT_MODEL_REPO = "QuantFactory/aya-expanse-8b-GGUF"
85
  DEFAULT_MODEL_FILE = "aya-expanse-8b.Q4_K_M.gguf"
 
130
  return str(os.getenv("BRAIN_PUBLIC_URL", "")).strip().rstrip("/")
131
 
132
 
133
+ def _prefer_configured_public_url() -> bool:
134
+ provider = str(os.getenv("BRAIN_PROVIDER", "") or os.getenv("BRAIN_TEMPLATE", "")).strip().lower()
135
+ if "huggingface" in provider or "hf-space" in provider:
136
+ return True
137
+ return _feature_enabled("BRAIN_FORCE_CONFIGURED_PUBLIC_URL", default=False)
138
+
139
+
140
  def _reuse_public_url_on_restart() -> bool:
141
  return _feature_enabled("BRAIN_REUSE_PUBLIC_URL_ON_RESTART", default=True)
142
 
 
149
  return _feature_enabled("KAPO_INTERNAL_RESTART", default=False)
150
 
151
 
152
+ def _fast_restart_enabled() -> bool:
153
+ return _feature_enabled("KAPO_FAST_INTERNAL_RESTART", default=True)
154
+
155
+
156
  def _executor_connect_timeout() -> float:
157
  return float(os.getenv("EXECUTOR_CONNECT_TIMEOUT_SEC", "3.0") or 3.0)
158
 
 
174
  return _feature_enabled(feature_name, default=effective_default)
175
 
176
 
177
+ def _remote_runtime_reads_enabled() -> bool:
178
+ return _feature_enabled("KAPO_REMOTE_STATE_READS", default=False)
179
+
180
+
181
+ def _shared_state_backend() -> str:
182
+ return str(os.getenv("KAPO_SHARED_STATE_BACKEND", "")).strip().lower()
183
+
184
+
185
+ def _drive_bootstrap_configured() -> bool:
186
+ return bool(
187
+ str(os.getenv("GOOGLE_DRIVE_BOOTSTRAP_URL", "") or os.getenv("KAPO_BOOTSTRAP_URL", "") or "").strip()
188
+ )
189
+
190
+
191
+ def _bootstrap_shared_state() -> None:
192
+ if _drive_bootstrap_configured() or _shared_state_backend() in {"google_drive", "drive", "gdrive"}:
193
+ DRIVE_STATE.ensure_bootstrap_loaded(force=False)
194
+
195
+
196
+ def _startup_self_update_enabled() -> bool:
197
+ return _feature_enabled("KAPO_STARTUP_SELF_UPDATE", default=True)
198
+
199
+
200
  def _should_report_brain_url(public_url: str) -> bool:
201
  normalized = str(public_url or "").strip().rstrip("/")
202
  if not normalized:
 
330
  "FIREBASE_SERVICE_ACCOUNT_PATH",
331
  "FIREBASE_SERVICE_ACCOUNT_JSON",
332
  "FIREBASE_NAMESPACE",
333
+ "KAPO_SHARED_STATE_BACKEND",
334
+ "GOOGLE_DRIVE_SHARED_STATE_FOLDER_ID",
335
+ "GOOGLE_DRIVE_SHARED_STATE_PREFIX",
336
+ "GOOGLE_DRIVE_BOOTSTRAP_URL",
337
+ "KAPO_BOOTSTRAP_URL",
338
+ "GOOGLE_DRIVE_ACCESS_TOKEN",
339
+ "GOOGLE_DRIVE_REFRESH_TOKEN",
340
+ "GOOGLE_DRIVE_CLIENT_SECRET_JSON",
341
+ "GOOGLE_DRIVE_CLIENT_SECRET_JSON_BASE64",
342
+ "GOOGLE_DRIVE_CLIENT_SECRET_PATH",
343
+ "GOOGLE_DRIVE_TOKEN_EXPIRES_AT",
344
+ "KAPO_CONTROL_PLANE_URL",
345
+ "KAPO_CLOUDFLARE_QUEUE_NAME",
346
  ):
347
  value = settings.get(key)
348
  if value not in (None, ""):
 
351
 
352
 
353
  def _apply_firebase_runtime_settings() -> None:
354
+ if not _remote_runtime_reads_enabled():
355
+ return
356
  if not FIREBASE.enabled():
357
  return
358
  shared = FIREBASE.get_document("settings", "global")
 
364
  mappings = {
365
  "executor_public_url": "EXECUTOR_PUBLIC_URL",
366
  "executor_url": "EXECUTOR_URL",
 
367
  "model_repo": "MODEL_REPO",
368
  "model_file": "MODEL_FILE",
369
  "model_profile_id": "MODEL_PROFILE_ID",
 
371
  "brain_roles": "BRAIN_ROLES",
372
  "brain_languages": "BRAIN_LANGUAGES",
373
  }
374
+ current_brain_url = str(merged.get("current_brain_url") or "").strip()
375
+ if current_brain_url:
376
+ os.environ["KAPO_EXECUTOR_CURRENT_BRAIN_URL"] = current_brain_url
377
  for key, env_name in mappings.items():
378
  value = merged.get(key)
379
  if value not in (None, ""):
 
395
 
396
 
397
  def _firebase_list_documents_cached(collection: str, ttl_sec: float = 30.0, limit: int = 200) -> list[dict[str, Any]]:
398
+ if not _remote_runtime_reads_enabled():
399
+ return []
400
  if not FIREBASE.enabled():
401
  return []
402
  key = _firebase_collection_cache_key(collection)
 
467
 
468
 
469
  def _prepare_runtime_environment() -> None:
470
+ try:
471
+ _bootstrap_shared_state()
472
+ except Exception:
473
+ logger.warning("Shared-state bootstrap preload failed", exc_info=True)
474
  if not _is_kaggle_runtime():
475
  return
476
 
 
506
  os.environ["TOOLS_DB_PATH"] = str(data_dir / "tools.db")
507
  os.environ["FAISS_INDEX_PATH"] = str(data_dir / "faiss.index")
508
  os.environ["REMOTE_BRAIN_ONLY"] = str(os.getenv("REMOTE_BRAIN_ONLY", "1") or "1")
509
+ saved_executor_url = _load_saved_executor_url()
510
+ current_executor_url = str(os.getenv("EXECUTOR_URL", "") or "").strip()
511
+ if saved_executor_url and not current_executor_url:
512
+ os.environ["EXECUTOR_URL"] = saved_executor_url
513
  deps_module.CONFIG_CACHE = None
514
 
515
 
 
560
 
561
 
562
  def _load_saved_public_url() -> str:
 
 
 
563
  try:
564
  value = _public_url_state_path().read_text(encoding="utf-8").strip().rstrip("/")
565
  return value
 
574
  return state_dir / "ngrok_api_url.txt"
575
 
576
 
577
+ def _executor_url_state_path() -> Path:
578
+ runtime_root = Path(_sync_target_root()).resolve()
579
+ state_dir = runtime_root / "data" / "local" / "brain_runtime"
580
+ state_dir.mkdir(parents=True, exist_ok=True)
581
+ return state_dir / "executor_url.txt"
582
+
583
+
584
+ def _remember_executor_url(executor_url: str) -> None:
585
+ value = str(executor_url or "").strip().rstrip("/")
586
+ if not value:
587
+ return
588
+ os.environ["EXECUTOR_URL"] = value
589
+ try:
590
+ _executor_url_state_path().write_text(value, encoding="utf-8")
591
+ except Exception:
592
+ logger.warning("Failed to persist executor URL", exc_info=True)
593
+
594
+
595
+ def _load_saved_executor_url() -> str:
596
+ configured = str(os.getenv("EXECUTOR_URL", "")).strip().rstrip("/")
597
+ if configured:
598
+ return configured
599
+ try:
600
+ return _executor_url_state_path().read_text(encoding="utf-8").strip().rstrip("/")
601
+ except Exception:
602
+ return ""
603
+
604
+
605
+ def _brain_state_id() -> str:
606
+ public_url = str(os.getenv("BRAIN_PUBLIC_URL") or _load_saved_public_url() or "").strip().rstrip("/")
607
+ if public_url:
608
+ return re.sub(r"[^A-Za-z0-9._-]+", "_", public_url)
609
+ runtime_root = str(os.getenv("KAPO_RUNTIME_ROOT") or _sync_target_root() or "brain_runtime").strip()
610
+ return re.sub(r"[^A-Za-z0-9._-]+", "_", runtime_root)
611
+
612
+
613
+ def _applied_runtime_version_path() -> Path:
614
+ runtime_root = Path(_sync_target_root()).resolve()
615
+ state_dir = runtime_root / "data" / "local" / "brain_runtime"
616
+ state_dir.mkdir(parents=True, exist_ok=True)
617
+ return state_dir / "applied_version.json"
618
+
619
+
620
+ def _load_applied_runtime_version() -> dict[str, Any]:
621
+ try:
622
+ return dict(json.loads(_applied_runtime_version_path().read_text(encoding="utf-8")) or {})
623
+ except Exception:
624
+ return {}
625
+
626
+
627
+ def _write_applied_runtime_version(payload: dict[str, Any]) -> None:
628
+ _applied_runtime_version_path().write_text(
629
+ json.dumps(dict(payload or {}), ensure_ascii=False, indent=2, sort_keys=True),
630
+ encoding="utf-8",
631
+ )
632
+
633
+
634
+ def _download_remote_zip(url: str, destination: Path) -> Path:
635
+ response = requests.get(str(url).strip(), timeout=120)
636
+ response.raise_for_status()
637
+ destination.parent.mkdir(parents=True, exist_ok=True)
638
+ destination.write_bytes(response.content)
639
+ return destination
640
+
641
+
642
+ def _apply_zip_overlay(zip_path: Path, target_root: Path) -> None:
643
+ with zipfile.ZipFile(zip_path, "r") as archive:
644
+ archive.extractall(target_root)
645
+
646
+
647
+ def _load_patch_manifest_from_bootstrap() -> dict[str, Any]:
648
+ bootstrap = DRIVE_STATE.ensure_bootstrap_loaded(force=True) or {}
649
+ manifest_url = str(bootstrap.get("patch_manifest_url") or "").strip()
650
+ if not manifest_url:
651
+ return dict(bootstrap)
652
+ try:
653
+ response = requests.get(manifest_url, timeout=30)
654
+ response.raise_for_status()
655
+ payload = dict(response.json() or {})
656
+ merged = {**bootstrap, **payload}
657
+ return merged
658
+ except Exception:
659
+ logger.warning("Failed to load patch manifest from bootstrap URL", exc_info=True)
660
+ return dict(bootstrap)
661
+
662
+
663
+ def _run_startup_self_update() -> None:
664
+ if not _startup_self_update_enabled():
665
+ return
666
+ manifest = _load_patch_manifest_from_bootstrap()
667
+ target_version = str(manifest.get("version") or "").strip()
668
+ target_hash = str(manifest.get("build_hash") or "").strip()
669
+ if not target_version and not target_hash:
670
+ return
671
+ current = _load_applied_runtime_version()
672
+ if (
673
+ str(current.get("version") or "").strip() == target_version
674
+ and str(current.get("build_hash") or "").strip() == target_hash
675
+ and target_version
676
+ ):
677
+ return
678
+ runtime_root = Path(_sync_target_root()).resolve()
679
+ temp_dir = runtime_root / "data" / "local" / "brain_runtime" / "updates"
680
+ patch_url = str(manifest.get("patch_bundle_url") or "").strip()
681
+ full_url = str(manifest.get("full_package_url") or "").strip()
682
+ applied_mode = ""
683
+ source_url = ""
684
+ try:
685
+ if patch_url:
686
+ zip_path = _download_remote_zip(patch_url, temp_dir / "patch_bundle.zip")
687
+ _apply_zip_overlay(zip_path, runtime_root)
688
+ applied_mode = "patch_bundle"
689
+ source_url = patch_url
690
+ elif full_url:
691
+ zip_path = _download_remote_zip(full_url, temp_dir / "full_package.zip")
692
+ _apply_zip_overlay(zip_path, runtime_root)
693
+ applied_mode = "full_package"
694
+ source_url = full_url
695
+ else:
696
+ return
697
+ _write_applied_runtime_version(
698
+ {
699
+ "version": target_version,
700
+ "build_hash": target_hash,
701
+ "applied_at": time.time(),
702
+ "mode": applied_mode,
703
+ "source_url": source_url,
704
+ }
705
+ )
706
+ logger.info("Applied startup self-update (%s) version=%s", applied_mode, target_version or target_hash)
707
+ except Exception:
708
+ logger.warning("Startup self-update failed", exc_info=True)
709
+
710
+
711
  def _remember_ngrok_api_url(api_url: str) -> None:
712
  value = str(api_url or "").strip().rstrip("/")
713
  if not value:
 
858
  return str(payload.get("public_url") or "").strip().rstrip("/") or None
859
 
860
 
861
+ def _publish_brain_presence(public_url: str, *, source: str = "runtime") -> None:
862
+ normalized = str(public_url or "").strip().rstrip("/")
863
+ if not normalized:
864
+ return
865
+ payload = {
866
+ "url": normalized,
867
+ "status": "healthy",
868
+ "source": source,
869
+ "platform": "kaggle" if _is_kaggle_runtime() else str(os.getenv("BRAIN_PROVIDER") or "remote"),
870
+ "role": os.getenv("BRAIN_PRIMARY_ROLE", "fallback"),
871
+ "roles": [part.strip() for part in os.getenv("BRAIN_ROLES", "supervisor,chat,coding,planner,arabic,fallback").split(",") if part.strip()],
872
+ "languages": [part.strip() for part in os.getenv("BRAIN_LANGUAGES", "ar,en").split(",") if part.strip()],
873
+ "model_profile_id": os.getenv("MODEL_PROFILE_ID") or os.getenv("SUPERVISOR_MODEL_PROFILE_ID") or DEFAULT_MODEL_PROFILE_ID,
874
+ "model_repo": MODEL_META.get("repo_id") or os.getenv("MODEL_REPO") or DEFAULT_MODEL_REPO,
875
+ "model_file": MODEL_META.get("filename") or os.getenv("MODEL_FILE") or DEFAULT_MODEL_FILE,
876
+ "updated_at": time.time(),
877
+ }
878
+ FIREBASE.set_document("brains", normalized, payload)
879
+ FIREBASE.set_document(
880
+ "runtime",
881
+ "brains_last_report",
882
+ {
883
+ "brain_url": normalized,
884
+ "source": source,
885
+ "updated_at": time.time(),
886
+ "provider": payload["platform"],
887
+ "model_profile_id": payload["model_profile_id"],
888
+ },
889
+ )
890
+ FIREBASE.set_document(
891
+ "tunnels",
892
+ f"brain_{normalized}",
893
+ {
894
+ "kind": "brain",
895
+ "public_url": normalized,
896
+ "provider": "ngrok" if "ngrok" in normalized else payload["platform"],
897
+ "updated_at": time.time(),
898
+ },
899
+ )
900
+
901
+
902
  def _report_brain_url(public_url: str) -> None:
903
+ _publish_brain_presence(public_url, source="report_attempt")
904
  executor_url = os.getenv("EXECUTOR_URL", "").strip().rstrip("/")
905
  if not executor_url:
906
  return
907
  if not _should_report_brain_url(public_url):
908
  return
909
  last_error: Exception | None = None
910
+ connect_timeout = max(1.0, float(os.getenv("BRAIN_REPORT_CONNECT_TIMEOUT_SEC", "4.0") or 4.0))
911
+ read_timeout = max(5.0, float(os.getenv("BRAIN_REPORT_READ_TIMEOUT_SEC", "15.0") or 15.0))
912
  retries = max(1, int(os.getenv("BRAIN_REPORT_RETRIES", "2") or 2))
913
  for _ in range(retries):
914
  try:
 
928
  timeout=(connect_timeout, read_timeout),
929
  )
930
  response.raise_for_status()
931
+ _publish_brain_presence(public_url, source="executor_report")
932
  return
933
  except Exception as exc:
934
  last_error = exc
 
940
 
941
 
942
  def _pull_executor_settings() -> dict[str, Any]:
943
+ if _shared_state_backend() in {"google_drive", "drive", "gdrive"} or _drive_bootstrap_configured():
944
+ return {}
945
+ executor_url = _load_saved_executor_url()
946
  if not executor_url:
947
  return {}
948
  try:
949
  response = requests.get(
950
  f"{executor_url}/share/settings",
951
  headers=_brain_headers(),
952
+ timeout=(
953
+ max(1.5, float(os.getenv("EXECUTOR_CONNECT_TIMEOUT_SEC", "3.0") or 3.0)),
954
+ max(2.0, float(os.getenv("EXECUTOR_SETTINGS_READ_TIMEOUT_SEC", "5.0") or 5.0)),
955
+ ),
956
  )
957
  if response.status_code == 200:
958
  return response.json()
959
+ logger.warning("Executor settings request failed: %s", response.text[:400])
960
+ except Exception as exc:
961
+ logger.warning("Failed to pull executor settings (%s)", exc)
962
  return {}
963
 
964
 
 
976
  return saved_public_url
977
 
978
  configured_public_url = _configured_public_url()
979
+ if configured_public_url and _prefer_configured_public_url():
980
  _remember_public_url(configured_public_url)
981
  _report_brain_url(configured_public_url)
982
  FIREBASE.set_document("brains", configured_public_url, {"url": configured_public_url, "status": "healthy", "source": "configured_public_url"})
 
1035
  def _bootstrap_executor_handshake(start_tunnel: bool = False) -> None:
1036
  executor_url = os.getenv("EXECUTOR_URL", "").strip()
1037
  if not executor_url:
1038
+ if start_tunnel:
1039
+ public_url = start_ngrok(os.getenv("NGROK_AUTHTOKEN") or None)
1040
+ if public_url:
1041
+ logger.info("Brain public URL started locally without executor handshake: %s", public_url)
1042
+ else:
1043
+ logger.info("Brain started without publishing a public URL")
1044
+ return
1045
  logger.info("Skipping executor handshake: EXECUTOR_URL not configured")
1046
  return
1047
 
1048
  settings = _pull_executor_settings()
1049
  _apply_executor_settings(settings)
1050
 
1051
+ public_url = None
1052
+ if start_tunnel:
1053
  public_url = start_ngrok(os.getenv("NGROK_AUTHTOKEN") or None)
1054
+ if not public_url:
1055
+ public_url = _report_known_public_url()
1056
+ else:
1057
+ public_url = _report_known_public_url()
1058
  if public_url:
1059
  logger.info("Brain public URL reported to executor: %s", public_url)
1060
  else:
 
1063
 
1064
  @app.on_event("startup")
1065
  async def startup_event():
1066
+ global RUNTIME_STATE_THREAD_STARTED
1067
  try:
1068
+ _bootstrap_shared_state()
 
 
 
 
 
1069
  except Exception:
1070
+ logger.exception("Shared-state bootstrap failed")
1071
  try:
1072
  _prepare_runtime_environment()
1073
  except Exception:
1074
  logger.exception("Runtime environment bootstrap failed")
1075
+ try:
1076
+ _run_startup_self_update()
1077
+ except Exception:
1078
+ logger.exception("Startup self-update bootstrap failed")
1079
+ internal_restart = _internal_restart_in_progress()
1080
+ fast_restart = internal_restart and _fast_restart_enabled()
1081
+ if not fast_restart:
1082
+ try:
1083
+ settings = _pull_executor_settings()
1084
+ _apply_executor_settings(settings)
1085
+ except Exception:
1086
+ logger.exception("Executor settings bootstrap failed")
1087
+ try:
1088
+ _apply_firebase_runtime_settings()
1089
+ except Exception:
1090
+ logger.exception("Firebase runtime bootstrap failed")
1091
+ else:
1092
+ logger.info("Fast internal restart enabled; skipping executor/Firebase startup bootstrap")
1093
+ if not fast_restart:
1094
  _load_default_model()
1095
  try:
1096
+ if not fast_restart:
1097
  _load_embed_model()
1098
  except Exception:
1099
  logger.exception("Embedding model startup failed")
1100
  try:
1101
+ start_tunnel = _auto_publish_public_url_on_startup() and not internal_restart
1102
  _bootstrap_executor_handshake(start_tunnel=start_tunnel)
1103
  except Exception:
1104
  logger.exception("Executor handshake startup failed")
1105
  finally:
1106
  os.environ["KAPO_INTERNAL_RESTART"] = "0"
1107
+ _persist_runtime_state_snapshot(reason="startup")
1108
+ if not RUNTIME_STATE_THREAD_STARTED:
1109
+ RUNTIME_STATE_THREAD_STARTED = True
1110
+ threading.Thread(target=_runtime_state_pulse, daemon=True).start()
 
 
 
 
 
 
 
 
 
 
 
1111
 
1112
 
1113
  class ModelLoadRequest(BaseModel):
 
1720
  target_root = _sync_target_root()
1721
  os.chdir(target_root)
1722
  os.environ["KAPO_INTERNAL_RESTART"] = "1"
1723
+ os.environ.setdefault("KAPO_FAST_INTERNAL_RESTART", "1")
1724
  if _reuse_public_url_on_restart():
1725
  current_public_url = _load_saved_public_url()
1726
  if current_public_url:
 
1775
  return {"status": "ok", "count": len(items[-limit:]), "items": items[-limit:]}
1776
 
1777
 
1778
+ @app.get("/runtime/modernization")
1779
+ async def runtime_modernization():
1780
+ try:
1781
+ return {"status": "ok", "modernization": _runtime_modernization_snapshot()}
1782
+ except Exception as exc:
1783
+ logger.warning("Failed to build runtime modernization snapshot", exc_info=True)
1784
+ return {"status": "degraded", "detail": str(exc), "modernization": {}}
1785
+
1786
+
1787
  @app.get("/model/status")
1788
  async def model_status():
1789
  return {
 
1937
 
1938
  @app.post("/init-connection")
1939
  async def init_connection(payload: ConnectionInit):
1940
+ _remember_executor_url(payload.executor_url)
1941
  FIREBASE.set_document("runtime", "executor", {"executor_url": payload.executor_url})
1942
  public_url = _report_known_public_url()
1943
  if not public_url:
 
1955
  FIREBASE.set_document("brains", explicit_public_url, {"url": explicit_public_url, "status": "healthy", "source": "explicit_publish"})
1956
  return {"status": "published", "brain_public_url": explicit_public_url, "mode": "explicit"}
1957
 
 
 
 
 
1958
  if not payload.start_tunnel:
1959
+ public_url = _report_known_public_url()
1960
+ if public_url:
1961
+ return {"status": "published", "brain_public_url": public_url, "mode": "saved"}
1962
  return {"status": "skipped", "brain_public_url": None, "mode": "none"}
1963
 
1964
  public_url = start_ngrok(payload.ngrok_token)
1965
+ if public_url:
1966
+ return {"status": "published", "brain_public_url": public_url, "mode": "ngrok"}
1967
+ public_url = _report_known_public_url()
1968
+ return {"status": "published" if public_url else "error", "brain_public_url": public_url, "mode": "saved" if public_url else "none"}
1969
 
1970
 
1971
  @app.get("/system/files")
 
2096
  }
2097
 
2098
 
2099
+ def _health_payload(check_executor: bool = False, executor_url: str | None = None) -> dict[str, Any]:
 
2100
  cfg = load_config()
2101
  base_exec_url = (executor_url or os.getenv("EXECUTOR_URL", "")).strip().rstrip("/")
2102
  exec_ok = False
 
2123
  exec_error = str(exc)
2124
 
2125
  faiss_path = cfg.get("FAISS_INDEX_PATH")
2126
+ return {
2127
  "status": "ok",
2128
  "model_loaded": MODEL is not None,
2129
  "model_error": MODEL_ERROR,
 
2137
  "sync_root": _sync_target_root(),
2138
  "timestamp": time.time(),
2139
  }
2140
+
2141
+
2142
+ def _decode_b64_text(value: str) -> str:
2143
+ raw = str(value or "").strip()
2144
+ if not raw:
2145
+ return ""
2146
+ padding = "=" * (-len(raw) % 4)
2147
+ try:
2148
+ return base64.urlsafe_b64decode((raw + padding).encode("utf-8")).decode("utf-8").strip()
2149
+ except Exception:
2150
+ return ""
2151
+
2152
+
2153
+ def _runtime_modernization_snapshot() -> dict[str, Any]:
2154
+ bootstrap = DRIVE_STATE.ensure_bootstrap_loaded(force=False) or {}
2155
+ patch_manifest_url = str(
2156
+ os.getenv("KAPO_PATCH_MANIFEST_URL", "")
2157
+ or bootstrap.get("patch_manifest_url")
2158
+ or ""
2159
+ ).strip()
2160
+ remote_env_url = _decode_b64_text(os.getenv("KAPO_REMOTE_ENV_URL_B64", "")) or str(os.getenv("KAPO_REMOTE_ENV_URL", "") or "").strip()
2161
+ public_url = str(os.getenv("BRAIN_PUBLIC_URL", "") or LAST_BRAIN_URL_REPORT.get("url") or _load_saved_public_url() or "").strip()
2162
+ applied_version = _load_applied_runtime_version()
2163
+ control_plane_url = str(os.getenv("KAPO_CONTROL_PLANE_URL", "") or "").strip()
2164
+ queue_name = str(os.getenv("KAPO_CLOUDFLARE_QUEUE_NAME", "") or "").strip()
2165
+ return {
2166
+ "shared_state_backend": _shared_state_backend(),
2167
+ "firebase_enabled": str(os.getenv("FIREBASE_ENABLED", "0")).strip().lower() in {"1", "true", "yes", "on"},
2168
+ "drive": {
2169
+ "folder_id": str(os.getenv("GOOGLE_DRIVE_SHARED_STATE_FOLDER_ID", "") or os.getenv("GOOGLE_DRIVE_STORAGE_FOLDER_ID", "") or "").strip(),
2170
+ "prefix": str(os.getenv("GOOGLE_DRIVE_SHARED_STATE_PREFIX", "") or os.getenv("GOOGLE_DRIVE_STORAGE_PREFIX", "") or "").strip(),
2171
+ "bootstrap_loaded": bool(bootstrap),
2172
+ },
2173
+ "bootstrap": {
2174
+ "configured": bool(str(os.getenv("KAPO_BOOTSTRAP_URL", "") or os.getenv("GOOGLE_DRIVE_BOOTSTRAP_URL", "") or "").strip()),
2175
+ "url": str(os.getenv("KAPO_BOOTSTRAP_URL", "") or os.getenv("GOOGLE_DRIVE_BOOTSTRAP_URL", "") or "").strip(),
2176
+ "loaded": bool(bootstrap),
2177
+ "version": str(bootstrap.get("version") or "").strip(),
2178
+ "updated_at": bootstrap.get("updated_at"),
2179
+ },
2180
+ "patch": {
2181
+ "configured": bool(patch_manifest_url),
2182
+ "manifest_url": patch_manifest_url,
2183
+ "startup_self_update_enabled": _startup_self_update_enabled(),
2184
+ "applied_version": applied_version,
2185
+ },
2186
+ "remote_env": {
2187
+ "configured": bool(remote_env_url),
2188
+ "loaded": str(os.getenv("KAPO_REMOTE_ENV_LOADED", "0")).strip().lower() in {"1", "true", "yes", "on"},
2189
+ "url": remote_env_url,
2190
+ },
2191
+ "control_plane": {
2192
+ "configured": bool(control_plane_url),
2193
+ "url": control_plane_url,
2194
+ "queue_name": queue_name,
2195
+ "queue_configured": bool(queue_name),
2196
+ },
2197
+ "transport": {
2198
+ "provider": str(os.getenv("BRAIN_TUNNEL_PROVIDER", "ngrok") or "ngrok").strip().lower(),
2199
+ "auto_ngrok": _ngrok_bootstrap_enabled(),
2200
+ "reuse_public_url_on_restart": _reuse_public_url_on_restart(),
2201
+ "public_url": public_url,
2202
+ "executor_url": str(os.getenv("EXECUTOR_URL", "") or "").strip(),
2203
+ },
2204
+ "runtime": {
2205
+ "root": str(_sync_target_root()),
2206
+ "model_profile_id": str(os.getenv("MODEL_PROFILE_ID", "") or "").strip(),
2207
+ "model_loaded": MODEL is not None,
2208
+ "embed_loaded": EMBED_MODEL is not None,
2209
+ },
2210
+ }
2211
+
2212
+
2213
+ def _persist_runtime_state_snapshot(reason: str = "periodic") -> dict[str, Any]:
2214
+ payload = _health_payload(check_executor=False)
2215
+ public_url = os.getenv("BRAIN_PUBLIC_URL") or _load_saved_public_url() or ""
2216
+ state_id = _brain_state_id()
2217
  FIREBASE.set_document(
2218
  "brains",
2219
+ public_url or state_id,
2220
  {
2221
+ "url": public_url,
2222
  "health": payload,
2223
  "status": "healthy" if payload["model_loaded"] else "degraded",
2224
  "model_repo": os.getenv("MODEL_REPO", DEFAULT_MODEL_REPO),
 
2226
  "model_profile_id": os.getenv("MODEL_PROFILE_ID", DEFAULT_MODEL_PROFILE_ID),
2227
  "roles": os.getenv("BRAIN_ROLES", "supervisor,chat,coding,planner,arabic,fallback"),
2228
  "languages": os.getenv("BRAIN_LANGUAGES", "ar,en"),
2229
+ "updated_at": time.time(),
2230
+ "source": reason,
2231
  },
2232
+ min_interval_sec=5.0,
2233
  )
2234
+ FIREBASE.set_document(
2235
+ "brain_runtime",
2236
+ state_id,
2237
+ {
2238
+ "brain_url": public_url,
2239
+ "reason": reason,
2240
+ "health": payload,
2241
+ "recent_logs": list(RUNTIME_LOG_BUFFER)[-25:],
2242
+ "updated_at": time.time(),
2243
+ },
2244
+ min_interval_sec=5.0,
2245
+ )
2246
+ return payload
2247
+
2248
+
2249
+ def _runtime_state_pulse() -> None:
2250
+ interval = max(20.0, float(os.getenv("BRAIN_STATE_SYNC_INTERVAL_SEC", "60") or 60))
2251
+ while True:
2252
+ try:
2253
+ _persist_runtime_state_snapshot(reason="background_pulse")
2254
+ except Exception:
2255
+ logger.warning("Background runtime state sync failed", exc_info=True)
2256
+ time.sleep(interval)
2257
+
2258
+
2259
+ @app.get("/health")
2260
+ async def health(executor_url: str | None = None, check_executor: bool = False):
2261
+ payload = _health_payload(check_executor=check_executor, executor_url=executor_url)
2262
+ _persist_runtime_state_snapshot(reason="health_endpoint")
2263
  return payload
2264
 
2265
 
brain_server/requirements.txt CHANGED
@@ -15,6 +15,7 @@ huggingface_hub>=0.33.5,<2.0
15
  pyngrok==7.1.6
16
  firebase-admin==6.5.0
17
  starlette>=0.40.0,<1.0
 
18
 
19
  sentence-transformers
20
  transformers
 
15
  pyngrok==7.1.6
16
  firebase-admin==6.5.0
17
  starlette>=0.40.0,<1.0
18
+ websockets==12.0
19
 
20
  sentence-transformers
21
  transformers
requirements.txt CHANGED
@@ -1,18 +1,9 @@
1
- numpy<2.0.0
2
  fastapi>=0.115.2,<1.0
3
  uvicorn==0.30.1
4
  pydantic==2.7.3
5
  python-dotenv==1.0.1
 
6
  requests==2.32.3
7
- python-multipart>=0.0.18
8
- sqlalchemy==2.0.30
9
- sentence-transformers==3.0.1
10
- faiss-cpu==1.8.0
11
  python-json-logger==2.0.7
12
- langgraph==0.0.50
13
  huggingface_hub>=0.33.5,<2.0
14
- firebase-admin==6.5.0
15
  starlette>=0.40.0,<1.0
16
- transformers>=4.46.0,<5.0
17
- accelerate>=0.34.0,<1.0
18
- datasets>=2.21.0,<4.0
 
 
1
  fastapi>=0.115.2,<1.0
2
  uvicorn==0.30.1
3
  pydantic==2.7.3
4
  python-dotenv==1.0.1
5
+ PyYAML>=6.0,<7.0
6
  requests==2.32.3
 
 
 
 
7
  python-json-logger==2.0.7
 
8
  huggingface_hub>=0.33.5,<2.0
 
9
  starlette>=0.40.0,<1.0
 
 
 
shared/__init__.py ADDED
@@ -0,0 +1 @@
 
 
1
+ """Shared utilities for KAPO runtimes."""
shared/google_drive_state.py ADDED
@@ -0,0 +1,409 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Google Drive backed shared-state utilities."""
2
+
3
+ from __future__ import annotations
4
+
5
+ import base64
6
+ import json
7
+ import logging
8
+ import os
9
+ import sqlite3
10
+ import threading
11
+ import time
12
+ from pathlib import Path
13
+ from typing import Any
14
+
15
+ import requests
16
+
17
+
18
+ def _safe_name(value: str, default: str = "item") -> str:
19
+ text = str(value or "").strip() or default
20
+ return "".join(ch if ch.isalnum() or ch in {"-", "_", "."} else "_" for ch in text)[:180]
21
+
22
+
23
class GoogleDriveStateClient:
    """Shared-state document store backed by Google Drive files.

    Documents are JSON files stored under ``<root folder>/<prefix>/<collection>/``,
    one ``<doc_id>.json`` per document.  Configuration (folder ids, OAuth
    tokens, related env) comes from environment variables, optionally seeded
    from a remote bootstrap manifest and/or a local control-plane SQLite DB.
    All public operations are best-effort: failures are logged and reported
    as empty/False results rather than raised.
    """

    def __init__(self, logger: logging.Logger) -> None:
        self.logger = logger
        # Lookup caches keyed "<parent_id>:<name>" -> Drive id, to avoid
        # repeated files.list round-trips.
        self._folder_cache: dict[str, str] = {}
        self._file_cache: dict[str, str] = {}
        # Guards the bootstrap fetch; payload + timestamp implement a TTL cache.
        self._bootstrap_lock = threading.Lock()
        self._bootstrap_loaded_at = 0.0
        self._bootstrap_payload: dict[str, Any] = {}

    def _bootstrap_url(self) -> str:
        """Return the configured bootstrap-manifest URL, or "" when unset."""
        return str(os.getenv("GOOGLE_DRIVE_BOOTSTRAP_URL", "") or os.getenv("KAPO_BOOTSTRAP_URL", "") or "").strip()

    def _bootstrap_ttl(self) -> float:
        """Seconds before a cached bootstrap payload is considered stale (default 300)."""
        return float(os.getenv("GOOGLE_DRIVE_BOOTSTRAP_TTL_SEC", "300") or 300)

    def _apply_bootstrap_payload(self, payload: dict[str, Any]) -> None:
        """Copy known bootstrap keys into their corresponding env variables.

        Only non-empty values are applied; existing env values are overwritten.
        """
        mappings = {
            "shared_state_backend": "KAPO_SHARED_STATE_BACKEND",
            "shared_state_folder_id": "GOOGLE_DRIVE_SHARED_STATE_FOLDER_ID",
            "shared_state_prefix": "GOOGLE_DRIVE_SHARED_STATE_PREFIX",
            "storage_folder_id": "GOOGLE_DRIVE_STORAGE_FOLDER_ID",
            "storage_prefix": "GOOGLE_DRIVE_STORAGE_PREFIX",
            "access_token": "GOOGLE_DRIVE_ACCESS_TOKEN",
            "refresh_token": "GOOGLE_DRIVE_REFRESH_TOKEN",
            "client_secret_json": "GOOGLE_DRIVE_CLIENT_SECRET_JSON",
            "client_secret_json_base64": "GOOGLE_DRIVE_CLIENT_SECRET_JSON_BASE64",
            "client_secret_path": "GOOGLE_DRIVE_CLIENT_SECRET_PATH",
            "token_expires_at": "GOOGLE_DRIVE_TOKEN_EXPIRES_AT",
            "firebase_enabled": "FIREBASE_ENABLED",
            "executor_url": "EXECUTOR_URL",
            "executor_public_url": "EXECUTOR_PUBLIC_URL",
            "ngrok_authtoken": "NGROK_AUTHTOKEN",
            "brain_public_url": "BRAIN_PUBLIC_URL",
            "brain_roles": "BRAIN_ROLES",
            "brain_languages": "BRAIN_LANGUAGES",
        }
        for key, env_name in mappings.items():
            value = payload.get(key)
            if value not in (None, ""):
                os.environ[env_name] = str(value)

    def ensure_bootstrap_loaded(self, force: bool = False) -> dict[str, Any]:
        """Fetch (and TTL-cache) the bootstrap manifest; return a copy of it.

        Returns ``{}`` when no bootstrap URL is configured.  On fetch failure
        the last successfully loaded payload (possibly empty) is returned.
        """
        bootstrap_url = self._bootstrap_url()
        if not bootstrap_url:
            return {}
        now = time.time()
        if not force and self._bootstrap_payload and (now - self._bootstrap_loaded_at) < self._bootstrap_ttl():
            return dict(self._bootstrap_payload)
        with self._bootstrap_lock:
            # Re-check under the lock (double-checked TTL cache).
            now = time.time()
            if not force and self._bootstrap_payload and (now - self._bootstrap_loaded_at) < self._bootstrap_ttl():
                return dict(self._bootstrap_payload)
            try:
                response = requests.get(bootstrap_url, timeout=20)
                response.raise_for_status()
                payload = dict(response.json() or {})
                self._bootstrap_payload = payload
                self._bootstrap_loaded_at = time.time()
                self._apply_bootstrap_payload(payload)
                return dict(payload)
            except Exception:
                # Best-effort: keep whatever we loaded last time.
                self.logger.warning("Failed to load Google Drive bootstrap manifest", exc_info=True)
                return dict(self._bootstrap_payload)

    def enabled(self) -> bool:
        """True when a Drive root folder is configured (bootstrap applied first)."""
        self.ensure_bootstrap_loaded()
        return bool(self.root_folder_id())

    def root_folder_id(self) -> str:
        """Drive folder id under which all shared state lives ("" when unset)."""
        return str(
            os.getenv("GOOGLE_DRIVE_SHARED_STATE_FOLDER_ID", "")
            or os.getenv("GOOGLE_DRIVE_STORAGE_FOLDER_ID", "")
            or ""
        ).strip()

    def prefix(self) -> str:
        """Path prefix (inside the root folder) for shared-state collections."""
        return str(
            os.getenv("GOOGLE_DRIVE_SHARED_STATE_PREFIX", "")
            or os.getenv("GOOGLE_DRIVE_STORAGE_PREFIX", "")
            or "kapo/shared_state"
        ).strip("/ ")

    def _control_plane_token(self, label: str) -> str:
        """Read an enabled token labelled *label* from a local control-plane DB.

        Probes both known database locations; returns "" when no token found.
        (Tokens live in the ``ngrok_tokens`` table regardless of provider.)
        """
        candidates = [
            Path.cwd().resolve() / "data" / "local" / "control_plane.db",
            Path.cwd().resolve() / "data" / "drive_cache" / "control_plane.db",
        ]
        for db_path in candidates:
            if not db_path.exists():
                continue
            try:
                conn = sqlite3.connect(db_path)
                conn.row_factory = sqlite3.Row
                row = conn.execute(
                    "SELECT token FROM ngrok_tokens WHERE label = ? AND enabled = 1",
                    (label,),
                ).fetchone()
                conn.close()
                token = str(row["token"]) if row else ""
                if token:
                    return token
            except Exception:
                self.logger.warning("Failed to read Google Drive token from %s", db_path, exc_info=True)
        return ""

    def _client_secret_payload(self) -> dict[str, Any] | None:
        """Resolve the OAuth client secret from env (JSON, base64, path, or parts).

        Tries, in order: inline JSON, base64-encoded JSON, a file path, and
        finally discrete CLIENT_ID/CLIENT_SECRET variables.  Returns ``None``
        when nothing usable is configured.
        """
        raw = str(os.getenv("GOOGLE_DRIVE_CLIENT_SECRET_JSON", "")).strip()
        raw_b64 = str(os.getenv("GOOGLE_DRIVE_CLIENT_SECRET_JSON_BASE64", "")).strip()
        path = str(os.getenv("GOOGLE_DRIVE_CLIENT_SECRET_PATH", "")).strip()
        if raw:
            try:
                return json.loads(raw)
            except Exception:
                self.logger.warning("Invalid GOOGLE_DRIVE_CLIENT_SECRET_JSON", exc_info=True)
        if raw_b64:
            try:
                return json.loads(base64.b64decode(raw_b64).decode("utf-8"))
            except Exception:
                self.logger.warning("Invalid GOOGLE_DRIVE_CLIENT_SECRET_JSON_BASE64", exc_info=True)
        if path:
            try:
                return json.loads(Path(path).expanduser().read_text(encoding="utf-8"))
            except Exception:
                self.logger.warning("Failed to load Google Drive client secret path %s", path, exc_info=True)
        client_id = str(os.getenv("GOOGLE_DRIVE_CLIENT_ID", "")).strip()
        client_secret = str(os.getenv("GOOGLE_DRIVE_CLIENT_SECRET", "")).strip()
        token_uri = str(os.getenv("GOOGLE_DRIVE_TOKEN_URI", "https://oauth2.googleapis.com/token")).strip()
        if client_id and client_secret:
            # Synthesize the same shape as a downloaded client_secret.json.
            return {
                "installed": {
                    "client_id": client_id,
                    "client_secret": client_secret,
                    "token_uri": token_uri,
                }
            }
        return None

    def _oauth_client(self) -> dict[str, str]:
        """Normalize the client secret into {client_id, client_secret, token_uri}."""
        payload = self._client_secret_payload() or {}
        # Google ships either an "installed" or a "web" client payload.
        installed = payload.get("installed") or payload.get("web") or {}
        return {
            "client_id": str(installed.get("client_id") or "").strip(),
            "client_secret": str(installed.get("client_secret") or "").strip(),
            "token_uri": str(installed.get("token_uri") or "https://oauth2.googleapis.com/token").strip(),
        }

    def _refresh_token(self) -> str:
        """OAuth refresh token from env, falling back to the control-plane DB."""
        return str(os.getenv("GOOGLE_DRIVE_REFRESH_TOKEN", "") or self._control_plane_token("google_drive_refresh_token") or "").strip()

    def _access_token(self) -> str:
        """Return a usable access token, refreshing when absent or near expiry.

        A token is reused while it has more than 60 seconds of life left
        (or when no expiry is recorded).
        """
        token = str(os.getenv("GOOGLE_DRIVE_ACCESS_TOKEN", "") or self._control_plane_token("google_drive_access_token") or "").strip()
        expires_at = float(str(os.getenv("GOOGLE_DRIVE_TOKEN_EXPIRES_AT", "0") or "0") or 0)
        if token and (not expires_at or expires_at > (time.time() + 60)):
            return token
        refreshed = self._refresh_access_token()
        return refreshed or token

    def _refresh_access_token(self) -> str:
        """Exchange the refresh token for a new access token; "" when not possible.

        On success the new token and its expiry are written back to the env so
        subsequent calls (and other components reading env) pick them up.
        """
        refresh_token = self._refresh_token()
        client = self._oauth_client()
        if not refresh_token or not client["client_id"] or not client["client_secret"]:
            return ""
        response = requests.post(
            client["token_uri"],
            data={
                "grant_type": "refresh_token",
                "refresh_token": refresh_token,
                "client_id": client["client_id"],
                "client_secret": client["client_secret"],
            },
            timeout=30,
        )
        response.raise_for_status()
        payload = response.json()
        access_token = str(payload.get("access_token") or "").strip()
        expires_in = float(payload.get("expires_in") or 0)
        if access_token:
            os.environ["GOOGLE_DRIVE_ACCESS_TOKEN"] = access_token
            os.environ["GOOGLE_DRIVE_TOKEN_EXPIRES_AT"] = str(time.time() + expires_in) if expires_in else "0"
        return access_token

    def _request(self, method: str, url: str, *, headers: dict[str, str] | None = None, timeout: int = 60, **kwargs) -> requests.Response:
        """Authenticated Drive API request with one automatic 401 retry.

        Raises ``ValueError`` when no access token can be obtained, and
        ``requests.HTTPError`` for non-2xx responses.
        """
        token = self._access_token()
        if not token:
            raise ValueError("Google Drive access token is not configured")
        base_headers = dict(headers or {})
        response: requests.Response | None = None
        for _ in range(2):
            response = requests.request(
                method,
                url,
                headers={**base_headers, "Authorization": f"Bearer {token}"},
                timeout=timeout,
                **kwargs,
            )
            if response.status_code != 401:
                response.raise_for_status()
                return response
            # 401: token likely expired mid-flight — refresh and retry once.
            token = self._refresh_access_token()
            if not token:
                break
        assert response is not None
        response.raise_for_status()
        return response

    @staticmethod
    def _escape_query(value: str) -> str:
        """Escape a string for embedding inside a Drive v3 query literal."""
        return str(value or "").replace("\\", "\\\\").replace("'", "\\'")

    def _find_folder(self, name: str, parent_id: str) -> str:
        """Return the id of a non-trashed folder *name* under *parent_id*, or ""."""
        query = (
            "mimeType = 'application/vnd.google-apps.folder' and trashed = false "
            f"and name = '{self._escape_query(name)}'"
        )
        if parent_id:
            query += f" and '{self._escape_query(parent_id)}' in parents"
        response = self._request(
            "GET",
            "https://www.googleapis.com/drive/v3/files",
            params={"q": query, "fields": "files(id,name)", "pageSize": 10, "supportsAllDrives": "true"},
        )
        files = response.json().get("files", [])
        return str(files[0].get("id") or "").strip() if files else ""

    def _create_folder(self, name: str, parent_id: str) -> str:
        """Create a folder *name* under *parent_id* and return its id."""
        payload: dict[str, Any] = {"name": name, "mimeType": "application/vnd.google-apps.folder"}
        if parent_id:
            payload["parents"] = [parent_id]
        response = self._request(
            "POST",
            "https://www.googleapis.com/drive/v3/files?supportsAllDrives=true",
            headers={"Content-Type": "application/json; charset=UTF-8"},
            json=payload,
        )
        return str(response.json().get("id") or "").strip()

    def _ensure_folder_path(self, relative_path: str) -> str:
        """Walk/create ``<prefix>/<relative_path>`` folder by folder; return leaf id.

        Each path component is sanitized and resolved through the folder cache
        before falling back to find-or-create against the API.
        """
        current_parent = self.root_folder_id()
        parts = [part for part in f"{self.prefix()}/{relative_path}".replace("\\", "/").split("/") if part.strip()]
        for part in parts:
            safe_part = _safe_name(part, "folder")
            cache_key = f"{current_parent}:{safe_part}"
            cached = self._folder_cache.get(cache_key)
            if cached:
                current_parent = cached
                continue
            folder_id = self._find_folder(safe_part, current_parent)
            if not folder_id:
                folder_id = self._create_folder(safe_part, current_parent)
            self._folder_cache[cache_key] = folder_id
            current_parent = folder_id
        return current_parent

    def _find_file(self, folder_id: str, file_name: str) -> str:
        """Return the id of *file_name* inside *folder_id* (cached), or ""."""
        cache_key = f"{folder_id}:{file_name}"
        cached = self._file_cache.get(cache_key)
        if cached:
            return cached
        query = (
            "trashed = false "
            f"and name = '{self._escape_query(file_name)}' "
            f"and '{self._escape_query(folder_id)}' in parents"
        )
        response = self._request(
            "GET",
            "https://www.googleapis.com/drive/v3/files",
            params={"q": query, "fields": "files(id,name)", "pageSize": 10, "supportsAllDrives": "true"},
        )
        files = response.json().get("files", [])
        file_id = str(files[0].get("id") or "").strip() if files else ""
        if file_id:
            self._file_cache[cache_key] = file_id
        return file_id

    def _create_file(self, folder_id: str, file_name: str) -> str:
        """Create an empty file *file_name* under *folder_id*; cache and return its id."""
        response = self._request(
            "POST",
            "https://www.googleapis.com/drive/v3/files?supportsAllDrives=true",
            headers={"Content-Type": "application/json; charset=UTF-8"},
            json={"name": file_name, "parents": [folder_id]},
        )
        file_id = str(response.json().get("id") or "").strip()
        if file_id:
            self._file_cache[f"{folder_id}:{file_name}"] = file_id
        return file_id

    def _upload_json(self, file_id: str, payload: dict[str, Any]) -> None:
        """Replace the content of *file_id* with *payload* serialized as JSON."""
        self._request(
            "PATCH",
            f"https://www.googleapis.com/upload/drive/v3/files/{file_id}?uploadType=media&supportsAllDrives=true",
            headers={"Content-Type": "application/json; charset=UTF-8"},
            # sort_keys keeps uploads deterministic for identical payloads.
            data=json.dumps(payload, ensure_ascii=False, sort_keys=True).encode("utf-8"),
        )

    def _download_json(self, file_id: str) -> dict[str, Any]:
        """Download and parse the JSON content of *file_id* as a dict."""
        response = self._request(
            "GET",
            f"https://www.googleapis.com/drive/v3/files/{file_id}?alt=media&supportsAllDrives=true",
        )
        return dict(response.json() or {})

    def get_document(self, collection: str, doc_id: str) -> dict[str, Any]:
        """Fetch one document; ``{}`` when disabled, missing, or on failure."""
        if not self.enabled():
            return {}
        folder_id = self._ensure_folder_path(_safe_name(collection, "collection"))
        file_name = f"{_safe_name(doc_id)}.json"
        file_id = self._find_file(folder_id, file_name)
        if not file_id:
            return {}
        try:
            payload = self._download_json(file_id)
            payload.setdefault("id", _safe_name(doc_id))
            return payload
        except Exception:
            self.logger.warning("Failed to download shared-state file %s/%s", collection, doc_id, exc_info=True)
            return {}

    def set_document(self, collection: str, doc_id: str, payload: dict[str, Any], *, merge: bool = True) -> bool:
        """Write one document; shallow-merges into the existing doc by default.

        Returns ``True`` on success, ``False`` when disabled or on failure.
        """
        if not self.enabled():
            return False
        safe_doc = _safe_name(doc_id)
        try:
            existing = self.get_document(collection, safe_doc) if merge else {}
            combined = {**existing, **dict(payload or {})} if merge else dict(payload or {})
            combined.setdefault("id", safe_doc)
            folder_id = self._ensure_folder_path(_safe_name(collection, "collection"))
            file_name = f"{safe_doc}.json"
            file_id = self._find_file(folder_id, file_name)
            if not file_id:
                file_id = self._create_file(folder_id, file_name)
            self._upload_json(file_id, combined)
            return True
        except Exception:
            self.logger.warning("Failed to upload shared-state file %s/%s", collection, safe_doc, exc_info=True)
            return False

    def list_documents(self, collection: str, *, limit: int = 200) -> list[dict[str, Any]]:
        """List up to *limit* documents of *collection*, most recently modified first.

        Returns ``[]`` when disabled or on any failure.  Note: downloads every
        listed file, so large collections are expensive.
        """
        if not self.enabled():
            return []
        try:
            folder_id = self._ensure_folder_path(_safe_name(collection, "collection"))
            query = f"trashed = false and '{self._escape_query(folder_id)}' in parents"
            response = self._request(
                "GET",
                "https://www.googleapis.com/drive/v3/files",
                params={
                    "q": query,
                    "fields": "files(id,name,modifiedTime)",
                    "pageSize": max(1, int(limit)),
                    "supportsAllDrives": "true",
                    "orderBy": "modifiedTime desc",
                },
            )
            items: list[dict[str, Any]] = []
            for file in response.json().get("files", []):
                file_id = str(file.get("id") or "").strip()
                file_name = str(file.get("name") or "").strip()
                if not file_id or not file_name.endswith(".json"):
                    continue
                self._file_cache[f"{folder_id}:{file_name}"] = file_id
                payload = self._download_json(file_id)
                # Derive the doc id from the file name ("<id>.json").
                payload.setdefault("id", file_name[:-5])
                items.append(payload)
            return items
        except Exception:
            self.logger.warning("Failed to list shared-state collection %s", collection, exc_info=True)
            return []

    def delete_document(self, collection: str, doc_id: str) -> bool:
        """Delete one document; ``True`` if it is gone afterwards (incl. not found)."""
        if not self.enabled():
            return False
        safe_doc = _safe_name(doc_id)
        try:
            folder_id = self._ensure_folder_path(_safe_name(collection, "collection"))
            file_name = f"{safe_doc}.json"
            file_id = self._find_file(folder_id, file_name)
            if not file_id:
                # Already absent — treat as success.
                return True
            self._request(
                "DELETE",
                f"https://www.googleapis.com/drive/v3/files/{file_id}?supportsAllDrives=true",
            )
            self._file_cache.pop(f"{folder_id}:{file_name}", None)
            return True
        except Exception:
            self.logger.warning("Failed to delete shared-state file %s/%s", collection, safe_doc, exc_info=True)
            return False
shared/remote_env.py ADDED
@@ -0,0 +1,102 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Encrypted remote .env loader and publisher helpers."""
2
+
3
+ from __future__ import annotations
4
+
5
+ import base64
6
+ import io
7
+ import json
8
+ import logging
9
+ import os
10
+ from typing import Any
11
+
12
+ import requests
13
+ from cryptography.fernet import Fernet
14
+ from cryptography.hazmat.primitives import hashes
15
+ from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
16
+ from dotenv import dotenv_values
17
+
18
+
19
+ def _decode_maybe_b64(value: str) -> str:
20
+ text = str(value or "").strip()
21
+ if not text:
22
+ return ""
23
+ try:
24
+ padded = text + "=" * (-len(text) % 4)
25
+ decoded = base64.urlsafe_b64decode(padded.encode("utf-8")).decode("utf-8")
26
+ if decoded:
27
+ return decoded
28
+ except Exception:
29
+ pass
30
+ return text
31
+
32
+
33
def _derive_key(password: str, salt: bytes) -> bytes:
    """Derive a urlsafe-base64 32-byte Fernet key from *password* and *salt*.

    Uses PBKDF2-HMAC-SHA256; the iteration count must match on both the
    encrypting and decrypting side.
    """
    kdf = PBKDF2HMAC(
        algorithm=hashes.SHA256(),
        length=32,
        salt=salt,
        # High iteration count to slow down offline brute-force attempts.
        iterations=390000,
    )
    return base64.urlsafe_b64encode(kdf.derive(password.encode("utf-8")))
41
+
42
+
43
def build_remote_env_bundle(env_text: str, password: str, metadata: dict[str, Any] | None = None) -> dict[str, Any]:
    """Encrypt *env_text* with a password-derived Fernet key.

    A fresh random 16-byte salt is generated per bundle.  Returns a
    JSON-serializable dict with the schema tag, the base64 salt, the Fernet
    token, and optional caller metadata — the exact shape expected by
    ``decrypt_remote_env_bundle``.
    """
    salt = os.urandom(16)
    key = _derive_key(password, salt)
    token = Fernet(key).encrypt((env_text or "").encode("utf-8"))
    return {
        "schema": "kapo-remote-env-v1",
        "salt": base64.urlsafe_b64encode(salt).decode("utf-8"),
        "token": token.decode("utf-8"),
        "metadata": dict(metadata or {}),
    }
53
+
54
+
55
def decrypt_remote_env_bundle(payload: dict[str, Any], password: str) -> str:
    """Decrypt a bundle produced by ``build_remote_env_bundle``.

    Raises ``ValueError`` when salt or token are missing, and a Fernet
    ``InvalidToken`` when the password does not match.
    """
    salt_raw = str(payload.get("salt") or "").strip()
    token = str(payload.get("token") or "").strip()
    if not salt_raw or not token:
        raise ValueError("Remote env payload is incomplete")
    salt = base64.urlsafe_b64decode(salt_raw.encode("utf-8"))
    key = _derive_key(password, salt)
    return Fernet(key).decrypt(token.encode("utf-8")).decode("utf-8")
63
+
64
+
65
def _remote_env_url() -> str:
    """Resolve the remote env bundle URL from either the plain or base64 env var."""
    raw = os.getenv("KAPO_REMOTE_ENV_URL", "") or os.getenv("KAPO_REMOTE_ENV_URL_B64", "") or ""
    return _decode_maybe_b64(str(raw).strip())
69
+
70
+
71
def _remote_env_password() -> str:
    """Resolve the bundle password from either the plain or base64 env var."""
    raw = os.getenv("KAPO_REMOTE_ENV_PASSWORD", "") or os.getenv("KAPO_REMOTE_ENV_PASSWORD_B64", "") or ""
    return _decode_maybe_b64(str(raw).strip())
75
+
76
+
77
def load_remote_env_if_configured(*, override: bool = True, logger_name: str = "kapo.remote_env") -> dict[str, str]:
    """Fetch, decrypt and apply the remote .env bundle, when configured.

    Returns the parsed variables on success, or ``{}`` when already loaded,
    when URL/password are not configured, or on any fetch/decrypt failure
    (which is logged, never raised).  On success the variables are written to
    ``os.environ`` and ``KAPO_REMOTE_ENV_LOADED`` is set as a marker so the
    load happens at most once per process.

    Parameters
    ----------
    override:
        When ``True`` (default) remote values replace existing env values;
        otherwise only unset/blank variables are filled in.
    logger_name:
        Name of the logger used for failure reports.
    """
    logger = logging.getLogger(logger_name)
    # Idempotency guard: a previous successful load sets this flag.
    if str(os.getenv("KAPO_REMOTE_ENV_LOADED", "")).strip().lower() in {"1", "true", "yes", "on"}:
        return {}
    url = _remote_env_url()
    password = _remote_env_password()
    if not url or not password:
        return {}
    try:
        response = requests.get(url, timeout=30)
        response.raise_for_status()
        payload = dict(response.json() or {})
        env_text = decrypt_remote_env_bundle(payload, password)
        parsed = {
            str(key): str(value)
            for key, value in dotenv_values(stream=io.StringIO(env_text)).items()
            if key and value is not None
        }
        for key, value in parsed.items():
            # Respect pre-set non-empty values unless override was requested.
            if override or key not in os.environ or not str(os.getenv(key) or "").strip():
                os.environ[key] = value
        os.environ["KAPO_REMOTE_ENV_LOADED"] = "1"
        return parsed
    except Exception:
        logger.warning("Failed to load encrypted remote env bundle", exc_info=True)
        return {}
+ return {}