MrA7A1 committed on
Commit
8e4fc58
·
verified ·
1 Parent(s): 5b0d4db

KAPO rollout fix: sync brain_server/api/main.py

Browse files
Files changed (1) hide show
  1. brain_server/api/main.py +45 -108
brain_server/api/main.py CHANGED
@@ -80,6 +80,7 @@ FIREBASE_RUNTIME_CACHE: dict[str, tuple[float, Any]] = {}
80
  RUNTIME_LOG_BUFFER: deque[dict[str, Any]] = deque(maxlen=200)
81
  LAST_BRAIN_URL_REPORT: dict[str, Any] = {"url": "", "ts": 0.0}
82
  RUNTIME_STATE_THREAD_STARTED = False
 
83
 
84
  DEFAULT_MODEL_REPO = "QuantFactory/aya-expanse-8b-GGUF"
85
  DEFAULT_MODEL_FILE = "aya-expanse-8b.Q4_K_M.gguf"
@@ -198,7 +199,17 @@ def _drive_bootstrap_configured() -> bool:
198
 
199
  def _bootstrap_shared_state() -> None:
200
  if _drive_bootstrap_configured() or _shared_state_backend() in {"google_drive", "drive", "gdrive"}:
201
- DRIVE_STATE.ensure_bootstrap_loaded(force=False)
 
 
 
 
 
 
 
 
 
 
202
 
203
 
204
  def _startup_self_update_enabled() -> bool:
@@ -312,10 +323,6 @@ def _is_kaggle_runtime() -> bool:
312
  return "/kaggle/" in str(_project_root()).replace("\\", "/") or bool(os.getenv("KAGGLE_KERNEL_RUN_TYPE"))
313
 
314
 
315
- def _is_hf_space_runtime() -> bool:
316
- return str(os.getenv("HF_SPACE_DOCKER", "0")).strip().lower() in {"1", "true", "yes", "on"} or bool(os.getenv("SPACE_ID"))
317
-
318
-
319
  def _apply_executor_settings(settings: dict[str, Any]) -> None:
320
  for key in (
321
  "NGROK_AUTHTOKEN",
@@ -1057,6 +1064,36 @@ def _report_known_public_url() -> str | None:
1057
  return public_url
1058
 
1059
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1060
  def _bootstrap_executor_handshake(start_tunnel: bool = False) -> None:
1061
  executor_url = os.getenv("EXECUTOR_URL", "").strip()
1062
  if not executor_url:
@@ -1066,8 +1103,10 @@ def _bootstrap_executor_handshake(start_tunnel: bool = False) -> None:
1066
  logger.info("Brain public URL started locally without executor handshake: %s", public_url)
1067
  else:
1068
  logger.info("Brain started without publishing a public URL")
 
1069
  return
1070
  logger.info("Skipping executor handshake: EXECUTOR_URL not configured")
 
1071
  return
1072
 
1073
  settings = _pull_executor_settings()
@@ -1084,6 +1123,7 @@ def _bootstrap_executor_handshake(start_tunnel: bool = False) -> None:
1084
  logger.info("Brain public URL reported to executor: %s", public_url)
1085
  else:
1086
  logger.info("Brain started without publishing a public URL")
 
1087
 
1088
 
1089
  @app.on_event("startup")
@@ -1754,9 +1794,6 @@ def _dispatch_background(task, *args) -> None:
1754
 
1755
 
1756
  def _restart_process(delay_sec: float = 1.0) -> None:
1757
- if _is_hf_space_runtime():
1758
- logger.info("Skipping in-process restart on Hugging Face Space runtime")
1759
- return
1760
  def _run() -> None:
1761
  time.sleep(max(0.2, float(delay_sec)))
1762
  target_root = _sync_target_root()
@@ -2127,13 +2164,6 @@ else:
2127
  @app.post("/system/restart")
2128
  async def system_restart(req: RestartRequest | None = None):
2129
  delay_sec = req.delay_sec if req else 1.0
2130
- if _is_hf_space_runtime():
2131
- return {
2132
- "status": "skipped",
2133
- "reason": "restart_disabled_on_hf_space",
2134
- "delay_sec": delay_sec,
2135
- "target_root": _sync_target_root(),
2136
- }
2137
  _restart_process(delay_sec=delay_sec)
2138
  return {
2139
  "status": "restarting",
@@ -2307,96 +2337,3 @@ async def health(executor_url: str | None = None, check_executor: bool = False):
2307
  payload = _health_payload(check_executor=check_executor, executor_url=executor_url)
2308
  _persist_runtime_state_snapshot(reason="health_endpoint")
2309
  return payload
2310
-
2311
-
2312
- # KAPO HF SPACE TRANSFORMERS PATCH
2313
- def _kapo_hf_transformers_enabled() -> bool:
2314
- return str(os.getenv('KAPO_HF_TRANSFORMERS_RUNTIME', '0')).strip().lower() in {'1', 'true', 'yes', 'on'}
2315
-
2316
- def ensure_model_loaded(repo_id: str, filename: str, hf_token: str | None = None) -> None:
2317
- global MODEL, MODEL_ERROR, MODEL_META
2318
- repo_id = (repo_id or '').strip()
2319
- filename = (filename or '').strip()
2320
- if not repo_id:
2321
- MODEL = None
2322
- MODEL_ERROR = 'model repo missing'
2323
- return
2324
- if _kapo_hf_transformers_enabled() or (_is_hf_space_runtime() and not filename):
2325
- try:
2326
- from transformers import AutoModelForCausalLM, AutoTokenizer
2327
- tokenizer = AutoTokenizer.from_pretrained(repo_id, token=hf_token, trust_remote_code=True)
2328
- model = AutoModelForCausalLM.from_pretrained(repo_id, token=hf_token, trust_remote_code=True, device_map='cpu')
2329
- if hasattr(model, 'eval'):
2330
- model.eval()
2331
- MODEL = {'kind': 'transformers', 'model': model, 'tokenizer': tokenizer}
2332
- MODEL_ERROR = None
2333
- MODEL_META = {'repo_id': repo_id, 'filename': filename, 'path': None}
2334
- logger.info('Loaded transformers model %s', repo_id)
2335
- return
2336
- except Exception as exc:
2337
- MODEL = None
2338
- MODEL_ERROR = f'transformers model load failed: {exc}'
2339
- logger.exception('Transformers model load failed')
2340
- return
2341
- if not filename:
2342
- MODEL = None
2343
- MODEL_ERROR = 'model file missing'
2344
- return
2345
- try:
2346
- model_path = _download_model(repo_id, filename, hf_token=hf_token)
2347
- except Exception as exc:
2348
- MODEL = None
2349
- MODEL_ERROR = f'model download failed: {exc}'
2350
- logger.exception('Model download failed')
2351
- return
2352
- try:
2353
- from llama_cpp import Llama
2354
- MODEL = Llama(model_path=model_path, n_ctx=4096)
2355
- MODEL_ERROR = None
2356
- MODEL_META = {'repo_id': repo_id, 'filename': filename, 'path': model_path}
2357
- logger.info('Loaded model %s/%s', repo_id, filename)
2358
- except Exception as exc:
2359
- MODEL = None
2360
- MODEL_ERROR = f'model load failed: {exc}'
2361
- logger.exception('Model load failed')
2362
-
2363
- def _generate_response(user_input: str, history: list[dict[str, str]], context_block: str) -> str:
2364
- language = _detect_language(user_input)
2365
- exact_reply = _extract_exact_reply_instruction_safe(user_input)
2366
- if exact_reply:
2367
- return exact_reply
2368
- fast_reply = _project_specific_fast_reply(user_input)
2369
- if fast_reply:
2370
- return fast_reply
2371
- if MODEL is None:
2372
- try:
2373
- _load_default_model()
2374
- except Exception:
2375
- logger.exception('Lazy model load failed')
2376
- if MODEL is None:
2377
- if language == 'ar':
2378
- return 'الخدمة تعمل لكن توليد الرد الحر غير متاح الآن لأن النموذج غير محمل.'
2379
- return 'The Brain is online, but natural chat generation is unavailable because the model is not loaded.'
2380
- prompt = _build_chat_prompt(user_input, history, context_block)
2381
- try:
2382
- max_tokens = 80 if language == 'ar' else 96
2383
- if isinstance(MODEL, dict) and MODEL.get('kind') == 'transformers':
2384
- tokenizer = MODEL['tokenizer']
2385
- model = MODEL['model']
2386
- inputs = tokenizer(prompt, return_tensors='pt', truncation=True, max_length=2048)
2387
- if hasattr(model, 'device'):
2388
- inputs = {k: v.to(model.device) if hasattr(v, 'to') else v for k, v in inputs.items()}
2389
- output_ids = model.generate(**inputs, max_new_tokens=max_tokens, do_sample=False, pad_token_id=tokenizer.eos_token_id)
2390
- generated = output_ids[0][inputs['input_ids'].shape[1]:]
2391
- text = tokenizer.decode(generated, skip_special_tokens=True).strip()
2392
- else:
2393
- output = MODEL(prompt, max_tokens=max_tokens, temperature=0.1, top_p=0.85, stop=['\nUser:', '\nUSER:', '\n###', '<|EOT|>'])
2394
- text = output['choices'][0]['text'].strip()
2395
- if _response_looks_bad(text, language):
2396
- return _fallback_response(user_input)
2397
- return text or ('تم استلام رسالتك.' if language == 'ar' else 'I received your message.')
2398
- except Exception:
2399
- logger.exception('Model generation failed')
2400
- if language == 'ar':
2401
- return 'فهمت طلبك، لكن حدث خطأ أثناء توليد الرد النصي.'
2402
- return 'I understood your request, but text generation failed.'
 
80
  RUNTIME_LOG_BUFFER: deque[dict[str, Any]] = deque(maxlen=200)
81
  LAST_BRAIN_URL_REPORT: dict[str, Any] = {"url": "", "ts": 0.0}
82
  RUNTIME_STATE_THREAD_STARTED = False
83
+ PUBLIC_URL_RETRY_STARTED = False
84
 
85
  DEFAULT_MODEL_REPO = "QuantFactory/aya-expanse-8b-GGUF"
86
  DEFAULT_MODEL_FILE = "aya-expanse-8b.Q4_K_M.gguf"
 
199
 
200
  def _bootstrap_shared_state() -> None:
201
  if _drive_bootstrap_configured() or _shared_state_backend() in {"google_drive", "drive", "gdrive"}:
202
+ payload = DRIVE_STATE.ensure_bootstrap_loaded(force=False) or {}
203
+ fallback_mappings = {
204
+ "executor_url": "EXECUTOR_URL",
205
+ "control_plane_url": "KAPO_CONTROL_PLANE_URL",
206
+ "cloudflare_control_plane_url": "KAPO_CONTROL_PLANE_URL",
207
+ "cloudflare_queue_name": "KAPO_CLOUDFLARE_QUEUE_NAME",
208
+ }
209
+ for key, env_name in fallback_mappings.items():
210
+ value = payload.get(key)
211
+ if value not in (None, ""):
212
+ os.environ[env_name] = str(value)
213
 
214
 
215
  def _startup_self_update_enabled() -> bool:
 
323
  return "/kaggle/" in str(_project_root()).replace("\\", "/") or bool(os.getenv("KAGGLE_KERNEL_RUN_TYPE"))
324
 
325
 
 
 
 
 
326
  def _apply_executor_settings(settings: dict[str, Any]) -> None:
327
  for key in (
328
  "NGROK_AUTHTOKEN",
 
1064
  return public_url
1065
 
1066
 
1067
+ def _retry_publish_public_url(attempts: int = 8, delay_sec: float = 12.0) -> None:
1068
+ for attempt in range(max(1, int(attempts))):
1069
+ try:
1070
+ public_url = _report_known_public_url()
1071
+ if not public_url and _auto_publish_public_url_on_startup():
1072
+ public_url = start_ngrok(os.getenv("NGROK_AUTHTOKEN") or None)
1073
+ if public_url:
1074
+ logger.info("Recovered brain public URL on retry attempt %s: %s", attempt + 1, public_url)
1075
+ return
1076
+ except Exception:
1077
+ logger.warning("Public URL retry attempt %s failed", attempt + 1, exc_info=True)
1078
+ time.sleep(max(2.0, float(delay_sec)))
1079
+ logger.warning("Brain public URL retry loop exhausted without a published URL")
1080
+
1081
+
1082
+ def _ensure_public_url_background(start_tunnel: bool = False) -> None:
1083
+ global PUBLIC_URL_RETRY_STARTED
1084
+ current = str(os.getenv("BRAIN_PUBLIC_URL") or LAST_BRAIN_URL_REPORT.get("url") or _load_saved_public_url() or "").strip()
1085
+ if current or PUBLIC_URL_RETRY_STARTED:
1086
+ return
1087
+ if not start_tunnel and not _auto_publish_public_url_on_startup():
1088
+ return
1089
+ PUBLIC_URL_RETRY_STARTED = True
1090
+ threading.Thread(
1091
+ target=_retry_publish_public_url,
1092
+ kwargs={"attempts": 8, "delay_sec": 12.0},
1093
+ daemon=True,
1094
+ ).start()
1095
+
1096
+
1097
  def _bootstrap_executor_handshake(start_tunnel: bool = False) -> None:
1098
  executor_url = os.getenv("EXECUTOR_URL", "").strip()
1099
  if not executor_url:
 
1103
  logger.info("Brain public URL started locally without executor handshake: %s", public_url)
1104
  else:
1105
  logger.info("Brain started without publishing a public URL")
1106
+ _ensure_public_url_background(start_tunnel=True)
1107
  return
1108
  logger.info("Skipping executor handshake: EXECUTOR_URL not configured")
1109
+ _ensure_public_url_background(start_tunnel=start_tunnel)
1110
  return
1111
 
1112
  settings = _pull_executor_settings()
 
1123
  logger.info("Brain public URL reported to executor: %s", public_url)
1124
  else:
1125
  logger.info("Brain started without publishing a public URL")
1126
+ _ensure_public_url_background(start_tunnel=start_tunnel)
1127
 
1128
 
1129
  @app.on_event("startup")
 
1794
 
1795
 
1796
  def _restart_process(delay_sec: float = 1.0) -> None:
 
 
 
1797
  def _run() -> None:
1798
  time.sleep(max(0.2, float(delay_sec)))
1799
  target_root = _sync_target_root()
 
2164
  @app.post("/system/restart")
2165
  async def system_restart(req: RestartRequest | None = None):
2166
  delay_sec = req.delay_sec if req else 1.0
 
 
 
 
 
 
 
2167
  _restart_process(delay_sec=delay_sec)
2168
  return {
2169
  "status": "restarting",
 
2337
  payload = _health_payload(check_executor=check_executor, executor_url=executor_url)
2338
  _persist_runtime_state_snapshot(reason="health_endpoint")
2339
  return payload