# NOTE: Hugging Face Spaces page residue ("Spaces:" / "Sleeping") was captured
# with this file during extraction; it is not part of the module source.
| """FastAPI entrypoint for the Brain Server.""" | |
import base64
import gc
import json
import logging
import os
import re
import shutil
import subprocess
import sys
import tempfile
import threading
import time
import zipfile
from collections import deque
from pathlib import Path
from typing import Any

import requests
from fastapi import FastAPI, File, UploadFile
from pydantic import BaseModel

from agents.memory_agent import MemoryAgent
from agents.planner_agent import PlannerAgent
from agents.reasoning_agent import ReasoningAgent

try:
    from api import deps as deps_module
    from api.deps import get_executor_headers, get_logger, load_config
    from api.firebase_store import FirebaseStore
    from api.routes_analyze import router as analyze_router
    from api.routes_execute import router as execute_router
    from api.routes_plan import router as plan_router
    from shared.google_drive_state import GoogleDriveStateClient
except ImportError:
    from . import deps as deps_module
    from .deps import get_executor_headers, get_logger, load_config
    from .firebase_store import FirebaseStore
    from .routes_analyze import router as analyze_router
    from .routes_execute import router as execute_router
    from .routes_plan import router as plan_router
    from shared.google_drive_state import GoogleDriveStateClient
# Module-level logger; a RuntimeLogHandler is attached further below so recent
# warnings/errors are also captured in the in-process runtime log buffer.
logger = get_logger("kapo.brain.main")
| def _configure_windows_utf8() -> None: | |
| if os.name != "nt": | |
| return | |
| os.environ.setdefault("PYTHONUTF8", "1") | |
| os.environ.setdefault("PYTHONIOENCODING", "utf-8") | |
| os.environ.setdefault("PYTHONLEGACYWINDOWSSTDIO", "utf-8") | |
| try: | |
| import ctypes | |
| kernel32 = ctypes.windll.kernel32 | |
| kernel32.SetConsoleCP(65001) | |
| kernel32.SetConsoleOutputCP(65001) | |
| except Exception: | |
| pass | |
# Apply console/encoding fixes at import time so all subsequent logging and
# printing is UTF-8 safe (Arabic text in particular).
_configure_windows_utf8()
if hasattr(sys.stdout, "reconfigure"):
    sys.stdout.reconfigure(encoding="utf-8", errors="replace")
if hasattr(sys.stderr, "reconfigure"):
    sys.stderr.reconfigure(encoding="utf-8", errors="replace")
app = FastAPI(title="KAPO-AI Brain Server", version="1.0.0")
app.include_router(plan_router)
app.include_router(execute_router)
app.include_router(analyze_router)

# Lazily loaded llama.cpp model handle, the last load error (if any), and
# metadata about which repo/file/path is currently loaded.
MODEL = None
MODEL_ERROR = None
MODEL_META = {"repo_id": None, "filename": None, "path": None}
# Optional sentence-transformers embedding model (see _load_embed_model).
EMBED_MODEL = None
FIREBASE = FirebaseStore("brain", logger_name="kapo.brain.firebase")
DRIVE_STATE = GoogleDriveStateClient(logger)
# collection cache key -> (fetched_at timestamp, documents) for Firebase reads.
FIREBASE_RUNTIME_CACHE: dict[str, tuple[float, Any]] = {}
# Ring buffer of recent WARNING+ log records (see RuntimeLogHandler below).
RUNTIME_LOG_BUFFER: deque[dict[str, Any]] = deque(maxlen=200)
# Last public URL reported to the executor plus the report timestamp,
# used by _should_report_brain_url for rate limiting.
LAST_BRAIN_URL_REPORT: dict[str, Any] = {"url": "", "ts": 0.0}
RUNTIME_STATE_THREAD_STARTED = False
PUBLIC_URL_RETRY_STARTED = False
DEFAULT_MODEL_REPO = "QuantFactory/aya-expanse-8b-GGUF"
DEFAULT_MODEL_FILE = "aya-expanse-8b.Q4_K_M.gguf"
DEFAULT_MODEL_PROFILE_ID = "supervisor-ar-en-default"
# python-multipart is optional; endpoints taking file uploads require it.
HAS_MULTIPART = True
try:
    import multipart  # noqa: F401
except Exception:
    HAS_MULTIPART = False
class RuntimeLogHandler(logging.Handler):
    """Logging handler that mirrors records into RUNTIME_LOG_BUFFER."""

    def emit(self, record) -> None:
        """Append a compact dict for *record*; never raise from logging."""
        try:
            entry = {
                "ts": time.time(),
                "level": record.levelname,
                "name": record.name,
                "message": record.getMessage(),
            }
            RUNTIME_LOG_BUFFER.append(entry)
        except Exception:
            # A broken log record must never take down the request path.
            pass
# Install the buffer handler once; the isinstance guard prevents duplicate
# handlers when the module is re-imported/reloaded.
_runtime_log_handler = RuntimeLogHandler(level=logging.WARNING)
if not any(isinstance(handler, RuntimeLogHandler) for handler in logger.handlers):
    logger.addHandler(_runtime_log_handler)
| def _feature_enabled(name: str, default: bool = False) -> bool: | |
| value = os.getenv(name) | |
| if value is None or str(value).strip() == "": | |
| return default | |
| return str(value).strip().lower() in {"1", "true", "yes", "on"} | |
| def _hf_transformers_runtime_enabled() -> bool: | |
| return _feature_enabled("KAPO_HF_TRANSFORMERS_RUNTIME", default=False) | |
| def _embeddings_enabled() -> bool: | |
| return not _feature_enabled("KAPO_DISABLE_EMBEDDINGS", default=False) | |
| def _remote_brain_only() -> bool: | |
| return _feature_enabled("REMOTE_BRAIN_ONLY", default=False) | |
| def _ngrok_bootstrap_enabled() -> bool: | |
| return _feature_enabled("BRAIN_AUTO_NGROK", default=True) | |
| def _configured_public_url() -> str: | |
| return str(os.getenv("BRAIN_PUBLIC_URL", "")).strip().rstrip("/") | |
| def _prefer_configured_public_url() -> bool: | |
| provider = str(os.getenv("BRAIN_PROVIDER", "") or os.getenv("BRAIN_TEMPLATE", "")).strip().lower() | |
| if "huggingface" in provider or "hf-space" in provider: | |
| return True | |
| return _feature_enabled("BRAIN_FORCE_CONFIGURED_PUBLIC_URL", default=False) | |
| def _reuse_public_url_on_restart() -> bool: | |
| return _feature_enabled("BRAIN_REUSE_PUBLIC_URL_ON_RESTART", default=True) | |
| def _auto_publish_public_url_on_startup() -> bool: | |
| return _feature_enabled("BRAIN_AUTO_PUBLISH_URL_ON_STARTUP", default=True) | |
| def _internal_restart_in_progress() -> bool: | |
| return _feature_enabled("KAPO_INTERNAL_RESTART", default=False) | |
| def _fast_restart_enabled() -> bool: | |
| return _feature_enabled("KAPO_FAST_INTERNAL_RESTART", default=True) | |
| def _executor_connect_timeout() -> float: | |
| return float(os.getenv("EXECUTOR_CONNECT_TIMEOUT_SEC", "3.0") or 3.0) | |
| def _executor_read_timeout(name: str, default: float) -> float: | |
| return float(os.getenv(name, str(default)) or default) | |
| def _executor_roundtrip_allowed(feature_name: str, default: bool = True) -> bool: | |
| executor_url = os.getenv("EXECUTOR_URL", "").strip() | |
| if not executor_url: | |
| return False | |
| kaggle_defaults = { | |
| "BRAIN_REMOTE_TRACE_STORE_ENABLED": False, | |
| "BRAIN_REMOTE_AUTO_INGEST_ENABLED": False, | |
| "BRAIN_REMOTE_STYLE_PROFILE_ENABLED": False, | |
| } | |
| effective_default = kaggle_defaults.get(feature_name, default) if _is_kaggle_runtime() else default | |
| return _feature_enabled(feature_name, default=effective_default) | |
| def _remote_runtime_reads_enabled() -> bool: | |
| return _feature_enabled("KAPO_REMOTE_STATE_READS", default=False) | |
| def _shared_state_backend() -> str: | |
| return str(os.getenv("KAPO_SHARED_STATE_BACKEND", "")).strip().lower() | |
| def _drive_bootstrap_configured() -> bool: | |
| return bool( | |
| str(os.getenv("GOOGLE_DRIVE_BOOTSTRAP_URL", "") or os.getenv("KAPO_BOOTSTRAP_URL", "") or "").strip() | |
| ) | |
def _bootstrap_shared_state() -> None:
    """Load the Drive bootstrap payload and export fallback env vars from it.

    Runs only when a bootstrap URL is configured or the shared-state backend
    is Google Drive. Non-empty payload values overwrite the corresponding
    environment variables.
    """
    drive_backend = _shared_state_backend() in {"google_drive", "drive", "gdrive"}
    if not (_drive_bootstrap_configured() or drive_backend):
        return
    payload = DRIVE_STATE.ensure_bootstrap_loaded(force=False) or {}
    for key, env_name in (
        ("executor_url", "EXECUTOR_URL"),
        ("control_plane_url", "KAPO_CONTROL_PLANE_URL"),
        ("cloudflare_control_plane_url", "KAPO_CONTROL_PLANE_URL"),
        ("cloudflare_queue_name", "KAPO_CLOUDFLARE_QUEUE_NAME"),
    ):
        value = payload.get(key)
        if value not in (None, ""):
            os.environ[env_name] = str(value)


def _startup_self_update_enabled() -> bool:
    """Whether the brain applies patch bundles from bootstrap at startup."""
    return _feature_enabled("KAPO_STARTUP_SELF_UPDATE", default=True)
| def _should_report_brain_url(public_url: str) -> bool: | |
| normalized = str(public_url or "").strip().rstrip("/") | |
| if not normalized: | |
| return False | |
| interval_sec = max(30.0, float(os.getenv("BRAIN_REPORT_MIN_INTERVAL_SEC", "600") or 600)) | |
| previous_url = str(LAST_BRAIN_URL_REPORT.get("url") or "").strip() | |
| previous_ts = float(LAST_BRAIN_URL_REPORT.get("ts") or 0.0) | |
| now = time.time() | |
| if normalized != previous_url or (now - previous_ts) >= interval_sec: | |
| LAST_BRAIN_URL_REPORT["url"] = normalized | |
| LAST_BRAIN_URL_REPORT["ts"] = now | |
| return True | |
| return False | |
def _download_model(repo_id: str, filename: str, hf_token: str | None = None) -> str:
    """Download *filename* from the HF hub repo and return its local path.

    Cache directory priority: MODEL_CACHE_DIR env var, then a project-local
    models_cache dir on Kaggle, then a shared temp directory.
    """
    from huggingface_hub import hf_hub_download

    cache_override = str(os.getenv("MODEL_CACHE_DIR", "") or "").strip()
    if cache_override:
        cache_dir = cache_override
    elif _is_kaggle_runtime():
        cache_dir = str((_project_root() / "models_cache").resolve())
    else:
        cache_dir = os.path.join(tempfile.gettempdir(), "kapo_models")
    os.makedirs(cache_dir, exist_ok=True)
    return hf_hub_download(repo_id=repo_id, filename=filename, cache_dir=cache_dir, token=hf_token)
| def ensure_model_loaded(repo_id: str, filename: str, hf_token: str | None = None) -> None: | |
| global MODEL, MODEL_ERROR, MODEL_META | |
| repo_id = (repo_id or "").strip() | |
| filename = (filename or "").strip() | |
| if not repo_id or not filename: | |
| MODEL = None | |
| MODEL_ERROR = "model repo/file missing" | |
| return | |
| try: | |
| model_path = _download_model(repo_id, filename, hf_token=hf_token) | |
| except Exception as exc: | |
| MODEL = None | |
| MODEL_ERROR = f"model download failed: {exc}" | |
| logger.exception("Model download failed") | |
| return | |
| try: | |
| from llama_cpp import Llama | |
| MODEL = Llama(model_path=model_path, n_ctx=4096) | |
| MODEL_ERROR = None | |
| MODEL_META = {"repo_id": repo_id, "filename": filename, "path": model_path} | |
| logger.info("Loaded model %s/%s", repo_id, filename) | |
| except Exception as exc: | |
| MODEL = None | |
| MODEL_ERROR = f"model load failed: {exc}" | |
| logger.exception("Model load failed") | |
def _load_embed_model() -> None:
    """Best-effort load of the sentence-transformers embedding model.

    Honours KAPO_DISABLE_EMBEDDINGS, is idempotent, and degrades gracefully
    (EMBED_MODEL stays None) when the library is missing or the load fails —
    mirroring the guarded error handling used by ensure_model_loaded().
    Previously a download/load failure raised out of startup.
    """
    global EMBED_MODEL
    if not _embeddings_enabled():
        logger.info("Embedding model disabled by configuration")
        return
    if EMBED_MODEL is not None:
        # Already loaded; keep the existing instance.
        return
    try:
        from sentence_transformers import SentenceTransformer
    except ModuleNotFoundError:
        logger.info("Skipping embedding model load because sentence_transformers is unavailable")
        return
    model_name = os.getenv("EMBED_MODEL", "sentence-transformers/all-MiniLM-L6-v2")
    try:
        EMBED_MODEL = SentenceTransformer(model_name)
    except Exception:
        # Embeddings are optional: a failed download/load must not crash startup.
        logger.exception("Embedding model load failed for %s", model_name)
        return
    logger.info("Loaded embedding model %s", model_name)
def _load_default_model() -> None:
    """Resolve model settings from env and load (or skip) the local model.

    Local loading is skipped when the HF transformers runtime is enabled, or
    when running on a Hugging Face provider without an explicit MODEL_FILE
    (the remote generation fallback is used instead).
    """
    global MODEL, MODEL_ERROR, MODEL_META
    repo_id = str(os.getenv("MODEL_REPO", DEFAULT_MODEL_REPO) or DEFAULT_MODEL_REPO).strip()
    filename = str(os.getenv("MODEL_FILE", "") or "").strip()
    provider = str(os.getenv("BRAIN_PROVIDER", "") or os.getenv("BRAIN_TEMPLATE", "") or "").strip().lower()

    def _skip_local_load(message: str) -> None:
        # Record "no local model" state without treating it as an error.
        global MODEL, MODEL_ERROR, MODEL_META
        MODEL = None
        MODEL_ERROR = None
        MODEL_META = {"repo_id": repo_id, "filename": "", "path": None}
        logger.info(message, repo_id)

    if _hf_transformers_runtime_enabled():
        _skip_local_load("Skipping local model load; HF transformers runtime is enabled for %s")
        return
    if not filename and ("huggingface" in provider or "hf-space" in provider):
        _skip_local_load("Skipping local model load; using Hugging Face remote generation fallback for %s")
        return
    ensure_model_loaded(repo_id, filename or DEFAULT_MODEL_FILE, hf_token=os.getenv("HF_TOKEN"))
def _brain_headers() -> dict:
    """HTTP headers (auth/bypass) used when calling the executor."""
    return get_executor_headers(load_config())
| def _project_root() -> Path: | |
| return Path(__file__).resolve().parents[1] | |
| def _is_kaggle_runtime() -> bool: | |
| return "/kaggle/" in str(_project_root()).replace("\\", "/") or bool(os.getenv("KAGGLE_KERNEL_RUN_TYPE")) | |
# Environment keys the executor is allowed to push to the brain via a
# settings payload (see _apply_executor_settings).
_EXECUTOR_SETTING_KEYS: tuple[str, ...] = (
    "NGROK_AUTHTOKEN",
    "MODEL_REPO",
    "MODEL_FILE",
    "MODEL_PROFILE_ID",
    "SUPERVISOR_MODEL_PROFILE_ID",
    "EMBED_MODEL",
    "REQUEST_TIMEOUT_SEC",
    "REQUEST_RETRIES",
    "CHAT_TIMEOUT_SEC",
    "EXECUTOR_BYPASS_HEADER",
    "EXECUTOR_BYPASS_VALUE",
    "BRAIN_BYPASS_HEADER",
    "BRAIN_BYPASS_VALUE",
    "REMOTE_BRAIN_ONLY",
    "KAGGLE_AUTO_BOOTSTRAP",
    "BRAIN_AUTO_NGROK",
    "BRAIN_AUTO_PUBLISH_URL_ON_STARTUP",
    "BRAIN_PUBLIC_URL",
    "BRAIN_REUSE_PUBLIC_URL_ON_RESTART",
    "KAGGLE_SYNC_SUBDIR",
    "BRAIN_ROLES",
    "BRAIN_LANGUAGES",
    "BRAIN_REMOTE_KNOWLEDGE_ENABLED",
    "BRAIN_REMOTE_WEB_SEARCH_ENABLED",
    "BRAIN_REMOTE_TRACE_STORE_ENABLED",
    "BRAIN_REMOTE_AUTO_INGEST_ENABLED",
    "BRAIN_LOCAL_RAG_FALLBACK_ENABLED",
    "EXECUTOR_CONNECT_TIMEOUT_SEC",
    "BRAIN_REMOTE_KNOWLEDGE_TIMEOUT_SEC",
    "BRAIN_REMOTE_WEB_SEARCH_TIMEOUT_SEC",
    "BRAIN_REMOTE_TRACE_STORE_TIMEOUT_SEC",
    "BRAIN_REMOTE_AUTO_INGEST_TIMEOUT_SEC",
    "FIREBASE_ENABLED",
    "FIREBASE_PROJECT_ID",
    "FIREBASE_SERVICE_ACCOUNT_PATH",
    "FIREBASE_SERVICE_ACCOUNT_JSON",
    "FIREBASE_NAMESPACE",
    "KAPO_SHARED_STATE_BACKEND",
    "GOOGLE_DRIVE_SHARED_STATE_FOLDER_ID",
    "GOOGLE_DRIVE_SHARED_STATE_PREFIX",
    "GOOGLE_DRIVE_BOOTSTRAP_URL",
    "KAPO_BOOTSTRAP_URL",
    "GOOGLE_DRIVE_ACCESS_TOKEN",
    "GOOGLE_DRIVE_REFRESH_TOKEN",
    "GOOGLE_DRIVE_CLIENT_SECRET_JSON",
    "GOOGLE_DRIVE_CLIENT_SECRET_JSON_BASE64",
    "GOOGLE_DRIVE_CLIENT_SECRET_PATH",
    "GOOGLE_DRIVE_TOKEN_EXPIRES_AT",
    "KAPO_CONTROL_PLANE_URL",
    "KAPO_CLOUDFLARE_QUEUE_NAME",
)


def _apply_executor_settings(settings: dict[str, Any]) -> None:
    """Export known keys from *settings* into os.environ.

    Only non-empty values are applied; each is stringified before export.
    Invalidates the cached config so the new values take effect.
    """
    for key in _EXECUTOR_SETTING_KEYS:
        value = settings.get(key)
        if value not in (None, ""):
            os.environ[key] = str(value)
    deps_module.CONFIG_CACHE = None
def _apply_firebase_runtime_settings() -> None:
    """Pull runtime overrides from Firebase into os.environ.

    Merges the settings/global and runtime/executor documents (runtime wins),
    exports known keys, derives BRAIN_ROLES from the enabled role documents,
    and invalidates the cached config. No-op unless remote reads are enabled
    and Firebase is configured.
    """
    if not (_remote_runtime_reads_enabled() and FIREBASE.enabled()):
        return
    merged: dict[str, Any] = {}
    merged.update(FIREBASE.get_document("settings", "global") or {})
    merged.update(FIREBASE.get_document("runtime", "executor") or {})
    role_items = FIREBASE.list_documents("roles", limit=64)
    current_brain_url = str(merged.get("current_brain_url") or "").strip()
    if current_brain_url:
        os.environ["KAPO_EXECUTOR_CURRENT_BRAIN_URL"] = current_brain_url
    for key, env_name in (
        ("executor_public_url", "EXECUTOR_PUBLIC_URL"),
        ("executor_url", "EXECUTOR_URL"),
        ("model_repo", "MODEL_REPO"),
        ("model_file", "MODEL_FILE"),
        ("model_profile_id", "MODEL_PROFILE_ID"),
        ("supervisor_model_profile_id", "SUPERVISOR_MODEL_PROFILE_ID"),
        ("brain_roles", "BRAIN_ROLES"),
        ("brain_languages", "BRAIN_LANGUAGES"),
    ):
        value = merged.get(key)
        if value not in (None, ""):
            os.environ[env_name] = str(value)
    if role_items:
        disabled_markers = {"0", "false", "no", "off"}
        enabled_roles: list[str] = []
        for item in role_items:
            if str(item.get("enabled", True)).strip().lower() in disabled_markers:
                continue
            role = str(item.get("name") or item.get("id") or "").strip().lower()
            if role:
                enabled_roles.append(role)
        if enabled_roles:
            # dict.fromkeys preserves order while dropping duplicates.
            os.environ["BRAIN_ROLES"] = ",".join(dict.fromkeys(enabled_roles))
    deps_module.CONFIG_CACHE = None
| def _firebase_collection_cache_key(name: str) -> str: | |
| return f"collection:{name}" | |
def _firebase_list_documents_cached(collection: str, ttl_sec: float = 30.0, limit: int = 200) -> list[dict[str, Any]]:
    """List Firebase documents through a small in-process TTL cache.

    Returns [] when remote reads are disabled or Firebase is off. Otherwise
    serves FIREBASE_RUNTIME_CACHE entries younger than *ttl_sec*, refreshing
    from Firestore on miss or expiry. Always returns a fresh list copy.
    """
    if not (_remote_runtime_reads_enabled() and FIREBASE.enabled()):
        return []
    cache_key = _firebase_collection_cache_key(collection)
    now = time.time()
    entry = FIREBASE_RUNTIME_CACHE.get(cache_key)
    if entry is not None:
        fetched_at, documents = entry
        if (now - fetched_at) < ttl_sec:
            return list(documents or [])
    documents = FIREBASE.list_documents(collection, limit=limit)
    FIREBASE_RUNTIME_CACHE[cache_key] = (now, documents)
    return list(documents or [])
def _firebase_role_profiles() -> list[dict[str, Any]]:
    """Role profiles from Firebase, falling back to the BRAIN_ROLES env var."""
    remote = _firebase_list_documents_cached("roles", ttl_sec=30.0, limit=64)
    if remote:
        return remote
    env_roles = str(os.getenv("BRAIN_ROLES", "")).split(",")
    return [{"name": role.strip(), "enabled": True} for role in env_roles if role.strip()]
def _firebase_runtime_snapshot() -> dict[str, Any]:
    """Bundle the cached runtime collections used by prompt/role lookups."""
    snapshot: dict[str, Any] = {
        "platforms": _firebase_list_documents_cached("platforms", ttl_sec=45.0, limit=64),
        "models": _firebase_list_documents_cached("models", ttl_sec=45.0, limit=128),
        "prompts": _firebase_list_documents_cached("prompts", ttl_sec=20.0, limit=128),
    }
    snapshot["roles"] = _firebase_role_profiles()
    return snapshot
| def _json_safe(value: Any) -> Any: | |
| if isinstance(value, dict): | |
| return {str(key): _json_safe(item) for key, item in value.items()} | |
| if isinstance(value, list): | |
| return [_json_safe(item) for item in value] | |
| if isinstance(value, tuple): | |
| return [_json_safe(item) for item in value] | |
| if isinstance(value, (str, int, float, bool)) or value is None: | |
| return value | |
| return str(value) | |
| def _firebase_prompt_body(role_name: str, language: str = "en") -> str: | |
| role_name = str(role_name or "").strip().lower() | |
| language = str(language or "en").strip().lower() | |
| if not role_name: | |
| return "" | |
| prompts = _firebase_runtime_snapshot().get("prompts", []) | |
| exact = [] | |
| fallback = [] | |
| for item in prompts: | |
| if str(item.get("role_name") or "").strip().lower() != role_name: | |
| continue | |
| if str(item.get("enabled", True)).strip().lower() in {"0", "false", "no", "off"}: | |
| continue | |
| item_lang = str(item.get("language") or "en").strip().lower() | |
| body = str(item.get("body") or "").strip() | |
| if not body: | |
| continue | |
| if item_lang == language: | |
| exact.append(body) | |
| elif item_lang == "en": | |
| fallback.append(body) | |
| if exact: | |
| return exact[0] | |
| if fallback: | |
| return fallback[0] | |
| return "" | |
def _prepare_runtime_environment() -> None:
    """Prepare env/paths for this host; the heavy lifting runs only on Kaggle.

    Preloads shared-state bootstrap values, then (on Kaggle) mirrors the
    read-only /kaggle/input source tree into a writable runtime root, wires
    sys.path and local data directories into the environment, and restores a
    previously persisted executor URL.
    """
    try:
        _bootstrap_shared_state()
    except Exception:
        # Bootstrap is best-effort; startup continues without remote values.
        logger.warning("Shared-state bootstrap preload failed", exc_info=True)
    if not _is_kaggle_runtime():
        return
    source_root = _project_root()
    source_text = str(source_root).replace("\\", "/")
    runtime_root_env = os.getenv("KAPO_RUNTIME_ROOT", "").strip()
    if runtime_root_env:
        runtime_root = Path(runtime_root_env).resolve()
    elif source_text.startswith("/kaggle/working/"):
        # Already running from the writable working tree.
        runtime_root = source_root.resolve()
    else:
        runtime_root = Path("/kaggle/working/KAPO-AI-SYSTEM").resolve()
    sync_root = runtime_root
    auto_bootstrap = str(os.getenv("KAGGLE_AUTO_BOOTSTRAP", "1")).strip().lower() in {"1", "true", "yes", "on"}
    if auto_bootstrap and source_text.startswith("/kaggle/input/") and source_root != runtime_root:
        # Replace any stale copy, then mirror the read-only dataset tree into
        # the writable runtime root (caches/VCS dirs excluded).
        if runtime_root.exists():
            shutil.rmtree(runtime_root, ignore_errors=True)
        shutil.copytree(
            source_root,
            runtime_root,
            ignore=shutil.ignore_patterns("__pycache__", "*.pyc", ".git", ".venv"),
        )
    if str(runtime_root) not in sys.path:
        sys.path.insert(0, str(runtime_root))
    # Local data directory for the DBs / vector index used by the brain.
    data_dir = runtime_root / "data" / "local" / "brain_runtime"
    data_dir.mkdir(parents=True, exist_ok=True)
    os.environ["KAPO_RUNTIME_ROOT"] = str(runtime_root)
    os.environ["KAPO_SYNC_ROOT"] = str(sync_root)
    os.environ["LOCAL_DATA_DIR"] = str(data_dir)
    os.environ["DB_PATH"] = str(data_dir / "episodic.db")
    os.environ["TOOLS_DB_PATH"] = str(data_dir / "tools.db")
    os.environ["FAISS_INDEX_PATH"] = str(data_dir / "faiss.index")
    # Kaggle brains default to remote-only mode unless explicitly overridden.
    os.environ["REMOTE_BRAIN_ONLY"] = str(os.getenv("REMOTE_BRAIN_ONLY", "1") or "1")
    saved_executor_url = _load_saved_executor_url()
    current_executor_url = str(os.getenv("EXECUTOR_URL", "") or "").strip()
    if saved_executor_url and not current_executor_url:
        os.environ["EXECUTOR_URL"] = saved_executor_url
    deps_module.CONFIG_CACHE = None
| def _sync_target_root() -> str: | |
| return os.getenv("KAPO_SYNC_ROOT") or os.getenv("KAPO_RUNTIME_ROOT") or os.getcwd() | |
| def _sync_root_path() -> Path: | |
| return Path(_sync_target_root()).resolve() | |
| def _resolve_sync_path(user_path: str | None = None) -> Path: | |
| root = _sync_root_path() | |
| relative = str(user_path or "").strip().replace("\\", "/").lstrip("/") | |
| candidate = (root / relative).resolve() if relative else root | |
| if candidate != root and root not in candidate.parents: | |
| raise ValueError("Path escapes sync root") | |
| return candidate | |
| def _describe_sync_entry(path: Path) -> dict[str, Any]: | |
| stat = path.stat() | |
| return { | |
| "name": path.name or str(path), | |
| "path": str(path.relative_to(_sync_root_path())).replace("\\", "/") if path != _sync_root_path() else "", | |
| "is_dir": path.is_dir(), | |
| "size": stat.st_size, | |
| "modified_at": stat.st_mtime, | |
| } | |
| def _public_url_state_path() -> Path: | |
| runtime_root = Path(_sync_target_root()).resolve() | |
| state_dir = runtime_root / "data" / "local" / "brain_runtime" | |
| state_dir.mkdir(parents=True, exist_ok=True) | |
| return state_dir / "public_url.txt" | |
| def _remember_public_url(public_url: str) -> None: | |
| value = str(public_url or "").strip().rstrip("/") | |
| if not value: | |
| return | |
| os.environ["BRAIN_PUBLIC_URL"] = value | |
| try: | |
| _public_url_state_path().write_text(value, encoding="utf-8") | |
| except Exception: | |
| logger.warning("Failed to persist public URL", exc_info=True) | |
| def _load_saved_public_url() -> str: | |
| try: | |
| value = _public_url_state_path().read_text(encoding="utf-8").strip().rstrip("/") | |
| return value | |
| except Exception: | |
| return "" | |
| def _ngrok_api_state_path() -> Path: | |
| runtime_root = Path(_sync_target_root()).resolve() | |
| state_dir = runtime_root / "data" / "local" / "brain_runtime" | |
| state_dir.mkdir(parents=True, exist_ok=True) | |
| return state_dir / "ngrok_api_url.txt" | |
| def _executor_url_state_path() -> Path: | |
| runtime_root = Path(_sync_target_root()).resolve() | |
| state_dir = runtime_root / "data" / "local" / "brain_runtime" | |
| state_dir.mkdir(parents=True, exist_ok=True) | |
| return state_dir / "executor_url.txt" | |
| def _remember_executor_url(executor_url: str) -> None: | |
| value = str(executor_url or "").strip().rstrip("/") | |
| if not value: | |
| return | |
| os.environ["EXECUTOR_URL"] = value | |
| try: | |
| _executor_url_state_path().write_text(value, encoding="utf-8") | |
| except Exception: | |
| logger.warning("Failed to persist executor URL", exc_info=True) | |
| def _load_saved_executor_url() -> str: | |
| configured = str(os.getenv("EXECUTOR_URL", "")).strip().rstrip("/") | |
| if configured: | |
| return configured | |
| try: | |
| return _executor_url_state_path().read_text(encoding="utf-8").strip().rstrip("/") | |
| except Exception: | |
| return "" | |
| def _brain_state_id() -> str: | |
| public_url = str(os.getenv("BRAIN_PUBLIC_URL") or _load_saved_public_url() or "").strip().rstrip("/") | |
| if public_url: | |
| return re.sub(r"[^A-Za-z0-9._-]+", "_", public_url) | |
| runtime_root = str(os.getenv("KAPO_RUNTIME_ROOT") or _sync_target_root() or "brain_runtime").strip() | |
| return re.sub(r"[^A-Za-z0-9._-]+", "_", runtime_root) | |
| def _applied_runtime_version_path() -> Path: | |
| runtime_root = Path(_sync_target_root()).resolve() | |
| state_dir = runtime_root / "data" / "local" / "brain_runtime" | |
| state_dir.mkdir(parents=True, exist_ok=True) | |
| return state_dir / "applied_version.json" | |
| def _load_applied_runtime_version() -> dict[str, Any]: | |
| try: | |
| return dict(json.loads(_applied_runtime_version_path().read_text(encoding="utf-8")) or {}) | |
| except Exception: | |
| return {} | |
| def _write_applied_runtime_version(payload: dict[str, Any]) -> None: | |
| _applied_runtime_version_path().write_text( | |
| json.dumps(dict(payload or {}), ensure_ascii=False, indent=2, sort_keys=True), | |
| encoding="utf-8", | |
| ) | |
def _download_remote_zip(url: str, destination: Path) -> Path:
    """Download *url* to *destination* (parents created); return destination.

    Raises requests.HTTPError on non-2xx responses.
    """
    payload = requests.get(str(url).strip(), timeout=120)
    payload.raise_for_status()
    destination.parent.mkdir(parents=True, exist_ok=True)
    destination.write_bytes(payload.content)
    return destination
| def _apply_zip_overlay(zip_path: Path, target_root: Path) -> None: | |
| with zipfile.ZipFile(zip_path, "r") as archive: | |
| archive.extractall(target_root) | |
def _load_patch_manifest_from_bootstrap() -> dict[str, Any]:
    """Bootstrap payload merged with the remote patch manifest (if any).

    Manifest keys win over bootstrap keys; when no manifest URL is present,
    or the fetch fails, the bootstrap payload alone is returned.
    """
    bootstrap = DRIVE_STATE.ensure_bootstrap_loaded(force=True) or {}
    manifest_url = str(bootstrap.get("patch_manifest_url") or "").strip()
    if not manifest_url:
        return dict(bootstrap)
    try:
        response = requests.get(manifest_url, timeout=30)
        response.raise_for_status()
        manifest = dict(response.json() or {})
    except Exception:
        logger.warning("Failed to load patch manifest from bootstrap URL", exc_info=True)
        return dict(bootstrap)
    return {**bootstrap, **manifest}
def _run_startup_self_update() -> None:
    """Apply a patch/full bundle from the bootstrap manifest, if newer.

    Compares the manifest's version/build_hash against the locally recorded
    applied version; on mismatch downloads the bundle (patch preferred over
    full package), overlays it onto the runtime root, and records the new
    version. All failures are logged and non-fatal.
    """
    if not _startup_self_update_enabled():
        return
    manifest = _load_patch_manifest_from_bootstrap()
    target_version = str(manifest.get("version") or "").strip()
    target_hash = str(manifest.get("build_hash") or "").strip()
    if not target_version and not target_hash:
        # Manifest does not describe a build; nothing to apply.
        return
    current = _load_applied_runtime_version()
    if (
        str(current.get("version") or "").strip() == target_version
        and str(current.get("build_hash") or "").strip() == target_hash
        and target_version
    ):
        # Already on the target version; skip download and overlay.
        return
    runtime_root = Path(_sync_target_root()).resolve()
    temp_dir = runtime_root / "data" / "local" / "brain_runtime" / "updates"
    patch_url = str(manifest.get("patch_bundle_url") or "").strip()
    full_url = str(manifest.get("full_package_url") or "").strip()
    applied_mode = ""
    source_url = ""
    try:
        # Prefer the smaller patch bundle; fall back to the full package.
        if patch_url:
            zip_path = _download_remote_zip(patch_url, temp_dir / "patch_bundle.zip")
            _apply_zip_overlay(zip_path, runtime_root)
            applied_mode = "patch_bundle"
            source_url = patch_url
        elif full_url:
            zip_path = _download_remote_zip(full_url, temp_dir / "full_package.zip")
            _apply_zip_overlay(zip_path, runtime_root)
            applied_mode = "full_package"
            source_url = full_url
        else:
            return
        _write_applied_runtime_version(
            {
                "version": target_version,
                "build_hash": target_hash,
                "applied_at": time.time(),
                "mode": applied_mode,
                "source_url": source_url,
            }
        )
        logger.info("Applied startup self-update (%s) version=%s", applied_mode, target_version or target_hash)
    except Exception:
        logger.warning("Startup self-update failed", exc_info=True)
| def _remember_ngrok_api_url(api_url: str) -> None: | |
| value = str(api_url or "").strip().rstrip("/") | |
| if not value: | |
| return | |
| os.environ["KAPO_NGROK_API_URL"] = value | |
| try: | |
| _ngrok_api_state_path().write_text(value, encoding="utf-8") | |
| except Exception: | |
| logger.warning("Failed to persist ngrok API URL", exc_info=True) | |
| def _load_saved_ngrok_api_url() -> str: | |
| configured = str(os.getenv("KAPO_NGROK_API_URL", "")).strip().rstrip("/") | |
| if configured: | |
| return configured | |
| try: | |
| return _ngrok_api_state_path().read_text(encoding="utf-8").strip().rstrip("/") | |
| except Exception: | |
| return "" | |
| def _ngrok_api_candidates() -> list[str]: | |
| seen: set[str] = set() | |
| candidates: list[str] = [] | |
| for candidate in [_load_saved_ngrok_api_url(), "http://127.0.0.1:4040", "http://127.0.0.1:4041", "http://127.0.0.1:4042"]: | |
| value = str(candidate or "").strip().rstrip("/") | |
| if value and value not in seen: | |
| seen.add(value) | |
| candidates.append(value) | |
| return candidates | |
| def _probe_ngrok_api(api_url: str) -> bool: | |
| try: | |
| response = requests.get(f"{api_url}/api/tunnels", timeout=2) | |
| return response.status_code == 200 | |
| except Exception: | |
| return False | |
| def _find_live_ngrok_api() -> str | None: | |
| for api_url in _ngrok_api_candidates(): | |
| if _probe_ngrok_api(api_url): | |
| _remember_ngrok_api_url(api_url) | |
| return api_url | |
| return None | |
def _ngrok_binary_path() -> str:
    """Locate (or install) an ngrok binary, falling back to bare "ngrok".

    Resolution order: NGROK_PATH env var, pyngrok's configured binary, a
    pyngrok auto-install into ~/.ngrok, then whatever is on PATH.
    """
    env_path = str(os.getenv("NGROK_PATH", "")).strip()
    if env_path and Path(env_path).exists():
        return env_path
    default_ngrok_path = ""
    try:
        from pyngrok import conf

        default_ngrok_path = str(conf.get_default().ngrok_path or "").strip()
        if default_ngrok_path and Path(default_ngrok_path).exists():
            return default_ngrok_path
    except Exception:
        # pyngrok missing or unconfigured; keep resolving.
        pass
    try:
        from pyngrok import installer

        install_target = default_ngrok_path or str((Path.home() / ".ngrok" / "ngrok").resolve())
        Path(install_target).parent.mkdir(parents=True, exist_ok=True)
        installer.install_ngrok(install_target)
        if Path(install_target).exists():
            return install_target
    except Exception:
        logger.warning("Failed to auto-install ngrok binary", exc_info=True)
    discovered = shutil.which("ngrok")
    return discovered if discovered else "ngrok"
def _ensure_ngrok_auth(token: str) -> None:
    """Register *token* with the local ngrok config; output suppressed.

    check=False: a failure here is tolerated and surfaces later when the
    agent starts.
    """
    command = [_ngrok_binary_path(), "config", "add-authtoken", token]
    subprocess.run(command, check=False, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
def _start_detached_ngrok_agent(token: str) -> str | None:
    """Start a detached ngrok agent and wait for its local API to come up.

    Returns the live API URL, or None when the agent did not answer within
    roughly 12 seconds.
    """
    if token:
        _ensure_ngrok_auth(token)
        os.environ["NGROK_AUTHTOKEN"] = token
    launch_kwargs: dict[str, Any] = {
        "stdout": subprocess.DEVNULL,
        "stderr": subprocess.DEVNULL,
        "stdin": subprocess.DEVNULL,
    }
    if os.name == "nt":
        # Detach fully on Windows so the agent outlives our process group.
        detached = getattr(subprocess, "DETACHED_PROCESS", 0)
        new_group = getattr(subprocess, "CREATE_NEW_PROCESS_GROUP", 0)
        launch_kwargs["creationflags"] = detached | new_group
    else:
        launch_kwargs["start_new_session"] = True
    subprocess.Popen([_ngrok_binary_path(), "start", "--none", "--log=stdout"], **launch_kwargs)
    deadline = time.time() + 12
    while time.time() < deadline:
        api_url = _find_live_ngrok_api()
        if api_url:
            return api_url
        time.sleep(0.5)
    return None
def _list_ngrok_tunnels(api_url: str) -> list[dict[str, Any]]:
    """Tunnel descriptors from the local ngrok API; [] when malformed."""
    response = requests.get(f"{api_url}/api/tunnels", timeout=5)
    response.raise_for_status()
    tunnels = response.json().get("tunnels")
    return tunnels if isinstance(tunnels, list) else []


def _existing_ngrok_public_url(api_url: str, port: int) -> str | None:
    """Public URL of an existing tunnel targeting local *port*, if any."""
    suffix = f":{port}"
    for tunnel in _list_ngrok_tunnels(api_url):
        public_url = str(tunnel.get("public_url") or "").strip()
        addr = str((tunnel.get("config") or {}).get("addr") or "").strip()
        if public_url and addr.endswith(suffix):
            return public_url.rstrip("/")
    return None


def _create_ngrok_tunnel(api_url: str, port: int) -> str | None:
    """Ask the local ngrok agent to open an HTTP tunnel to *port*."""
    body = {
        "name": f"http-{port}-kapo",
        "addr": str(port),
        "proto": "http",
    }
    response = requests.post(f"{api_url}/api/tunnels", json=body, timeout=10)
    response.raise_for_status()
    public_url = str(response.json().get("public_url") or "").strip().rstrip("/")
    return public_url or None
def _publish_brain_presence(public_url: str, *, source: str = "runtime") -> None:
    """Record this brain's presence in Firebase under the given public URL.

    Writes three documents: the brain registry entry, a "last report"
    runtime marker, and a tunnel record. No-op when *public_url* is empty.
    The payload keys are consumed elsewhere — keep them stable.
    """
    normalized = str(public_url or "").strip().rstrip("/")
    if not normalized:
        return
    payload = {
        "url": normalized,
        "status": "healthy",
        "source": source,
        # Platform label: Kaggle detection wins over the env-provided provider name.
        "platform": "kaggle" if _is_kaggle_runtime() else str(os.getenv("BRAIN_PROVIDER") or "remote"),
        "role": os.getenv("BRAIN_PRIMARY_ROLE", "fallback"),
        "roles": [part.strip() for part in os.getenv("BRAIN_ROLES", "supervisor,chat,coding,planner,arabic,fallback").split(",") if part.strip()],
        "languages": [part.strip() for part in os.getenv("BRAIN_LANGUAGES", "ar,en").split(",") if part.strip()],
        "model_profile_id": os.getenv("MODEL_PROFILE_ID") or os.getenv("SUPERVISOR_MODEL_PROFILE_ID") or DEFAULT_MODEL_PROFILE_ID,
        "model_repo": MODEL_META.get("repo_id") or os.getenv("MODEL_REPO") or DEFAULT_MODEL_REPO,
        "model_file": MODEL_META.get("filename") or os.getenv("MODEL_FILE") or DEFAULT_MODEL_FILE,
    }
    FIREBASE.set_document("brains", normalized, payload)
    FIREBASE.set_document(
        "runtime",
        "brains_last_report",
        {
            "brain_url": normalized,
            "source": source,
            "updated_at": time.time(),
            "provider": payload["platform"],
            "model_profile_id": payload["model_profile_id"],
        },
    )
    FIREBASE.set_document(
        "tunnels",
        f"brain_{normalized}",
        {
            "kind": "brain",
            "public_url": normalized,
            # Substring probe: any URL containing "ngrok" is labeled an ngrok tunnel.
            "provider": "ngrok" if "ngrok" in normalized else payload["platform"],
            "updated_at": time.time(),
        },
    )
def _report_brain_url(public_url: str) -> None:
    """Best-effort report of this brain's public URL to the executor.

    Always records presence in Firebase first; the executor roundtrip is
    retried a configurable number of times and failures are only logged
    (startup must never depend on the executor being reachable).
    """
    _publish_brain_presence(public_url, source="report_attempt")
    executor_url = os.getenv("EXECUTOR_URL", "").strip().rstrip("/")
    if not executor_url:
        return
    if not _should_report_brain_url(public_url):
        return
    connect_timeout = max(1.0, float(os.getenv("BRAIN_REPORT_CONNECT_TIMEOUT_SEC", "4.0") or 4.0))
    read_timeout = max(5.0, float(os.getenv("BRAIN_REPORT_READ_TIMEOUT_SEC", "15.0") or 15.0))
    retries = max(1, int(os.getenv("BRAIN_REPORT_RETRIES", "2") or 2))
    # The report body is attempt-invariant; build it once instead of per retry.
    report = {
        "brain_url": public_url,
        "platform": "kaggle" if _is_kaggle_runtime() else "remote",
        "role": os.getenv("BRAIN_PRIMARY_ROLE", "fallback"),
        "roles": [part.strip() for part in os.getenv("BRAIN_ROLES", "supervisor,chat,coding,planner,arabic,fallback").split(",") if part.strip()],
        "languages": [part.strip() for part in os.getenv("BRAIN_LANGUAGES", "ar,en").split(",") if part.strip()],
        "model_profile_id": os.getenv("MODEL_PROFILE_ID") or os.getenv("SUPERVISOR_MODEL_PROFILE_ID") or DEFAULT_MODEL_PROFILE_ID,
        "model_repo": MODEL_META.get("repo_id") or os.getenv("MODEL_REPO") or DEFAULT_MODEL_REPO,
        "model_file": MODEL_META.get("filename") or os.getenv("MODEL_FILE") or DEFAULT_MODEL_FILE,
    }
    last_error: Exception | None = None
    for attempt in range(retries):
        try:
            response = requests.post(
                f"{executor_url}/brain/report-url",
                json=report,
                headers=_brain_headers(),
                timeout=(connect_timeout, read_timeout),
            )
            response.raise_for_status()
            _publish_brain_presence(public_url, source="executor_report")
            return
        except Exception as exc:
            last_error = exc
            # Fix: don't burn a sleep after the final attempt has already failed.
            if attempt + 1 < retries:
                time.sleep(1)
    logger.info(
        "Brain URL report to executor timed out or failed; continuing (%s)",
        last_error,
    )
def _pull_executor_settings() -> dict[str, Any]:
    """Fetch shared settings from the executor, or {} when unavailable.

    Skipped entirely when shared state is backed by Google Drive, since the
    executor is then not the settings source of truth. All failures degrade
    to an empty dict with a warning.
    """
    drive_backed = _shared_state_backend() in {"google_drive", "drive", "gdrive"}
    if drive_backed or _drive_bootstrap_configured():
        return {}
    executor_url = _load_saved_executor_url()
    if not executor_url:
        return {}
    try:
        connect = max(1.5, float(os.getenv("EXECUTOR_CONNECT_TIMEOUT_SEC", "3.0") or 3.0))
        read = max(2.0, float(os.getenv("EXECUTOR_SETTINGS_READ_TIMEOUT_SEC", "5.0") or 5.0))
        response = requests.get(
            f"{executor_url}/share/settings",
            headers=_brain_headers(),
            timeout=(connect, read),
        )
        if response.status_code == 200:
            return response.json()
        logger.warning("Executor settings request failed: %s", response.text[:400])
    except Exception as exc:
        logger.warning("Failed to pull executor settings (%s)", exc)
    return {}
def start_ngrok(token: str | None = None) -> str | None:
    """Publish a public URL for this brain, starting ngrok only when needed.

    Resolution order:
      1. Reuse the saved URL after an internal restart (opt-in via env flag).
      2. Use a statically configured public URL (no tunnel at all).
      3. Start/reuse a local ngrok agent and create/reuse a tunnel.
    Returns the published URL or None; all failures are logged, never raised.
    """
    restart_reuse = str(os.getenv("KAPO_RESTART_REUSE_PUBLIC_URL", "")).strip().lower() in {"1", "true", "yes", "on"}
    if restart_reuse:
        saved_public_url = _load_saved_public_url()
        if saved_public_url:
            _remember_public_url(saved_public_url)
            _report_brain_url(saved_public_url)
            FIREBASE.set_document("brains", saved_public_url, {"url": saved_public_url, "status": "healthy", "source": "restart_reuse"})
            FIREBASE.set_document("tunnels", f"brain_{saved_public_url}", {"kind": "brain", "public_url": saved_public_url, "provider": "ngrok"})
            logger.info("Reusing saved brain public URL after restart: %s", saved_public_url)
            # One-shot flag: consume it so the next call goes through normal resolution.
            os.environ["KAPO_RESTART_REUSE_PUBLIC_URL"] = "0"
            return saved_public_url
    configured_public_url = _configured_public_url()
    if configured_public_url and _prefer_configured_public_url():
        _remember_public_url(configured_public_url)
        _report_brain_url(configured_public_url)
        FIREBASE.set_document("brains", configured_public_url, {"url": configured_public_url, "status": "healthy", "source": "configured_public_url"})
        FIREBASE.set_document("tunnels", f"brain_{configured_public_url}", {"kind": "brain", "public_url": configured_public_url, "provider": "configured"})
        logger.info("Using configured brain public URL without starting ngrok: %s", configured_public_url)
        return configured_public_url
    if not _ngrok_bootstrap_enabled():
        logger.info("Skipping ngrok bootstrap because BRAIN_AUTO_NGROK is disabled")
        return None
    try:
        authtoken = str(token or os.getenv("NGROK_AUTHTOKEN") or "").strip()
        if not authtoken:
            return None
        port = int(os.getenv("BRAIN_PORT", "7860"))
        # Prefer an already-running agent; only spawn a detached one when absent.
        api_url = _find_live_ngrok_api()
        if not api_url:
            api_url = _start_detached_ngrok_agent(authtoken)
        if not api_url:
            logger.warning("Ngrok agent did not expose a local API URL")
            return None
        # Reuse an existing tunnel for our port before creating a fresh one.
        public_url = _existing_ngrok_public_url(api_url, port)
        if not public_url:
            public_url = _create_ngrok_tunnel(api_url, port)
        if not public_url:
            return None
        # Normalize to scheme+host only (drops any trailing path the agent reports).
        match = re.search(r"https://[A-Za-z0-9.-]+", public_url)
        if match:
            public_url = match.group(0)
        _remember_ngrok_api_url(api_url)
        _remember_public_url(public_url)
        _report_brain_url(public_url)
        FIREBASE.set_document("brains", public_url, {"url": public_url, "status": "healthy", "source": "ngrok_bootstrap"})
        FIREBASE.set_document("tunnels", f"brain_{public_url}", {"kind": "brain", "public_url": public_url, "provider": "ngrok", "api_url": api_url})
        return public_url
    except Exception:
        logger.exception("Ngrok startup failed")
        return None
def _report_known_public_url() -> str | None:
    """Re-announce a previously saved public URL without opening a tunnel."""
    saved = _load_saved_public_url()
    if not saved:
        return None
    _remember_public_url(saved)
    _report_brain_url(saved)
    FIREBASE.set_document(
        "brains",
        saved,
        {"url": saved, "status": "healthy", "source": "saved_public_url"},
    )
    logger.info("Reported known brain public URL without starting ngrok: %s", saved)
    return saved
def _retry_publish_public_url(attempts: int = 8, delay_sec: float = 12.0) -> None:
    """Background retry loop that keeps trying to publish a public URL.

    Each attempt first re-reports any saved URL and, when none exists and
    auto-publish is enabled, falls back to starting ngrok. Exceptions are
    logged and never escape (this runs in a daemon thread).

    Args:
        attempts: number of tries (coerced to at least 1).
        delay_sec: pause between tries (coerced to at least 2 seconds).
    """
    total = max(1, int(attempts))
    pause = max(2.0, float(delay_sec))
    for attempt in range(total):
        try:
            public_url = _report_known_public_url()
            if not public_url and _auto_publish_public_url_on_startup():
                public_url = start_ngrok(os.getenv("NGROK_AUTHTOKEN") or None)
            if public_url:
                logger.info("Recovered brain public URL on retry attempt %s: %s", attempt + 1, public_url)
                return
        except Exception:
            logger.warning("Public URL retry attempt %s failed", attempt + 1, exc_info=True)
        # Fix: sleep only between attempts, not after the final failed one.
        if attempt + 1 < total:
            time.sleep(pause)
    logger.warning("Brain public URL retry loop exhausted without a published URL")
def _ensure_public_url_background(start_tunnel: bool = False) -> None:
    """Kick off the public-URL retry thread once, if no URL is known yet.

    Idempotent via the module-level PUBLIC_URL_RETRY_STARTED flag; does
    nothing when a URL is already configured, reported, or saved, or when
    neither *start_tunnel* nor the auto-publish setting requests it.
    """
    global PUBLIC_URL_RETRY_STARTED
    # Check env, last in-process report, then persisted state for a known URL.
    current = str(os.getenv("BRAIN_PUBLIC_URL") or LAST_BRAIN_URL_REPORT.get("url") or _load_saved_public_url() or "").strip()
    if current or PUBLIC_URL_RETRY_STARTED:
        return
    if not start_tunnel and not _auto_publish_public_url_on_startup():
        return
    PUBLIC_URL_RETRY_STARTED = True
    threading.Thread(
        target=_retry_publish_public_url,
        kwargs={"attempts": 8, "delay_sec": 12.0},
        daemon=True,
    ).start()
def _bootstrap_executor_handshake(start_tunnel: bool = False) -> None:
    """Startup handshake: sync executor settings and publish our public URL.

    Without EXECUTOR_URL the function only (optionally) starts a tunnel
    locally; with it, settings are pulled and applied first, then the URL
    is published — via ngrok when *start_tunnel* is set, otherwise by
    re-reporting a previously saved URL. Always arms the background retry.
    """
    executor_url = os.getenv("EXECUTOR_URL", "").strip()
    if not executor_url:
        if start_tunnel:
            public_url = start_ngrok(os.getenv("NGROK_AUTHTOKEN") or None)
            if public_url:
                logger.info("Brain public URL started locally without executor handshake: %s", public_url)
            else:
                logger.info("Brain started without publishing a public URL")
            _ensure_public_url_background(start_tunnel=True)
            return
        logger.info("Skipping executor handshake: EXECUTOR_URL not configured")
        _ensure_public_url_background(start_tunnel=start_tunnel)
        return
    settings = _pull_executor_settings()
    _apply_executor_settings(settings)
    public_url = None
    if start_tunnel:
        public_url = start_ngrok(os.getenv("NGROK_AUTHTOKEN") or None)
        # Tunnel start can fail; fall back to re-reporting a saved URL.
        if not public_url:
            public_url = _report_known_public_url()
    else:
        public_url = _report_known_public_url()
    if public_url:
        logger.info("Brain public URL reported to executor: %s", public_url)
    else:
        logger.info("Brain started without publishing a public URL")
    _ensure_public_url_background(start_tunnel=start_tunnel)
async def startup_event():
    """Application startup sequence (presumably registered as the FastAPI
    startup hook — the registration is not visible in this chunk; confirm).

    Order matters: shared state, runtime env, self-update, then (unless this
    is a fast internal restart) executor/Firebase settings and model loads,
    and finally the executor handshake plus the runtime-state pulse thread.
    Every stage is individually guarded so one failure cannot block boot.
    """
    global RUNTIME_STATE_THREAD_STARTED
    try:
        _bootstrap_shared_state()
    except Exception:
        logger.exception("Shared-state bootstrap failed")
    try:
        _prepare_runtime_environment()
    except Exception:
        logger.exception("Runtime environment bootstrap failed")
    try:
        _run_startup_self_update()
    except Exception:
        logger.exception("Startup self-update bootstrap failed")
    internal_restart = _internal_restart_in_progress()
    fast_restart = internal_restart and _fast_restart_enabled()
    if not fast_restart:
        try:
            settings = _pull_executor_settings()
            _apply_executor_settings(settings)
        except Exception:
            logger.exception("Executor settings bootstrap failed")
        try:
            _apply_firebase_runtime_settings()
        except Exception:
            logger.exception("Firebase runtime bootstrap failed")
    else:
        logger.info("Fast internal restart enabled; skipping executor/Firebase startup bootstrap")
    if not fast_restart:
        _load_default_model()
    try:
        if not fast_restart:
            _load_embed_model()
    except Exception:
        logger.exception("Embedding model startup failed")
    try:
        # Avoid opening a fresh tunnel on internal restarts; reuse paths handle those.
        start_tunnel = _auto_publish_public_url_on_startup() and not internal_restart
        _bootstrap_executor_handshake(start_tunnel=start_tunnel)
    except Exception:
        logger.exception("Executor handshake startup failed")
    finally:
        # Clear the restart marker whether or not the handshake succeeded.
        os.environ["KAPO_INTERNAL_RESTART"] = "0"
    _persist_runtime_state_snapshot(reason="startup")
    if not RUNTIME_STATE_THREAD_STARTED:
        RUNTIME_STATE_THREAD_STARTED = True
        threading.Thread(target=_runtime_state_pulse, daemon=True).start()
class ModelLoadRequest(BaseModel):
    """Request body for loading a model by repository and file name."""

    # Repository identifier and the model file inside it.
    repo_id: str
    filename: str
    # Optional access token (hf_* naming suggests Hugging Face — confirm in handler).
    hf_token: str | None = None
class ConnectionInit(BaseModel):
    """Request body for initializing the executor connection handshake."""

    executor_url: str  # base URL of the executor/control-plane service
    ngrok_token: str | None = None  # optional ngrok authtoken for tunnel startup
class PublishUrlRequest(BaseModel):
    """Request body for (re)publishing the brain's public URL."""

    ngrok_token: str | None = None  # authtoken when a tunnel must be started
    public_url: str | None = None  # explicit URL to publish instead of tunneling
    start_tunnel: bool = True  # whether an ngrok tunnel may be opened
class RestartRequest(BaseModel):
    """Request body for a delayed service restart."""

    delay_sec: float = 1.0  # grace period before the restart kicks in
class FileWriteRequest(BaseModel):
    """Request body for writing a text file on the brain host."""

    path: str
    content: str = ""
    overwrite: bool = True  # semantics of False are defined by the handler — confirm
class FileDeleteRequest(BaseModel):
    """Request body for deleting a file or directory."""

    path: str
    recursive: bool = False  # presumably allows directory-tree deletion — confirm in handler
class FileMkdirRequest(BaseModel):
    """Request body for creating a directory."""

    path: str
class ChatRequest(BaseModel):
    """Request body for the chat endpoint."""

    request_id: str  # caller-supplied correlation id
    user_input: str
    # NOTE: Pydantic copies mutable defaults per instance, so {} / [] are safe here.
    context: dict[str, Any] = {}
    history: list[dict[str, str]] = []
    auto_execute: bool = True  # allow the brain to trigger execution automatically
| def _contains_arabic(text: str) -> bool: | |
| return bool(re.search(r"[\u0600-\u06FF]", text or "")) | |
| def _detect_language(text: str) -> str: | |
| return "ar" if _contains_arabic(text) else "en" | |
def _is_task_request(text: str) -> bool:
    """Keyword heuristic: does the message ask for an actionable engineering task?

    NOTE(review): the Arabic literals below appear mojibake-encoded in this
    view; verify the file is consistently read/written as UTF-8 so these
    substrings actually match user input.
    """
    lower = (text or "").strip().lower()
    # English and Arabic task verbs; a single substring hit classifies as a task.
    task_words = [
        "build", "fix", "debug", "create project", "generate project", "implement", "refactor",
        "run", "execute", "install", "modify", "edit", "update", "write code", "make app",
        "Ø§ÙØ´Ø¦", "Ø£ÙØ´Ø¦", "اع٠Ù", "ÙÙØ°", "شغÙ", "Ø§ØµÙØ", "Ø£ØµÙØ", "عدÙÙ", "عدÙ", "ابÙÙ", "ÙÙÙÙ Ù Ø´Ø±ÙØ¹",
    ]
    return any(word in lower for word in task_words)
def _is_research_request(text: str) -> bool:
    """Keyword heuristic: does the message ask for a web search / research?

    NOTE(review): Arabic literals appear mojibake-encoded in this view;
    confirm UTF-8 handling end to end.
    """
    lower = (text or "").strip().lower()
    research_words = [
        "search", "research", "look up", "find out", "web", "browse",
        "Ø§Ø¨ØØ«", "Ø§Ø¨ØØ« عÙ", "Ø¯ÙØ±", "ÙØªØ´", "٠عÙÙÙ Ø© عÙ", "٠عÙÙ٠ات عÙ",
    ]
    return any(word in lower for word in research_words)
def _is_knowledge_request(text: str, context: dict[str, Any] | None = None) -> bool:
    """Keyword heuristic: should project knowledge / memory be consulted?

    The context flag "use_executor_knowledge" forces True regardless of text.
    NOTE(review): Arabic literals appear mojibake-encoded in this view;
    confirm UTF-8 handling end to end.
    """
    context = context or {}
    if bool(context.get("use_executor_knowledge")):
        return True
    lower = (text or "").strip().lower()
    knowledge_words = [
        "remember", "memory", "knowledge", "docs", "documentation", "project structure", "architecture",
        "ØªØ°ÙØ±", "Ø§ÙØ°Ø§Ùرة", "اÙÙ Ø¹Ø±ÙØ©", "اÙÙØ«Ø§Ø¦Ù", "Ø§ÙØ¯ÙÙÙ", "بÙÙØ© اÙÙ Ø´Ø±ÙØ¹", "ÙÙÙ٠اÙÙ Ø´Ø±ÙØ¹", "Ù Ø¹Ù Ø§Ø±ÙØ©",
    ]
    return any(word in lower for word in knowledge_words)
| def _prune_history(history: list[dict[str, str]], keep_last: int = 6) -> list[dict[str, str]]: | |
| if len(history) <= keep_last: | |
| return history | |
| return history[-keep_last:] | |
def _retrieve_knowledge(query: str, top_k: int = 4) -> list[dict[str, Any]]:
    """Retrieve knowledge snippets: executor RAG first, local RAG as fallback.

    The query is expanded with project-glossary hints before searching.
    Returns [] when both paths are disabled or fail; never raises.
    NOTE(review): when EXECUTOR_URL is empty the remote GET still runs with a
    relative URL and fails noisily into the warning path — consider skipping.
    """
    executor_url = os.getenv("EXECUTOR_URL", "").strip().rstrip("/")
    expanded_query = _expand_project_query(query)
    if _executor_roundtrip_allowed("BRAIN_REMOTE_KNOWLEDGE_ENABLED", default=True):
        try:
            response = requests.get(
                f"{executor_url}/rag/search",
                params={"query": expanded_query, "top_k": top_k},
                headers=_brain_headers(),
                timeout=(
                    _executor_connect_timeout(),
                    _executor_read_timeout("BRAIN_REMOTE_KNOWLEDGE_TIMEOUT_SEC", 6.0),
                ),
            )
            if response.status_code == 200:
                payload = response.json()
                results = payload.get("results", [])
                if isinstance(results, list):
                    return results
        except requests.exceptions.ReadTimeout:
            # Timeouts are expected under load; downgrade to info level.
            logger.info("Executor knowledge retrieval timed out; continuing without remote knowledge")
        except Exception:
            logger.warning("Executor knowledge retrieval failed", exc_info=True)
    if _remote_brain_only() or not _feature_enabled("BRAIN_LOCAL_RAG_FALLBACK_ENABLED", default=False):
        return []
    try:
        # Imported lazily: the local RAG stack is optional on remote-only deployments.
        from rag.retriever import retrieve
        return retrieve(expanded_query, top_k=top_k)
    except Exception:
        logger.warning("Knowledge retrieval failed", exc_info=True)
        return []
def _search_web(query: str) -> list[dict[str, Any]]:
    """Run a web search through the executor's /tools/search endpoint.

    Returns [] when the remote roundtrip is disabled, the call fails, the
    status is not 200, or the payload lacks a well-formed result list.
    """
    executor_url = os.getenv("EXECUTOR_URL", "").strip().rstrip("/")
    if not _executor_roundtrip_allowed("BRAIN_REMOTE_WEB_SEARCH_ENABLED", default=True):
        return []
    try:
        response = requests.post(
            f"{executor_url}/tools/search",
            json={"query": query, "num_results": 5},
            headers=_brain_headers(),
            timeout=(
                _executor_connect_timeout(),
                _executor_read_timeout("BRAIN_REMOTE_WEB_SEARCH_TIMEOUT_SEC", 8.0),
            ),
        )
        if response.status_code != 200:
            return []
        results = response.json().get("results", [])
        return results if isinstance(results, list) else []
    except Exception:
        logger.warning("Web search failed", exc_info=True)
        return []
| def _format_context_blocks(knowledge: list[dict[str, Any]], web_results: list[dict[str, Any]]) -> str: | |
| blocks: list[str] = [] | |
| if knowledge: | |
| lines = [] | |
| for item in knowledge[:4]: | |
| source = item.get("source", "knowledge") | |
| content = item.get("content", "") or item.get("text", "") | |
| lines.append(f"- [{source}] {content[:500]}") | |
| blocks.append("Knowledge:\n" + "\n".join(lines)) | |
| if web_results: | |
| lines = [] | |
| for item in web_results[:5]: | |
| lines.append(f"- {item.get('title', '')}: {item.get('snippet', '')}") | |
| blocks.append("Web:\n" + "\n".join(lines)) | |
| return "\n\n".join(blocks).strip() | |
def _project_context_tags(text: str) -> list[str]:
    """Tag a message with project topic labels via substring matching.

    Each rule matches against the raw text (for Arabic markers) or the
    lowercased text (for English markers). Order of tags follows rule order.
    NOTE(review): Arabic markers appear mojibake-encoded in this view;
    confirm UTF-8 handling end to end.
    """
    source = str(text or "")
    lowered = source.lower()
    tags: list[str] = []
    tag_rules = [
        ("brain_runtime", ["Ø§ÙØ¹ÙÙ", "brain", "model", "Ù ÙØ¯ÙÙ", "اÙÙÙ ÙØ°Ø¬"]),
        ("executor_runtime", ["اÙÙÙÙÙ Ø§ÙØªÙÙÙØ°Ù", "executor", "agent"]),
        ("tunnel_runtime", ["اÙÙÙÙ", "tunnel", "ngrok", "cloudflare"]),
        ("url_routing", ["Ø§ÙØ±Ø§Ø¨Ø·", "url", "endpoint", "ÙÙÙÙ"]),
        ("restart_sync", ["restart", "Ø±ÙØ³ØªØ§Ø±Øª", "إعادة تشغÙÙ", "اعادة تشغÙÙ", "sync", "Ù Ø²Ø§Ù ÙØ©"]),
        ("knowledge_memory", ["memory", "Ø°Ø§ÙØ±Ø©", "Ù Ø¹Ø±ÙØ©", "knowledge", "rag", "Ø·Ø¨ÙØ§Øª", "embedding", "embeddings"]),
        ("kaggle_runtime", ["kaggle", "ÙØ§Ø¬Ù"]),
    ]
    for tag, markers in tag_rules:
        if any(marker in source or marker in lowered for marker in markers):
            tags.append(tag)
    return tags
def _expand_project_query(query: str) -> str:
    """Append project-glossary hints to *query* based on its detected topic tags."""
    # Insertion order mirrors the original hint precedence.
    expansions = {
        "tunnel_runtime": "ngrok tunnel public url reverse proxy runtime restart",
        "restart_sync": "system restart sync uvicorn process reuse public url",
        "executor_runtime": "executor share settings control plane local machine",
        "knowledge_memory": "knowledge layers rag embeddings preferences profile",
        "brain_runtime": "brain server kaggle model startup runtime",
    }
    tags = _project_context_tags(query)
    additions = [hint for tag, hint in expansions.items() if tag in tags]
    if not additions:
        return query
    return f"{query}\nContext expansion: {' | '.join(additions)}"
def _fetch_style_profile() -> dict[str, Any]:
    """Load the user's style profile, preferring the Firebase cache.

    Falls back to the executor's /preferences/profile endpoint when the
    remote roundtrip is enabled; every failure path degrades to {}.
    """
    cached = FIREBASE.get_document("profiles", "style", ttl_sec=30.0)
    if cached:
        return cached
    executor_url = os.getenv("EXECUTOR_URL", "").strip().rstrip("/")
    if not executor_url:
        return {}
    if not _executor_roundtrip_allowed("BRAIN_REMOTE_STYLE_PROFILE_ENABLED", default=False):
        return {}
    try:
        response = requests.get(
            f"{executor_url}/preferences/profile",
            headers=_brain_headers(),
            timeout=(
                _executor_connect_timeout(),
                _executor_read_timeout("BRAIN_REMOTE_STYLE_PROFILE_TIMEOUT_SEC", 2.5),
            ),
        )
        if response.status_code != 200:
            return {}
        profile = response.json().get("profile", {})
        return profile if isinstance(profile, dict) else {}
    except requests.exceptions.ReadTimeout:
        logger.info("Style profile load timed out; continuing without remote style profile")
        return {}
    except Exception:
        logger.warning("Failed to load style profile", exc_info=True)
        return {}
| def _render_style_profile_context(profile: dict[str, Any]) -> str: | |
| if not profile: | |
| return "" | |
| preferences = profile.get("preferences", []) or [] | |
| examples = profile.get("examples", []) or [] | |
| lexical_signals = profile.get("lexical_signals", []) or [] | |
| style_markers = profile.get("style_markers", {}) or {} | |
| persona_summary = str(profile.get("persona_summary") or "").strip() | |
| response_contract = str(profile.get("response_contract") or "").strip() | |
| lines: list[str] = [] | |
| if persona_summary: | |
| lines.append(f"User persona summary: {persona_summary}") | |
| if response_contract: | |
| lines.append(f"Response contract: {response_contract}") | |
| if preferences: | |
| lines.append("User Style Preferences:") | |
| for item in preferences[:10]: | |
| lines.append(f"- {item}") | |
| if lexical_signals: | |
| lines.append("User Lexical Signals:") | |
| for item in lexical_signals[:10]: | |
| lines.append(f"- {item}") | |
| if style_markers: | |
| lines.append("Style Markers:") | |
| for key, value in sorted(style_markers.items()): | |
| lines.append(f"- {key}: {value}") | |
| if examples: | |
| lines.append("Recent User Style Examples:") | |
| for sample in examples[-3:]: | |
| user_text = str(sample.get("user_input") or "").strip() | |
| assistant_text = str(sample.get("assistant_reply") or "").strip() | |
| if user_text: | |
| lines.append(f"- User: {user_text}") | |
| if assistant_text: | |
| lines.append(f" Assistant: {assistant_text}") | |
| return "\n".join(lines).strip() | |
def _project_domain_context(user_input: str, context: dict[str, Any] | None = None) -> str:
    """Build a glossary block disambiguating project jargon, or "" when no tags match.

    NOTE(review): the Arabic terms in these literals appear mojibake-encoded
    in this view; confirm UTF-8 handling end to end.
    """
    tags = _project_context_tags(user_input)
    if not tags:
        return ""
    lines = [
        "Project Domain Glossary:",
        "- In this project, terms like Ø§ÙØ¹ÙÙ, اÙÙÙÙÙ Ø§ÙØªÙÙÙØ°Ù, اÙÙÙÙ, Ø§ÙØ±Ø§Ø¨Ø·, Ø§ÙØ±Ùستارت, اÙÙ Ø²Ø§Ù ÙØ©, Ø§ÙØ°Ø§Ùرة, ÙØ§ÙØ·Ø¨ÙØ§Øª usually refer to software runtime and operations concepts.",
        "- Treat اÙÙÙÙ as ngrok or cloudflare tunnel unless the user explicitly asks for a literal/civil meaning.",
        "- Treat Ø§ÙØ±Ø§Ø¨Ø· as public URL, endpoint, or routing target when the surrounding context mentions deployment, Kaggle, restart, or sync.",
        "- Treat Ø§ÙØ¹ÙÙ as the remote Brain service/model runtime, and اÙÙÙÙÙ Ø§ÙØªÙÙÙØ°Ù as the local executor/control plane on the user's device.",
    ]
    # Tag-specific clarifications appended in fixed order.
    if "restart_sync" in tags:
        lines.append("- Restart means process/service restart; preserving the same public URL matters more than creating a fresh tunnel.")
    if "knowledge_memory" in tags:
        lines.append("- Knowledge, embeddings, layers, and memory refer to the RAG and memory system inside this project.")
    if "kaggle_runtime" in tags:
        lines.append("- Kaggle here is the remote runtime hosting the Brain service.")
    role_name = str((context or {}).get("role_name") or "").strip()
    if role_name:
        lines.append(f"- Current assigned role: {role_name}.")
    return "\n".join(lines)
def _firebase_runtime_context(role_name: str, language: str) -> str:
    """Summarize live Firestore runtime state (roles, models, platforms, prompt).

    Produces newline-joined context lines for the chat prompt, or "" when
    the snapshot carries nothing usable.
    """
    snapshot = _firebase_runtime_snapshot()
    off_values = {"0", "false", "no", "off"}

    def _is_enabled(item: dict[str, Any]) -> bool:
        # Anything but an explicit off-value counts as enabled.
        return str(item.get("enabled", True)).strip().lower() not in off_values

    parts: list[str] = []
    role_labels = [
        str(item.get("name") or item.get("id") or "").strip()
        for item in (snapshot.get("roles") or [])
        if _is_enabled(item)
    ]
    role_labels = [label for label in role_labels if label]
    if role_labels:
        parts.append("Live roles from Firestore: " + ", ".join(role_labels[:12]))
    enabled_models = [item for item in (snapshot.get("models") or []) if _is_enabled(item)]
    model_labels = [str(item.get("label") or item.get("id") or "").strip() for item in enabled_models[:5]]
    model_labels = [label for label in model_labels if label]
    if model_labels:
        parts.append("Live model profiles: " + ", ".join(model_labels))
    platform_names = [
        str(item.get("name") or "").strip()
        for item in (snapshot.get("platforms") or [])[:6]
        if str(item.get("name") or "").strip()
    ]
    if platform_names:
        parts.append("Live platforms: " + ", ".join(platform_names))
    role_key = role_name or "chat"
    prompt_body = _firebase_prompt_body(role_key, language) or _firebase_prompt_body(role_key, "en")
    if prompt_body:
        parts.append(f"Live Firestore prompt for role '{role_key}': {prompt_body}")
    return "\n".join(parts).strip()
def _append_runtime_instructions(context_block: str, context: dict[str, Any]) -> str:
    """Augment *context_block* with role, instruction, Firestore, style, and glossary context."""
    ctx = context or {}
    instructions = str(ctx.get("system_instructions") or "").strip()
    role_name = str(ctx.get("role_name") or "").strip()
    user_input = str(ctx.get("user_input") or "").strip()
    style_profile = _render_style_profile_context(_fetch_style_profile())
    domain_context = _project_domain_context(user_input, context)
    firebase_context = _firebase_runtime_context(role_name, _detect_language(user_input))
    sections: list[str] = []
    if role_name:
        sections.append(f"Assigned role: {role_name}")
    if instructions:
        sections.append(instructions)
    if firebase_context:
        sections.append(firebase_context)
    if style_profile:
        sections.append(style_profile)
    if domain_context:
        sections.append(domain_context)
    if not sections:
        return context_block
    extra_block = "Runtime Instructions:\n" + "\n".join(sections)
    if not context_block:
        return extra_block
    return (context_block + "\n\n" + extra_block).strip()
def _extract_exact_reply_instruction(user_input: str) -> str:
    """Extract the payload of a "reply with exactly ..." style instruction.

    Returns the captured text with surrounding quotes/backticks stripped,
    or "" when no pattern matches. The last two patterns target Arabic
    triggers; NOTE(review): they appear mojibake-encoded in this view —
    confirm UTF-8 handling (the *_safe sibling spells them with escapes).
    """
    text = (user_input or "").strip()
    patterns = [
        r'(?is)reply\s+with\s+exactly\s+[:"]?\s*(.+?)\s*[".]?$',
        r'(?is)respond\s+with\s+exactly\s+[:"]?\s*(.+?)\s*[".]?$',
        r"(?is)ÙÙ\s+ÙÙØ·[:ï¼]?\s*(.+?)\s*$",
        r"(?is)Ø§ÙØªØ¨\s+ÙÙØ·[:ï¼]?\s*(.+?)\s*$",
    ]
    for pattern in patterns:
        match = re.search(pattern, text)
        if match:
            return match.group(1).strip().strip("\"'`")
    return ""
| def _extract_exact_reply_instruction_safe(user_input: str) -> str: | |
| text = (user_input or "").strip() | |
| patterns = [ | |
| r'(?is)reply\s+with\s+exactly\s+[:"]?\s*(.+?)\s*[".]?$', | |
| r'(?is)respond\s+with\s+exactly\s+[:"]?\s*(.+?)\s*[".]?$', | |
| r"(?is)\u0642\u0644\s+\u0641\u0642\u0637[:\uff1a]?\s*(.+?)\s*$", | |
| r"(?is)\u0627\u0643\u062a\u0628\s+\u0641\u0642\u0637[:\uff1a]?\s*(.+?)\s*$", | |
| ] | |
| for pattern in patterns: | |
| match = re.search(pattern, text) | |
| if match: | |
| return match.group(1).strip().strip("\"'`") | |
| return _extract_exact_reply_instruction(user_input) | |
def _chat_system_instruction(language: str, user_input: str = "", exact_reply: str = "") -> str:
    """Build the system prompt for chat in the requested language.

    When *exact_reply* is set, the prompt instructs the model to answer with
    that literal text only. The *user_input* parameter is currently unused;
    it is kept for interface stability. NOTE(review): the Arabic prompt text
    appears mojibake-encoded in this view — confirm UTF-8 handling.
    """
    if language == "ar":
        base = (
            "Ø£ÙØª KAPO-AIØ Ù Ø³Ø§Ø¹Ø¯ ÙÙØ¯Ø³Ù ع٠ÙÙ. "
            "أجب ٠باشرة ÙØ¨ÙØ¶ÙØ ÙØ¨Ø´ÙÙ Ù ÙÙØ¯. "
            "اÙÙ٠اÙÙ Ø·ÙÙØ¨ Ø£ÙÙØ§Ù ث٠أجب دÙÙ Ù ÙØ¯Ù ات زائدة. "
            "ÙØ§ تÙ٠إ٠اÙÙ ØØ§Ø¯Ø«Ø© ØºÙØ± Ù ÙØªÙ ÙØ© ÙÙØ§ ØªØ°ÙØ± تعÙÙÙ Ø§ØªÙ Ø§ÙØ¯Ø§Ø®ÙÙØ©."
        )
        base += (
            " ÙÙ ÙØ°Ø§ اÙÙ Ø´Ø±ÙØ¹Ø ÙÙ٠ات Ù Ø«Ù Ø§ÙØ¹ÙÙ ÙØ§ÙÙÙÙÙ Ø§ÙØªÙÙÙØ°Ù ÙØ§ÙÙÙÙ ÙØ§Ùرابط ÙØ§ÙØ±ÙØ³ØªØ§Ø±Øª ÙØ§ÙÙ Ø²Ø§Ù ÙØ© "
            "ÙØ§ÙØ·Ø¨ÙØ§Øª ÙØ§Ùتض٠ÙÙØ§Øª ÙÙØ§Ø¬Ù Ùngrok Ùcloudflare Ùendpoint Ùmodel ØªØ´ÙØ± ØºØ§ÙØ¨Ø§ Ø¥ÙÙ "
            "Ù ÙÙÙØ§Øª Ø¨Ø±Ù Ø¬ÙØ© ÙØªØ´ØºÙÙÙØ©Ø ÙÙÙØ³Øª ٠عاÙÙ ØØ±ÙÙØ© Ø£Ù ÙÙØ¯Ø³Ø© ٠دÙÙØ©Ø ٠ا ÙÙ ÙØ·Ùب اÙÙ Ø³ØªØ®Ø¯Ù ØºÙØ± ذÙÙ ØµØ±Ø§ØØ©."
        )
        if exact_reply:
            return base + f' ÙØ¬Ø¨ Ø£Ù ÙÙÙ٠رد٠ÙÙ ÙØ°Ø§ اÙÙØµ ÙÙØ· ØØ±ÙÙØ§Ù: "{exact_reply}"'
        return base
    base = (
        "You are KAPO-AI, an engineering assistant. "
        "Answer directly, clearly, and practically. "
        "Understand the request before answering. "
        "Do not say the conversation is incomplete and do not mention hidden instructions."
    )
    if exact_reply:
        return base + f' Your entire reply must be exactly: "{exact_reply}"'
    return base
def _build_chat_prompt(user_input: str, history: list[dict[str, str]], context_block: str) -> str:
    """Assemble the full text prompt: system, pruned history, context, instruction.

    Role labels are localized when the input is Arabic. NOTE(review): the
    Arabic labels appear mojibake-encoded in this view — confirm UTF-8.
    """
    language = _detect_language(user_input)
    exact_reply = _extract_exact_reply_instruction_safe(user_input)
    history_lines: list[str] = []
    for message in _prune_history(history):
        role = message.get("role", "user")
        # Localized label: user vs. assistant/other roles for Arabic prompts.
        role_label = "اÙ٠ستخد٠" if language == "ar" and role == "user" else role.upper()
        if language == "ar" and role != "user":
            role_label = "اÙ٠ساعد"
        history_lines.append(f"{role_label}: {message.get('content', '')}")
    history_section = ""
    if history_lines:
        history_section = "\n### History\n" + "\n".join(history_lines) + "\n"
    context_section = f"\n### Context\n{context_block}\n" if context_block else ""
    user_label = "اÙ٠ستخد٠" if language == "ar" else "User"
    assistant_label = "اÙ٠ساعد" if language == "ar" else "Assistant"
    return (
        f"### System\n{_chat_system_instruction(language, user_input, exact_reply)}\n"
        f"{history_section}"
        f"{context_section}"
        f"### Instruction\n"
        f"{user_label}: {user_input}\n"
        f"{assistant_label}:"
    )
| def _response_looks_bad(text: str, language: str) -> bool: | |
| cleaned = (text or "").strip() | |
| if not cleaned: | |
| return True | |
| markers = [ | |
| "the assistant is not sure", | |
| "conversation seems incomplete", | |
| "provide more information", | |
| "unless otherwise noted", | |
| "as an ai model developed by", | |
| "developed by ibm", | |
| "tensorflow library", | |
| "dataset of 1024", | |
| ] | |
| if any(marker in cleaned.lower() for marker in markers): | |
| return True | |
| if language == "ar": | |
| arabic_chars = len(re.findall(r"[\u0600-\u06FF]", cleaned)) | |
| latin_chars = len(re.findall(r"[A-Za-z]", cleaned)) | |
| if arabic_chars < 8 and latin_chars > max(12, arabic_chars * 2): | |
| return True | |
| return False | |
def _fallback_response(user_input: str) -> str:
    """Canned apology used when the generated reply failed the quality gate.

    Localized by detected input language. NOTE(review): the Arabic string
    appears mojibake-encoded in this view — confirm UTF-8 handling.
    """
    if _detect_language(user_input) == "ar":
        return "ÙÙ٠ت Ø±Ø³Ø§ÙØªÙØ ÙÙÙ Ø§ÙØ±Ø¯ اÙÙ ÙÙØ¯ ÙÙ ÙÙÙ ØµØ§ÙØØ§Ù ÙÙØ§Ø³ØªØ®Ø¯Ø§Ù . أعد ØµÙØ§ØºØ© Ø§ÙØ·Ùب بشÙÙ Ø£ÙØ«Ø± ØªØØ¯ÙداÙ."
    return "I understood your message, but the generated reply was not usable. Please rephrase the request more specifically."
def _project_specific_fast_reply(user_input: str) -> str:
    """Answer a few known project-FAQ phrasings without invoking the model.

    Matches literal Arabic/English token combinations about the tunnel,
    restart behavior, and URL reuse; returns "" when nothing matches so the
    caller falls through to normal generation. NOTE(review): the literals
    appear mojibake-encoded in this view — confirm UTF-8 handling.
    """
    text = (user_input or "").strip()
    lower = text.lower()
    # FAQ 1: "what was fixed?" about the tunnel / URL / restart.
    if any(token in text for token in ("اÙ٠اÙÙÙ Ø§ØªØµÙØ", "Ø¥Ù٠اÙÙÙ Ø§ØªØµÙØ", "Ù٠اÙ٠اÙÙÙ Ø§ØªØµÙØ", "Ù٠إÙ٠اÙÙÙ Ø§ØªØµÙØ")) and any(
        token in text or token in lower for token in ("اÙÙÙÙ", "Ø§ÙØ±Ø§Ø¨Ø·", "Ø§ÙØ±Ùستارت", "restart")
    ):
        return (
            "Ø§ÙØ°Ù Ø§ØªØµÙØ ÙÙ Ø£Ù Ø§ÙØ±Ùستارت Ø§ÙØ¯Ø§Ø®Ù٠بÙÙ ÙØ¹Ùد تشغÙ٠خد٠ة Ø§ÙØ¹ÙÙ ÙÙØ³Ùا Ù Ù ØºÙØ± ٠ا ÙÙØ³Ø± اÙÙÙÙ Ø£Ù ÙØºÙÙØ± Ø§ÙØ±Ø§Ø¨Ø· Ø§ÙØ¹Ø§Ù . "
            "ÙÙÙ Ø¸ÙØ± تÙÙÙ ÙØµÙر Ø£Ø«ÙØ§Ø¡ Ø§ÙØ¥ÙÙØ§Ø¹ ÙÙØ°Ø§ ÙÙÙÙ Ù Ù Ø±Ø¬ÙØ¹ Ø§ÙØ®Ø¯Ù Ø©Ø ÙØ§ Ù Ù Ø¥ÙØ´Ø§Ø¡ ÙÙÙ Ø¬Ø¯ÙØ¯."
        )
    # FAQ 2: questions about the tunnel fix itself.
    if "اÙÙÙÙ" in text and any(token in text for token in ("Ø¥ØµÙØ§Ø", "Ø§ØµÙØ§Ø", "Ø§ÙØ°Ù ت٠", "Ø§ØªØµÙØ", "ØªÙ Ø¥ØµÙØ§Ø", "ØªÙ Ø§ØµÙØ§Ø")):
        return (
            "ØªÙ ÙØµÙ Ø¯ÙØ±Ø© ØÙاة اÙÙÙÙ Ø¹Ù Ø¯ÙØ±Ø© ØÙاة خد٠ة Ø§ÙØ¹ÙÙØ ÙØ£ØµØ¨Ø ngrok ÙØ¹Ù Ù ÙØ¹Ø§Ù ٠٠ستÙ٠ع٠ع٠ÙÙØ© Uvicorn. "
            "ÙØ¨Ø§ÙتاÙÙ ÙØØªÙØ¸ /system/restart بÙÙØ³ Ø§ÙØ±Ø§Ø¨Ø· Ø§ÙØ¹Ø§Ù Ø¨Ø¯Ù Ø¥ÙØ´Ø§Ø¡ رابط Ø¬Ø¯ÙØ¯Ø ÙÙØ¯ ÙØ¸Ùر ERR_NGROK_8012 Ù Ø¤ÙØªÙا ÙÙØ· Ø£Ø«ÙØ§Ø¡ Ø§ÙØ¥ÙÙØ§Ø¹."
        )
    # FAQ 3: "same URL after restart?" questions.
    if "ÙÙØ³ Ø§ÙØ±Ø§Ø¨Ø·" in text and ("Ø±ÙØ³ØªØ§Ø±Øª" in text or "restart" in lower):
        return (
            "اÙÙØ¯Ù ÙÙØ§ Ø£Ù ØªØ¹ÙØ¯ Ø§ÙØ®Ø¯Ù Ø© Ø§ÙØ¥ÙÙØ§Ø¹ داخÙÙÙØ§ ٠ع Ø§ÙØ¥Ø¨Ùاء عÙÙ ÙÙØ³ اÙÙ public URL. "
            "ÙØ°ÙÙ ÙØ¨ÙÙ tunnel ØÙÙØ§Ø بÙÙ٠ا ØªØ¹ÙØ¯ خد٠ة localhost:7860 ÙÙØ¹Ù ٠بعد Ø«ÙØ§ÙÙ ÙÙÙÙØ© عÙÙ ÙÙØ³ Ø§ÙØ±Ø§Ø¨Ø·."
        )
    return ""
| def _hf_chat_messages(user_input: str, history: list[dict[str, str]], context_block: str) -> list[dict[str, str]]: | |
| messages: list[dict[str, str]] = [] | |
| context_text = str(context_block or "").strip() | |
| if context_text: | |
| messages.append({"role": "system", "content": context_text}) | |
| for item in history or []: | |
| role = str((item or {}).get("role") or "").strip().lower() | |
| content = str((item or {}).get("content") or "").strip() | |
| if role in {"system", "user", "assistant"} and content: | |
| messages.append({"role": role, "content": content}) | |
| messages.append({"role": "user", "content": str(user_input or "").strip()}) | |
| return messages | |
| def _hf_chat_completion_text(client: Any, messages: list[dict[str, str]], max_tokens: int) -> str: | |
| result = client.chat_completion(messages=messages, max_tokens=max_tokens) | |
| choices = getattr(result, "choices", None) | |
| if choices is None and isinstance(result, dict): | |
| choices = result.get("choices") | |
| choices = choices or [] | |
| if not choices: | |
| return "" | |
| first = choices[0] | |
| message = getattr(first, "message", None) | |
| if message is None and isinstance(first, dict): | |
| message = first.get("message") | |
| content = getattr(message, "content", None) | |
| if content is None and isinstance(message, dict): | |
| content = message.get("content") | |
| if isinstance(content, list): | |
| parts: list[str] = [] | |
| for item in content: | |
| if isinstance(item, dict): | |
| text = item.get("text") | |
| if text: | |
| parts.append(str(text)) | |
| elif item: | |
| parts.append(str(item)) | |
| content = "\n".join(part for part in parts if part.strip()) | |
| return str(content or "").strip() | |
def _generate_response(user_input: str, history: list[dict[str, str]], context_block: str) -> str:
    """Generate a chat reply, preferring fast paths, then HF remote, then local.

    Resolution order:
      1. Exact-reply instruction embedded in the message → echoed verbatim.
      2. Canned project-specific fast replies.
      3. If no local MODEL is loaded and the HF fallback is enabled: try the
         Hugging Face chat-completion API, then raw text-generation; on total
         failure return a "model not loaded" notice.
      4. Local MODEL call with conservative sampling; obviously bad outputs
         are replaced by a language-appropriate fallback.

    NOTE(review): the non-ASCII literals in this function are mojibake
    Arabic and are preserved byte-identical on purpose.
    """
    language = _detect_language(user_input)
    exact_reply = _extract_exact_reply_instruction_safe(user_input)
    if exact_reply:
        return exact_reply
    fast_reply = _project_specific_fast_reply(user_input)
    if fast_reply:
        return fast_reply
    if MODEL is None:
        # No local model: decide whether the HF remote fallback is allowed.
        provider = str(os.getenv("BRAIN_PROVIDER", "") or os.getenv("BRAIN_TEMPLATE", "") or "").strip().lower()
        if _feature_enabled("KAPO_HF_INFERENCE_API", default=False) or "huggingface" in provider or "hf-space" in provider or _hf_transformers_runtime_enabled():
            try:
                from huggingface_hub import InferenceClient
                prompt = _build_chat_prompt(user_input, history, context_block)
                messages = _hf_chat_messages(user_input, history, context_block)
                # Smaller budget for Arabic replies keeps latency down.
                max_tokens = 80 if language == "ar" else 96
                model_repo = str(os.getenv("MODEL_REPO", DEFAULT_MODEL_REPO) or DEFAULT_MODEL_REPO).strip()
                client = InferenceClient(model=model_repo, api_key=(str(os.getenv("HF_TOKEN", "") or "").strip() or None))
                try:
                    generated_text = _hf_chat_completion_text(client, messages, max_tokens)
                    if generated_text:
                        return generated_text
                except Exception as exc:
                    # Chat-completion endpoint unavailable → legacy path below.
                    logger.info("HF chat-completion path failed; falling back to text-generation (%s)", exc)
                generated = client.text_generation(
                    prompt,
                    max_new_tokens=max_tokens,
                    return_full_text=False,
                )
                generated_text = str(generated or "").strip()
                if generated_text:
                    return generated_text
            except Exception as exc:
                logger.warning("HF inference fallback failed: %s", exc, exc_info=True)
        if language == "ar":
            return "Ø§ÙØ®Ø¯Ù Ø© تع٠٠ÙÙ٠تÙÙÙØ¯ Ø§ÙØ±Ø¯ Ø§ÙØØ± ØºÙØ± Ù ØªØ§Ø Ø§ÙØ¢Ù ÙØ£Ù اÙÙÙ ÙØ°Ø¬ ØºÙØ± Ù ØÙ Ù."
        return "The Brain is online, but natural chat generation is unavailable because the model is not loaded."
    prompt = _build_chat_prompt(user_input, history, context_block)
    try:
        max_tokens = 80 if language == "ar" else 96
        # Low temperature + stop sequences keep the model from inventing
        # further dialogue turns.
        output = MODEL(
            prompt,
            max_tokens=max_tokens,
            temperature=0.1,
            top_p=0.85,
            stop=["\nUser:", "\nUSER:", "\nاÙ٠ستخد٠:", "\n###", "<|EOT|>"],
        )
        text = output["choices"][0]["text"].strip()
        if _response_looks_bad(text, language):
            return _fallback_response(user_input)
        return text or ("ØªÙ Ø§Ø³ØªÙØ§Ù Ø±Ø³Ø§ÙØªÙ." if language == "ar" else "I received your message.")
    except Exception:
        logger.exception("Model generation failed")
        if language == "ar":
            return "ÙÙ٠ت Ø·ÙØ¨ÙØ ÙÙÙ ÙØ´Ù تÙÙÙØ¯ Ø§ÙØ±Ø¯ اÙÙØµÙ."
        return "I understood your request, but text generation failed."
def _store_chat_trace(request_id: str, payload: dict[str, Any]) -> None:
    """Best-effort push of a chat trace to the executor's /memory/store.

    Skips silently when the remote trace feature is disabled or no executor
    URL is configured. Timeouts and unreachable executors are routine and
    logged at info level; anything else is a warning. Never raises.
    """
    base_url = os.getenv("EXECUTOR_URL", "").strip().rstrip("/")
    if not _executor_roundtrip_allowed("BRAIN_REMOTE_TRACE_STORE_ENABLED", default=True):
        return
    if not base_url:
        return
    request_timeout = (
        _executor_connect_timeout(),
        _executor_read_timeout("BRAIN_REMOTE_TRACE_STORE_TIMEOUT_SEC", 2.5),
    )
    try:
        requests.post(
            f"{base_url}/memory/store",
            json={"request_id": request_id, "payload": payload},
            headers=_brain_headers(),
            timeout=request_timeout,
        )
    except requests.exceptions.ReadTimeout:
        logger.info("Chat trace store timed out; continuing")
    except requests.exceptions.ConnectionError:
        logger.info("Chat trace store skipped because executor is unreachable")
    except Exception:
        logger.warning("Failed to store chat trace on executor", exc_info=True)
def _ingest_chat_knowledge(request_id: str, user_input: str, reply: str) -> None:
    """Best-effort auto-ingestion of long chat replies into the executor's RAG store.

    Only replies of at least 180 characters are considered knowledge-worthy.
    Skips silently when the feature flag is off or no executor URL is
    configured; network failures are logged and never propagated.
    """
    if len(reply or "") < 180:
        return
    executor_url = os.getenv("EXECUTOR_URL", "").strip().rstrip("/")
    if not _executor_roundtrip_allowed("BRAIN_REMOTE_AUTO_INGEST_ENABLED", default=False):
        return
    if not executor_url:
        # Bug fix: an empty EXECUTOR_URL previously produced a relative URL
        # ("/rag/ingest"), making requests.post raise and log a warning on
        # every long reply. Mirror the guard used by _store_chat_trace.
        return
    payload = {
        "request_id": request_id,
        "payload": {
            "source": "auto_chat",
            "content": f"User: {user_input}\nAssistant: {reply}",
        },
    }
    try:
        requests.post(
            f"{executor_url}/rag/ingest",
            json=payload,
            headers=_brain_headers(),
            timeout=(
                _executor_connect_timeout(),
                _executor_read_timeout("BRAIN_REMOTE_AUTO_INGEST_TIMEOUT_SEC", 3.0),
            ),
        )
    except requests.exceptions.ReadTimeout:
        logger.info("Auto-ingest chat knowledge timed out; continuing")
    except requests.exceptions.ConnectionError:
        # Consistent with _store_chat_trace: an unreachable executor is routine.
        logger.info("Auto-ingest skipped because executor is unreachable")
    except Exception:
        logger.warning("Failed to auto-ingest chat knowledge", exc_info=True)
def _learn_user_style(request_id: str, user_input: str, reply: str, context: dict[str, Any]) -> None:
    """Forward a user/assistant exchange to the executor's preference learner.

    No-op when the style-profile feature is disabled or no executor URL is
    set. Timeouts are logged at info level; other failures as warnings.
    Never raises.
    """
    base_url = os.getenv("EXECUTOR_URL", "").strip().rstrip("/")
    feature_on = _executor_roundtrip_allowed("BRAIN_REMOTE_STYLE_PROFILE_ENABLED", default=False)
    if not base_url or not feature_on:
        return
    body = {
        "request_id": request_id,
        "user_input": user_input,
        "assistant_reply": reply,
        "context": context or {},
    }
    try:
        requests.post(
            f"{base_url}/preferences/learn",
            json=body,
            headers=_brain_headers(),
            timeout=(
                _executor_connect_timeout(),
                _executor_read_timeout("BRAIN_REMOTE_TRACE_STORE_TIMEOUT_SEC", 2.5),
            ),
        )
    except requests.exceptions.ReadTimeout:
        logger.info("Style learning timed out; continuing")
    except Exception:
        logger.warning("Failed to learn user style on executor", exc_info=True)
| def _dispatch_background(task, *args) -> None: | |
| try: | |
| threading.Thread(target=task, args=args, daemon=True).start() | |
| except Exception: | |
| logger.warning("Background task dispatch failed", exc_info=True) | |
def _restart_process(delay_sec: float = 1.0) -> None:
    """Schedule an in-place process restart via os.execv on a daemon thread.

    After *delay_sec* (floored at 0.2s) the worker switches cwd to the sync
    target root, marks the restart as internal through environment flags,
    optionally re-pins the previously published public URL, resolves which
    uvicorn app module to boot, and replaces the current process image.
    Returns immediately; the restart happens in the background.
    """
    def _run() -> None:
        time.sleep(max(0.2, float(delay_sec)))
        target_root = _sync_target_root()
        # Restart from the synced code tree, not wherever we were launched.
        os.chdir(target_root)
        os.environ["KAPO_INTERNAL_RESTART"] = "1"
        os.environ.setdefault("KAPO_FAST_INTERNAL_RESTART", "1")
        if _reuse_public_url_on_restart():
            # Carry the published URL across the exec so clients keep working.
            current_public_url = _load_saved_public_url()
            if current_public_url:
                _remember_public_url(current_public_url)
            os.environ["KAPO_RESTART_REUSE_PUBLIC_URL"] = "1"
        port = str(os.getenv("BRAIN_PORT", "7860") or "7860")
        app_module = str(os.getenv("KAPO_UVICORN_APP") or "").strip()
        if not app_module:
            # No explicit app module: pick the import path matching the
            # on-disk layout, defaulting to the packaged layout.
            if (Path(target_root) / "brain_server" / "api" / "main.py").exists():
                app_module = "brain_server.api.main:app"
            elif (Path(target_root) / "api" / "main.py").exists():
                app_module = "api.main:app"
            else:
                app_module = "brain_server.api.main:app"
        # execv never returns: the current process becomes the new uvicorn.
        os.execv(
            sys.executable,
            [
                sys.executable,
                "-m",
                "uvicorn",
                app_module,
                "--host",
                "0.0.0.0",
                "--port",
                port,
            ],
        )
    threading.Thread(target=_run, daemon=True).start()
async def root():
    """Service index: advertises the docs and health endpoints."""
    payload = {"status": "ok", "service": "brain_server", "docs": "/docs", "health": "/health"}
    return payload
async def runtime_firebase_snapshot():
    """Expose the JSON-safe firebase runtime snapshot, degrading on failure."""
    try:
        snapshot = _json_safe(_firebase_runtime_snapshot())
    except Exception as exc:
        logger.warning("Failed to build firebase runtime snapshot", exc_info=True)
        return {
            "status": "degraded",
            "firebase": {"platforms": [], "models": [], "prompts": [], "roles": []},
            "detail": str(exc),
        }
    return {"status": "ok", "firebase": snapshot}
async def runtime_errors(limit: int = 50, level: str = "WARNING"):
    """Return the most recent buffered runtime log records at or above *level*.

    Args:
        limit: maximum number of records to return; non-positive yields none.
        level: minimum severity name; unknown names default to WARNING.
    """
    severity = {"DEBUG": 10, "INFO": 20, "WARNING": 30, "ERROR": 40, "CRITICAL": 50}
    threshold = severity.get(str(level or "WARNING").strip().upper(), 30)
    matching = [
        item
        for item in list(RUNTIME_LOG_BUFFER)
        if severity.get(str(item.get("level") or "").upper(), 0) >= threshold
    ]
    # Bug fix: `matching[-limit:]` with limit <= 0 slices as `[-0:]`, i.e. the
    # whole list, so a caller asking for 0 records previously got all of them.
    recent = matching[-limit:] if limit > 0 else []
    return {"status": "ok", "count": len(recent), "items": recent}
async def runtime_modernization():
    """Expose the modernization snapshot, degrading gracefully on failure."""
    try:
        snapshot = _runtime_modernization_snapshot()
    except Exception as exc:
        logger.warning("Failed to build runtime modernization snapshot", exc_info=True)
        return {"status": "degraded", "detail": str(exc), "modernization": {}}
    return {"status": "ok", "modernization": snapshot}
async def model_status():
    """Report whether the chat model is loaded plus its identity and last error."""
    meta = MODEL_META
    return {
        "loaded": MODEL is not None,
        "error": MODEL_ERROR,
        "repo_id": meta.get("repo_id"),
        "filename": meta.get("filename"),
    }
async def model_load(req: ModelLoadRequest):
    """Load (or reload) the chat model described by *req*, then report status."""
    repo, filename = req.repo_id, req.filename
    ensure_model_loaded(repo, filename, hf_token=req.hf_token)
    return await model_status()
async def model_hotswap(req: ModelLoadRequest):
    """Drop the currently loaded model, reclaim memory, then load *req*'s model."""
    global MODEL
    if MODEL is not None:
        # Release the old model before loading the new one to keep peak RAM low.
        MODEL = None
        gc.collect()
    ensure_model_loaded(req.repo_id, req.filename, hf_token=req.hf_token)
    return await model_status()
async def embeddings(payload: dict[str, Any]):
    """Encode payload["texts"] with the (lazily loaded) sentence-embedding model."""
    if EMBED_MODEL is None:
        _load_embed_model()
    texts = payload.get("texts") or []
    if not texts:
        return {"embeddings": []}
    vectors = EMBED_MODEL.encode(texts)
    return {"embeddings": vectors.tolist()}
async def chat(req: ChatRequest):
    """Main chat endpoint: classify, optionally plan/execute, reply, and trace.

    Flow:
      1. An embedded "exact reply" instruction short-circuits everything: the
         literal reply is traced to short-term memory and returned verbatim.
      2. Otherwise the message is classified as "task" (plan + optional auto
         execution through the executor) or "chat" (free-form generation,
         optionally augmented with retrieved knowledge and web results).
      3. The trace is written to short-term memory, then mirrored to the
         executor (trace store / auto-ingest / style learning) on background
         threads so the HTTP response is never blocked.
    Any failure is converted into a degraded "error" payload, never raised.

    NOTE(review): the non-ASCII literals below are mojibake Arabic and are
    preserved byte-identical on purpose.
    """
    try:
        exact_reply = _extract_exact_reply_instruction_safe(req.user_input)
        if exact_reply:
            # Short-circuit: echo the literal reply, but still record a trace.
            runtime_context = {**(req.context or {}), "user_input": req.user_input}
            trace_payload = {
                "mode": "chat",
                "user_input": req.user_input,
                "reply": exact_reply,
                "plan": None,
                "execution": None,
                "knowledge": [],
                "web_results": [],
                "context": runtime_context,
            }
            memory = MemoryAgent()
            memory.write_short_term(req.request_id, trace_payload)
            return {
                "status": "ok",
                "mode": "chat",
                "reply": exact_reply,
                "plan": None,
                "rationale": None,
                "execution": None,
                "knowledge": [],
                "web_results": [],
                "timestamp": time.time(),
            }
        planner = PlannerAgent()
        reasoning = ReasoningAgent()
        memory = MemoryAgent()
        runtime_context = {**(req.context or {}), "user_input": req.user_input}
        routing_context = runtime_context.get("routing") if isinstance(runtime_context.get("routing"), dict) else {}
        requested_role = str(runtime_context.get("role_name") or routing_context.get("role") or "").strip().lower()
        mode = "task" if _is_task_request(req.user_input) else "chat"
        # An explicitly requested role without auto-execute forces chat mode.
        if not req.auto_execute and requested_role:
            mode = "chat"
        # Optional augmentation: RAG knowledge and live web search.
        knowledge = _retrieve_knowledge(req.user_input, top_k=4) if _is_knowledge_request(req.user_input, req.context) else []
        web_results = _search_web(req.user_input) if _is_research_request(req.user_input) else []
        context_block = _append_runtime_instructions(_format_context_blocks(knowledge, web_results), runtime_context)
        response_text = ""
        plan_steps = None
        execution = None
        rationale = None
        if mode == "task":
            plan_steps = planner.run(req.user_input, req.context)
            rationale = reasoning.run(req.user_input, plan_steps)
            response_text = (
                "سأتعا٠٠٠ع ÙØ°Ù Ø§ÙØ±Ø³Ø§ÙØ© ÙØ·Ùب تÙÙÙØ°Ø ÙÙ٠ت Ø¨Ø¨ÙØ§Ø¡ خطة Ù Ø¨Ø¯Ø¦ÙØ© ÙØ³Ø£Ø¨Ø¯Ø£ Ø§ÙØªÙÙÙØ° تÙÙØ§Ø¦ÙاÙ."
                if _contains_arabic(req.user_input)
                else "I treated this as an execution request, built a plan, and started automatic execution."
            )
            if req.auto_execute:
                # Local import — presumably avoids a circular import at module
                # load time; TODO confirm.
                from api.routes_execute import ExecuteRequest, execute as execute_route
                execution = await execute_route(
                    ExecuteRequest(
                        request_id=req.request_id,
                        plan={"steps": plan_steps},
                        executor_url=os.getenv("EXECUTOR_URL", "").strip() or None,
                    )
                )
                if execution.get("report", {}).get("success"):
                    response_text += (
                        "\n\nØªÙ Ø§ÙØªÙÙÙØ° Ø¨ÙØ¬Ø§Ø Ù Ø¨Ø¯Ø¦ÙØ§Ù."
                        if _contains_arabic(req.user_input)
                        else "\n\nExecution completed successfully."
                    )
                else:
                    response_text += (
                        "\n\nت٠ت اÙÙ ØØ§ÙÙØ© ÙÙÙ ØªÙØ¬Ø¯ ٠خرجات ØªØØªØ§Ø¬ ٠راجعة."
                        if _contains_arabic(req.user_input)
                        else "\n\nExecution ran, but the result still needs review."
                    )
        else:
            response_text = _generate_response(req.user_input, req.history, context_block)
        trace_payload = {
            "mode": mode,
            "user_input": req.user_input,
            "reply": response_text,
            "plan": plan_steps,
            "execution": execution,
            "knowledge": knowledge,
            "web_results": web_results,
            "context": runtime_context,
        }
        memory.write_short_term(req.request_id, trace_payload)
        # Mirror the trace to the executor off-thread; failures are logged only.
        _dispatch_background(_store_chat_trace, req.request_id, trace_payload)
        _dispatch_background(_ingest_chat_knowledge, req.request_id, req.user_input, response_text)
        _dispatch_background(_learn_user_style, req.request_id, req.user_input, response_text, runtime_context)
        return {
            "status": "ok",
            "mode": mode,
            "reply": response_text,
            "plan": plan_steps,
            "rationale": rationale,
            "execution": execution,
            "knowledge": knowledge,
            "web_results": web_results,
            "timestamp": time.time(),
        }
    except Exception as exc:
        logger.exception("Chat failed")
        return {
            "status": "error",
            "mode": "chat",
            "reply": "ØØ¯Ø« خطأ Ø£Ø«ÙØ§Ø¡ Ù Ø¹Ø§ÙØ¬Ø© Ø§ÙØ±Ø³Ø§ÙØ©." if _contains_arabic(req.user_input) else "Chat processing failed.",
            "detail": str(exc),
            "timestamp": time.time(),
        }
async def init_connection(payload: ConnectionInit):
    """Register the executor URL and ensure a public brain URL is available."""
    _remember_executor_url(payload.executor_url)
    FIREBASE.set_document("runtime", "executor", {"executor_url": payload.executor_url})
    # Prefer an already-published URL; only start a tunnel when none exists.
    public_url = _report_known_public_url() or start_ngrok(payload.ngrok_token)
    return {"status": "connected", "brain_public_url": public_url}
async def system_publish_url(req: PublishUrlRequest | None = None):
    """Publish the brain's public URL.

    Order of preference: an explicitly supplied URL, a previously saved URL
    (when tunnel start is disabled), then a freshly started ngrok tunnel
    with a saved-URL fallback.
    """
    payload = req or PublishUrlRequest()
    explicit = str(payload.public_url or "").strip().rstrip("/")
    if explicit:
        _remember_public_url(explicit)
        _report_brain_url(explicit)
        FIREBASE.set_document(
            "brains",
            explicit,
            {"url": explicit, "status": "healthy", "source": "explicit_publish"},
        )
        return {"status": "published", "brain_public_url": explicit, "mode": "explicit"}
    if not payload.start_tunnel:
        saved = _report_known_public_url()
        if saved:
            return {"status": "published", "brain_public_url": saved, "mode": "saved"}
        return {"status": "skipped", "brain_public_url": None, "mode": "none"}
    tunneled = start_ngrok(payload.ngrok_token)
    if tunneled:
        return {"status": "published", "brain_public_url": tunneled, "mode": "ngrok"}
    saved = _report_known_public_url()
    if saved:
        return {"status": "published", "brain_public_url": saved, "mode": "saved"}
    return {"status": "error", "brain_public_url": saved, "mode": "none"}
async def system_files(path: str = "", include_content: bool = False):
    """List a directory (or describe a single file) under the managed sync root."""
    try:
        target = _resolve_sync_path(path)
        if not target.exists():
            return {"status": "error", "detail": "Path not found", "path": path}
        if target.is_file():
            result = {"status": "ok", "entry": _describe_sync_entry(target)}
            if include_content:
                result["content"] = target.read_text(encoding="utf-8", errors="ignore")
            return result
        # Directories first, then case-insensitive alphabetical order.
        entries = [_describe_sync_entry(child) for child in target.iterdir()]
        entries.sort(key=lambda entry: (not entry["is_dir"], entry["name"].lower()))
        return {"status": "ok", "root": str(_sync_root_path()), "path": path, "items": entries}
    except Exception as exc:
        logger.exception("File listing failed")
        return {"status": "error", "detail": str(exc), "path": path}
async def system_files_write(payload: FileWriteRequest):
    """Write text content to a file under the sync root, honoring the overwrite flag."""
    try:
        target = _resolve_sync_path(payload.path)
        target.parent.mkdir(parents=True, exist_ok=True)
        if target.exists():
            if target.is_dir():
                return {"status": "error", "detail": "Target path is a directory"}
            if not payload.overwrite:
                return {"status": "error", "detail": "File already exists"}
        target.write_text(payload.content or "", encoding="utf-8")
        return {"status": "saved", "entry": _describe_sync_entry(target)}
    except Exception as exc:
        logger.exception("File write failed")
        return {"status": "error", "detail": str(exc), "path": payload.path}
async def system_files_mkdir(payload: FileMkdirRequest):
    """Create a directory (and any missing parents) under the sync root; idempotent."""
    try:
        created = _resolve_sync_path(payload.path)
        created.mkdir(parents=True, exist_ok=True)
        return {"status": "created", "entry": _describe_sync_entry(created)}
    except Exception as exc:
        logger.exception("Directory creation failed")
        return {"status": "error", "detail": str(exc), "path": payload.path}
async def system_files_delete(payload: FileDeleteRequest):
    """Delete a file or directory under the sync root.

    A missing path is reported as already deleted (existed=False). Non-empty
    directories are only removed when payload.recursive is set.
    """
    try:
        target = _resolve_sync_path(payload.path)
        if not target.exists():
            return {"status": "deleted", "path": payload.path, "existed": False}
        if target.is_dir():
            if payload.recursive:
                shutil.rmtree(target)
            else:
                # Non-recursive: only succeeds for an empty directory.
                target.rmdir()
        else:
            target.unlink()
        return {"status": "deleted", "path": payload.path, "existed": True}
    except Exception as exc:
        logger.exception("Delete failed")
        return {"status": "error", "detail": str(exc), "path": payload.path}
if HAS_MULTIPART:
    async def sync_codebase(file: UploadFile = File(...), restart: bool = False):
        """Extract an uploaded zip over the sync target root, optionally restarting.

        The upload is spooled to a fixed temp path, extracted, and the temp
        file is always removed afterwards (fix: previously it was left on
        disk after every upload, including failed ones).
        NOTE(review): concurrent uploads share the same temp path — confirm
        uploads are serialized by the caller.
        """
        temp_zip = os.path.join(tempfile.gettempdir(), "kapo_update.zip")
        try:
            with open(temp_zip, "wb") as buffer:
                shutil.copyfileobj(file.file, buffer)
            with zipfile.ZipFile(temp_zip, "r") as zip_ref:
                zip_ref.extractall(_sync_target_root())
            if restart:
                _restart_process()
            return {"status": "synced", "target_root": _sync_target_root(), "restart_scheduled": restart}
        except Exception as exc:
            logger.exception("Code sync failed")
            return {"status": "error", "detail": str(exc)}
        finally:
            # Fix: do not leave the (possibly large) upload artifact behind.
            Path(temp_zip).unlink(missing_ok=True)

    async def system_archive_upload(file: UploadFile = File(...), target_path: str = "", restart: bool = False):
        """Extract an uploaded zip into *target_path* under the sync root."""
        temp_zip = os.path.join(tempfile.gettempdir(), "kapo_archive_upload.zip")
        try:
            extract_root = _resolve_sync_path(target_path)
            extract_root.mkdir(parents=True, exist_ok=True)
            with open(temp_zip, "wb") as buffer:
                shutil.copyfileobj(file.file, buffer)
            with zipfile.ZipFile(temp_zip, "r") as zip_ref:
                zip_ref.extractall(extract_root)
            if restart:
                _restart_process()
            return {
                "status": "extracted",
                "target_root": str(extract_root),
                "restart_scheduled": restart,
            }
        except Exception as exc:
            logger.exception("Archive upload failed")
            return {"status": "error", "detail": str(exc)}
        finally:
            # Fix: remove the temp archive whether extraction succeeded or not.
            Path(temp_zip).unlink(missing_ok=True)
else:
    async def sync_codebase_unavailable():
        """Fallback when python-multipart is missing: /system/sync is disabled."""
        return {"status": "error", "detail": "python-multipart is required for /system/sync"}

    async def system_archive_upload_unavailable():
        """Fallback when python-multipart is missing: archive upload is disabled."""
        return {"status": "error", "detail": "python-multipart is required for /system/archive/upload"}
async def system_restart(req: RestartRequest | None = None):
    """Schedule an in-place process restart after req.delay_sec (default 1s)."""
    delay = req.delay_sec if req else 1.0
    _restart_process(delay_sec=delay)
    return {
        "status": "restarting",
        "delay_sec": delay,
        "target_root": _sync_target_root(),
    }
def _health_payload(check_executor: bool = False, executor_url: str | None = None) -> dict[str, Any]:
    """Assemble the brain's health snapshot, optionally probing the executor.

    When *check_executor* is true and an executor URL is known, its /health
    endpoint is polled up to REQUEST_RETRIES times; the last failure text or
    exception string is surfaced in "executor_error".
    """
    cfg = load_config()
    exec_url = (executor_url or os.getenv("EXECUTOR_URL", "")).strip().rstrip("/")
    exec_checked = False
    exec_ok = False
    exec_error = None
    timeout_sec = int(cfg.get("REQUEST_TIMEOUT_SEC", 20) or 20)
    attempts = max(1, int(cfg.get("REQUEST_RETRIES", 2) or 2))
    if exec_url and check_executor:
        exec_checked = True
        for _attempt in range(attempts):
            try:
                resp = requests.get(
                    f"{exec_url}/health",
                    headers=get_executor_headers(cfg),
                    timeout=timeout_sec,
                )
            except Exception as exc:
                exec_error = str(exc)
                continue
            if resp.status_code == 200:
                exec_ok = True
                exec_error = None
                break
            exec_error = resp.text
    faiss_path = cfg.get("FAISS_INDEX_PATH")
    remote_only = str(os.getenv("REMOTE_BRAIN_ONLY", "")).strip().lower() in {"1", "true", "yes", "on"}
    return {
        "status": "ok",
        "model_loaded": MODEL is not None,
        "model_error": MODEL_ERROR,
        "embedding_loaded": EMBED_MODEL is not None,
        "faiss_ok": bool(faiss_path and os.path.exists(faiss_path)),
        "executor_checked": exec_checked,
        "executor_ok": exec_ok,
        "executor_error": exec_error,
        "remote_brain_only": remote_only,
        "runtime_root": os.getenv("KAPO_RUNTIME_ROOT", ""),
        "sync_root": _sync_target_root(),
        "timestamp": time.time(),
    }
| def _decode_b64_text(value: str) -> str: | |
| raw = str(value or "").strip() | |
| if not raw: | |
| return "" | |
| padding = "=" * (-len(raw) % 4) | |
| try: | |
| return base64.urlsafe_b64decode((raw + padding).encode("utf-8")).decode("utf-8").strip() | |
| except Exception: | |
| return "" | |
def _runtime_modernization_snapshot() -> dict[str, Any]:
    """Summarize the modernization-related runtime configuration.

    Collects — mostly from environment variables plus the Drive bootstrap
    document — the state of: shared-state backend, Drive sync, bootstrap
    config, patch manifest, remote env, control plane, tunnel transport, and
    local runtime/model status. Informational only; the sole side effect is
    ensuring the bootstrap document is loaded.
    """
    bootstrap = DRIVE_STATE.ensure_bootstrap_loaded(force=False) or {}
    # Env var wins over the bootstrap document for the patch manifest URL.
    patch_manifest_url = str(
        os.getenv("KAPO_PATCH_MANIFEST_URL", "")
        or bootstrap.get("patch_manifest_url")
        or ""
    ).strip()
    # The remote env URL may be supplied base64-encoded; decoded form wins.
    remote_env_url = _decode_b64_text(os.getenv("KAPO_REMOTE_ENV_URL_B64", "")) or str(os.getenv("KAPO_REMOTE_ENV_URL", "") or "").strip()
    # Public URL resolution order: env override, last reported, saved file.
    public_url = str(os.getenv("BRAIN_PUBLIC_URL", "") or LAST_BRAIN_URL_REPORT.get("url") or _load_saved_public_url() or "").strip()
    applied_version = _load_applied_runtime_version()
    control_plane_url = str(os.getenv("KAPO_CONTROL_PLANE_URL", "") or "").strip()
    queue_name = str(os.getenv("KAPO_CLOUDFLARE_QUEUE_NAME", "") or "").strip()
    return {
        "shared_state_backend": _shared_state_backend(),
        "firebase_enabled": str(os.getenv("FIREBASE_ENABLED", "0")).strip().lower() in {"1", "true", "yes", "on"},
        "drive": {
            "folder_id": str(os.getenv("GOOGLE_DRIVE_SHARED_STATE_FOLDER_ID", "") or os.getenv("GOOGLE_DRIVE_STORAGE_FOLDER_ID", "") or "").strip(),
            "prefix": str(os.getenv("GOOGLE_DRIVE_SHARED_STATE_PREFIX", "") or os.getenv("GOOGLE_DRIVE_STORAGE_PREFIX", "") or "").strip(),
            "bootstrap_loaded": bool(bootstrap),
        },
        "bootstrap": {
            "configured": bool(str(os.getenv("KAPO_BOOTSTRAP_URL", "") or os.getenv("GOOGLE_DRIVE_BOOTSTRAP_URL", "") or "").strip()),
            "url": str(os.getenv("KAPO_BOOTSTRAP_URL", "") or os.getenv("GOOGLE_DRIVE_BOOTSTRAP_URL", "") or "").strip(),
            "loaded": bool(bootstrap),
            "version": str(bootstrap.get("version") or "").strip(),
            "updated_at": bootstrap.get("updated_at"),
        },
        "patch": {
            "configured": bool(patch_manifest_url),
            "manifest_url": patch_manifest_url,
            "startup_self_update_enabled": _startup_self_update_enabled(),
            "applied_version": applied_version,
        },
        "remote_env": {
            "configured": bool(remote_env_url),
            "loaded": str(os.getenv("KAPO_REMOTE_ENV_LOADED", "0")).strip().lower() in {"1", "true", "yes", "on"},
            "url": remote_env_url,
        },
        "control_plane": {
            "configured": bool(control_plane_url),
            "url": control_plane_url,
            "queue_name": queue_name,
            "queue_configured": bool(queue_name),
        },
        "transport": {
            "provider": str(os.getenv("BRAIN_TUNNEL_PROVIDER", "ngrok") or "ngrok").strip().lower(),
            "auto_ngrok": _ngrok_bootstrap_enabled(),
            "reuse_public_url_on_restart": _reuse_public_url_on_restart(),
            "public_url": public_url,
            "executor_url": str(os.getenv("EXECUTOR_URL", "") or "").strip(),
        },
        "runtime": {
            "root": str(_sync_target_root()),
            "model_profile_id": str(os.getenv("MODEL_PROFILE_ID", "") or "").strip(),
            "model_loaded": MODEL is not None,
            "embed_loaded": EMBED_MODEL is not None,
        },
    }
def _persist_runtime_state_snapshot(reason: str = "periodic") -> dict[str, Any]:
    """Persist a health snapshot to Firebase and return the health payload.

    Writes two rate-limited documents (at most one write per 5s each): a
    "brains" registry entry keyed by the public URL (or the state id when no
    URL is known), and a "brain_runtime" document carrying the health payload
    plus the last 25 buffered log records. *reason* tags who triggered the
    write for debugging.
    """
    payload = _health_payload(check_executor=False)
    public_url = os.getenv("BRAIN_PUBLIC_URL") or _load_saved_public_url() or ""
    state_id = _brain_state_id()
    FIREBASE.set_document(
        "brains",
        public_url or state_id,
        {
            "url": public_url,
            "health": payload,
            # Registry status is driven purely by whether the model is loaded.
            "status": "healthy" if payload["model_loaded"] else "degraded",
            "model_repo": os.getenv("MODEL_REPO", DEFAULT_MODEL_REPO),
            "model_file": os.getenv("MODEL_FILE", DEFAULT_MODEL_FILE),
            "model_profile_id": os.getenv("MODEL_PROFILE_ID", DEFAULT_MODEL_PROFILE_ID),
            "roles": os.getenv("BRAIN_ROLES", "supervisor,chat,coding,planner,arabic,fallback"),
            "languages": os.getenv("BRAIN_LANGUAGES", "ar,en"),
            "updated_at": time.time(),
            "source": reason,
        },
        min_interval_sec=5.0,
    )
    FIREBASE.set_document(
        "brain_runtime",
        state_id,
        {
            "brain_url": public_url,
            "reason": reason,
            "health": payload,
            "recent_logs": list(RUNTIME_LOG_BUFFER)[-25:],
            "updated_at": time.time(),
        },
        min_interval_sec=5.0,
    )
    return payload
def _runtime_state_pulse() -> None:
    """Background loop: persist the runtime state snapshot every N seconds.

    The interval comes from BRAIN_STATE_SYNC_INTERVAL_SEC with a 20-second
    floor. Errors are logged and the loop keeps running forever.
    """
    raw_interval = os.getenv("BRAIN_STATE_SYNC_INTERVAL_SEC", "60") or 60
    interval = max(20.0, float(raw_interval))
    while True:
        try:
            _persist_runtime_state_snapshot(reason="background_pulse")
        except Exception:
            logger.warning("Background runtime state sync failed", exc_info=True)
        time.sleep(interval)
async def health(executor_url: str | None = None, check_executor: bool = False):
    """Health endpoint: build the payload, then persist a runtime snapshot."""
    snapshot = _health_payload(check_executor=check_executor, executor_url=executor_url)
    _persist_runtime_state_snapshot(reason="health_endpoint")
    return snapshot