"""FastAPI entrypoint for the Brain Server.""" import base64 import gc import logging import os import re import shutil import subprocess import sys import tempfile import threading import time import zipfile from collections import deque from pathlib import Path from typing import Any import requests from fastapi import FastAPI, File, UploadFile from pydantic import BaseModel from agents.memory_agent import MemoryAgent from agents.planner_agent import PlannerAgent from agents.reasoning_agent import ReasoningAgent try: from api import deps as deps_module from api.deps import get_executor_headers, get_logger, load_config from api.firebase_store import FirebaseStore from api.routes_analyze import router as analyze_router from api.routes_execute import router as execute_router from api.routes_plan import router as plan_router from shared.google_drive_state import GoogleDriveStateClient except ImportError: from . import deps as deps_module from .deps import get_executor_headers, get_logger, load_config from .firebase_store import FirebaseStore from .routes_analyze import router as analyze_router from .routes_execute import router as execute_router from .routes_plan import router as plan_router from shared.google_drive_state import GoogleDriveStateClient logger = get_logger("kapo.brain.main") def _configure_windows_utf8() -> None: if os.name != "nt": return os.environ.setdefault("PYTHONUTF8", "1") os.environ.setdefault("PYTHONIOENCODING", "utf-8") os.environ.setdefault("PYTHONLEGACYWINDOWSSTDIO", "utf-8") try: import ctypes kernel32 = ctypes.windll.kernel32 kernel32.SetConsoleCP(65001) kernel32.SetConsoleOutputCP(65001) except Exception: pass _configure_windows_utf8() if hasattr(sys.stdout, "reconfigure"): sys.stdout.reconfigure(encoding="utf-8", errors="replace") if hasattr(sys.stderr, "reconfigure"): sys.stderr.reconfigure(encoding="utf-8", errors="replace") app = FastAPI(title="KAPO-AI Brain Server", version="1.0.0") app.include_router(plan_router) app.include_router(execute_router) app.include_router(analyze_router) MODEL = None MODEL_ERROR = None MODEL_META = {"repo_id": None, "filename": None, "path": None} EMBED_MODEL = None FIREBASE = FirebaseStore("brain", logger_name="kapo.brain.firebase") DRIVE_STATE = GoogleDriveStateClient(logger) FIREBASE_RUNTIME_CACHE: dict[str, tuple[float, Any]] = {} RUNTIME_LOG_BUFFER: deque[dict[str, Any]] = deque(maxlen=200) LAST_BRAIN_URL_REPORT: dict[str, Any] = {"url": "", "ts": 0.0} RUNTIME_STATE_THREAD_STARTED = False PUBLIC_URL_RETRY_STARTED = False DEFAULT_MODEL_REPO = "QuantFactory/aya-expanse-8b-GGUF" DEFAULT_MODEL_FILE = "aya-expanse-8b.Q4_K_M.gguf" DEFAULT_MODEL_PROFILE_ID = "supervisor-ar-en-default" HAS_MULTIPART = True try: import multipart # noqa: F401 except Exception: HAS_MULTIPART = False class RuntimeLogHandler(logging.Handler): def emit(self, record) -> None: try: RUNTIME_LOG_BUFFER.append( { "ts": time.time(), "level": record.levelname, "name": record.name, "message": record.getMessage(), } ) except Exception: pass _runtime_log_handler = RuntimeLogHandler(level=logging.WARNING) if not any(isinstance(handler, RuntimeLogHandler) for handler in logger.handlers): logger.addHandler(_runtime_log_handler) def _feature_enabled(name: str, default: bool = False) -> bool: value = os.getenv(name) if value is None or str(value).strip() == "": return default return str(value).strip().lower() in {"1", "true", "yes", "on"} def _hf_transformers_runtime_enabled() -> bool: return _feature_enabled("KAPO_HF_TRANSFORMERS_RUNTIME", default=False) def _embeddings_enabled() -> bool: return not _feature_enabled("KAPO_DISABLE_EMBEDDINGS", default=False) def _remote_brain_only() -> bool: return _feature_enabled("REMOTE_BRAIN_ONLY", default=False) def _ngrok_bootstrap_enabled() -> bool: return _feature_enabled("BRAIN_AUTO_NGROK", default=True) def _configured_public_url() -> str: return str(os.getenv("BRAIN_PUBLIC_URL", "")).strip().rstrip("/") def _prefer_configured_public_url() -> bool: provider = str(os.getenv("BRAIN_PROVIDER", "") or os.getenv("BRAIN_TEMPLATE", "")).strip().lower() if "huggingface" in provider or "hf-space" in provider: return True return _feature_enabled("BRAIN_FORCE_CONFIGURED_PUBLIC_URL", default=False) def _reuse_public_url_on_restart() -> bool: return _feature_enabled("BRAIN_REUSE_PUBLIC_URL_ON_RESTART", default=True) def _auto_publish_public_url_on_startup() -> bool: return _feature_enabled("BRAIN_AUTO_PUBLISH_URL_ON_STARTUP", default=True) def _internal_restart_in_progress() -> bool: return _feature_enabled("KAPO_INTERNAL_RESTART", default=False) def _fast_restart_enabled() -> bool: return _feature_enabled("KAPO_FAST_INTERNAL_RESTART", default=True) def _executor_connect_timeout() -> float: return float(os.getenv("EXECUTOR_CONNECT_TIMEOUT_SEC", "3.0") or 3.0) def _executor_read_timeout(name: str, default: float) -> float: return float(os.getenv(name, str(default)) or default) def _executor_roundtrip_allowed(feature_name: str, default: bool = True) -> bool: executor_url = os.getenv("EXECUTOR_URL", "").strip() if not executor_url: return False kaggle_defaults = { "BRAIN_REMOTE_TRACE_STORE_ENABLED": False, "BRAIN_REMOTE_AUTO_INGEST_ENABLED": False, "BRAIN_REMOTE_STYLE_PROFILE_ENABLED": False, } effective_default = kaggle_defaults.get(feature_name, default) if _is_kaggle_runtime() else default return _feature_enabled(feature_name, default=effective_default) def _remote_runtime_reads_enabled() -> bool: return _feature_enabled("KAPO_REMOTE_STATE_READS", default=False) def _shared_state_backend() -> str: return str(os.getenv("KAPO_SHARED_STATE_BACKEND", "")).strip().lower() def _drive_bootstrap_configured() -> bool: return bool( str(os.getenv("GOOGLE_DRIVE_BOOTSTRAP_URL", "") or os.getenv("KAPO_BOOTSTRAP_URL", "") or "").strip() ) def _bootstrap_shared_state() -> None: if _drive_bootstrap_configured() or _shared_state_backend() in {"google_drive", "drive", "gdrive"}: payload = DRIVE_STATE.ensure_bootstrap_loaded(force=False) or {} fallback_mappings = { "executor_url": "EXECUTOR_URL", "control_plane_url": "KAPO_CONTROL_PLANE_URL", "cloudflare_control_plane_url": "KAPO_CONTROL_PLANE_URL", "cloudflare_queue_name": "KAPO_CLOUDFLARE_QUEUE_NAME", } for key, env_name in fallback_mappings.items(): value = payload.get(key) if value not in (None, ""): os.environ[env_name] = str(value) def _startup_self_update_enabled() -> bool: return _feature_enabled("KAPO_STARTUP_SELF_UPDATE", default=True) def _should_report_brain_url(public_url: str) -> bool: normalized = str(public_url or "").strip().rstrip("/") if not normalized: return False interval_sec = max(30.0, float(os.getenv("BRAIN_REPORT_MIN_INTERVAL_SEC", "600") or 600)) previous_url = str(LAST_BRAIN_URL_REPORT.get("url") or "").strip() previous_ts = float(LAST_BRAIN_URL_REPORT.get("ts") or 0.0) now = time.time() if normalized != previous_url or (now - previous_ts) >= interval_sec: LAST_BRAIN_URL_REPORT["url"] = normalized LAST_BRAIN_URL_REPORT["ts"] = now return True return False def _download_model(repo_id: str, filename: str, hf_token: str | None = None) -> str: from huggingface_hub import hf_hub_download configured_cache = str(os.getenv("MODEL_CACHE_DIR", "") or "").strip() if configured_cache: cache_dir = configured_cache elif _is_kaggle_runtime(): cache_dir = str((_project_root() / "models_cache").resolve()) else: cache_dir = os.path.join(tempfile.gettempdir(), "kapo_models") os.makedirs(cache_dir, exist_ok=True) return hf_hub_download(repo_id=repo_id, filename=filename, cache_dir=cache_dir, token=hf_token) def ensure_model_loaded(repo_id: str, filename: str, hf_token: str | None = None) -> None: global MODEL, MODEL_ERROR, MODEL_META repo_id = (repo_id or "").strip() filename = (filename or "").strip() if not repo_id or not filename: MODEL = None MODEL_ERROR = "model repo/file missing" return try: model_path = _download_model(repo_id, filename, hf_token=hf_token) except Exception as exc: MODEL = None MODEL_ERROR = f"model download failed: {exc}" logger.exception("Model download failed") return try: from llama_cpp import Llama MODEL = Llama(model_path=model_path, n_ctx=4096) MODEL_ERROR = None MODEL_META = {"repo_id": repo_id, "filename": filename, "path": model_path} logger.info("Loaded model %s/%s", repo_id, filename) except Exception as exc: MODEL = None MODEL_ERROR = f"model load failed: {exc}" logger.exception("Model load failed") def _load_embed_model() -> None: global EMBED_MODEL if not _embeddings_enabled(): logger.info("Embedding model disabled by configuration") return if EMBED_MODEL is not None: return try: from sentence_transformers import SentenceTransformer except ModuleNotFoundError: logger.info("Skipping embedding model load because sentence_transformers is unavailable") return model_name = os.getenv("EMBED_MODEL", "sentence-transformers/all-MiniLM-L6-v2") EMBED_MODEL = SentenceTransformer(model_name) logger.info("Loaded embedding model %s", model_name) def _load_default_model() -> None: global MODEL, MODEL_ERROR, MODEL_META repo_id = str(os.getenv("MODEL_REPO", DEFAULT_MODEL_REPO) or DEFAULT_MODEL_REPO).strip() filename = str(os.getenv("MODEL_FILE", "") or "").strip() provider = str(os.getenv("BRAIN_PROVIDER", "") or os.getenv("BRAIN_TEMPLATE", "") or "").strip().lower() if _hf_transformers_runtime_enabled(): MODEL = None MODEL_ERROR = None MODEL_META = {"repo_id": repo_id, "filename": "", "path": None} logger.info("Skipping local model load; HF transformers runtime is enabled for %s", repo_id) return if not filename and ("huggingface" in provider or "hf-space" in provider): MODEL = None MODEL_ERROR = None MODEL_META = {"repo_id": repo_id, "filename": "", "path": None} logger.info("Skipping local model load; using Hugging Face remote generation fallback for %s", repo_id) return ensure_model_loaded(repo_id, filename or DEFAULT_MODEL_FILE, hf_token=os.getenv("HF_TOKEN")) def _brain_headers() -> dict: cfg = load_config() return get_executor_headers(cfg) def _project_root() -> Path: return Path(__file__).resolve().parents[1] def _is_kaggle_runtime() -> bool: return "/kaggle/" in str(_project_root()).replace("\\", "/") or bool(os.getenv("KAGGLE_KERNEL_RUN_TYPE")) def _apply_executor_settings(settings: dict[str, Any]) -> None: for key in ( "NGROK_AUTHTOKEN", "MODEL_REPO", "MODEL_FILE", "MODEL_PROFILE_ID", "SUPERVISOR_MODEL_PROFILE_ID", "EMBED_MODEL", "REQUEST_TIMEOUT_SEC", "REQUEST_RETRIES", "CHAT_TIMEOUT_SEC", "EXECUTOR_BYPASS_HEADER", "EXECUTOR_BYPASS_VALUE", "BRAIN_BYPASS_HEADER", "BRAIN_BYPASS_VALUE", "REMOTE_BRAIN_ONLY", "KAGGLE_AUTO_BOOTSTRAP", "BRAIN_AUTO_NGROK", "BRAIN_AUTO_PUBLISH_URL_ON_STARTUP", "BRAIN_PUBLIC_URL", "BRAIN_REUSE_PUBLIC_URL_ON_RESTART", "KAGGLE_SYNC_SUBDIR", "BRAIN_ROLES", "BRAIN_LANGUAGES", "BRAIN_REMOTE_KNOWLEDGE_ENABLED", "BRAIN_REMOTE_WEB_SEARCH_ENABLED", "BRAIN_REMOTE_TRACE_STORE_ENABLED", "BRAIN_REMOTE_AUTO_INGEST_ENABLED", "BRAIN_LOCAL_RAG_FALLBACK_ENABLED", "EXECUTOR_CONNECT_TIMEOUT_SEC", "BRAIN_REMOTE_KNOWLEDGE_TIMEOUT_SEC", "BRAIN_REMOTE_WEB_SEARCH_TIMEOUT_SEC", "BRAIN_REMOTE_TRACE_STORE_TIMEOUT_SEC", "BRAIN_REMOTE_AUTO_INGEST_TIMEOUT_SEC", "FIREBASE_ENABLED", "FIREBASE_PROJECT_ID", "FIREBASE_SERVICE_ACCOUNT_PATH", "FIREBASE_SERVICE_ACCOUNT_JSON", "FIREBASE_NAMESPACE", "KAPO_SHARED_STATE_BACKEND", "GOOGLE_DRIVE_SHARED_STATE_FOLDER_ID", "GOOGLE_DRIVE_SHARED_STATE_PREFIX", "GOOGLE_DRIVE_BOOTSTRAP_URL", "KAPO_BOOTSTRAP_URL", "GOOGLE_DRIVE_ACCESS_TOKEN", "GOOGLE_DRIVE_REFRESH_TOKEN", "GOOGLE_DRIVE_CLIENT_SECRET_JSON", "GOOGLE_DRIVE_CLIENT_SECRET_JSON_BASE64", "GOOGLE_DRIVE_CLIENT_SECRET_PATH", "GOOGLE_DRIVE_TOKEN_EXPIRES_AT", "KAPO_CONTROL_PLANE_URL", "KAPO_CLOUDFLARE_QUEUE_NAME", ): value = settings.get(key) if value not in (None, ""): os.environ[key] = str(value) deps_module.CONFIG_CACHE = None def _apply_firebase_runtime_settings() -> None: if not _remote_runtime_reads_enabled(): return if not FIREBASE.enabled(): return shared = FIREBASE.get_document("settings", "global") runtime = FIREBASE.get_document("runtime", "executor") role_items = FIREBASE.list_documents("roles", limit=64) merged = {} merged.update(shared or {}) merged.update(runtime or {}) mappings = { "executor_public_url": "EXECUTOR_PUBLIC_URL", "executor_url": "EXECUTOR_URL", "model_repo": "MODEL_REPO", "model_file": "MODEL_FILE", "model_profile_id": "MODEL_PROFILE_ID", "supervisor_model_profile_id": "SUPERVISOR_MODEL_PROFILE_ID", "brain_roles": "BRAIN_ROLES", "brain_languages": "BRAIN_LANGUAGES", } current_brain_url = str(merged.get("current_brain_url") or "").strip() if current_brain_url: os.environ["KAPO_EXECUTOR_CURRENT_BRAIN_URL"] = current_brain_url for key, env_name in mappings.items(): value = merged.get(key) if value not in (None, ""): os.environ[env_name] = str(value) if role_items: enabled_roles = [ str(item.get("name") or item.get("id") or "").strip().lower() for item in role_items if str(item.get("enabled", True)).strip().lower() not in {"0", "false", "no", "off"} ] enabled_roles = [role for role in enabled_roles if role] if enabled_roles: os.environ["BRAIN_ROLES"] = ",".join(dict.fromkeys(enabled_roles)) deps_module.CONFIG_CACHE = None def _firebase_collection_cache_key(name: str) -> str: return f"collection:{name}" def _firebase_list_documents_cached(collection: str, ttl_sec: float = 30.0, limit: int = 200) -> list[dict[str, Any]]: if not _remote_runtime_reads_enabled(): return [] if not FIREBASE.enabled(): return [] key = _firebase_collection_cache_key(collection) now = time.time() cached = FIREBASE_RUNTIME_CACHE.get(key) if cached and (now - cached[0]) < ttl_sec: return list(cached[1] or []) items = FIREBASE.list_documents(collection, limit=limit) FIREBASE_RUNTIME_CACHE[key] = (now, items) return list(items or []) def _firebase_role_profiles() -> list[dict[str, Any]]: items = _firebase_list_documents_cached("roles", ttl_sec=30.0, limit=64) if items: return items roles = [part.strip() for part in str(os.getenv("BRAIN_ROLES", "")).split(",") if part.strip()] return [{"name": role, "enabled": True} for role in roles] def _firebase_runtime_snapshot() -> dict[str, Any]: return { "platforms": _firebase_list_documents_cached("platforms", ttl_sec=45.0, limit=64), "models": _firebase_list_documents_cached("models", ttl_sec=45.0, limit=128), "prompts": _firebase_list_documents_cached("prompts", ttl_sec=20.0, limit=128), "roles": _firebase_role_profiles(), } def _json_safe(value: Any) -> Any: if isinstance(value, dict): return {str(key): _json_safe(item) for key, item in value.items()} if isinstance(value, list): return [_json_safe(item) for item in value] if isinstance(value, tuple): return [_json_safe(item) for item in value] if isinstance(value, (str, int, float, bool)) or value is None: return value return str(value) def _firebase_prompt_body(role_name: str, language: str = "en") -> str: role_name = str(role_name or "").strip().lower() language = str(language or "en").strip().lower() if not role_name: return "" prompts = _firebase_runtime_snapshot().get("prompts", []) exact = [] fallback = [] for item in prompts: if str(item.get("role_name") or "").strip().lower() != role_name: continue if str(item.get("enabled", True)).strip().lower() in {"0", "false", "no", "off"}: continue item_lang = str(item.get("language") or "en").strip().lower() body = str(item.get("body") or "").strip() if not body: continue if item_lang == language: exact.append(body) elif item_lang == "en": fallback.append(body) if exact: return exact[0] if fallback: return fallback[0] return "" def _prepare_runtime_environment() -> None: try: _bootstrap_shared_state() except Exception: logger.warning("Shared-state bootstrap preload failed", exc_info=True) if not _is_kaggle_runtime(): return source_root = _project_root() source_text = str(source_root).replace("\\", "/") runtime_root_env = os.getenv("KAPO_RUNTIME_ROOT", "").strip() if runtime_root_env: runtime_root = Path(runtime_root_env).resolve() elif source_text.startswith("/kaggle/working/"): runtime_root = source_root.resolve() else: runtime_root = Path("/kaggle/working/KAPO-AI-SYSTEM").resolve() sync_root = runtime_root auto_bootstrap = str(os.getenv("KAGGLE_AUTO_BOOTSTRAP", "1")).strip().lower() in {"1", "true", "yes", "on"} if auto_bootstrap and source_text.startswith("/kaggle/input/") and source_root != runtime_root: if runtime_root.exists(): shutil.rmtree(runtime_root, ignore_errors=True) shutil.copytree( source_root, runtime_root, ignore=shutil.ignore_patterns("__pycache__", "*.pyc", ".git", ".venv"), ) if str(runtime_root) not in sys.path: sys.path.insert(0, str(runtime_root)) data_dir = runtime_root / "data" / "local" / "brain_runtime" data_dir.mkdir(parents=True, exist_ok=True) os.environ["KAPO_RUNTIME_ROOT"] = str(runtime_root) os.environ["KAPO_SYNC_ROOT"] = str(sync_root) os.environ["LOCAL_DATA_DIR"] = str(data_dir) os.environ["DB_PATH"] = str(data_dir / "episodic.db") os.environ["TOOLS_DB_PATH"] = str(data_dir / "tools.db") os.environ["FAISS_INDEX_PATH"] = str(data_dir / "faiss.index") os.environ["REMOTE_BRAIN_ONLY"] = str(os.getenv("REMOTE_BRAIN_ONLY", "1") or "1") saved_executor_url = _load_saved_executor_url() current_executor_url = str(os.getenv("EXECUTOR_URL", "") or "").strip() if saved_executor_url and not current_executor_url: os.environ["EXECUTOR_URL"] = saved_executor_url deps_module.CONFIG_CACHE = None def _sync_target_root() -> str: return os.getenv("KAPO_SYNC_ROOT") or os.getenv("KAPO_RUNTIME_ROOT") or os.getcwd() def _sync_root_path() -> Path: return Path(_sync_target_root()).resolve() def _resolve_sync_path(user_path: str | None = None) -> Path: root = _sync_root_path() relative = str(user_path or "").strip().replace("\\", "/").lstrip("/") candidate = (root / relative).resolve() if relative else root if candidate != root and root not in candidate.parents: raise ValueError("Path escapes sync root") return candidate def _describe_sync_entry(path: Path) -> dict[str, Any]: stat = path.stat() return { "name": path.name or str(path), "path": str(path.relative_to(_sync_root_path())).replace("\\", "/") if path != _sync_root_path() else "", "is_dir": path.is_dir(), "size": stat.st_size, "modified_at": stat.st_mtime, } def _public_url_state_path() -> Path: runtime_root = Path(_sync_target_root()).resolve() state_dir = runtime_root / "data" / "local" / "brain_runtime" state_dir.mkdir(parents=True, exist_ok=True) return state_dir / "public_url.txt" def _remember_public_url(public_url: str) -> None: value = str(public_url or "").strip().rstrip("/") if not value: return os.environ["BRAIN_PUBLIC_URL"] = value try: _public_url_state_path().write_text(value, encoding="utf-8") except Exception: logger.warning("Failed to persist public URL", exc_info=True) def _load_saved_public_url() -> str: try: value = _public_url_state_path().read_text(encoding="utf-8").strip().rstrip("/") return value except Exception: return "" def _ngrok_api_state_path() -> Path: runtime_root = Path(_sync_target_root()).resolve() state_dir = runtime_root / "data" / "local" / "brain_runtime" state_dir.mkdir(parents=True, exist_ok=True) return state_dir / "ngrok_api_url.txt" def _executor_url_state_path() -> Path: runtime_root = Path(_sync_target_root()).resolve() state_dir = runtime_root / "data" / "local" / "brain_runtime" state_dir.mkdir(parents=True, exist_ok=True) return state_dir / "executor_url.txt" def _remember_executor_url(executor_url: str) -> None: value = str(executor_url or "").strip().rstrip("/") if not value: return os.environ["EXECUTOR_URL"] = value try: _executor_url_state_path().write_text(value, encoding="utf-8") except Exception: logger.warning("Failed to persist executor URL", exc_info=True) def _load_saved_executor_url() -> str: configured = str(os.getenv("EXECUTOR_URL", "")).strip().rstrip("/") if configured: return configured try: return _executor_url_state_path().read_text(encoding="utf-8").strip().rstrip("/") except Exception: return "" def _brain_state_id() -> str: public_url = str(os.getenv("BRAIN_PUBLIC_URL") or _load_saved_public_url() or "").strip().rstrip("/") if public_url: return re.sub(r"[^A-Za-z0-9._-]+", "_", public_url) runtime_root = str(os.getenv("KAPO_RUNTIME_ROOT") or _sync_target_root() or "brain_runtime").strip() return re.sub(r"[^A-Za-z0-9._-]+", "_", runtime_root) def _applied_runtime_version_path() -> Path: runtime_root = Path(_sync_target_root()).resolve() state_dir = runtime_root / "data" / "local" / "brain_runtime" state_dir.mkdir(parents=True, exist_ok=True) return state_dir / "applied_version.json" def _load_applied_runtime_version() -> dict[str, Any]: try: return dict(json.loads(_applied_runtime_version_path().read_text(encoding="utf-8")) or {}) except Exception: return {} def _write_applied_runtime_version(payload: dict[str, Any]) -> None: _applied_runtime_version_path().write_text( json.dumps(dict(payload or {}), ensure_ascii=False, indent=2, sort_keys=True), encoding="utf-8", ) def _download_remote_zip(url: str, destination: Path) -> Path: response = requests.get(str(url).strip(), timeout=120) response.raise_for_status() destination.parent.mkdir(parents=True, exist_ok=True) destination.write_bytes(response.content) return destination def _apply_zip_overlay(zip_path: Path, target_root: Path) -> None: with zipfile.ZipFile(zip_path, "r") as archive: archive.extractall(target_root) def _load_patch_manifest_from_bootstrap() -> dict[str, Any]: bootstrap = DRIVE_STATE.ensure_bootstrap_loaded(force=True) or {} manifest_url = str(bootstrap.get("patch_manifest_url") or "").strip() if not manifest_url: return dict(bootstrap) try: response = requests.get(manifest_url, timeout=30) response.raise_for_status() payload = dict(response.json() or {}) merged = {**bootstrap, **payload} return merged except Exception: logger.warning("Failed to load patch manifest from bootstrap URL", exc_info=True) return dict(bootstrap) def _run_startup_self_update() -> None: if not _startup_self_update_enabled(): return manifest = _load_patch_manifest_from_bootstrap() target_version = str(manifest.get("version") or "").strip() target_hash = str(manifest.get("build_hash") or "").strip() if not target_version and not target_hash: return current = _load_applied_runtime_version() if ( str(current.get("version") or "").strip() == target_version and str(current.get("build_hash") or "").strip() == target_hash and target_version ): return runtime_root = Path(_sync_target_root()).resolve() temp_dir = runtime_root / "data" / "local" / "brain_runtime" / "updates" patch_url = str(manifest.get("patch_bundle_url") or "").strip() full_url = str(manifest.get("full_package_url") or "").strip() applied_mode = "" source_url = "" try: if patch_url: zip_path = _download_remote_zip(patch_url, temp_dir / "patch_bundle.zip") _apply_zip_overlay(zip_path, runtime_root) applied_mode = "patch_bundle" source_url = patch_url elif full_url: zip_path = _download_remote_zip(full_url, temp_dir / "full_package.zip") _apply_zip_overlay(zip_path, runtime_root) applied_mode = "full_package" source_url = full_url else: return _write_applied_runtime_version( { "version": target_version, "build_hash": target_hash, "applied_at": time.time(), "mode": applied_mode, "source_url": source_url, } ) logger.info("Applied startup self-update (%s) version=%s", applied_mode, target_version or target_hash) except Exception: logger.warning("Startup self-update failed", exc_info=True) def _remember_ngrok_api_url(api_url: str) -> None: value = str(api_url or "").strip().rstrip("/") if not value: return os.environ["KAPO_NGROK_API_URL"] = value try: _ngrok_api_state_path().write_text(value, encoding="utf-8") except Exception: logger.warning("Failed to persist ngrok API URL", exc_info=True) def _load_saved_ngrok_api_url() -> str: configured = str(os.getenv("KAPO_NGROK_API_URL", "")).strip().rstrip("/") if configured: return configured try: return _ngrok_api_state_path().read_text(encoding="utf-8").strip().rstrip("/") except Exception: return "" def _ngrok_api_candidates() -> list[str]: seen: set[str] = set() candidates: list[str] = [] for candidate in [_load_saved_ngrok_api_url(), "http://127.0.0.1:4040", "http://127.0.0.1:4041", "http://127.0.0.1:4042"]: value = str(candidate or "").strip().rstrip("/") if value and value not in seen: seen.add(value) candidates.append(value) return candidates def _probe_ngrok_api(api_url: str) -> bool: try: response = requests.get(f"{api_url}/api/tunnels", timeout=2) return response.status_code == 200 except Exception: return False def _find_live_ngrok_api() -> str | None: for api_url in _ngrok_api_candidates(): if _probe_ngrok_api(api_url): _remember_ngrok_api_url(api_url) return api_url return None def _ngrok_binary_path() -> str: env_path = str(os.getenv("NGROK_PATH", "")).strip() if env_path and Path(env_path).exists(): return env_path default_ngrok_path = "" try: from pyngrok import conf default_ngrok_path = str(conf.get_default().ngrok_path or "").strip() if default_ngrok_path and Path(default_ngrok_path).exists(): return default_ngrok_path except Exception: pass try: from pyngrok import installer install_target = default_ngrok_path or str((Path.home() / ".ngrok" / "ngrok").resolve()) Path(install_target).parent.mkdir(parents=True, exist_ok=True) installer.install_ngrok(install_target) if Path(install_target).exists(): return install_target except Exception: logger.warning("Failed to auto-install ngrok binary", exc_info=True) discovered = shutil.which("ngrok") if discovered: return discovered return "ngrok" def _ensure_ngrok_auth(token: str) -> None: ngrok_path = _ngrok_binary_path() subprocess.run( [ngrok_path, "config", "add-authtoken", token], check=False, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, ) def _start_detached_ngrok_agent(token: str) -> str | None: if token: _ensure_ngrok_auth(token) os.environ["NGROK_AUTHTOKEN"] = token ngrok_path = _ngrok_binary_path() popen_kwargs = { "stdout": subprocess.DEVNULL, "stderr": subprocess.DEVNULL, "stdin": subprocess.DEVNULL, } if os.name == "nt": popen_kwargs["creationflags"] = getattr(subprocess, "DETACHED_PROCESS", 0) | getattr(subprocess, "CREATE_NEW_PROCESS_GROUP", 0) else: popen_kwargs["start_new_session"] = True subprocess.Popen( [ngrok_path, "start", "--none", "--log=stdout"], **popen_kwargs, ) deadline = time.time() + 12 while time.time() < deadline: api_url = _find_live_ngrok_api() if api_url: return api_url time.sleep(0.5) return None def _list_ngrok_tunnels(api_url: str) -> list[dict[str, Any]]: response = requests.get(f"{api_url}/api/tunnels", timeout=5) response.raise_for_status() payload = response.json() tunnels = payload.get("tunnels") return tunnels if isinstance(tunnels, list) else [] def _existing_ngrok_public_url(api_url: str, port: int) -> str | None: for tunnel in _list_ngrok_tunnels(api_url): public_url = str(tunnel.get("public_url") or "").strip() config = tunnel.get("config") or {} addr = str(config.get("addr") or "").strip() if public_url and addr.endswith(f":{port}"): return public_url.rstrip("/") return None def _create_ngrok_tunnel(api_url: str, port: int) -> str | None: response = requests.post( f"{api_url}/api/tunnels", json={ "name": f"http-{port}-kapo", "addr": str(port), "proto": "http", }, timeout=10, ) response.raise_for_status() payload = response.json() return str(payload.get("public_url") or "").strip().rstrip("/") or None def _publish_brain_presence(public_url: str, *, source: str = "runtime") -> None: normalized = str(public_url or "").strip().rstrip("/") if not normalized: return payload = { "url": normalized, "status": "healthy", "source": source, "platform": "kaggle" if _is_kaggle_runtime() else str(os.getenv("BRAIN_PROVIDER") or "remote"), "role": os.getenv("BRAIN_PRIMARY_ROLE", "fallback"), "roles": [part.strip() for part in os.getenv("BRAIN_ROLES", "supervisor,chat,coding,planner,arabic,fallback").split(",") if part.strip()], "languages": [part.strip() for part in os.getenv("BRAIN_LANGUAGES", "ar,en").split(",") if part.strip()], "model_profile_id": os.getenv("MODEL_PROFILE_ID") or os.getenv("SUPERVISOR_MODEL_PROFILE_ID") or DEFAULT_MODEL_PROFILE_ID, "model_repo": MODEL_META.get("repo_id") or os.getenv("MODEL_REPO") or DEFAULT_MODEL_REPO, "model_file": MODEL_META.get("filename") or os.getenv("MODEL_FILE") or DEFAULT_MODEL_FILE, "updated_at": time.time(), } FIREBASE.set_document("brains", normalized, payload) FIREBASE.set_document( "runtime", "brains_last_report", { "brain_url": normalized, "source": source, "updated_at": time.time(), "provider": payload["platform"], "model_profile_id": payload["model_profile_id"], }, ) FIREBASE.set_document( "tunnels", f"brain_{normalized}", { "kind": "brain", "public_url": normalized, "provider": "ngrok" if "ngrok" in normalized else payload["platform"], "updated_at": time.time(), }, ) def _report_brain_url(public_url: str) -> None: _publish_brain_presence(public_url, source="report_attempt") executor_url = os.getenv("EXECUTOR_URL", "").strip().rstrip("/") if not executor_url: return if not _should_report_brain_url(public_url): return last_error: Exception | None = None connect_timeout = max(1.0, float(os.getenv("BRAIN_REPORT_CONNECT_TIMEOUT_SEC", "4.0") or 4.0)) read_timeout = max(5.0, float(os.getenv("BRAIN_REPORT_READ_TIMEOUT_SEC", "15.0") or 15.0)) retries = max(1, int(os.getenv("BRAIN_REPORT_RETRIES", "2") or 2)) for _ in range(retries): try: response = requests.post( f"{executor_url}/brain/report-url", json={ "brain_url": public_url, "platform": "kaggle" if _is_kaggle_runtime() else "remote", "role": os.getenv("BRAIN_PRIMARY_ROLE", "fallback"), "roles": [part.strip() for part in os.getenv("BRAIN_ROLES", "supervisor,chat,coding,planner,arabic,fallback").split(",") if part.strip()], "languages": [part.strip() for part in os.getenv("BRAIN_LANGUAGES", "ar,en").split(",") if part.strip()], "model_profile_id": os.getenv("MODEL_PROFILE_ID") or os.getenv("SUPERVISOR_MODEL_PROFILE_ID") or DEFAULT_MODEL_PROFILE_ID, "model_repo": MODEL_META.get("repo_id") or os.getenv("MODEL_REPO") or DEFAULT_MODEL_REPO, "model_file": MODEL_META.get("filename") or os.getenv("MODEL_FILE") or DEFAULT_MODEL_FILE, }, headers=_brain_headers(), timeout=(connect_timeout, read_timeout), ) response.raise_for_status() _publish_brain_presence(public_url, source="executor_report") return except Exception as exc: last_error = exc time.sleep(1) logger.info( "Brain URL report to executor timed out or failed; continuing (%s)", last_error, ) def _pull_executor_settings() -> dict[str, Any]: if _shared_state_backend() in {"google_drive", "drive", "gdrive"} or _drive_bootstrap_configured(): return {} executor_url = _load_saved_executor_url() if not executor_url: return {} try: response = requests.get( f"{executor_url}/share/settings", headers=_brain_headers(), timeout=( max(1.5, float(os.getenv("EXECUTOR_CONNECT_TIMEOUT_SEC", "3.0") or 3.0)), max(2.0, float(os.getenv("EXECUTOR_SETTINGS_READ_TIMEOUT_SEC", "5.0") or 5.0)), ), ) if response.status_code == 200: return response.json() logger.warning("Executor settings request failed: %s", response.text[:400]) except Exception as exc: logger.warning("Failed to pull executor settings (%s)", exc) return {} def start_ngrok(token: str | None = None) -> str | None: restart_reuse = str(os.getenv("KAPO_RESTART_REUSE_PUBLIC_URL", "")).strip().lower() in {"1", "true", "yes", "on"} if restart_reuse: saved_public_url = _load_saved_public_url() if saved_public_url: _remember_public_url(saved_public_url) _report_brain_url(saved_public_url) FIREBASE.set_document("brains", saved_public_url, {"url": saved_public_url, "status": "healthy", "source": "restart_reuse"}) FIREBASE.set_document("tunnels", f"brain_{saved_public_url}", {"kind": "brain", "public_url": saved_public_url, "provider": "ngrok"}) logger.info("Reusing saved brain public URL after restart: %s", saved_public_url) os.environ["KAPO_RESTART_REUSE_PUBLIC_URL"] = "0" return saved_public_url configured_public_url = _configured_public_url() if configured_public_url and _prefer_configured_public_url(): _remember_public_url(configured_public_url) _report_brain_url(configured_public_url) FIREBASE.set_document("brains", configured_public_url, {"url": configured_public_url, "status": "healthy", "source": "configured_public_url"}) FIREBASE.set_document("tunnels", f"brain_{configured_public_url}", {"kind": "brain", "public_url": configured_public_url, "provider": "configured"}) logger.info("Using configured brain public URL without starting ngrok: %s", configured_public_url) return configured_public_url if not _ngrok_bootstrap_enabled(): logger.info("Skipping ngrok bootstrap because BRAIN_AUTO_NGROK is disabled") return None try: authtoken = str(token or os.getenv("NGROK_AUTHTOKEN") or "").strip() if not authtoken: return None port = int(os.getenv("BRAIN_PORT", "7860")) api_url = _find_live_ngrok_api() if not api_url: api_url = _start_detached_ngrok_agent(authtoken) if not api_url: logger.warning("Ngrok agent did not expose a local API URL") return None public_url = _existing_ngrok_public_url(api_url, port) if not public_url: public_url = _create_ngrok_tunnel(api_url, port) if not public_url: return None match = re.search(r"https://[A-Za-z0-9.-]+", public_url) if match: public_url = match.group(0) _remember_ngrok_api_url(api_url) _remember_public_url(public_url) _report_brain_url(public_url) FIREBASE.set_document("brains", public_url, {"url": public_url, "status": "healthy", "source": "ngrok_bootstrap"}) FIREBASE.set_document("tunnels", f"brain_{public_url}", {"kind": "brain", "public_url": public_url, "provider": "ngrok", "api_url": api_url}) return public_url except Exception: logger.exception("Ngrok startup failed") return None def _report_known_public_url() -> str | None: public_url = _load_saved_public_url() if not public_url: return None _remember_public_url(public_url) _report_brain_url(public_url) FIREBASE.set_document("brains", public_url, {"url": public_url, "status": "healthy", "source": "saved_public_url"}) logger.info("Reported known brain public URL without starting ngrok: %s", public_url) return public_url def _retry_publish_public_url(attempts: int = 8, delay_sec: float = 12.0) -> None: for attempt in range(max(1, int(attempts))): try: public_url = _report_known_public_url() if not public_url and _auto_publish_public_url_on_startup(): public_url = start_ngrok(os.getenv("NGROK_AUTHTOKEN") or None) if public_url: logger.info("Recovered brain public URL on retry attempt %s: %s", attempt + 1, public_url) return except Exception: logger.warning("Public URL retry attempt %s failed", attempt + 1, exc_info=True) time.sleep(max(2.0, float(delay_sec))) logger.warning("Brain public URL retry loop exhausted without a published URL") def _ensure_public_url_background(start_tunnel: bool = False) -> None: global PUBLIC_URL_RETRY_STARTED current = str(os.getenv("BRAIN_PUBLIC_URL") or LAST_BRAIN_URL_REPORT.get("url") or _load_saved_public_url() or "").strip() if current or PUBLIC_URL_RETRY_STARTED: return if not start_tunnel and not _auto_publish_public_url_on_startup(): return PUBLIC_URL_RETRY_STARTED = True threading.Thread( target=_retry_publish_public_url, kwargs={"attempts": 8, "delay_sec": 12.0}, daemon=True, ).start() def _bootstrap_executor_handshake(start_tunnel: bool = False) -> None: executor_url = os.getenv("EXECUTOR_URL", "").strip() if not executor_url: if start_tunnel: public_url = start_ngrok(os.getenv("NGROK_AUTHTOKEN") or None) if public_url: logger.info("Brain public URL started locally without executor handshake: %s", public_url) else: logger.info("Brain started without publishing a public URL") _ensure_public_url_background(start_tunnel=True) return logger.info("Skipping executor handshake: EXECUTOR_URL not configured") _ensure_public_url_background(start_tunnel=start_tunnel) return settings = _pull_executor_settings() _apply_executor_settings(settings) public_url = None if start_tunnel: public_url = start_ngrok(os.getenv("NGROK_AUTHTOKEN") or None) if not public_url: public_url = _report_known_public_url() else: public_url = _report_known_public_url() if public_url: logger.info("Brain public URL reported to executor: %s", public_url) else: logger.info("Brain started without publishing a public URL") _ensure_public_url_background(start_tunnel=start_tunnel) @app.on_event("startup") async def startup_event(): global RUNTIME_STATE_THREAD_STARTED try: _bootstrap_shared_state() except Exception: logger.exception("Shared-state bootstrap failed") try: _prepare_runtime_environment() except Exception: logger.exception("Runtime environment bootstrap failed") try: _run_startup_self_update() except Exception: logger.exception("Startup self-update bootstrap failed") internal_restart = _internal_restart_in_progress() fast_restart = internal_restart and _fast_restart_enabled() if not fast_restart: try: settings = _pull_executor_settings() _apply_executor_settings(settings) except Exception: logger.exception("Executor settings bootstrap failed") try: _apply_firebase_runtime_settings() except Exception: logger.exception("Firebase runtime bootstrap failed") else: logger.info("Fast internal restart enabled; skipping executor/Firebase startup bootstrap") if not fast_restart: _load_default_model() try: if not fast_restart: _load_embed_model() except Exception: logger.exception("Embedding model startup failed") try: start_tunnel = _auto_publish_public_url_on_startup() and not internal_restart _bootstrap_executor_handshake(start_tunnel=start_tunnel) except Exception: logger.exception("Executor handshake startup failed") finally: os.environ["KAPO_INTERNAL_RESTART"] = "0" _persist_runtime_state_snapshot(reason="startup") if not RUNTIME_STATE_THREAD_STARTED: RUNTIME_STATE_THREAD_STARTED = True threading.Thread(target=_runtime_state_pulse, daemon=True).start() class ModelLoadRequest(BaseModel): repo_id: str filename: str hf_token: str | None = None class ConnectionInit(BaseModel): executor_url: str ngrok_token: str | None = None class PublishUrlRequest(BaseModel): ngrok_token: str | None = None public_url: str | None = None start_tunnel: bool = True class RestartRequest(BaseModel): delay_sec: float = 1.0 class FileWriteRequest(BaseModel): path: str content: str = "" overwrite: bool = True class FileDeleteRequest(BaseModel): path: str recursive: bool = False class FileMkdirRequest(BaseModel): path: str class ChatRequest(BaseModel): request_id: str user_input: str context: dict[str, Any] = {} history: list[dict[str, str]] = [] auto_execute: bool = True def _contains_arabic(text: str) -> bool: return bool(re.search(r"[\u0600-\u06FF]", text or "")) def _detect_language(text: str) -> str: return "ar" if _contains_arabic(text) else "en" def _is_task_request(text: str) -> bool: lower = (text or "").strip().lower() task_words = [ "build", "fix", "debug", "create project", "generate project", "implement", "refactor", "run", "execute", "install", "modify", "edit", "update", "write code", "make app", "انشئ", "أنشئ", "اعمل", "نفذ", "شغل", "اصلح", "أصلح", "عدّل", "عدل", "ابني", "كوّن مشروع", ] return any(word in lower for word in task_words) def _is_research_request(text: str) -> bool: lower = (text or "").strip().lower() research_words = [ "search", "research", "look up", "find out", "web", "browse", "ابحث", "ابحث عن", "دور", "فتش", "معلومة عن", "معلومات عن", ] return any(word in lower for word in research_words) def _is_knowledge_request(text: str, context: dict[str, Any] | None = None) -> bool: context = context or {} if bool(context.get("use_executor_knowledge")): return True lower = (text or "").strip().lower() knowledge_words = [ "remember", "memory", "knowledge", "docs", "documentation", "project structure", "architecture", "تذكر", "الذاكرة", "المعرفة", "الوثائق", "الدليل", "بنية المشروع", "هيكل المشروع", "معمارية", ] return any(word in lower for word in knowledge_words) def _prune_history(history: list[dict[str, str]], keep_last: int = 6) -> list[dict[str, str]]: if len(history) <= keep_last: return history return history[-keep_last:] def _retrieve_knowledge(query: str, top_k: int = 4) -> list[dict[str, Any]]: executor_url = os.getenv("EXECUTOR_URL", "").strip().rstrip("/") expanded_query = _expand_project_query(query) if _executor_roundtrip_allowed("BRAIN_REMOTE_KNOWLEDGE_ENABLED", default=True): try: response = requests.get( f"{executor_url}/rag/search", params={"query": expanded_query, "top_k": top_k}, headers=_brain_headers(), timeout=( _executor_connect_timeout(), _executor_read_timeout("BRAIN_REMOTE_KNOWLEDGE_TIMEOUT_SEC", 6.0), ), ) if response.status_code == 200: payload = response.json() results = payload.get("results", []) if isinstance(results, list): return results except requests.exceptions.ReadTimeout: logger.info("Executor knowledge retrieval timed out; continuing without remote knowledge") except Exception: logger.warning("Executor knowledge retrieval failed", exc_info=True) if _remote_brain_only() or not _feature_enabled("BRAIN_LOCAL_RAG_FALLBACK_ENABLED", default=False): return [] try: from rag.retriever import retrieve return retrieve(expanded_query, top_k=top_k) except Exception: logger.warning("Knowledge retrieval failed", exc_info=True) return [] def _search_web(query: str) -> list[dict[str, Any]]: executor_url = os.getenv("EXECUTOR_URL", "").strip().rstrip("/") if not _executor_roundtrip_allowed("BRAIN_REMOTE_WEB_SEARCH_ENABLED", default=True): return [] try: response = requests.post( f"{executor_url}/tools/search", json={"query": query, "num_results": 5}, headers=_brain_headers(), timeout=( _executor_connect_timeout(), _executor_read_timeout("BRAIN_REMOTE_WEB_SEARCH_TIMEOUT_SEC", 8.0), ), ) if response.status_code == 200: payload = response.json() results = payload.get("results", []) return results if isinstance(results, list) else [] except Exception: logger.warning("Web search failed", exc_info=True) return [] def _format_context_blocks(knowledge: list[dict[str, Any]], web_results: list[dict[str, Any]]) -> str: blocks: list[str] = [] if knowledge: lines = [] for item in knowledge[:4]: source = item.get("source", "knowledge") content = item.get("content", "") or item.get("text", "") lines.append(f"- [{source}] {content[:500]}") blocks.append("Knowledge:\n" + "\n".join(lines)) if web_results: lines = [] for item in web_results[:5]: lines.append(f"- {item.get('title', '')}: {item.get('snippet', '')}") blocks.append("Web:\n" + "\n".join(lines)) return "\n\n".join(blocks).strip() def _project_context_tags(text: str) -> list[str]: source = str(text or "") lowered = source.lower() tags: list[str] = [] tag_rules = [ ("brain_runtime", ["العقل", "brain", "model", "موديل", "النموذج"]), ("executor_runtime", ["الوكيل التنفيذي", "executor", "agent"]), ("tunnel_runtime", ["النفق", "tunnel", "ngrok", "cloudflare"]), ("url_routing", ["الرابط", "url", "endpoint", "لينك"]), ("restart_sync", ["restart", "ريستارت", "إعادة تشغيل", "اعادة تشغيل", "sync", "مزامنة"]), ("knowledge_memory", ["memory", "ذاكرة", "معرفة", "knowledge", "rag", "طبقات", "embedding", "embeddings"]), ("kaggle_runtime", ["kaggle", "كاجل"]), ] for tag, markers in tag_rules: if any(marker in source or marker in lowered for marker in markers): tags.append(tag) return tags def _expand_project_query(query: str) -> str: tags = _project_context_tags(query) additions: list[str] = [] if "tunnel_runtime" in tags: additions.append("ngrok tunnel public url reverse proxy runtime restart") if "restart_sync" in tags: additions.append("system restart sync uvicorn process reuse public url") if "executor_runtime" in tags: additions.append("executor share settings control plane local machine") if "knowledge_memory" in tags: additions.append("knowledge layers rag embeddings preferences profile") if "brain_runtime" in tags: additions.append("brain server kaggle model startup runtime") return query if not additions else f"{query}\nContext expansion: {' | '.join(additions)}" def _fetch_style_profile() -> dict[str, Any]: firebase_profile = FIREBASE.get_document("profiles", "style", ttl_sec=30.0) if firebase_profile: return firebase_profile executor_url = os.getenv("EXECUTOR_URL", "").strip().rstrip("/") if not executor_url or not _executor_roundtrip_allowed("BRAIN_REMOTE_STYLE_PROFILE_ENABLED", default=False): return {} try: response = requests.get( f"{executor_url}/preferences/profile", headers=_brain_headers(), timeout=( _executor_connect_timeout(), _executor_read_timeout("BRAIN_REMOTE_STYLE_PROFILE_TIMEOUT_SEC", 2.5), ), ) if response.status_code != 200: return {} payload = response.json().get("profile", {}) return payload if isinstance(payload, dict) else {} except requests.exceptions.ReadTimeout: logger.info("Style profile load timed out; continuing without remote style profile") return {} except Exception: logger.warning("Failed to load style profile", exc_info=True) return {} def _render_style_profile_context(profile: dict[str, Any]) -> str: if not profile: return "" preferences = profile.get("preferences", []) or [] examples = profile.get("examples", []) or [] lexical_signals = profile.get("lexical_signals", []) or [] style_markers = profile.get("style_markers", {}) or {} persona_summary = str(profile.get("persona_summary") or "").strip() response_contract = str(profile.get("response_contract") or "").strip() lines: list[str] = [] if persona_summary: lines.append(f"User persona summary: {persona_summary}") if response_contract: lines.append(f"Response contract: {response_contract}") if preferences: lines.append("User Style Preferences:") for item in preferences[:10]: lines.append(f"- {item}") if lexical_signals: lines.append("User Lexical Signals:") for item in lexical_signals[:10]: lines.append(f"- {item}") if style_markers: lines.append("Style Markers:") for key, value in sorted(style_markers.items()): lines.append(f"- {key}: {value}") if examples: lines.append("Recent User Style Examples:") for sample in examples[-3:]: user_text = str(sample.get("user_input") or "").strip() assistant_text = str(sample.get("assistant_reply") or "").strip() if user_text: lines.append(f"- User: {user_text}") if assistant_text: lines.append(f" Assistant: {assistant_text}") return "\n".join(lines).strip() def _project_domain_context(user_input: str, context: dict[str, Any] | None = None) -> str: tags = _project_context_tags(user_input) if not tags: return "" lines = [ "Project Domain Glossary:", "- In this project, terms like العقل, الوكيل التنفيذي, النفق, الرابط, الريستارت, المزامنة, الذاكرة, والطبقات usually refer to software runtime and operations concepts.", "- Treat النفق as ngrok or cloudflare tunnel unless the user explicitly asks for a literal/civil meaning.", "- Treat الرابط as public URL, endpoint, or routing target when the surrounding context mentions deployment, Kaggle, restart, or sync.", "- Treat العقل as the remote Brain service/model runtime, and الوكيل التنفيذي as the local executor/control plane on the user's device.", ] if "restart_sync" in tags: lines.append("- Restart means process/service restart; preserving the same public URL matters more than creating a fresh tunnel.") if "knowledge_memory" in tags: lines.append("- Knowledge, embeddings, layers, and memory refer to the RAG and memory system inside this project.") if "kaggle_runtime" in tags: lines.append("- Kaggle here is the remote runtime hosting the Brain service.") role_name = str((context or {}).get("role_name") or "").strip() if role_name: lines.append(f"- Current assigned role: {role_name}.") return "\n".join(lines) def _firebase_runtime_context(role_name: str, language: str) -> str: snapshot = _firebase_runtime_snapshot() lines: list[str] = [] roles = snapshot.get("roles") or [] if roles: enabled_roles = [ str(item.get("name") or item.get("id") or "").strip() for item in roles if str(item.get("enabled", True)).strip().lower() not in {"0", "false", "no", "off"} ] enabled_roles = [item for item in enabled_roles if item] if enabled_roles: lines.append("Live roles from Firestore: " + ", ".join(enabled_roles[:12])) models = snapshot.get("models") or [] if models: preferred = [ item for item in models if str(item.get("enabled", True)).strip().lower() not in {"0", "false", "no", "off"} ] if preferred: labels = [str(item.get("label") or item.get("id") or "").strip() for item in preferred[:5]] labels = [item for item in labels if item] if labels: lines.append("Live model profiles: " + ", ".join(labels)) platforms = snapshot.get("platforms") or [] if platforms: names = [str(item.get("name") or "").strip() for item in platforms[:6] if str(item.get("name") or "").strip()] if names: lines.append("Live platforms: " + ", ".join(names)) prompt_body = _firebase_prompt_body(role_name or "chat", language) or _firebase_prompt_body(role_name or "chat", "en") if prompt_body: lines.append(f"Live Firestore prompt for role '{role_name or 'chat'}': {prompt_body}") return "\n".join(lines).strip() def _append_runtime_instructions(context_block: str, context: dict[str, Any]) -> str: instructions = str((context or {}).get("system_instructions") or "").strip() role_name = str((context or {}).get("role_name") or "").strip() user_input = str((context or {}).get("user_input") or "").strip() language = _detect_language(user_input) style_profile = _render_style_profile_context(_fetch_style_profile()) domain_context = _project_domain_context(user_input, context) firebase_context = _firebase_runtime_context(role_name, language) if not instructions and not role_name and not style_profile and not domain_context and not firebase_context: return context_block extra = [] if role_name: extra.append(f"Assigned role: {role_name}") if instructions: extra.append(instructions) if firebase_context: extra.append(firebase_context) if style_profile: extra.append(style_profile) if domain_context: extra.append(domain_context) extra_block = "Runtime Instructions:\n" + "\n".join(extra) return (context_block + "\n\n" + extra_block).strip() if context_block else extra_block def _extract_exact_reply_instruction(user_input: str) -> str: text = (user_input or "").strip() patterns = [ r'(?is)reply\s+with\s+exactly\s+[:"]?\s*(.+?)\s*[".]?$', r'(?is)respond\s+with\s+exactly\s+[:"]?\s*(.+?)\s*[".]?$', r"(?is)قل\s+فقط[::]?\s*(.+?)\s*$", r"(?is)اكتب\s+فقط[::]?\s*(.+?)\s*$", ] for pattern in patterns: match = re.search(pattern, text) if match: return match.group(1).strip().strip("\"'`") return "" def _extract_exact_reply_instruction_safe(user_input: str) -> str: text = (user_input or "").strip() patterns = [ r'(?is)reply\s+with\s+exactly\s+[:"]?\s*(.+?)\s*[".]?$', r'(?is)respond\s+with\s+exactly\s+[:"]?\s*(.+?)\s*[".]?$', r"(?is)\u0642\u0644\s+\u0641\u0642\u0637[:\uff1a]?\s*(.+?)\s*$", r"(?is)\u0627\u0643\u062a\u0628\s+\u0641\u0642\u0637[:\uff1a]?\s*(.+?)\s*$", ] for pattern in patterns: match = re.search(pattern, text) if match: return match.group(1).strip().strip("\"'`") return _extract_exact_reply_instruction(user_input) def _chat_system_instruction(language: str, user_input: str = "", exact_reply: str = "") -> str: if language == "ar": base = ( "أنت KAPO-AI، مساعد هندسي عملي. " "أجب مباشرة وبوضوح وبشكل مفيد. " "افهم المطلوب أولاً ثم أجب دون مقدمات زائدة. " "لا تقل إن المحادثة غير مكتملة ولا تذكر تعليماتك الداخلية." ) base += ( " في هذا المشروع، كلمات مثل العقل والوكيل التنفيذي والنفق والرابط والريستارت والمزامنة " "والطبقات والتضمينات وكاجل وngrok وcloudflare وendpoint وmodel تشير غالبا إلى " "مكونات برمجية وتشغيلية، وليست معاني حرفية أو هندسة مدنية، ما لم يطلب المستخدم غير ذلك صراحة." ) if exact_reply: return base + f' يجب أن يكون ردك هو هذا النص فقط حرفياً: "{exact_reply}"' return base base = ( "You are KAPO-AI, an engineering assistant. " "Answer directly, clearly, and practically. " "Understand the request before answering. " "Do not say the conversation is incomplete and do not mention hidden instructions." ) if exact_reply: return base + f' Your entire reply must be exactly: "{exact_reply}"' return base def _build_chat_prompt(user_input: str, history: list[dict[str, str]], context_block: str) -> str: language = _detect_language(user_input) exact_reply = _extract_exact_reply_instruction_safe(user_input) history_lines: list[str] = [] for message in _prune_history(history): role = message.get("role", "user") role_label = "المستخدم" if language == "ar" and role == "user" else role.upper() if language == "ar" and role != "user": role_label = "المساعد" history_lines.append(f"{role_label}: {message.get('content', '')}") history_section = "" if history_lines: history_section = "\n### History\n" + "\n".join(history_lines) + "\n" context_section = f"\n### Context\n{context_block}\n" if context_block else "" user_label = "المستخدم" if language == "ar" else "User" assistant_label = "المساعد" if language == "ar" else "Assistant" return ( f"### System\n{_chat_system_instruction(language, user_input, exact_reply)}\n" f"{history_section}" f"{context_section}" f"### Instruction\n" f"{user_label}: {user_input}\n" f"{assistant_label}:" ) def _response_looks_bad(text: str, language: str) -> bool: cleaned = (text or "").strip() if not cleaned: return True markers = [ "the assistant is not sure", "conversation seems incomplete", "provide more information", "unless otherwise noted", "as an ai model developed by", "developed by ibm", "tensorflow library", "dataset of 1024", ] if any(marker in cleaned.lower() for marker in markers): return True if language == "ar": arabic_chars = len(re.findall(r"[\u0600-\u06FF]", cleaned)) latin_chars = len(re.findall(r"[A-Za-z]", cleaned)) if arabic_chars < 8 and latin_chars > max(12, arabic_chars * 2): return True return False def _fallback_response(user_input: str) -> str: if _detect_language(user_input) == "ar": return "فهمت رسالتك، لكن الرد المولد لم يكن صالحاً للاستخدام. أعد صياغة الطلب بشكل أكثر تحديداً." return "I understood your message, but the generated reply was not usable. Please rephrase the request more specifically." def _project_specific_fast_reply(user_input: str) -> str: text = (user_input or "").strip() lower = text.lower() if any(token in text for token in ("ايه اللي اتصلح", "إيه اللي اتصلح", "هو ايه اللي اتصلح", "هو إيه اللي اتصلح")) and any( token in text or token in lower for token in ("النفق", "الرابط", "الريستارت", "restart") ): return ( "الذي اتصلح هو أن الريستارت الداخلي بقى يعيد تشغيل خدمة العقل نفسها من غير ما يكسر النفق أو يغيّر الرابط العام. " "ولو ظهر توقف قصير أثناء الإقلاع فهذا يكون من رجوع الخدمة، لا من إنشاء نفق جديد." ) if "النفق" in text and any(token in text for token in ("إصلاح", "اصلاح", "الذي تم", "اتصلح", "تم إصلاح", "تم اصلاح")): return ( "تم فصل دورة حياة النفق عن دورة حياة خدمة العقل، فأصبح ngrok يعمل كعامل مستقل عن عملية Uvicorn. " "وبالتالي يحتفظ /system/restart بنفس الرابط العام بدل إنشاء رابط جديد، وقد يظهر ERR_NGROK_8012 مؤقتًا فقط أثناء الإقلاع." ) if "نفس الرابط" in text and ("ريستارت" in text or "restart" in lower): return ( "الهدف هنا أن تعيد الخدمة الإقلاع داخليًا مع الإبقاء على نفس الـ public URL. " "لذلك يبقى tunnel حيًا، بينما تعود خدمة localhost:7860 للعمل بعد ثوانٍ قليلة على نفس الرابط." ) return "" def _hf_chat_messages(user_input: str, history: list[dict[str, str]], context_block: str) -> list[dict[str, str]]: messages: list[dict[str, str]] = [] context_text = str(context_block or "").strip() if context_text: messages.append({"role": "system", "content": context_text}) for item in history or []: role = str((item or {}).get("role") or "").strip().lower() content = str((item or {}).get("content") or "").strip() if role in {"system", "user", "assistant"} and content: messages.append({"role": role, "content": content}) messages.append({"role": "user", "content": str(user_input or "").strip()}) return messages def _hf_chat_completion_text(client: Any, messages: list[dict[str, str]], max_tokens: int) -> str: result = client.chat_completion(messages=messages, max_tokens=max_tokens) choices = getattr(result, "choices", None) if choices is None and isinstance(result, dict): choices = result.get("choices") choices = choices or [] if not choices: return "" first = choices[0] message = getattr(first, "message", None) if message is None and isinstance(first, dict): message = first.get("message") content = getattr(message, "content", None) if content is None and isinstance(message, dict): content = message.get("content") if isinstance(content, list): parts: list[str] = [] for item in content: if isinstance(item, dict): text = item.get("text") if text: parts.append(str(text)) elif item: parts.append(str(item)) content = "\n".join(part for part in parts if part.strip()) return str(content or "").strip() def _generate_response(user_input: str, history: list[dict[str, str]], context_block: str) -> str: language = _detect_language(user_input) exact_reply = _extract_exact_reply_instruction_safe(user_input) if exact_reply: return exact_reply fast_reply = _project_specific_fast_reply(user_input) if fast_reply: return fast_reply if MODEL is None: provider = str(os.getenv("BRAIN_PROVIDER", "") or os.getenv("BRAIN_TEMPLATE", "") or "").strip().lower() if _feature_enabled("KAPO_HF_INFERENCE_API", default=False) or "huggingface" in provider or "hf-space" in provider or _hf_transformers_runtime_enabled(): try: from huggingface_hub import InferenceClient prompt = _build_chat_prompt(user_input, history, context_block) messages = _hf_chat_messages(user_input, history, context_block) max_tokens = 80 if language == "ar" else 96 model_repo = str(os.getenv("MODEL_REPO", DEFAULT_MODEL_REPO) or DEFAULT_MODEL_REPO).strip() client = InferenceClient(model=model_repo, api_key=(str(os.getenv("HF_TOKEN", "") or "").strip() or None)) try: generated_text = _hf_chat_completion_text(client, messages, max_tokens) if generated_text: return generated_text except Exception as exc: logger.info("HF chat-completion path failed; falling back to text-generation (%s)", exc) generated = client.text_generation( prompt, max_new_tokens=max_tokens, return_full_text=False, ) generated_text = str(generated or "").strip() if generated_text: return generated_text except Exception as exc: logger.warning("HF inference fallback failed: %s", exc, exc_info=True) if language == "ar": return "الخدمة تعمل لكن توليد الرد الحر غير متاح الآن لأن النموذج غير محمل." return "The Brain is online, but natural chat generation is unavailable because the model is not loaded." prompt = _build_chat_prompt(user_input, history, context_block) try: max_tokens = 80 if language == "ar" else 96 output = MODEL( prompt, max_tokens=max_tokens, temperature=0.1, top_p=0.85, stop=["\nUser:", "\nUSER:", "\nالمستخدم:", "\n###", "<|EOT|>"], ) text = output["choices"][0]["text"].strip() if _response_looks_bad(text, language): return _fallback_response(user_input) return text or ("تم استلام رسالتك." if language == "ar" else "I received your message.") except Exception: logger.exception("Model generation failed") if language == "ar": return "فهمت طلبك، لكن فشل توليد الرد النصي." return "I understood your request, but text generation failed." def _store_chat_trace(request_id: str, payload: dict[str, Any]) -> None: executor_url = os.getenv("EXECUTOR_URL", "").strip().rstrip("/") if not _executor_roundtrip_allowed("BRAIN_REMOTE_TRACE_STORE_ENABLED", default=True): return if not executor_url: return try: requests.post( f"{executor_url}/memory/store", json={"request_id": request_id, "payload": payload}, headers=_brain_headers(), timeout=( _executor_connect_timeout(), _executor_read_timeout("BRAIN_REMOTE_TRACE_STORE_TIMEOUT_SEC", 2.5), ), ) except requests.exceptions.ReadTimeout: logger.info("Chat trace store timed out; continuing") except requests.exceptions.ConnectionError: logger.info("Chat trace store skipped because executor is unreachable") except Exception: logger.warning("Failed to store chat trace on executor", exc_info=True) def _ingest_chat_knowledge(request_id: str, user_input: str, reply: str) -> None: if len(reply or "") < 180: return executor_url = os.getenv("EXECUTOR_URL", "").strip().rstrip("/") if not _executor_roundtrip_allowed("BRAIN_REMOTE_AUTO_INGEST_ENABLED", default=False): return payload = { "request_id": request_id, "payload": { "source": "auto_chat", "content": f"User: {user_input}\nAssistant: {reply}", }, } try: requests.post( f"{executor_url}/rag/ingest", json=payload, headers=_brain_headers(), timeout=( _executor_connect_timeout(), _executor_read_timeout("BRAIN_REMOTE_AUTO_INGEST_TIMEOUT_SEC", 3.0), ), ) except requests.exceptions.ReadTimeout: logger.info("Auto-ingest chat knowledge timed out; continuing") except Exception: logger.warning("Failed to auto-ingest chat knowledge", exc_info=True) def _learn_user_style(request_id: str, user_input: str, reply: str, context: dict[str, Any]) -> None: executor_url = os.getenv("EXECUTOR_URL", "").strip().rstrip("/") if not executor_url or not _executor_roundtrip_allowed("BRAIN_REMOTE_STYLE_PROFILE_ENABLED", default=False): return try: requests.post( f"{executor_url}/preferences/learn", json={ "request_id": request_id, "user_input": user_input, "assistant_reply": reply, "context": context or {}, }, headers=_brain_headers(), timeout=( _executor_connect_timeout(), _executor_read_timeout("BRAIN_REMOTE_TRACE_STORE_TIMEOUT_SEC", 2.5), ), ) except requests.exceptions.ReadTimeout: logger.info("Style learning timed out; continuing") except Exception: logger.warning("Failed to learn user style on executor", exc_info=True) def _dispatch_background(task, *args) -> None: try: threading.Thread(target=task, args=args, daemon=True).start() except Exception: logger.warning("Background task dispatch failed", exc_info=True) def _restart_process(delay_sec: float = 1.0) -> None: def _run() -> None: time.sleep(max(0.2, float(delay_sec))) target_root = _sync_target_root() os.chdir(target_root) os.environ["KAPO_INTERNAL_RESTART"] = "1" os.environ.setdefault("KAPO_FAST_INTERNAL_RESTART", "1") if _reuse_public_url_on_restart(): current_public_url = _load_saved_public_url() if current_public_url: _remember_public_url(current_public_url) os.environ["KAPO_RESTART_REUSE_PUBLIC_URL"] = "1" port = str(os.getenv("BRAIN_PORT", "7860") or "7860") app_module = str(os.getenv("KAPO_UVICORN_APP") or "").strip() if not app_module: if (Path(target_root) / "brain_server" / "api" / "main.py").exists(): app_module = "brain_server.api.main:app" elif (Path(target_root) / "api" / "main.py").exists(): app_module = "api.main:app" else: app_module = "brain_server.api.main:app" os.execv( sys.executable, [ sys.executable, "-m", "uvicorn", app_module, "--host", "0.0.0.0", "--port", port, ], ) threading.Thread(target=_run, daemon=True).start() @app.get("/") async def root(): return {"status": "ok", "service": "brain_server", "docs": "/docs", "health": "/health"} @app.get("/runtime/firebase") async def runtime_firebase_snapshot(): try: return {"status": "ok", "firebase": _json_safe(_firebase_runtime_snapshot())} except Exception as exc: logger.warning("Failed to build firebase runtime snapshot", exc_info=True) return {"status": "degraded", "firebase": {"platforms": [], "models": [], "prompts": [], "roles": []}, "detail": str(exc)} @app.get("/runtime/errors") async def runtime_errors(limit: int = 50, level: str = "WARNING"): normalized = str(level or "WARNING").strip().upper() allowed = {"DEBUG": 10, "INFO": 20, "WARNING": 30, "ERROR": 40, "CRITICAL": 50} threshold = allowed.get(normalized, 30) items = [item for item in list(RUNTIME_LOG_BUFFER) if allowed.get(str(item.get("level") or "").upper(), 0) >= threshold] return {"status": "ok", "count": len(items[-limit:]), "items": items[-limit:]} @app.get("/runtime/modernization") async def runtime_modernization(): try: return {"status": "ok", "modernization": _runtime_modernization_snapshot()} except Exception as exc: logger.warning("Failed to build runtime modernization snapshot", exc_info=True) return {"status": "degraded", "detail": str(exc), "modernization": {}} @app.get("/model/status") async def model_status(): return { "loaded": MODEL is not None, "error": MODEL_ERROR, "repo_id": MODEL_META.get("repo_id"), "filename": MODEL_META.get("filename"), } @app.post("/model/load") async def model_load(req: ModelLoadRequest): ensure_model_loaded(req.repo_id, req.filename, hf_token=req.hf_token) return await model_status() @app.post("/model/hotswap") async def model_hotswap(req: ModelLoadRequest): global MODEL if MODEL is not None: del MODEL gc.collect() MODEL = None ensure_model_loaded(req.repo_id, req.filename, hf_token=req.hf_token) return await model_status() @app.post("/embeddings") async def embeddings(payload: dict[str, Any]): if EMBED_MODEL is None: _load_embed_model() texts = payload.get("texts") or [] if not texts: return {"embeddings": []} return {"embeddings": EMBED_MODEL.encode(texts).tolist()} @app.post("/chat") async def chat(req: ChatRequest): try: exact_reply = _extract_exact_reply_instruction_safe(req.user_input) if exact_reply: runtime_context = {**(req.context or {}), "user_input": req.user_input} trace_payload = { "mode": "chat", "user_input": req.user_input, "reply": exact_reply, "plan": None, "execution": None, "knowledge": [], "web_results": [], "context": runtime_context, } memory = MemoryAgent() memory.write_short_term(req.request_id, trace_payload) return { "status": "ok", "mode": "chat", "reply": exact_reply, "plan": None, "rationale": None, "execution": None, "knowledge": [], "web_results": [], "timestamp": time.time(), } planner = PlannerAgent() reasoning = ReasoningAgent() memory = MemoryAgent() runtime_context = {**(req.context or {}), "user_input": req.user_input} routing_context = runtime_context.get("routing") if isinstance(runtime_context.get("routing"), dict) else {} requested_role = str(runtime_context.get("role_name") or routing_context.get("role") or "").strip().lower() mode = "task" if _is_task_request(req.user_input) else "chat" if not req.auto_execute and requested_role: mode = "chat" knowledge = _retrieve_knowledge(req.user_input, top_k=4) if _is_knowledge_request(req.user_input, req.context) else [] web_results = _search_web(req.user_input) if _is_research_request(req.user_input) else [] context_block = _append_runtime_instructions(_format_context_blocks(knowledge, web_results), runtime_context) response_text = "" plan_steps = None execution = None rationale = None if mode == "task": plan_steps = planner.run(req.user_input, req.context) rationale = reasoning.run(req.user_input, plan_steps) response_text = ( "سأتعامل مع هذه الرسالة كطلب تنفيذ، وقمت ببناء خطة مبدئية وسأبدأ التنفيذ تلقائياً." if _contains_arabic(req.user_input) else "I treated this as an execution request, built a plan, and started automatic execution." ) if req.auto_execute: from api.routes_execute import ExecuteRequest, execute as execute_route execution = await execute_route( ExecuteRequest( request_id=req.request_id, plan={"steps": plan_steps}, executor_url=os.getenv("EXECUTOR_URL", "").strip() or None, ) ) if execution.get("report", {}).get("success"): response_text += ( "\n\nتم التنفيذ بنجاح مبدئياً." if _contains_arabic(req.user_input) else "\n\nExecution completed successfully." ) else: response_text += ( "\n\nتمت المحاولة لكن توجد مخرجات تحتاج مراجعة." if _contains_arabic(req.user_input) else "\n\nExecution ran, but the result still needs review." ) else: response_text = _generate_response(req.user_input, req.history, context_block) trace_payload = { "mode": mode, "user_input": req.user_input, "reply": response_text, "plan": plan_steps, "execution": execution, "knowledge": knowledge, "web_results": web_results, "context": runtime_context, } memory.write_short_term(req.request_id, trace_payload) _dispatch_background(_store_chat_trace, req.request_id, trace_payload) _dispatch_background(_ingest_chat_knowledge, req.request_id, req.user_input, response_text) _dispatch_background(_learn_user_style, req.request_id, req.user_input, response_text, runtime_context) return { "status": "ok", "mode": mode, "reply": response_text, "plan": plan_steps, "rationale": rationale, "execution": execution, "knowledge": knowledge, "web_results": web_results, "timestamp": time.time(), } except Exception as exc: logger.exception("Chat failed") return { "status": "error", "mode": "chat", "reply": "حدث خطأ أثناء معالجة الرسالة." if _contains_arabic(req.user_input) else "Chat processing failed.", "detail": str(exc), "timestamp": time.time(), } @app.post("/init-connection") async def init_connection(payload: ConnectionInit): _remember_executor_url(payload.executor_url) FIREBASE.set_document("runtime", "executor", {"executor_url": payload.executor_url}) public_url = _report_known_public_url() if not public_url: public_url = start_ngrok(payload.ngrok_token) return {"status": "connected", "brain_public_url": public_url} @app.post("/system/publish-url") async def system_publish_url(req: PublishUrlRequest | None = None): payload = req or PublishUrlRequest() explicit_public_url = str(payload.public_url or "").strip().rstrip("/") if explicit_public_url: _remember_public_url(explicit_public_url) _report_brain_url(explicit_public_url) FIREBASE.set_document("brains", explicit_public_url, {"url": explicit_public_url, "status": "healthy", "source": "explicit_publish"}) return {"status": "published", "brain_public_url": explicit_public_url, "mode": "explicit"} if not payload.start_tunnel: public_url = _report_known_public_url() if public_url: return {"status": "published", "brain_public_url": public_url, "mode": "saved"} return {"status": "skipped", "brain_public_url": None, "mode": "none"} public_url = start_ngrok(payload.ngrok_token) if public_url: return {"status": "published", "brain_public_url": public_url, "mode": "ngrok"} public_url = _report_known_public_url() return {"status": "published" if public_url else "error", "brain_public_url": public_url, "mode": "saved" if public_url else "none"} @app.get("/system/files") async def system_files(path: str = "", include_content: bool = False): try: target = _resolve_sync_path(path) if not target.exists(): return {"status": "error", "detail": "Path not found", "path": path} if target.is_file(): payload = {"status": "ok", "entry": _describe_sync_entry(target)} if include_content: payload["content"] = target.read_text(encoding="utf-8", errors="ignore") return payload items = sorted((_describe_sync_entry(item) for item in target.iterdir()), key=lambda item: (not item["is_dir"], item["name"].lower())) return {"status": "ok", "root": str(_sync_root_path()), "path": path, "items": items} except Exception as exc: logger.exception("File listing failed") return {"status": "error", "detail": str(exc), "path": path} @app.post("/system/files/write") async def system_files_write(payload: FileWriteRequest): try: target = _resolve_sync_path(payload.path) target.parent.mkdir(parents=True, exist_ok=True) if target.exists() and target.is_dir(): return {"status": "error", "detail": "Target path is a directory"} if target.exists() and not payload.overwrite: return {"status": "error", "detail": "File already exists"} target.write_text(payload.content or "", encoding="utf-8") return {"status": "saved", "entry": _describe_sync_entry(target)} except Exception as exc: logger.exception("File write failed") return {"status": "error", "detail": str(exc), "path": payload.path} @app.post("/system/files/mkdir") async def system_files_mkdir(payload: FileMkdirRequest): try: target = _resolve_sync_path(payload.path) target.mkdir(parents=True, exist_ok=True) return {"status": "created", "entry": _describe_sync_entry(target)} except Exception as exc: logger.exception("Directory creation failed") return {"status": "error", "detail": str(exc), "path": payload.path} @app.delete("/system/files") async def system_files_delete(payload: FileDeleteRequest): try: target = _resolve_sync_path(payload.path) if not target.exists(): return {"status": "deleted", "path": payload.path, "existed": False} if target.is_dir(): if payload.recursive: shutil.rmtree(target) else: target.rmdir() else: target.unlink() return {"status": "deleted", "path": payload.path, "existed": True} except Exception as exc: logger.exception("Delete failed") return {"status": "error", "detail": str(exc), "path": payload.path} if HAS_MULTIPART: @app.post("/system/sync") async def sync_codebase(file: UploadFile = File(...), restart: bool = False): temp_zip = os.path.join(tempfile.gettempdir(), "kapo_update.zip") try: with open(temp_zip, "wb") as buffer: shutil.copyfileobj(file.file, buffer) with zipfile.ZipFile(temp_zip, "r") as zip_ref: zip_ref.extractall(_sync_target_root()) if restart: _restart_process() return {"status": "synced", "target_root": _sync_target_root(), "restart_scheduled": restart} except Exception as exc: logger.exception("Code sync failed") return {"status": "error", "detail": str(exc)} @app.post("/system/archive/upload") async def system_archive_upload(file: UploadFile = File(...), target_path: str = "", restart: bool = False): temp_zip = os.path.join(tempfile.gettempdir(), "kapo_archive_upload.zip") try: extract_root = _resolve_sync_path(target_path) extract_root.mkdir(parents=True, exist_ok=True) with open(temp_zip, "wb") as buffer: shutil.copyfileobj(file.file, buffer) with zipfile.ZipFile(temp_zip, "r") as zip_ref: zip_ref.extractall(extract_root) if restart: _restart_process() return { "status": "extracted", "target_root": str(extract_root), "restart_scheduled": restart, } except Exception as exc: logger.exception("Archive upload failed") return {"status": "error", "detail": str(exc)} else: @app.post("/system/sync") async def sync_codebase_unavailable(): return {"status": "error", "detail": "python-multipart is required for /system/sync"} @app.post("/system/archive/upload") async def system_archive_upload_unavailable(): return {"status": "error", "detail": "python-multipart is required for /system/archive/upload"} @app.post("/system/restart") async def system_restart(req: RestartRequest | None = None): delay_sec = req.delay_sec if req else 1.0 _restart_process(delay_sec=delay_sec) return { "status": "restarting", "delay_sec": delay_sec, "target_root": _sync_target_root(), } def _health_payload(check_executor: bool = False, executor_url: str | None = None) -> dict[str, Any]: cfg = load_config() base_exec_url = (executor_url or os.getenv("EXECUTOR_URL", "")).strip().rstrip("/") exec_ok = False exec_checked = False exec_error = None health_timeout = int(cfg.get("REQUEST_TIMEOUT_SEC", 20) or 20) health_retries = max(1, int(cfg.get("REQUEST_RETRIES", 2) or 2)) if base_exec_url and check_executor: exec_checked = True for _ in range(health_retries): try: response = requests.get( f"{base_exec_url}/health", headers=get_executor_headers(cfg), timeout=health_timeout, ) exec_ok = response.status_code == 200 if exec_ok: exec_error = None break exec_error = response.text except Exception as exc: exec_error = str(exc) faiss_path = cfg.get("FAISS_INDEX_PATH") return { "status": "ok", "model_loaded": MODEL is not None, "model_error": MODEL_ERROR, "embedding_loaded": EMBED_MODEL is not None, "faiss_ok": bool(faiss_path and os.path.exists(faiss_path)), "executor_checked": exec_checked, "executor_ok": exec_ok, "executor_error": exec_error, "remote_brain_only": str(os.getenv("REMOTE_BRAIN_ONLY", "")).strip().lower() in {"1", "true", "yes", "on"}, "runtime_root": os.getenv("KAPO_RUNTIME_ROOT", ""), "sync_root": _sync_target_root(), "timestamp": time.time(), } def _decode_b64_text(value: str) -> str: raw = str(value or "").strip() if not raw: return "" padding = "=" * (-len(raw) % 4) try: return base64.urlsafe_b64decode((raw + padding).encode("utf-8")).decode("utf-8").strip() except Exception: return "" def _runtime_modernization_snapshot() -> dict[str, Any]: bootstrap = DRIVE_STATE.ensure_bootstrap_loaded(force=False) or {} patch_manifest_url = str( os.getenv("KAPO_PATCH_MANIFEST_URL", "") or bootstrap.get("patch_manifest_url") or "" ).strip() remote_env_url = _decode_b64_text(os.getenv("KAPO_REMOTE_ENV_URL_B64", "")) or str(os.getenv("KAPO_REMOTE_ENV_URL", "") or "").strip() public_url = str(os.getenv("BRAIN_PUBLIC_URL", "") or LAST_BRAIN_URL_REPORT.get("url") or _load_saved_public_url() or "").strip() applied_version = _load_applied_runtime_version() control_plane_url = str(os.getenv("KAPO_CONTROL_PLANE_URL", "") or "").strip() queue_name = str(os.getenv("KAPO_CLOUDFLARE_QUEUE_NAME", "") or "").strip() return { "shared_state_backend": _shared_state_backend(), "firebase_enabled": str(os.getenv("FIREBASE_ENABLED", "0")).strip().lower() in {"1", "true", "yes", "on"}, "drive": { "folder_id": str(os.getenv("GOOGLE_DRIVE_SHARED_STATE_FOLDER_ID", "") or os.getenv("GOOGLE_DRIVE_STORAGE_FOLDER_ID", "") or "").strip(), "prefix": str(os.getenv("GOOGLE_DRIVE_SHARED_STATE_PREFIX", "") or os.getenv("GOOGLE_DRIVE_STORAGE_PREFIX", "") or "").strip(), "bootstrap_loaded": bool(bootstrap), }, "bootstrap": { "configured": bool(str(os.getenv("KAPO_BOOTSTRAP_URL", "") or os.getenv("GOOGLE_DRIVE_BOOTSTRAP_URL", "") or "").strip()), "url": str(os.getenv("KAPO_BOOTSTRAP_URL", "") or os.getenv("GOOGLE_DRIVE_BOOTSTRAP_URL", "") or "").strip(), "loaded": bool(bootstrap), "version": str(bootstrap.get("version") or "").strip(), "updated_at": bootstrap.get("updated_at"), }, "patch": { "configured": bool(patch_manifest_url), "manifest_url": patch_manifest_url, "startup_self_update_enabled": _startup_self_update_enabled(), "applied_version": applied_version, }, "remote_env": { "configured": bool(remote_env_url), "loaded": str(os.getenv("KAPO_REMOTE_ENV_LOADED", "0")).strip().lower() in {"1", "true", "yes", "on"}, "url": remote_env_url, }, "control_plane": { "configured": bool(control_plane_url), "url": control_plane_url, "queue_name": queue_name, "queue_configured": bool(queue_name), }, "transport": { "provider": str(os.getenv("BRAIN_TUNNEL_PROVIDER", "ngrok") or "ngrok").strip().lower(), "auto_ngrok": _ngrok_bootstrap_enabled(), "reuse_public_url_on_restart": _reuse_public_url_on_restart(), "public_url": public_url, "executor_url": str(os.getenv("EXECUTOR_URL", "") or "").strip(), }, "runtime": { "root": str(_sync_target_root()), "model_profile_id": str(os.getenv("MODEL_PROFILE_ID", "") or "").strip(), "model_loaded": MODEL is not None, "embed_loaded": EMBED_MODEL is not None, }, } def _persist_runtime_state_snapshot(reason: str = "periodic") -> dict[str, Any]: payload = _health_payload(check_executor=False) public_url = os.getenv("BRAIN_PUBLIC_URL") or _load_saved_public_url() or "" state_id = _brain_state_id() FIREBASE.set_document( "brains", public_url or state_id, { "url": public_url, "health": payload, "status": "healthy" if payload["model_loaded"] else "degraded", "model_repo": os.getenv("MODEL_REPO", DEFAULT_MODEL_REPO), "model_file": os.getenv("MODEL_FILE", DEFAULT_MODEL_FILE), "model_profile_id": os.getenv("MODEL_PROFILE_ID", DEFAULT_MODEL_PROFILE_ID), "roles": os.getenv("BRAIN_ROLES", "supervisor,chat,coding,planner,arabic,fallback"), "languages": os.getenv("BRAIN_LANGUAGES", "ar,en"), "updated_at": time.time(), "source": reason, }, min_interval_sec=5.0, ) FIREBASE.set_document( "brain_runtime", state_id, { "brain_url": public_url, "reason": reason, "health": payload, "recent_logs": list(RUNTIME_LOG_BUFFER)[-25:], "updated_at": time.time(), }, min_interval_sec=5.0, ) return payload def _runtime_state_pulse() -> None: interval = max(20.0, float(os.getenv("BRAIN_STATE_SYNC_INTERVAL_SEC", "60") or 60)) while True: try: _persist_runtime_state_snapshot(reason="background_pulse") except Exception: logger.warning("Background runtime state sync failed", exc_info=True) time.sleep(interval) @app.get("/health") async def health(executor_url: str | None = None, check_executor: bool = False): payload = _health_payload(check_executor=check_executor, executor_url=executor_url) _persist_runtime_state_snapshot(reason="health_endpoint") return payload