# KAPO self-heal fix: conversational HF fallback (commit 1fcae39, verified)
"""FastAPI entrypoint for the Brain Server."""
import base64
import gc
import json
import logging
import os
import re
import shutil
import subprocess
import sys
import tempfile
import threading
import time
import zipfile
from collections import deque
from pathlib import Path
from typing import Any

import requests
from fastapi import FastAPI, File, UploadFile
from pydantic import BaseModel

from agents.memory_agent import MemoryAgent
from agents.planner_agent import PlannerAgent
from agents.reasoning_agent import ReasoningAgent
try:
from api import deps as deps_module
from api.deps import get_executor_headers, get_logger, load_config
from api.firebase_store import FirebaseStore
from api.routes_analyze import router as analyze_router
from api.routes_execute import router as execute_router
from api.routes_plan import router as plan_router
from shared.google_drive_state import GoogleDriveStateClient
except ImportError:
from . import deps as deps_module
from .deps import get_executor_headers, get_logger, load_config
from .firebase_store import FirebaseStore
from .routes_analyze import router as analyze_router
from .routes_execute import router as execute_router
from .routes_plan import router as plan_router
from shared.google_drive_state import GoogleDriveStateClient
# Module-wide logger; a RuntimeLogHandler is attached further down so that
# warnings are mirrored into the in-memory RUNTIME_LOG_BUFFER ring buffer.
logger = get_logger("kapo.brain.main")
def _configure_windows_utf8() -> None:
if os.name != "nt":
return
os.environ.setdefault("PYTHONUTF8", "1")
os.environ.setdefault("PYTHONIOENCODING", "utf-8")
os.environ.setdefault("PYTHONLEGACYWINDOWSSTDIO", "utf-8")
try:
import ctypes
kernel32 = ctypes.windll.kernel32
kernel32.SetConsoleCP(65001)
kernel32.SetConsoleOutputCP(65001)
except Exception:
pass
# Apply the Windows UTF-8 fix and stream reconfiguration at import time so
# all subsequent logging can safely emit non-ASCII text.
_configure_windows_utf8()
if hasattr(sys.stdout, "reconfigure"):
    sys.stdout.reconfigure(encoding="utf-8", errors="replace")
if hasattr(sys.stderr, "reconfigure"):
    sys.stderr.reconfigure(encoding="utf-8", errors="replace")
# FastAPI application plus the plan/execute/analyze sub-routers.
app = FastAPI(title="KAPO-AI Brain Server", version="1.0.0")
app.include_router(plan_router)
app.include_router(execute_router)
app.include_router(analyze_router)
# Lazily-populated model state: MODEL holds the llama_cpp instance (or None),
# MODEL_ERROR the last download/load failure, MODEL_META the resolved source.
MODEL = None
MODEL_ERROR = None
MODEL_META = {"repo_id": None, "filename": None, "path": None}
EMBED_MODEL = None  # sentence-transformers instance, loaded on demand
FIREBASE = FirebaseStore("brain", logger_name="kapo.brain.firebase")
DRIVE_STATE = GoogleDriveStateClient(logger)
# TTL cache for Firebase collection reads: cache key -> (fetched_at, items).
FIREBASE_RUNTIME_CACHE: dict[str, tuple[float, Any]] = {}
# Bounded ring buffer of recent warning+ log records (see RuntimeLogHandler).
RUNTIME_LOG_BUFFER: deque[dict[str, Any]] = deque(maxlen=200)
# Last URL/timestamp reported to the executor; used to throttle reports.
LAST_BRAIN_URL_REPORT: dict[str, Any] = {"url": "", "ts": 0.0}
RUNTIME_STATE_THREAD_STARTED = False
PUBLIC_URL_RETRY_STARTED = False
DEFAULT_MODEL_REPO = "QuantFactory/aya-expanse-8b-GGUF"
DEFAULT_MODEL_FILE = "aya-expanse-8b.Q4_K_M.gguf"
DEFAULT_MODEL_PROFILE_ID = "supervisor-ar-en-default"
# python-multipart is optional; upload endpoints can consult this flag.
HAS_MULTIPART = True
try:
    import multipart  # noqa: F401
except Exception:
    HAS_MULTIPART = False
class RuntimeLogHandler(logging.Handler):
    """Logging handler that mirrors records into RUNTIME_LOG_BUFFER.

    The buffer is a bounded deque, so old entries are evicted automatically.
    Emission is best-effort: any failure is swallowed so logging can never
    crash the server.
    """

    def emit(self, record) -> None:
        try:
            entry = {
                "ts": time.time(),
                "level": record.levelname,
                "name": record.name,
                "message": record.getMessage(),
            }
            RUNTIME_LOG_BUFFER.append(entry)
        except Exception:
            pass
# Install the ring-buffer handler once (warnings and above); the isinstance
# guard prevents duplicate registration if this module is imported twice.
_runtime_log_handler = RuntimeLogHandler(level=logging.WARNING)
if not any(isinstance(handler, RuntimeLogHandler) for handler in logger.handlers):
    logger.addHandler(_runtime_log_handler)
def _feature_enabled(name: str, default: bool = False) -> bool:
value = os.getenv(name)
if value is None or str(value).strip() == "":
return default
return str(value).strip().lower() in {"1", "true", "yes", "on"}
def _hf_transformers_runtime_enabled() -> bool:
return _feature_enabled("KAPO_HF_TRANSFORMERS_RUNTIME", default=False)
def _embeddings_enabled() -> bool:
return not _feature_enabled("KAPO_DISABLE_EMBEDDINGS", default=False)
def _remote_brain_only() -> bool:
return _feature_enabled("REMOTE_BRAIN_ONLY", default=False)
def _ngrok_bootstrap_enabled() -> bool:
return _feature_enabled("BRAIN_AUTO_NGROK", default=True)
def _configured_public_url() -> str:
return str(os.getenv("BRAIN_PUBLIC_URL", "")).strip().rstrip("/")
def _prefer_configured_public_url() -> bool:
provider = str(os.getenv("BRAIN_PROVIDER", "") or os.getenv("BRAIN_TEMPLATE", "")).strip().lower()
if "huggingface" in provider or "hf-space" in provider:
return True
return _feature_enabled("BRAIN_FORCE_CONFIGURED_PUBLIC_URL", default=False)
def _reuse_public_url_on_restart() -> bool:
return _feature_enabled("BRAIN_REUSE_PUBLIC_URL_ON_RESTART", default=True)
def _auto_publish_public_url_on_startup() -> bool:
return _feature_enabled("BRAIN_AUTO_PUBLISH_URL_ON_STARTUP", default=True)
def _internal_restart_in_progress() -> bool:
return _feature_enabled("KAPO_INTERNAL_RESTART", default=False)
def _fast_restart_enabled() -> bool:
return _feature_enabled("KAPO_FAST_INTERNAL_RESTART", default=True)
def _executor_connect_timeout() -> float:
return float(os.getenv("EXECUTOR_CONNECT_TIMEOUT_SEC", "3.0") or 3.0)
def _executor_read_timeout(name: str, default: float) -> float:
return float(os.getenv(name, str(default)) or default)
def _executor_roundtrip_allowed(feature_name: str, default: bool = True) -> bool:
executor_url = os.getenv("EXECUTOR_URL", "").strip()
if not executor_url:
return False
kaggle_defaults = {
"BRAIN_REMOTE_TRACE_STORE_ENABLED": False,
"BRAIN_REMOTE_AUTO_INGEST_ENABLED": False,
"BRAIN_REMOTE_STYLE_PROFILE_ENABLED": False,
}
effective_default = kaggle_defaults.get(feature_name, default) if _is_kaggle_runtime() else default
return _feature_enabled(feature_name, default=effective_default)
def _remote_runtime_reads_enabled() -> bool:
return _feature_enabled("KAPO_REMOTE_STATE_READS", default=False)
def _shared_state_backend() -> str:
return str(os.getenv("KAPO_SHARED_STATE_BACKEND", "")).strip().lower()
def _drive_bootstrap_configured() -> bool:
return bool(
str(os.getenv("GOOGLE_DRIVE_BOOTSTRAP_URL", "") or os.getenv("KAPO_BOOTSTRAP_URL", "") or "").strip()
)
def _bootstrap_shared_state() -> None:
    """Seed env vars from the Drive bootstrap payload when Drive state is on.

    Only fills values actually present in the payload; empty or missing keys
    leave the current environment untouched.
    """
    drive_backend = _shared_state_backend() in {"google_drive", "drive", "gdrive"}
    if not (_drive_bootstrap_configured() or drive_backend):
        return
    payload = DRIVE_STATE.ensure_bootstrap_loaded(force=False) or {}
    env_targets = {
        "executor_url": "EXECUTOR_URL",
        "control_plane_url": "KAPO_CONTROL_PLANE_URL",
        "cloudflare_control_plane_url": "KAPO_CONTROL_PLANE_URL",
        "cloudflare_queue_name": "KAPO_CLOUDFLARE_QUEUE_NAME",
    }
    for payload_key, env_name in env_targets.items():
        value = payload.get(payload_key)
        if value is not None and value != "":
            os.environ[env_name] = str(value)
def _startup_self_update_enabled() -> bool:
return _feature_enabled("KAPO_STARTUP_SELF_UPDATE", default=True)
def _should_report_brain_url(public_url: str) -> bool:
normalized = str(public_url or "").strip().rstrip("/")
if not normalized:
return False
interval_sec = max(30.0, float(os.getenv("BRAIN_REPORT_MIN_INTERVAL_SEC", "600") or 600))
previous_url = str(LAST_BRAIN_URL_REPORT.get("url") or "").strip()
previous_ts = float(LAST_BRAIN_URL_REPORT.get("ts") or 0.0)
now = time.time()
if normalized != previous_url or (now - previous_ts) >= interval_sec:
LAST_BRAIN_URL_REPORT["url"] = normalized
LAST_BRAIN_URL_REPORT["ts"] = now
return True
return False
def _download_model(repo_id: str, filename: str, hf_token: str | None = None) -> str:
    """Download a model file from the Hugging Face Hub and return its path.

    Cache directory resolution order: MODEL_CACHE_DIR env var, a
    project-local cache on Kaggle, then a temp-dir cache elsewhere.
    """
    from huggingface_hub import hf_hub_download

    cache_override = str(os.getenv("MODEL_CACHE_DIR", "") or "").strip()
    if cache_override:
        cache_dir = cache_override
    elif _is_kaggle_runtime():
        cache_dir = str((_project_root() / "models_cache").resolve())
    else:
        cache_dir = os.path.join(tempfile.gettempdir(), "kapo_models")
    os.makedirs(cache_dir, exist_ok=True)
    return hf_hub_download(repo_id=repo_id, filename=filename, cache_dir=cache_dir, token=hf_token)
def ensure_model_loaded(repo_id: str, filename: str, hf_token: str | None = None) -> None:
global MODEL, MODEL_ERROR, MODEL_META
repo_id = (repo_id or "").strip()
filename = (filename or "").strip()
if not repo_id or not filename:
MODEL = None
MODEL_ERROR = "model repo/file missing"
return
try:
model_path = _download_model(repo_id, filename, hf_token=hf_token)
except Exception as exc:
MODEL = None
MODEL_ERROR = f"model download failed: {exc}"
logger.exception("Model download failed")
return
try:
from llama_cpp import Llama
MODEL = Llama(model_path=model_path, n_ctx=4096)
MODEL_ERROR = None
MODEL_META = {"repo_id": repo_id, "filename": filename, "path": model_path}
logger.info("Loaded model %s/%s", repo_id, filename)
except Exception as exc:
MODEL = None
MODEL_ERROR = f"model load failed: {exc}"
logger.exception("Model load failed")
def _load_embed_model() -> None:
    """Load the sentence-transformers embedding model once, if enabled.

    No-op when embeddings are disabled, the model is already loaded, or the
    sentence_transformers package is not installed.
    """
    global EMBED_MODEL
    if not _embeddings_enabled():
        logger.info("Embedding model disabled by configuration")
        return
    if EMBED_MODEL is not None:
        return
    try:
        from sentence_transformers import SentenceTransformer
    except ModuleNotFoundError:
        logger.info("Skipping embedding model load because sentence_transformers is unavailable")
        return
    chosen_model = os.getenv("EMBED_MODEL", "sentence-transformers/all-MiniLM-L6-v2")
    EMBED_MODEL = SentenceTransformer(chosen_model)
    logger.info("Loaded embedding model %s", chosen_model)
def _load_default_model() -> None:
    """Resolve model configuration from the environment and load the model.

    Skips the local llama.cpp load when the HF transformers runtime is on,
    or when running HF-hosted without an explicit MODEL_FILE (the remote
    generation fallback is used instead).
    """
    global MODEL, MODEL_ERROR, MODEL_META
    repo_id = str(os.getenv("MODEL_REPO", DEFAULT_MODEL_REPO) or DEFAULT_MODEL_REPO).strip()
    filename = str(os.getenv("MODEL_FILE", "") or "").strip()
    provider = str(os.getenv("BRAIN_PROVIDER", "") or os.getenv("BRAIN_TEMPLATE", "") or "").strip().lower()
    hf_hosted = "huggingface" in provider or "hf-space" in provider
    if _hf_transformers_runtime_enabled():
        MODEL = None
        MODEL_ERROR = None
        MODEL_META = {"repo_id": repo_id, "filename": "", "path": None}
        logger.info("Skipping local model load; HF transformers runtime is enabled for %s", repo_id)
        return
    if not filename and hf_hosted:
        MODEL = None
        MODEL_ERROR = None
        MODEL_META = {"repo_id": repo_id, "filename": "", "path": None}
        logger.info("Skipping local model load; using Hugging Face remote generation fallback for %s", repo_id)
        return
    ensure_model_loaded(repo_id, filename or DEFAULT_MODEL_FILE, hf_token=os.getenv("HF_TOKEN"))
def _brain_headers() -> dict:
cfg = load_config()
return get_executor_headers(cfg)
def _project_root() -> Path:
return Path(__file__).resolve().parents[1]
def _is_kaggle_runtime() -> bool:
return "/kaggle/" in str(_project_root()).replace("\\", "/") or bool(os.getenv("KAGGLE_KERNEL_RUN_TYPE"))
def _apply_executor_settings(settings: dict[str, Any]) -> None:
    """Copy allow-listed keys from an executor settings payload into os.environ.

    Only non-empty values are applied, so existing environment settings
    survive partial payloads. Invalidates the deps config cache afterwards
    so the next load_config() rebuilds from the updated environment.
    """
    # Allow-list of environment keys the executor is permitted to set.
    for key in (
        "NGROK_AUTHTOKEN",
        "MODEL_REPO",
        "MODEL_FILE",
        "MODEL_PROFILE_ID",
        "SUPERVISOR_MODEL_PROFILE_ID",
        "EMBED_MODEL",
        "REQUEST_TIMEOUT_SEC",
        "REQUEST_RETRIES",
        "CHAT_TIMEOUT_SEC",
        "EXECUTOR_BYPASS_HEADER",
        "EXECUTOR_BYPASS_VALUE",
        "BRAIN_BYPASS_HEADER",
        "BRAIN_BYPASS_VALUE",
        "REMOTE_BRAIN_ONLY",
        "KAGGLE_AUTO_BOOTSTRAP",
        "BRAIN_AUTO_NGROK",
        "BRAIN_AUTO_PUBLISH_URL_ON_STARTUP",
        "BRAIN_PUBLIC_URL",
        "BRAIN_REUSE_PUBLIC_URL_ON_RESTART",
        "KAGGLE_SYNC_SUBDIR",
        "BRAIN_ROLES",
        "BRAIN_LANGUAGES",
        "BRAIN_REMOTE_KNOWLEDGE_ENABLED",
        "BRAIN_REMOTE_WEB_SEARCH_ENABLED",
        "BRAIN_REMOTE_TRACE_STORE_ENABLED",
        "BRAIN_REMOTE_AUTO_INGEST_ENABLED",
        "BRAIN_LOCAL_RAG_FALLBACK_ENABLED",
        "EXECUTOR_CONNECT_TIMEOUT_SEC",
        "BRAIN_REMOTE_KNOWLEDGE_TIMEOUT_SEC",
        "BRAIN_REMOTE_WEB_SEARCH_TIMEOUT_SEC",
        "BRAIN_REMOTE_TRACE_STORE_TIMEOUT_SEC",
        "BRAIN_REMOTE_AUTO_INGEST_TIMEOUT_SEC",
        "FIREBASE_ENABLED",
        "FIREBASE_PROJECT_ID",
        "FIREBASE_SERVICE_ACCOUNT_PATH",
        "FIREBASE_SERVICE_ACCOUNT_JSON",
        "FIREBASE_NAMESPACE",
        "KAPO_SHARED_STATE_BACKEND",
        "GOOGLE_DRIVE_SHARED_STATE_FOLDER_ID",
        "GOOGLE_DRIVE_SHARED_STATE_PREFIX",
        "GOOGLE_DRIVE_BOOTSTRAP_URL",
        "KAPO_BOOTSTRAP_URL",
        "GOOGLE_DRIVE_ACCESS_TOKEN",
        "GOOGLE_DRIVE_REFRESH_TOKEN",
        "GOOGLE_DRIVE_CLIENT_SECRET_JSON",
        "GOOGLE_DRIVE_CLIENT_SECRET_JSON_BASE64",
        "GOOGLE_DRIVE_CLIENT_SECRET_PATH",
        "GOOGLE_DRIVE_TOKEN_EXPIRES_AT",
        "KAPO_CONTROL_PLANE_URL",
        "KAPO_CLOUDFLARE_QUEUE_NAME",
    ):
        value = settings.get(key)
        # Skip missing/blank values so they never clobber existing settings.
        if value not in (None, ""):
            os.environ[key] = str(value)
    # Force deps.load_config() to re-read the environment on the next call.
    deps_module.CONFIG_CACHE = None
def _apply_firebase_runtime_settings() -> None:
    """Overlay runtime settings from Firebase onto the process environment.

    Reads the shared settings document, the executor runtime document, and
    the role list; runtime values take precedence over shared ones on key
    collisions. No-op unless remote runtime reads and Firebase are enabled.
    """
    if not _remote_runtime_reads_enabled():
        return
    if not FIREBASE.enabled():
        return
    shared = FIREBASE.get_document("settings", "global")
    runtime = FIREBASE.get_document("runtime", "executor")
    role_items = FIREBASE.list_documents("roles", limit=64)
    merged = {}
    merged.update(shared or {})
    # Runtime document wins over shared settings on key collisions.
    merged.update(runtime or {})
    # Firebase document key -> environment variable name.
    mappings = {
        "executor_public_url": "EXECUTOR_PUBLIC_URL",
        "executor_url": "EXECUTOR_URL",
        "model_repo": "MODEL_REPO",
        "model_file": "MODEL_FILE",
        "model_profile_id": "MODEL_PROFILE_ID",
        "supervisor_model_profile_id": "SUPERVISOR_MODEL_PROFILE_ID",
        "brain_roles": "BRAIN_ROLES",
        "brain_languages": "BRAIN_LANGUAGES",
    }
    current_brain_url = str(merged.get("current_brain_url") or "").strip()
    if current_brain_url:
        os.environ["KAPO_EXECUTOR_CURRENT_BRAIN_URL"] = current_brain_url
    for key, env_name in mappings.items():
        value = merged.get(key)
        if value not in (None, ""):
            os.environ[env_name] = str(value)
    if role_items:
        # Keep role names whose "enabled" flag is not an explicit falsey marker.
        enabled_roles = [
            str(item.get("name") or item.get("id") or "").strip().lower()
            for item in role_items
            if str(item.get("enabled", True)).strip().lower() not in {"0", "false", "no", "off"}
        ]
        enabled_roles = [role for role in enabled_roles if role]
        if enabled_roles:
            # dict.fromkeys() de-duplicates while preserving order.
            os.environ["BRAIN_ROLES"] = ",".join(dict.fromkeys(enabled_roles))
    # Force deps.load_config() to re-read the environment on the next call.
    deps_module.CONFIG_CACHE = None
def _firebase_collection_cache_key(name: str) -> str:
return f"collection:{name}"
def _firebase_list_documents_cached(collection: str, ttl_sec: float = 30.0, limit: int = 200) -> list[dict[str, Any]]:
    """List a Firebase collection through a small in-process TTL cache.

    Returns [] when remote runtime reads are disabled or Firebase is off.
    A fresh list copy is returned on every call so callers cannot mutate
    the cached value.
    """
    if not (_remote_runtime_reads_enabled() and FIREBASE.enabled()):
        return []
    cache_key = _firebase_collection_cache_key(collection)
    now = time.time()
    entry = FIREBASE_RUNTIME_CACHE.get(cache_key)
    if entry is not None and (now - entry[0]) < ttl_sec:
        return list(entry[1] or [])
    documents = FIREBASE.list_documents(collection, limit=limit)
    FIREBASE_RUNTIME_CACHE[cache_key] = (now, documents)
    return list(documents or [])
def _firebase_role_profiles() -> list[dict[str, Any]]:
    """Role profiles from Firebase, falling back to the BRAIN_ROLES env var."""
    remote_roles = _firebase_list_documents_cached("roles", ttl_sec=30.0, limit=64)
    if remote_roles:
        return remote_roles
    env_roles = str(os.getenv("BRAIN_ROLES", "")).split(",")
    return [{"name": role.strip(), "enabled": True} for role in env_roles if role.strip()]
def _firebase_runtime_snapshot() -> dict[str, Any]:
    """Snapshot of the runtime collections used by prompt/role resolution."""
    snapshot = {
        "platforms": _firebase_list_documents_cached("platforms", ttl_sec=45.0, limit=64),
        "models": _firebase_list_documents_cached("models", ttl_sec=45.0, limit=128),
        "prompts": _firebase_list_documents_cached("prompts", ttl_sec=20.0, limit=128),
        "roles": _firebase_role_profiles(),
    }
    return snapshot
def _json_safe(value: Any) -> Any:
if isinstance(value, dict):
return {str(key): _json_safe(item) for key, item in value.items()}
if isinstance(value, list):
return [_json_safe(item) for item in value]
if isinstance(value, tuple):
return [_json_safe(item) for item in value]
if isinstance(value, (str, int, float, bool)) or value is None:
return value
return str(value)
def _firebase_prompt_body(role_name: str, language: str = "en") -> str:
role_name = str(role_name or "").strip().lower()
language = str(language or "en").strip().lower()
if not role_name:
return ""
prompts = _firebase_runtime_snapshot().get("prompts", [])
exact = []
fallback = []
for item in prompts:
if str(item.get("role_name") or "").strip().lower() != role_name:
continue
if str(item.get("enabled", True)).strip().lower() in {"0", "false", "no", "off"}:
continue
item_lang = str(item.get("language") or "en").strip().lower()
body = str(item.get("body") or "").strip()
if not body:
continue
if item_lang == language:
exact.append(body)
elif item_lang == "en":
fallback.append(body)
if exact:
return exact[0]
if fallback:
return fallback[0]
return ""
def _prepare_runtime_environment() -> None:
    """Prepare paths/env for the current runtime; heavy lifting is Kaggle-only.

    Always attempts the shared-state bootstrap preload. On Kaggle, clones the
    (read-only) input dataset into a writable working directory, creates the
    local data directory, and points all persistent-state paths at it.
    """
    try:
        _bootstrap_shared_state()
    except Exception:
        logger.warning("Shared-state bootstrap preload failed", exc_info=True)
    if not _is_kaggle_runtime():
        return
    source_root = _project_root()
    source_text = str(source_root).replace("\\", "/")
    # Resolve the writable runtime root: explicit env override, then the
    # source tree itself when already under /kaggle/working, else a fixed
    # working-directory location.
    runtime_root_env = os.getenv("KAPO_RUNTIME_ROOT", "").strip()
    if runtime_root_env:
        runtime_root = Path(runtime_root_env).resolve()
    elif source_text.startswith("/kaggle/working/"):
        runtime_root = source_root.resolve()
    else:
        runtime_root = Path("/kaggle/working/KAPO-AI-SYSTEM").resolve()
    sync_root = runtime_root
    auto_bootstrap = str(os.getenv("KAGGLE_AUTO_BOOTSTRAP", "1")).strip().lower() in {"1", "true", "yes", "on"}
    # /kaggle/input is read-only, so clone the project into the runtime root.
    if auto_bootstrap and source_text.startswith("/kaggle/input/") and source_root != runtime_root:
        if runtime_root.exists():
            shutil.rmtree(runtime_root, ignore_errors=True)
        shutil.copytree(
            source_root,
            runtime_root,
            ignore=shutil.ignore_patterns("__pycache__", "*.pyc", ".git", ".venv"),
        )
    if str(runtime_root) not in sys.path:
        sys.path.insert(0, str(runtime_root))
    data_dir = runtime_root / "data" / "local" / "brain_runtime"
    data_dir.mkdir(parents=True, exist_ok=True)
    # Point all persistent state (DBs, FAISS index) at the writable data dir.
    os.environ["KAPO_RUNTIME_ROOT"] = str(runtime_root)
    os.environ["KAPO_SYNC_ROOT"] = str(sync_root)
    os.environ["LOCAL_DATA_DIR"] = str(data_dir)
    os.environ["DB_PATH"] = str(data_dir / "episodic.db")
    os.environ["TOOLS_DB_PATH"] = str(data_dir / "tools.db")
    os.environ["FAISS_INDEX_PATH"] = str(data_dir / "faiss.index")
    # Kaggle brains default to remote-only mode unless explicitly overridden.
    os.environ["REMOTE_BRAIN_ONLY"] = str(os.getenv("REMOTE_BRAIN_ONLY", "1") or "1")
    # Restore a previously persisted executor URL when none is configured.
    saved_executor_url = _load_saved_executor_url()
    current_executor_url = str(os.getenv("EXECUTOR_URL", "") or "").strip()
    if saved_executor_url and not current_executor_url:
        os.environ["EXECUTOR_URL"] = saved_executor_url
    deps_module.CONFIG_CACHE = None
def _sync_target_root() -> str:
return os.getenv("KAPO_SYNC_ROOT") or os.getenv("KAPO_RUNTIME_ROOT") or os.getcwd()
def _sync_root_path() -> Path:
return Path(_sync_target_root()).resolve()
def _resolve_sync_path(user_path: str | None = None) -> Path:
root = _sync_root_path()
relative = str(user_path or "").strip().replace("\\", "/").lstrip("/")
candidate = (root / relative).resolve() if relative else root
if candidate != root and root not in candidate.parents:
raise ValueError("Path escapes sync root")
return candidate
def _describe_sync_entry(path: Path) -> dict[str, Any]:
stat = path.stat()
return {
"name": path.name or str(path),
"path": str(path.relative_to(_sync_root_path())).replace("\\", "/") if path != _sync_root_path() else "",
"is_dir": path.is_dir(),
"size": stat.st_size,
"modified_at": stat.st_mtime,
}
def _public_url_state_path() -> Path:
runtime_root = Path(_sync_target_root()).resolve()
state_dir = runtime_root / "data" / "local" / "brain_runtime"
state_dir.mkdir(parents=True, exist_ok=True)
return state_dir / "public_url.txt"
def _remember_public_url(public_url: str) -> None:
value = str(public_url or "").strip().rstrip("/")
if not value:
return
os.environ["BRAIN_PUBLIC_URL"] = value
try:
_public_url_state_path().write_text(value, encoding="utf-8")
except Exception:
logger.warning("Failed to persist public URL", exc_info=True)
def _load_saved_public_url() -> str:
try:
value = _public_url_state_path().read_text(encoding="utf-8").strip().rstrip("/")
return value
except Exception:
return ""
def _ngrok_api_state_path() -> Path:
runtime_root = Path(_sync_target_root()).resolve()
state_dir = runtime_root / "data" / "local" / "brain_runtime"
state_dir.mkdir(parents=True, exist_ok=True)
return state_dir / "ngrok_api_url.txt"
def _executor_url_state_path() -> Path:
runtime_root = Path(_sync_target_root()).resolve()
state_dir = runtime_root / "data" / "local" / "brain_runtime"
state_dir.mkdir(parents=True, exist_ok=True)
return state_dir / "executor_url.txt"
def _remember_executor_url(executor_url: str) -> None:
value = str(executor_url or "").strip().rstrip("/")
if not value:
return
os.environ["EXECUTOR_URL"] = value
try:
_executor_url_state_path().write_text(value, encoding="utf-8")
except Exception:
logger.warning("Failed to persist executor URL", exc_info=True)
def _load_saved_executor_url() -> str:
configured = str(os.getenv("EXECUTOR_URL", "")).strip().rstrip("/")
if configured:
return configured
try:
return _executor_url_state_path().read_text(encoding="utf-8").strip().rstrip("/")
except Exception:
return ""
def _brain_state_id() -> str:
public_url = str(os.getenv("BRAIN_PUBLIC_URL") or _load_saved_public_url() or "").strip().rstrip("/")
if public_url:
return re.sub(r"[^A-Za-z0-9._-]+", "_", public_url)
runtime_root = str(os.getenv("KAPO_RUNTIME_ROOT") or _sync_target_root() or "brain_runtime").strip()
return re.sub(r"[^A-Za-z0-9._-]+", "_", runtime_root)
def _applied_runtime_version_path() -> Path:
runtime_root = Path(_sync_target_root()).resolve()
state_dir = runtime_root / "data" / "local" / "brain_runtime"
state_dir.mkdir(parents=True, exist_ok=True)
return state_dir / "applied_version.json"
def _load_applied_runtime_version() -> dict[str, Any]:
try:
return dict(json.loads(_applied_runtime_version_path().read_text(encoding="utf-8")) or {})
except Exception:
return {}
def _write_applied_runtime_version(payload: dict[str, Any]) -> None:
_applied_runtime_version_path().write_text(
json.dumps(dict(payload or {}), ensure_ascii=False, indent=2, sort_keys=True),
encoding="utf-8",
)
def _download_remote_zip(url: str, destination: Path) -> Path:
response = requests.get(str(url).strip(), timeout=120)
response.raise_for_status()
destination.parent.mkdir(parents=True, exist_ok=True)
destination.write_bytes(response.content)
return destination
def _apply_zip_overlay(zip_path: Path, target_root: Path) -> None:
with zipfile.ZipFile(zip_path, "r") as archive:
archive.extractall(target_root)
def _load_patch_manifest_from_bootstrap() -> dict[str, Any]:
    """Fetch the self-update manifest, merging bootstrap + remote manifest.

    Falls back to the bootstrap payload alone when no manifest URL is set
    or when fetching/parsing the manifest fails.
    """
    bootstrap = DRIVE_STATE.ensure_bootstrap_loaded(force=True) or {}
    manifest_url = str(bootstrap.get("patch_manifest_url") or "").strip()
    if not manifest_url:
        return dict(bootstrap)
    try:
        response = requests.get(manifest_url, timeout=30)
        response.raise_for_status()
        manifest = dict(response.json() or {})
    except Exception:
        logger.warning("Failed to load patch manifest from bootstrap URL", exc_info=True)
        return dict(bootstrap)
    return {**bootstrap, **manifest}
def _run_startup_self_update() -> None:
    """Apply a pending self-update bundle at startup, if one is published.

    Compares the manifest's version/build hash against the locally recorded
    applied version and, when they differ, downloads and extracts either the
    incremental patch bundle or the full package over the runtime root.
    All failures are logged and otherwise ignored.
    """
    if not _startup_self_update_enabled():
        return
    manifest = _load_patch_manifest_from_bootstrap()
    target_version = str(manifest.get("version") or "").strip()
    target_hash = str(manifest.get("build_hash") or "").strip()
    if not target_version and not target_hash:
        # Nothing published; no update to consider.
        return
    current = _load_applied_runtime_version()
    # Skip when the recorded version+hash already match (the version must
    # be non-empty for the match to count).
    if (
        str(current.get("version") or "").strip() == target_version
        and str(current.get("build_hash") or "").strip() == target_hash
        and target_version
    ):
        return
    runtime_root = Path(_sync_target_root()).resolve()
    temp_dir = runtime_root / "data" / "local" / "brain_runtime" / "updates"
    patch_url = str(manifest.get("patch_bundle_url") or "").strip()
    full_url = str(manifest.get("full_package_url") or "").strip()
    applied_mode = ""
    source_url = ""
    try:
        # Prefer the smaller incremental patch bundle over the full package.
        if patch_url:
            zip_path = _download_remote_zip(patch_url, temp_dir / "patch_bundle.zip")
            _apply_zip_overlay(zip_path, runtime_root)
            applied_mode = "patch_bundle"
            source_url = patch_url
        elif full_url:
            zip_path = _download_remote_zip(full_url, temp_dir / "full_package.zip")
            _apply_zip_overlay(zip_path, runtime_root)
            applied_mode = "full_package"
            source_url = full_url
        else:
            return
        _write_applied_runtime_version(
            {
                "version": target_version,
                "build_hash": target_hash,
                "applied_at": time.time(),
                "mode": applied_mode,
                "source_url": source_url,
            }
        )
        logger.info("Applied startup self-update (%s) version=%s", applied_mode, target_version or target_hash)
    except Exception:
        logger.warning("Startup self-update failed", exc_info=True)
def _remember_ngrok_api_url(api_url: str) -> None:
value = str(api_url or "").strip().rstrip("/")
if not value:
return
os.environ["KAPO_NGROK_API_URL"] = value
try:
_ngrok_api_state_path().write_text(value, encoding="utf-8")
except Exception:
logger.warning("Failed to persist ngrok API URL", exc_info=True)
def _load_saved_ngrok_api_url() -> str:
configured = str(os.getenv("KAPO_NGROK_API_URL", "")).strip().rstrip("/")
if configured:
return configured
try:
return _ngrok_api_state_path().read_text(encoding="utf-8").strip().rstrip("/")
except Exception:
return ""
def _ngrok_api_candidates() -> list[str]:
seen: set[str] = set()
candidates: list[str] = []
for candidate in [_load_saved_ngrok_api_url(), "http://127.0.0.1:4040", "http://127.0.0.1:4041", "http://127.0.0.1:4042"]:
value = str(candidate or "").strip().rstrip("/")
if value and value not in seen:
seen.add(value)
candidates.append(value)
return candidates
def _probe_ngrok_api(api_url: str) -> bool:
try:
response = requests.get(f"{api_url}/api/tunnels", timeout=2)
return response.status_code == 200
except Exception:
return False
def _find_live_ngrok_api() -> str | None:
for api_url in _ngrok_api_candidates():
if _probe_ngrok_api(api_url):
_remember_ngrok_api_url(api_url)
return api_url
return None
def _ngrok_binary_path() -> str:
env_path = str(os.getenv("NGROK_PATH", "")).strip()
if env_path and Path(env_path).exists():
return env_path
default_ngrok_path = ""
try:
from pyngrok import conf
default_ngrok_path = str(conf.get_default().ngrok_path or "").strip()
if default_ngrok_path and Path(default_ngrok_path).exists():
return default_ngrok_path
except Exception:
pass
try:
from pyngrok import installer
install_target = default_ngrok_path or str((Path.home() / ".ngrok" / "ngrok").resolve())
Path(install_target).parent.mkdir(parents=True, exist_ok=True)
installer.install_ngrok(install_target)
if Path(install_target).exists():
return install_target
except Exception:
logger.warning("Failed to auto-install ngrok binary", exc_info=True)
discovered = shutil.which("ngrok")
if discovered:
return discovered
return "ngrok"
def _ensure_ngrok_auth(token: str) -> None:
    """Register *token* with the local ngrok config (failures are ignored)."""
    subprocess.run(
        [_ngrok_binary_path(), "config", "add-authtoken", token],
        check=False,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )


def _start_detached_ngrok_agent(token: str) -> str | None:
    """Launch a detached ngrok agent and wait for its local API to come up.

    Returns the agent's API URL, or None when it never became reachable
    within the 12-second bootstrap window.
    """
    if token:
        _ensure_ngrok_auth(token)
        os.environ["NGROK_AUTHTOKEN"] = token
    spawn_kwargs = {
        "stdout": subprocess.DEVNULL,
        "stderr": subprocess.DEVNULL,
        "stdin": subprocess.DEVNULL,
    }
    if os.name == "nt":
        # Fully detach on Windows so the agent survives server restarts.
        detached = getattr(subprocess, "DETACHED_PROCESS", 0)
        new_group = getattr(subprocess, "CREATE_NEW_PROCESS_GROUP", 0)
        spawn_kwargs["creationflags"] = detached | new_group
    else:
        spawn_kwargs["start_new_session"] = True
    subprocess.Popen(
        [_ngrok_binary_path(), "start", "--none", "--log=stdout"],
        **spawn_kwargs,
    )
    deadline = time.time() + 12
    while time.time() < deadline:
        api_url = _find_live_ngrok_api()
        if api_url:
            return api_url
        time.sleep(0.5)
    return None
def _list_ngrok_tunnels(api_url: str) -> list[dict[str, Any]]:
    """Fetch the tunnel list from the ngrok agent API (raises on HTTP error)."""
    response = requests.get(f"{api_url}/api/tunnels", timeout=5)
    response.raise_for_status()
    tunnels = response.json().get("tunnels")
    return tunnels if isinstance(tunnels, list) else []


def _existing_ngrok_public_url(api_url: str, port: int) -> str | None:
    """Public URL of an existing tunnel forwarding to *port*, if any."""
    addr_suffix = f":{port}"
    for tunnel in _list_ngrok_tunnels(api_url):
        public_url = str(tunnel.get("public_url") or "").strip()
        addr = str((tunnel.get("config") or {}).get("addr") or "").strip()
        if public_url and addr.endswith(addr_suffix):
            return public_url.rstrip("/")
    return None


def _create_ngrok_tunnel(api_url: str, port: int) -> str | None:
    """Ask the ngrok agent to open an HTTP tunnel to *port*.

    Returns the tunnel's public URL, or None when the agent did not report
    one.
    """
    response = requests.post(
        f"{api_url}/api/tunnels",
        json={
            "name": f"http-{port}-kapo",
            "addr": str(port),
            "proto": "http",
        },
        timeout=10,
    )
    response.raise_for_status()
    created = response.json()
    return str(created.get("public_url") or "").strip().rstrip("/") or None
def _publish_brain_presence(public_url: str, *, source: str = "runtime") -> None:
    """Publish this brain's presence (URL, roles, model info) to Firebase.

    Writes three documents: the brain record keyed by URL, a last-report
    marker under "runtime", and a tunnel record. No-op for empty URLs.
    """
    normalized = str(public_url or "").strip().rstrip("/")
    if not normalized:
        return
    payload = {
        "url": normalized,
        "status": "healthy",
        "source": source,
        "platform": "kaggle" if _is_kaggle_runtime() else str(os.getenv("BRAIN_PROVIDER") or "remote"),
        "role": os.getenv("BRAIN_PRIMARY_ROLE", "fallback"),
        "roles": [part.strip() for part in os.getenv("BRAIN_ROLES", "supervisor,chat,coding,planner,arabic,fallback").split(",") if part.strip()],
        "languages": [part.strip() for part in os.getenv("BRAIN_LANGUAGES", "ar,en").split(",") if part.strip()],
        "model_profile_id": os.getenv("MODEL_PROFILE_ID") or os.getenv("SUPERVISOR_MODEL_PROFILE_ID") or DEFAULT_MODEL_PROFILE_ID,
        "model_repo": MODEL_META.get("repo_id") or os.getenv("MODEL_REPO") or DEFAULT_MODEL_REPO,
        "model_file": MODEL_META.get("filename") or os.getenv("MODEL_FILE") or DEFAULT_MODEL_FILE,
        "updated_at": time.time(),
    }
    FIREBASE.set_document("brains", normalized, payload)
    FIREBASE.set_document(
        "runtime",
        "brains_last_report",
        {
            "brain_url": normalized,
            "source": source,
            "updated_at": time.time(),
            "provider": payload["platform"],
            "model_profile_id": payload["model_profile_id"],
        },
    )
    FIREBASE.set_document(
        "tunnels",
        f"brain_{normalized}",
        {
            "kind": "brain",
            "public_url": normalized,
            # Heuristic: URLs containing "ngrok" are assumed to be ngrok tunnels.
            "provider": "ngrok" if "ngrok" in normalized else payload["platform"],
            "updated_at": time.time(),
        },
    )
def _report_brain_url(public_url: str) -> None:
    """Report this brain's public URL to the executor (best-effort).

    Always publishes presence to Firebase first; the executor POST is
    throttled by _should_report_brain_url and retried a small number of
    times. Failures are logged at info level and never raised.
    """
    _publish_brain_presence(public_url, source="report_attempt")
    executor_url = os.getenv("EXECUTOR_URL", "").strip().rstrip("/")
    if not executor_url:
        return
    if not _should_report_brain_url(public_url):
        return
    last_error: Exception | None = None
    connect_timeout = max(1.0, float(os.getenv("BRAIN_REPORT_CONNECT_TIMEOUT_SEC", "4.0") or 4.0))
    read_timeout = max(5.0, float(os.getenv("BRAIN_REPORT_READ_TIMEOUT_SEC", "15.0") or 15.0))
    retries = max(1, int(os.getenv("BRAIN_REPORT_RETRIES", "2") or 2))
    for _ in range(retries):
        try:
            response = requests.post(
                f"{executor_url}/brain/report-url",
                json={
                    "brain_url": public_url,
                    "platform": "kaggle" if _is_kaggle_runtime() else "remote",
                    "role": os.getenv("BRAIN_PRIMARY_ROLE", "fallback"),
                    "roles": [part.strip() for part in os.getenv("BRAIN_ROLES", "supervisor,chat,coding,planner,arabic,fallback").split(",") if part.strip()],
                    "languages": [part.strip() for part in os.getenv("BRAIN_LANGUAGES", "ar,en").split(",") if part.strip()],
                    "model_profile_id": os.getenv("MODEL_PROFILE_ID") or os.getenv("SUPERVISOR_MODEL_PROFILE_ID") or DEFAULT_MODEL_PROFILE_ID,
                    "model_repo": MODEL_META.get("repo_id") or os.getenv("MODEL_REPO") or DEFAULT_MODEL_REPO,
                    "model_file": MODEL_META.get("filename") or os.getenv("MODEL_FILE") or DEFAULT_MODEL_FILE,
                },
                headers=_brain_headers(),
                timeout=(connect_timeout, read_timeout),
            )
            response.raise_for_status()
            # Confirm the successful report in Firebase and stop retrying.
            _publish_brain_presence(public_url, source="executor_report")
            return
        except Exception as exc:
            last_error = exc
            time.sleep(1)
    logger.info(
        "Brain URL report to executor timed out or failed; continuing (%s)",
        last_error,
    )
def _pull_executor_settings() -> dict[str, Any]:
    """Fetch shared settings from the executor's /share/settings endpoint.

    Skipped (returns {}) when Google Drive shared state is the source of
    truth or no executor URL is known; any failure also yields {}.
    """
    drive_backed = _shared_state_backend() in {"google_drive", "drive", "gdrive"}
    if drive_backed or _drive_bootstrap_configured():
        return {}
    executor_url = _load_saved_executor_url()
    if not executor_url:
        return {}
    try:
        connect_timeout = max(1.5, float(os.getenv("EXECUTOR_CONNECT_TIMEOUT_SEC", "3.0") or 3.0))
        read_timeout = max(2.0, float(os.getenv("EXECUTOR_SETTINGS_READ_TIMEOUT_SEC", "5.0") or 5.0))
        response = requests.get(
            f"{executor_url}/share/settings",
            headers=_brain_headers(),
            timeout=(connect_timeout, read_timeout),
        )
        if response.status_code == 200:
            return response.json()
        logger.warning("Executor settings request failed: %s", response.text[:400])
    except Exception as exc:
        logger.warning("Failed to pull executor settings (%s)", exc)
    return {}
def start_ngrok(token: str | None = None) -> str | None:
    """Publish a public URL for this Brain instance, preferring reuse over new tunnels.

    Resolution order:
      1. Reuse the saved public URL after an internal restart (one-shot env flag).
      2. Use an explicitly configured public URL when preferred.
      3. Attach to (or spawn) a local ngrok agent and reuse/create a tunnel.

    Returns the public URL on success, or ``None`` when publishing is disabled,
    no authtoken is available, or the ngrok bootstrap fails.
    """
    # One-shot restart flag: reuse the previous URL instead of a fresh tunnel.
    restart_reuse = str(os.getenv("KAPO_RESTART_REUSE_PUBLIC_URL", "")).strip().lower() in {"1", "true", "yes", "on"}
    if restart_reuse:
        saved_public_url = _load_saved_public_url()
        if saved_public_url:
            _remember_public_url(saved_public_url)
            _report_brain_url(saved_public_url)
            FIREBASE.set_document("brains", saved_public_url, {"url": saved_public_url, "status": "healthy", "source": "restart_reuse"})
            FIREBASE.set_document("tunnels", f"brain_{saved_public_url}", {"kind": "brain", "public_url": saved_public_url, "provider": "ngrok"})
            logger.info("Reusing saved brain public URL after restart: %s", saved_public_url)
            # Clear the flag so the next call performs a normal bootstrap.
            os.environ["KAPO_RESTART_REUSE_PUBLIC_URL"] = "0"
            return saved_public_url
    configured_public_url = _configured_public_url()
    if configured_public_url and _prefer_configured_public_url():
        _remember_public_url(configured_public_url)
        _report_brain_url(configured_public_url)
        FIREBASE.set_document("brains", configured_public_url, {"url": configured_public_url, "status": "healthy", "source": "configured_public_url"})
        FIREBASE.set_document("tunnels", f"brain_{configured_public_url}", {"kind": "brain", "public_url": configured_public_url, "provider": "configured"})
        logger.info("Using configured brain public URL without starting ngrok: %s", configured_public_url)
        return configured_public_url
    if not _ngrok_bootstrap_enabled():
        logger.info("Skipping ngrok bootstrap because BRAIN_AUTO_NGROK is disabled")
        return None
    try:
        authtoken = str(token or os.getenv("NGROK_AUTHTOKEN") or "").strip()
        if not authtoken:
            return None
        port = int(os.getenv("BRAIN_PORT", "7860"))
        # Prefer attaching to an already-running agent over spawning a new one.
        api_url = _find_live_ngrok_api()
        if not api_url:
            api_url = _start_detached_ngrok_agent(authtoken)
        if not api_url:
            logger.warning("Ngrok agent did not expose a local API URL")
            return None
        # Reuse an existing tunnel for this port when one is already open.
        public_url = _existing_ngrok_public_url(api_url, port)
        if not public_url:
            public_url = _create_ngrok_tunnel(api_url, port)
        if not public_url:
            return None
        # Normalize to a bare https URL (drops trailing junk from agent output).
        match = re.search(r"https://[A-Za-z0-9.-]+", public_url)
        if match:
            public_url = match.group(0)
        _remember_ngrok_api_url(api_url)
        _remember_public_url(public_url)
        _report_brain_url(public_url)
        FIREBASE.set_document("brains", public_url, {"url": public_url, "status": "healthy", "source": "ngrok_bootstrap"})
        FIREBASE.set_document("tunnels", f"brain_{public_url}", {"kind": "brain", "public_url": public_url, "provider": "ngrok", "api_url": api_url})
        return public_url
    except Exception:
        logger.exception("Ngrok startup failed")
        return None
def _report_known_public_url() -> str | None:
    """Re-announce a previously saved public URL without creating a tunnel."""
    saved_url = _load_saved_public_url()
    if not saved_url:
        return None
    _remember_public_url(saved_url)
    _report_brain_url(saved_url)
    FIREBASE.set_document(
        "brains",
        saved_url,
        {"url": saved_url, "status": "healthy", "source": "saved_public_url"},
    )
    logger.info("Reported known brain public URL without starting ngrok: %s", saved_url)
    return saved_url
def _retry_publish_public_url(attempts: int = 8, delay_sec: float = 12.0) -> None:
    """Retry loop that keeps trying to publish the brain's public URL.

    Each attempt first re-reports a saved URL; when none exists and automatic
    publishing is enabled, an ngrok tunnel is started instead. Sleeps between
    attempts and logs when the loop is exhausted.
    """
    total_attempts = max(1, int(attempts))
    pause = max(2.0, float(delay_sec))
    for attempt_index in range(total_attempts):
        try:
            url = _report_known_public_url()
            if not url and _auto_publish_public_url_on_startup():
                url = start_ngrok(os.getenv("NGROK_AUTHTOKEN") or None)
            if url:
                logger.info("Recovered brain public URL on retry attempt %s: %s", attempt_index + 1, url)
                return
        except Exception:
            logger.warning("Public URL retry attempt %s failed", attempt_index + 1, exc_info=True)
        time.sleep(pause)
    logger.warning("Brain public URL retry loop exhausted without a published URL")
def _ensure_public_url_background(start_tunnel: bool = False) -> None:
    """Arm the background publish-retry thread when no public URL is known yet.

    A module-level flag guarantees only one retry thread is ever started.
    """
    global PUBLIC_URL_RETRY_STARTED
    known_url = str(
        os.getenv("BRAIN_PUBLIC_URL")
        or LAST_BRAIN_URL_REPORT.get("url")
        or _load_saved_public_url()
        or ""
    ).strip()
    if known_url or PUBLIC_URL_RETRY_STARTED:
        return
    if not start_tunnel and not _auto_publish_public_url_on_startup():
        return
    PUBLIC_URL_RETRY_STARTED = True
    worker = threading.Thread(
        target=_retry_publish_public_url,
        kwargs={"attempts": 8, "delay_sec": 12.0},
        daemon=True,
    )
    worker.start()
def _bootstrap_executor_handshake(start_tunnel: bool = False) -> None:
    """Exchange URLs and settings with the executor on startup.

    When EXECUTOR_URL is unset, optionally starts a tunnel locally and then
    arms the background retry loop. Otherwise pulls and applies executor
    settings, publishes (or re-reports) the public URL, and arms the retry
    loop as a safety net.
    """
    executor_url = os.getenv("EXECUTOR_URL", "").strip()
    if not executor_url:
        if start_tunnel:
            public_url = start_ngrok(os.getenv("NGROK_AUTHTOKEN") or None)
            if public_url:
                logger.info("Brain public URL started locally without executor handshake: %s", public_url)
            else:
                logger.info("Brain started without publishing a public URL")
            _ensure_public_url_background(start_tunnel=True)
            return
        logger.info("Skipping executor handshake: EXECUTOR_URL not configured")
        _ensure_public_url_background(start_tunnel=start_tunnel)
        return
    settings = _pull_executor_settings()
    _apply_executor_settings(settings)
    public_url = None
    if start_tunnel:
        public_url = start_ngrok(os.getenv("NGROK_AUTHTOKEN") or None)
        if not public_url:
            # Tunnel creation failed; at least re-announce a saved URL.
            public_url = _report_known_public_url()
    else:
        public_url = _report_known_public_url()
    if public_url:
        logger.info("Brain public URL reported to executor: %s", public_url)
    else:
        logger.info("Brain started without publishing a public URL")
    _ensure_public_url_background(start_tunnel=start_tunnel)
@app.on_event("startup")
async def startup_event():
    """FastAPI startup hook: bootstrap state, models, and the executor handshake.

    Each stage is individually guarded so one failing subsystem does not block
    the rest. A "fast" internal restart skips the expensive executor/Firebase
    bootstrap and the model loads.
    """
    global RUNTIME_STATE_THREAD_STARTED
    try:
        _bootstrap_shared_state()
    except Exception:
        logger.exception("Shared-state bootstrap failed")
    try:
        _prepare_runtime_environment()
    except Exception:
        logger.exception("Runtime environment bootstrap failed")
    try:
        _run_startup_self_update()
    except Exception:
        logger.exception("Startup self-update bootstrap failed")
    internal_restart = _internal_restart_in_progress()
    fast_restart = internal_restart and _fast_restart_enabled()
    if not fast_restart:
        try:
            settings = _pull_executor_settings()
            _apply_executor_settings(settings)
        except Exception:
            logger.exception("Executor settings bootstrap failed")
        try:
            _apply_firebase_runtime_settings()
        except Exception:
            logger.exception("Firebase runtime bootstrap failed")
    else:
        logger.info("Fast internal restart enabled; skipping executor/Firebase startup bootstrap")
    if not fast_restart:
        _load_default_model()
    try:
        if not fast_restart:
            _load_embed_model()
    except Exception:
        logger.exception("Embedding model startup failed")
    try:
        # Avoid creating a brand-new tunnel during an internal restart; the
        # existing public URL should be reused instead.
        start_tunnel = _auto_publish_public_url_on_startup() and not internal_restart
        _bootstrap_executor_handshake(start_tunnel=start_tunnel)
    except Exception:
        logger.exception("Executor handshake startup failed")
    finally:
        # Always clear the restart marker so the next boot is a normal one.
        os.environ["KAPO_INTERNAL_RESTART"] = "0"
    _persist_runtime_state_snapshot(reason="startup")
    if not RUNTIME_STATE_THREAD_STARTED:
        RUNTIME_STATE_THREAD_STARTED = True
        threading.Thread(target=_runtime_state_pulse, daemon=True).start()
class ModelLoadRequest(BaseModel):
    """Request body for /model/load and /model/hotswap."""
    repo_id: str  # Hugging Face repository id to pull the model from
    filename: str  # model artifact filename within the repo
    hf_token: str | None = None  # optional token for gated/private repos
class ConnectionInit(BaseModel):
    """Payload used to initialize the brain-executor connection."""
    executor_url: str  # base URL of the executor service
    ngrok_token: str | None = None  # optional ngrok authtoken for tunneling
class PublishUrlRequest(BaseModel):
    """Payload for publishing/announcing the brain's public URL."""
    ngrok_token: str | None = None  # authtoken used when a tunnel must be started
    public_url: str | None = None  # pre-existing public URL to announce as-is
    start_tunnel: bool = True  # whether a new tunnel may be created
class RestartRequest(BaseModel):
    """Payload for the service-restart endpoint."""
    delay_sec: float = 1.0  # grace period before the process re-execs itself
class FileWriteRequest(BaseModel):
    """Payload for writing a file on the brain host."""
    path: str  # target file path
    content: str = ""  # file contents to write
    overwrite: bool = True  # whether an existing file may be replaced
class FileDeleteRequest(BaseModel):
    """Payload for deleting a file or directory on the brain host."""
    path: str  # target path to remove
    recursive: bool = False  # allow recursive directory removal
class FileMkdirRequest(BaseModel):
    """Payload for creating a directory on the brain host."""
    path: str  # directory path to create
class ChatRequest(BaseModel):
    """Request body for the /chat endpoint."""
    request_id: str  # caller-supplied correlation id for tracing
    user_input: str  # the user's message text
    context: dict[str, Any] = {}  # optional runtime context (role, instructions, flags); pydantic copies defaults per-instance
    history: list[dict[str, str]] = []  # prior messages as {"role", "content"} dicts
    auto_execute: bool = True  # whether planned actions may run automatically
def _contains_arabic(text: str) -> bool:
return bool(re.search(r"[\u0600-\u06FF]", text or ""))
def _detect_language(text: str) -> str:
return "ar" if _contains_arabic(text) else "en"
def _is_task_request(text: str) -> bool:
lower = (text or "").strip().lower()
task_words = [
"build", "fix", "debug", "create project", "generate project", "implement", "refactor",
"run", "execute", "install", "modify", "edit", "update", "write code", "make app",
"انشئ", "أنشئ", "اعمل", "نفذ", "شغل", "اصلح", "أصلح", "عدّل", "عدل", "ابني", "كوّن مشروع",
]
return any(word in lower for word in task_words)
def _is_research_request(text: str) -> bool:
lower = (text or "").strip().lower()
research_words = [
"search", "research", "look up", "find out", "web", "browse",
"ابحث", "ابحث عن", "دور", "فتش", "معلومة عن", "معلومات عن",
]
return any(word in lower for word in research_words)
def _is_knowledge_request(text: str, context: dict[str, Any] | None = None) -> bool:
context = context or {}
if bool(context.get("use_executor_knowledge")):
return True
lower = (text or "").strip().lower()
knowledge_words = [
"remember", "memory", "knowledge", "docs", "documentation", "project structure", "architecture",
"تذكر", "الذاكرة", "المعرفة", "الوثائق", "الدليل", "بنية المشروع", "هيكل المشروع", "معمارية",
]
return any(word in lower for word in knowledge_words)
def _prune_history(history: list[dict[str, str]], keep_last: int = 6) -> list[dict[str, str]]:
if len(history) <= keep_last:
return history
return history[-keep_last:]
def _retrieve_knowledge(query: str, top_k: int = 4) -> list[dict[str, Any]]:
    """Retrieve knowledge snippets for *query*, preferring the executor's RAG index.

    Falls back to the local retriever only when remote-only mode is off and the
    local fallback feature flag is enabled. Returns [] when nothing is found.
    """
    executor_url = os.getenv("EXECUTOR_URL", "").strip().rstrip("/")
    expanded_query = _expand_project_query(query)
    # Fix: skip the remote round-trip when no executor URL is configured — a GET
    # against an empty base URL raises MissingSchema and spams the warning log.
    if executor_url and _executor_roundtrip_allowed("BRAIN_REMOTE_KNOWLEDGE_ENABLED", default=True):
        try:
            response = requests.get(
                f"{executor_url}/rag/search",
                params={"query": expanded_query, "top_k": top_k},
                headers=_brain_headers(),
                timeout=(
                    _executor_connect_timeout(),
                    _executor_read_timeout("BRAIN_REMOTE_KNOWLEDGE_TIMEOUT_SEC", 6.0),
                ),
            )
            if response.status_code == 200:
                payload = response.json()
                results = payload.get("results", [])
                if isinstance(results, list):
                    return results
        except requests.exceptions.ReadTimeout:
            logger.info("Executor knowledge retrieval timed out; continuing without remote knowledge")
        except Exception:
            logger.warning("Executor knowledge retrieval failed", exc_info=True)
    if _remote_brain_only() or not _feature_enabled("BRAIN_LOCAL_RAG_FALLBACK_ENABLED", default=False):
        return []
    try:
        from rag.retriever import retrieve
        return retrieve(expanded_query, top_k=top_k)
    except Exception:
        logger.warning("Knowledge retrieval failed", exc_info=True)
    return []
def _search_web(query: str) -> list[dict[str, Any]]:
    """Run a web search through the executor's /tools/search endpoint.

    Returns a list of result dicts, or [] when the executor is unconfigured,
    disabled by feature flag, unreachable, or returns a malformed payload.
    """
    executor_url = os.getenv("EXECUTOR_URL", "").strip().rstrip("/")
    # Fix: guard against an unset EXECUTOR_URL — posting to a relative URL would
    # raise MissingSchema and log a spurious warning on every call.
    if not executor_url or not _executor_roundtrip_allowed("BRAIN_REMOTE_WEB_SEARCH_ENABLED", default=True):
        return []
    try:
        response = requests.post(
            f"{executor_url}/tools/search",
            json={"query": query, "num_results": 5},
            headers=_brain_headers(),
            timeout=(
                _executor_connect_timeout(),
                _executor_read_timeout("BRAIN_REMOTE_WEB_SEARCH_TIMEOUT_SEC", 8.0),
            ),
        )
        if response.status_code == 200:
            payload = response.json()
            results = payload.get("results", [])
            return results if isinstance(results, list) else []
    except Exception:
        logger.warning("Web search failed", exc_info=True)
    return []
def _format_context_blocks(knowledge: list[dict[str, Any]], web_results: list[dict[str, Any]]) -> str:
blocks: list[str] = []
if knowledge:
lines = []
for item in knowledge[:4]:
source = item.get("source", "knowledge")
content = item.get("content", "") or item.get("text", "")
lines.append(f"- [{source}] {content[:500]}")
blocks.append("Knowledge:\n" + "\n".join(lines))
if web_results:
lines = []
for item in web_results[:5]:
lines.append(f"- {item.get('title', '')}: {item.get('snippet', '')}")
blocks.append("Web:\n" + "\n".join(lines))
return "\n\n".join(blocks).strip()
def _project_context_tags(text: str) -> list[str]:
source = str(text or "")
lowered = source.lower()
tags: list[str] = []
tag_rules = [
("brain_runtime", ["العقل", "brain", "model", "موديل", "النموذج"]),
("executor_runtime", ["الوكيل التنفيذي", "executor", "agent"]),
("tunnel_runtime", ["النفق", "tunnel", "ngrok", "cloudflare"]),
("url_routing", ["الرابط", "url", "endpoint", "لينك"]),
("restart_sync", ["restart", "ريستارت", "إعادة تشغيل", "اعادة تشغيل", "sync", "مزامنة"]),
("knowledge_memory", ["memory", "ذاكرة", "معرفة", "knowledge", "rag", "طبقات", "embedding", "embeddings"]),
("kaggle_runtime", ["kaggle", "كاجل"]),
]
for tag, markers in tag_rules:
if any(marker in source or marker in lowered for marker in markers):
tags.append(tag)
return tags
def _expand_project_query(query: str) -> str:
    """Append project-glossary expansion phrases to *query* based on detected tags."""
    detected = set(_project_context_tags(query))
    expansion_rules = [
        ("tunnel_runtime", "ngrok tunnel public url reverse proxy runtime restart"),
        ("restart_sync", "system restart sync uvicorn process reuse public url"),
        ("executor_runtime", "executor share settings control plane local machine"),
        ("knowledge_memory", "knowledge layers rag embeddings preferences profile"),
        ("brain_runtime", "brain server kaggle model startup runtime"),
    ]
    additions = [phrase for tag, phrase in expansion_rules if tag in detected]
    if not additions:
        return query
    return f"{query}\nContext expansion: {' | '.join(additions)}"
def _fetch_style_profile() -> dict[str, Any]:
    """Load the user's style profile, preferring the Firestore cache over the executor."""
    cached_profile = FIREBASE.get_document("profiles", "style", ttl_sec=30.0)
    if cached_profile:
        return cached_profile
    executor_url = os.getenv("EXECUTOR_URL", "").strip().rstrip("/")
    if not executor_url or not _executor_roundtrip_allowed("BRAIN_REMOTE_STYLE_PROFILE_ENABLED", default=False):
        return {}
    try:
        response = requests.get(
            f"{executor_url}/preferences/profile",
            headers=_brain_headers(),
            timeout=(
                _executor_connect_timeout(),
                _executor_read_timeout("BRAIN_REMOTE_STYLE_PROFILE_TIMEOUT_SEC", 2.5),
            ),
        )
        if response.status_code != 200:
            return {}
        profile = response.json().get("profile", {})
        return profile if isinstance(profile, dict) else {}
    except requests.exceptions.ReadTimeout:
        logger.info("Style profile load timed out; continuing without remote style profile")
        return {}
    except Exception:
        logger.warning("Failed to load style profile", exc_info=True)
        return {}
def _render_style_profile_context(profile: dict[str, Any]) -> str:
if not profile:
return ""
preferences = profile.get("preferences", []) or []
examples = profile.get("examples", []) or []
lexical_signals = profile.get("lexical_signals", []) or []
style_markers = profile.get("style_markers", {}) or {}
persona_summary = str(profile.get("persona_summary") or "").strip()
response_contract = str(profile.get("response_contract") or "").strip()
lines: list[str] = []
if persona_summary:
lines.append(f"User persona summary: {persona_summary}")
if response_contract:
lines.append(f"Response contract: {response_contract}")
if preferences:
lines.append("User Style Preferences:")
for item in preferences[:10]:
lines.append(f"- {item}")
if lexical_signals:
lines.append("User Lexical Signals:")
for item in lexical_signals[:10]:
lines.append(f"- {item}")
if style_markers:
lines.append("Style Markers:")
for key, value in sorted(style_markers.items()):
lines.append(f"- {key}: {value}")
if examples:
lines.append("Recent User Style Examples:")
for sample in examples[-3:]:
user_text = str(sample.get("user_input") or "").strip()
assistant_text = str(sample.get("assistant_reply") or "").strip()
if user_text:
lines.append(f"- User: {user_text}")
if assistant_text:
lines.append(f" Assistant: {assistant_text}")
return "\n".join(lines).strip()
def _project_domain_context(user_input: str, context: dict[str, Any] | None = None) -> str:
    """Build the project-domain glossary block when the message touches project terms."""
    matched_tags = _project_context_tags(user_input)
    if not matched_tags:
        return ""
    glossary = [
        "Project Domain Glossary:",
        "- In this project, terms like العقل, الوكيل التنفيذي, النفق, الرابط, الريستارت, المزامنة, الذاكرة, والطبقات usually refer to software runtime and operations concepts.",
        "- Treat النفق as ngrok or cloudflare tunnel unless the user explicitly asks for a literal/civil meaning.",
        "- Treat الرابط as public URL, endpoint, or routing target when the surrounding context mentions deployment, Kaggle, restart, or sync.",
        "- Treat العقل as the remote Brain service/model runtime, and الوكيل التنفيذي as the local executor/control plane on the user's device.",
    ]
    if "restart_sync" in matched_tags:
        glossary.append("- Restart means process/service restart; preserving the same public URL matters more than creating a fresh tunnel.")
    if "knowledge_memory" in matched_tags:
        glossary.append("- Knowledge, embeddings, layers, and memory refer to the RAG and memory system inside this project.")
    if "kaggle_runtime" in matched_tags:
        glossary.append("- Kaggle here is the remote runtime hosting the Brain service.")
    active_role = str((context or {}).get("role_name") or "").strip()
    if active_role:
        glossary.append(f"- Current assigned role: {active_role}.")
    return "\n".join(glossary)
def _firebase_runtime_context(role_name: str, language: str) -> str:
    """Summarize live Firestore runtime data (roles, models, platforms, prompt).

    Reads the cached runtime snapshot and renders at most one line per section;
    returns an empty string when nothing relevant is present.
    """
    snapshot = _firebase_runtime_snapshot()
    lines: list[str] = []
    roles = snapshot.get("roles") or []
    if roles:
        # Keep roles unless explicitly disabled ("enabled" defaults to True).
        enabled_roles = [
            str(item.get("name") or item.get("id") or "").strip()
            for item in roles
            if str(item.get("enabled", True)).strip().lower() not in {"0", "false", "no", "off"}
        ]
        enabled_roles = [item for item in enabled_roles if item]
        if enabled_roles:
            lines.append("Live roles from Firestore: " + ", ".join(enabled_roles[:12]))
    models = snapshot.get("models") or []
    if models:
        # Same enabled-by-default filtering for model profiles.
        preferred = [
            item for item in models
            if str(item.get("enabled", True)).strip().lower() not in {"0", "false", "no", "off"}
        ]
        if preferred:
            labels = [str(item.get("label") or item.get("id") or "").strip() for item in preferred[:5]]
            labels = [item for item in labels if item]
            if labels:
                lines.append("Live model profiles: " + ", ".join(labels))
    platforms = snapshot.get("platforms") or []
    if platforms:
        names = [str(item.get("name") or "").strip() for item in platforms[:6] if str(item.get("name") or "").strip()]
        if names:
            lines.append("Live platforms: " + ", ".join(names))
    # Prefer a language-specific prompt body, falling back to English.
    prompt_body = _firebase_prompt_body(role_name or "chat", language) or _firebase_prompt_body(role_name or "chat", "en")
    if prompt_body:
        lines.append(f"Live Firestore prompt for role '{role_name or 'chat'}': {prompt_body}")
    return "\n".join(lines).strip()
def _append_runtime_instructions(context_block: str, context: dict[str, Any]) -> str:
    """Append role, Firestore, style, and domain instructions to a context block."""
    ctx = context or {}
    instructions = str(ctx.get("system_instructions") or "").strip()
    role_name = str(ctx.get("role_name") or "").strip()
    user_input = str(ctx.get("user_input") or "").strip()
    language = _detect_language(user_input)
    style_profile = _render_style_profile_context(_fetch_style_profile())
    domain_context = _project_domain_context(user_input, context)
    firebase_context = _firebase_runtime_context(role_name, language)
    segments: list[str] = []
    if role_name:
        segments.append(f"Assigned role: {role_name}")
    if instructions:
        segments.append(instructions)
    if firebase_context:
        segments.append(firebase_context)
    if style_profile:
        segments.append(style_profile)
    if domain_context:
        segments.append(domain_context)
    if not segments:
        return context_block
    extra_block = "Runtime Instructions:\n" + "\n".join(segments)
    if context_block:
        return (context_block + "\n\n" + extra_block).strip()
    return extra_block
def _extract_exact_reply_instruction(user_input: str) -> str:
text = (user_input or "").strip()
patterns = [
r'(?is)reply\s+with\s+exactly\s+[:"]?\s*(.+?)\s*[".]?$',
r'(?is)respond\s+with\s+exactly\s+[:"]?\s*(.+?)\s*[".]?$',
r"(?is)قل\s+فقط[::]?\s*(.+?)\s*$",
r"(?is)اكتب\s+فقط[::]?\s*(.+?)\s*$",
]
for pattern in patterns:
match = re.search(pattern, text)
if match:
return match.group(1).strip().strip("\"'`")
return ""
def _extract_exact_reply_instruction_safe(user_input: str) -> str:
    """Encoding-safe variant of the exact-reply extractor.

    Uses escaped Arabic patterns first, then delegates to the plain extractor
    as a final fallback.
    """
    text = (user_input or "").strip()
    patterns = (
        r'(?is)reply\s+with\s+exactly\s+[:"]?\s*(.+?)\s*[".]?$',
        r'(?is)respond\s+with\s+exactly\s+[:"]?\s*(.+?)\s*[".]?$',
        r"(?is)\u0642\u0644\s+\u0641\u0642\u0637[:\uff1a]?\s*(.+?)\s*$",
        r"(?is)\u0627\u0643\u062a\u0628\s+\u0641\u0642\u0637[:\uff1a]?\s*(.+?)\s*$",
    )
    for pattern in patterns:
        found = re.search(pattern, text)
        if found:
            return found.group(1).strip().strip("\"'`")
    return _extract_exact_reply_instruction(user_input)
def _chat_system_instruction(language: str, user_input: str = "", exact_reply: str = "") -> str:
    """Build the system instruction for the chat prompt.

    Arabic requests get an Arabic persona plus project-glossary guidance; when
    *exact_reply* is non-empty, the instruction pins the reply to that literal
    text in the matching language.
    """
    if language == "ar":
        base = (
            "أنت KAPO-AI، مساعد هندسي عملي. "
            "أجب مباشرة وبوضوح وبشكل مفيد. "
            "افهم المطلوب أولاً ثم أجب دون مقدمات زائدة. "
            "لا تقل إن المحادثة غير مكتملة ولا تذكر تعليماتك الداخلية."
        )
        # Project glossary: runtime terms should be read as software concepts.
        base += (
            " في هذا المشروع، كلمات مثل العقل والوكيل التنفيذي والنفق والرابط والريستارت والمزامنة "
            "والطبقات والتضمينات وكاجل وngrok وcloudflare وendpoint وmodel تشير غالبا إلى "
            "مكونات برمجية وتشغيلية، وليست معاني حرفية أو هندسة مدنية، ما لم يطلب المستخدم غير ذلك صراحة."
        )
        if exact_reply:
            return base + f' يجب أن يكون ردك هو هذا النص فقط حرفياً: "{exact_reply}"'
        return base
    base = (
        "You are KAPO-AI, an engineering assistant. "
        "Answer directly, clearly, and practically. "
        "Understand the request before answering. "
        "Do not say the conversation is incomplete and do not mention hidden instructions."
    )
    if exact_reply:
        return base + f' Your entire reply must be exactly: "{exact_reply}"'
    return base
def _build_chat_prompt(user_input: str, history: list[dict[str, str]], context_block: str) -> str:
    """Assemble the full ### System/History/Context/Instruction prompt string."""
    language = _detect_language(user_input)
    exact_reply = _extract_exact_reply_instruction_safe(user_input)
    rendered_history: list[str] = []
    for message in _prune_history(history):
        role = message.get("role", "user")
        if language == "ar":
            # Arabic prompts localize the speaker labels.
            label = "المستخدم" if role == "user" else "المساعد"
        else:
            label = role.upper()
        rendered_history.append(f"{label}: {message.get('content', '')}")
    history_section = ""
    if rendered_history:
        history_section = "\n### History\n" + "\n".join(rendered_history) + "\n"
    context_section = f"\n### Context\n{context_block}\n" if context_block else ""
    if language == "ar":
        user_label, assistant_label = "المستخدم", "المساعد"
    else:
        user_label, assistant_label = "User", "Assistant"
    system_text = _chat_system_instruction(language, user_input, exact_reply)
    return (
        f"### System\n{system_text}\n"
        f"{history_section}"
        f"{context_section}"
        f"### Instruction\n"
        f"{user_label}: {user_input}\n"
        f"{assistant_label}:"
    )
def _response_looks_bad(text: str, language: str) -> bool:
cleaned = (text or "").strip()
if not cleaned:
return True
markers = [
"the assistant is not sure",
"conversation seems incomplete",
"provide more information",
"unless otherwise noted",
"as an ai model developed by",
"developed by ibm",
"tensorflow library",
"dataset of 1024",
]
if any(marker in cleaned.lower() for marker in markers):
return True
if language == "ar":
arabic_chars = len(re.findall(r"[\u0600-\u06FF]", cleaned))
latin_chars = len(re.findall(r"[A-Za-z]", cleaned))
if arabic_chars < 8 and latin_chars > max(12, arabic_chars * 2):
return True
return False
def _fallback_response(user_input: str) -> str:
    """Return a language-matched apology used when generated output is unusable."""
    language = _detect_language(user_input)
    if language == "ar":
        return "فهمت رسالتك، لكن الرد المولد لم يكن صالحاً للاستخدام. أعد صياغة الطلب بشكل أكثر تحديداً."
    return "I understood your message, but the generated reply was not usable. Please rephrase the request more specifically."
def _project_specific_fast_reply(user_input: str) -> str:
    """Return a canned Arabic answer for known project-specific questions.

    Short-circuits the LLM for recurring "what was fixed about the tunnel /
    restart / URL" questions; returns "" when no canned answer applies.
    """
    text = (user_input or "").strip()
    lower = text.lower()
    # "What got fixed?" phrased about the tunnel/URL/restart.
    if any(token in text for token in ("ايه اللي اتصلح", "إيه اللي اتصلح", "هو ايه اللي اتصلح", "هو إيه اللي اتصلح")) and any(
        token in text or token in lower for token in ("النفق", "الرابط", "الريستارت", "restart")
    ):
        return (
            "الذي اتصلح هو أن الريستارت الداخلي بقى يعيد تشغيل خدمة العقل نفسها من غير ما يكسر النفق أو يغيّر الرابط العام. "
            "ولو ظهر توقف قصير أثناء الإقلاع فهذا يكون من رجوع الخدمة، لا من إنشاء نفق جديد."
        )
    # Questions about what was repaired in the tunnel specifically.
    if "النفق" in text and any(token in text for token in ("إصلاح", "اصلاح", "الذي تم", "اتصلح", "تم إصلاح", "تم اصلاح")):
        return (
            "تم فصل دورة حياة النفق عن دورة حياة خدمة العقل، فأصبح ngrok يعمل كعامل مستقل عن عملية Uvicorn. "
            "وبالتالي يحتفظ /system/restart بنفس الرابط العام بدل إنشاء رابط جديد، وقد يظهر ERR_NGROK_8012 مؤقتًا فقط أثناء الإقلاع."
        )
    # Questions about keeping the same URL across restarts.
    if "نفس الرابط" in text and ("ريستارت" in text or "restart" in lower):
        return (
            "الهدف هنا أن تعيد الخدمة الإقلاع داخليًا مع الإبقاء على نفس الـ public URL. "
            "لذلك يبقى tunnel حيًا، بينما تعود خدمة localhost:7860 للعمل بعد ثوانٍ قليلة على نفس الرابط."
        )
    return ""
def _hf_chat_messages(user_input: str, history: list[dict[str, str]], context_block: str) -> list[dict[str, str]]:
messages: list[dict[str, str]] = []
context_text = str(context_block or "").strip()
if context_text:
messages.append({"role": "system", "content": context_text})
for item in history or []:
role = str((item or {}).get("role") or "").strip().lower()
content = str((item or {}).get("content") or "").strip()
if role in {"system", "user", "assistant"} and content:
messages.append({"role": role, "content": content})
messages.append({"role": "user", "content": str(user_input or "").strip()})
return messages
def _hf_chat_completion_text(client: Any, messages: list[dict[str, str]], max_tokens: int) -> str:
result = client.chat_completion(messages=messages, max_tokens=max_tokens)
choices = getattr(result, "choices", None)
if choices is None and isinstance(result, dict):
choices = result.get("choices")
choices = choices or []
if not choices:
return ""
first = choices[0]
message = getattr(first, "message", None)
if message is None and isinstance(first, dict):
message = first.get("message")
content = getattr(message, "content", None)
if content is None and isinstance(message, dict):
content = message.get("content")
if isinstance(content, list):
parts: list[str] = []
for item in content:
if isinstance(item, dict):
text = item.get("text")
if text:
parts.append(str(text))
elif item:
parts.append(str(item))
content = "\n".join(part for part in parts if part.strip())
return str(content or "").strip()
def _generate_response(user_input: str, history: list[dict[str, str]], context_block: str) -> str:
    """Generate a chat reply, trying progressively weaker backends.

    Order: literal "reply exactly" extraction -> canned project answers ->
    local GGUF model; when no local model is loaded, optionally fall back to
    the Hugging Face Inference API (chat-completion first, then raw
    text-generation) before returning a static "model unavailable" message.
    """
    language = _detect_language(user_input)
    exact_reply = _extract_exact_reply_instruction_safe(user_input)
    if exact_reply:
        # The user pinned the reply text; no generation needed.
        return exact_reply
    fast_reply = _project_specific_fast_reply(user_input)
    if fast_reply:
        return fast_reply
    if MODEL is None:
        provider = str(os.getenv("BRAIN_PROVIDER", "") or os.getenv("BRAIN_TEMPLATE", "") or "").strip().lower()
        if _feature_enabled("KAPO_HF_INFERENCE_API", default=False) or "huggingface" in provider or "hf-space" in provider or _hf_transformers_runtime_enabled():
            try:
                from huggingface_hub import InferenceClient
                prompt = _build_chat_prompt(user_input, history, context_block)
                messages = _hf_chat_messages(user_input, history, context_block)
                # Shorter budget for Arabic replies.
                max_tokens = 80 if language == "ar" else 96
                model_repo = str(os.getenv("MODEL_REPO", DEFAULT_MODEL_REPO) or DEFAULT_MODEL_REPO).strip()
                client = InferenceClient(model=model_repo, api_key=(str(os.getenv("HF_TOKEN", "") or "").strip() or None))
                try:
                    generated_text = _hf_chat_completion_text(client, messages, max_tokens)
                    if generated_text:
                        return generated_text
                except Exception as exc:
                    logger.info("HF chat-completion path failed; falling back to text-generation (%s)", exc)
                # Secondary HF path: plain text-generation on the rendered prompt.
                generated = client.text_generation(
                    prompt,
                    max_new_tokens=max_tokens,
                    return_full_text=False,
                )
                generated_text = str(generated or "").strip()
                if generated_text:
                    return generated_text
            except Exception as exc:
                logger.warning("HF inference fallback failed: %s", exc, exc_info=True)
        if language == "ar":
            return "الخدمة تعمل لكن توليد الرد الحر غير متاح الآن لأن النموذج غير محمل."
        return "The Brain is online, but natural chat generation is unavailable because the model is not loaded."
    prompt = _build_chat_prompt(user_input, history, context_block)
    try:
        max_tokens = 80 if language == "ar" else 96
        output = MODEL(
            prompt,
            max_tokens=max_tokens,
            temperature=0.1,
            top_p=0.85,
            stop=["\nUser:", "\nUSER:", "\nالمستخدم:", "\n###", "<|EOT|>"],
        )
        text = output["choices"][0]["text"].strip()
        # Reject boilerplate/mismatched-language output before returning it.
        if _response_looks_bad(text, language):
            return _fallback_response(user_input)
        return text or ("تم استلام رسالتك." if language == "ar" else "I received your message.")
    except Exception:
        logger.exception("Model generation failed")
        if language == "ar":
            return "فهمت طلبك، لكن فشل توليد الرد النصي."
        return "I understood your request, but text generation failed."
def _store_chat_trace(request_id: str, payload: dict[str, Any]) -> None:
    """Best-effort: persist a chat trace on the executor; never raises."""
    executor_url = os.getenv("EXECUTOR_URL", "").strip().rstrip("/")
    if not _executor_roundtrip_allowed("BRAIN_REMOTE_TRACE_STORE_ENABLED", default=True):
        return
    if not executor_url:
        return
    try:
        request_timeout = (
            _executor_connect_timeout(),
            _executor_read_timeout("BRAIN_REMOTE_TRACE_STORE_TIMEOUT_SEC", 2.5),
        )
        requests.post(
            f"{executor_url}/memory/store",
            json={"request_id": request_id, "payload": payload},
            headers=_brain_headers(),
            timeout=request_timeout,
        )
    except requests.exceptions.ReadTimeout:
        logger.info("Chat trace store timed out; continuing")
    except requests.exceptions.ConnectionError:
        logger.info("Chat trace store skipped because executor is unreachable")
    except Exception:
        logger.warning("Failed to store chat trace on executor", exc_info=True)
def _ingest_chat_knowledge(request_id: str, user_input: str, reply: str) -> None:
    """Auto-ingest substantial chat exchanges into the executor's RAG index.

    Skips short replies (< 180 chars) and is a no-op when auto-ingest is
    disabled or no executor URL is configured. Never raises.
    """
    if len(reply or "") < 180:
        return
    executor_url = os.getenv("EXECUTOR_URL", "").strip().rstrip("/")
    if not _executor_roundtrip_allowed("BRAIN_REMOTE_AUTO_INGEST_ENABLED", default=False):
        return
    # Fix: previously an empty EXECUTOR_URL produced a POST to a relative URL,
    # which always failed with MissingSchema and logged a warning. The sibling
    # _store_chat_trace already guards this case.
    if not executor_url:
        return
    payload = {
        "request_id": request_id,
        "payload": {
            "source": "auto_chat",
            "content": f"User: {user_input}\nAssistant: {reply}",
        },
    }
    try:
        requests.post(
            f"{executor_url}/rag/ingest",
            json=payload,
            headers=_brain_headers(),
            timeout=(
                _executor_connect_timeout(),
                _executor_read_timeout("BRAIN_REMOTE_AUTO_INGEST_TIMEOUT_SEC", 3.0),
            ),
        )
    except requests.exceptions.ReadTimeout:
        logger.info("Auto-ingest chat knowledge timed out; continuing")
    except Exception:
        logger.warning("Failed to auto-ingest chat knowledge", exc_info=True)
def _learn_user_style(request_id: str, user_input: str, reply: str, context: dict[str, Any]) -> None:
    """Best-effort: forward the exchange to the executor's style learner."""
    executor_url = os.getenv("EXECUTOR_URL", "").strip().rstrip("/")
    if not executor_url or not _executor_roundtrip_allowed("BRAIN_REMOTE_STYLE_PROFILE_ENABLED", default=False):
        return
    body = {
        "request_id": request_id,
        "user_input": user_input,
        "assistant_reply": reply,
        "context": context or {},
    }
    try:
        requests.post(
            f"{executor_url}/preferences/learn",
            json=body,
            headers=_brain_headers(),
            timeout=(
                _executor_connect_timeout(),
                _executor_read_timeout("BRAIN_REMOTE_TRACE_STORE_TIMEOUT_SEC", 2.5),
            ),
        )
    except requests.exceptions.ReadTimeout:
        logger.info("Style learning timed out; continuing")
    except Exception:
        logger.warning("Failed to learn user style on executor", exc_info=True)
def _dispatch_background(task, *args) -> None:
try:
threading.Thread(target=task, args=args, daemon=True).start()
except Exception:
logger.warning("Background task dispatch failed", exc_info=True)
def _restart_process(delay_sec: float = 1.0) -> None:
    """Schedule an in-place process restart via os.execv after *delay_sec*.

    Runs on a daemon thread so the HTTP response can flush first. Marks the
    restart as internal (fast path) and, when enabled, flags the saved public
    URL for reuse so the tunnel and URL survive the restart.
    """
    def _run() -> None:
        time.sleep(max(0.2, float(delay_sec)))
        target_root = _sync_target_root()
        # Restart from the synced code root so self-updates take effect.
        os.chdir(target_root)
        os.environ["KAPO_INTERNAL_RESTART"] = "1"
        os.environ.setdefault("KAPO_FAST_INTERNAL_RESTART", "1")
        if _reuse_public_url_on_restart():
            current_public_url = _load_saved_public_url()
            if current_public_url:
                _remember_public_url(current_public_url)
                # One-shot flag consumed by start_ngrok on the next boot.
                os.environ["KAPO_RESTART_REUSE_PUBLIC_URL"] = "1"
        port = str(os.getenv("BRAIN_PORT", "7860") or "7860")
        app_module = str(os.getenv("KAPO_UVICORN_APP") or "").strip()
        if not app_module:
            # Pick the uvicorn app path matching the on-disk layout.
            if (Path(target_root) / "brain_server" / "api" / "main.py").exists():
                app_module = "brain_server.api.main:app"
            elif (Path(target_root) / "api" / "main.py").exists():
                app_module = "api.main:app"
            else:
                app_module = "brain_server.api.main:app"
        # Replace the current process image; never returns on success.
        os.execv(
            sys.executable,
            [
                sys.executable,
                "-m",
                "uvicorn",
                app_module,
                "--host",
                "0.0.0.0",
                "--port",
                port,
            ],
        )
    threading.Thread(target=_run, daemon=True).start()
@app.get("/")
async def root():
    """Service banner with pointers to the docs and health endpoints."""
    return {
        "status": "ok",
        "service": "brain_server",
        "docs": "/docs",
        "health": "/health",
    }
@app.get("/runtime/firebase")
async def runtime_firebase_snapshot():
    """Expose a JSON-safe snapshot of the Firestore runtime configuration."""
    try:
        snapshot = _json_safe(_firebase_runtime_snapshot())
    except Exception as exc:
        logger.warning("Failed to build firebase runtime snapshot", exc_info=True)
        return {
            "status": "degraded",
            "firebase": {"platforms": [], "models": [], "prompts": [], "roles": []},
            "detail": str(exc),
        }
    return {"status": "ok", "firebase": snapshot}
@app.get("/runtime/errors")
async def runtime_errors(limit: int = 50, level: str = "WARNING"):
    """Return the most recent buffered log records at or above *level*.

    *limit* caps the number of returned records. Fix: a non-positive limit now
    yields an empty list — previously ``limit=0`` returned the entire buffer
    because ``items[-0:]`` slices the whole list. The slice is also computed
    once instead of twice.
    """
    normalized = str(level or "WARNING").strip().upper()
    level_values = {"DEBUG": 10, "INFO": 20, "WARNING": 30, "ERROR": 40, "CRITICAL": 50}
    # Unknown level names fall back to the WARNING threshold.
    threshold = level_values.get(normalized, 30)
    matching = [
        item
        for item in list(RUNTIME_LOG_BUFFER)
        if level_values.get(str(item.get("level") or "").upper(), 0) >= threshold
    ]
    selected = matching[-limit:] if limit > 0 else []
    return {"status": "ok", "count": len(selected), "items": selected}
@app.get("/runtime/modernization")
async def runtime_modernization():
    """Report modernization/bootstrap status; degrade instead of raising."""
    try:
        snapshot = _runtime_modernization_snapshot()
    except Exception as exc:
        logger.warning("Failed to build runtime modernization snapshot", exc_info=True)
        return {"status": "degraded", "detail": str(exc), "modernization": {}}
    return {"status": "ok", "modernization": snapshot}
@app.get("/model/status")
async def model_status():
    """Describe whether the LLM is loaded and which artifact it came from."""
    status = {
        "loaded": MODEL is not None,
        "error": MODEL_ERROR,
    }
    status["repo_id"] = MODEL_META.get("repo_id")
    status["filename"] = MODEL_META.get("filename")
    return status
@app.post("/model/load")
async def model_load(req: ModelLoadRequest):
    """Load (or reload) the requested model, then report the resulting status."""
    repo_id, filename = req.repo_id, req.filename
    ensure_model_loaded(repo_id, filename, hf_token=req.hf_token)
    return await model_status()
@app.post("/model/hotswap")
async def model_hotswap(req: ModelLoadRequest):
    """Drop the currently loaded model (if any), then load the requested one."""
    global MODEL
    if MODEL is not None:
        MODEL = None  # release the only strong reference before collecting
        gc.collect()
    ensure_model_loaded(req.repo_id, req.filename, hf_token=req.hf_token)
    return await model_status()
@app.post("/embeddings")
async def embeddings(payload: dict[str, Any]):
    """Encode ``payload["texts"]`` with the sentence-embedding model.

    Fix: an empty or missing ``texts`` list no longer triggers the (heavy)
    lazy embed-model load — the empty result is returned up front.
    """
    texts = payload.get("texts") or []
    if not texts:
        return {"embeddings": []}
    if EMBED_MODEL is None:
        _load_embed_model()  # lazy-load on first real request
    return {"embeddings": EMBED_MODEL.encode(texts).tolist()}
@app.post("/chat")
async def chat(req: ChatRequest):
    """Main conversational endpoint.

    Routing, in order:
    1. An embedded "exact reply" instruction short-circuits everything and
       echoes the requested text verbatim.
    2. Task-like input produces a plan and rationale, optionally followed by
       automatic execution on the executor.
    3. Anything else is answered by the local generator, optionally augmented
       with retrieved knowledge and/or web search results.

    Never raises to the client: failures return a ``status: "error"`` payload.
    """
    try:
        exact_reply = _extract_exact_reply_instruction_safe(req.user_input)
        if exact_reply:
            # Short-circuit path: persist a minimal trace and echo verbatim.
            runtime_context = {**(req.context or {}), "user_input": req.user_input}
            trace_payload = {
                "mode": "chat",
                "user_input": req.user_input,
                "reply": exact_reply,
                "plan": None,
                "execution": None,
                "knowledge": [],
                "web_results": [],
                "context": runtime_context,
            }
            memory = MemoryAgent()
            memory.write_short_term(req.request_id, trace_payload)
            return {
                "status": "ok",
                "mode": "chat",
                "reply": exact_reply,
                "plan": None,
                "rationale": None,
                "execution": None,
                "knowledge": [],
                "web_results": [],
                "timestamp": time.time(),
            }
        planner = PlannerAgent()
        reasoning = ReasoningAgent()
        memory = MemoryAgent()
        runtime_context = {**(req.context or {}), "user_input": req.user_input}
        # "routing" is honoured only when it is a dict; anything else is ignored.
        routing_context = runtime_context.get("routing") if isinstance(runtime_context.get("routing"), dict) else {}
        requested_role = str(runtime_context.get("role_name") or routing_context.get("role") or "").strip().lower()
        mode = "task" if _is_task_request(req.user_input) else "chat"
        # An explicit role request without auto-execute forces plain chat even
        # when the text looks like a task.
        if not req.auto_execute and requested_role:
            mode = "chat"
        knowledge = _retrieve_knowledge(req.user_input, top_k=4) if _is_knowledge_request(req.user_input, req.context) else []
        web_results = _search_web(req.user_input) if _is_research_request(req.user_input) else []
        context_block = _append_runtime_instructions(_format_context_blocks(knowledge, web_results), runtime_context)
        response_text = ""
        plan_steps = None
        execution = None
        rationale = None
        if mode == "task":
            plan_steps = planner.run(req.user_input, req.context)
            rationale = reasoning.run(req.user_input, plan_steps)
            # Acknowledgement mirrors the user's language (Arabic vs English).
            response_text = (
                "سأتعامل مع هذه الرسالة كطلب تنفيذ، وقمت ببناء خطة مبدئية وسأبدأ التنفيذ تلقائياً."
                if _contains_arabic(req.user_input)
                else "I treated this as an execution request, built a plan, and started automatic execution."
            )
            if req.auto_execute:
                # Local import — presumably avoids a circular import at module
                # load time; TODO confirm.
                from api.routes_execute import ExecuteRequest, execute as execute_route
                execution = await execute_route(
                    ExecuteRequest(
                        request_id=req.request_id,
                        plan={"steps": plan_steps},
                        executor_url=os.getenv("EXECUTOR_URL", "").strip() or None,
                    )
                )
                if execution.get("report", {}).get("success"):
                    response_text += (
                        "\n\nتم التنفيذ بنجاح مبدئياً."
                        if _contains_arabic(req.user_input)
                        else "\n\nExecution completed successfully."
                    )
                else:
                    response_text += (
                        "\n\nتمت المحاولة لكن توجد مخرجات تحتاج مراجعة."
                        if _contains_arabic(req.user_input)
                        else "\n\nExecution ran, but the result still needs review."
                    )
        else:
            # Plain chat: generate with history plus the assembled context block.
            response_text = _generate_response(req.user_input, req.history, context_block)
        trace_payload = {
            "mode": mode,
            "user_input": req.user_input,
            "reply": response_text,
            "plan": plan_steps,
            "execution": execution,
            "knowledge": knowledge,
            "web_results": web_results,
            "context": runtime_context,
        }
        memory.write_short_term(req.request_id, trace_payload)
        # Heavy persistence/learning work happens off the request thread.
        _dispatch_background(_store_chat_trace, req.request_id, trace_payload)
        _dispatch_background(_ingest_chat_knowledge, req.request_id, req.user_input, response_text)
        _dispatch_background(_learn_user_style, req.request_id, req.user_input, response_text, runtime_context)
        return {
            "status": "ok",
            "mode": mode,
            "reply": response_text,
            "plan": plan_steps,
            "rationale": rationale,
            "execution": execution,
            "knowledge": knowledge,
            "web_results": web_results,
            "timestamp": time.time(),
        }
    except Exception as exc:
        logger.exception("Chat failed")
        return {
            "status": "error",
            "mode": "chat",
            "reply": "حدث خطأ أثناء معالجة الرسالة." if _contains_arabic(req.user_input) else "Chat processing failed.",
            "detail": str(exc),
            "timestamp": time.time(),
        }
@app.post("/init-connection")
async def init_connection(payload: ConnectionInit):
    """Register the executor URL and make sure a public brain URL exists."""
    executor_url = payload.executor_url
    _remember_executor_url(executor_url)
    FIREBASE.set_document("runtime", "executor", {"executor_url": executor_url})
    # Prefer an already-known public URL; only then start a fresh tunnel.
    public_url = _report_known_public_url() or start_ngrok(payload.ngrok_token)
    return {"status": "connected", "brain_public_url": public_url}
@app.post("/system/publish-url")
async def system_publish_url(req: PublishUrlRequest | None = None):
    """Publish the brain's public URL: explicit, saved, or via a new tunnel."""
    payload = req or PublishUrlRequest()
    explicit_public_url = str(payload.public_url or "").strip().rstrip("/")
    if explicit_public_url:
        # An explicitly provided URL wins: persist, report, and register it.
        _remember_public_url(explicit_public_url)
        _report_brain_url(explicit_public_url)
        FIREBASE.set_document(
            "brains",
            explicit_public_url,
            {"url": explicit_public_url, "status": "healthy", "source": "explicit_publish"},
        )
        return {"status": "published", "brain_public_url": explicit_public_url, "mode": "explicit"}
    if not payload.start_tunnel:
        # No tunnel requested: only re-publish a previously saved URL.
        saved_url = _report_known_public_url()
        if saved_url:
            return {"status": "published", "brain_public_url": saved_url, "mode": "saved"}
        return {"status": "skipped", "brain_public_url": None, "mode": "none"}
    tunnel_url = start_ngrok(payload.ngrok_token)
    if tunnel_url:
        return {"status": "published", "brain_public_url": tunnel_url, "mode": "ngrok"}
    # Tunnel failed: fall back to whatever was saved earlier, if anything.
    fallback_url = _report_known_public_url()
    status = "published" if fallback_url else "error"
    mode = "saved" if fallback_url else "none"
    return {"status": status, "brain_public_url": fallback_url, "mode": mode}
@app.get("/system/files")
async def system_files(path: str = "", include_content: bool = False):
    """List a directory (or describe one file) under the sync root."""
    try:
        target = _resolve_sync_path(path)
        if not target.exists():
            return {"status": "error", "detail": "Path not found", "path": path}
        if target.is_file():
            result = {"status": "ok", "entry": _describe_sync_entry(target)}
            if include_content:
                result["content"] = target.read_text(encoding="utf-8", errors="ignore")
            return result
        # Directories first, then case-insensitive by name.
        entries = [_describe_sync_entry(child) for child in target.iterdir()]
        entries.sort(key=lambda entry: (not entry["is_dir"], entry["name"].lower()))
        return {"status": "ok", "root": str(_sync_root_path()), "path": path, "items": entries}
    except Exception as exc:
        logger.exception("File listing failed")
        return {"status": "error", "detail": str(exc), "path": path}
@app.post("/system/files/write")
async def system_files_write(payload: FileWriteRequest):
    """Create or overwrite a UTF-8 text file under the sync root."""
    try:
        target = _resolve_sync_path(payload.path)
        target.parent.mkdir(parents=True, exist_ok=True)
        if target.exists():
            if target.is_dir():
                return {"status": "error", "detail": "Target path is a directory"}
            if not payload.overwrite:
                return {"status": "error", "detail": "File already exists"}
        target.write_text(payload.content or "", encoding="utf-8")
        return {"status": "saved", "entry": _describe_sync_entry(target)}
    except Exception as exc:
        logger.exception("File write failed")
        return {"status": "error", "detail": str(exc), "path": payload.path}
@app.post("/system/files/mkdir")
async def system_files_mkdir(payload: FileMkdirRequest):
    """Create a directory (with parents) under the sync root; idempotent."""
    try:
        created = _resolve_sync_path(payload.path)
        created.mkdir(parents=True, exist_ok=True)
        return {"status": "created", "entry": _describe_sync_entry(created)}
    except Exception as exc:
        logger.exception("Directory creation failed")
        return {"status": "error", "detail": str(exc), "path": payload.path}
@app.delete("/system/files")
async def system_files_delete(payload: FileDeleteRequest):
    """Delete a file or directory under the sync root.

    A missing path is reported as success with ``existed: False``; removing a
    non-empty directory requires ``recursive``.
    """
    try:
        target = _resolve_sync_path(payload.path)
        if not target.exists():
            return {"status": "deleted", "path": payload.path, "existed": False}
        if not target.is_dir():
            target.unlink()
        elif payload.recursive:
            shutil.rmtree(target)
        else:
            target.rmdir()  # fails if the directory is not empty
        return {"status": "deleted", "path": payload.path, "existed": True}
    except Exception as exc:
        logger.exception("Delete failed")
        return {"status": "error", "detail": str(exc), "path": payload.path}
if HAS_MULTIPART:
    @app.post("/system/sync")
    async def sync_codebase(file: UploadFile = File(...), restart: bool = False):
        """Extract an uploaded zip over the sync root, optionally restarting.

        Fix: the upload is spooled to a unique temporary file (previously a
        fixed, predictable path shared by concurrent requests) and is removed
        afterwards instead of leaking in the temp directory.
        """
        temp_zip = None
        try:
            with tempfile.NamedTemporaryFile(prefix="kapo_update_", suffix=".zip", delete=False) as buffer:
                temp_zip = buffer.name
                shutil.copyfileobj(file.file, buffer)
            # NOTE(security): extractall trusts archive member paths (zip-slip);
            # uploads are assumed to come from a trusted operator — confirm.
            with zipfile.ZipFile(temp_zip, "r") as zip_ref:
                zip_ref.extractall(_sync_target_root())
            if restart:
                _restart_process()
            return {"status": "synced", "target_root": _sync_target_root(), "restart_scheduled": restart}
        except Exception as exc:
            logger.exception("Code sync failed")
            return {"status": "error", "detail": str(exc)}
        finally:
            if temp_zip:
                try:
                    os.remove(temp_zip)
                except OSError:
                    pass

    @app.post("/system/archive/upload")
    async def system_archive_upload(file: UploadFile = File(...), target_path: str = "", restart: bool = False):
        """Extract an uploaded zip into *target_path* under the sync root.

        Same temp-file fix as ``/system/sync``: unique spool file, cleaned up
        in ``finally``.
        """
        temp_zip = None
        try:
            extract_root = _resolve_sync_path(target_path)
            extract_root.mkdir(parents=True, exist_ok=True)
            with tempfile.NamedTemporaryFile(prefix="kapo_archive_", suffix=".zip", delete=False) as buffer:
                temp_zip = buffer.name
                shutil.copyfileobj(file.file, buffer)
            # NOTE(security): extractall trusts archive member paths (zip-slip).
            with zipfile.ZipFile(temp_zip, "r") as zip_ref:
                zip_ref.extractall(extract_root)
            if restart:
                _restart_process()
            return {
                "status": "extracted",
                "target_root": str(extract_root),
                "restart_scheduled": restart,
            }
        except Exception as exc:
            logger.exception("Archive upload failed")
            return {"status": "error", "detail": str(exc)}
        finally:
            if temp_zip:
                try:
                    os.remove(temp_zip)
                except OSError:
                    pass
else:
    @app.post("/system/sync")
    async def sync_codebase_unavailable():
        """Stub when python-multipart is missing: report the dependency."""
        return {"status": "error", "detail": "python-multipart is required for /system/sync"}

    @app.post("/system/archive/upload")
    async def system_archive_upload_unavailable():
        """Stub when python-multipart is missing: report the dependency."""
        return {"status": "error", "detail": "python-multipart is required for /system/archive/upload"}
@app.post("/system/restart")
async def system_restart(req: RestartRequest | None = None):
    """Schedule a self-restart and report when it will happen."""
    if req:
        delay_sec = req.delay_sec
    else:
        delay_sec = 1.0
    _restart_process(delay_sec=delay_sec)
    return {
        "status": "restarting",
        "delay_sec": delay_sec,
        "target_root": _sync_target_root(),
    }
def _health_payload(check_executor: bool = False, executor_url: str | None = None) -> dict[str, Any]:
    """Assemble the /health response, optionally probing the executor.

    The executor probe retries up to REQUEST_RETRIES times and records the
    last error (HTTP body or exception text) when every attempt fails.
    """
    cfg = load_config()
    base_exec_url = (executor_url or os.getenv("EXECUTOR_URL", "")).strip().rstrip("/")
    exec_ok = False
    exec_checked = False
    exec_error = None
    health_timeout = int(cfg.get("REQUEST_TIMEOUT_SEC", 20) or 20)
    health_retries = max(1, int(cfg.get("REQUEST_RETRIES", 2) or 2))
    if base_exec_url and check_executor:
        exec_checked = True
        for _attempt in range(health_retries):
            try:
                resp = requests.get(
                    f"{base_exec_url}/health",
                    headers=get_executor_headers(cfg),
                    timeout=health_timeout,
                )
            except Exception as exc:
                exec_error = str(exc)
                continue
            if resp.status_code == 200:
                exec_ok = True
                exec_error = None
                break
            exec_error = resp.text
    faiss_path = cfg.get("FAISS_INDEX_PATH")
    truthy = {"1", "true", "yes", "on"}
    return {
        "status": "ok",
        "model_loaded": MODEL is not None,
        "model_error": MODEL_ERROR,
        "embedding_loaded": EMBED_MODEL is not None,
        "faiss_ok": bool(faiss_path and os.path.exists(faiss_path)),
        "executor_checked": exec_checked,
        "executor_ok": exec_ok,
        "executor_error": exec_error,
        "remote_brain_only": str(os.getenv("REMOTE_BRAIN_ONLY", "")).strip().lower() in truthy,
        "runtime_root": os.getenv("KAPO_RUNTIME_ROOT", ""),
        "sync_root": _sync_target_root(),
        "timestamp": time.time(),
    }
def _decode_b64_text(value: str) -> str:
raw = str(value or "").strip()
if not raw:
return ""
padding = "=" * (-len(raw) % 4)
try:
return base64.urlsafe_b64decode((raw + padding).encode("utf-8")).decode("utf-8").strip()
except Exception:
return ""
def _runtime_modernization_snapshot() -> dict[str, Any]:
    """Summarize modernization-related configuration and runtime state."""
    truthy = {"1", "true", "yes", "on"}

    def env_str(*names: str) -> str:
        # First truthy env var among *names* (pre-strip, matching the original
        # `getenv(a) or getenv(b)` chaining), returned stripped.
        raw = ""
        for name in names:
            raw = os.getenv(name, "") or ""
            if raw:
                break
        return str(raw).strip()

    def env_flag(name: str, default: str) -> bool:
        return str(os.getenv(name, default)).strip().lower() in truthy

    bootstrap = DRIVE_STATE.ensure_bootstrap_loaded(force=False) or {}
    raw_manifest = (
        os.getenv("KAPO_PATCH_MANIFEST_URL", "")
        or bootstrap.get("patch_manifest_url")
        or ""
    )
    patch_manifest_url = str(raw_manifest).strip()
    remote_env_url = _decode_b64_text(os.getenv("KAPO_REMOTE_ENV_URL_B64", "")) or env_str("KAPO_REMOTE_ENV_URL")
    raw_public = (
        os.getenv("BRAIN_PUBLIC_URL", "")
        or LAST_BRAIN_URL_REPORT.get("url")
        or _load_saved_public_url()
        or ""
    )
    public_url = str(raw_public).strip()
    applied_version = _load_applied_runtime_version()
    control_plane_url = env_str("KAPO_CONTROL_PLANE_URL")
    queue_name = env_str("KAPO_CLOUDFLARE_QUEUE_NAME")
    bootstrap_url = env_str("KAPO_BOOTSTRAP_URL", "GOOGLE_DRIVE_BOOTSTRAP_URL")
    return {
        "shared_state_backend": _shared_state_backend(),
        "firebase_enabled": env_flag("FIREBASE_ENABLED", "0"),
        "drive": {
            "folder_id": env_str("GOOGLE_DRIVE_SHARED_STATE_FOLDER_ID", "GOOGLE_DRIVE_STORAGE_FOLDER_ID"),
            "prefix": env_str("GOOGLE_DRIVE_SHARED_STATE_PREFIX", "GOOGLE_DRIVE_STORAGE_PREFIX"),
            "bootstrap_loaded": bool(bootstrap),
        },
        "bootstrap": {
            "configured": bool(bootstrap_url),
            "url": bootstrap_url,
            "loaded": bool(bootstrap),
            "version": str(bootstrap.get("version") or "").strip(),
            "updated_at": bootstrap.get("updated_at"),
        },
        "patch": {
            "configured": bool(patch_manifest_url),
            "manifest_url": patch_manifest_url,
            "startup_self_update_enabled": _startup_self_update_enabled(),
            "applied_version": applied_version,
        },
        "remote_env": {
            "configured": bool(remote_env_url),
            "loaded": env_flag("KAPO_REMOTE_ENV_LOADED", "0"),
            "url": remote_env_url,
        },
        "control_plane": {
            "configured": bool(control_plane_url),
            "url": control_plane_url,
            "queue_name": queue_name,
            "queue_configured": bool(queue_name),
        },
        "transport": {
            "provider": str(os.getenv("BRAIN_TUNNEL_PROVIDER", "ngrok") or "ngrok").strip().lower(),
            "auto_ngrok": _ngrok_bootstrap_enabled(),
            "reuse_public_url_on_restart": _reuse_public_url_on_restart(),
            "public_url": public_url,
            "executor_url": env_str("EXECUTOR_URL"),
        },
        "runtime": {
            "root": str(_sync_target_root()),
            "model_profile_id": env_str("MODEL_PROFILE_ID"),
            "model_loaded": MODEL is not None,
            "embed_loaded": EMBED_MODEL is not None,
        },
    }
def _persist_runtime_state_snapshot(reason: str = "periodic") -> dict[str, Any]:
    """Mirror the current health/runtime state into Firebase; return the health payload."""
    payload = _health_payload(check_executor=False)
    public_url = os.getenv("BRAIN_PUBLIC_URL") or _load_saved_public_url() or ""
    state_id = _brain_state_id()
    brain_doc = {
        "url": public_url,
        "health": payload,
        "status": "healthy" if payload["model_loaded"] else "degraded",
        "model_repo": os.getenv("MODEL_REPO", DEFAULT_MODEL_REPO),
        "model_file": os.getenv("MODEL_FILE", DEFAULT_MODEL_FILE),
        "model_profile_id": os.getenv("MODEL_PROFILE_ID", DEFAULT_MODEL_PROFILE_ID),
        "roles": os.getenv("BRAIN_ROLES", "supervisor,chat,coding,planner,arabic,fallback"),
        "languages": os.getenv("BRAIN_LANGUAGES", "ar,en"),
        "updated_at": time.time(),
        "source": reason,
    }
    # Fall back to the stable state id when no public URL is known yet.
    FIREBASE.set_document("brains", public_url or state_id, brain_doc, min_interval_sec=5.0)
    runtime_doc = {
        "brain_url": public_url,
        "reason": reason,
        "health": payload,
        "recent_logs": list(RUNTIME_LOG_BUFFER)[-25:],
        "updated_at": time.time(),
    }
    FIREBASE.set_document("brain_runtime", state_id, runtime_doc, min_interval_sec=5.0)
    return payload
def _runtime_state_pulse() -> None:
    """Daemon loop: periodically mirror runtime state into Firebase."""
    raw_interval = os.getenv("BRAIN_STATE_SYNC_INTERVAL_SEC", "60") or 60
    interval = max(20.0, float(raw_interval))  # clamp to a sane floor
    while True:
        try:
            _persist_runtime_state_snapshot(reason="background_pulse")
        except Exception:
            logger.warning("Background runtime state sync failed", exc_info=True)
        time.sleep(interval)
@app.get("/health")
async def health(executor_url: str | None = None, check_executor: bool = False):
    """Health probe; also opportunistically persists runtime state."""
    result = _health_payload(check_executor=check_executor, executor_url=executor_url)
    _persist_runtime_state_snapshot(reason="health_endpoint")
    return result