| """FastAPI entrypoint for the Brain Server.""" |
import base64
import gc
import json
import logging
import os
import re
import shutil
import subprocess
import sys
import tempfile
import threading
import time
import zipfile
from collections import deque
from pathlib import Path
from typing import Any
|
|
| import requests |
| from fastapi import FastAPI, File, UploadFile |
| from pydantic import BaseModel |
|
|
| from agents.memory_agent import MemoryAgent |
| from agents.planner_agent import PlannerAgent |
| from agents.reasoning_agent import ReasoningAgent |
| try: |
| from api import deps as deps_module |
| from api.deps import get_executor_headers, get_logger, load_config |
| from api.firebase_store import FirebaseStore |
| from api.routes_analyze import router as analyze_router |
| from api.routes_execute import router as execute_router |
| from api.routes_plan import router as plan_router |
| from shared.google_drive_state import GoogleDriveStateClient |
| except ImportError: |
| from . import deps as deps_module |
| from .deps import get_executor_headers, get_logger, load_config |
| from .firebase_store import FirebaseStore |
| from .routes_analyze import router as analyze_router |
| from .routes_execute import router as execute_router |
| from .routes_plan import router as plan_router |
| from shared.google_drive_state import GoogleDriveStateClient |
|
|
# Module logger; RuntimeLogHandler is attached to it below so WARNING+ records are buffered.
logger = get_logger("kapo.brain.main")
|
|
|
|
| def _configure_windows_utf8() -> None: |
| if os.name != "nt": |
| return |
| os.environ.setdefault("PYTHONUTF8", "1") |
| os.environ.setdefault("PYTHONIOENCODING", "utf-8") |
| os.environ.setdefault("PYTHONLEGACYWINDOWSSTDIO", "utf-8") |
| try: |
| import ctypes |
|
|
| kernel32 = ctypes.windll.kernel32 |
| kernel32.SetConsoleCP(65001) |
| kernel32.SetConsoleOutputCP(65001) |
| except Exception: |
| pass |
|
|
|
|
# Apply the Windows UTF-8 console/env setup once, at import time.
_configure_windows_utf8()


# Switch the already-open standard streams to UTF-8; errors="replace" substitutes
# unencodable characters instead of raising. The hasattr() guards skip stream
# objects that do not provide reconfigure() (e.g. if stdout has been replaced).
if hasattr(sys.stdout, "reconfigure"):
    sys.stdout.reconfigure(encoding="utf-8", errors="replace")
if hasattr(sys.stderr, "reconfigure"):
    sys.stderr.reconfigure(encoding="utf-8", errors="replace")
|
|
# FastAPI application. The plan, execute and analyze routers imported above are
# mounted at import time.
app = FastAPI(title="KAPO-AI Brain Server", version="1.0.0")
app.include_router(plan_router)
app.include_router(execute_router)
app.include_router(analyze_router)
|
|
# Loaded llama.cpp model (None when nothing is loaded or loading failed), the last
# download/load error message, and metadata describing the loaded file.
MODEL = None
MODEL_ERROR = None
MODEL_META = {"repo_id": None, "filename": None, "path": None}
# SentenceTransformer instance, created lazily by _load_embed_model().
EMBED_MODEL = None
# Firebase store used for runtime settings, cached collections and presence records.
FIREBASE = FirebaseStore("brain", logger_name="kapo.brain.firebase")
# Google Drive shared-state client; its bootstrap is loaded by _bootstrap_shared_state()
# and by the startup self-update.
DRIVE_STATE = GoogleDriveStateClient(logger)
# cache key -> (fetch timestamp, documents); freshness is checked by the reader.
FIREBASE_RUNTIME_CACHE: dict[str, tuple[float, Any]] = {}
# Ring buffer (at most 200 entries) filled by RuntimeLogHandler.
RUNTIME_LOG_BUFFER: deque[dict[str, Any]] = deque(maxlen=200)
# Last URL reported to the executor and when; used to rate-limit reports.
LAST_BRAIN_URL_REPORT: dict[str, Any] = {"url": "", "ts": 0.0}
# NOTE(review): not read in this chunk; presumably guards a background thread started elsewhere.
RUNTIME_STATE_THREAD_STARTED = False


# Fallbacks used when MODEL_REPO / MODEL_FILE / MODEL_PROFILE_ID are not configured.
DEFAULT_MODEL_REPO = "QuantFactory/aya-expanse-8b-GGUF"
DEFAULT_MODEL_FILE = "aya-expanse-8b.Q4_K_M.gguf"
DEFAULT_MODEL_PROFILE_ID = "supervisor-ar-en-default"
# Records whether the optional `multipart` package is importable; presumably
# consulted before enabling file-upload endpoints (File/UploadFile are imported) — confirm.
HAS_MULTIPART = True
try:
    import multipart
except Exception:
    HAS_MULTIPART = False
|
|
|
|
class RuntimeLogHandler(logging.Handler):
    """Logging handler that mirrors each record into RUNTIME_LOG_BUFFER.

    A record is stored as a small dict holding the capture time, level name,
    logger name and fully rendered message. Any failure while doing so is
    swallowed so that logging can never break the code that logged.
    """

    def emit(self, record) -> None:
        try:
            snapshot = dict(
                ts=time.time(),
                level=record.levelname,
                name=record.name,
                message=record.getMessage(),
            )
            RUNTIME_LOG_BUFFER.append(snapshot)
        except Exception:
            return
|
|
|
|
# Capture WARNING-and-above records from this module's logger into RUNTIME_LOG_BUFFER.
# The any() check prevents attaching a second RuntimeLogHandler (e.g. on module reload).
_runtime_log_handler = RuntimeLogHandler(level=logging.WARNING)
if not any(isinstance(handler, RuntimeLogHandler) for handler in logger.handlers):
    logger.addHandler(_runtime_log_handler)
|
|
|
|
| def _feature_enabled(name: str, default: bool = False) -> bool: |
| value = os.getenv(name) |
| if value is None or str(value).strip() == "": |
| return default |
| return str(value).strip().lower() in {"1", "true", "yes", "on"} |
|
|
|
|
def _remote_brain_only() -> bool:
    """Whether the REMOTE_BRAIN_ONLY flag is on (off when unset)."""
    return _feature_enabled(name="REMOTE_BRAIN_ONLY", default=False)
|
|
|
|
def _ngrok_bootstrap_enabled() -> bool:
    """Whether BRAIN_AUTO_NGROK allows starting an ngrok tunnel (on when unset)."""
    return _feature_enabled(name="BRAIN_AUTO_NGROK", default=True)
|
|
|
|
| def _configured_public_url() -> str: |
| return str(os.getenv("BRAIN_PUBLIC_URL", "")).strip().rstrip("/") |
|
|
|
|
| def _prefer_configured_public_url() -> bool: |
| provider = str(os.getenv("BRAIN_PROVIDER", "") or os.getenv("BRAIN_TEMPLATE", "")).strip().lower() |
| if "huggingface" in provider or "hf-space" in provider: |
| return True |
| return _feature_enabled("BRAIN_FORCE_CONFIGURED_PUBLIC_URL", default=False) |
|
|
|
|
def _reuse_public_url_on_restart() -> bool:
    """Whether BRAIN_REUSE_PUBLIC_URL_ON_RESTART is on (on when unset)."""
    return _feature_enabled(name="BRAIN_REUSE_PUBLIC_URL_ON_RESTART", default=True)
|
|
|
|
def _auto_publish_public_url_on_startup() -> bool:
    """Whether BRAIN_AUTO_PUBLISH_URL_ON_STARTUP is on (on when unset)."""
    return _feature_enabled(name="BRAIN_AUTO_PUBLISH_URL_ON_STARTUP", default=True)
|
|
|
|
def _internal_restart_in_progress() -> bool:
    """Whether KAPO_INTERNAL_RESTART marks this process as an internal restart (off when unset)."""
    return _feature_enabled(name="KAPO_INTERNAL_RESTART", default=False)
|
|
|
|
def _fast_restart_enabled() -> bool:
    """Whether KAPO_FAST_INTERNAL_RESTART is on (on when unset)."""
    return _feature_enabled(name="KAPO_FAST_INTERNAL_RESTART", default=True)
|
|
|
|
| def _executor_connect_timeout() -> float: |
| return float(os.getenv("EXECUTOR_CONNECT_TIMEOUT_SEC", "3.0") or 3.0) |
|
|
|
|
| def _executor_read_timeout(name: str, default: float) -> float: |
| return float(os.getenv(name, str(default)) or default) |
|
|
|
|
| def _executor_roundtrip_allowed(feature_name: str, default: bool = True) -> bool: |
| executor_url = os.getenv("EXECUTOR_URL", "").strip() |
| if not executor_url: |
| return False |
| kaggle_defaults = { |
| "BRAIN_REMOTE_TRACE_STORE_ENABLED": False, |
| "BRAIN_REMOTE_AUTO_INGEST_ENABLED": False, |
| "BRAIN_REMOTE_STYLE_PROFILE_ENABLED": False, |
| } |
| effective_default = kaggle_defaults.get(feature_name, default) if _is_kaggle_runtime() else default |
| return _feature_enabled(feature_name, default=effective_default) |
|
|
|
|
def _remote_runtime_reads_enabled() -> bool:
    """Whether KAPO_REMOTE_STATE_READS allows reading runtime state from Firebase (off when unset)."""
    return _feature_enabled(name="KAPO_REMOTE_STATE_READS", default=False)
|
|
|
|
| def _shared_state_backend() -> str: |
| return str(os.getenv("KAPO_SHARED_STATE_BACKEND", "")).strip().lower() |
|
|
|
|
| def _drive_bootstrap_configured() -> bool: |
| return bool( |
| str(os.getenv("GOOGLE_DRIVE_BOOTSTRAP_URL", "") or os.getenv("KAPO_BOOTSTRAP_URL", "") or "").strip() |
| ) |
|
|
|
|
def _bootstrap_shared_state() -> None:
    """Load the Google Drive bootstrap (without forcing a refresh) when Drive state is in use.

    Drive is considered in use when a bootstrap URL is configured or
    KAPO_SHARED_STATE_BACKEND is google_drive, drive or gdrive.
    """
    drive_in_use = _drive_bootstrap_configured() or _shared_state_backend() in {"google_drive", "drive", "gdrive"}
    if not drive_in_use:
        return
    DRIVE_STATE.ensure_bootstrap_loaded(force=False)
|
|
|
|
def _startup_self_update_enabled() -> bool:
    """Whether KAPO_STARTUP_SELF_UPDATE allows applying updates at startup (on when unset)."""
    return _feature_enabled(name="KAPO_STARTUP_SELF_UPDATE", default=True)
|
|
|
|
| def _should_report_brain_url(public_url: str) -> bool: |
| normalized = str(public_url or "").strip().rstrip("/") |
| if not normalized: |
| return False |
| interval_sec = max(30.0, float(os.getenv("BRAIN_REPORT_MIN_INTERVAL_SEC", "600") or 600)) |
| previous_url = str(LAST_BRAIN_URL_REPORT.get("url") or "").strip() |
| previous_ts = float(LAST_BRAIN_URL_REPORT.get("ts") or 0.0) |
| now = time.time() |
| if normalized != previous_url or (now - previous_ts) >= interval_sec: |
| LAST_BRAIN_URL_REPORT["url"] = normalized |
| LAST_BRAIN_URL_REPORT["ts"] = now |
| return True |
| return False |
|
|
|
|
def _download_model(repo_id: str, filename: str, hf_token: str | None = None) -> str:
    """Download *filename* from Hugging Face repo *repo_id* and return its local path.

    Cache directory: MODEL_CACHE_DIR when set, otherwise ``<project>/models_cache``
    on Kaggle, otherwise ``<system temp>/kapo_models``. The directory is created
    if it does not exist.
    """
    from huggingface_hub import hf_hub_download

    override = str(os.getenv("MODEL_CACHE_DIR", "") or "").strip()
    if override:
        cache_dir = override
    else:
        cache_dir = (
            str((_project_root() / "models_cache").resolve())
            if _is_kaggle_runtime()
            else os.path.join(tempfile.gettempdir(), "kapo_models")
        )
    os.makedirs(cache_dir, exist_ok=True)
    return hf_hub_download(repo_id=repo_id, filename=filename, cache_dir=cache_dir, token=hf_token)
|
|
|
|
| def ensure_model_loaded(repo_id: str, filename: str, hf_token: str | None = None) -> None: |
| global MODEL, MODEL_ERROR, MODEL_META |
| repo_id = (repo_id or "").strip() |
| filename = (filename or "").strip() |
| if not repo_id or not filename: |
| MODEL = None |
| MODEL_ERROR = "model repo/file missing" |
| return |
|
|
| try: |
| model_path = _download_model(repo_id, filename, hf_token=hf_token) |
| except Exception as exc: |
| MODEL = None |
| MODEL_ERROR = f"model download failed: {exc}" |
| logger.exception("Model download failed") |
| return |
|
|
| try: |
| from llama_cpp import Llama |
|
|
| MODEL = Llama(model_path=model_path, n_ctx=4096) |
| MODEL_ERROR = None |
| MODEL_META = {"repo_id": repo_id, "filename": filename, "path": model_path} |
| logger.info("Loaded model %s/%s", repo_id, filename) |
| except Exception as exc: |
| MODEL = None |
| MODEL_ERROR = f"model load failed: {exc}" |
| logger.exception("Model load failed") |
|
|
|
|
| def _load_embed_model() -> None: |
| global EMBED_MODEL |
| if EMBED_MODEL is not None: |
| return |
|
|
| from sentence_transformers import SentenceTransformer |
|
|
| model_name = os.getenv("EMBED_MODEL", "sentence-transformers/all-MiniLM-L6-v2") |
| EMBED_MODEL = SentenceTransformer(model_name) |
| logger.info("Loaded embedding model %s", model_name) |
|
|
|
|
def _load_default_model() -> None:
    """Load the model named by MODEL_REPO / MODEL_FILE (or the built-in defaults), using HF_TOKEN."""
    ensure_model_loaded(
        os.getenv("MODEL_REPO", DEFAULT_MODEL_REPO),
        os.getenv("MODEL_FILE", DEFAULT_MODEL_FILE),
        hf_token=os.getenv("HF_TOKEN"),
    )
|
|
|
|
def _brain_headers() -> dict:
    """Build the HTTP headers for executor calls from the current configuration."""
    return get_executor_headers(load_config())
|
|
|
|
| def _project_root() -> Path: |
| return Path(__file__).resolve().parents[1] |
|
|
|
|
def _is_kaggle_runtime() -> bool:
    """Whether we run on Kaggle: the project path contains /kaggle/ or KAGGLE_KERNEL_RUN_TYPE is set."""
    root_posix = str(_project_root()).replace("\\", "/")
    if "/kaggle/" in root_posix:
        return True
    return bool(os.getenv("KAGGLE_KERNEL_RUN_TYPE"))
|
|
|
|
| def _is_hf_space_runtime() -> bool: |
| return str(os.getenv("HF_SPACE_DOCKER", "0")).strip().lower() in {"1", "true", "yes", "on"} or bool(os.getenv("SPACE_ID")) |
|
|
|
|
| def _apply_executor_settings(settings: dict[str, Any]) -> None: |
| for key in ( |
| "NGROK_AUTHTOKEN", |
| "MODEL_REPO", |
| "MODEL_FILE", |
| "MODEL_PROFILE_ID", |
| "SUPERVISOR_MODEL_PROFILE_ID", |
| "EMBED_MODEL", |
| "REQUEST_TIMEOUT_SEC", |
| "REQUEST_RETRIES", |
| "CHAT_TIMEOUT_SEC", |
| "EXECUTOR_BYPASS_HEADER", |
| "EXECUTOR_BYPASS_VALUE", |
| "BRAIN_BYPASS_HEADER", |
| "BRAIN_BYPASS_VALUE", |
| "REMOTE_BRAIN_ONLY", |
| "KAGGLE_AUTO_BOOTSTRAP", |
| "BRAIN_AUTO_NGROK", |
| "BRAIN_AUTO_PUBLISH_URL_ON_STARTUP", |
| "BRAIN_PUBLIC_URL", |
| "BRAIN_REUSE_PUBLIC_URL_ON_RESTART", |
| "KAGGLE_SYNC_SUBDIR", |
| "BRAIN_ROLES", |
| "BRAIN_LANGUAGES", |
| "BRAIN_REMOTE_KNOWLEDGE_ENABLED", |
| "BRAIN_REMOTE_WEB_SEARCH_ENABLED", |
| "BRAIN_REMOTE_TRACE_STORE_ENABLED", |
| "BRAIN_REMOTE_AUTO_INGEST_ENABLED", |
| "BRAIN_LOCAL_RAG_FALLBACK_ENABLED", |
| "EXECUTOR_CONNECT_TIMEOUT_SEC", |
| "BRAIN_REMOTE_KNOWLEDGE_TIMEOUT_SEC", |
| "BRAIN_REMOTE_WEB_SEARCH_TIMEOUT_SEC", |
| "BRAIN_REMOTE_TRACE_STORE_TIMEOUT_SEC", |
| "BRAIN_REMOTE_AUTO_INGEST_TIMEOUT_SEC", |
| "FIREBASE_ENABLED", |
| "FIREBASE_PROJECT_ID", |
| "FIREBASE_SERVICE_ACCOUNT_PATH", |
| "FIREBASE_SERVICE_ACCOUNT_JSON", |
| "FIREBASE_NAMESPACE", |
| "KAPO_SHARED_STATE_BACKEND", |
| "GOOGLE_DRIVE_SHARED_STATE_FOLDER_ID", |
| "GOOGLE_DRIVE_SHARED_STATE_PREFIX", |
| "GOOGLE_DRIVE_BOOTSTRAP_URL", |
| "KAPO_BOOTSTRAP_URL", |
| "GOOGLE_DRIVE_ACCESS_TOKEN", |
| "GOOGLE_DRIVE_REFRESH_TOKEN", |
| "GOOGLE_DRIVE_CLIENT_SECRET_JSON", |
| "GOOGLE_DRIVE_CLIENT_SECRET_JSON_BASE64", |
| "GOOGLE_DRIVE_CLIENT_SECRET_PATH", |
| "GOOGLE_DRIVE_TOKEN_EXPIRES_AT", |
| "KAPO_CONTROL_PLANE_URL", |
| "KAPO_CLOUDFLARE_QUEUE_NAME", |
| ): |
| value = settings.get(key) |
| if value not in (None, ""): |
| os.environ[key] = str(value) |
| deps_module.CONFIG_CACHE = None |
|
|
|
|
def _apply_firebase_runtime_settings() -> None:
    """Overlay runtime settings stored in Firebase onto os.environ.

    Runs only when remote state reads (KAPO_REMOTE_STATE_READS) and Firebase are
    both enabled. It reads settings/global, runtime/executor and up to 64
    documents from "roles"; values from runtime/executor override
    settings/global. Then:

    * known fields are copied into their environment variables;
    * ``current_brain_url`` goes to KAPO_EXECUTOR_CURRENT_BRAIN_URL;
    * if any role documents exist, the enabled role names (lower-cased,
      de-duplicated, order kept) replace BRAIN_ROLES.

    Finally deps_module.CONFIG_CACHE is cleared.
    """
    if not (_remote_runtime_reads_enabled() and FIREBASE.enabled()):
        return
    global_doc = FIREBASE.get_document("settings", "global")
    executor_doc = FIREBASE.get_document("runtime", "executor")
    role_items = FIREBASE.list_documents("roles", limit=64)
    combined = dict(global_doc or {})
    combined.update(executor_doc or {})
    env_by_field = (
        ("executor_public_url", "EXECUTOR_PUBLIC_URL"),
        ("executor_url", "EXECUTOR_URL"),
        ("model_repo", "MODEL_REPO"),
        ("model_file", "MODEL_FILE"),
        ("model_profile_id", "MODEL_PROFILE_ID"),
        ("supervisor_model_profile_id", "SUPERVISOR_MODEL_PROFILE_ID"),
        ("brain_roles", "BRAIN_ROLES"),
        ("brain_languages", "BRAIN_LANGUAGES"),
    )
    brain_url = str(combined.get("current_brain_url") or "").strip()
    if brain_url:
        os.environ["KAPO_EXECUTOR_CURRENT_BRAIN_URL"] = brain_url
    for field, env_name in env_by_field:
        field_value = combined.get(field)
        if field_value in (None, ""):
            continue
        os.environ[env_name] = str(field_value)
    if role_items:
        disabled_markers = {"0", "false", "no", "off"}
        role_names = []
        for item in role_items:
            if str(item.get("enabled", True)).strip().lower() in disabled_markers:
                continue
            role_name = str(item.get("name") or item.get("id") or "").strip().lower()
            if role_name:
                role_names.append(role_name)
        if role_names:
            os.environ["BRAIN_ROLES"] = ",".join(dict.fromkeys(role_names))
    deps_module.CONFIG_CACHE = None
|
|
|
|
| def _firebase_collection_cache_key(name: str) -> str: |
| return f"collection:{name}" |
|
|
|
|
def _firebase_list_documents_cached(collection: str, ttl_sec: float = 30.0, limit: int = 200) -> list[dict[str, Any]]:
    """Return up to *limit* documents of *collection*, reusing a fetch younger than *ttl_sec*.

    Returns [] when remote state reads or Firebase are disabled. The result is a
    new (shallow) list, so callers cannot resize the cached entry.
    """
    if not _remote_runtime_reads_enabled() or not FIREBASE.enabled():
        return []
    cache_key = _firebase_collection_cache_key(collection)
    now = time.time()
    entry = FIREBASE_RUNTIME_CACHE.get(cache_key)
    if entry:
        fetched_at, documents = entry
        if now - fetched_at < ttl_sec:
            return list(documents or [])
    documents = FIREBASE.list_documents(collection, limit=limit)
    FIREBASE_RUNTIME_CACHE[cache_key] = (now, documents)
    return list(documents or [])
|
|
|
|
def _firebase_role_profiles() -> list[dict[str, Any]]:
    """Return role documents from Firebase, or ones synthesized from BRAIN_ROLES.

    When Firebase yields no roles, each non-blank comma-separated entry of
    BRAIN_ROLES becomes ``{"name": <role>, "enabled": True}``.
    """
    documents = _firebase_list_documents_cached("roles", ttl_sec=30.0, limit=64)
    if documents:
        return documents
    configured = str(os.getenv("BRAIN_ROLES", ""))
    return [
        {"name": piece.strip(), "enabled": True}
        for piece in configured.split(",")
        if piece.strip()
    ]
|
|
|
|
def _firebase_runtime_snapshot() -> dict[str, Any]:
    """Collect the cached Firebase platforms, models, prompts and role profiles into one dict."""
    snapshot: dict[str, Any] = {}
    for collection, ttl, limit in (("platforms", 45.0, 64), ("models", 45.0, 128), ("prompts", 20.0, 128)):
        snapshot[collection] = _firebase_list_documents_cached(collection, ttl_sec=ttl, limit=limit)
    snapshot["roles"] = _firebase_role_profiles()
    return snapshot
|
|
|
|
| def _json_safe(value: Any) -> Any: |
| if isinstance(value, dict): |
| return {str(key): _json_safe(item) for key, item in value.items()} |
| if isinstance(value, list): |
| return [_json_safe(item) for item in value] |
| if isinstance(value, tuple): |
| return [_json_safe(item) for item in value] |
| if isinstance(value, (str, int, float, bool)) or value is None: |
| return value |
| return str(value) |
|
|
|
|
| def _firebase_prompt_body(role_name: str, language: str = "en") -> str: |
| role_name = str(role_name or "").strip().lower() |
| language = str(language or "en").strip().lower() |
| if not role_name: |
| return "" |
| prompts = _firebase_runtime_snapshot().get("prompts", []) |
| exact = [] |
| fallback = [] |
| for item in prompts: |
| if str(item.get("role_name") or "").strip().lower() != role_name: |
| continue |
| if str(item.get("enabled", True)).strip().lower() in {"0", "false", "no", "off"}: |
| continue |
| item_lang = str(item.get("language") or "en").strip().lower() |
| body = str(item.get("body") or "").strip() |
| if not body: |
| continue |
| if item_lang == language: |
| exact.append(body) |
| elif item_lang == "en": |
| fallback.append(body) |
| if exact: |
| return exact[0] |
| if fallback: |
| return fallback[0] |
| return "" |
|
|
|
|
def _prepare_runtime_environment() -> None:
    """Prepare the process environment before the brain starts serving.

    Always tries to preload the shared-state (Google Drive) bootstrap; a failure
    is logged as a warning and ignored. Everything else runs only on Kaggle:

    * choose the runtime root: KAPO_RUNTIME_ROOT if set, the source tree itself
      when it already lives under /kaggle/working/, otherwise
      /kaggle/working/KAPO-AI-SYSTEM;
    * if KAGGLE_AUTO_BOOTSTRAP is truthy (default "1") and the source lives
      under /kaggle/input/ at a different location, delete any existing runtime
      root, copy the source tree there (skipping __pycache__, *.pyc, .git and
      .venv) and make sure it is on sys.path;
    * create <runtime_root>/data/local/brain_runtime and point KAPO_RUNTIME_ROOT,
      KAPO_SYNC_ROOT, LOCAL_DATA_DIR, DB_PATH, TOOLS_DB_PATH and
      FAISS_INDEX_PATH at it;
    * default REMOTE_BRAIN_ONLY to "1" (also when it is set but empty);
    * restore EXECUTOR_URL from the saved state file when it is not set;
    * clear deps_module.CONFIG_CACHE.
    """
    try:
        _bootstrap_shared_state()
    except Exception:
        logger.warning("Shared-state bootstrap preload failed", exc_info=True)
    if not _is_kaggle_runtime():
        return

    source_root = _project_root()
    # Forward slashes so the prefix checks below also work for Windows-style paths.
    source_text = str(source_root).replace("\\", "/")
    runtime_root_env = os.getenv("KAPO_RUNTIME_ROOT", "").strip()
    if runtime_root_env:
        runtime_root = Path(runtime_root_env).resolve()
    elif source_text.startswith("/kaggle/working/"):
        runtime_root = source_root.resolve()
    else:
        runtime_root = Path("/kaggle/working/KAPO-AI-SYSTEM").resolve()
    sync_root = runtime_root
    auto_bootstrap = str(os.getenv("KAGGLE_AUTO_BOOTSTRAP", "1")).strip().lower() in {"1", "true", "yes", "on"}

    if auto_bootstrap and source_text.startswith("/kaggle/input/") and source_root != runtime_root:
        # Destructive: any existing runtime_root is removed before the fresh copy.
        if runtime_root.exists():
            shutil.rmtree(runtime_root, ignore_errors=True)
        shutil.copytree(
            source_root,
            runtime_root,
            ignore=shutil.ignore_patterns("__pycache__", "*.pyc", ".git", ".venv"),
        )
        # NOTE(review): nesting reconstructed from a whitespace-stripped source;
        # confirm this sys.path registration belongs inside the auto-bootstrap branch.
        if str(runtime_root) not in sys.path:
            sys.path.insert(0, str(runtime_root))

    data_dir = runtime_root / "data" / "local" / "brain_runtime"
    data_dir.mkdir(parents=True, exist_ok=True)
    os.environ["KAPO_RUNTIME_ROOT"] = str(runtime_root)
    os.environ["KAPO_SYNC_ROOT"] = str(sync_root)
    os.environ["LOCAL_DATA_DIR"] = str(data_dir)
    os.environ["DB_PATH"] = str(data_dir / "episodic.db")
    os.environ["TOOLS_DB_PATH"] = str(data_dir / "tools.db")
    os.environ["FAISS_INDEX_PATH"] = str(data_dir / "faiss.index")
    # An empty REMOTE_BRAIN_ONLY is also normalized to "1".
    os.environ["REMOTE_BRAIN_ONLY"] = str(os.getenv("REMOTE_BRAIN_ONLY", "1") or "1")
    # Reads the state file under KAPO_SYNC_ROOT, which was just set above.
    saved_executor_url = _load_saved_executor_url()
    current_executor_url = str(os.getenv("EXECUTOR_URL", "") or "").strip()
    if saved_executor_url and not current_executor_url:
        os.environ["EXECUTOR_URL"] = saved_executor_url
    deps_module.CONFIG_CACHE = None
|
|
|
|
| def _sync_target_root() -> str: |
| return os.getenv("KAPO_SYNC_ROOT") or os.getenv("KAPO_RUNTIME_ROOT") or os.getcwd() |
|
|
|
|
def _sync_root_path() -> Path:
    """Return the sync root as an absolute, symlink-resolved Path."""
    root_text = _sync_target_root()
    return Path(root_text).resolve()
|
|
|
|
def _resolve_sync_path(user_path: str | None = None) -> Path:
    """Resolve *user_path* against the sync root, refusing anything outside it.

    Backslashes are treated as separators and leading slashes are dropped, so
    the input is always interpreted relative to the root. An empty path means
    the root itself. Raises ValueError("Path escapes sync root") if the
    resolved path (after ``..`` and symlink resolution) is neither the root nor
    inside it.
    """
    root = _sync_root_path()
    rel = str(user_path or "").strip().replace("\\", "/").lstrip("/")
    if not rel:
        return root
    resolved = root.joinpath(rel).resolve()
    if resolved == root or root in resolved.parents:
        return resolved
    raise ValueError("Path escapes sync root")
|
|
|
|
def _describe_sync_entry(path: Path) -> dict[str, Any]:
    """Summarize *path* for a sync listing.

    Returns its name (``str(path)`` when the name is empty), its path relative
    to the sync root with forward slashes ("" for the root itself), a directory
    flag, its size and its modification time.
    """
    info = path.stat()
    root = _sync_root_path()
    relative = "" if path == root else str(path.relative_to(root)).replace("\\", "/")
    return {
        "name": path.name or str(path),
        "path": relative,
        "is_dir": path.is_dir(),
        "size": info.st_size,
        "modified_at": info.st_mtime,
    }
|
|
|
|
def _public_url_state_path() -> Path:
    """Path of the file remembering the public URL, under <sync root>/data/local/brain_runtime (created if missing)."""
    state_dir = Path(_sync_target_root()).resolve().joinpath("data", "local", "brain_runtime")
    state_dir.mkdir(parents=True, exist_ok=True)
    return state_dir.joinpath("public_url.txt")
|
|
|
|
def _remember_public_url(public_url: str) -> None:
    """Store *public_url* (trimmed, without trailing slash) in BRAIN_PUBLIC_URL and on disk.

    Empty values are ignored. A failure to write the state file is logged, not raised.
    """
    cleaned = str(public_url or "").strip().rstrip("/")
    if cleaned:
        os.environ["BRAIN_PUBLIC_URL"] = cleaned
        try:
            _public_url_state_path().write_text(cleaned, encoding="utf-8")
        except Exception:
            logger.warning("Failed to persist public URL", exc_info=True)
|
|
|
|
def _load_saved_public_url() -> str:
    """Return the public URL saved on disk (trimmed, no trailing slash), or "" if unreadable."""
    try:
        stored = _public_url_state_path().read_text(encoding="utf-8")
    except Exception:
        return ""
    return stored.strip().rstrip("/")
|
|
|
|
def _ngrok_api_state_path() -> Path:
    """Path of the file remembering the ngrok API URL, under <sync root>/data/local/brain_runtime (created if missing)."""
    state_dir = Path(_sync_target_root()).resolve().joinpath("data", "local", "brain_runtime")
    state_dir.mkdir(parents=True, exist_ok=True)
    return state_dir.joinpath("ngrok_api_url.txt")
|
|
|
|
def _executor_url_state_path() -> Path:
    """Path of the file remembering the executor URL, under <sync root>/data/local/brain_runtime (created if missing)."""
    state_dir = Path(_sync_target_root()).resolve().joinpath("data", "local", "brain_runtime")
    state_dir.mkdir(parents=True, exist_ok=True)
    return state_dir.joinpath("executor_url.txt")
|
|
|
|
def _remember_executor_url(executor_url: str) -> None:
    """Store *executor_url* (trimmed, without trailing slash) in EXECUTOR_URL and on disk.

    Empty values are ignored. A failure to write the state file is logged, not raised.
    """
    cleaned = str(executor_url or "").strip().rstrip("/")
    if cleaned:
        os.environ["EXECUTOR_URL"] = cleaned
        try:
            _executor_url_state_path().write_text(cleaned, encoding="utf-8")
        except Exception:
            logger.warning("Failed to persist executor URL", exc_info=True)
|
|
|
|
def _load_saved_executor_url() -> str:
    """Return EXECUTOR_URL if set, else the URL saved on disk, else "" (trimmed, no trailing slash)."""
    url = str(os.getenv("EXECUTOR_URL", "")).strip().rstrip("/")
    if not url:
        try:
            url = _executor_url_state_path().read_text(encoding="utf-8").strip().rstrip("/")
        except Exception:
            url = ""
    return url
|
|
|
|
| def _brain_state_id() -> str: |
| public_url = str(os.getenv("BRAIN_PUBLIC_URL") or _load_saved_public_url() or "").strip().rstrip("/") |
| if public_url: |
| return re.sub(r"[^A-Za-z0-9._-]+", "_", public_url) |
| runtime_root = str(os.getenv("KAPO_RUNTIME_ROOT") or _sync_target_root() or "brain_runtime").strip() |
| return re.sub(r"[^A-Za-z0-9._-]+", "_", runtime_root) |
|
|
|
|
def _applied_runtime_version_path() -> Path:
    """Path of the applied-update record, under <sync root>/data/local/brain_runtime (created if missing)."""
    state_dir = Path(_sync_target_root()).resolve().joinpath("data", "local", "brain_runtime")
    state_dir.mkdir(parents=True, exist_ok=True)
    return state_dir.joinpath("applied_version.json")
|
|
|
|
def _load_applied_runtime_version() -> dict[str, Any]:
    """Return the record of the last applied self-update, or {} when none is available.

    A missing record file is the normal first-run case and yields {} silently.
    Any other failure (unreadable file, invalid JSON, JSON that is not an object,
    or a programming error) also yields {}, but is logged as a warning. Otherwise
    a broken read looks exactly like "nothing applied yet", and every startup
    would silently re-apply the update.
    """
    try:
        raw = _applied_runtime_version_path().read_text(encoding="utf-8")
        return dict(json.loads(raw) or {})
    except FileNotFoundError:
        return {}
    except Exception:
        logger.warning("Could not read applied runtime version record", exc_info=True)
        return {}
|
|
|
|
def _write_applied_runtime_version(payload: dict[str, Any]) -> None:
    """Persist *payload* as pretty-printed, key-sorted UTF-8 JSON in the applied-update record."""
    target = _applied_runtime_version_path()
    serialized = json.dumps(dict(payload or {}), ensure_ascii=False, indent=2, sort_keys=True)
    target.write_text(serialized, encoding="utf-8")
|
|
|
|
def _download_remote_zip(url: str, destination: Path) -> Path:
    """Download *url* to *destination* and return *destination*.

    The body is streamed in 1 MiB chunks to ``<destination>.part``, which is
    renamed into place only after the whole response has been written.
    Large packages are therefore never held fully in memory, and a failed
    transfer never leaves a truncated zip at *destination*.

    Raises:
        requests.RequestException: on network errors or a non-2xx status
            (checked with ``raise_for_status``).
        OSError: on filesystem errors.
    """
    with requests.get(str(url).strip(), timeout=120, stream=True) as response:
        response.raise_for_status()
        destination.parent.mkdir(parents=True, exist_ok=True)
        partial = destination.with_name(destination.name + ".part")
        try:
            with partial.open("wb") as handle:
                for chunk in response.iter_content(chunk_size=1024 * 1024):
                    if chunk:  # skip keep-alive chunks
                        handle.write(chunk)
        except BaseException:
            partial.unlink(missing_ok=True)
            raise
    partial.replace(destination)
    return destination
|
|
|
|
| def _apply_zip_overlay(zip_path: Path, target_root: Path) -> None: |
| with zipfile.ZipFile(zip_path, "r") as archive: |
| archive.extractall(target_root) |
|
|
|
|
def _load_patch_manifest_from_bootstrap() -> dict[str, Any]:
    """Return the bootstrap document, merged with its patch manifest when one is linked.

    The bootstrap is force-reloaded. If it names a ``patch_manifest_url``, that
    JSON document is fetched (30 s timeout) and its keys override the
    bootstrap's. On any fetch or parse error a warning is logged and a copy of
    the bare bootstrap is returned.
    """
    bootstrap = DRIVE_STATE.ensure_bootstrap_loaded(force=True) or {}
    manifest_url = str(bootstrap.get("patch_manifest_url") or "").strip()
    if manifest_url:
        try:
            reply = requests.get(manifest_url, timeout=30)
            reply.raise_for_status()
            overrides = dict(reply.json() or {})
        except Exception:
            logger.warning("Failed to load patch manifest from bootstrap URL", exc_info=True)
        else:
            return {**bootstrap, **overrides}
    return dict(bootstrap)
|
|
|
|
def _run_startup_self_update() -> None:
    """Best-effort update of the code tree at startup, driven by the bootstrap manifest.

    Does nothing when KAPO_STARTUP_SELF_UPDATE is off, when the manifest has
    neither ``version`` nor ``build_hash``, or when the locally recorded applied
    version matches both. Otherwise it downloads ``patch_bundle_url``
    (preferred) or ``full_package_url`` into
    ``data/local/brain_runtime/updates`` under the sync root, extracts it over
    the sync root and records what was applied.

    Every failure, including failing to load the bootstrap/manifest, is logged
    as a warning and swallowed, so startup continues on the current code.
    """
    if not _startup_self_update_enabled():
        return
    try:
        manifest = _load_patch_manifest_from_bootstrap()
    except Exception:
        logger.warning("Startup self-update skipped: could not load bootstrap manifest", exc_info=True)
        return
    target_version = str(manifest.get("version") or "").strip()
    target_hash = str(manifest.get("build_hash") or "").strip()
    if not target_version and not target_hash:
        return
    current = _load_applied_runtime_version()
    # At least one identifier is non-empty here, so equality on both fields
    # identifies the same build. This includes manifests that publish only a
    # build_hash.
    if (
        str(current.get("version") or "").strip() == target_version
        and str(current.get("build_hash") or "").strip() == target_hash
    ):
        return
    runtime_root = Path(_sync_target_root()).resolve()
    temp_dir = runtime_root / "data" / "local" / "brain_runtime" / "updates"
    patch_url = str(manifest.get("patch_bundle_url") or "").strip()
    full_url = str(manifest.get("full_package_url") or "").strip()
    if patch_url:
        applied_mode, source_url, archive_name = "patch_bundle", patch_url, "patch_bundle.zip"
    elif full_url:
        applied_mode, source_url, archive_name = "full_package", full_url, "full_package.zip"
    else:
        return
    try:
        zip_path = _download_remote_zip(source_url, temp_dir / archive_name)
        _apply_zip_overlay(zip_path, runtime_root)
        _write_applied_runtime_version(
            {
                "version": target_version,
                "build_hash": target_hash,
                "applied_at": time.time(),
                "mode": applied_mode,
                "source_url": source_url,
            }
        )
        logger.info("Applied startup self-update (%s) version=%s", applied_mode, target_version or target_hash)
    except Exception:
        logger.warning("Startup self-update failed", exc_info=True)
|
|
|
|
def _remember_ngrok_api_url(api_url: str) -> None:
    """Store *api_url* (trimmed, without trailing slash) in KAPO_NGROK_API_URL and on disk.

    Empty values are ignored. A failure to write the state file is logged, not raised.
    """
    cleaned = str(api_url or "").strip().rstrip("/")
    if cleaned:
        os.environ["KAPO_NGROK_API_URL"] = cleaned
        try:
            _ngrok_api_state_path().write_text(cleaned, encoding="utf-8")
        except Exception:
            logger.warning("Failed to persist ngrok API URL", exc_info=True)
|
|
|
|
def _load_saved_ngrok_api_url() -> str:
    """Return KAPO_NGROK_API_URL if set, else the URL saved on disk, else "" (trimmed, no trailing slash)."""
    url = str(os.getenv("KAPO_NGROK_API_URL", "")).strip().rstrip("/")
    if not url:
        try:
            url = _ngrok_api_state_path().read_text(encoding="utf-8").strip().rstrip("/")
        except Exception:
            url = ""
    return url
|
|
|
|
def _ngrok_api_candidates() -> list[str]:
    """List ngrok local-API base URLs to probe, in priority order.

    The remembered URL (env or state file) comes first, followed by
    127.0.0.1 ports 4040, 4041 and 4042. Entries are trimmed and lose trailing
    slashes; empty entries and duplicates are dropped while keeping order.
    """
    raw = (
        _load_saved_ngrok_api_url(),
        "http://127.0.0.1:4040",
        "http://127.0.0.1:4041",
        "http://127.0.0.1:4042",
    )
    cleaned = (str(item or "").strip().rstrip("/") for item in raw)
    return list(dict.fromkeys(url for url in cleaned if url))
|
|
|
|
def _probe_ngrok_api(api_url: str) -> bool:
    """Return True if ``<api_url>/api/tunnels`` answers HTTP 200 within 2 s; any error yields False."""
    try:
        reply = requests.get(f"{api_url}/api/tunnels", timeout=2)
    except Exception:
        return False
    return reply.status_code == 200
|
|
|
|
def _find_live_ngrok_api() -> str | None:
    """Return the first reachable ngrok API URL (and remember it), or None if none responds."""
    live = next((url for url in _ngrok_api_candidates() if _probe_ngrok_api(url)), None)
    if live is not None:
        _remember_ngrok_api_url(live)
    return live
|
|
|
|
def _ngrok_binary_path() -> str:
    """Locate (installing if necessary) the ngrok executable and return its path.

    Resolution order:

    1. NGROK_PATH, if that file exists.
    2. pyngrok's configured ``ngrok_path``, if that file exists.
    3. A fresh pyngrok install to that path (or ``~/.ngrok/ngrok``); an install
       failure is logged as a warning.
    4. ``ngrok`` found on PATH.
    5. The bare name "ngrok".
    """
    explicit = str(os.getenv("NGROK_PATH", "")).strip()
    if explicit and Path(explicit).exists():
        return explicit

    pyngrok_default = ""
    try:
        from pyngrok import conf

        pyngrok_default = str(conf.get_default().ngrok_path or "").strip()
        if pyngrok_default and Path(pyngrok_default).exists():
            return pyngrok_default
    except Exception:
        pass

    try:
        from pyngrok import installer

        target = pyngrok_default or str((Path.home() / ".ngrok" / "ngrok").resolve())
        Path(target).parent.mkdir(parents=True, exist_ok=True)
        installer.install_ngrok(target)
        if Path(target).exists():
            return target
    except Exception:
        logger.warning("Failed to auto-install ngrok binary", exc_info=True)

    return shutil.which("ngrok") or "ngrok"
|
|
|
|
def _ensure_ngrok_auth(token: str) -> None:
    """Register *token* via ``ngrok config add-authtoken``; output is discarded and the exit code ignored."""
    command = [_ngrok_binary_path(), "config", "add-authtoken", token]
    subprocess.run(command, check=False, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
|
|
|
|
def _start_detached_ngrok_agent(token: str) -> str | None:
    """Launch a background ``ngrok start --none`` agent and return its local API URL.

    When *token* is non-empty it is first registered with
    ``ngrok config add-authtoken`` and exported as NGROK_AUTHTOKEN. The agent is
    started detached from this process (detached/new process group on Windows,
    new session elsewhere) with all stdio discarded. The local API candidates
    are then polled every 0.5 s for up to 12 s.

    Returns None if the agent could not be launched (e.g. the ngrok executable
    is missing) or if its API never became reachable.
    """
    try:
        if token:
            _ensure_ngrok_auth(token)
            os.environ["NGROK_AUTHTOKEN"] = token

        ngrok_path = _ngrok_binary_path()
        popen_kwargs = {
            "stdout": subprocess.DEVNULL,
            "stderr": subprocess.DEVNULL,
            "stdin": subprocess.DEVNULL,
        }
        if os.name == "nt":
            popen_kwargs["creationflags"] = getattr(subprocess, "DETACHED_PROCESS", 0) | getattr(
                subprocess, "CREATE_NEW_PROCESS_GROUP", 0
            )
        else:
            popen_kwargs["start_new_session"] = True

        subprocess.Popen(
            [ngrok_path, "start", "--none", "--log=stdout"],
            **popen_kwargs,
        )
    except OSError:
        # Typically the ngrok executable is missing or not runnable. None is
        # already this function's "no agent available" result, so return it
        # instead of raising.
        logger.warning("Failed to launch ngrok agent", exc_info=True)
        return None

    deadline = time.time() + 12
    while time.time() < deadline:
        api_url = _find_live_ngrok_api()
        if api_url:
            return api_url
        time.sleep(0.5)
    return None
|
|
|
|
def _list_ngrok_tunnels(api_url: str) -> list[dict[str, Any]]:
    """Fetch ``<api_url>/api/tunnels`` (5 s timeout) and return its "tunnels" list ([] if absent or not a list).

    HTTP errors are raised via ``raise_for_status``.
    """
    reply = requests.get(f"{api_url}/api/tunnels", timeout=5)
    reply.raise_for_status()
    tunnels = reply.json().get("tunnels")
    if isinstance(tunnels, list):
        return tunnels
    return []
|
|
|
|
def _existing_ngrok_public_url(api_url: str, port: int) -> str | None:
    """Return the public URL of the first tunnel whose target address ends in ``:<port>``, else None."""
    port_suffix = f":{port}"
    for tunnel in _list_ngrok_tunnels(api_url):
        url = str(tunnel.get("public_url") or "").strip()
        target = str((tunnel.get("config") or {}).get("addr") or "").strip()
        if url and target.endswith(port_suffix):
            return url.rstrip("/")
    return None
|
|
|
|
def _create_ngrok_tunnel(api_url: str, port: int) -> str | None:
    """Ask the ngrok agent at *api_url* to open an HTTP tunnel to *port*.

    Returns the new public URL without trailing slash, or None if the response
    carries none. HTTP errors are raised via ``raise_for_status``.
    """
    spec = {"name": f"http-{port}-kapo", "addr": str(port), "proto": "http"}
    reply = requests.post(f"{api_url}/api/tunnels", json=spec, timeout=10)
    reply.raise_for_status()
    created = str(reply.json().get("public_url") or "").strip().rstrip("/")
    return created or None
|
|
|
|
def _publish_brain_presence(public_url: str, *, source: str = "runtime") -> None:
    """Write this brain's presence records to Firebase; does nothing for an empty URL.

    Upserts three documents keyed by the normalized URL:

    * ``brains/<url>``: status, roles, languages and model metadata;
    * ``runtime/brains_last_report``: the most recent report;
    * ``tunnels/brain_<url>``: the tunnel record.
    """
    url = str(public_url or "").strip().rstrip("/")
    if not url:
        return

    def _csv_env(name: str, fallback: str) -> list[str]:
        # Comma-separated env list with blanks removed.
        return [piece.strip() for piece in os.getenv(name, fallback).split(",") if piece.strip()]

    platform = "kaggle" if _is_kaggle_runtime() else str(os.getenv("BRAIN_PROVIDER") or "remote")
    profile_id = os.getenv("MODEL_PROFILE_ID") or os.getenv("SUPERVISOR_MODEL_PROFILE_ID") or DEFAULT_MODEL_PROFILE_ID
    brain_doc = {
        "url": url,
        "status": "healthy",
        "source": source,
        "platform": platform,
        "role": os.getenv("BRAIN_PRIMARY_ROLE", "fallback"),
        "roles": _csv_env("BRAIN_ROLES", "supervisor,chat,coding,planner,arabic,fallback"),
        "languages": _csv_env("BRAIN_LANGUAGES", "ar,en"),
        "model_profile_id": profile_id,
        "model_repo": MODEL_META.get("repo_id") or os.getenv("MODEL_REPO") or DEFAULT_MODEL_REPO,
        "model_file": MODEL_META.get("filename") or os.getenv("MODEL_FILE") or DEFAULT_MODEL_FILE,
        "updated_at": time.time(),
    }
    FIREBASE.set_document("brains", url, brain_doc)

    last_report = {
        "brain_url": url,
        "source": source,
        "updated_at": time.time(),
        "provider": platform,
        "model_profile_id": profile_id,
    }
    FIREBASE.set_document("runtime", "brains_last_report", last_report)

    tunnel_doc = {
        "kind": "brain",
        "public_url": url,
        "provider": "ngrok" if "ngrok" in url else platform,
        "updated_at": time.time(),
    }
    FIREBASE.set_document("tunnels", f"brain_{url}", tunnel_doc)
|
|
|
|
def _report_brain_url(public_url: str) -> None:
    """Publish this brain's URL to Firebase and, when due, report it to the executor.

    Presence is always written to Firebase (source "report_attempt"). If
    EXECUTOR_URL is set and _should_report_brain_url() allows it, the URL and
    model metadata are POSTed to ``<executor>/brain/report-url``. Up to
    BRAIN_REPORT_RETRIES attempts are made (default 2), with a 1 s pause
    between attempts but not after the last one. On success presence is
    re-published with source "executor_report". A final failure is logged at
    INFO level and never raised.
    """
    _publish_brain_presence(public_url, source="report_attempt")
    executor_url = os.getenv("EXECUTOR_URL", "").strip().rstrip("/")
    if not executor_url:
        return
    if not _should_report_brain_url(public_url):
        return
    connect_timeout = max(1.0, float(os.getenv("BRAIN_REPORT_CONNECT_TIMEOUT_SEC", "4.0") or 4.0))
    read_timeout = max(5.0, float(os.getenv("BRAIN_REPORT_READ_TIMEOUT_SEC", "15.0") or 15.0))
    retries = max(1, int(os.getenv("BRAIN_REPORT_RETRIES", "2") or 2))
    # The payload depends only on env and model metadata; build it once, not per retry.
    payload = {
        "brain_url": public_url,
        "platform": "kaggle" if _is_kaggle_runtime() else "remote",
        "role": os.getenv("BRAIN_PRIMARY_ROLE", "fallback"),
        "roles": [part.strip() for part in os.getenv("BRAIN_ROLES", "supervisor,chat,coding,planner,arabic,fallback").split(",") if part.strip()],
        "languages": [part.strip() for part in os.getenv("BRAIN_LANGUAGES", "ar,en").split(",") if part.strip()],
        "model_profile_id": os.getenv("MODEL_PROFILE_ID") or os.getenv("SUPERVISOR_MODEL_PROFILE_ID") or DEFAULT_MODEL_PROFILE_ID,
        "model_repo": MODEL_META.get("repo_id") or os.getenv("MODEL_REPO") or DEFAULT_MODEL_REPO,
        "model_file": MODEL_META.get("filename") or os.getenv("MODEL_FILE") or DEFAULT_MODEL_FILE,
    }
    last_error: Exception | None = None
    for attempt in range(retries):
        if attempt:
            time.sleep(1)  # back off between attempts only
        try:
            response = requests.post(
                f"{executor_url}/brain/report-url",
                json=payload,
                headers=_brain_headers(),
                timeout=(connect_timeout, read_timeout),
            )
            response.raise_for_status()
            _publish_brain_presence(public_url, source="executor_report")
            return
        except Exception as exc:
            last_error = exc
    logger.info(
        "Brain URL report to executor timed out or failed; continuing (%s)",
        last_error,
    )
|
|
|
|
def _pull_executor_settings() -> dict[str, Any]:
    """Fetch shared runtime settings from the executor's ``/share/settings`` endpoint.

    Returns an empty dict in any of these cases:
    - shared state is served from Google Drive (a Drive backend alias, or
      Drive bootstrap is configured);
    - no executor URL has been saved;
    - the request fails or returns a non-200 status;
    - the body is not a JSON object.

    Every failure is logged as a warning; nothing is raised.
    """
    if _shared_state_backend() in {"google_drive", "drive", "gdrive"} or _drive_bootstrap_configured():
        return {}
    # Normalise like the other executor helpers so a saved trailing slash does
    # not yield a "//share/settings" path.
    executor_url = str(_load_saved_executor_url() or "").strip().rstrip("/")
    if not executor_url:
        return {}
    try:
        response = requests.get(
            f"{executor_url}/share/settings",
            headers=_brain_headers(),
            timeout=(
                max(1.5, float(os.getenv("EXECUTOR_CONNECT_TIMEOUT_SEC", "3.0") or 3.0)),
                max(2.0, float(os.getenv("EXECUTOR_SETTINGS_READ_TIMEOUT_SEC", "5.0") or 5.0)),
            ),
        )
        if response.status_code != 200:
            logger.warning("Executor settings request failed: %s", response.text[:400])
            return {}
        settings = response.json()
    except Exception as exc:
        logger.warning("Failed to pull executor settings (%s)", exc)
        return {}
    # The annotated contract is a dict; do not hand a JSON list/scalar to
    # _apply_executor_settings.
    if not isinstance(settings, dict):
        logger.warning("Executor settings payload is not a JSON object; ignoring it")
        return {}
    return settings
|
|
|
|
def start_ngrok(token: str | None = None) -> str | None:
    """Resolve the brain's public URL, starting an ngrok tunnel only if needed.

    Resolution order:
    1. ``KAPO_RESTART_REUSE_PUBLIC_URL`` is truthy and a saved URL exists:
       reuse that URL and reset the flag to ``"0"``.
    2. A configured public URL exists and ``_prefer_configured_public_url()``
       is true: use it without touching ngrok.
    3. ngrok bootstrap is enabled and an auth token is available (``token``,
       else ``NGROK_AUTHTOKEN``): reuse or start a local ngrok agent, then
       reuse or create a tunnel for ``BRAIN_PORT`` (default 7860).

    Every resolved URL is remembered, reported with ``_report_brain_url``, and
    written to the Firebase ``brains`` and ``tunnels`` collections.

    Returns:
        The public URL, or ``None`` when none could be resolved. Errors in the
        ngrok path are logged, not raised.
    """

    def _announce(url: str, source: str, tunnel_extra: dict[str, Any]) -> None:
        # Shared publication sequence; the call order matches all three paths.
        _remember_public_url(url)
        _report_brain_url(url)
        FIREBASE.set_document("brains", url, {"url": url, "status": "healthy", "source": source})
        FIREBASE.set_document("tunnels", f"brain_{url}", {"kind": "brain", "public_url": url, **tunnel_extra})

    reuse_flag = str(os.getenv("KAPO_RESTART_REUSE_PUBLIC_URL", "")).strip().lower()
    if reuse_flag in {"1", "true", "yes", "on"}:
        saved_url = _load_saved_public_url()
        if saved_url:
            _announce(saved_url, "restart_reuse", {"provider": "ngrok"})
            logger.info("Reusing saved brain public URL after restart: %s", saved_url)
            os.environ["KAPO_RESTART_REUSE_PUBLIC_URL"] = "0"
            return saved_url

    configured_url = _configured_public_url()
    if configured_url and _prefer_configured_public_url():
        _announce(configured_url, "configured_public_url", {"provider": "configured"})
        logger.info("Using configured brain public URL without starting ngrok: %s", configured_url)
        return configured_url

    if not _ngrok_bootstrap_enabled():
        logger.info("Skipping ngrok bootstrap because BRAIN_AUTO_NGROK is disabled")
        return None

    try:
        auth = str(token or os.getenv("NGROK_AUTHTOKEN") or "").strip()
        if not auth:
            return None

        brain_port = int(os.getenv("BRAIN_PORT", "7860"))
        agent_api = _find_live_ngrok_api() or _start_detached_ngrok_agent(auth)
        if not agent_api:
            logger.warning("Ngrok agent did not expose a local API URL")
            return None

        tunnel_url = _existing_ngrok_public_url(agent_api, brain_port) or _create_ngrok_tunnel(agent_api, brain_port)
        if not tunnel_url:
            return None

        # Keep only the bare https origin when the agent reports extra text.
        https_origin = re.search(r"https://[A-Za-z0-9.-]+", tunnel_url)
        if https_origin:
            tunnel_url = https_origin.group(0)
        _remember_ngrok_api_url(agent_api)
        _announce(tunnel_url, "ngrok_bootstrap", {"provider": "ngrok", "api_url": agent_api})
        return tunnel_url
    except Exception:
        logger.exception("Ngrok startup failed")
        return None
|
|
|
|
def _report_known_public_url() -> str | None:
    """Re-announce the previously saved public URL without starting ngrok.

    Returns:
        The saved URL after remembering it, reporting it, and marking it
        healthy in the Firebase ``brains`` collection; ``None`` when no URL
        was saved.
    """
    saved_url = _load_saved_public_url()
    if saved_url:
        _remember_public_url(saved_url)
        _report_brain_url(saved_url)
        FIREBASE.set_document("brains", saved_url, {"url": saved_url, "status": "healthy", "source": "saved_public_url"})
        logger.info("Reported known brain public URL without starting ngrok: %s", saved_url)
        return saved_url
    return None
|
|
|
|
def _bootstrap_executor_handshake(start_tunnel: bool = False) -> None:
    """Sync settings with the executor and publish the brain's public URL.

    Without ``EXECUTOR_URL``: a tunnel is started only when ``start_tunnel``
    is true; otherwise the handshake is skipped.

    With ``EXECUTOR_URL``: executor settings are pulled and applied, then a
    URL is published. ``start_ngrok`` is used when ``start_tunnel`` is true,
    falling back to the saved URL via ``_report_known_public_url``.
    """
    has_executor = bool(os.getenv("EXECUTOR_URL", "").strip())

    if has_executor:
        _apply_executor_settings(_pull_executor_settings())
        public_url = start_ngrok(os.getenv("NGROK_AUTHTOKEN") or None) if start_tunnel else None
        public_url = public_url or _report_known_public_url()
        success_message = "Brain public URL reported to executor: %s"
    elif start_tunnel:
        public_url = start_ngrok(os.getenv("NGROK_AUTHTOKEN") or None)
        success_message = "Brain public URL started locally without executor handshake: %s"
    else:
        logger.info("Skipping executor handshake: EXECUTOR_URL not configured")
        return

    if public_url:
        logger.info(success_message, public_url)
    else:
        logger.info("Brain started without publishing a public URL")
|
|
|
|
@app.on_event("startup")
async def startup_event():
    """Run the brain server's best-effort startup bootstrap.

    Steps, each wrapped so a failure is logged and startup continues:
    1. shared state, runtime environment, and startup self-update;
    2. unless this is a fast internal restart: executor settings, Firebase
       runtime settings, the default model, and the embedding model;
    3. the executor handshake, with tunnel start gated by
       ``_auto_publish_public_url_on_startup()`` and skipped on internal
       restarts;
    4. a runtime-state snapshot, then a single background pulse thread.

    ``KAPO_INTERNAL_RESTART`` is always reset to ``"0"`` once the handshake
    step has run.
    """
    global RUNTIME_STATE_THREAD_STARTED
    try:
        _bootstrap_shared_state()
    except Exception:
        logger.exception("Shared-state bootstrap failed")
    try:
        _prepare_runtime_environment()
    except Exception:
        logger.exception("Runtime environment bootstrap failed")
    try:
        _run_startup_self_update()
    except Exception:
        logger.exception("Startup self-update bootstrap failed")
    internal_restart = _internal_restart_in_progress()
    fast_restart = internal_restart and _fast_restart_enabled()
    if fast_restart:
        logger.info("Fast internal restart enabled; skipping executor/Firebase startup bootstrap")
    else:
        try:
            settings = _pull_executor_settings()
            _apply_executor_settings(settings)
        except Exception:
            logger.exception("Executor settings bootstrap failed")
        try:
            _apply_firebase_runtime_settings()
        except Exception:
            logger.exception("Firebase runtime bootstrap failed")
        # Previously unguarded: an exception here aborted the remaining
        # startup steps, including the handshake and the state-pulse thread.
        try:
            _load_default_model()
        except Exception:
            logger.exception("Default model startup failed")
        try:
            _load_embed_model()
        except Exception:
            logger.exception("Embedding model startup failed")
    try:
        start_tunnel = _auto_publish_public_url_on_startup() and not internal_restart
        _bootstrap_executor_handshake(start_tunnel=start_tunnel)
    except Exception:
        logger.exception("Executor handshake startup failed")
    finally:
        os.environ["KAPO_INTERNAL_RESTART"] = "0"
    try:
        _persist_runtime_state_snapshot(reason="startup")
    except Exception:
        logger.exception("Startup runtime-state snapshot failed")
    if not RUNTIME_STATE_THREAD_STARTED:
        RUNTIME_STATE_THREAD_STARTED = True
        threading.Thread(target=_runtime_state_pulse, daemon=True).start()
|
|
|
|
class ModelLoadRequest(BaseModel):
    """Request body naming a model file by repository id and filename.

    ``hf_token`` is optional; presumably a Hugging Face access token for
    gated repositories (confirm at the consuming endpoint).
    """
    repo_id: str
    filename: str
    hf_token: str | None = None
|
|
|
|
class ConnectionInit(BaseModel):
    """Request body carrying the executor URL and an optional ngrok auth token."""
    executor_url: str
    ngrok_token: str | None = None
|
|
|
|
class PublishUrlRequest(BaseModel):
    """Request body for publishing the brain's public URL.

    All fields are optional: an ngrok token, an explicit public URL, and a
    ``start_tunnel`` flag that defaults to True.
    """
    ngrok_token: str | None = None
    public_url: str | None = None
    start_tunnel: bool = True
|
|
|
|
class RestartRequest(BaseModel):
    """Request body for a process restart.

    ``delay_sec`` defaults to 1.0; presumably forwarded to
    ``_restart_process``, which uses the same default (verify at the route).
    """
    delay_sec: float = 1.0
|
|
|
|
class FileWriteRequest(BaseModel):
    """Request body for writing text to ``path``.

    ``content`` defaults to an empty string and ``overwrite`` to True.
    """
    path: str
    content: str = ""
    overwrite: bool = True
|
|
|
|
class FileDeleteRequest(BaseModel):
    """Request body for deleting ``path``; ``recursive`` defaults to False."""
    path: str
    recursive: bool = False
|
|
|
|
class FileMkdirRequest(BaseModel):
    """Request body naming a directory ``path`` to create."""
    path: str
|
|
|
|
class ChatRequest(BaseModel):
    """Request body for one chat turn.

    ``context`` and ``history`` default to empty containers. ``history``
    presumably holds ``{"role": ..., "content": ...}`` message dicts (TODO
    confirm at the chat endpoint). ``auto_execute`` defaults to True.
    """
    request_id: str
    user_input: str
    context: dict[str, Any] = {}
    history: list[dict[str, str]] = []
    auto_execute: bool = True
|
|
|
|
| def _contains_arabic(text: str) -> bool: |
| return bool(re.search(r"[\u0600-\u06FF]", text or "")) |
|
|
|
|
def _detect_language(text: str) -> str:
    """Classify *text* as ``"ar"`` if it contains Arabic characters, else ``"en"``."""
    if _contains_arabic(text):
        return "ar"
    return "en"
|
|
|
|
| def _is_task_request(text: str) -> bool: |
| lower = (text or "").strip().lower() |
| task_words = [ |
| "build", "fix", "debug", "create project", "generate project", "implement", "refactor", |
| "run", "execute", "install", "modify", "edit", "update", "write code", "make app", |
| "Ø§ÙØ´Ø¦", "Ø£ÙØ´Ø¦", "اعÙ
Ù", "ÙÙØ°", "شغÙ", "Ø§ØµÙØ", "Ø£ØµÙØ", "عدÙÙ", "عدÙ", "ابÙÙ", "ÙÙÙÙ Ù
Ø´Ø±ÙØ¹", |
| ] |
| return any(word in lower for word in task_words) |
|
|
|
|
| def _is_research_request(text: str) -> bool: |
| lower = (text or "").strip().lower() |
| research_words = [ |
| "search", "research", "look up", "find out", "web", "browse", |
| "Ø§Ø¨ØØ«", "Ø§Ø¨ØØ« عÙ", "Ø¯ÙØ±", "ÙØªØ´", "Ù
عÙÙÙ
Ø© عÙ", "Ù
عÙÙÙ
ات عÙ", |
| ] |
| return any(word in lower for word in research_words) |
|
|
|
|
| def _is_knowledge_request(text: str, context: dict[str, Any] | None = None) -> bool: |
| context = context or {} |
| if bool(context.get("use_executor_knowledge")): |
| return True |
| lower = (text or "").strip().lower() |
| knowledge_words = [ |
| "remember", "memory", "knowledge", "docs", "documentation", "project structure", "architecture", |
| "ØªØ°ÙØ±", "Ø§ÙØ°Ø§Ùرة", "اÙÙ
Ø¹Ø±ÙØ©", "اÙÙØ«Ø§Ø¦Ù", "Ø§ÙØ¯ÙÙÙ", "بÙÙØ© اÙÙ
Ø´Ø±ÙØ¹", "ÙÙÙ٠اÙÙ
Ø´Ø±ÙØ¹", "Ù
عÙ
Ø§Ø±ÙØ©", |
| ] |
| return any(word in lower for word in knowledge_words) |
|
|
|
|
| def _prune_history(history: list[dict[str, str]], keep_last: int = 6) -> list[dict[str, str]]: |
| if len(history) <= keep_last: |
| return history |
| return history[-keep_last:] |
|
|
|
|
def _retrieve_knowledge(query: str, top_k: int = 4) -> list[dict[str, Any]]:
    """Return up to ``top_k`` knowledge snippets relevant to *query*.

    The query is first widened with ``_expand_project_query``. The executor's
    ``/rag/search`` endpoint is queried when an executor URL is configured and
    ``BRAIN_REMOTE_KNOWLEDGE_ENABLED`` allows the round trip.

    If that yields no list, the local ``rag.retriever.retrieve`` is used, but
    only when ``_remote_brain_only()`` is false and
    ``BRAIN_LOCAL_RAG_FALLBACK_ENABLED`` is on. Every failure degrades to an
    empty list.
    """
    executor_url = os.getenv("EXECUTOR_URL", "").strip().rstrip("/")
    expanded_query = _expand_project_query(query)
    # Without an executor URL the request would target an invalid endpoint
    # and only log a spurious warning, so skip the round trip entirely.
    if executor_url and _executor_roundtrip_allowed("BRAIN_REMOTE_KNOWLEDGE_ENABLED", default=True):
        try:
            response = requests.get(
                f"{executor_url}/rag/search",
                params={"query": expanded_query, "top_k": top_k},
                headers=_brain_headers(),
                timeout=(
                    _executor_connect_timeout(),
                    _executor_read_timeout("BRAIN_REMOTE_KNOWLEDGE_TIMEOUT_SEC", 6.0),
                ),
            )
            if response.status_code == 200:
                payload = response.json()
                results = payload.get("results", [])
                if isinstance(results, list):
                    return results
        except requests.exceptions.ReadTimeout:
            logger.info("Executor knowledge retrieval timed out; continuing without remote knowledge")
        except Exception:
            logger.warning("Executor knowledge retrieval failed", exc_info=True)
    if _remote_brain_only() or not _feature_enabled("BRAIN_LOCAL_RAG_FALLBACK_ENABLED", default=False):
        return []
    try:
        # Imported lazily: the local RAG stack is optional at runtime.
        from rag.retriever import retrieve

        return retrieve(expanded_query, top_k=top_k)
    except Exception:
        logger.warning("Knowledge retrieval failed", exc_info=True)
        return []
|
|
|
|
| def _search_web(query: str) -> list[dict[str, Any]]: |
| executor_url = os.getenv("EXECUTOR_URL", "").strip().rstrip("/") |
| if not _executor_roundtrip_allowed("BRAIN_REMOTE_WEB_SEARCH_ENABLED", default=True): |
| return [] |
| try: |
| response = requests.post( |
| f"{executor_url}/tools/search", |
| json={"query": query, "num_results": 5}, |
| headers=_brain_headers(), |
| timeout=( |
| _executor_connect_timeout(), |
| _executor_read_timeout("BRAIN_REMOTE_WEB_SEARCH_TIMEOUT_SEC", 8.0), |
| ), |
| ) |
| if response.status_code == 200: |
| payload = response.json() |
| results = payload.get("results", []) |
| return results if isinstance(results, list) else [] |
| except Exception: |
| logger.warning("Web search failed", exc_info=True) |
| return [] |
|
|
|
|
| def _format_context_blocks(knowledge: list[dict[str, Any]], web_results: list[dict[str, Any]]) -> str: |
| blocks: list[str] = [] |
| if knowledge: |
| lines = [] |
| for item in knowledge[:4]: |
| source = item.get("source", "knowledge") |
| content = item.get("content", "") or item.get("text", "") |
| lines.append(f"- [{source}] {content[:500]}") |
| blocks.append("Knowledge:\n" + "\n".join(lines)) |
| if web_results: |
| lines = [] |
| for item in web_results[:5]: |
| lines.append(f"- {item.get('title', '')}: {item.get('snippet', '')}") |
| blocks.append("Web:\n" + "\n".join(lines)) |
| return "\n\n".join(blocks).strip() |
|
|
|
|
| def _project_context_tags(text: str) -> list[str]: |
| source = str(text or "") |
| lowered = source.lower() |
| tags: list[str] = [] |
| tag_rules = [ |
| ("brain_runtime", ["Ø§ÙØ¹ÙÙ", "brain", "model", "Ù
ÙØ¯ÙÙ", "اÙÙÙ
ÙØ°Ø¬"]), |
| ("executor_runtime", ["اÙÙÙÙÙ Ø§ÙØªÙÙÙØ°Ù", "executor", "agent"]), |
| ("tunnel_runtime", ["اÙÙÙÙ", "tunnel", "ngrok", "cloudflare"]), |
| ("url_routing", ["Ø§ÙØ±Ø§Ø¨Ø·", "url", "endpoint", "ÙÙÙÙ"]), |
| ("restart_sync", ["restart", "Ø±ÙØ³ØªØ§Ø±Øª", "إعادة تشغÙÙ", "اعادة تشغÙÙ", "sync", "Ù
زاÙ
ÙØ©"]), |
| ("knowledge_memory", ["memory", "Ø°Ø§ÙØ±Ø©", "Ù
Ø¹Ø±ÙØ©", "knowledge", "rag", "Ø·Ø¨ÙØ§Øª", "embedding", "embeddings"]), |
| ("kaggle_runtime", ["kaggle", "ÙØ§Ø¬Ù"]), |
| ] |
| for tag, markers in tag_rules: |
| if any(marker in source or marker in lowered for marker in markers): |
| tags.append(tag) |
| return tags |
|
|
|
|
def _expand_project_query(query: str) -> str:
    """Append domain keywords to *query* to improve retrieval recall.

    Each project tag from ``_project_context_tags`` that has an expansion
    contributes one phrase. Phrases keep the fixed order below and are joined
    with `` | `` on a ``Context expansion:`` line. The query is returned
    unchanged when no expansion applies.
    """
    detected = set(_project_context_tags(query))
    expansions = (
        ("tunnel_runtime", "ngrok tunnel public url reverse proxy runtime restart"),
        ("restart_sync", "system restart sync uvicorn process reuse public url"),
        ("executor_runtime", "executor share settings control plane local machine"),
        ("knowledge_memory", "knowledge layers rag embeddings preferences profile"),
        ("brain_runtime", "brain server kaggle model startup runtime"),
    )
    phrases = [phrase for tag, phrase in expansions if tag in detected]
    if not phrases:
        return query
    return f"{query}\nContext expansion: {' | '.join(phrases)}"
|
|
|
|
def _fetch_style_profile() -> dict[str, Any]:
    """Load the user's style profile.

    Lookup order:
    1. The Firebase ``profiles/style`` document (30 s TTL); returned if it is
       non-empty.
    2. The executor's ``/preferences/profile`` endpoint, when an executor URL
       is set and ``BRAIN_REMOTE_STYLE_PROFILE_ENABLED`` allows it. The
       ``profile`` dict from that response is returned.

    Returns ``{}`` on non-200 responses, non-dict payloads, timeouts and
    other errors.
    """
    cached = FIREBASE.get_document("profiles", "style", ttl_sec=30.0)
    if cached:
        return cached
    base_url = os.getenv("EXECUTOR_URL", "").strip().rstrip("/")
    remote_allowed = bool(base_url) and _executor_roundtrip_allowed("BRAIN_REMOTE_STYLE_PROFILE_ENABLED", default=False)
    if not remote_allowed:
        return {}
    try:
        reply = requests.get(
            f"{base_url}/preferences/profile",
            headers=_brain_headers(),
            timeout=(
                _executor_connect_timeout(),
                _executor_read_timeout("BRAIN_REMOTE_STYLE_PROFILE_TIMEOUT_SEC", 2.5),
            ),
        )
        if reply.status_code == 200:
            remote_profile = reply.json().get("profile", {})
            if isinstance(remote_profile, dict):
                return remote_profile
        return {}
    except requests.exceptions.ReadTimeout:
        logger.info("Style profile load timed out; continuing without remote style profile")
        return {}
    except Exception:
        logger.warning("Failed to load style profile", exc_info=True)
        return {}
|
|
|
|
| def _render_style_profile_context(profile: dict[str, Any]) -> str: |
| if not profile: |
| return "" |
| preferences = profile.get("preferences", []) or [] |
| examples = profile.get("examples", []) or [] |
| lexical_signals = profile.get("lexical_signals", []) or [] |
| style_markers = profile.get("style_markers", {}) or {} |
| persona_summary = str(profile.get("persona_summary") or "").strip() |
| response_contract = str(profile.get("response_contract") or "").strip() |
| lines: list[str] = [] |
| if persona_summary: |
| lines.append(f"User persona summary: {persona_summary}") |
| if response_contract: |
| lines.append(f"Response contract: {response_contract}") |
| if preferences: |
| lines.append("User Style Preferences:") |
| for item in preferences[:10]: |
| lines.append(f"- {item}") |
| if lexical_signals: |
| lines.append("User Lexical Signals:") |
| for item in lexical_signals[:10]: |
| lines.append(f"- {item}") |
| if style_markers: |
| lines.append("Style Markers:") |
| for key, value in sorted(style_markers.items()): |
| lines.append(f"- {key}: {value}") |
| if examples: |
| lines.append("Recent User Style Examples:") |
| for sample in examples[-3:]: |
| user_text = str(sample.get("user_input") or "").strip() |
| assistant_text = str(sample.get("assistant_reply") or "").strip() |
| if user_text: |
| lines.append(f"- User: {user_text}") |
| if assistant_text: |
| lines.append(f" Assistant: {assistant_text}") |
| return "\n".join(lines).strip() |
|
|
|
|
def _project_domain_context(user_input: str, context: dict[str, Any] | None = None) -> str:
    """Build a glossary that steers Arabic project jargon toward its software meaning.

    Returns ``""`` when ``_project_context_tags`` finds no project terms in
    *user_input*. Otherwise returns the base glossary, extra notes for the
    restart/knowledge/Kaggle tags, and the assigned ``role_name`` from
    *context* when present.

    The Arabic terms must be real Arabic text; keep this file UTF-8 encoded.
    """
    tags = _project_context_tags(user_input)
    if not tags:
        return ""
    lines = [
        "Project Domain Glossary:",
        "- In this project, terms like العقل, الوكيل التنفيذي, النفق, الرابط, الريستارت, المزامنة, الذاكرة, والطبقات usually refer to software runtime and operations concepts.",
        "- Treat النفق as ngrok or cloudflare tunnel unless the user explicitly asks for a literal/civil meaning.",
        "- Treat الرابط as public URL, endpoint, or routing target when the surrounding context mentions deployment, Kaggle, restart, or sync.",
        "- Treat العقل as the remote Brain service/model runtime, and الوكيل التنفيذي as the local executor/control plane on the user's device.",
    ]
    if "restart_sync" in tags:
        lines.append("- Restart means process/service restart; preserving the same public URL matters more than creating a fresh tunnel.")
    if "knowledge_memory" in tags:
        lines.append("- Knowledge, embeddings, layers, and memory refer to the RAG and memory system inside this project.")
    if "kaggle_runtime" in tags:
        lines.append("- Kaggle here is the remote runtime hosting the Brain service.")
    role_name = str((context or {}).get("role_name") or "").strip()
    if role_name:
        lines.append(f"- Current assigned role: {role_name}.")
    return "\n".join(lines)
|
|
|
|
def _firebase_runtime_context(role_name: str, language: str) -> str:
    """Summarise the live Firestore runtime snapshot as prompt context.

    Possible lines, each included only when non-empty:
    - enabled roles (up to 12);
    - labels of the first 5 enabled model profiles;
    - names of the first 6 platforms;
    - the Firestore prompt body for *role_name* (default ``"chat"``) in
      *language*, falling back to English.

    An entry counts as disabled when its ``enabled`` value stringifies to
    0/false/no/off.
    """
    disabled_flags = {"0", "false", "no", "off"}

    def _is_enabled(entry: dict[str, Any]) -> bool:
        return str(entry.get("enabled", True)).strip().lower() not in disabled_flags

    snapshot = _firebase_runtime_snapshot()
    sections: list[str] = []

    role_labels = [
        label
        for label in (
            str(entry.get("name") or entry.get("id") or "").strip()
            for entry in (snapshot.get("roles") or [])
            if _is_enabled(entry)
        )
        if label
    ]
    if role_labels:
        sections.append("Live roles from Firestore: " + ", ".join(role_labels[:12]))

    enabled_models = [entry for entry in (snapshot.get("models") or []) if _is_enabled(entry)]
    model_labels = [
        label
        for label in (str(entry.get("label") or entry.get("id") or "").strip() for entry in enabled_models[:5])
        if label
    ]
    if model_labels:
        sections.append("Live model profiles: " + ", ".join(model_labels))

    platform_labels = [
        label
        for label in (str(entry.get("name") or "").strip() for entry in (snapshot.get("platforms") or [])[:6])
        if label
    ]
    if platform_labels:
        sections.append("Live platforms: " + ", ".join(platform_labels))

    effective_role = role_name or "chat"
    prompt_body = _firebase_prompt_body(effective_role, language) or _firebase_prompt_body(effective_role, "en")
    if prompt_body:
        sections.append(f"Live Firestore prompt for role '{effective_role}': {prompt_body}")
    return "\n".join(sections).strip()
|
|
|
|
def _append_runtime_instructions(context_block: str, context: dict[str, Any]) -> str:
    """Append a ``Runtime Instructions:`` section to *context_block*.

    The section collects, in order:
    - the assigned role;
    - ``system_instructions`` from *context*;
    - the live Firebase context;
    - the rendered style profile;
    - the project domain glossary for ``context["user_input"]``.

    *context_block* is returned unchanged when all of these are empty.
    """
    ctx = context or {}
    instructions = str(ctx.get("system_instructions") or "").strip()
    role_name = str(ctx.get("role_name") or "").strip()
    user_input = str(ctx.get("user_input") or "").strip()
    language = _detect_language(user_input)
    style_profile = _render_style_profile_context(_fetch_style_profile())
    domain_context = _project_domain_context(user_input, context)
    firebase_context = _firebase_runtime_context(role_name, language)

    pieces = [
        f"Assigned role: {role_name}" if role_name else "",
        instructions,
        firebase_context,
        style_profile,
        domain_context,
    ]
    pieces = [piece for piece in pieces if piece]
    if not pieces:
        return context_block

    section = "Runtime Instructions:\n" + "\n".join(pieces)
    if not context_block:
        return section
    return (context_block + "\n\n" + section).strip()
|
|
|
|
| def _extract_exact_reply_instruction(user_input: str) -> str: |
| text = (user_input or "").strip() |
| patterns = [ |
| r'(?is)reply\s+with\s+exactly\s+[:"]?\s*(.+?)\s*[".]?$', |
| r'(?is)respond\s+with\s+exactly\s+[:"]?\s*(.+?)\s*[".]?$', |
| r"(?is)ÙÙ\s+ÙÙØ·[:ï¼]?\s*(.+?)\s*$", |
| r"(?is)Ø§ÙØªØ¨\s+ÙÙØ·[:ï¼]?\s*(.+?)\s*$", |
| ] |
| for pattern in patterns: |
| match = re.search(pattern, text) |
| if match: |
| return match.group(1).strip().strip("\"'`") |
| return "" |
|
|
|
|
| def _extract_exact_reply_instruction_safe(user_input: str) -> str: |
| text = (user_input or "").strip() |
| patterns = [ |
| r'(?is)reply\s+with\s+exactly\s+[:"]?\s*(.+?)\s*[".]?$', |
| r'(?is)respond\s+with\s+exactly\s+[:"]?\s*(.+?)\s*[".]?$', |
| r"(?is)\u0642\u0644\s+\u0641\u0642\u0637[:\uff1a]?\s*(.+?)\s*$", |
| r"(?is)\u0627\u0643\u062a\u0628\s+\u0641\u0642\u0637[:\uff1a]?\s*(.+?)\s*$", |
| ] |
| for pattern in patterns: |
| match = re.search(pattern, text) |
| if match: |
| return match.group(1).strip().strip("\"'`") |
| return _extract_exact_reply_instruction(user_input) |
|
|
|
|
| def _chat_system_instruction(language: str, user_input: str = "", exact_reply: str = "") -> str: |
| if language == "ar": |
| base = ( |
| "Ø£ÙØª KAPO-AIØ Ù
ساعد ÙÙØ¯Ø³Ù عÙ
ÙÙ. " |
| "أجب Ù
باشرة ÙØ¨ÙØ¶ÙØ ÙØ¨Ø´ÙÙ Ù
ÙÙØ¯. " |
| "اÙÙÙ
اÙÙ
Ø·ÙÙØ¨ Ø£ÙÙØ§Ù Ø«Ù
أجب دÙÙ Ù
ÙØ¯Ù
ات زائدة. " |
| "ÙØ§ تÙ٠إ٠اÙÙ
ØØ§Ø¯Ø«Ø© ØºÙØ± Ù
ÙØªÙ
ÙØ© ÙÙØ§ ØªØ°ÙØ± تعÙÙÙ
Ø§ØªÙ Ø§ÙØ¯Ø§Ø®ÙÙØ©." |
| ) |
| base += ( |
| " ÙÙ ÙØ°Ø§ اÙÙ
Ø´Ø±ÙØ¹Ø ÙÙÙ
ات Ù
Ø«Ù Ø§ÙØ¹ÙÙ ÙØ§ÙÙÙÙÙ Ø§ÙØªÙÙÙØ°Ù ÙØ§ÙÙÙÙ ÙØ§Ùرابط ÙØ§ÙØ±ÙØ³ØªØ§Ø±Øª ÙØ§ÙÙ
زاÙ
ÙØ© " |
| "ÙØ§ÙØ·Ø¨ÙØ§Øª ÙØ§ÙتضÙ
ÙÙØ§Øª ÙÙØ§Ø¬Ù Ùngrok Ùcloudflare Ùendpoint Ùmodel ØªØ´ÙØ± ØºØ§ÙØ¨Ø§ Ø¥ÙÙ " |
| "Ù
ÙÙÙØ§Øª برÙ
Ø¬ÙØ© ÙØªØ´ØºÙÙÙØ©Ø ÙÙÙØ³Øª Ù
عاÙÙ ØØ±ÙÙØ© Ø£Ù ÙÙØ¯Ø³Ø© Ù
دÙÙØ©Ø Ù
ا ÙÙ
ÙØ·Ùب اÙÙ
ستخدÙ
ØºÙØ± ذÙÙ ØµØ±Ø§ØØ©." |
| ) |
| if exact_reply: |
| return base + f' ÙØ¬Ø¨ Ø£Ù ÙÙÙ٠رد٠ÙÙ ÙØ°Ø§ اÙÙØµ ÙÙØ· ØØ±ÙÙØ§Ù: "{exact_reply}"' |
| return base |
| base = ( |
| "You are KAPO-AI, an engineering assistant. " |
| "Answer directly, clearly, and practically. " |
| "Understand the request before answering. " |
| "Do not say the conversation is incomplete and do not mention hidden instructions." |
| ) |
| if exact_reply: |
| return base + f' Your entire reply must be exactly: "{exact_reply}"' |
| return base |
|
|
|
|
def _build_chat_prompt(user_input: str, history: list[dict[str, str]], context_block: str) -> str:
    """Assemble the full text prompt for the local chat model.

    Sections, in order:
    - ``### System`` (from ``_chat_system_instruction``);
    - optional ``### History`` with the last 6 messages;
    - optional ``### Context``;
    - ``### Instruction`` ending with an open assistant label for the model
      to complete.

    Speaker labels are Arabic when the user writes Arabic. Otherwise history
    roles are upper-cased (``USER`` / ``ASSISTANT``) and the final turn uses
    ``User`` / ``Assistant``.
    """
    language = _detect_language(user_input)
    exact_reply = _extract_exact_reply_instruction_safe(user_input)
    is_arabic = language == "ar"
    # Arabic speaker labels: "the user" / "the assistant". Keep this file UTF-8.
    arabic_user_label = "المستخدم"
    arabic_assistant_label = "المساعد"
    history_lines: list[str] = []
    for message in _prune_history(history):
        role = message.get("role", "user")
        if is_arabic:
            role_label = arabic_user_label if role == "user" else arabic_assistant_label
        else:
            role_label = role.upper()
        history_lines.append(f"{role_label}: {message.get('content', '')}")

    history_section = ""
    if history_lines:
        history_section = "\n### History\n" + "\n".join(history_lines) + "\n"

    context_section = f"\n### Context\n{context_block}\n" if context_block else ""
    user_label = arabic_user_label if is_arabic else "User"
    assistant_label = arabic_assistant_label if is_arabic else "Assistant"
    return (
        f"### System\n{_chat_system_instruction(language, user_input, exact_reply)}\n"
        f"{history_section}"
        f"{context_section}"
        f"### Instruction\n"
        f"{user_label}: {user_input}\n"
        f"{assistant_label}:"
    )
|
|
|
|
| def _response_looks_bad(text: str, language: str) -> bool: |
| cleaned = (text or "").strip() |
| if not cleaned: |
| return True |
| markers = [ |
| "the assistant is not sure", |
| "conversation seems incomplete", |
| "provide more information", |
| "unless otherwise noted", |
| "as an ai model developed by", |
| "developed by ibm", |
| "tensorflow library", |
| "dataset of 1024", |
| ] |
| if any(marker in cleaned.lower() for marker in markers): |
| return True |
| if language == "ar": |
| arabic_chars = len(re.findall(r"[\u0600-\u06FF]", cleaned)) |
| latin_chars = len(re.findall(r"[A-Za-z]", cleaned)) |
| if arabic_chars < 8 and latin_chars > max(12, arabic_chars * 2): |
| return True |
| return False |
|
|
|
|
def _fallback_response(user_input: str) -> str:
    """Return a "please rephrase" message in the user's language.

    Used when a generated reply was rejected as unusable. The Arabic text
    must be real Arabic; keep this file UTF-8 encoded.
    """
    if _detect_language(user_input) == "ar":
        return "فهمت رسالتك، لكن الرد المولد لم يكن صالحاً للاستخدام. أعد صياغة الطلب بشكل أكثر تحديداً."
    return "I understood your message, but the generated reply was not usable. Please rephrase the request more specifically."
|
|
|
|
| def _project_specific_fast_reply(user_input: str) -> str: |
| text = (user_input or "").strip() |
| lower = text.lower() |
| if any(token in text for token in ("اÙ٠اÙÙÙ Ø§ØªØµÙØ", "Ø¥Ù٠اÙÙÙ Ø§ØªØµÙØ", "Ù٠اÙ٠اÙÙÙ Ø§ØªØµÙØ", "Ù٠إÙ٠اÙÙÙ Ø§ØªØµÙØ")) and any( |
| token in text or token in lower for token in ("اÙÙÙÙ", "Ø§ÙØ±Ø§Ø¨Ø·", "Ø§ÙØ±Ùستارت", "restart") |
| ): |
| return ( |
| "Ø§ÙØ°Ù Ø§ØªØµÙØ ÙÙ Ø£Ù Ø§ÙØ±Ùستارت Ø§ÙØ¯Ø§Ø®Ù٠بÙÙ ÙØ¹Ùد تشغÙ٠خدÙ
Ø© Ø§ÙØ¹ÙÙ ÙÙØ³Ùا Ù
Ù ØºÙØ± Ù
ا ÙÙØ³Ø± اÙÙÙÙ Ø£Ù ÙØºÙÙØ± Ø§ÙØ±Ø§Ø¨Ø· Ø§ÙØ¹Ø§Ù
. " |
| "ÙÙÙ Ø¸ÙØ± تÙÙÙ ÙØµÙر Ø£Ø«ÙØ§Ø¡ Ø§ÙØ¥ÙÙØ§Ø¹ ÙÙØ°Ø§ ÙÙÙÙ Ù
Ù Ø±Ø¬ÙØ¹ Ø§ÙØ®Ø¯Ù
Ø©Ø ÙØ§ Ù
Ù Ø¥ÙØ´Ø§Ø¡ ÙÙÙ Ø¬Ø¯ÙØ¯." |
| ) |
| if "اÙÙÙÙ" in text and any(token in text for token in ("Ø¥ØµÙØ§Ø", "Ø§ØµÙØ§Ø", "Ø§ÙØ°Ù تÙ
", "Ø§ØªØµÙØ", "تÙ
Ø¥ØµÙØ§Ø", "تÙ
Ø§ØµÙØ§Ø")): |
| return ( |
| "تÙ
ÙØµÙ Ø¯ÙØ±Ø© ØÙاة اÙÙÙÙ Ø¹Ù Ø¯ÙØ±Ø© ØÙاة خدÙ
Ø© Ø§ÙØ¹ÙÙØ ÙØ£ØµØ¨Ø ngrok ÙØ¹Ù
Ù ÙØ¹Ø§Ù
Ù Ù
ستÙ٠ع٠عÙ
ÙÙØ© Uvicorn. " |
| "ÙØ¨Ø§ÙتاÙÙ ÙØØªÙØ¸ /system/restart بÙÙØ³ Ø§ÙØ±Ø§Ø¨Ø· Ø§ÙØ¹Ø§Ù
Ø¨Ø¯Ù Ø¥ÙØ´Ø§Ø¡ رابط Ø¬Ø¯ÙØ¯Ø ÙÙØ¯ ÙØ¸Ùر ERR_NGROK_8012 Ù
Ø¤ÙØªÙا ÙÙØ· Ø£Ø«ÙØ§Ø¡ Ø§ÙØ¥ÙÙØ§Ø¹." |
| ) |
| if "ÙÙØ³ Ø§ÙØ±Ø§Ø¨Ø·" in text and ("Ø±ÙØ³ØªØ§Ø±Øª" in text or "restart" in lower): |
| return ( |
| "اÙÙØ¯Ù ÙÙØ§ Ø£Ù ØªØ¹ÙØ¯ Ø§ÙØ®Ø¯Ù
Ø© Ø§ÙØ¥ÙÙØ§Ø¹ داخÙÙÙØ§ Ù
ع Ø§ÙØ¥Ø¨Ùاء عÙÙ ÙÙØ³ اÙÙ public URL. " |
| "ÙØ°ÙÙ ÙØ¨ÙÙ tunnel ØÙÙØ§Ø بÙÙÙ
ا ØªØ¹ÙØ¯ خدÙ
Ø© localhost:7860 ÙÙØ¹Ù
٠بعد Ø«ÙØ§ÙÙ ÙÙÙÙØ© عÙÙ ÙÙØ³ Ø§ÙØ±Ø§Ø¨Ø·." |
| ) |
| return "" |
|
|
|
|
def _generate_response(user_input: str, history: list[dict[str, str]], context_block: str) -> str:
    """Produce the chat reply for *user_input*.

    Order of precedence:
    1. an explicit "reply exactly with X" instruction returns X verbatim;
    2. a canned project-specific answer, when one matches;
    3. a static notice when ``MODEL`` is not loaded;
    4. otherwise the local model is prompted (80 tokens for Arabic, 96
       otherwise, temperature 0.1).

    Unusable generations are replaced by ``_fallback_response``. Generation
    errors are logged and answered with a language-matched failure message.
    Arabic literals must be real Arabic text; keep this file UTF-8 encoded.
    """
    language = _detect_language(user_input)
    exact_reply = _extract_exact_reply_instruction_safe(user_input)
    if exact_reply:
        return exact_reply
    fast_reply = _project_specific_fast_reply(user_input)
    if fast_reply:
        return fast_reply
    if MODEL is None:
        if language == "ar":
            return "الخدمة تعمل لكن توليد الرد الحر غير متاح الآن لأن النموذج غير محمل."
        return "The Brain is online, but natural chat generation is unavailable because the model is not loaded."

    prompt = _build_chat_prompt(user_input, history, context_block)

    try:
        max_tokens = 80 if language == "ar" else 96
        output = MODEL(
            prompt,
            max_tokens=max_tokens,
            temperature=0.1,
            top_p=0.85,
            # Stop before the model starts writing the next user turn. The
            # Arabic entry is the user label used by the prompt builder.
            stop=["\nUser:", "\nUSER:", "\nالمستخدم:", "\n###", "<|EOT|>"],
        )
        text = output["choices"][0]["text"].strip()
        if _response_looks_bad(text, language):
            return _fallback_response(user_input)
        return text or ("تم استلام رسالتك." if language == "ar" else "I received your message.")
    except Exception:
        logger.exception("Model generation failed")
        if language == "ar":
            return "فهمت طلبك، لكن فشل توليد الرد النصي."
        return "I understood your request, but text generation failed."
|
|
|
|
| def _store_chat_trace(request_id: str, payload: dict[str, Any]) -> None: |
| executor_url = os.getenv("EXECUTOR_URL", "").strip().rstrip("/") |
| if not _executor_roundtrip_allowed("BRAIN_REMOTE_TRACE_STORE_ENABLED", default=True): |
| return |
| try: |
| requests.post( |
| f"{executor_url}/memory/store", |
| json={"request_id": request_id, "payload": payload}, |
| headers=_brain_headers(), |
| timeout=( |
| _executor_connect_timeout(), |
| _executor_read_timeout("BRAIN_REMOTE_TRACE_STORE_TIMEOUT_SEC", 2.5), |
| ), |
| ) |
| except requests.exceptions.ReadTimeout: |
| logger.info("Chat trace store timed out; continuing") |
| except Exception: |
| logger.warning("Failed to store chat trace on executor", exc_info=True) |
|
|
|
|
| def _ingest_chat_knowledge(request_id: str, user_input: str, reply: str) -> None: |
| if len(reply or "") < 180: |
| return |
| executor_url = os.getenv("EXECUTOR_URL", "").strip().rstrip("/") |
| if not _executor_roundtrip_allowed("BRAIN_REMOTE_AUTO_INGEST_ENABLED", default=False): |
| return |
| payload = { |
| "request_id": request_id, |
| "payload": { |
| "source": "auto_chat", |
| "content": f"User: {user_input}\nAssistant: {reply}", |
| }, |
| } |
| try: |
| requests.post( |
| f"{executor_url}/rag/ingest", |
| json=payload, |
| headers=_brain_headers(), |
| timeout=( |
| _executor_connect_timeout(), |
| _executor_read_timeout("BRAIN_REMOTE_AUTO_INGEST_TIMEOUT_SEC", 3.0), |
| ), |
| ) |
| except requests.exceptions.ReadTimeout: |
| logger.info("Auto-ingest chat knowledge timed out; continuing") |
| except Exception: |
| logger.warning("Failed to auto-ingest chat knowledge", exc_info=True) |
|
|
|
|
| def _learn_user_style(request_id: str, user_input: str, reply: str, context: dict[str, Any]) -> None: |
| executor_url = os.getenv("EXECUTOR_URL", "").strip().rstrip("/") |
| if not executor_url or not _executor_roundtrip_allowed("BRAIN_REMOTE_STYLE_PROFILE_ENABLED", default=False): |
| return |
| try: |
| requests.post( |
| f"{executor_url}/preferences/learn", |
| json={ |
| "request_id": request_id, |
| "user_input": user_input, |
| "assistant_reply": reply, |
| "context": context or {}, |
| }, |
| headers=_brain_headers(), |
| timeout=( |
| _executor_connect_timeout(), |
| _executor_read_timeout("BRAIN_REMOTE_TRACE_STORE_TIMEOUT_SEC", 2.5), |
| ), |
| ) |
| except requests.exceptions.ReadTimeout: |
| logger.info("Style learning timed out; continuing") |
| except Exception: |
| logger.warning("Failed to learn user style on executor", exc_info=True) |
|
|
|
|
| def _dispatch_background(task, *args) -> None: |
| try: |
| threading.Thread(target=task, args=args, daemon=True).start() |
| except Exception: |
| logger.warning("Background task dispatch failed", exc_info=True) |
|
|
|
|
def _restart_process(delay_sec: float = 1.0) -> None:
    """Schedule an in-place re-exec of this server under uvicorn.

    Skipped on Hugging Face Space runtimes. Otherwise a daemon thread:
    1. waits ``delay_sec`` (minimum 0.2 s);
    2. changes into ``_sync_target_root()`` and marks the restart as internal
       (and fast, unless configured otherwise);
    3. optionally flags the saved public URL for reuse;
    4. replaces the process with ``python -m uvicorn <app> --host 0.0.0.0
       --port $BRAIN_PORT``.

    The app module comes from ``KAPO_UVICORN_APP``. Otherwise it is
    ``brain_server.api.main:app``, unless only the flat ``api/main.py``
    layout exists.
    """
    if _is_hf_space_runtime():
        logger.info("Skipping in-process restart on Hugging Face Space runtime")
        return

    def _run() -> None:
        try:
            time.sleep(max(0.2, float(delay_sec)))
            target_root = _sync_target_root()
            os.chdir(target_root)
            os.environ["KAPO_INTERNAL_RESTART"] = "1"
            os.environ.setdefault("KAPO_FAST_INTERNAL_RESTART", "1")
            if _reuse_public_url_on_restart():
                current_public_url = _load_saved_public_url()
                if current_public_url:
                    _remember_public_url(current_public_url)
                    os.environ["KAPO_RESTART_REUSE_PUBLIC_URL"] = "1"
            port = str(os.getenv("BRAIN_PORT", "7860") or "7860")
            app_module = str(os.getenv("KAPO_UVICORN_APP") or "").strip()
            if not app_module:
                root = Path(target_root)
                # Prefer the packaged layout; fall back to the flat ``api``
                # layout only when the package entrypoint is absent.
                if not (root / "brain_server" / "api" / "main.py").exists() and (root / "api" / "main.py").exists():
                    app_module = "api.main:app"
                else:
                    app_module = "brain_server.api.main:app"
            logger.info("Re-executing brain server via uvicorn (%s, port %s)", app_module, port)
            os.execv(
                sys.executable,
                [
                    sys.executable,
                    "-m",
                    "uvicorn",
                    app_module,
                    "--host",
                    "0.0.0.0",
                    "--port",
                    port,
                ],
            )
        except Exception:
            # Without this the daemon thread dies silently and the process keeps
            # running flagged as "internally restarting".
            os.environ["KAPO_INTERNAL_RESTART"] = "0"
            logger.exception("In-process restart failed")

    threading.Thread(target=_run, daemon=True).start()
|
|
|
|
@app.get("/")
async def root():
    """Identify the service and point clients at the docs and health endpoints."""
    return dict(status="ok", service="brain_server", docs="/docs", health="/health")
|
|
|
|
@app.get("/runtime/firebase")
async def runtime_firebase_snapshot():
    """Return the JSON-safe Firebase runtime snapshot.

    If building the snapshot fails, log a warning and answer ``status: "degraded"``
    with an empty skeleton and the error text, instead of raising.
    """
    try:
        snapshot = _json_safe(_firebase_runtime_snapshot())
    except Exception as exc:
        logger.warning("Failed to build firebase runtime snapshot", exc_info=True)
        empty_snapshot = {"platforms": [], "models": [], "prompts": [], "roles": []}
        return {"status": "degraded", "firebase": empty_snapshot, "detail": str(exc)}
    return {"status": "ok", "firebase": snapshot}
|
|
|
|
@app.get("/runtime/errors")
async def runtime_errors(limit: int = 50, level: str = "WARNING"):
    """Return the most recent buffered log records at or above ``level``.

    Args:
        limit: Maximum number of records to return (oldest first, newest last).
            ``0`` or a negative value returns no records.
        level: Minimum severity name (case-insensitive); unknown names fall
            back to ``WARNING``.
    """
    severities = {"DEBUG": 10, "INFO": 20, "WARNING": 30, "ERROR": 40, "CRITICAL": 50}
    threshold = severities.get(str(level or "WARNING").strip().upper(), 30)
    matching = [
        item
        for item in list(RUNTIME_LOG_BUFFER)
        if severities.get(str(item.get("level") or "").upper(), 0) >= threshold
    ]
    # ``items[-0:]`` is the whole list, so a non-positive limit must be handled
    # explicitly rather than through the negative slice.
    limit = max(0, int(limit))
    recent = matching[-limit:] if limit else []
    return {"status": "ok", "count": len(recent), "items": recent}
|
|
|
|
@app.get("/runtime/modernization")
async def runtime_modernization():
    """Return the runtime modernization snapshot.

    On failure, log a warning and answer ``status: "degraded"`` with the error
    text and an empty snapshot, instead of raising.
    """
    try:
        snapshot = _runtime_modernization_snapshot()
    except Exception as exc:
        logger.warning("Failed to build runtime modernization snapshot", exc_info=True)
        return {"status": "degraded", "detail": str(exc), "modernization": {}}
    return {"status": "ok", "modernization": snapshot}
|
|
|
|
@app.get("/model/status")
async def model_status():
    """Report whether a generation model is loaded, its last load error, and its source."""
    is_loaded = MODEL is not None
    status = {"loaded": is_loaded, "error": MODEL_ERROR}
    status["repo_id"] = MODEL_META.get("repo_id")
    status["filename"] = MODEL_META.get("filename")
    return status
|
|
|
|
@app.post("/model/load")
async def model_load(req: ModelLoadRequest):
    """Load the requested model and report the resulting model status.

    NOTE(review): ``ensure_model_loaded`` is synchronous, so a download/load
    runs on the event loop thread and blocks other requests meanwhile.
    """
    repo, model_file, token = req.repo_id, req.filename, req.hf_token
    ensure_model_loaded(repo, model_file, hf_token=token)
    return await model_status()
|
|
|
|
@app.post("/model/hotswap")
async def model_hotswap(req: ModelLoadRequest):
    """Unload the current model, reclaim its memory, then load the requested one.

    The global is rebound to ``None`` rather than ``del``-eted. Deleting the
    module-level name would make any other thread that reads ``MODEL`` (for
    example ``_health_payload``) fail with ``NameError`` while ``gc.collect()``
    runs.
    """
    global MODEL
    had_model = MODEL is not None
    MODEL = None  # drop the module's reference before collecting
    if had_model:
        gc.collect()
    ensure_model_loaded(req.repo_id, req.filename, hf_token=req.hf_token)
    return await model_status()
|
|
|
|
@app.post("/embeddings")
async def embeddings(payload: dict[str, Any]):
    """Encode ``payload["texts"]`` with the embedding model.

    The embedding model is loaded lazily on first use. If it is still
    unavailable afterwards (the load failed or raised), an error payload is
    returned instead of crashing with ``AttributeError`` on ``None``.
    An empty/missing ``texts`` always yields ``{"embeddings": []}``.
    """
    if EMBED_MODEL is None:
        try:
            _load_embed_model()
        except Exception:
            logger.exception("Embedding model load failed")
    texts = payload.get("texts") or []
    if not texts:
        return {"embeddings": []}
    model = EMBED_MODEL
    if model is None:
        return {"status": "error", "detail": "Embedding model is not loaded", "embeddings": []}
    return {"embeddings": model.encode(texts).tolist()}
|
|
|
|
@app.post("/chat")
async def chat(req: ChatRequest):
    """Handle one chat turn and return the reply plus any plan/execution details.

    Flow:
      1. If the input carries an exact-reply instruction, return that text
         verbatim. Only a short-term memory trace is written.
      2. Otherwise classify the input as ``"task"`` or ``"chat"`` and optionally
         pull knowledge-base and web context. Then either build a plan
         (executing it when ``req.auto_execute`` is set) or generate a model reply.
      3. Write the exchange to short-term memory. Hand trace storage, knowledge
         ingestion and style learning to background threads.

    Any exception is logged and reported as a ``status: "error"`` payload. The
    user-facing text is in Arabic when the input contains Arabic.
    """
    try:
        exact_reply = _extract_exact_reply_instruction_safe(req.user_input)
        if exact_reply:
            runtime_context = {**(req.context or {}), "user_input": req.user_input}
            trace_payload = {
                "mode": "chat",
                "user_input": req.user_input,
                "reply": exact_reply,
                "plan": None,
                "execution": None,
                "knowledge": [],
                "web_results": [],
                "context": runtime_context,
            }
            memory = MemoryAgent()
            memory.write_short_term(req.request_id, trace_payload)
            return {
                "status": "ok",
                "mode": "chat",
                "reply": exact_reply,
                "plan": None,
                "rationale": None,
                "execution": None,
                "knowledge": [],
                "web_results": [],
                "timestamp": time.time(),
            }

        planner = PlannerAgent()
        reasoning = ReasoningAgent()
        memory = MemoryAgent()

        mode = "task" if _is_task_request(req.user_input) else "chat"
        runtime_context = {**(req.context or {}), "user_input": req.user_input}
        knowledge = (
            _retrieve_knowledge(req.user_input, top_k=4)
            if _is_knowledge_request(req.user_input, req.context)
            else []
        )
        web_results = _search_web(req.user_input) if _is_research_request(req.user_input) else []
        context_block = _append_runtime_instructions(_format_context_blocks(knowledge, web_results), runtime_context)
        response_text = ""
        plan_steps = None
        execution = None
        rationale = None

        if mode == "task":
            plan_steps = planner.run(req.user_input, req.context)
            rationale = reasoning.run(req.user_input, plan_steps)
            arabic = _contains_arabic(req.user_input)
            # The Arabic texts are real UTF-8 Arabic; earlier copies of these
            # literals were double-encoded (mojibake) and rendered as garbage.
            response_text = (
                "سأتعامل مع هذه الرسالة كطلب تنفيذ، وقمت ببناء خطة مبدئية وسأبدأ التنفيذ تلقائياً."
                if arabic
                else "I treated this as an execution request, built a plan, and started automatic execution."
            )
            if req.auto_execute:
                # Mirror the module-level import fallback so this also works when the
                # app is loaded as a package (e.g. ``brain_server.api.main``).
                try:
                    from api.routes_execute import ExecuteRequest, execute as execute_route
                except ImportError:
                    from .routes_execute import ExecuteRequest, execute as execute_route

                execution = await execute_route(
                    ExecuteRequest(
                        request_id=req.request_id,
                        plan={"steps": plan_steps},
                        executor_url=os.getenv("EXECUTOR_URL", "").strip() or None,
                    )
                )
                # Tolerate a missing or null "report" instead of failing the turn.
                report = (execution or {}).get("report") or {}
                if report.get("success"):
                    response_text += (
                        "\n\nتم التنفيذ بنجاح مبدئياً."
                        if arabic
                        else "\n\nExecution completed successfully."
                    )
                else:
                    response_text += (
                        "\n\nتمت المحاولة لكن توجد مخرجات تحتاج مراجعة."
                        if arabic
                        else "\n\nExecution ran, but the result still needs review."
                    )
        else:
            response_text = _generate_response(req.user_input, req.history, context_block)

        trace_payload = {
            "mode": mode,
            "user_input": req.user_input,
            "reply": response_text,
            "plan": plan_steps,
            "execution": execution,
            "knowledge": knowledge,
            "web_results": web_results,
            "context": runtime_context,
        }
        memory.write_short_term(req.request_id, trace_payload)
        _dispatch_background(_store_chat_trace, req.request_id, trace_payload)
        _dispatch_background(_ingest_chat_knowledge, req.request_id, req.user_input, response_text)
        _dispatch_background(_learn_user_style, req.request_id, req.user_input, response_text, runtime_context)

        return {
            "status": "ok",
            "mode": mode,
            "reply": response_text,
            "plan": plan_steps,
            "rationale": rationale,
            "execution": execution,
            "knowledge": knowledge,
            "web_results": web_results,
            "timestamp": time.time(),
        }
    except Exception as exc:
        logger.exception("Chat failed")
        return {
            "status": "error",
            "mode": "chat",
            "reply": "حدث خطأ أثناء معالجة الرسالة." if _contains_arabic(req.user_input) else "Chat processing failed.",
            "detail": str(exc),
            "timestamp": time.time(),
        }
|
|
|
|
@app.post("/init-connection")
async def init_connection(payload: ConnectionInit):
    """Register the executor URL and make sure this brain has a public URL.

    A previously known public URL is reused when available. Otherwise an ngrok
    tunnel is started with the supplied token.
    """
    executor_url = payload.executor_url
    _remember_executor_url(executor_url)
    FIREBASE.set_document("runtime", "executor", {"executor_url": executor_url})
    public_url = _report_known_public_url() or start_ngrok(payload.ngrok_token)
    return {"status": "connected", "brain_public_url": public_url}
|
|
|
|
@app.post("/system/publish-url")
async def system_publish_url(req: PublishUrlRequest | None = None):
    """Publish this brain's public URL.

    Resolution order:
      1. An explicit ``public_url`` in the request (also recorded in Firebase).
      2. When ``start_tunnel`` is false, the previously known URL, if any.
      3. A fresh ngrok tunnel, falling back to the known URL if it fails.
    """
    options = req if req else PublishUrlRequest()
    explicit_url = str(options.public_url or "").strip().rstrip("/")
    if explicit_url:
        _remember_public_url(explicit_url)
        _report_brain_url(explicit_url)
        FIREBASE.set_document(
            "brains",
            explicit_url,
            {"url": explicit_url, "status": "healthy", "source": "explicit_publish"},
        )
        return {"status": "published", "brain_public_url": explicit_url, "mode": "explicit"}

    if not options.start_tunnel:
        known_url = _report_known_public_url()
        if not known_url:
            return {"status": "skipped", "brain_public_url": None, "mode": "none"}
        return {"status": "published", "brain_public_url": known_url, "mode": "saved"}

    tunnel_url = start_ngrok(options.ngrok_token)
    if tunnel_url:
        return {"status": "published", "brain_public_url": tunnel_url, "mode": "ngrok"}
    fallback_url = _report_known_public_url()
    if fallback_url:
        return {"status": "published", "brain_public_url": fallback_url, "mode": "saved"}
    return {"status": "error", "brain_public_url": fallback_url, "mode": "none"}
|
|
|
|
@app.get("/system/files")
async def system_files(path: str = "", include_content: bool = False):
    """Describe a file, or list a directory, under the sync root.

    For a file, ``include_content=True`` also returns its text, decoded as
    UTF-8 with undecodable bytes dropped. Directory entries are sorted with
    folders first, then case-insensitively by name. Errors are returned as a
    ``status: "error"`` payload rather than raised.
    """
    try:
        target = _resolve_sync_path(path)
        if not target.exists():
            return {"status": "error", "detail": "Path not found", "path": path}
        if not target.is_file():
            entries = [_describe_sync_entry(child) for child in target.iterdir()]
            entries.sort(key=lambda entry: (not entry["is_dir"], entry["name"].lower()))
            return {"status": "ok", "root": str(_sync_root_path()), "path": path, "items": entries}
        result = {"status": "ok", "entry": _describe_sync_entry(target)}
        if include_content:
            result["content"] = target.read_text(encoding="utf-8", errors="ignore")
        return result
    except Exception as exc:
        logger.exception("File listing failed")
        return {"status": "error", "detail": str(exc), "path": path}
|
|
|
|
@app.post("/system/files/write")
async def system_files_write(payload: FileWriteRequest):
    """Write ``payload.content`` as UTF-8 text to a path under the sync root.

    Refuses to write onto a directory, and refuses to replace an existing file
    unless ``payload.overwrite`` is set. Missing parent directories are created
    only after those checks pass, so a rejected request leaves the tree
    unchanged. Errors are returned as a ``status: "error"`` payload.
    """
    try:
        target = _resolve_sync_path(payload.path)
        if target.is_dir():
            return {"status": "error", "detail": "Target path is a directory"}
        if target.exists() and not payload.overwrite:
            return {"status": "error", "detail": "File already exists"}
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text(payload.content or "", encoding="utf-8")
        return {"status": "saved", "entry": _describe_sync_entry(target)}
    except Exception as exc:
        logger.exception("File write failed")
        return {"status": "error", "detail": str(exc), "path": payload.path}
|
|
|
|
@app.post("/system/files/mkdir")
async def system_files_mkdir(payload: FileMkdirRequest):
    """Create a directory (and any missing parents) under the sync root.

    Succeeds if the directory already exists. Errors, such as the path being
    an existing file, are returned as a ``status: "error"`` payload.
    """
    requested = payload.path
    try:
        directory = _resolve_sync_path(requested)
        directory.mkdir(parents=True, exist_ok=True)
        entry = _describe_sync_entry(directory)
    except Exception as exc:
        logger.exception("Directory creation failed")
        return {"status": "error", "detail": str(exc), "path": requested}
    return {"status": "created", "entry": entry}
|
|
|
|
@app.delete("/system/files")
async def system_files_delete(payload: FileDeleteRequest):
    """Delete a file or directory under the sync root.

    Directories are removed recursively only when ``payload.recursive`` is set;
    otherwise only an empty directory can be removed. A path that does not
    exist reports ``existed: False``.

    The directory that an empty path resolves to (the sync root, which
    ``/system/files`` lists for ``path=""``) is never deleted, so a request
    like ``{"path": "", "recursive": true}`` cannot wipe the whole tree.
    Errors are returned as a ``status: "error"`` payload.
    """
    try:
        target = _resolve_sync_path(payload.path)
        if not target.exists():
            return {"status": "deleted", "path": payload.path, "existed": False}
        if target.is_dir() and not target.is_symlink():
            sync_root = _resolve_sync_path("")
            if target.resolve() == sync_root.resolve():
                return {"status": "error", "detail": "Refusing to delete the sync root", "path": payload.path}
        if target.is_dir():
            if payload.recursive:
                shutil.rmtree(target)
            else:
                target.rmdir()
        else:
            target.unlink()
        return {"status": "deleted", "path": payload.path, "existed": True}
    except Exception as exc:
        logger.exception("Delete failed")
        return {"status": "error", "detail": str(exc), "path": payload.path}
|
|
|
|
def _discard_temp_file(path: str | None) -> None:
    """Best-effort removal of a temporary upload file; failures are only logged."""
    if not path:
        return
    try:
        os.remove(path)
    except FileNotFoundError:
        pass
    except OSError:
        logger.warning("Failed to remove temporary upload %s", path, exc_info=True)


def _spool_upload_to_temp(upload: UploadFile, prefix: str) -> str:
    """Copy an uploaded file into a uniquely named temp ``.zip`` file and return its path.

    A unique name stops concurrent uploads from overwriting each other's data.
    If the copy fails, the partial temp file is removed before re-raising.
    """
    with tempfile.NamedTemporaryFile(prefix=prefix, suffix=".zip", delete=False) as buffer:
        try:
            shutil.copyfileobj(upload.file, buffer)
        except BaseException:
            buffer.close()
            _discard_temp_file(buffer.name)
            raise
        return buffer.name


if HAS_MULTIPART:
    @app.post("/system/sync")
    async def sync_codebase(file: UploadFile = File(...), restart: bool = False):
        """Extract an uploaded zip over the sync target root, optionally restarting.

        The upload is spooled to a private temp file, which is always deleted
        afterwards. Errors are returned as a ``status: "error"`` payload.
        """
        temp_zip = None
        try:
            temp_zip = _spool_upload_to_temp(file, "kapo_update_")
            with zipfile.ZipFile(temp_zip, "r") as zip_ref:
                # NOTE: extractall sanitizes absolute and ".." member names (see zipfile docs).
                zip_ref.extractall(_sync_target_root())
            if restart:
                _restart_process()
            return {"status": "synced", "target_root": _sync_target_root(), "restart_scheduled": restart}
        except Exception as exc:
            logger.exception("Code sync failed")
            return {"status": "error", "detail": str(exc)}
        finally:
            _discard_temp_file(temp_zip)

    @app.post("/system/archive/upload")
    async def system_archive_upload(file: UploadFile = File(...), target_path: str = "", restart: bool = False):
        """Extract an uploaded zip into ``target_path`` under the sync root.

        The target directory is created if needed. The upload is spooled to a
        private temp file, which is always deleted afterwards. Errors are
        returned as a ``status: "error"`` payload.
        """
        temp_zip = None
        try:
            extract_root = _resolve_sync_path(target_path)
            extract_root.mkdir(parents=True, exist_ok=True)
            temp_zip = _spool_upload_to_temp(file, "kapo_archive_upload_")
            with zipfile.ZipFile(temp_zip, "r") as zip_ref:
                zip_ref.extractall(extract_root)
            if restart:
                _restart_process()
            return {
                "status": "extracted",
                "target_root": str(extract_root),
                "restart_scheduled": restart,
            }
        except Exception as exc:
            logger.exception("Archive upload failed")
            return {"status": "error", "detail": str(exc)}
        finally:
            _discard_temp_file(temp_zip)
else:
    @app.post("/system/sync")
    async def sync_codebase_unavailable():
        """Placeholder used when python-multipart is not installed."""
        return {"status": "error", "detail": "python-multipart is required for /system/sync"}

    @app.post("/system/archive/upload")
    async def system_archive_upload_unavailable():
        """Placeholder used when python-multipart is not installed."""
        return {"status": "error", "detail": "python-multipart is required for /system/archive/upload"}
|
|
|
|
@app.post("/system/restart")
async def system_restart(req: RestartRequest | None = None):
    """Schedule a process restart after ``delay_sec`` seconds (default 1.0).

    Restarts are disabled on Hugging Face Spaces. In that case nothing is
    scheduled and the response reports ``status: "skipped"``.
    """
    delay_sec = 1.0 if not req else req.delay_sec
    if not _is_hf_space_runtime():
        _restart_process(delay_sec=delay_sec)
        return {
            "status": "restarting",
            "delay_sec": delay_sec,
            "target_root": _sync_target_root(),
        }
    return {
        "status": "skipped",
        "reason": "restart_disabled_on_hf_space",
        "delay_sec": delay_sec,
        "target_root": _sync_target_root(),
    }
|
|
|
|
def _health_payload(check_executor: bool = False, executor_url: str | None = None) -> dict[str, Any]:
    """Build this brain's health-report dict.

    When ``check_executor`` is true and an executor URL is known (the argument,
    else ``EXECUTOR_URL``), ``<executor>/health`` is probed. The probe runs up
    to ``REQUEST_RETRIES`` attempts (at least 1) and stops at the first HTTP
    200. Otherwise the last response body or exception text is reported as
    ``executor_error``.
    """
    cfg = load_config()
    exec_base = (executor_url or os.getenv("EXECUTOR_URL", "")).strip().rstrip("/")
    timeout_sec = int(cfg.get("REQUEST_TIMEOUT_SEC", 20) or 20)
    attempts = max(1, int(cfg.get("REQUEST_RETRIES", 2) or 2))

    exec_checked = bool(exec_base and check_executor)
    exec_ok = False
    exec_error = None
    if exec_checked:
        for _attempt in range(attempts):
            try:
                resp = requests.get(
                    f"{exec_base}/health",
                    headers=get_executor_headers(cfg),
                    timeout=timeout_sec,
                )
                exec_ok = resp.status_code == 200
                if exec_ok:
                    exec_error = None
                    break
                exec_error = resp.text
            except Exception as exc:
                exec_error = str(exc)

    faiss_path = cfg.get("FAISS_INDEX_PATH")
    remote_only_flag = str(os.getenv("REMOTE_BRAIN_ONLY", "")).strip().lower()
    return {
        "status": "ok",
        "model_loaded": MODEL is not None,
        "model_error": MODEL_ERROR,
        "embedding_loaded": EMBED_MODEL is not None,
        "faiss_ok": bool(faiss_path and os.path.exists(faiss_path)),
        "executor_checked": exec_checked,
        "executor_ok": exec_ok,
        "executor_error": exec_error,
        "remote_brain_only": remote_only_flag in {"1", "true", "yes", "on"},
        "runtime_root": os.getenv("KAPO_RUNTIME_ROOT", ""),
        "sync_root": _sync_target_root(),
        "timestamp": time.time(),
    }
|
|
|
|
def _decode_b64_text(value: str) -> str:
    """Decode URL-safe base64 text, tolerating missing ``=`` padding.

    Returns the decoded UTF-8 string with surrounding whitespace stripped.
    Returns ``""`` when ``value`` is empty or is not valid base64 UTF-8.
    """
    encoded = str(value or "").strip()
    if not encoded:
        return ""
    missing = -len(encoded) % 4
    try:
        decoded = base64.urlsafe_b64decode(encoded.encode("utf-8") + b"=" * missing)
        return decoded.decode("utf-8").strip()
    except Exception:
        return ""
|
|
|
|
def _runtime_modernization_snapshot() -> dict[str, Any]:
    """Summarize how this runtime is wired to shared state, patches and transport.

    Reads environment variables and the Drive bootstrap document (loaded with
    ``force=False``). Returns a nested dict grouped into ``drive``,
    ``bootstrap``, ``patch``, ``remote_env``, ``control_plane``, ``transport``
    and ``runtime`` sections.
    """
    truthy = {"1", "true", "yes", "on"}

    def first_env(*names: str) -> str:
        # The first non-empty variable wins; only the chosen value is stripped.
        for name in names:
            value = os.getenv(name, "")
            if value:
                return str(value).strip()
        return ""

    def env_flag(name: str) -> bool:
        return str(os.getenv(name, "0")).strip().lower() in truthy

    bootstrap = DRIVE_STATE.ensure_bootstrap_loaded(force=False) or {}
    patch_manifest_url = str(
        os.getenv("KAPO_PATCH_MANIFEST_URL", "") or bootstrap.get("patch_manifest_url") or ""
    ).strip()
    remote_env_url = _decode_b64_text(os.getenv("KAPO_REMOTE_ENV_URL_B64", "")) or first_env("KAPO_REMOTE_ENV_URL")
    public_url = str(
        os.getenv("BRAIN_PUBLIC_URL", "") or LAST_BRAIN_URL_REPORT.get("url") or _load_saved_public_url() or ""
    ).strip()
    applied_version = _load_applied_runtime_version()
    control_plane_url = first_env("KAPO_CONTROL_PLANE_URL")
    queue_name = first_env("KAPO_CLOUDFLARE_QUEUE_NAME")
    bootstrap_url = first_env("KAPO_BOOTSTRAP_URL", "GOOGLE_DRIVE_BOOTSTRAP_URL")
    bootstrap_loaded = bool(bootstrap)

    return {
        "shared_state_backend": _shared_state_backend(),
        "firebase_enabled": env_flag("FIREBASE_ENABLED"),
        "drive": {
            "folder_id": first_env("GOOGLE_DRIVE_SHARED_STATE_FOLDER_ID", "GOOGLE_DRIVE_STORAGE_FOLDER_ID"),
            "prefix": first_env("GOOGLE_DRIVE_SHARED_STATE_PREFIX", "GOOGLE_DRIVE_STORAGE_PREFIX"),
            "bootstrap_loaded": bootstrap_loaded,
        },
        "bootstrap": {
            "configured": bool(bootstrap_url),
            "url": bootstrap_url,
            "loaded": bootstrap_loaded,
            "version": str(bootstrap.get("version") or "").strip(),
            "updated_at": bootstrap.get("updated_at"),
        },
        "patch": {
            "configured": bool(patch_manifest_url),
            "manifest_url": patch_manifest_url,
            "startup_self_update_enabled": _startup_self_update_enabled(),
            "applied_version": applied_version,
        },
        "remote_env": {
            "configured": bool(remote_env_url),
            "loaded": env_flag("KAPO_REMOTE_ENV_LOADED"),
            "url": remote_env_url,
        },
        "control_plane": {
            "configured": bool(control_plane_url),
            "url": control_plane_url,
            "queue_name": queue_name,
            "queue_configured": bool(queue_name),
        },
        "transport": {
            "provider": str(os.getenv("BRAIN_TUNNEL_PROVIDER", "ngrok") or "ngrok").strip().lower(),
            "auto_ngrok": _ngrok_bootstrap_enabled(),
            "reuse_public_url_on_restart": _reuse_public_url_on_restart(),
            "public_url": public_url,
            "executor_url": first_env("EXECUTOR_URL"),
        },
        "runtime": {
            "root": str(_sync_target_root()),
            "model_profile_id": first_env("MODEL_PROFILE_ID"),
            "model_loaded": MODEL is not None,
            "embed_loaded": EMBED_MODEL is not None,
        },
    }
|
|
|
|
def _persist_runtime_state_snapshot(reason: str = "periodic") -> dict[str, Any]:
    """Write the current health snapshot to Firebase and return it.

    Two documents are written, each with ``min_interval_sec=5.0`` (presumably
    a write throttle; confirm in FirebaseStore):

    * ``brains/<public url or state id>``: health plus model/role metadata.
    * ``brain_runtime/<state id>``: health plus the last 25 buffered log records.

    The executor is not probed.
    """
    health_snapshot = _health_payload(check_executor=False)
    brain_url = os.getenv("BRAIN_PUBLIC_URL") or _load_saved_public_url() or ""
    state_id = _brain_state_id()

    brain_doc = {
        "url": brain_url,
        "health": health_snapshot,
        "status": "degraded" if not health_snapshot["model_loaded"] else "healthy",
        "model_repo": os.getenv("MODEL_REPO", DEFAULT_MODEL_REPO),
        "model_file": os.getenv("MODEL_FILE", DEFAULT_MODEL_FILE),
        "model_profile_id": os.getenv("MODEL_PROFILE_ID", DEFAULT_MODEL_PROFILE_ID),
        "roles": os.getenv("BRAIN_ROLES", "supervisor,chat,coding,planner,arabic,fallback"),
        "languages": os.getenv("BRAIN_LANGUAGES", "ar,en"),
        "updated_at": time.time(),
        "source": reason,
    }
    FIREBASE.set_document("brains", brain_url or state_id, brain_doc, min_interval_sec=5.0)

    runtime_doc = {
        "brain_url": brain_url,
        "reason": reason,
        "health": health_snapshot,
        "recent_logs": list(RUNTIME_LOG_BUFFER)[-25:],
        "updated_at": time.time(),
    }
    FIREBASE.set_document("brain_runtime", state_id, runtime_doc, min_interval_sec=5.0)
    return health_snapshot
|
|
|
|
def _runtime_state_pulse() -> None:
    """Persist a runtime state snapshot forever, sleeping between rounds.

    The period comes from ``BRAIN_STATE_SYNC_INTERVAL_SEC`` (default 60) and is
    clamped to at least 20 seconds. A failed round is logged and the loop
    continues. The loop never returns, so presumably it runs on a background
    thread.
    """
    configured = os.getenv("BRAIN_STATE_SYNC_INTERVAL_SEC", "60") or 60
    period = max(20.0, float(configured))
    while True:
        try:
            _persist_runtime_state_snapshot(reason="background_pulse")
        except Exception:
            logger.warning("Background runtime state sync failed", exc_info=True)
        time.sleep(period)
|
|
|
|
@app.get("/health")
async def health(executor_url: str | None = None, check_executor: bool = False):
    """Return the health payload and persist a runtime snapshot as a side effect.

    Persisting is best-effort. A failure there (for example Firebase being
    unavailable) is logged, as the background pulse does, and does not turn
    the health probe itself into an error response.
    """
    payload = _health_payload(check_executor=check_executor, executor_url=executor_url)
    try:
        _persist_runtime_state_snapshot(reason="health_endpoint")
    except Exception:
        logger.warning("Failed to persist runtime state from /health", exc_info=True)
    return payload
|
|
|
|
| |
def _kapo_hf_transformers_enabled() -> bool:
    """Report whether ``KAPO_HF_TRANSFORMERS_RUNTIME`` is set to 1/true/yes/on (case-insensitive)."""
    flag = str(os.getenv('KAPO_HF_TRANSFORMERS_RUNTIME', '0')).strip().lower()
    return flag in ('1', 'true', 'yes', 'on')
|
|
| def ensure_model_loaded(repo_id: str, filename: str, hf_token: str | None = None) -> None: |
| global MODEL, MODEL_ERROR, MODEL_META |
| repo_id = (repo_id or '').strip() |
| filename = (filename or '').strip() |
| if not repo_id: |
| MODEL = None |
| MODEL_ERROR = 'model repo missing' |
| return |
| if _kapo_hf_transformers_enabled(): |
| try: |
| from transformers import AutoModelForCausalLM, AutoTokenizer |
| tokenizer = AutoTokenizer.from_pretrained(repo_id, token=hf_token, trust_remote_code=True) |
| model = AutoModelForCausalLM.from_pretrained(repo_id, token=hf_token, trust_remote_code=True, device_map='cpu') |
| if hasattr(model, 'eval'): |
| model.eval() |
| MODEL = {'kind': 'transformers', 'model': model, 'tokenizer': tokenizer} |
| MODEL_ERROR = None |
| MODEL_META = {'repo_id': repo_id, 'filename': filename, 'path': None} |
| logger.info('Loaded transformers model %s', repo_id) |
| return |
| except Exception as exc: |
| MODEL = None |
| MODEL_ERROR = f'transformers model load failed: {exc}' |
| logger.exception('Transformers model load failed') |
| return |
| if not filename: |
| MODEL = None |
| MODEL_ERROR = 'model file missing' |
| return |
| try: |
| model_path = _download_model(repo_id, filename, hf_token=hf_token) |
| except Exception as exc: |
| MODEL = None |
| MODEL_ERROR = f'model download failed: {exc}' |
| logger.exception('Model download failed') |
| return |
| try: |
| from llama_cpp import Llama |
| MODEL = Llama(model_path=model_path, n_ctx=4096) |
| MODEL_ERROR = None |
| MODEL_META = {'repo_id': repo_id, 'filename': filename, 'path': model_path} |
| logger.info('Loaded model %s/%s', repo_id, filename) |
| except Exception as exc: |
| MODEL = None |
| MODEL_ERROR = f'model load failed: {exc}' |
| logger.exception('Model load failed') |
|
|
def _generate_response(user_input: str, history: list[dict[str, str]], context_block: str) -> str:
    """Produce a free-form chat reply for ``user_input``.

    Order of attempts:
      1. An exact-reply instruction or a project-specific canned reply is
         returned as-is.
      2. Otherwise the default model is loaded lazily if needed, and a short
         answer is generated: 80 new tokens for Arabic, 96 otherwise. The
         transformers backend decodes greedily; the llama.cpp backend uses
         temperature 0.1.

    Replies judged bad by ``_response_looks_bad`` are replaced with
    ``_fallback_response``. A missing model or a generation error yields
    localized fallback text instead of raising.
    """
    language = _detect_language(user_input)
    for shortcut in (_extract_exact_reply_instruction_safe, _project_specific_fast_reply):
        canned = shortcut(user_input)
        if canned:
            return canned

    if MODEL is None:
        try:
            _load_default_model()
        except Exception:
            logger.exception('Lazy model load failed')
    if MODEL is None:
        if language != 'ar':
            return 'The Brain is online, but natural chat generation is unavailable because the model is not loaded.'
        return 'الخدمة تعمل لكن توليد الرد الحر غير متاح الآن لأن النموذج غير محمل.'

    prompt = _build_chat_prompt(user_input, history, context_block)
    try:
        token_budget = 96
        if language == 'ar':
            token_budget = 80
        uses_transformers = isinstance(MODEL, dict) and MODEL.get('kind') == 'transformers'
        if uses_transformers:
            tok = MODEL['tokenizer']
            net = MODEL['model']
            encoded = tok(prompt, return_tensors='pt', truncation=True, max_length=2048)
            if hasattr(net, 'device'):
                # Move every tensor-like input onto the model's device.
                encoded = {name: (t.to(net.device) if hasattr(t, 'to') else t) for name, t in encoded.items()}
            produced = net.generate(
                **encoded,
                max_new_tokens=token_budget,
                do_sample=False,
                pad_token_id=tok.eos_token_id,
            )
            # Keep only the newly generated tokens, dropping the echoed prompt.
            prompt_len = encoded['input_ids'].shape[1]
            text = tok.decode(produced[0][prompt_len:], skip_special_tokens=True).strip()
        else:
            completion = MODEL(
                prompt,
                max_tokens=token_budget,
                temperature=0.1,
                top_p=0.85,
                stop=['\nUser:', '\nUSER:', '\n###', '<|EOT|>'],
            )
            text = completion['choices'][0]['text'].strip()
        if _response_looks_bad(text, language):
            return _fallback_response(user_input)
        if text:
            return text
        return 'تم استلام رسالتك.' if language == 'ar' else 'I received your message.'
    except Exception:
        logger.exception('Model generation failed')
        if language != 'ar':
            return 'I understood your request, but text generation failed.'
        return 'فهمت طلبك، لكن حدث خطأ أثناء توليد الرد النصي.'
|
|