PackedLLM / PackedLLM.py
HiMind's picture
Upload 5 files
299b8f3 verified
Raw
History Blame Contribute Delete
300 kB
from __future__ import annotations
'''PackedLLM, By: Chance Brownfield-|-HiMindAi@proton.me'''
import ast
import argparse
import base64
import contextlib
import dataclasses
import hashlib
import importlib
import importlib.util
import io
import json
import lzma
import math
import os
import platform
import queue
import re
import shutil
import subprocess
import sys
import tempfile
import textwrap
import threading
import time
import traceback
import types
import uuid
import zipfile
from collections import OrderedDict, defaultdict, deque
from dataclasses import dataclass, field, asdict
from datetime import datetime
from pathlib import Path
from typing import (
Any, Callable, Dict, Iterable, List, Mapping,
MutableMapping, Optional, Sequence, Tuple, Union
)
from urllib.parse import urlparse, parse_qs, quote, unquote
from multiprocessing import Process, Queue, get_context
import concurrent.futures
import numpy as np
import psutil
import requests
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch import Tensor
try:
from transformers import (
MarianMTModel,
MarianTokenizer,
AutoModelForSeq2SeqLM,
AutoTokenizer,
)
_HAS_TRANSFORMERS = True
except ImportError:
_HAS_TRANSFORMERS = False
MarianMTModel = MarianTokenizer = None
AutoModelForSeq2SeqLM = AutoTokenizer = None
try:
from sentence_transformers import SentenceTransformer, util
_HAS_SENTENCE_TRANSFORMERS = True
except ImportError:
_HAS_SENTENCE_TRANSFORMERS = False
try:
import fitz # PyMuPDF
_HAS_PYMUPDF = True
except ImportError:
_HAS_PYMUPDF = False
try:
from sklearn.cluster import KMeans, AgglomerativeClustering
from sklearn.decomposition import PCA
from sklearn.metrics.pairwise import cosine_similarity
_HAS_SKLEARN = True
except ImportError:
_HAS_SKLEARN = False
KMeans = AgglomerativeClustering = PCA = cosine_similarity = None
try:
import spacy
_HAS_SPACY = True
except ImportError as e:
raise RuntimeError(f"spaCy is required: {e}")
try:
from bs4 import BeautifulSoup
except ImportError:
BeautifulSoup = None
try:
import trafilatura
except ImportError:
trafilatura = None
try:
from readability import Document
except ImportError:
Document = None
try:
from newspaper import Article
except ImportError:
Article = None
try:
from goose3 import Goose
except ImportError:
Goose = None
try:
from boilerpy3 import extractors
except ImportError:
extractors = None
try:
from inscriptis import get_text as inscriptis_text
except ImportError:
inscriptis_text = None
try:
from lxml import html as lxml_html
except ImportError:
lxml_html = None
try:
from youtube_transcript_api import YouTubeTranscriptApi
except ImportError:
YouTubeTranscriptApi = None
try:
from llama_cpp import Llama
_HAS_LLAMA_CPP = True
except ImportError:
_HAS_LLAMA_CPP = False
Llama = None
try:
import wgpu
_HAS_WGPU = True
except ImportError:
_HAS_WGPU = False
try:
from huggingface_hub import snapshot_download
_HAS_HF_HUB = True
except ImportError:
_HAS_HF_HUB = False
try:
from safetensors.torch import load_file as safetensors_load
except ImportError:
safetensors_load = None
# Local model cache directory, relative to the current working directory.
MODEL_DIR = Path("models")
# Per-model cache subdirectories (named after the upstream model ids).
EMBEDDING_PATH = MODEL_DIR / "all-MiniLM-L6-v2"
SUMMARIZER_PATH = MODEL_DIR / "distilbart-cnn-12-6"
SPACY_MODEL_PATH = MODEL_DIR / "spacy" / "en_core_web_sm"
SPACY_MODEL_NAME = "en_core_web_sm"
# Default file names for checkpoints / bundles / a sample image.
DEFAULT_CHECKPOINT_PATH = "LM.pt"
DEFAULT_BUNDLE_PATH = "PackedLM.pt"
DEFAULT_IMAGE_TEST_SOURCE = "sample_img.png"
DEFAULT_ZH_EN_DIR = MODEL_DIR / "opus-mt-zh-en"
# 32 MiB chunk size used by the chunked byte helpers (_bytes_to_chunks, _read_file_chunked).
_CHUNK_BYTES = 32 * 1024 * 1024
# Captures the body of a ``` or ```python fenced block (non-greedy, may span lines).
_CODE_FENCE_RE = re.compile(r"```(?:python)?\s*\n(.*?)```", re.DOTALL | re.IGNORECASE)
# Any single CJK Unified Ideograph (U+4E00..U+9FFF).
CHINESE_RE = re.compile(r"[\u4e00-\u9fff]")
# A run of CJK ideographs, optionally joined to further ideograph runs by CJK/full-width/
# general punctuation, ASCII punctuation or whitespace, so a mixed span matches as one unit.
# NOTE(review): the \u3000-\u303F and \uFF00-\uFFEF ranges are listed twice; harmless.
CHINESE_SPAN_RE = re.compile(
    r"[\u4e00-\u9fff]+(?:[\u3000-\u303F\uFF00-\uFFEF\u2000-\u206F"
    r"\u2E00-\u2E7F\u3000-\u303F\uFF00-\uFFEF\s,.;:!?\-—()\[\]{},。!?、;:]+"
    r"[\u4e00-\u9fff]+)*"
)
GGUF_EMBED_FILENAME = "jina-embeddings-v3-Q8_0.gguf"
# Import-time side effect: create the model cache directories (DEFAULT_ZH_EN_DIR is not created here).
for p in [MODEL_DIR, EMBEDDING_PATH, SUMMARIZER_PATH, SPACY_MODEL_PATH.parent]:
    p.mkdir(parents=True, exist_ok=True)
def _extract_python_code(text: str) -> str:
    """Pull runnable Python out of an LLM reply.

    Fenced blocks (```python ... ``` or ``` ... ```) are tried first, in order;
    when there are none, the whole stripped reply is the only candidate. The
    first candidate that ``_strip_to_valid_python`` can turn into parseable code
    wins. If nothing parses, the stripped reply is returned unchanged. Non-string
    input yields ``""``.
    """
    if not isinstance(text, str):
        return ""
    stripped = text.strip()
    fenced_bodies = _CODE_FENCE_RE.findall(stripped)
    pool = [body.strip() for body in fenced_bodies] or [stripped]
    for snippet in pool:
        usable = _strip_to_valid_python(snippet)
        if usable:
            return usable
    return stripped
def _strip_to_valid_python(code: str) -> str:
try:
ast.parse(code)
return code
except SyntaxError:
pass
lines = code.splitlines()
for start in range(1, min(len(lines), 10) + 1):
candidate = "\n".join(lines[start:])
try:
ast.parse(candidate)
return candidate
except SyntaxError:
continue
for end in range(len(lines) - 1, max(len(lines) - 10, 0), -1):
candidate = "\n".join(lines[:end])
try:
ast.parse(candidate)
return candidate
except SyntaxError:
continue
return ""
def _json_dumps(obj: Any) -> str:
return json.dumps(obj, ensure_ascii=False, default=str)
def _parse_json_safe(text: Any) -> Any:
if not isinstance(text, str):
return None
cleaned = text.strip()
if cleaned.startswith("```"):
lines = cleaned.splitlines()
if lines and lines[-1].strip() == "```":
cleaned = "\n".join(lines[1:-1])
else:
cleaned = "\n".join(lines[1:])
try:
return json.loads(cleaned)
except Exception:
for start_char, end_char in [("{", "}"), ("[", "]")]:
si = cleaned.find(start_char)
ei = cleaned.rfind(end_char)
if si != -1 and ei != -1 and ei > si:
try:
return json.loads(cleaned[si:ei + 1])
except Exception:
pass
return None
def _safe_import_class(name: str) -> Optional[type]:
try:
frame = inspect.stack()[2].frame
cls = frame.f_globals.get(name) or builtins.__dict__.get(name)
return cls if isinstance(cls, type) else None
except Exception:
return None
def _safe_call(obj: Any, name: str, *args: Any, default: Any = None, **kwargs: Any) -> Any:
fn = getattr(obj, name, None)
if not callable(fn):
return default
try:
return fn(*args, **kwargs)
except Exception:
return default
def _bytes_to_chunks(data: bytes, chunk_size: int = _CHUNK_BYTES) -> List[bytes]:
    """Split *data* into consecutive slices of at most *chunk_size* bytes.

    Empty input still yields one (empty) chunk, so the result is never an
    empty list for a positive *chunk_size*.
    """
    offsets = range(0, max(len(data), 1), chunk_size)
    return [data[offset:offset + chunk_size] for offset in offsets]
def _chunks_to_bytes(chunks: List[bytes]) -> bytes:
return b"".join(chunks)
def _read_file_chunked(path: Optional[Union[str, Path]]) -> Optional[List[bytes]]:
if not path:
return None
p = Path(path)
if not p.exists():
return None
chunks: List[bytes] = []
try:
with open(p, "rb") as fh:
while True:
chunk = fh.read(_CHUNK_BYTES)
if not chunk:
break
chunks.append(chunk)
except Exception:
return None
return chunks if chunks else [b""]
def _write_chunks_to_temp(chunks: Optional[List[bytes]], suffix: str, prefix: str = "packedllm_") -> Optional[str]:
if not chunks:
return None
fd, path = tempfile.mkstemp(prefix=prefix, suffix=suffix)
os.close(fd)
try:
with open(path, "wb") as fh:
for chunk in chunks:
fh.write(chunk)
except Exception:
try:
os.unlink(path)
except Exception:
pass
return None
return path
def _normalise_expert_name(name: str) -> str:
s = re.sub(r"(?<=[a-z0-9])([A-Z])", r"_\1", name)
return s.lower()
def _expert_names_canonical(names: List[str]) -> List[str]:
    """Normalise every name to snake_case and drop duplicates, keeping first-seen order."""
    normalised = (_normalise_expert_name(raw) for raw in names)
    return list(dict.fromkeys(normalised))
def capture_telemetry() -> Dict[str, Any]:
    """Take a snapshot of CPU, RAM and (when ``nvidia-smi`` works) GPU usage.

    Returns a dict with:

    * ``timestamp_ns``: ``time.perf_counter_ns()``. It is monotonic and only
      meaningful as a difference between two snapshots, not as wall-clock time.
    * ``cpu``: system-wide and per-process ``cpu_percent(interval=None)``.
      NOTE: per psutil's docs, the first ``interval=None`` call returns 0.0,
      and later calls measure the time since the previous call.
    * ``ram``: system totals and this process's RSS, in GiB.
    * ``gpu_hardware_metrics``: zeros with ``driver_detected=False`` unless
      ``nvidia-smi`` answers. Only the first GPU it lists is reported.
    """
    process = psutil.Process(os.getpid())
    cpu_total_pct = psutil.cpu_percent(interval=None)
    cpu_process_pct = process.cpu_percent(interval=None)
    ram_info = psutil.virtual_memory()
    gib = 1024 ** 3
    metrics: Dict[str, Any] = {
        "timestamp_ns": time.perf_counter_ns(),
        "cpu": {
            "system_total_percent": cpu_total_pct,
            "process_percent": cpu_process_pct,
        },
        "ram": {
            "system_total_gb": ram_info.total / gib,
            "system_available_gb": ram_info.available / gib,
            "system_used_gb": ram_info.used / gib,
            "process_rss_gb": process.memory_info().rss / gib,
        },
        "gpu_hardware_metrics": {
            "driver_detected": False,
            "device_name": "None",
            "total_vram_gb": 0.0,
            "used_vram_gb": 0.0,
            "free_vram_gb": 0.0,
            "gpu_utilization_percent": 0.0,
        },
    }
    cmd = [
        "nvidia-smi",
        "--query-gpu=name,memory.total,memory.free,memory.used,utilization.gpu",
        "--format=csv,noheader,nounits",
    ]
    try:
        # A timeout keeps a wedged driver from hanging every telemetry call.
        raw = subprocess.check_output(cmd, stderr=subprocess.DEVNULL, timeout=5)
        output = raw.decode("ascii", errors="replace").strip()
        # nvidia-smi prints one CSV row per GPU. Splitting the whole output on ","
        # merged rows (e.g. "4\nNVIDIA ..."), so float() failed and multi-GPU
        # hosts reported no GPU at all.
        first_row = output.splitlines()[0] if output else ""
        if first_row:
            parts = [p.strip() for p in first_row.split(",")]
            metrics["gpu_hardware_metrics"] = {
                "driver_detected": True,
                "device_name": parts[0],
                "total_vram_gb": float(parts[1]) / 1024.0,
                "free_vram_gb": float(parts[2]) / 1024.0,
                "used_vram_gb": float(parts[3]) / 1024.0,
                "gpu_utilization_percent": float(parts[4]),
            }
    except Exception:
        # Best effort: no nvidia-smi, a timeout, or unparsable output keeps the CPU-only defaults.
        pass
    return metrics
def calculate_delta(start: Dict[str, Any], end: Dict[str, Any]) -> Dict[str, Any]:
    """Compare two ``capture_telemetry`` snapshots.

    GPU figures are taken only when the *end* snapshot detected a driver;
    otherwise they are 0.0. GPU utilisation is the end snapshot's instantaneous
    reading, not a difference.
    """
    before_gpu = start["gpu_hardware_metrics"]
    after_gpu = end["gpu_hardware_metrics"]
    gpu_seen = after_gpu["driver_detected"]
    if gpu_seen:
        vram_change = after_gpu["used_vram_gb"] - before_gpu["used_vram_gb"]
        utilisation = after_gpu["gpu_utilization_percent"]
    else:
        vram_change = 0.0
        utilisation = 0.0
    rss_change = end["ram"]["process_rss_gb"] - start["ram"]["process_rss_gb"]
    cpu_change = end["cpu"]["system_total_percent"] - start["cpu"]["system_total_percent"]
    return {
        "ram_process_delta_gb": rss_change,
        "vram_allocated_delta_gb": vram_change,
        "gpu_instantaneous_utilization_pct": utilisation,
        "cpu_system_delta_pct": cpu_change,
    }
def normalize_unicode(s: str) -> str:
    """Apply NFKC normalisation, then strip invisible characters.

    Removed characters:
      * zero-width/format characters U+200B..U+200F, U+FEFF and U+00AD;
      * C0 control characters U+0000..U+001F and DEL, which includes tab and
        newline.

    ``unicodedata`` is imported locally because the module never imported it,
    so every call previously raised ``NameError``.
    """
    import unicodedata

    s = unicodedata.normalize("NFKC", s)
    s = re.sub(r"[\u200B-\u200F\uFEFF\u00AD]", "", s)
    s = re.sub(r"[\x00-\x1F\x7F]", "", s)
    return s
def canonicalize_numbers(s: str) -> str:
    """Replace every integer or decimal literal in *s* with the placeholder ``N``."""
    number_pattern = re.compile(r"\d+\.\d+|\d+")
    return number_pattern.sub("N", s)
def strip_latex_wrappers(s: str) -> str:
    r"""Remove LaTeX math delimiters ``\[ \] \( \)`` and any runs of ``$``."""
    for delimiter in ("\\[", "\\]", "\\(", "\\)"):
        s = s.replace(delimiter, "")
    return re.sub(r"\$+", "", s)
def semantic_key(line: str) -> str:
    r"""Reduce *line* to a comparison key so trivially different variants compare equal.

    Steps, in order:
      1. Trim the line.
      2. Unicode-normalise it (NFKC, drop zero-width/control characters).
      3. Lowercase it.
      4. Strip LaTeX math delimiters and ``$``.
      5. Rewrite ``\frac{a}{b}`` as ``frac(a,b)`` and ``^{x}`` as ``^(x)``.
      6. Replace numbers with ``N`` and collapse whitespace.

    Normalisation runs *before* lowercasing. NFKC maps compatibility characters
    such as mathematical bold or double-struck letters (e.g. U+1D400, U+210D)
    to plain uppercase ASCII. Lowercasing first could not see those letters, so
    "ℍ" and "h" produced different keys.
    """
    line = normalize_unicode(line.strip())
    line = line.lower()
    line = strip_latex_wrappers(line)
    line = re.sub(r"\\frac\{([^}]*)}\{([^}]*)}", r"frac(\1,\2)", line)
    line = re.sub(r"\^{\s*([^}]*)\s*}", r"^(\1)", line)
    line = canonicalize_numbers(line)
    line = re.sub(r"\s+", " ", line)
    return line.strip()
def collapse_repeated_blocks_with_report(text: str, block_size: int = 2) -> Tuple[str, List[str]]:
    """Drop aligned blocks of *block_size* lines whose semantic keys were already seen.

    Blank lines are discarded first. The remaining lines are walked in
    consecutive, non-overlapping windows. A window whose tuple of
    ``semantic_key`` values already occurred is removed as a whole. A trailing
    partial window is always kept.

    Returns ``(collapsed_text, removed_lines)``, and the report now matches what
    was actually removed. Previously a duplicate advanced by only one line:
    ``A B A B`` became ``A B B`` while both lines were reported as removed.
    *block_size* values below 1 are treated as 1 (0 used to drop every line).
    """
    block_size = max(1, int(block_size))
    lines = [l for l in text.splitlines() if l.strip()]
    out: List[str] = []
    removed: List[str] = []
    seen = set()
    i = 0
    while i < len(lines):
        block_lines = lines[i:i + block_size]
        if len(block_lines) < block_size:
            out.extend(block_lines)
            break
        block_keys = tuple(semantic_key(l) for l in block_lines)
        if block_keys in seen:
            removed.extend(block_lines)
        else:
            seen.add(block_keys)
            out.extend(block_lines)
        i += block_size
    return "\n".join(out), removed
def collapse_repeated_semantic_lines_with_report(text: str, max_repeat: int = 1) -> Tuple[str, List[str]]:
    """Limit runs of consecutive lines that share a ``semantic_key``.

    The first line of a run is always kept, followed by up to *max_repeat*
    further repeats; the rest are dropped. A blank line is always kept and ends
    the current run. Returns ``(text, removed_lines)``.
    """
    kept: List[str] = []
    dropped: List[str] = []
    last_key = None
    run_length = 0
    for raw in text.splitlines():
        if raw.strip():
            key = semantic_key(raw)
            if key != last_key:
                last_key, run_length = key, 0
            else:
                run_length += 1
                if run_length > max_repeat:
                    dropped.append(raw)
                    continue
        else:
            last_key, run_length = None, 0
        kept.append(raw)
    return "\n".join(kept), dropped
def collapse_repeated_lines(text: str, block_size: int = 2, max_repeat: int = 1, passes: int = 2, verbose: bool = True) -> str:
    """De-duplicate *text* with repeated block and consecutive-line passes.

    Each of ``max(1, passes)`` rounds runs the block collapse and then the
    consecutive-line collapse. When *verbose*, a summary of the distinct
    removed lines (up to 50 examples) is printed. Returns the stripped result.
    """
    result = text or ""
    all_removed: List[str] = []
    for _round in range(max(1, int(passes))):
        result, dropped_blocks = collapse_repeated_blocks_with_report(result, block_size=block_size)
        result, dropped_lines = collapse_repeated_semantic_lines_with_report(result, max_repeat=max_repeat)
        all_removed += dropped_blocks
        all_removed += dropped_lines
    distinct = list(dict.fromkeys(all_removed))
    if verbose:
        if not distinct:
            print("[collapse_repeated_lines] No repeated blocks or semantic-line repeats detected.")
        else:
            print(f"[collapse_repeated_lines] Removed {len(distinct)} unique repeated line(s)/block(s). Examples:")
            for sample in distinct[:50]:
                print(f"- {sample}")
    return result.strip()
def _now_iso() -> str:
return datetime.utcnow().isoformat()
def _norm_ws(text: str) -> str:
return re.sub(r"\s+", " ", (text or "")).strip()
def _safe_json_dumps(obj: Any) -> str:
return json.dumps(obj, ensure_ascii=False, sort_keys=True, separators=(",", ":"))
def _safe_json_loads(text: str, default: Any = None) -> Any:
try:
return json.loads(text)
except Exception:
return default
def _maybe_list(x: Any) -> List[Any]:
if x is None:
return []
if isinstance(x, list):
return x
if isinstance(x, tuple):
return list(x)
return [x]
def _normalize_whitespace(text: str) -> str:
return re.sub(r"\s+", " ", text or "").strip()
def split_sentences(text: str) -> List[str]:
    """Split whitespace-normalised *text* after ``.``, ``!`` or ``?`` followed by whitespace.

    Returns the non-empty, stripped pieces; empty input gives ``[]``.
    """
    cleaned = _normalize_whitespace(text)
    if not cleaned:
        return []
    pieces = (chunk.strip() for chunk in re.split(r"(?<=[.!?])\s+", cleaned))
    return [chunk for chunk in pieces if chunk]
def _safe_hash(text: str) -> int:
    """Hash *text* after whitespace normalisation (so spacing differences don't matter).

    NOTE: this is Python's built-in ``hash``, which is salted per process
    (PYTHONHASHSEED). The value is stable only within one interpreter run and
    must not be persisted.
    """
    normalised = _normalize_whitespace(text)
    return hash(normalised)
def _ensure_min_text(text: str, fallback: str = "") -> str:
    """Return normalised *text*, or the normalised *fallback* when *text* is blank."""
    primary = _normalize_whitespace(text)
    return primary if primary else _normalize_whitespace(fallback)
class FileManager:
    """File helpers rooted at ``<base_dir>/global_data`` (created on construction)."""

    def __init__(self, base_dir: str):
        root = os.path.abspath(base_dir)
        self.base_dir = root
        self.global_dir = os.path.join(root, "global_data")
        os.makedirs(self.global_dir, exist_ok=True)

    def abs_path(self, path: str) -> str:
        """Absolute form of *path*, resolved against the current working directory."""
        return os.path.abspath(path)

    def copy_to_global(self, src: str, dest_name: Optional[str] = None) -> str:
        """Copy *src* into ``global_dir`` (named *dest_name* or its basename) and return the target.

        No copy is made when *src* already is the target, or when a file with
        that name already exists. An existing file is never overwritten, even if
        its contents differ.
        """
        target = os.path.join(self.global_dir, dest_name or os.path.basename(src))
        same_file = os.path.abspath(src) == os.path.abspath(target)
        if not same_file and not os.path.exists(target):
            shutil.copy2(src, target)
        return target

    def write_bytes_to_global(self, data: bytes, dest_name: str) -> str:
        """Write *data* to ``global_dir/dest_name`` (overwriting) and return that path."""
        target = os.path.join(self.global_dir, dest_name)
        with open(target, "wb") as sink:
            sink.write(data)
        return target

    def hardlink_or_copy(self, src: str, dst: str):
        """Hard-link *src* to *dst*, falling back to ``shutil.copy2`` if linking fails."""
        try:
            os.link(src, dst)
            return
        except Exception:
            pass
        shutil.copy2(src, dst)

    def atomic_replace(self, src_tmp: str, dest: str):
        """Move *src_tmp* over *dest* with ``os.replace``."""
        os.replace(src_tmp, dest)

    def compute_sha256(self, path: str) -> str:
        """Hex SHA-256 of the file at *path*, read in 1 MiB blocks."""
        digest = hashlib.sha256()
        with open(path, "rb") as source:
            while True:
                block = source.read(1 << 20)
                if not block:
                    break
                digest.update(block)
        return digest.hexdigest()
class CodeBoxError(Exception):
    """Base class for CodeBox-specific errors."""


class AssetNotFoundError(CodeBoxError):
    """Raised when an asset alias is not present in the registry."""


class RunnerCacheError(CodeBoxError):
    """Error category for runner-cache problems (not raised within this section)."""
class LRUCache:
    """Small least-recently-used cache that releases values leaving the cache.

    A value leaves the cache when it is evicted, replaced by a different object,
    or dropped by :meth:`clear`. On leaving, its ``close()`` and ``cleanup()``
    methods (if present) are each called best-effort, then
    ``torch.cuda.empty_cache()`` is attempted. Previously ``clear()`` released
    nothing, replacing a key leaked the old value, and a raising ``close()``
    skipped ``cleanup()``.
    """

    def __init__(self, capacity: int = 2):
        self.capacity = capacity
        self.cache = OrderedDict()

    @staticmethod
    def _release(value) -> None:
        """Call ``close()`` and ``cleanup()`` on *value* independently, ignoring errors."""
        for hook_name in ("close", "cleanup"):
            hook = getattr(value, hook_name, None)
            if callable(hook):
                try:
                    hook()
                except Exception:
                    pass

    @staticmethod
    def _empty_cuda_cache() -> None:
        """Best-effort ``torch.cuda.empty_cache()``; no-op when torch is unavailable."""
        try:
            import torch
            torch.cuda.empty_cache()
        except Exception:
            pass

    def get(self, key):
        """Return the cached value (marking it most recent), or ``None`` if absent."""
        if key not in self.cache:
            return None
        self.cache.move_to_end(key)
        return self.cache[key]

    def put(self, key, value):
        """Insert or replace *key*, evicting least-recently-used entries beyond capacity."""
        if key in self.cache:
            previous = self.cache[key]
            self.cache[key] = value
            self.cache.move_to_end(key)
            if previous is not value:
                self._release(previous)
                previous = None  # drop our reference before asking CUDA to free memory
                self._empty_cuda_cache()
            return
        self.cache[key] = value
        evicted = False
        while len(self.cache) > max(self.capacity, 0):
            self._release(self.cache.popitem(last=False)[1])
            evicted = True
        if evicted:
            self._empty_cuda_cache()

    def keys(self):
        """Keys in order from least to most recently used."""
        return list(self.cache.keys())

    def clear(self):
        """Release every cached value (oldest first) and empty the cache."""
        while self.cache:
            self._release(self.cache.popitem(last=False)[1])
        self._empty_cuda_cache()
# The CodeBox
class CodeBox:
    """Manage virtualenvs, saved scripts and shared asset files, and run code in them.

    On-disk layout under ``base_dir``:

    * ``envs/<venv_id>/``: a ``python -m venv`` environment whose ``src/``
      holds saved scripts.
    * ``global_data/``: registered asset files, the ``assets.pt`` registry and
      the ``_codebox_loader.py`` helper that is copied into each env's ``src/``.

    Code runs in a child interpreter (see :meth:`_execute_supervised`) with a
    wall-clock timeout and an RSS ceiling enforced by a psutil monitor thread.
    NOTE: this is resource supervision only, not a security sandbox. The child
    inherits this process's environment variables and user permissions.
    """

    ASSETS_FILENAME = "assets.pt"
    ASSET_SCHEMA_VERSION = 1

    def __init__(self, base_dir: str = "./codebox_storage", runner_cache_capacity: int = 2):
        self.base_dir = os.path.abspath(base_dir)
        self.envs_dir = os.path.join(self.base_dir, "envs")
        self.file_manager = FileManager(self.base_dir)
        os.makedirs(self.envs_dir, exist_ok=True)
        # venv_id -> {"path": venv directory, "packages": [requirement strings]}
        self.env_dic: Dict[str, Dict[str, Any]] = {}
        # code_id -> {"venv_id": ..., "filepath": path of the saved .py file}
        self.code_bank: Dict[str, Dict[str, Any]] = {}
        # alias -> registry entry (see register_asset)
        self.asset_registry: Dict[str, Dict[str, Any]] = {}
        self._load_registry()
        self._runner_cache = LRUCache(capacity=runner_cache_capacity)
        self._ensure_loader_template()

    # ------------------------------------------------------------------ pickling
    def __getstate__(self) -> Dict[str, Any]:
        """Pickle everything except the live runner cache.

        Cached runners are in-memory objects that may own models, handles or GPU
        memory. Pickling them with ``torch.save`` is slow or fails outright.
        Only the capacity is kept, so an empty cache can be rebuilt on load.
        """
        state = dict(self.__dict__)
        cache = state.pop("_runner_cache", None)
        state["_runner_cache_capacity"] = getattr(cache, "capacity", 2)
        return state

    def __setstate__(self, state: Dict[str, Any]) -> None:
        """Restore pickled state, rebuilding an empty runner cache when it was not pickled."""
        state = dict(state)
        capacity = state.pop("_runner_cache_capacity", 2)
        self.__dict__.update(state)
        # Pickles written before __getstate__ existed still carry their cache.
        if "_runner_cache" not in self.__dict__:
            self._runner_cache = LRUCache(capacity=capacity)

    # ------------------------------------------------------------------ registry
    def _registry_path(self) -> str:
        """Path of the ``assets.pt`` registry inside ``global_data``."""
        return os.path.join(self.file_manager.global_dir, self.ASSETS_FILENAME)

    def _persist_registry(self):
        """Atomically write the asset registry (temp file, then ``os.replace``)."""
        tmp_fd, tmp_path = tempfile.mkstemp(dir=self.file_manager.global_dir)
        os.close(tmp_fd)
        payload = {
            "schema_version": self.ASSET_SCHEMA_VERSION,
            "assets": self.asset_registry,
        }
        try:
            torch.save(payload, tmp_path)
            self.file_manager.atomic_replace(tmp_path, self._registry_path())
        except BaseException:
            # Don't leave stray temp files in global_data when serialisation fails.
            with contextlib.suppress(OSError):
                os.unlink(tmp_path)
            raise

    def _load_registry(self):
        """Load ``assets.pt``; an unreadable file is moved aside as ``*.corrupt.<ts>``."""
        path = self._registry_path()
        if not os.path.exists(path):
            self.asset_registry = {}
            return
        try:
            payload = torch.load(path)
        except Exception:
            corrupted = path + f".corrupt.{int(time.time())}"
            shutil.move(path, corrupted)
            self.asset_registry = {}
            return
        if isinstance(payload, dict) and "assets" in payload:
            self.asset_registry = payload["assets"]
        else:
            self.asset_registry = payload

    def register_asset(self, alias: str, file_path: Optional[str] = None,
                       asset_type: str = "bin", metadata: Dict = None,
                       embed_bytes: Optional[bytes] = None, force: bool = False) -> Dict[str, Any]:
        """Register a file (copied into ``global_data``) or raw bytes under *alias*.

        With *embed_bytes*, the bytes are written to ``<alias>.embedded`` and the
        entry is always replaced. With *file_path*, an existing entry with the
        same SHA-256 is returned unchanged unless *force* is set.

        Raises:
            ValueError: neither *file_path* nor *embed_bytes* was given.
        """
        metadata = metadata or {}
        if embed_bytes is not None:
            bytes_name = f"{alias}.embedded"
            dest = self.file_manager.write_bytes_to_global(embed_bytes, bytes_name)
            sha = self.file_manager.compute_sha256(dest)
            entry = {
                "source_path": dest,
                "type": asset_type,
                "metadata": metadata,
                "embedded": True,
                "bytes_name": bytes_name,
                "sha256": sha,
                "registered_at": time.time(),
            }
            self.asset_registry[alias] = entry
            self._persist_registry()
            return entry
        if not file_path:
            raise ValueError("Provide either file_path or embed_bytes")
        src_abs = os.path.abspath(file_path)
        dest = self.file_manager.copy_to_global(src_abs, dest_name=os.path.basename(src_abs))
        sha = self.file_manager.compute_sha256(dest)
        entry = {
            "source_path": dest,
            "type": asset_type,
            "metadata": metadata,
            "embedded": False,
            "sha256": sha,
            "registered_at": time.time(),
        }
        if alias in self.asset_registry and not force:
            if self.asset_registry[alias].get("sha256") == sha:
                return self.asset_registry[alias]
        self.asset_registry[alias] = entry
        self._persist_registry()
        return entry

    def unregister_asset(self, alias: str):
        """Remove *alias* from the registry (the backing file is left on disk)."""
        if alias in self.asset_registry:
            del self.asset_registry[alias]
            self._persist_registry()

    def _ensure_loader_template(self):
        """Write ``_codebox_loader.py`` into ``global_data`` once (never overwritten).

        The generated module exposes ``load_asset(alias)`` for code running
        inside an env. It reads the registry from ``$CODEBOX_ASSETS_DIR``,
        preferring ``assets.pt`` and falling back to ``assets.json``.
        """
        loader_path = os.path.join(self.file_manager.global_dir, "_codebox_loader.py")
        if os.path.exists(loader_path):
            return
        loader_code = r'''
            import os
            import json
            # Try optional heavy deps; if missing, fall back to JSON registry and path-only behavior.
            try:
                import torch as _torch
            except Exception:
                _torch = None
            try:
                from safetensors.torch import load_file as _safetensors_load
            except Exception:
                _safetensors_load = None
            def _registry_pt_path(assets_dir):
                return os.path.join(assets_dir, "assets.pt")
            def _registry_json_path(assets_dir):
                return os.path.join(assets_dir, "assets.json")
            def _load_registry(assets_dir):
                # Prefer torch payload if torch is available and file exists
                pt_path = _registry_pt_path(assets_dir)
                json_path = _registry_json_path(assets_dir)
                if _torch is not None and os.path.exists(pt_path):
                    try:
                        payload = _torch.load(pt_path)
                        if isinstance(payload, dict) and "assets" in payload:
                            return payload["assets"]
                        return payload
                    except Exception:
                        # fall through to json
                        pass
                if os.path.exists(json_path):
                    with open(json_path, "r", encoding="utf-8") as f:
                        return json.load(f)
                # last resort: try to load pt even without torch (will raise)
                if os.path.exists(pt_path):
                    raise RuntimeError("Torch not available to read assets.pt; install torch or ensure assets.json exists.")
                return {}
            def load_asset(alias):
                assets_dir = os.environ.get("CODEBOX_ASSETS_DIR")
                if not assets_dir:
                    raise RuntimeError("CODEBOX_ASSETS_DIR not set")
                registry = _load_registry(assets_dir)
                entry = registry.get(alias)
                if not entry:
                    raise KeyError(f"Asset '{alias}' not found in registry")
                path = entry["source_path"]
                typ = entry.get("type", "bin")
                if typ == "safetensors":
                    if _safetensors_load is None:
                        return path
                    return _safetensors_load(path)
                if typ == "pt":
                    if _torch is None:
                        raise RuntimeError("Torch not available in this environment to load .pt assets.")
                    return _torch.load(path, map_location="cpu")
                if typ == "json":
                    with open(path, "r", encoding="utf-8") as f:
                        return json.load(f)
                return path
        '''
        loader_code = textwrap.dedent(loader_code)
        with open(loader_path, "w", encoding="utf-8") as f:
            f.write(loader_code)

    def _sync_required_assets(self, working_dir: str, required_assets: Optional[List[str]] = None):
        """Copy the registry (``assets.pt`` and ``assets.json``) and asset files into *working_dir*.

        Only *required_assets* are linked or copied; when that is None or empty,
        every registered asset is. Existing files with the same basename are
        left untouched.
        """
        registry_dst_pt = os.path.join(working_dir, self.ASSETS_FILENAME)
        registry_dst_json = os.path.join(working_dir, "assets.json")
        payload = {"schema_version": self.ASSET_SCHEMA_VERSION, "assets": self.asset_registry}
        tmp_fd, tmp_path = tempfile.mkstemp(dir=working_dir)
        os.close(tmp_fd)
        torch.save(payload, tmp_path)
        os.replace(tmp_path, registry_dst_pt)
        tmp_fd, tmp_path = tempfile.mkstemp(dir=working_dir)
        os.close(tmp_fd)
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(self.asset_registry, f)
        os.replace(tmp_path, registry_dst_json)
        assets_to_mount = required_assets if required_assets else list(self.asset_registry.keys())
        for alias in assets_to_mount:
            entry = self.asset_registry.get(alias)
            if not entry:
                continue
            src = entry["source_path"]
            target = os.path.join(working_dir, os.path.basename(src))
            if not os.path.exists(target):
                try:
                    self.file_manager.hardlink_or_copy(src, target)
                except Exception:
                    shutil.copy2(src, target)

    def _inject_loader_into_env(self, env_src_dir: str):
        """Copy ``_codebox_loader.py`` into an env's ``src`` directory if it is missing."""
        src = os.path.join(self.file_manager.global_dir, "_codebox_loader.py")
        dst = os.path.join(env_src_dir, "_codebox_loader.py")
        if not os.path.exists(dst):
            shutil.copy2(src, dst)

    def _get_python_bin(self, venv_path: str) -> str:
        """Interpreter path inside a venv (``Scripts\\python.exe`` on Windows, else ``bin/python``)."""
        if os.name == 'nt':
            return os.path.join(venv_path, "Scripts", "python.exe")
        return os.path.join(venv_path, "bin", "python")

    # ------------------------------------------------------------------ venvs
    def create_venv(self, venv_id: str, requirements: List[str] = None) -> str:
        """Create (once) ``envs/<venv_id>`` with a ``src/`` dir, then pip-install *requirements*."""
        venv_path = os.path.join(self.envs_dir, venv_id)
        if venv_id not in self.env_dic:
            subprocess.run([sys.executable, "-m", "venv", venv_path], check=True)
            os.makedirs(os.path.join(venv_path, "src"), exist_ok=True)
            self.env_dic[venv_id] = {"path": venv_path, "packages": []}
            self._inject_loader_into_env(os.path.join(venv_path, "src"))
        if requirements:
            self.install_packages(venv_id, requirements)
        return venv_path

    def install_packages(self, venv_id: str, packages: List[str]):
        """pip-install *packages* into the env (creating it if needed) and record them."""
        if venv_id not in self.env_dic:
            self.create_venv(venv_id)
        python_bin = self._get_python_bin(self.env_dic[venv_id]["path"])
        cmd = [python_bin, "-m", "pip", "install", "--quiet"] + packages
        subprocess.run(cmd, check=True)
        existing = set(self.env_dic[venv_id]["packages"])
        for pkg in packages:
            if pkg not in existing:
                self.env_dic[venv_id]["packages"].append(pkg)

    def export_venv(self, venv_id: str) -> str:
        """Return a JSON manifest: ``venv_id``, ``pip freeze`` lines and metadata (minus the path)."""
        if venv_id not in self.env_dic:
            raise ValueError(f"Environment '{venv_id}' does not exist.")
        python_bin = self._get_python_bin(self.env_dic[venv_id]["path"])
        res = subprocess.run([python_bin, "-m", "pip", "freeze"], capture_output=True, text=True)
        manifest = {
            "venv_id": venv_id,
            "pip_freeze": res.stdout.splitlines(),
            "metadata": {k: v for k, v in self.env_dic[venv_id].items() if k != "path"},
        }
        return json.dumps(manifest, indent=2)

    def import_venv(self, manifest_json: str):
        """Recreate an env from an :meth:`export_venv` manifest."""
        manifest = json.loads(manifest_json)
        venv_id = manifest["venv_id"]
        self.create_venv(venv_id, requirements=manifest["pip_freeze"])

    # ------------------------------------------------------------------ execution
    def _execute_supervised(self, python_bin: str, script_path: str, working_dir: str,
                            timeout: Optional[int] = 30, max_ram_mb: int = 4096,
                            required_assets: Optional[List[str]] = None) -> Dict[str, Any]:
        """Run *script_path* under *python_bin* with a timeout and RSS limit.

        Returns a dict with ``stdout``, ``stderr`` (both stripped), ``success``,
        ``exit_code`` and ``technical_alarms``. ``success`` is False when the
        exit code is non-zero or the run was killed. The CPU warning is recorded
        at most once per run; previously it was appended every ~0.2 s while the
        child stayed busy.
        """
        self._sync_required_assets(working_dir, required_assets)
        env_vars = os.environ.copy()
        env_vars["PYTHONPATH"] = working_dir
        env_vars["CODEBOX_ASSETS_DIR"] = working_dir
        proc = subprocess.Popen(
            [python_bin, script_path],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            cwd=working_dir,
            env=env_vars,
            text=True,
        )
        alarms: List[str] = []
        max_cpu = 95.0

        def monitor():
            cpu_warned = False
            try:
                p = psutil.Process(proc.pid)
                while proc.poll() is None:
                    mem_mb = p.memory_info().rss / (1024 * 1024)
                    cpu = p.cpu_percent(interval=0.2)  # also paces the loop
                    if mem_mb > max_ram_mb:
                        alarms.append(f"RESOURCE KILL: Memory usage exceeded ({mem_mb:.1f}MB > {max_ram_mb}MB)")
                        proc.kill()
                        break
                    if cpu > max_cpu and not cpu_warned:
                        alarms.append(f"RESOURCE WARNING: Sustained CPU spike ({cpu}%)")
                        cpu_warned = True
            except psutil.Error:
                # NoSuchProcess (child exited), AccessDenied or ZombieProcess: stop monitoring.
                pass

        mon_thread = threading.Thread(target=monitor, daemon=True)
        mon_thread.start()
        try:
            stdout, stderr = proc.communicate(timeout=timeout)
        except subprocess.TimeoutExpired:
            proc.kill()
            stdout, stderr = proc.communicate()
            alarms.append(f"RESOURCE KILL: Code execution timed out ({timeout}s limit).")
        # Let the monitor observe the exit so it cannot append after we report.
        mon_thread.join(timeout=1.0)
        return {
            "stdout": stdout.strip(),
            "stderr": stderr.strip(),
            "success": proc.returncode == 0 and not any("KILL" in a for a in alarms),
            "exit_code": proc.returncode,
            "technical_alarms": alarms,
        }

    def run_code(self, code_block: str, venv_id: str = "default", requirements: List[str] = None,
                 timeout: Optional[int] = 30, max_ram_mb: int = 4096,
                 required_assets: Optional[List[str]] = None) -> Dict[str, Any]:
        """Execute *code_block* as a script in an env's ``src`` directory.

        With *requirements*, a throw-away env is created and deleted afterwards.
        Script and temp-env names include a random suffix: second-resolution
        timestamps alone collided between runs started in the same second.
        """
        is_temp = False
        if requirements:
            venv_id = f"temp_{int(time.time())}_{uuid.uuid4().hex[:8]}"
            self.create_venv(venv_id, requirements)
            is_temp = True
        elif venv_id not in self.env_dic:
            self.create_venv(venv_id)
        env_meta = self.env_dic[venv_id]
        src_dir = os.path.join(env_meta["path"], "src")
        python_bin = self._get_python_bin(env_meta["path"])
        temp_script = os.path.join(src_dir, f"_run_{int(time.time())}_{uuid.uuid4().hex}.py")
        try:
            with open(temp_script, 'w', encoding='utf-8') as f:
                f.write(code_block)
            result = self._execute_supervised(python_bin, temp_script, src_dir, timeout, max_ram_mb, required_assets)
        finally:
            if os.path.exists(temp_script):
                os.remove(temp_script)
            if is_temp:
                shutil.rmtree(env_meta["path"], ignore_errors=True)
                del self.env_dic[venv_id]
        return result

    def save_script(self, code_id: str, venv_id: str, source_code: str):
        """Save *source_code* as ``<env>/src/<code_id>.py`` and register it in ``code_bank``."""
        if venv_id not in self.env_dic:
            self.create_venv(venv_id)
        src_dir = os.path.join(self.env_dic[venv_id]["path"], "src")
        filepath = os.path.join(src_dir, f"{code_id}.py")
        with open(filepath, 'w', encoding='utf-8') as f:
            f.write(source_code)
        self.code_bank[code_id] = {"venv_id": venv_id, "filepath": filepath}
        self._inject_loader_into_env(src_dir)

    def call_function(self, code_id: str = None, function_call: str = None, function_map: Dict[str, Any] = None,
                      timeout: Optional[int] = 30, max_ram_mb: int = 4096,
                      required_assets: Optional[List[str]] = None) -> Dict[str, Any]:
        """Run one saved function, or a pipeline of them, in the saved script's env.

        Either pass ``code_id`` plus ``function_call``, a single call such as
        ``"f(1, x=2)"`` whose arguments must be Python literals, or pass a
        ``function_map``::

            {step: {"code_id": ..., "function": ...,
                    "args": {kw: value},            # optional
                    "positional_args": [value],     # optional
                    "output_var": name}}            # optional, default "out_<step>"

        A string value starting with ``$`` refers to an earlier step's
        ``output_var``. All steps run in the env of the first step's script.
        The child prints the final ``context`` dict as JSON, with non-JSON
        values stringified. Positional arguments in ``function_call`` used to be
        dropped silently; they are now passed through.

        Raises:
            ValueError: bad arguments, an unparsable ``function_call`` or non-identifier names.
        """
        if function_map:
            return self._run_function_map(function_map, timeout, max_ram_mb, required_assets)
        if code_id and function_call:
            func_name, positional, keywords = self._parse_call_expression(function_call)
            macro_map = {
                "step_1": {
                    "code_id": code_id,
                    "function": func_name,
                    "args": keywords,
                    "positional_args": positional,
                    "output_var": "result",
                }
            }
            return self.call_function(function_map=macro_map, timeout=timeout, max_ram_mb=max_ram_mb,
                                      required_assets=required_assets)
        raise ValueError("Provide either code_id + function_call, or function_map")

    @staticmethod
    def _parse_call_expression(function_call: str) -> Tuple[str, List[Any], Dict[str, Any]]:
        """Parse ``"name(lit, kw=lit)"`` into ``(name, positional_values, keyword_values)``."""
        try:
            tree = ast.parse(function_call)
            expr = tree.body[0].value
            if not isinstance(expr, ast.Call):
                raise ValueError("Target signature is not a valid call statement.")
            if not isinstance(expr.func, ast.Name):
                raise ValueError("Only plain function names are supported (no attribute calls).")
            positional = [ast.literal_eval(arg) for arg in expr.args]  # *args -> ValueError
            keywords: Dict[str, Any] = {}
            for keyword in expr.keywords:
                if keyword.arg is None:
                    raise ValueError("**kwargs unpacking is not supported.")
                keywords[keyword.arg] = ast.literal_eval(keyword.value)
        except Exception as e:
            raise ValueError(f"AST Function Call Parser failed on expression matching: {e}") from e
        return expr.func.id, positional, keywords

    @staticmethod
    def _build_pipeline_script(function_map: Dict[str, Any]) -> str:
        """Generate the driver script for a ``function_map`` pipeline.

        Every name and key is validated or ``repr``-quoted, so quotes in step
        names, output names or ``$`` references can no longer break the
        generated code.
        """
        def arg_expr(value: Any) -> str:
            if isinstance(value, str) and value.startswith("$"):
                return f"context[{value[1:]!r}]"
            return repr(value)

        lines = ["import json\nimport sys\ncontext = {}\n"]
        for step, data in function_map.items():
            c_id, f_name = data["code_id"], data["function"]
            for ident in (c_id, f_name):
                if not (isinstance(ident, str) and ident.isidentifier()):
                    raise ValueError(f"Step {step!r}: {ident!r} is not a valid Python identifier.")
            out_var = str(data.get("output_var", f"out_{step}"))
            call_args = [arg_expr(v) for v in data.get("positional_args", [])]
            for k, v in data.get("args", {}).items():
                if not (isinstance(k, str) and k.isidentifier()):
                    raise ValueError(f"Step {step!r}: keyword {k!r} is not a valid Python identifier.")
                call_args.append(f"{k}={arg_expr(v)}")
            lines.append(f"import {c_id}")
            lines.append(
                "try:\n"
                f"    context[{out_var!r}] = {c_id}.{f_name}({', '.join(call_args)})\n"
                "except Exception as e:\n"
                f"    print('Pipeline failed at ' + {str(step)!r} + ': ' + str(e), file=sys.stderr)\n"
                "    sys.exit(1)\n"
            )
        lines.append("print(json.dumps(context, default=str))")
        return "\n".join(lines)

    def _run_function_map(self, function_map: Dict[str, Any], timeout: Optional[int],
                          max_ram_mb: int, required_assets: Optional[List[str]]) -> Dict[str, Any]:
        """Write the pipeline driver to a uniquely named file, run it, then delete it."""
        first_step = next(iter(function_map.values()))
        venv_id = self.code_bank[first_step["code_id"]]["venv_id"]
        env_meta = self.env_dic[venv_id]
        src_dir = os.path.join(env_meta["path"], "src")
        python_bin = self._get_python_bin(env_meta["path"])
        wrapper_code = self._build_pipeline_script(function_map)
        wrapper_path = os.path.join(src_dir, f"_dag_runner_{uuid.uuid4().hex}.py")
        try:
            with open(wrapper_path, 'w', encoding='utf-8') as f:
                f.write(wrapper_code)
            return self._execute_supervised(python_bin, wrapper_path, src_dir, timeout, max_ram_mb, required_assets)
        finally:
            if os.path.exists(wrapper_path):
                os.remove(wrapper_path)

    # ------------------------------------------------------------------ runners / housekeeping
    def get_runner(self, key: str):
        """Return a cached runner, or ``None``."""
        return self._runner_cache.get(key)

    def put_runner(self, key: str, runner_obj):
        """Cache *runner_obj* under *key* (LRU; evicted runners are closed)."""
        self._runner_cache.put(key, runner_obj)

    def prune_cache(self, days_unused: int = 7, logger=None):
        """Delete files in ``global_data`` not modified for *days_unused* days.

        The registry, the loader template and every file backing a registered
        asset are kept. ``shutil.copy2`` preserves the source mtime, so an old
        file registered today was previously deleted on the next prune.
        """
        cutoff = time.time() - days_unused * 86400
        assets_to_keep = {self.ASSETS_FILENAME, "_codebox_loader.py"}
        for asset in self.asset_registry.values():
            source = asset.get("source_path") if isinstance(asset, dict) else None
            if source:
                assets_to_keep.add(os.path.basename(source))
        try:
            with os.scandir(self.file_manager.global_dir) as it:
                for entry in it:
                    try:
                        if entry.is_dir():
                            continue
                        if entry.name in assets_to_keep:
                            continue
                        if entry.stat().st_mtime < cutoff:
                            try:
                                os.remove(entry.path)
                            except Exception as e:
                                if logger:
                                    logger.warning("Failed to remove %s: %s", entry.path, e)
                    except FileNotFoundError:
                        continue
                    except PermissionError:
                        if logger:
                            logger.warning("Permission denied pruning %s", entry.path)
                        continue
        except Exception as e:
            if logger:
                logger.error("Prune cache failed for %s: %s", self.file_manager.global_dir, e)

    def resolve_asset_path(self, alias: str) -> str:
        """Return the stored path for *alias*.

        Raises:
            AssetNotFoundError: the alias is not registered.
        """
        entry = self.asset_registry.get(alias)
        if not entry:
            raise AssetNotFoundError(alias)
        return entry["source_path"]
class Box:
"""
Persistent wrapper around CodeBox.
Resolution order:
1. box_location argument
2. AppData config location
3. Create new CodeBox.pt
"""
APP_NAME = "CodeBox"
CONFIG_FILE = "box_config.json"
DEFAULT_MODEL_FILE = "CodeBox.pt"
def __init__(
self,
box_location: Optional[str] = None,
base_dir: Optional[str] = None,
runner_cache_capacity: int = 2,
):
self._model_path = self._resolve_model_path(box_location)
if os.path.exists(self._model_path):
self.model = torch.load(self._model_path)
else:
self.model = CodeBox(
base_dir=base_dir or self._default_storage_dir(),
runner_cache_capacity=runner_cache_capacity,
)
self.save()
@classmethod
def _appdata_dir(cls):
if os.name == "nt":
root = os.getenv("APPDATA")
else:
root = os.path.expanduser("~/.config")
path = os.path.join(root, cls.APP_NAME)
os.makedirs(path, exist_ok=True)
return path
@classmethod
def _config_path(cls):
return os.path.join(cls._appdata_dir(), cls.CONFIG_FILE)
@classmethod
def _default_storage_dir(cls):
path = os.path.join(cls._appdata_dir(), "storage")
os.makedirs(path, exist_ok=True)
return path
@classmethod
def _default_model_path(cls):
return os.path.join(cls._appdata_dir(), cls.DEFAULT_MODEL_FILE)
def _resolve_model_path(self, box_location):
if box_location:
path = os.path.abspath(box_location)
self._write_config(path)
return path
config = self._read_config()
if config:
saved_path = config.get("box_location")
if saved_path and os.path.exists(saved_path):
return saved_path
path = self._default_model_path()
self._write_config(path)
return path
def _read_config(self):
cfg = self._config_path()
if not os.path.exists(cfg):
return {}
try:
with open(cfg, "r", encoding="utf-8") as f:
return json.load(f)
except Exception:
return {}
def _write_config(self, model_path):
cfg = self._config_path()
with open(cfg, "w", encoding="utf-8") as f:
json.dump(
{
"box_location": os.path.abspath(model_path)
},
f,
indent=2,
)
def save(self):
torch.save(self.model, self._model_path)
def save_as(self, path):
path = os.path.abspath(path)
torch.save(self.model, path)
self._model_path = path
self._write_config(path)
@classmethod
def load(cls, path):
return cls(box_location=path)
def register_asset(self, *args, **kwargs):
result = self.model.register_asset(*args, **kwargs)
self.save()
return result
def unregister_asset(self, *args, **kwargs):
result = self.model.unregister_asset(*args, **kwargs)
self.save()
return result
def create_venv(self, *args, **kwargs):
result = self.model.create_venv(*args, **kwargs)
self.save()
return result
def install_packages(self, *args, **kwargs):
result = self.model.install_packages(*args, **kwargs)
self.save()
return result
def save_script(self, *args, **kwargs):
result = self.model.save_script(*args, **kwargs)
self.save()
return result
def put_runner(self, *args, **kwargs):
result = self.model.put_runner(*args, **kwargs)
self.save()
return result
def __getattr__(self, name):
return getattr(self.model, name)
def __contains__(self, alias):
return alias in self.model.asset_registry
def __len__(self):
    """Support ``len(box)``: the number of registered assets."""
    registry = self.model.asset_registry
    return len(registry)
def __repr__(self):
    """Summarise the box: asset count, environment count and checkpoint path."""
    inner = self.model
    n_assets = len(inner.asset_registry)
    n_envs = len(inner.env_dic)
    return f"Box(assets={n_assets}, envs={n_envs}, path='{self._model_path}')"
def close(self):
    """Clear the model's runner cache (best effort), then persist the box.

    A failure while clearing the cache is ignored. A failure in ``save``
    propagates to the caller.
    """
    try:
        runner_cache = self.model._runner_cache
        runner_cache.clear()
    except Exception:
        pass  # cache may be absent; clearing it is optional
    self.save()
def __del__(self):
    """Finalizer: close (and therefore save) the box, swallowing any error."""
    try:
        self.close()
    except Exception:
        # Exceptions must not escape a finalizer (it may run at interpreter
        # shutdown or on a partially constructed object).
        return
class TextModelBundle:
    """Load and hold the NLP models used for crawling and summarisation.

    On construction it loads three models:

    * a SentenceTransformer embedder (``all-MiniLM-L6-v2``);
    * a seq2seq summariser plus tokenizer (``sshleifer/distilbart-cnn-12-6``);
    * a spaCy pipeline.

    The first two come from their directories under *model_dir* when those
    exist. Otherwise they are downloaded once and saved there. If spaCy
    cannot load the configured model, a blank English pipeline with a
    sentencizer is used instead.
    """

    def __init__(self, model_dir: str = MODEL_DIR):
        self.model_dir = model_dir
        self.embedding_path = os.path.join(model_dir, "all-MiniLM-L6-v2")
        self.summarizer_path = os.path.join(model_dir, "distilbart-cnn-12-6")
        self.spacy_model_path = os.path.join(model_dir, "spacy", "en_core_web_sm")
        self.spacy_model_name = SPACY_MODEL_NAME
        self.embedding_model = self._load_embeddings()
        self.summarizer_model, self.tokenizer = self._load_summarizer()
        self.nlp = self._load_spacy()

    def _load_embeddings(self):
        """Load the sentence embedder from the local cache, downloading it once if absent."""
        if os.path.isdir(self.embedding_path):  # isdir implies exists
            return SentenceTransformer(self.embedding_path)
        model = SentenceTransformer("all-MiniLM-L6-v2")
        model.save(self.embedding_path)
        return model

    def _load_summarizer(self):
        """Return ``(model, tokenizer)`` for the summariser, caching a download locally."""
        if os.path.isdir(self.summarizer_path):
            model = AutoModelForSeq2SeqLM.from_pretrained(self.summarizer_path)
            tokenizer = AutoTokenizer.from_pretrained(self.summarizer_path)
            return model, tokenizer
        model = AutoModelForSeq2SeqLM.from_pretrained("sshleifer/distilbart-cnn-12-6")
        tokenizer = AutoTokenizer.from_pretrained("sshleifer/distilbart-cnn-12-6")
        model.save_pretrained(self.summarizer_path)
        tokenizer.save_pretrained(self.summarizer_path)
        return model, tokenizer

    def _load_spacy(self):
        """Load the spaCy model (local path first, then by name), else a blank+sentencizer pipeline."""
        try:
            if os.path.exists(self.spacy_model_path):
                return spacy.load(self.spacy_model_path)
            return spacy.load(self.spacy_model_name)
        except Exception:
            nlp = spacy.blank("en")
            if "sentencizer" not in nlp.pipe_names:
                nlp.add_pipe("sentencizer")
            return nlp

    def embed(self, texts, convert_to_tensor=True):
        """Encode *texts* with the sentence embedder (tensor output by default)."""
        return self.embedding_model.encode(texts, convert_to_tensor=convert_to_tensor)

    def generate_summary(self, text: str, max_length: int = 300, min_length: int = 50) -> str:
        """Summarise *text* with 4-beam generation and return the decoded summary.

        The input is whitespace-normalised and truncated to 1024 tokens.
        ``max_length`` and ``min_length`` are passed to ``generate``.
        ``min_length`` is clamped into ``[0, max_length]`` so an unsatisfiable
        pair is never requested. Returns ``""`` for blank input.
        """
        text = _normalize_whitespace(text)
        if not text:
            return ""
        min_length = max(0, min(min_length, max_length))
        inputs = self.tokenizer(
            text,
            return_tensors="pt",
            truncation=True,
            max_length=1024,
        )
        # Move the encoded inputs to wherever the model's weights live.
        device = next(self.summarizer_model.parameters()).device
        inputs = {k: v.to(device) for k, v in inputs.items()}
        with torch.no_grad():
            summary_ids = self.summarizer_model.generate(
                **inputs,
                max_length=max_length,
                min_length=min_length,
                num_beams=4,
                early_stopping=True,
            )
        return self.tokenizer.decode(summary_ids[0], skip_special_tokens=True)
def chunk_text_by_context(text: str, bundle: TextModelBundle, num_chunks: int = 20) -> List[str]:
    """Group the sentences of *text* into at most *num_chunks* topical chunks.

    Sentences are embedded, reduced with PCA and grouped with agglomerative
    clustering. Each chunk joins its sentences in original order. Chunks are
    returned in the order their first sentence appears in *text*.

    Returns ``[]`` when there are no sentences and a single chunk when there
    is one sentence or clustering is not possible (sklearn unavailable, or
    degenerate embeddings).
    """
    sentences = split_sentences(text)
    if not sentences:
        return []
    if len(sentences) == 1:
        return [sentences[0]]
    if PCA is None or AgglomerativeClustering is None:
        return [" ".join(sentences)]
    # Request numpy output: sklearn cannot consume torch tensors living on a GPU.
    embeddings = np.asarray(bundle.embed(sentences, convert_to_tensor=False))
    if embeddings.ndim != 2 or embeddings.shape[0] <= 1:
        return [" ".join(sentences)]
    n_samples, n_features = embeddings.shape
    num_chunks = max(1, int(num_chunks))
    # PCA requires n_components <= min(n_samples, n_features).
    n_components = max(1, min(num_chunks, n_samples, n_features))
    reduced = PCA(n_components=n_components).fit_transform(embeddings)
    k = min(num_chunks, n_samples)
    labels = AgglomerativeClustering(n_clusters=k).fit_predict(reduced)
    chunks: Dict[int, List[str]] = {}
    for sent, lbl in zip(sentences, labels):
        chunks.setdefault(int(lbl), []).append(sent)
    # dict insertion order == order of each cluster's first sentence.
    return [" ".join(group) for group in chunks.values()]
def safe_summarize_iterative(bundle: TextModelBundle, text: str, max_length: int = 500, min_length: int = 300, overlap: int = 100) -> str:
    """Summarise arbitrarily long *text*, bounding recursion depth.

    Each pass does one of the following:

    * returns text of at most ``max_length`` words unchanged;
    * summarises text that fits the 1024-token window directly;
    * splits multi-sentence text in half and summarises each half;
    * cuts a single huge sentence into overlapping 1024-token windows,
      summarises each window, and repeats on the combined result.

    The repeat is a loop rather than recursion. It stops as soon as a pass
    fails to shrink the text, e.g. when every ``generate`` call fails.
    Summariser errors fall back to the unsummarised text.
    """
    window = 1024
    # The window advances by ``window - overlap`` tokens, so clamp overlap to
    # keep it moving forward (an overlap >= 1024 used to loop forever).
    overlap = max(0, min(int(overlap), window - 1))
    word_threshold = max_length
    text = _normalize_whitespace(text)
    if not text:
        return ""
    while True:
        if len(text.split()) <= word_threshold:
            return text
        token_ids = bundle.tokenizer.encode(text, add_special_tokens=False)
        if len(token_ids) <= window:
            try:
                return bundle.generate_summary(text, max_length=max_length, min_length=min_length)
            except Exception:
                return text
        sentences = split_sentences(text)
        if len(sentences) > 1:
            # Each level roughly halves the sentence count, so this recursion
            # stays shallow.
            mid = len(sentences) // 2
            a = safe_summarize_iterative(bundle, " ".join(sentences[:mid]), max_length, min_length, overlap)
            b = safe_summarize_iterative(bundle, " ".join(sentences[mid:]), max_length, min_length, overlap)
            return _normalize_whitespace(f"{a} {b}")
        summaries = []
        start = 0
        while start < len(token_ids):
            end = min(start + window, len(token_ids))
            chunk = _normalize_whitespace(bundle.tokenizer.decode(
                token_ids[start:end],
                skip_special_tokens=True,
                clean_up_tokenization_spaces=True,
            ))
            if chunk:
                if len(chunk.split()) <= word_threshold:
                    summaries.append(chunk)
                else:
                    try:
                        summaries.append(bundle.generate_summary(chunk, max_length=max_length, min_length=min_length))
                    except Exception:
                        summaries.append(chunk)
            if end == len(token_ids):
                break
            start = end - overlap
        combined = _normalize_whitespace(" ".join(summaries))
        if not combined:
            return text
        combined_words = len(combined.split())
        if combined_words <= word_threshold:
            return combined
        if combined_words >= len(text.split()):
            # No progress (overlapping windows can even grow the text);
            # another pass would never terminate.
            return combined
        text = combined
def safe_summarize(bundle: TextModelBundle, text: str, max_length: int = 750, min_length: int = 500, overlap: int = 250, depth: int = 0, max_depth: int = 15) -> str:
    """Recursively summarise *text* of any length.

    The steps are:

    * text of at most ``max_length`` words is returned unchanged;
    * text that fits the 1024-token window is summarised directly;
    * multi-sentence text is split in half, each half is summarised, and
      the merged result is summarised again;
    * a single huge sentence is cut into overlapping 1024-token windows,
      which are summarised, combined, and recursed on.

    Going deeper than *max_depth*, or hitting ``RecursionError`` in the
    halving branch, hands over to ``safe_summarize_iterative``. Summariser
    errors fall back to the unsummarised text.
    """
    window = 1024
    # The window advances by ``window - overlap`` tokens, so clamp overlap to
    # keep it moving forward (an overlap >= 1024 used to loop forever).
    overlap = max(0, min(int(overlap), window - 1))
    text = _normalize_whitespace(text)
    if not text:
        return ""
    if depth > max_depth:
        return safe_summarize_iterative(bundle, text, max_length=max_length, min_length=min_length, overlap=overlap)
    if len(text.split()) <= max_length:
        return text
    token_ids = bundle.tokenizer.encode(text, add_special_tokens=False)
    if len(token_ids) <= window:
        try:
            return bundle.generate_summary(text, max_length=max_length, min_length=min_length)
        except Exception:
            return text
    sentences = split_sentences(text)
    if len(sentences) > 1:
        mid = len(sentences) // 2
        left = " ".join(sentences[:mid])
        right = " ".join(sentences[mid:])
        try:
            s1 = safe_summarize(bundle, left, max_length, min_length, overlap, depth + 1, max_depth)
            s2 = safe_summarize(bundle, right, max_length, min_length, overlap, depth + 1, max_depth)
            return safe_summarize(bundle, f"{s1} {s2}", max_length, min_length, overlap, depth + 1, max_depth)
        except RecursionError:
            return safe_summarize_iterative(bundle, text, max_length=max_length, min_length=min_length, overlap=overlap)
    chunk_summaries = []
    start = 0
    while start < len(token_ids):
        end = min(start + window, len(token_ids))
        piece = _normalize_whitespace(bundle.tokenizer.decode(
            token_ids[start:end],
            skip_special_tokens=True,
            clean_up_tokenization_spaces=True,
        ))
        if piece:
            if len(piece.split()) <= max_length:
                chunk_summaries.append(piece)
            else:
                try:
                    chunk_summaries.append(bundle.generate_summary(piece, max_length=max_length, min_length=min_length))
                except Exception:
                    chunk_summaries.append(piece)
        if end == len(token_ids):
            break
        start = end - overlap
    combined = _normalize_whitespace(" ".join(chunk_summaries))
    if not combined:
        return text
    combined_words = len(combined.split())
    if combined_words <= max_length:
        return combined
    if combined_words >= len(text.split()):
        # Summarising did not shrink the text (e.g. every generate call
        # failed), so further passes cannot make progress.
        return combined
    return safe_summarize(bundle, combined, max_length, min_length, overlap, depth + 1, max_depth)
def summarize_relevant_clusters(bundle: TextModelBundle, input_query: str, texts: List[str], similarity_threshold: Optional[float] = None, num_clusters: int = 12) -> List[str]:
    """Deduplicate, cluster and summarise *texts* with respect to *input_query*.

    The pipeline is:

    1. Blank texts are dropped.
    2. A text is discarded when its cosine similarity to an already kept
       text exceeds *similarity_threshold*. The default threshold is the
       mean pairwise similarity.
    3. The survivors are grouped with KMeans. Without sklearn they form a
       single group.
    4. Groups whose text mentions any noun/verb/adjective/proper-noun lemma
       of the query are kept; all groups are kept if none match.
    5. Each kept group is summarised, and the summaries are summarised once
       more.

    Returns a one-element list with the final summary, or ``[]`` when every
    input text is blank.
    """
    texts = [norm for norm in (_normalize_whitespace(t) for t in texts) if norm]
    if not texts:
        return []
    if len(texts) == 1:
        return [safe_summarize(bundle, texts[0])]
    embeddings = bundle.embed(texts, convert_to_tensor=True)
    sim_matrix = util.cos_sim(embeddings, embeddings)
    if similarity_threshold is None:
        # Mean similarity over the strict upper triangle (distinct pairs).
        n = sim_matrix.size(0)
        idx = torch.triu_indices(n, n, offset=1)
        pairwise = sim_matrix[idx[0], idx[1]]
        similarity_threshold = pairwise.mean().item() if pairwise.numel() > 0 else 0.85
    keep_indices: List[int] = []
    for i in range(len(texts)):
        if not any(float(sim_matrix[i][j].item()) > similarity_threshold for j in keep_indices):
            keep_indices.append(i)
    # keep_indices always contains index 0, so dedup_texts is never empty.
    dedup_texts = [texts[i] for i in keep_indices]
    if len(dedup_texts) == 1:
        return [safe_summarize(bundle, dedup_texts[0])]
    if KMeans is None:
        clusters = [dedup_texts]
    else:
        dedup_embeddings = embeddings[keep_indices]
        n_clusters = max(1, min(num_clusters, len(dedup_texts)))
        kmeans = KMeans(n_clusters=n_clusters, random_state=0, n_init="auto")
        labels = kmeans.fit_predict(dedup_embeddings.detach().cpu().numpy())
        grouped: Dict[int, List[str]] = {}
        for text, lbl in zip(dedup_texts, labels):
            grouped.setdefault(int(lbl), []).append(text)
        clusters = list(grouped.values())
    try:
        doc = bundle.nlp(input_query)
        keywords = {token.lemma_.lower() for token in doc if getattr(token, "pos_", "") in ("NOUN", "VERB", "ADJ", "PROPN")}
    except Exception:
        keywords = set()

    def is_cluster_relevant(cluster_texts: List[str]) -> bool:
        # Without keywords (no NLP result) every cluster counts as relevant.
        if not keywords:
            return True
        joined = " ".join(cluster_texts).lower()
        return any(k in joined for k in keywords)

    relevant_clusters = [c for c in clusters if is_cluster_relevant(c)] or clusters
    cluster_summaries = [
        safe_summarize(bundle, combined)
        for combined in (_normalize_whitespace(" ".join(cluster)) for cluster in relevant_clusters)
        if combined
    ]
    if not cluster_summaries:
        return [safe_summarize(bundle, " ".join(dedup_texts))]
    final_text = _normalize_whitespace(" ".join(cluster_summaries))
    return [safe_summarize(bundle, final_text)]
@dataclass
class PageCandidate:
    """A single search hit that may be fetched: title, URL, snippet and score."""

    title: str
    url: str
    snippet: str = field(default="")  # search-engine preview text, if any
    rank: float = field(default=0.0)
@dataclass
class CrawlResult:
    """Outcome of one crawl for *query*: the answer plus bookkeeping.

    ``used_candidates`` and ``failed_candidates`` hold page URLs.
    ``fallback_used`` marks answers built from search snippets instead of
    page text. ``error`` carries a formatted exception, if one occurred.
    """

    query: str
    answer: str = field(default="")
    partial_texts: List[str] = field(default_factory=list)
    used_candidates: List[str] = field(default_factory=list)
    failed_candidates: List[str] = field(default_factory=list)
    fallback_used: bool = field(default=False)
    elapsed_seconds: float = field(default=0.0)
    error: Optional[str] = field(default=None)
class CrawlWorker:
    """Search the web for a query and condense the fetched pages into one answer.

    A worker owns three things:

    * its own ``TextModelBundle``, used to rank hits and to chunk and
      summarise page text;
    * a thread pool that enforces a hard per-page fetch timeout;
    * a ``requests.Session`` that sends browser-like headers.
    """

    def __init__(self):
        self.bundle = TextModelBundle()
        # Pages are fetched one at a time; the pool only exists so a hung
        # fetch can be abandoned after ``page_timeout`` seconds.
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=8)
        self.session = requests.Session()
        self.session.headers.update({
            "User-Agent": (
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
            ),
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
            "Accept-Language": "en-US,en;q=0.5",
        })

    def close(self):
        """Release the thread pool and HTTP session (best effort, never raises)."""
        try:
            # Don't wait for abandoned (timed-out) fetches that are still running.
            self.executor.shutdown(wait=False, cancel_futures=True)
        except Exception:
            pass
        try:
            self.session.close()
        except Exception:
            pass

    def run_query(self, query: str, stats: Optional[dict] = None) -> "CrawlResult":
        """Answer *query* from web pages and return a ``CrawlResult``.

        The steps are:

        1. Candidates from ``build_candidates`` are fetched in ranked order
           until five pages yield at least 80 characters of text, or the
           list runs out.
        2. If any text was fetched, it is chunked and summarised.
        3. Otherwise the search snippets are used (``fallback_used=True``).

        Exceptions are recorded in ``result.error`` rather than raised. If
        *stats* is given, a per-run record is appended to ``stats["runs"]``.
        """
        t0 = time.perf_counter()
        result = CrawlResult(query=query)
        try:
            candidates = self.build_candidates(query)
            if not candidates:
                result.answer = f"No results found for: {query}"
                result.fallback_used = True
                return result
            target_successes = 5
            successful_texts: List[str] = []
            fallback_snippets: List[str] = []
            # A single ranked pass visits every candidate unless enough pages
            # succeed first, so no second sweep is needed.
            for cand in candidates:
                if len(successful_texts) >= target_successes:
                    break
                text = self.fetch_and_extract(cand.url, page_timeout=20)
                if text and len(text.strip()) >= 80:
                    successful_texts.append(_normalize_whitespace(text))
                    result.used_candidates.append(cand.url)
                else:
                    result.failed_candidates.append(cand.url)
                    if cand.snippet:
                        fallback_snippets.append(f"{cand.title}. {cand.snippet}".strip())
            if successful_texts:
                merged = " ".join(successful_texts)
                chunks = chunk_text_by_context(merged, self.bundle, num_chunks=min(8, max(2, len(successful_texts))))
                summary_list = summarize_relevant_clusters(self.bundle, query, chunks, similarity_threshold=None, num_clusters=min(8, len(chunks)))
                answer = _normalize_whitespace(" ".join(summary_list))
                result.answer = answer if answer else _normalize_whitespace(merged)
                result.partial_texts = successful_texts
                return result
            snippet_text = _normalize_whitespace(" ".join(fallback_snippets))
            if snippet_text:
                result.answer = snippet_text
                result.fallback_used = True
                return result
            ranked_text = _normalize_whitespace(" ".join(f"{c.title}. {c.snippet}".strip() for c in candidates[:5]))
            result.answer = ranked_text if ranked_text else f"No usable text found for: {query}"
            result.fallback_used = True
            return result
        except Exception as e:
            result.error = f"{e}\n{traceback.format_exc()}"
            result.answer = result.answer or f"[ERROR] {e}"
            return result
        finally:
            # Runs before the returned object reaches the caller, so the
            # elapsed time is always populated.
            result.elapsed_seconds = time.perf_counter() - t0
            if stats is not None:
                stats.setdefault("runs", []).append({
                    "query": query,
                    "seconds": round(result.elapsed_seconds, 3),
                    "used_pages": len(result.used_candidates),
                    "failed_pages": len(result.failed_candidates),
                    "fallback_used": result.fallback_used,
                    "error": result.error,
                })

    def build_candidates(self, query: str, num_results: int = 15) -> List["PageCandidate"]:
        """Collect hits from every search engine, dedupe them and rank by relevance.

        Each engine is asked for up to *num_results* hits. Hits are
        deduplicated on their normalised URL. They are then ranked by cosine
        similarity between the query embedding and each hit's "title snippet"
        embedding, and the best *num_results* are returned.
        """
        raw = []
        raw.extend(self.duckduckgo_search(query, num_results=num_results))
        raw.extend(self.resulthunter_search(query, num_results=num_results))
        raw.extend(self.google_search(query, num_results=num_results))
        seen = set()
        scored = []
        for title, url, snippet in raw:
            norm = self.normalize_url(url)
            if not norm or norm in seen:
                continue
            seen.add(norm)
            scored.append(PageCandidate(title=title or norm, url=url, snippet=snippet or ""))
        if not scored:
            return []
        titles_and_snippets = [f"{c.title} {c.snippet}".strip() for c in scored]
        query_emb = self.bundle.embed([query], convert_to_tensor=True)[0]
        page_embs = self.bundle.embed(titles_and_snippets, convert_to_tensor=True)
        sim_scores = util.cos_sim(query_emb, page_embs)[0]
        order = sim_scores.argsort(descending=True).tolist()
        return [scored[i] for i in order[:max(0, num_results)]]

    def fetch_and_extract(self, url: str, page_timeout: int = 20) -> str:
        """Fetch *url* and return its extracted text, or ``""`` on any failure.

        The fetch runs on the worker's thread pool so it can be abandoned
        after *page_timeout* seconds. A fetch that is already running cannot
        be interrupted. It keeps running in its thread, and its result is
        discarded.
        """
        url = self.normalize_url(url)
        if not url:
            return ""
        future = self.executor.submit(self._fetch_and_extract_sync, url)
        try:
            return future.result(timeout=page_timeout) or ""
        except concurrent.futures.TimeoutError:
            future.cancel()  # only effective if the job has not started yet
            return ""
        except Exception:
            return ""

    def _fetch_and_extract_sync(self, url: str) -> str:
        """Blocking GET + text extraction used by ``fetch_and_extract``; ``""`` on HTTP errors."""
        try:
            r = self.session.get(url, timeout=(8, 15), allow_redirects=True)
            r.raise_for_status()
        except Exception:
            return ""
        content_type = r.headers.get("Content-Type", "").lower()
        if "application/pdf" in content_type or url.lower().endswith(".pdf"):
            return _normalize_whitespace(self._extract_pdf_bytes(r.content))
        html = r.text or ""
        text = self.universal_page_parser(url, html, response=r, use_browser=False)
        return _normalize_whitespace(text)

    def _extract_pdf_bytes(self, pdf_bytes: bytes) -> str:
        """Return the text of every page in *pdf_bytes*, or ``""`` if it cannot be read.

        The document is opened from memory rather than through a temporary
        file, and it is always closed, even when text extraction fails.
        """
        if not pdf_bytes or not _HAS_PYMUPDF:
            return ""
        try:
            doc = fitz.open(stream=pdf_bytes, filetype="pdf")
        except Exception:
            return ""
        try:
            return " ".join(page.get_text() for page in doc)
        except Exception:
            return ""
        finally:
            try:
                doc.close()
            except Exception:
                pass

    def universal_page_parser(self, url: str, html: str, response=None, use_browser: bool = False) -> str:
        """Extract readable text from *html*, trying extractors from best to crudest.

        The order is:

        1. a YouTube transcript, for YouTube URLs;
        2. ``trafilatura``;
        3. ``extractors.ArticleExtractor``;
        4. ``Document``;
        5. ``Article``;
        6. ``Goose``;
        7. ``inscriptis_text``;
        8. lxml ``<p>`` text;
        9. two BeautifulSoup passes over the page with boilerplate tags
           removed.

        An optional extractor is skipped when its module-level handle is
        ``None``. The first result longer than 80 characters wins; the last
        catch-all pass accepts more than 50. ``response`` and
        ``use_browser`` are accepted for interface compatibility but unused.
        """
        if YouTubeTranscriptApi is not None and ("youtu" in url.lower()):
            video_id = self._extract_youtube_id(url)
            if video_id:
                try:
                    transcript = YouTubeTranscriptApi.get_transcript(video_id)
                    text = " ".join(entry["text"] for entry in transcript).strip()
                    if text:
                        return text
                except Exception:
                    pass
        if trafilatura is not None:
            try:
                t = trafilatura.extract(html, include_comments=False, include_tables=True)
                if t and len(t.strip()) > 80:
                    return t.strip()
            except Exception:
                pass
        if extractors is not None:
            try:
                boilerpy_text = extractors.ArticleExtractor().get_content(html)
                if boilerpy_text and len(boilerpy_text.strip()) > 80:
                    return boilerpy_text.strip()
            except Exception:
                pass
        if Document is not None:
            try:
                doc = Document(html)
                soup = BeautifulSoup(doc.summary(), "html.parser")
                text_readability = soup.get_text(" ", strip=True)
                if text_readability and len(text_readability.strip()) > 80:
                    return text_readability.strip()
            except Exception:
                pass
        if Article is not None:
            try:
                article = Article(url)
                article.set_html(html)
                article.parse()
                text_newspaper = article.text or ""
                if text_newspaper and len(text_newspaper.strip()) > 80:
                    return text_newspaper.strip()
            except Exception:
                pass
        if Goose is not None:
            try:
                goose_text = Goose().extract(raw_html=html).cleaned_text
                if goose_text and len(goose_text.strip()) > 80:
                    return goose_text.strip()
            except Exception:
                pass
        if inscriptis_text is not None:
            try:
                inscriptis_parsed = inscriptis_text(html)
                if inscriptis_parsed and len(inscriptis_parsed.strip()) > 80:
                    return inscriptis_parsed.strip()
            except Exception:
                pass
        if lxml_html is not None:
            try:
                lxml_tree = lxml_html.fromstring(html)
                lxml_text = " ".join(lxml_tree.xpath("//p//text()"))
                if lxml_text and len(lxml_text.strip()) > 80:
                    return lxml_text.strip()
            except Exception:
                pass
        # Both BeautifulSoup fallbacks strip the same tags, so parse once.
        strip_tags = ["script", "style", "noscript", "header", "footer", "nav", "aside", "form", "input", "button", "svg", "canvas", "iframe", "object", "embed", "img", "video", "audio"]
        try:
            soup = BeautifulSoup(html, "html.parser")
            for tag in soup(strip_tags):
                tag.decompose()
        except Exception:
            return ""
        try:
            block_tags = ["p", "li", "span", "div", "h1", "h2", "h3", "h4", "h5", "h6"]
            bs_text = " ".join(t.get_text(" ", strip=True) for name in block_tags for t in soup.find_all(name))
            if bs_text and len(bs_text.strip()) > 80:
                return bs_text.strip()
        except Exception:
            pass
        try:
            visible_text = soup.get_text(" ", strip=True)
            if visible_text and len(visible_text.strip()) > 50:
                return visible_text.strip()
        except Exception:
            pass
        return ""

    def _extract_youtube_id(self, url: str) -> Optional[str]:
        """Return the 11-character YouTube video id found in *url*, or ``None``."""
        patterns = [
            r"(?:v=)([A-Za-z0-9_-]{11})",
            r"youtu\.be/([A-Za-z0-9_-]{11})",
            r"youtube\.com/shorts/([A-Za-z0-9_-]{11})",
            r"youtube\.com/embed/([A-Za-z0-9_-]{11})",
        ]
        for pattern in patterns:
            m = re.search(pattern, url)
            if m:
                return m.group(1)
        return None

    def normalize_url(self, url: str) -> str:
        """Unwrap known search-engine redirect links and ensure an absolute URL.

        The following forms are handled:

        * resulthunter ``?url=`` redirects;
        * ``/videos/watch/<id>`` links, mapped to a YouTube watch URL;
        * DuckDuckGo ``/l/?uddg=`` redirects.

        Anything not starting with ``http`` gets an ``https://`` prefix.
        Returns ``""`` for empty input.
        """
        if not url:
            return ""
        if "resulthunter.com" in url:
            qs = parse_qs(urlparse(url).query)
            if "url" in qs:
                # parse_qs already percent-decodes; decoding a second time would
                # corrupt targets containing escapes such as %2F or %20.
                return qs["url"][0]
        if url.startswith("/videos/watch/"):
            parsed = urlparse(url)
            path_parts = parsed.path.split("/")
            if len(path_parts) >= 4:
                video_id = path_parts[3]
                if len(video_id) >= 11:
                    return f"https://www.youtube.com/watch?v={video_id}"
        if "duckduckgo.com/l/" in url:
            qs = parse_qs(urlparse(url).query)
            if "uddg" in qs and qs["uddg"]:
                return qs["uddg"][0]
        return url if url.startswith("http") else f"https://{url.lstrip('/')}"

    def is_ad_link(self, url: str) -> bool:
        """Heuristically flag *url* as an ad link by keyword substring match."""
        ad_keywords = ["advert", "ads", "doubleclick", "sponsor", "promo"]
        return any(term in (url or "").lower() for term in ad_keywords)

    # ---------------- search engines ----------------
    def duckduckgo_search(self, query: str, num_results: int = 10):
        """Return up to *num_results* ``(title, href, snippet)`` tuples from DuckDuckGo HTML."""
        url = "https://html.duckduckgo.com/html/"
        data = {"q": query}
        try:
            r = self.session.post(url, data=data, timeout=(8, 20))
            r.raise_for_status()
        except requests.RequestException:
            return []
        soup = BeautifulSoup(r.text, "html.parser")
        results = []
        for result in soup.select("div.result"):
            # Prefer the title anchor; the result__url anchor's text is the
            # displayed URL, not the page title.
            title_a = result.select_one("a.result__a") or result.select_one("a.result__url")
            snippet_a = result.select_one("a.result__snippet") or result.select_one("div.result__snippet")
            if not title_a:
                continue
            title = title_a.get_text(strip=True)
            href = title_a.get("href", "")
            if href.startswith("//duckduckgo.com/l/?uddg="):
                href = unquote(href.split("uddg=")[1].split("&")[0])
            snippet = snippet_a.get_text(strip=True) if snippet_a else ""
            results.append((title, href, snippet))
            if len(results) >= num_results:
                break
        return results

    def resulthunter_search(self, query: str, num_results: int = 10):
        """Return up to *num_results* ``(title, link, snippet)`` tuples from resulthunter."""
        encoded_query = quote(query)
        url = f"https://www.resulthunter.com/search?q={encoded_query}"
        try:
            r = self.session.get(url, timeout=(8, 15))
            r.raise_for_status()
        except requests.RequestException:
            return []
        soup = BeautifulSoup(r.text, "html.parser")
        results = []
        result_divs = soup.find_all("div", class_="web-result")
        if not result_divs:
            result_divs = soup.find_all("div", class_=lambda c: c and "result" in c.lower())
        for result in result_divs:
            link_tag = result.find("a", href=True)
            if not link_tag:
                continue
            title = link_tag.get_text(strip=True)
            link = link_tag["href"]
            if not link.startswith("http"):
                continue
            snippet_tag = result.find("p", class_="web-result-desc") or result.find("p")
            snippet = snippet_tag.get_text(strip=True) if snippet_tag else ""
            results.append((title, link, snippet))
            if len(results) >= num_results:
                break
        return results

    def google_search(self, query: str, num_results: int = 10):
        """Return up to *num_results* ``(title, href, snippet)`` tuples from Google HTML."""
        encoded_query = quote(query)
        url = f"https://www.google.com/search?q={encoded_query}&num={num_results + 5}"
        try:
            r = self.session.get(url, timeout=(8, 15))
            r.raise_for_status()
        except requests.RequestException:
            return []
        soup = BeautifulSoup(r.text, "html.parser")
        results = []
        for g in soup.select("div.g"):
            title_el = g.select_one("h3")
            link_el = g.select_one("a[href]")
            if not (title_el and link_el):
                continue
            title = title_el.get_text(strip=True)
            href = link_el["href"]
            if href.startswith("/url?"):
                # Result links may be wrapped as /url?q=<target>&...; unwrap them.
                href = parse_qs(urlparse(href).query).get("q", [""])[0]
            if not href.startswith("http"):
                continue  # relative Google navigation link, not a result page
            snippet_el = g.select_one("div.VwiC3b")
            if snippet_el:
                snippet = snippet_el.get_text(strip=True)
            else:
                snippet = g.get_text(separator=" ", strip=True).replace(title, "", 1).strip()
            results.append((title, href, snippet))
            if len(results) >= num_results:
                break
        return results
def _worker_main(command_queue: Queue, response_queue: Queue):
    """Entry point of the background crawl process.

    Message protocol:

    * ``{"type": "query", "request_id", "query"[, "stats"]}`` on
      *command_queue* is answered with
      ``{"type": "result", "request_id", "result": CrawlResult}`` on
      *response_queue*.
    * ``{"type": "shutdown"}`` is acknowledged with
      ``{"type": "shutdown_ack"}`` and ends the loop.
    * Anything else, including non-dict or id-less messages, is ignored.

    If the ``CrawlWorker`` cannot be constructed (e.g. a model fails to
    load), the loop still runs and answers every query with an error result.
    This ensures a caller waiting for a reply is never left blocked forever.
    """
    worker = None
    init_error = None
    init_summary = ""
    try:
        worker = CrawlWorker()
    except Exception as e:
        init_error = f"{e}\n{traceback.format_exc()}"
        init_summary = repr(e)
    try:
        while True:
            msg = command_queue.get()
            if not isinstance(msg, dict) or not msg:
                continue
            mtype = msg.get("type")
            if mtype == "shutdown":
                response_queue.put({"type": "shutdown_ack"})
                break
            if mtype != "query" or "request_id" not in msg:
                continue
            query = msg.get("query", "")
            if worker is None:
                result = CrawlResult(
                    query=query,
                    answer=f"[ERROR] web search worker failed to start: {init_summary}",
                    error=init_error,
                )
            else:
                result = worker.run_query(query, stats=msg.get("stats"))
            response_queue.put({
                "type": "result",
                "request_id": msg["request_id"],
                "result": result,
            })
    finally:
        if worker is not None:
            worker.close()
class WebSearchModule(nn.Module):
    """``nn.Module`` front-end that runs web searches in a spawned background process.

    ``forward``/``__call__`` send the query to a process running
    ``_worker_main``, which owns its own ``CrawlWorker``, and wait for the
    matching reply. A local ``TextModelBundle`` serves direct ``embed`` and
    ``summarize`` calls. Worker handles are dropped when the module is
    pickled (e.g. by ``torch.save``) and recreated lazily on the next query.
    """

    def __init__(self, model_dir: str = MODEL_DIR):
        super().__init__()
        self.model_dir = model_dir
        self.bundle = TextModelBundle(model_dir=model_dir)
        self._clear_worker_handles()

    def _clear_worker_handles(self):
        """Forget every handle to the background worker process."""
        self._worker = None
        self._command_queue = None
        self._response_queue = None
        self._worker_ctx = None
        self._worker_started = False

    def __getstate__(self):
        state = self.__dict__.copy()
        # Processes and queues cannot be pickled: persist them as "not started".
        state["_worker"] = None
        state["_command_queue"] = None
        state["_response_queue"] = None
        state["_worker_ctx"] = None
        state["_worker_started"] = False
        return state

    def __setstate__(self, state):
        # nn.Module.__setstate__ restores __dict__ and back-fills attributes
        # missing from checkpoints written by older torch versions.
        super().__setstate__(state)
        self._clear_worker_handles()

    def _ensure_worker(self):
        """Start the background process unless a live one already exists."""
        if self._worker is not None and self._worker.is_alive():
            return
        if self._worker is not None:
            # The previous worker died: release its queues before replacing it.
            self.close()
        self._worker_ctx = get_context("spawn")
        self._command_queue = self._worker_ctx.Queue()
        self._response_queue = self._worker_ctx.Queue()
        self._worker = self._worker_ctx.Process(
            target=_worker_main,
            args=(self._command_queue, self._response_queue),
            daemon=True,
        )
        self._worker.start()
        self._worker_started = True

    def _wait_for_shutdown_ack(self, timeout: float):
        """Drain replies until ``shutdown_ack`` arrives, the queue fails, or *timeout* expires."""
        if self._response_queue is None:
            return
        deadline = time.perf_counter() + timeout
        while True:
            remaining = deadline - time.perf_counter()
            if remaining <= 0:
                return
            try:
                msg = self._response_queue.get(timeout=remaining)
            except Exception:
                return
            # Stale query results may still be queued ahead of the ack.
            if isinstance(msg, dict) and msg.get("type") == "shutdown_ack":
                return

    def close(self):
        """Stop the worker (graceful shutdown, then terminate) and close its queues."""
        if getattr(self, "_worker", None) is None:
            return
        try:
            if self._worker.is_alive() and self._command_queue is not None:
                self._command_queue.put({"type": "shutdown"})
                self._wait_for_shutdown_ack(timeout=10)
            self._worker.join(timeout=10)
            if self._worker.is_alive():
                self._worker.terminate()
                self._worker.join(timeout=5)
        finally:
            for q in (self._command_queue, self._response_queue):
                try:
                    if q is not None:
                        q.close()
                except Exception:
                    pass
            self._clear_worker_handles()

    def __del__(self):
        try:
            self.close()
        except Exception:
            pass

    def embed(self, texts, convert_to_tensor=True):
        """Embed *texts* with the local bundle (in this process)."""
        return self.bundle.embed(texts, convert_to_tensor=convert_to_tensor)

    def summarize(self, text: str, max_length: int = 300, min_length: int = 50):
        """Summarise *text* with the local bundle (in this process)."""
        return self.bundle.generate_summary(text, max_length=max_length, min_length=min_length)

    def forward(self, query: str, stats: Optional[dict] = None, timeout: Optional[float] = None) -> str:
        """Run *query* in the worker process and return its answer text.

        If *timeout* seconds elapse first, a "still running" message is
        returned; the worker's late reply is discarded by later calls. If
        the worker process dies, an ``[ERROR]`` message is returned instead
        of waiting forever.

        *stats*, when given, is updated in place with these counters:
        ``spiders_created``, ``spiders_completed``, ``spiders_killed``,
        ``total_seconds`` and ``runs``.
        """
        self._ensure_worker()
        req_id = str(uuid.uuid4())
        if stats is not None:
            stats.setdefault("spiders_created", 0)
            stats.setdefault("spiders_completed", 0)
            stats.setdefault("spiders_killed", 0)
            stats.setdefault("total_seconds", 0.0)
            stats.setdefault("runs", [])
            stats["spiders_created"] += 1
        # ``stats`` is not sent: the worker would only update a pickled copy,
        # and a non-picklable dict would be dropped by the queue's feeder
        # thread, leaving this call waiting for a reply that never comes.
        self._command_queue.put({
            "type": "query",
            "request_id": req_id,
            "query": query,
            "stats": None,
        })
        started = time.perf_counter()
        worker_gone = False
        while True:
            if timeout is not None and (time.perf_counter() - started) > timeout:
                return f"Query is still running in the worker process for: {query}"
            try:
                msg = self._response_queue.get(timeout=0.25)
            except Exception:
                # Nothing yet (queue.Empty) or a queue error: keep waiting only
                # while the worker lives; after it dies allow one extra poll
                # for a reply it may have flushed on exit.
                if self._worker is None or not self._worker.is_alive():
                    if worker_gone:
                        return f"[ERROR] Web search worker stopped before answering: {query}"
                    worker_gone = True
                continue
            if not isinstance(msg, dict) or msg.get("type") != "result" or msg.get("request_id") != req_id:
                continue
            result: CrawlResult = msg["result"]
            if stats is not None:
                stats["spiders_completed"] += 1
                stats["total_seconds"] += result.elapsed_seconds
                stats["runs"].append({
                    "query": result.query,
                    "seconds": round(result.elapsed_seconds, 3),
                    "used_pages": len(result.used_candidates),
                    "failed_pages": len(result.failed_candidates),
                    "fallback_used": result.fallback_used,
                    "error": result.error,
                })
            return result.answer or f"No content extracted for query: {query}"

    def __call__(self, query: str, **kwargs):
        # Deliberately bypasses nn.Module.__call__ (and its hooks): the
        # argument is a query string, not a tensor.
        return self.forward(query, **kwargs)
class Web:
    """
    High-level wrapper around WebSearchModule.
    Features:
    - Auto-locates WebSearch.pt in AppData
    - Creates one automatically if missing
    - Exposes all WebSearchModule methods
    - Supports save/load/reload
    - Supports direct querying through __call__
    """
    # %APPDATA%/PackedLLM when APPDATA is set, otherwise ~/PackedLLM.
    DEFAULT_FOLDER = os.path.join(
        os.getenv("APPDATA", os.path.expanduser("~")),
        "PackedLLM"
    )
    DEFAULT_WEB_PATH = os.path.join(
        DEFAULT_FOLDER,
        "WebSearch.pt"
    )

    def __init__(
        self,
        web_location: Optional[str] = None,
        model_dir: str = "models",
        auto_create: bool = True,
    ):
        """Load the checkpoint at *web_location* (default path if omitted).

        When the file is missing, a new ``WebSearchModule`` is built and
        saved if *auto_create* is true; otherwise ``FileNotFoundError`` is
        raised. *model_dir* is only used when a new module is created.
        """
        self.model_dir = model_dir
        if web_location:
            self.web_path = os.path.abspath(web_location)
        else:
            self.web_path = self.DEFAULT_WEB_PATH
        os.makedirs(os.path.dirname(self.web_path), exist_ok=True)
        if os.path.exists(self.web_path):
            self.web = self._load(self.web_path)
        elif auto_create:
            self.web = WebSearchModule(model_dir=model_dir)
            self.save(self.web_path)
        else:
            raise FileNotFoundError(
                f"WebSearch checkpoint not found: {self.web_path}"
            )

    def _load(self, path: str) -> "WebSearchModule":
        """Unpickle a ``WebSearchModule`` from *path* onto the CPU.

        Raises ``TypeError`` if the file holds any other object.
        """
        # NOTE(security): weights_only=False unpickles arbitrary objects, so
        # only load checkpoints from trusted locations.
        obj = torch.load(path, map_location="cpu", weights_only=False)
        if not isinstance(obj, WebSearchModule):
            raise TypeError(
                f"{path} does not contain a WebSearchModule."
            )
        return obj

    def save(self, path: Optional[str] = None):
        """Save the module to *path* (default: current location) and adopt that path.

        The stored path is made absolute so a later ``reload`` does not
        depend on the current working directory.
        """
        target = os.path.abspath(path or self.web_path)
        os.makedirs(os.path.dirname(target), exist_ok=True)
        torch.save(self.web, target)
        self.web_path = target

    def reload(self):
        """Stop the current worker and re-read the module from ``web_path``."""
        self.close()
        self.web = self._load(self.web_path)

    def search(self, query: str, **kwargs):
        """Run *query* through the module's ``forward``."""
        return self.web.forward(query, **kwargs)

    def embed(self, texts, convert_to_tensor=True):
        return self.web.embed(
            texts,
            convert_to_tensor=convert_to_tensor,
        )

    def summarize(
        self,
        text: str,
        max_length: int = 300,
        min_length: int = 50,
    ):
        return self.web.summarize(
            text,
            max_length=max_length,
            min_length=min_length,
        )

    def close(self):
        """Shut the module's worker down (best effort, never raises)."""
        try:
            self.web.close()
        except Exception:
            pass

    def __call__(self, query: str, **kwargs):
        return self.web(query, **kwargs)

    def __getattr__(self, item):
        # Only reached when normal lookup fails. Guard ``web`` itself so that
        # an instance without it (failed __init__, copying, unpickling)
        # raises AttributeError instead of recursing forever.
        if item == "web":
            raise AttributeError(item)
        return getattr(self.web, item)

    @property
    def location(self):
        return self.web_path

    @property
    def exists(self):
        return os.path.exists(self.web_path)

    def info(self):
        """Return a dict describing the checkpoint path and worker state."""
        return {
            "web_path": self.web_path,
            "exists": self.exists,
            "worker_running": (
                self.web._worker is not None
                and self.web._worker.is_alive()
            ),
            "model_dir": self.model_dir,
        }

    def __repr__(self):
        return (
            f"Web("
            f"path='{self.web_path}', "
            f"exists={self.exists}"
            f")"
        )
# Candidate primary-weight file names, checked in order by
# _pick_primary_weight_file. The trailing comma matters: without it this is
# a plain str and iteration would yield single characters.
PRIMARY_WEIGHT_FILES = ("pytorch_model.bin",)
# Large export artefacts treated as bloat (per the name); the code that
# consults this set is outside this view.
SKIP_BLOATED_FILES = {
    "model.onnx",
    "onnx_model.onnx",
    "openvino_model.bin",
}
@staticmethod
def _bytes_to_uint8_tensor(data: bytes) -> torch.Tensor:
    """Copy raw *data* into a new 1-D ``torch.uint8`` tensor (one element per byte)."""
    # np.frombuffer only views the (read-only) bytes; copy so the tensor owns
    # writable memory independent of *data*.
    owned = np.frombuffer(data, dtype=np.uint8).copy()
    return torch.from_numpy(owned)
@staticmethod
def _uint8_tensor_to_bytes(t: Union[torch.Tensor, bytes, bytearray, memoryview]) -> bytes:
    """Return the raw bytes held by *t*.

    ``bytes`` input is returned unchanged, and ``bytearray``/``memoryview``
    input is copied into ``bytes``. A tensor is detached, moved to the CPU
    and made contiguous before its buffer is read. ``ndarray.tobytes``
    already returns ``bytes``, so no further copy is made.
    """
    if isinstance(t, bytes):
        return t
    if isinstance(t, (bytearray, memoryview)):
        return bytes(t)
    return t.detach().cpu().contiguous().numpy().tobytes()
@staticmethod
def _pick_primary_weight_file(model_dir: str, candidates: Optional[Sequence[str]] = None) -> Optional[str]:
    """Return the first name in *candidates* that exists inside *model_dir*.

    *candidates* defaults to ``PRIMARY_WEIGHT_FILES``. A bare string is
    treated as one file name rather than iterated character by character;
    note that ``("name")`` without a trailing comma is a str. Returns
    ``None`` when no candidate exists.
    """
    names = PRIMARY_WEIGHT_FILES if candidates is None else candidates
    if isinstance(names, str):
        names = (names,)
    for name in names:
        if os.path.exists(os.path.join(model_dir, name)):
            return name
    return None
@dataclass
class PackedRecord:
    """One stored document: identifier, text, metadata and optional embedding."""

    id: str
    text: str
    meta: Dict[str, Any]
    embedding: Optional[np.ndarray] = field(default=None)
@dataclass
class PackedTreeSnapshot:
    """Serialized PackedTree state: compressed blobs plus a format version."""

    version: int
    docs_blob: bytes
    metas_blob: bytes
    ids_blob: bytes
    embs_blob: bytes
    extra_blob: bytes = field(default=b"")
class PackedTree:
def __init__(self, name: str, embed_fn: Callable[..., np.ndarray], cluster_k: int = 4):
    """Create an empty tree called *name* that embeds text with *embed_fn*."""
    self.name = name
    self.embed_fn = embed_fn
    self.cluster_k = int(cluster_k)
    # Parallel per-document storage: docs[i], metas[i] and ids[i] describe
    # the same document.
    self.docs: List[str] = []
    self.metas: List[Dict[str, Any]] = []
    self.ids: List[str] = []
    # Embedding matrices start as 0x0 and grow one row per added document.
    self.embs: np.ndarray = np.zeros((0, 0), dtype=np.float32)
    self.norm_embs: np.ndarray = np.zeros((0, 0), dtype=np.float32)
    # Lookup indexes: id -> row position, content hash -> id.
    self.id_to_idx: Dict[str, int] = {}
    self.hash_to_id: Dict[str, str] = {}
    self.query_cache: OrderedDict = OrderedDict()
    self.cluster_cache: Dict[str, Any] = {}
    self._lock = threading.RLock()
    self._clusters_dirty = True
@staticmethod
def norm_text(text: str) -> str:
    """Canonical form used for hashing: ``_norm_ws`` applied, then lower-cased."""
    collapsed = _norm_ws(text)
    return collapsed.lower()
@staticmethod
def text_hash(text: str) -> str:
    """Hex SHA-256 of the UTF-8 encoded ``norm_text(text)`` (used for dedup)."""
    canonical = PackedTree.norm_text(text).encode("utf-8")
    return hashlib.sha256(canonical).hexdigest()
def _cache_get(self, key):
    """Look *key* up in the LRU query cache, marking a hit as most recent.

    Returns ``None`` on a miss. A stored ``None`` value is indistinguishable
    from a miss and is not refreshed.
    """
    cache = self.query_cache
    hit = cache.get(key)
    if hit is None:
        return None
    cache.move_to_end(key)
    return hit
def _cache_put(self, key, value, max_size: int = 512):
    """Store *value* as the most recent entry, evicting the oldest beyond *max_size*."""
    cache = self.query_cache
    cache[key] = value
    cache.move_to_end(key)
    overflow = len(cache) - max_size
    for _ in range(max(0, overflow)):
        cache.popitem(last=False)
def add(self, text: str, meta: Optional[Dict[str, Any]] = None, item_id: Optional[str] = None) -> str:
    """Add *text* to the tree and return its id.

    Returns ``""`` for blank text. Duplicate content (same ``meta["hash"]``
    or ``text_hash``) returns the existing id without storing anything. The
    id is taken from *item_id*, then ``meta["id"]``, then a fresh UUID.
    ``meta`` is copied and given ``id``, ``hash`` and ``timestamp``
    defaults.

    Raises ``ValueError`` when the embedding dimension differs from the
    stored ones. The check happens before any state is modified, so a
    rejected document cannot leave docs/metas/ids out of step with the
    embedding matrix.
    """
    text = _norm_ws(text)
    if not text:
        return ""
    with self._lock:
        meta = dict(meta or {})
        item_id = item_id or meta.get("id") or str(uuid.uuid4())
        doc_hash = meta.get("hash") or self.text_hash(text)
        if doc_hash in self.hash_to_id:
            return self.hash_to_id[doc_hash]
        emb = np.asarray(self.embed_fn(text), dtype=np.float32).reshape(-1)
        if self.embs.size != 0 and self.embs.shape[1] != emb.shape[0]:
            raise ValueError(f"Embedding dimension mismatch in tree '{self.name}': {emb.shape[0]} != {self.embs.shape[1]}")
        self.id_to_idx[item_id] = len(self.docs)
        self.hash_to_id[doc_hash] = item_id
        meta.setdefault("id", item_id)
        meta.setdefault("hash", doc_hash)
        meta.setdefault("timestamp", _now_iso())
        self.docs.append(text)
        self.metas.append(meta)
        self.ids.append(item_id)
        row = emb.reshape(1, -1)
        if self.embs.size == 0:
            self.embs = row
        else:
            self.embs = np.vstack([self.embs, row])
        self.norm_embs = self._normalize_embeddings(self.embs)
        self._clusters_dirty = True
        return item_id
def bulk_add(self, items: Sequence[Tuple[str, Dict[str, Any], Optional[str]]]) -> List[str]:
    """Add each ``(text, meta, item_id)`` triple in order; return the resulting ids."""
    return [self.add(text, meta=meta, item_id=iid) for text, meta, iid in items]
def update_meta(self, item_id: str, patch: Mapping[str, Any]):
    """Merge *patch* into the metadata of *item_id* (unknown ids are ignored)."""
    with self._lock:
        position = self.id_to_idx.get(item_id)
        if position is not None:
            self.metas[position].update(dict(patch))
            # Metadata can influence clustering, so force a rebuild.
            self._clusters_dirty = True
def record_usage(self, item_id: str):
    """Increment ``usage_count`` and stamp ``last_used`` on *item_id*'s metadata.

    Unknown ids are ignored.
    """
    with self._lock:
        position = self.id_to_idx.get(item_id)
        if position is not None:
            entry = self.metas[position]
            entry["usage_count"] = int(entry.get("usage_count", 0)) + 1
            entry["last_used"] = _now_iso()
@staticmethod
def _compress(obj: Any) -> bytes:
    """Encode *obj* as UTF-8 JSON and LZMA-compress it at preset 9.

    Non-ASCII text is kept as-is; types JSON cannot handle go through ``_json_default``.
    """
    as_json = json.dumps(obj, ensure_ascii=False, default=_json_default)
    return lzma.compress(as_json.encode("utf-8"), preset=9)
@staticmethod
def _decompress(blob: bytes, default: Any = None) -> Any:
    """Inverse of ``_compress``.

    Returns *default* when *blob* is empty or cannot be decompressed, decoded or parsed.
    """
    if blob:
        try:
            return json.loads(lzma.decompress(blob).decode("utf-8"))
        except Exception:
            pass
    return default
def snapshot(self) -> PackedTreeSnapshot:
    """Capture the tree as a ``PackedTreeSnapshot`` of LZMA-compressed blobs.

    Embeddings are written as raw float16 bytes, which is lower precision than the
    float32 matrix kept in memory. Their shape, the dtype tag, ``cluster_k`` and the
    hash/id indexes are stored in ``extra_blob``.
    """
    with self._lock:
        if self.embs.size:
            embs_blob = lzma.compress(self.embs.astype(np.float16).tobytes(), preset=9)
        else:
            embs_blob = b""
        side_info = {
            "shape": list(self.embs.shape),
            "dtype": "float16",
            "cluster_k": self.cluster_k,
            "hash_to_id": self.hash_to_id,
            "id_to_idx": self.id_to_idx,
        }
        docs_blob = self._compress(self.docs)
        metas_blob = self._compress(self.metas)
        ids_blob = self._compress(self.ids)
        return PackedTreeSnapshot(
            version=1,
            docs_blob=docs_blob,
            metas_blob=metas_blob,
            ids_blob=ids_blob,
            embs_blob=embs_blob,
            extra_blob=self._compress(side_info),
        )
def restore(self, snap: PackedTreeSnapshot):
    """Replace the tree's contents with those captured in *snap*.

    Everything is decoded before any attribute is assigned, so a corrupt snapshot
    raises without leaving the tree half-restored.

    - Embeddings are read using the ``dtype`` recorded in the snapshot (``float16``
      for snapshots written by :meth:`snapshot`) and widened to float32.
    - If the snapshot has no ``id_to_idx`` index, one is rebuilt from the id list.
    - The query and cluster caches are reset.

    Raises:
        ValueError: if the embedding bytes do not match the recorded shape.
    """
    docs = self._decompress(snap.docs_blob, default=[])
    metas = self._decompress(snap.metas_blob, default=[])
    ids = self._decompress(snap.ids_blob, default=[])
    extra = self._decompress(snap.extra_blob, default={}) or {}
    if not isinstance(extra, dict):
        extra = {}
    shape = tuple(int(dim) for dim in (extra.get("shape") or (0, 0)))
    stored_dtype = np.dtype(extra.get("dtype") or "float16")
    cluster_k = int(extra.get("cluster_k", self.cluster_k))
    hash_to_id = dict(extra.get("hash_to_id", {}))
    if "id_to_idx" in extra:
        id_to_idx = {k: int(v) for k, v in dict(extra["id_to_idx"]).items()}
    else:
        id_to_idx = {item_id: position for position, item_id in enumerate(ids)}
    if snap.embs_blob and len(shape) == 2 and shape[0] > 0 and shape[1] > 0:
        raw = lzma.decompress(snap.embs_blob)
        embs = np.frombuffer(raw, dtype=stored_dtype).reshape(shape).astype(np.float32)
        norm_embs = self._normalize_embeddings(embs)
    else:
        embs = np.empty((0, 0), dtype=np.float32)
        norm_embs = np.empty((0, 0), dtype=np.float32)
    with self._lock:
        self.docs = docs
        self.metas = metas
        self.ids = ids
        self.cluster_k = cluster_k
        self.hash_to_id = hash_to_id
        self.id_to_idx = id_to_idx
        self.embs = embs
        self.norm_embs = norm_embs
        self.query_cache = OrderedDict()
        self.cluster_cache = {}
        self._clusters_dirty = True
@staticmethod
def _normalize_embeddings(embs: np.ndarray) -> np.ndarray:
    """Scale each row of *embs* to unit L2 norm.

    All-zero rows stay zero. An empty array is returned unchanged (the same object).
    """
    if not embs.size:
        return embs
    row_norms = np.linalg.norm(embs, axis=1, keepdims=True)
    return embs / np.where(row_norms == 0, 1.0, row_norms)
@staticmethod
def _cosine_scores(query_emb: np.ndarray, matrix: np.ndarray) -> np.ndarray:
    """Return the cosine similarity of *query_emb* against every row of *matrix*.

    The result is a 1-D array with one score per row. The query norm is floored at
    1e-8, and rows are normalised with ``_normalize_embeddings``. An empty *matrix*
    yields an empty float32 array.
    """
    if not matrix.size:
        return np.array([], dtype=np.float32)
    query_row = query_emb.astype(np.float32).reshape(1, -1)
    query_len = np.maximum(np.linalg.norm(query_row, axis=1, keepdims=True), 1e-8)
    query_unit = query_row / query_len
    rows_unit = PackedTree._normalize_embeddings(matrix.astype(np.float32))
    similarity = query_unit @ rows_unit.T
    return similarity[0]
def _build_clusters(self):
    """Recompute ``cluster_cache`` from ``norm_embs`` and clear ``_clusters_dirty``.

    Falls back to one cluster holding every document when scikit-learn's ``KMeans``
    is unavailable, when there are fewer than two embeddings, or when
    ``min(cluster_k, n)`` is <= 1. Only that last case records a center: the first
    normalised row. Otherwise KMeans (``random_state=0``) groups the rows.
    """
    n_rows = self.embs.shape[0]
    every_idx = list(range(len(self.docs)))
    if KMeans is None or n_rows < 2:
        self.cluster_cache = {"centers": None, "clusters": {0: {"idxs": every_idx}}}
    else:
        k = min(self.cluster_k, n_rows)
        if k <= 1:
            self.cluster_cache = {"centers": self.norm_embs[:1], "clusters": {0: {"idxs": every_idx}}}
        else:
            model = KMeans(n_clusters=k, random_state=0, n_init="auto")
            labels = model.fit_predict(self.norm_embs)
            groups: Dict[int, Dict[str, Any]] = {c: {"idxs": []} for c in range(k)}
            for position, label in enumerate(labels):
                groups[int(label)]["idxs"].append(position)
            self.cluster_cache = {"centers": model.cluster_centers_.astype(np.float32), "clusters": groups}
    self._clusters_dirty = False
def search(self, query: str, top_k: int = 5, min_score: float = 0.0, hybrid: bool = True, use_clusters: bool = False) -> List[Dict[str, Any]]:
    """Rank stored passages against *query* and return up to *top_k* hits.

    Candidates are visited in descending cosine similarity. Those below *min_score*
    end the scan, and those whose normalised text repeats an earlier hit are skipped.

    Each hit's ``score`` is clipped to [0, 1] and blends:

    - 0.75 × similarity;
    - 0.2 × token overlap, when *hybrid* is true;
    - 0.05 × ``metadata["importance"]`` (default 0.5).

    Hits keep similarity order. Results are memoised in the LRU query cache; the key
    includes the current document count, so passages added later are not hidden by a
    stale entry.

    Args:
        use_clusters: Only triggers a rebuild of a dirty cluster cache; ranking does
            not use clusters.

    Returns:
        Dicts with ``id``, ``passage``, ``raw_similarity``, ``score`` and ``metadata``.
        Empty for a blank query, an empty tree, or ``top_k <= 0``.
    """
    query = _norm_ws(query)
    if not query or top_k <= 0:
        return []
    qkey = (query, top_k, min_score, hybrid, use_clusters, len(self.docs))
    cached = self._cache_get(qkey)
    if cached is not None:
        return cached
    if self.embs.size == 0:
        return []
    q_emb = np.asarray(self.embed_fn(query), dtype=np.float32).reshape(-1)
    scores = self._cosine_scores(q_emb, self.embs)
    if scores.size == 0:
        return []
    q_tokens = set(self.norm_text(query).split())
    results: List[Dict[str, Any]] = []
    seen = set()
    for raw_idx in np.argsort(scores)[::-1]:
        idx = int(raw_idx)
        score = float(scores[idx])
        if score < min_score:
            # Scores are visited in descending order, so nothing later can qualify.
            break
        doc = self.docs[idx]
        doc_norm = self.norm_text(doc)
        if doc_norm in seen:
            continue
        seen.add(doc_norm)
        md = self.metas[idx]
        kw = 0.0
        if hybrid and q_tokens:
            d_tokens = set(doc_norm.split())
            kw = len(q_tokens & d_tokens) / max(1.0, (len(q_tokens) + len(d_tokens)) / 2.0)
        final = 0.75 * score + 0.2 * kw + 0.05 * float(md.get("importance", 0.5))
        results.append({
            "id": self.ids[idx],
            "passage": doc,
            "raw_similarity": score,
            "score": max(0.0, min(1.0, final)),
            "metadata": md,
        })
        if len(results) >= top_k:
            break
    if use_clusters and self._clusters_dirty:
        self._build_clusters()
    self._cache_put(qkey, results)
    return results
def retrieve_by_semantics(self, query: str, num_clusters: int = 2, top_k_per_cluster: int = 3, min_score: float = 0.0) -> List[Dict[str, Any]]:
    """Retrieve passages from the clusters whose centers best match *query*.

    Procedure:

    1. Take up to *num_clusters* clusters, nearest center first.
    2. From each, keep up to *top_k_per_cluster* passages with similarity >=
       *min_score*, skipping duplicates by normalised text. Scoring matches
       :meth:`search`.
    3. Stop once ``num_clusters * top_k_per_cluster`` hits are collected.
    4. If fewer than *top_k_per_cluster* hits are found, top up from :meth:`search`,
       still capped at that total.

    Without cluster centers (e.g. scikit-learn unavailable), this defers to
    :meth:`search`.
    """
    query = _norm_ws(query)
    if not query or self.embs.size == 0:
        return []
    if self._clusters_dirty:
        self._build_clusters()
    limit = num_clusters * top_k_per_cluster
    centers = self.cluster_cache.get("centers")
    clusters = self.cluster_cache.get("clusters") or {}
    if centers is None:
        return self.search(query, top_k=limit, min_score=min_score, hybrid=True)
    q_vec = np.asarray(self.embed_fn(query), dtype=np.float32).reshape(-1)
    q_tokens = set(self.norm_text(query).split())  # loop-invariant
    center_sims = self._cosine_scores(q_vec, centers)
    top_cluster_ids = np.argsort(center_sims)[::-1][:min(num_clusters, len(center_sims))]
    results: List[Dict[str, Any]] = []
    seen = set()
    for cid in top_cluster_ids:
        if len(results) >= limit:
            break
        idxs = clusters.get(int(cid), {}).get("idxs", [])
        if not idxs:
            continue
        sims = self._cosine_scores(q_vec, self.norm_embs[idxs])
        for local_idx in np.argsort(sims)[::-1][:top_k_per_cluster]:
            global_idx = idxs[int(local_idx)]
            raw = float(sims[int(local_idx)])
            if raw < min_score:
                continue
            doc = self.docs[global_idx]
            doc_norm = self.norm_text(doc)
            if doc_norm in seen:
                continue
            seen.add(doc_norm)
            md = self.metas[global_idx]
            d_tokens = set(doc_norm.split())
            kw = len(q_tokens & d_tokens) / max(1.0, (len(q_tokens) + len(d_tokens)) / 2.0)
            final = 0.75 * raw + 0.2 * kw + 0.05 * float(md.get("importance", 0.5))
            results.append({
                "id": self.ids[global_idx],
                "passage": doc,
                "raw_similarity": raw,
                "score": max(0.0, min(1.0, final)),
                "metadata": md,
            })
            if len(results) >= limit:
                break
    if len(results) < top_k_per_cluster:
        for item in self.search(query, top_k=limit, min_score=min_score, hybrid=True):
            if len(results) >= limit:
                break
            key = self.norm_text(item["passage"])
            if key not in seen:
                seen.add(key)
                results.append(item)
    return results
def _json_default(obj: Any):
    """``default=`` hook for :func:`json.dumps` covering types found in stored metadata.

    Conversions:

    - NumPy integer, floating and bool scalars → ``.item()``;
    - NumPy arrays → ``.tolist()``;
    - sets, frozensets and tuples → lists;
    - dataclass *instances* → :func:`dataclasses.asdict`;
    - bytes and bytearray → base64 ASCII;
    - paths → ``str``.

    Raises:
        TypeError: for any other object, as ``json.dumps`` expects.
    """
    if isinstance(obj, (np.integer, np.floating, np.bool_)):
        return obj.item()
    if isinstance(obj, np.ndarray):
        return obj.tolist()
    if isinstance(obj, (set, frozenset, tuple)):
        return list(obj)
    # is_dataclass() is also true for dataclass *classes*, which asdict() rejects.
    if dataclasses.is_dataclass(obj) and not isinstance(obj, type):
        return asdict(obj)
    if isinstance(obj, (bytes, bytearray)):
        return base64.b64encode(bytes(obj)).decode("ascii")
    if isinstance(obj, Path):
        return str(obj)
    raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")
def _extract_response_text_from_result(result: Any) -> str:
    """Best-effort extraction of readable reply text from an LM or tool *result*.

    By input type:

    - ``str``: returned unchanged.
    - list/tuple: non-blank strings are kept and blank strings are skipped. For a dict
      item, the first non-blank string under ``content``/``response``/``assistant``/
      ``final`` is used, else all of its non-blank string values. Other items go
      through ``str()``. Pieces are joined with newlines.
    - dict: the first truthy value under ``blocks``/``response``/``assistant``/
      ``final``/``content`` is coerced; otherwise the first truthy str/list/dict
      value; otherwise the dict is JSON-encoded, or ``str()``-ed if it is not
      JSON-serialisable.
    - anything else: passed through ``str()``.
    """
    def coerce_to_str(v):
        if isinstance(v, str):
            return v
        if isinstance(v, (list, tuple)):
            pieces = []
            for x in v:
                if isinstance(x, str):
                    if x.strip():
                        pieces.append(x)
                elif isinstance(x, dict):
                    for k in ("content", "response", "assistant", "final"):
                        if k in x and isinstance(x[k], str) and x[k].strip():
                            pieces.append(x[k])
                            break
                    else:
                        for val in x.values():
                            if isinstance(val, str) and val.strip():
                                pieces.append(val)
                else:
                    try:
                        pieces.append(str(x))
                    except Exception:
                        pass
            return "\n".join(pieces)
        if isinstance(v, dict):
            for key in ("response", "assistant", "final", "content"):
                val = v.get(key)
                if isinstance(val, str) and val.strip():
                    return val
                if isinstance(val, (list, tuple, dict)):
                    s = coerce_to_str(val)
                    if s:
                        return s
            vals = [str(x) for x in v.values() if isinstance(x, str) and x.strip()]
            if vals:
                return "\n".join(vals)
            try:
                return json.dumps(v)
            except Exception:
                return str(v)
        try:
            return str(v)
        except Exception:
            return ""

    if isinstance(result, dict):
        for key in ("blocks", "response", "assistant", "final", "content"):
            if key in result and result[key]:
                return coerce_to_str(result[key])
        vals = [v for v in result.values() if isinstance(v, (str, list, dict)) and v]
        if vals:
            return coerce_to_str(vals[0])
        # Same fallback as for nested dicts: never crash on non-JSON values.
        try:
            return json.dumps(result)
        except (TypeError, ValueError):
            return str(result)
    return coerce_to_str(result)
class DesktopControl:
    """Desktop automation helpers backed by optional, lazily imported modules.

    pyautogui, keyboard, mouse, psutil, win32gui, pygetwindow and ctypes are imported
    on first use; a module that fails to import is recorded as ``None``, and the
    helpers depending on it return early (``None``/``False``). The Win32 ``user32``
    calls are skipped when ``ctypes.windll`` does not exist (non-Windows platforms).
    """

    def __init__(self):
        # Cache of {module_name: module or None}; filled by _lazy_import.
        self._available = None

    def __getstate__(self):
        # Module objects cannot be pickled, and owners such as GATOR pickle this
        # object; drop the import cache so it is rebuilt on next use.
        state = self.__dict__.copy()
        state["_available"] = None
        return state

    def _lazy_import(self):
        """Import the optional modules once and return the name -> module/None map."""
        if self._available is not None:
            return self._available
        mods = {}
        for name in ("pyautogui", "keyboard", "mouse", "psutil", "win32gui", "pygetwindow", "ctypes"):
            try:
                mods[name] = importlib.import_module(name)
            except Exception:
                # Optional dependency: missing or unusable on this platform/display.
                mods[name] = None
        self._available = mods
        return mods

    def _user32(self):
        """Return ``ctypes.windll.user32``, or ``None`` when ctypes/windll is unavailable."""
        ctypes_mod = self._lazy_import().get("ctypes")
        windll = getattr(ctypes_mod, "windll", None)
        return None if windll is None else windll.user32

    def get_location_string(self) -> str:
        """Return ``"city/state/country"`` from an IP lookup via the optional ``geocoder``.

        Missing parts, or any failure, become ``Unknown*`` placeholders.
        """
        try:
            import geocoder  # type: ignore
            g = geocoder.ip("me")
            city = g.city or "UnknownCity"
            state = g.state or "UnknownState"
            country = g.country or "UnknownCountry"
            return f"{city}/{state}/{country}"
        except Exception:
            return "UnknownCity/UnknownState/UnknownCountry"

    def get_time_string(self) -> str:
        """Return local ``DD/MM/YYYY`` and, on a second line, ``hh:mm:ss/<am|pm>``.

        The time uses a 12-hour clock and the locale's AM/PM marker, lower-cased.
        """
        now = datetime.now()
        date_str = now.strftime("%d/%m/%Y")
        time_str = now.strftime("%I:%M:%S/%p").lower()
        return f"{date_str}\n{time_str}"

    def is_desktop_active(self) -> bool:
        """True when the foreground window is the desktop window (requires win32gui)."""
        win32gui = self._lazy_import().get("win32gui")
        if win32gui is None:
            return False
        desktop_hwnd = win32gui.GetDesktopWindow()
        active_hwnd = win32gui.GetForegroundWindow()
        return active_hwnd == desktop_hwnd

    def is_program_active(self, program_name: str) -> bool:
        """Whether *program_name* is running and appears in the active window title.

        Returns True when a process with that exact name (case-insensitive) is running
        and the name also appears in the active window's title. Requires pygetwindow
        and psutil.
        """
        mods = self._lazy_import()
        gw = mods.get("pygetwindow")
        psutil_mod = mods.get("psutil")
        if gw is None or psutil_mod is None:
            return False
        active_window = gw.getActiveWindow()
        if active_window:
            active_title = active_window.title or ""
            for process in psutil_mod.process_iter(["pid", "name"]):
                name = (process.info.get("name") or "").lower()
                if name == program_name.lower():
                    return program_name.lower() in active_title.lower()
        return False

    def fast_move(self, x, y):
        """Move the cursor to screen coordinates (*x*, *y*) via ``SetCursorPos`` (Windows only)."""
        user32 = self._user32()
        if user32 is None:
            return
        user32.SetCursorPos(x, y)

    def scroll_mouse(self, delta):
        """Scroll the wheel by *delta* notches (Windows only)."""
        user32 = self._user32()
        if user32 is None:
            return
        # 0x0800 = MOUSEEVENTF_WHEEL; one wheel notch is 120 units.
        user32.mouse_event(0x0800, 0, 0, int(delta * 120), 0)

    def press_special_key(self, key):
        """Tap a media/volume key by name (Windows only); unknown names are ignored."""
        user32 = self._user32()
        if user32 is None:
            return
        special_keys = {
            "volume up": 0xAF,
            "volume down": 0xAE,
            "volume mute": 0xAD,
            "play/pause media": 0xB3,
            "next track": 0xB0,
            "prev track": 0xB1,
        }
        vk_code = special_keys.get(key)
        if vk_code is None:
            return
        user32.keybd_event(vk_code, 0, 0, 0)
        user32.keybd_event(vk_code, 0, 2, 0)  # 2 = KEYEVENTF_KEYUP
        time.sleep(0.1)

    def minimize_all_windows(self):
        """Send Win+M (minimise all windows) (Windows only)."""
        user32 = self._user32()
        if user32 is None:
            return
        # 0x5B = left Windows key, 0x4D = 'M'; flag 2 releases the key.
        user32.keybd_event(0x5B, 0, 0, 0)
        user32.keybd_event(0x4D, 0, 0, 0)
        user32.keybd_event(0x4D, 0, 2, 0)
        user32.keybd_event(0x5B, 0, 2, 0)
        time.sleep(1)

    def run_pyautogui_command(self, command_name: str):
        """Call the zero-argument pyautogui function named *command_name*.

        Raises:
            RuntimeError: if pyautogui is not installed.
        """
        pyautogui = self._lazy_import().get("pyautogui")
        if pyautogui is None:
            raise RuntimeError("pyautogui is not installed")
        getattr(pyautogui, command_name)()
@dataclass
class ActionDecision:
    """One action requested by the LM head, as built by ``GATOR._decision_to_actions``."""
    # Action kind; GATOR._merge_retrieval_actions only keeps entries whose type is "retrieval".
    type: str
    # Command fields copied from the LM decision as-is.
    command_name: Optional[str] = None
    command_description: Optional[str] = None
    command_text: Optional[str] = None
    # Memory trees the action may read ("KnowledgeTree" / "ProfileTree" are checked by the merger).
    memory_access: List[str] = field(default_factory=list)
    # Retrieval target tree: "KnowledgeTree" or "ProfileTree".
    source: Optional[str] = None
    # For ProfileTree retrievals: "user_profile" or "bot_profile".
    profile: Optional[str] = None
    # Text to search the chosen tree for.
    query: Optional[str] = None
    # NOTE(review): filled from the LM output; not read in this section of the file.
    sufficient_to_answer: bool = False
    # Free-form extra arguments supplied by the LM.
    parameters: Dict[str, Any] = field(default_factory=dict)
class CommandRegistry:
    """Registry of basic, custom and shortcut commands owned by a ``GATOR`` instance.

    Three kinds of command:

    - ``commands``: name -> ``{"name", "description", "action", "type"}``. A command
      fires when its name (case-insensitive) occurs in the input text.
    - ``custom_commands``: name -> ``{"phrase", "description", "actions"}``. Its
      recorded input events are replayed via pyautogui and the owner's desktop helper.
    - Shortcuts: files under ``shortcuts_dir`` whose stem occurs in the input text.

    Instances are picklable. The lock is dropped on pickling and recreated on
    unpickling; registered callables must themselves be picklable.
    """

    def __init__(self, owner: "GATOR"):
        self.owner = owner
        self.commands: Dict[str, Dict[str, Any]] = {}
        self.custom_commands: Dict[str, Dict[str, Any]] = {}
        self.shortcuts_dir = "./Mods/COMMANDS"
        self._lock = threading.RLock()

    def __getstate__(self):
        # threading.RLock cannot be pickled, and GATOR pickles its registry along
        # with its own __dict__.
        state = self.__dict__.copy()
        state.pop("_lock", None)
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)
        self._lock = threading.RLock()

    def register_command(self, name: str, description: str, action: Optional[Callable] = None, command_type: str = "basic"):
        """Add or replace basic command *name*."""
        with self._lock:
            self.commands[name] = {
                "name": name,
                "description": description,
                "action": action,
                "type": command_type,
            }

    def update_command(self, name: str, description: Optional[str] = None, action: Optional[Callable] = None, command_type: Optional[str] = None):
        """Patch the non-``None`` fields of command *name*, creating it if missing."""
        with self._lock:
            if name not in self.commands:
                self.commands[name] = {"name": name, "description": "", "action": None, "type": "basic"}
            if description is not None:
                self.commands[name]["description"] = description
            if action is not None:
                self.commands[name]["action"] = action
            if command_type is not None:
                self.commands[name]["type"] = command_type

    def register_custom_command(self, command_name: str, phrase: str, description: str, actions: Optional[List[Dict[str, Any]]] = None):
        """Register a custom command and mirror it into ``owner.command_tree``."""
        with self._lock:
            self.custom_commands[command_name] = {
                "phrase": phrase,
                "description": description,
                "actions": actions or [],
            }
            self.owner.command_tree.add_command_branch(
                command_name=phrase,
                command_action=command_name,
                command_type="custom",
                description=description,
            )

    def check_custom_commands(self, text: str, matched_commands: Optional[List[str]] = None):
        """Append the names of custom commands whose phrase occurs in *text*.

        Matching is case-insensitive. Matches are appended to *matched_commands* (the
        caller's list is filled in place). Returns that list, or ``False`` if it is
        empty.
        """
        # `matched_commands or []` would silently replace a caller's empty list.
        if matched_commands is None:
            matched_commands = []
        text_lower = (text or "").lower()
        for command_name, data in self.custom_commands.items():
            phrase = (data.get("phrase") or "").lower()
            if phrase and phrase in text_lower:
                matched_commands.append(command_name)
        return matched_commands if matched_commands else False

    def check_basic_commands(self, text: str, matched_commands: Optional[List[Tuple[Callable, List[Any]]]] = None):
        """Append ``(action, [])`` for each basic command whose name occurs in *text*.

        Matching is case-insensitive, and commands without an action are ignored.
        Matches are appended to *matched_commands* in place. Returns whether anything
        matched.
        """
        # `matched_commands or []` would silently replace a caller's empty list,
        # which made process_commands() never execute basic commands.
        if matched_commands is None:
            matched_commands = []
        text_lower = (text or "").lower()
        for command_dict in self.commands.values():
            commands = command_dict.get("commands") or [command_dict.get("name")]
            action = command_dict.get("action")
            if action is None:
                continue
            for cmd in commands:
                if cmd and cmd.lower() in text_lower:
                    matched_commands.append((action, []))
        return bool(matched_commands)

    @staticmethod
    def _launch_argv(file_path: str) -> List[str]:
        """Command line that opens *file_path* with the platform's default handler."""
        if sys.platform.startswith("win"):
            return ["cmd", "/c", "start", "", file_path]
        if sys.platform == "darwin":
            return ["open", file_path]
        return ["xdg-open", file_path]

    @staticmethod
    def _start_file(file_path: str) -> None:
        """Open *file_path*: ``os.startfile`` where it exists (Windows), else the platform opener."""
        starter = getattr(os, "startfile", None)
        if starter is not None:
            starter(file_path)
        else:
            subprocess.Popen(CommandRegistry._launch_argv(file_path))

    def check_shortcuts(self, text: str, shortcut_dir: Optional[str] = None):
        """Return ``(opener, [path])`` pairs for shortcut files named in *text*.

        Files under *shortcut_dir* (default ``shortcuts_dir``, created if missing)
        match when their name without extension occurs in *text*, case-insensitively.
        Sub-directories are searched recursively.
        """
        shortcut_dir = shortcut_dir or self.shortcuts_dir
        os.makedirs(shortcut_dir, exist_ok=True)
        actions = []
        text_lower = (text or "").lower()
        for entry in os.listdir(shortcut_dir):
            entry_path = os.path.join(shortcut_dir, entry)
            if os.path.isdir(entry_path):
                actions.extend(self._check_folder(entry_path, text_lower))
            elif os.path.isfile(entry_path) and os.path.splitext(entry)[0].lower() in text_lower:
                actions.append((self._start_file, [entry_path]))
        return actions

    def _check_folder(self, base_dir, text_lower):
        """Recursive helper for :meth:`check_shortcuts`."""
        matched_paths = []
        for entry in os.listdir(base_dir):
            entry_path = os.path.join(base_dir, entry)
            name_no_ext = os.path.splitext(entry)[0].lower()
            if os.path.isfile(entry_path) and name_no_ext in text_lower:
                matched_paths.append(entry_path)
            elif os.path.isdir(entry_path):
                matched_paths.extend(self._check_folder(entry_path, text_lower))
        return [(self._start_file, [p]) for p in matched_paths]

    def execute_command(self, commands, command_executed_tags, argument_dictionary=None):
        """Call each ``(action, args)`` pair and log it in *command_executed_tags*.

        When *argument_dictionary* is given, string args written as ``"{key}"`` are
        replaced by ``argument_dictionary[key]`` (``""`` if missing). All other args
        are passed through unchanged.
        """
        for action, args in commands:
            call_args = []
            for arg in args or []:
                if argument_dictionary and isinstance(arg, str) and arg.startswith("{") and arg.endswith("}"):
                    call_args.append(argument_dictionary.get(arg.strip("{}"), ""))
                else:
                    call_args.append(arg)
            action(*call_args)
            command_executed_tags.append(f"Executed action: {getattr(action, '__name__', str(action))}")

    def execute_shortcut(self, actions, command_executed_tags):
        """Open the first path of each ``(opener, [path, ...])`` pair.

        Missing paths and directories are skipped. Each outcome is appended to
        *command_executed_tags*.
        """
        for _opener, args in actions:
            if not args:
                continue
            file_path = os.path.abspath(args[0])
            if not os.path.exists(file_path):
                command_executed_tags.append(f"Path not found: {file_path}")
                continue
            if os.path.isdir(file_path):
                command_executed_tags.append(f"Skipped directory: {file_path}")
                continue
            starter = getattr(os, "startfile", None)  # Windows-only API
            if starter is not None:
                try:
                    starter(file_path)
                    command_executed_tags.append(f"Opened: {file_path}")
                    continue
                except OSError:
                    pass  # fall back to the command-line opener below
            argv = self._launch_argv(file_path)
            try:
                subprocess.Popen(argv)
            except OSError:
                command_executed_tags.append(f"Failed to open: {file_path}")
                continue
            if argv[0] == "cmd":
                command_executed_tags.append(f"Opened via cmd start: {file_path}")
            else:
                command_executed_tags.append(f"Opened via {argv[0]}: {file_path}")

    def execute_custom_command(self, command_name):
        """Replay the recorded input events of custom command *command_name*.

        Key and mouse-button events use pyautogui, which is imported on first need
        (ImportError if missing). Moves and scrolls go through ``owner.desktop``.
        Unknown command names are ignored.
        """
        payload = self.custom_commands.get(command_name)
        if not payload:
            return
        pyautogui = None
        for action in payload.get("actions") or []:
            kind = action.get("event")
            if kind == "mouse_move":
                self.owner.desktop.fast_move(action["x"], action["y"])
            elif kind == "mouse_scroll":
                self.owner.desktop.scroll_mouse(action["delta"])
            elif kind in ("key_down", "key_up", "mouse_down", "mouse_up"):
                if pyautogui is None:
                    import pyautogui  # type: ignore  # optional dependency
                if kind == "key_down":
                    pyautogui.keyDown(action["key"])
                elif kind == "key_up":
                    pyautogui.keyUp(action["key"])
                elif kind == "mouse_down":
                    pyautogui.mouseDown(button=action["button"])
                else:
                    pyautogui.mouseUp(button=action["button"])

    def process_commands(self, command, command_type="shortcut", argument_dictionary=None):
        """Match *command* against one command kind and execute the matches.

        *command_type* is ``"basic"``, ``"shortcut"`` or ``"custom"``. Returns what was
        executed: ``(action, args)`` pairs, or custom command names.
        """
        command_executed_tags = []
        executed_actions = []
        if command_type == "basic":
            basic_matches = []
            if self.check_basic_commands(command, basic_matches):
                self.execute_command(basic_matches, command_executed_tags, argument_dictionary)
                executed_actions.extend(basic_matches)
        elif command_type == "shortcut":
            shortcut_actions = self.check_shortcuts(command, self.shortcuts_dir)
            if shortcut_actions:
                self.execute_shortcut(shortcut_actions, command_executed_tags)
                executed_actions.extend(shortcut_actions)
        elif command_type == "custom":
            found = self.check_custom_commands(command)
            if found:
                for cmd_name in found:
                    self.execute_custom_command(cmd_name)
                    executed_actions.append(cmd_name)
        return executed_actions
# ---------------------------------------------------------------------------
# GATOR module
# ---------------------------------------------------------------------------
class GATOR(nn.Module):
STATE_VERSION = 1
def __init__(
    self,
    lm_checkpoint_path: str = "LM.pt",
    embedder_name: str = "second-state/jina-embeddings-v3-GGUF",
    embedder_local_dir: str = os.path.join("models", "jinaai"),
    embedder_filename: str = GGUF_EMBED_FILENAME,
    device: str = "cpu",
    warm_on_start: bool = True,
    compression: str = "lzma",
    store_dtype: str = "float16",
    cluster_k: int = 4,
    auto_load_lm: bool = True,
    strict_lm: bool = True,
    embedder_pack: Optional[Dict[str, Any]] = None,
):
    """Build the GATOR module: language model, GGUF embedder and memory trees.

    Steps, in order:

    1. Load the LM through :meth:`_load_lm` (``None`` when *auto_load_lm* is false).
    2. Restore the embedder from *embedder_pack* when it carries ``gguf_bytes``;
       otherwise locate *embedder_filename* on disk and build or load a pack for it.
    3. Embed one probe string to learn the embedding width.
    4. Create the seven ``PackedTree`` stores.
    5. Call :meth:`warmup` when *warm_on_start* is true.

    Args:
        lm_checkpoint_path: Checkpoint handed to :meth:`_load_lm`.
        embedder_name: Recorded in ``config`` and ``embedder_name``; not used to find
            the file in this method.
        embedder_local_dir: First directory searched for *embedder_filename*.
        embedder_filename: GGUF file name to look for.
        device: Stored as ``device_name``. NOTE(review): not applied to any model
            here.
        warm_on_start: Whether to call :meth:`warmup` at the end.
        compression: Stored on the instance; not used in this method.
        store_dtype: Stored on the instance; not used in this method.
        cluster_k: Cluster count passed to every ``PackedTree``.
        auto_load_lm: When false, ``self.lm`` is ``None``.
        strict_lm: Stored as ``strict_lm``; not used in this method.
        embedder_pack: Ready-made pack dict (``gguf_bytes`` etc.) to restore from
            instead of disk.

    Raises:
        RuntimeError, FileNotFoundError: Propagated from :meth:`_load_lm` and the
            embedder helpers.
    """
    super().__init__()
    # Constructor arguments (except embedder_pack), kept for introspection.
    self.config = {
        "lm_checkpoint_path": lm_checkpoint_path,
        "embedder_name": embedder_name,
        "embedder_local_dir": embedder_local_dir,
        "embedder_filename": embedder_filename,
        "device": device,
        "warm_on_start": warm_on_start,
        "compression": compression,
        "store_dtype": store_dtype,
        "cluster_k": cluster_k,
        "auto_load_lm": auto_load_lm,
        "strict_lm": strict_lm,
    }
    self.device_name = device
    self.compression = compression
    self.store_dtype = store_dtype
    self.cluster_k = int(cluster_k)
    self.strict_lm = strict_lm
    self._lock = threading.RLock()
    self._snapshot_cache: Dict[str, bytes] = {}
    self._runtime_cache: OrderedDict = OrderedDict()
    self._runtime_cache_max = 16
    self._last_route: Dict[str, Any] = {}
    self._last_response: str = ""
    self._last_plan: Dict[str, Any] = {}
    self.desktop = DesktopControl()
    self.command_registry = CommandRegistry(self)
    self.lm = self._load_lm(lm_checkpoint_path, auto_load=auto_load_lm)
    self.embedder_name = embedder_name
    self.embedder_local_dir = embedder_local_dir
    self.embedder_filename = embedder_filename
    # An in-memory pack wins over searching the filesystem for the GGUF file.
    if embedder_pack is not None and embedder_pack.get("gguf_bytes") is not None:
        self.embedder_pack = embedder_pack
        self.embedder = self._restore_embedder_from_pack(self.embedder_pack)
        self.embedder_path = self.embedder_pack.get("gguf_source_path", "")
    else:
        self.embedder_path = self._resolve_local_embedder_gguf(embedder_local_dir, embedder_filename)
        self.embedder_pack = self._load_embedder_pack(self.embedder_path)
        self.embedder = self._restore_embedder_from_pack(self.embedder_pack)
    self.embedder_tokenizer = None
    # One throw-away embedding to learn the output width; 1024 if the probe is not 2-D/non-empty.
    probe = self._embed_raw(["__gator_probe__"], task="retrieval.passage")
    self.embed_dim = int(probe.shape[-1]) if probe.ndim == 2 and probe.shape[-1] > 0 else 1024
    # Every memory tree embeds through self.embed.
    self._store: Dict[str, PackedTree] = {
        "knowledge": PackedTree("knowledge", self.embed, cluster_k=self.cluster_k),
        "conversation": PackedTree("conversation", self.embed, cluster_k=self.cluster_k),
        "profile_user": PackedTree("profile_user", self.embed, cluster_k=self.cluster_k),
        "profile_bot": PackedTree("profile_bot", self.embed, cluster_k=self.cluster_k),
        "commands": PackedTree("commands", self.embed, cluster_k=self.cluster_k),
        "assets": PackedTree("assets", self.embed, cluster_k=self.cluster_k),
        "telemetry": PackedTree("telemetry", self.embed, cluster_k=self.cluster_k),
    }
    self._command_phrases: Dict[str, Dict[str, Any]] = {}
    self._warm_on_start = warm_on_start
    if warm_on_start:
        self.warmup()
@staticmethod
def _bytes_to_uint8_tensor(data: bytes) -> torch.Tensor:
    """Copy *data* into a new 1-D ``torch.uint8`` tensor independent of *data*'s buffer."""
    owned = np.frombuffer(data, dtype=np.uint8).copy()
    return torch.from_numpy(owned)
@staticmethod
def _uint8_tensor_to_bytes(t: torch.Tensor) -> bytes:
    """Inverse of ``_bytes_to_uint8_tensor``: the raw bytes of *t* (detached, on CPU, contiguous)."""
    host = t.detach().cpu().contiguous()
    raw = host.numpy().tobytes()
    return bytes(raw)
def __getstate__(self):
    """Return a picklable copy of ``__dict__``.

    The live embedder, its tokenizer and the lock are replaced by ``None``.
    ``embedder_pack`` and ``lm`` are replaced by their snapshot forms, and
    :meth:`__setstate__` rebuilds the rest.
    """
    snapshot = dict(self.__dict__)
    snapshot.update(
        embedder=None,
        embedder_tokenizer=None,
        embedder_pack=self._snapshot_embedder_pack(),
        lm=self._snapshot_lm_handle(),
        _lock=None,
    )
    return snapshot
def __setstate__(self, state):
    """Restore from :meth:`__getstate__` output and rebuild the non-picklable members.

    Recreates the lock, and the desktop helper and command registry if they are not
    the expected types. Reloads the embedder from ``embedder_pack`` if it is missing,
    passes ``lm`` through :meth:`_restore_lm_handle`, and creates empty memory trees
    if ``_store`` is absent.
    """
    self.__dict__.update(state)
    self._lock = threading.RLock()
    if not isinstance(self.desktop, DesktopControl):
        self.desktop = DesktopControl()
    if not isinstance(self.command_registry, CommandRegistry):
        self.command_registry = CommandRegistry(self)
    if self.embedder is None:
        self.embedder = self._restore_embedder_from_pack(self.embedder_pack)
    self.embedder_tokenizer = None
    self.lm = self._restore_lm_handle(self.lm)
    if not hasattr(self, "_store"):
        tree_names = ("knowledge", "conversation", "profile_user", "profile_bot", "commands", "assets", "telemetry")
        self._store = {name: PackedTree(name, self.embed, cluster_k=self.cluster_k) for name in tree_names}
def _load_lm(self, lm_checkpoint_path: str, auto_load: bool = True):
    """Load the PackedLM checkpoint, or return ``None`` when *auto_load* is false.

    Raises:
        RuntimeError: if ``load_packedlm`` is unavailable.
        FileNotFoundError: if *lm_checkpoint_path* does not exist.
    """
    if auto_load:
        if load_packedlm is None:
            raise RuntimeError("PackedLM.load_packedlm is unavailable. Import PackedLM before GATOR.")
        if not os.path.exists(lm_checkpoint_path):
            raise FileNotFoundError(f"LM checkpoint not found: {lm_checkpoint_path}")
        return load_packedlm(lm_checkpoint_path)
    return None
def _snapshot_lm_handle(self):
    """Return the object pickled in place of ``lm``; currently the live handle itself."""
    handle = self.lm
    return handle
def _restore_lm_handle(self, packed):
    """Inverse of ``_snapshot_lm_handle``; currently an identity pass-through."""
    restored = packed
    return restored
def _resolve_local_embedder_gguf(self, local_dir: str, embedder_filename: str) -> str:
    """Return the path of the first *embedder_filename* found under the search roots.

    Roots, in order, resolved against the working directory:

    - *local_dir* (if given);
    - ``models``;
    - ``models/jinaai``;
    - ``models/jinaai/jina-embeddings-v3``.

    Each root is checked directly, then recursively. Duplicate roots (e.g. the default
    *local_dir* equals ``models/jinaai``) are scanned once, and roots that are not
    directories are skipped.

    Raises:
        FileNotFoundError: if no root contains the file.
    """
    raw_roots = [
        Path(local_dir) if local_dir else None,
        Path("models"),
        Path("models") / "jinaai",
        Path("models") / "jinaai" / "jina-embeddings-v3",
    ]
    candidates: List[Path] = []
    for root in raw_roots:
        if root is None:
            continue
        resolved = root.resolve()
        if resolved not in candidates:
            candidates.append(resolved)
    for root in candidates:
        if not root.is_dir():
            continue
        direct = root / embedder_filename
        if direct.is_file():
            return str(direct)
        for match in root.rglob(embedder_filename):
            if match.is_file():
                return str(match)
    raise FileNotFoundError(
        f"Could not find {embedder_filename} under: {[str(p) for p in candidates]}"
    )
def _load_embedder_pack(self, gguf_path: str) -> Dict[str, Any]:
    """Return ``{"gguf_filename", "gguf_bytes", "gguf_source_path"}`` for *gguf_path*.

    A sibling ``<file><suffix>.pt`` is used as a cache. It is returned if it loads as a
    dict with ``gguf_bytes``; an unreadable cache file is deleted. Otherwise the GGUF
    file is read into a uint8 tensor and, best-effort, saved to the cache path.
    """
    gguf = Path(gguf_path)
    cache_file = str(gguf.with_suffix(gguf.suffix + ".pt"))
    if os.path.exists(cache_file):
        try:
            # NOTE(review): weights_only=False unpickles arbitrary objects, so anyone who
            # can write next to the model file can run code here. The pack only holds
            # str + tensor; weights_only=True may suffice — confirm it loads caches
            # saved with pickle_protocol=5 on the installed torch before switching.
            cached = torch.load(cache_file, map_location="cpu", weights_only=False)
        except Exception:
            try:
                os.remove(cache_file)
            except Exception:
                pass
        else:
            if isinstance(cached, dict) and cached.get("gguf_bytes") is not None:
                return cached
    pack = {
        "gguf_filename": gguf.name,
        "gguf_bytes": self._bytes_to_uint8_tensor(gguf.read_bytes()),
        "gguf_source_path": str(gguf.resolve()),
    }
    try:
        torch.save(pack, cache_file, pickle_protocol=5)
    except Exception:
        pass  # the cache is only an optimisation (e.g. read-only model dir)
    return pack
def _restore_embedder_from_pack(self, pack: Dict[str, Any]):
    """Write the GGUF bytes in *pack* to a temp file and load them with ``Llama``.

    The temporary directory is removed once ``Llama`` returns. ``use_mmap=False``
    reads the weights into memory rather than mapping the file, which is presumably
    what makes that deletion safe.

    Only the final component of ``pack["gguf_filename"]`` is used (default
    ``"embedder.gguf"``). The pack may come from an unpickled file, and its name must
    not point outside the temporary directory.

    Raises:
        RuntimeError: if *pack* is empty or has no ``gguf_bytes``.
    """
    if not pack or pack.get("gguf_bytes") is None:
        raise RuntimeError("Embedder GGUF pack is missing")
    file_name = Path(str(pack.get("gguf_filename") or "")).name
    if file_name in ("", ".", ".."):
        file_name = "embedder.gguf"
    tmp_dir = tempfile.mkdtemp(prefix="gator_embedder_")
    try:
        gguf_path = Path(tmp_dir) / file_name
        gguf_path.write_bytes(self._uint8_tensor_to_bytes(pack["gguf_bytes"]))
        llm = Llama(
            model_path=str(gguf_path),
            embedding=True,
            verbose=False,
            n_ctx=8192,
            use_mmap=False,
            use_mlock=False,
        )
        return llm
    finally:
        shutil.rmtree(tmp_dir, ignore_errors=True)
def _load_embedder(self, model_name: str, local_dir: str):
    """Locate, pack and load the GGUF embedder; returns ``(embedder, None, pack)``.

    *model_name* is accepted for interface compatibility but not used.
    """
    pack = self._load_embedder_pack(
        self._resolve_local_embedder_gguf(local_dir, self.embedder_filename)
    )
    return self._restore_embedder_from_pack(pack), None, pack
def _snapshot_embedder_pack(self) -> Dict[str, Any]:
    """Return a fresh dict with just the pack fields needed to rebuild the embedder.

    Raises:
        RuntimeError: if ``embedder_pack`` is not a dict carrying ``gguf_bytes``.
    """
    pack = self.embedder_pack
    if not isinstance(pack, dict) or pack.get("gguf_bytes") is None:
        raise RuntimeError("Embedder pack is missing")
    return {
        "gguf_filename": pack["gguf_filename"],
        "gguf_bytes": pack["gguf_bytes"],
        "gguf_source_path": pack.get("gguf_source_path", ""),
    }
def _embed_raw(self, texts: Union[str, Sequence[str]], task: str = "retrieval.passage") -> np.ndarray:
    """Embed *texts* with the GGUF embedder; returns a 2-D float32 array (one row per text).

    *task* is accepted for API symmetry but not forwarded to the backend. The embedder
    must expose ``create_embedding`` (preferred) or ``embed``. A response shaped like
    ``{"data": [{"embedding": ...}, ...]}`` is unpacked; any other response is used as-is.

    Raises:
        RuntimeError: if the embedder exposes neither method.
    """
    if isinstance(texts, str):
        texts = [texts]
    if not texts:
        return np.empty((0, self.embed_dim), dtype=np.float32)
    for method_name in ("create_embedding", "embed"):
        if hasattr(self.embedder, method_name):
            backend = getattr(self.embedder, method_name)
            break
    else:
        raise RuntimeError("Loaded GGUF embedder does not expose create_embedding() or embed()")
    response = backend(list(texts))
    if isinstance(response, dict) and "data" in response:
        vectors = [row["embedding"] for row in response["data"]]
    else:
        vectors = response
    matrix = np.asarray(vectors, dtype=np.float32)
    return matrix.reshape(1, -1) if matrix.ndim == 1 else matrix
def embed(self, texts: Union[str, Sequence[str]], task: str = "retrieval.passage") -> np.ndarray:
    """Public embedding entry point; delegates to :meth:`_embed_raw`."""
    return self._embed_raw(texts=texts, task=task)
def embed_query(self, texts: Union[str, Sequence[str]]) -> np.ndarray:
    """Embed *texts* with the ``retrieval.query`` task label."""
    return self.embed(texts=texts, task="retrieval.query")
def embed_passage(self, texts: Union[str, Sequence[str]]) -> np.ndarray:
    """Embed *texts* with the ``retrieval.passage`` task label."""
    return self.embed(texts=texts, task="retrieval.passage")
def embed_classification(self, texts: Union[str, Sequence[str]]) -> np.ndarray:
    """Embed *texts* with the ``classification`` task label."""
    return self.embed(texts=texts, task="classification")
def embed_matching(self, texts: Union[str, Sequence[str]]) -> np.ndarray:
    """Embed *texts* with the ``text-matching`` task label."""
    return self.embed(texts=texts, task="text-matching")
def _snapshot_store(self) -> Dict[str, Any]:
    """Snapshot every memory tree; returns ``{tree_name: PackedTreeSnapshot}``."""
    snapshots: Dict[str, Any] = {}
    for tree_name, tree in self._store.items():
        snapshots[tree_name] = tree.snapshot()
    return snapshots
def _restore_store(self, packed: Mapping[str, Any]):
    """Restore each existing tree from ``packed[name]``.

    Accepted entries are ``PackedTreeSnapshot`` objects or dicts of its fields. Trees
    with a missing or other-typed entry are left unchanged.
    """
    for name, tree in self._store.items():
        snap = packed.get(name)
        if isinstance(snap, PackedTreeSnapshot):
            pass
        elif isinstance(snap, dict):
            snap = PackedTreeSnapshot(**snap)
        else:
            continue
        tree.restore(snap)
@staticmethod
def normalize_for_hash(obj: Any) -> Any:
    """Recursively unwrap scalar-like values for stable hashing.

    Dict values and list items are processed recursively. Anything with an ``item()``
    method (e.g. NumPy/torch scalars) is replaced by ``item()``'s result, or kept as-is
    when ``item()`` raises. Other objects are returned unchanged.
    """
    if isinstance(obj, dict):
        return {key: GATOR.normalize_for_hash(value) for key, value in obj.items()}
    if isinstance(obj, list):
        return [GATOR.normalize_for_hash(value) for value in obj]
    if not hasattr(obj, "item"):
        return obj
    try:
        return obj.item()
    except Exception:
        return obj
def get_location_string(self):
    """Delegate to ``DesktopControl.get_location_string``."""
    desktop = self.desktop
    return desktop.get_location_string()
def get_time_string(self):
    """Delegate to ``DesktopControl.get_time_string``."""
    desktop = self.desktop
    return desktop.get_time_string()
def _lm_head(self, prompt: str, mode: str = "decision") -> Dict[str, Any]:
    """Ask the LM head for a routing decision and parse it as JSON.

    If the LM has ``head_expert``, it receives a JSON payload with the prompt, the
    registered commands, a system goal and *mode*. Otherwise ``lm.head(prompt)`` is
    used. Unparseable output yields ``{"actions": []}``.

    Raises:
        RuntimeError: if no LM is loaded.
    """
    if self.lm is None:
        raise RuntimeError("LM.pt is not loaded")
    payload = dict(
        input_text=prompt,
        available_commands=list(self.command_registry.commands.values()),
        system_goal=(
            "You are GATOR HeadExpert. Decide whether the user request needs retrieval, a tool, or a direct answer. "
            "Return strict JSON only."
        ),
        mode=mode,
    )
    lm = self.lm
    if hasattr(lm, "head_expert"):
        raw = lm.head_expert(_safe_json_dumps(payload))
    else:
        raw = lm.head(prompt)
    return self._parse_json_object(raw, default={"actions": []})
def _lm_tool(self, query: str, tools: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Ask the LM to pick tool calls for *query*.

    Uses ``lm.tool_expert`` if available, else ``lm.tool``. Unparseable output yields
    ``{"tool_calls": []}``.

    Raises:
        RuntimeError: if no LM is loaded.
    """
    if self.lm is None:
        raise RuntimeError("LM.pt is not loaded")
    expert = self.lm.tool_expert if hasattr(self.lm, "tool_expert") else self.lm.tool
    reply = expert(query, tools=tools)
    return self._parse_json_object(reply, default={"tool_calls": []})
@staticmethod
def _parse_json_object(text: str, default: Any = None) -> Any:
    """Parse an LM reply into a JSON value, tolerating prose around the JSON.

    Accepted inputs:

    - ``str``;
    - bytes, decoded as UTF-8;
    - an already-parsed mapping, returned as-is;
    - other objects, converted with ``str()``.

    The whole stripped text is parsed first. If that fails, a JSON object is decoded
    starting at each ``{`` in turn and the first that parses is returned. This handles
    replies such as ``'Sure: {"a": 1} hope that helps}'``, which a single greedy
    ``{...}`` regex would reject. Returns *default* for empty input or when nothing
    parses.
    """
    if not text:
        return default
    if isinstance(text, Mapping):
        return text
    if isinstance(text, (bytes, bytearray)):
        text = bytes(text).decode("utf-8", errors="replace")
    elif not isinstance(text, str):
        text = str(text)
    text = text.strip()
    try:
        return json.loads(text)
    except ValueError:
        pass
    decoder = json.JSONDecoder()
    start = text.find("{")
    while start != -1:
        try:
            obj, _end = decoder.raw_decode(text, start)
            return obj
        except ValueError:
            start = text.find("{", start + 1)
    return default
def _decision_to_actions(self, decision: Mapping[str, Any]) -> List[ActionDecision]:
    """Convert an LM decision into ``ActionDecision`` objects.

    *decision* may hold a list under ``actions``; otherwise the decision itself is
    treated as a single action. Non-mapping entries are skipped.

    Defensive handling of LM output:

    - a bare string ``memory_access`` becomes a one-element list instead of being
      split into characters;
    - any other non-sequence ``memory_access`` becomes ``[]``;
    - ``parameters`` that ``dict()`` cannot convert become ``{}``.
    """
    actions = decision.get("actions") if isinstance(decision, Mapping) else None
    if not isinstance(actions, list):
        actions = [decision] if isinstance(decision, Mapping) else []
    parsed: List[ActionDecision] = []
    for item in actions:
        if not isinstance(item, Mapping):
            continue
        access = item.get("memory_access") or []
        if isinstance(access, str):
            access = [access]
        elif not isinstance(access, (list, tuple, set)):
            access = []
        try:
            parameters = dict(item.get("parameters") or {})
        except (TypeError, ValueError):
            parameters = {}
        parsed.append(ActionDecision(
            type=str(item.get("type", "None")),
            command_name=item.get("command_name"),
            command_description=item.get("command_description"),
            command_text=item.get("command_text"),
            memory_access=list(access),
            source=item.get("source"),
            profile=item.get("profile"),
            query=item.get("query"),
            sufficient_to_answer=bool(item.get("sufficient_to_answer", False)),
            parameters=parameters,
        ))
    return parsed
def store_knowledge(self, documents: Sequence[str], tags: Optional[Sequence[str]] = None, source: str = "user", importance: float = 0.5):
    """Add *documents* to the knowledge tree, one tag per document.

    Tags default to ``"knowledge"``. Each stored entry records its tag, *source* and
    *importance*, and starts with zero usage.

    Raises:
        ValueError: if *tags* and *documents* differ in length.
    """
    tag_list = ["knowledge"] * len(documents) if tags is None else list(tags)
    if len(tag_list) != len(documents):
        raise ValueError("Length of tags must match length of documents")
    for document, tag in zip(documents, tag_list):
        entry_meta = {
            "tag": tag,
            "source": source,
            "importance": float(importance),
            "usage_count": 0,
            "last_used": None,
        }
        self._store["knowledge"].add(document, entry_meta)
def search_knowledge(self, query: str, top_k: int = 5, hybrid: bool = True, min_score: float = 0.0):
    """Search the knowledge tree (``PackedTree.search`` with ``use_clusters=True``)."""
    knowledge = self._store["knowledge"]
    return knowledge.search(query, top_k=top_k, min_score=min_score, hybrid=hybrid, use_clusters=True)
def process_knowledge(self, query, history="", location="", time_date=""):
    """Retrieve knowledge for *query*: cluster-based first, plain search on any error.

    *history*, *location* and *time_date* are accepted but not used here. Returns
    ``[]`` for an empty query.
    """
    if not query:
        return []
    try:
        return self._store["knowledge"].retrieve_by_semantics(query=query, num_clusters=3, top_k_per_cluster=3, min_score=0.0)
    except Exception:
        return self._store["knowledge"].search(query=query, top_k=9, hybrid=True)
def store_conversation_leaf(self, text: str, conv_id: str, leaf_type: str = "input"):
    """Store one conversation turn (*leaf_type*, e.g. ``"input"``) of conversation *conv_id*.

    ``metadata["id"]`` still carries the conversation id, and ``metadata["conv_id"]``
    repeats it explicitly. Each leaf gets its own tree item id,
    ``"<conv_id>::<leaf_type>::<uuid hex>"``. Without an explicit id,
    ``PackedTree.add`` would use ``meta["id"]`` as the item id, so every leaf of a
    conversation would share one id and overwrite the others in the id index.
    """
    meta = {"id": conv_id, "conv_id": conv_id, "type": "branch", "leaf_type": leaf_type}
    leaf_id = f"{conv_id}::{leaf_type}::{uuid.uuid4().hex}"
    self._store["conversation"].add(text, meta, item_id=leaf_id)
def process_conversation(self, query, id, type="input"):
    """Route a turn to ``self.conversation_tree``.

    Input turns go to ``add_input_leaf``, whose result (the relevant context) is
    returned. Any other turn goes to ``add_output_leaf`` and returns ``None``.

    NOTE(review): ``conversation_tree`` is not set in the ``__init__`` shown here;
    confirm it is defined elsewhere before relying on this method.
    """
    if type != "input":
        self.conversation_tree.add_output_leaf(query, id)
        return None
    return self.conversation_tree.add_input_leaf(query, id)
def store_profile_leaf(self, profile_id: str, text: str, importance: float = 0.5, profile_type: str = "user"):
    """Add *text* to the user profile tree (``profile_type == "user"``), else the bot profile tree."""
    tree_name = "profile_user" if profile_type == "user" else "profile_bot"
    leaf_meta = {
        "profile_id": profile_id,
        "importance": float(importance),
        "source": profile_type,
    }
    self._store[tree_name].add(text, leaf_meta)
def search_profile_leaves(self, profile_id: str, query: str, profile_type: str = "user", top_k: int = 3, min_score: float = 0.0):
    """Search a profile tree and keep only hits belonging to *profile_id*.

    The tree is over-fetched (``3 * top_k``) because hits for other profiles are
    filtered out afterwards. At most *top_k* hits are returned.
    """
    tree = self._store["profile_user" if profile_type == "user" else "profile_bot"]
    hits = tree.search(query, top_k=max(top_k * 3, top_k), min_score=min_score, hybrid=True, use_clusters=True)
    own_hits = [hit for hit in hits if hit.get("metadata", {}).get("profile_id") == profile_id]
    return own_hits[:top_k]
def store_command(
self,
command_name: str,
phrase: str,
description: str,
command_type: str = "custom",
actions: Optional[List[Dict[str, Any]]] = None
):
meta = {
"command_name": command_name,
"phrase": phrase,
"description": description,
"type": command_type,
"actions": actions or [],
}
self.command_registry.custom_commands[command_name] = meta
self.command_registry.commands[command_name] = meta
self._store["commands"].add(
f"Command: {phrase}",
{
"command_name": command_name,
"phrase": phrase,
"description": description,
"type": command_type,
},
item_id=f"command::{command_type}::{command_name}",
)
def search_commands(self, query: str, top_k: int = 3):
return self._store["commands"].search(query, top_k=top_k, hybrid=True)
def execute_retrieval_action(self, action, user_id, bot_id, history="", location="", time_date=""):
source = action.get("source")
profile = action.get("profile")
query = action.get("query")
if not query:
return None
if source == "KnowledgeTree":
return self.process_knowledge(query=query, history=history, location=location, time_date=time_date)
if source == "ProfileTree":
if profile == "user_profile":
return self.search_profile_leaves(user_id, query, profile_type="user")
if profile == "bot_profile":
return self.search_profile_leaves(bot_id, query, profile_type="bot")
return None
def _merge_retrieval_actions(self, actions: List[ActionDecision]) -> Dict[str, List[Dict[str, Any]]]:
retrieval_actions = [a for a in actions if a.type == "retrieval"]
if not retrieval_actions:
return {"KnowledgeTree": [], "ProfileTree": []}
out = {"KnowledgeTree": [], "ProfileTree": []}
for a in retrieval_actions:
if a.source == "KnowledgeTree" or (a.memory_access and "KnowledgeTree" in a.memory_access):
out["KnowledgeTree"].append(asdict(a))
if a.source == "ProfileTree" or (a.memory_access and "ProfileTree" in a.memory_access):
out["ProfileTree"].append(asdict(a))
return out
    def process_actions(self, query, user_id, bot_id, history="", location="", time_date=""):
        """Route ``query`` through the head/tool models and gather retrieval results.

        Steps, in order:
        1. Fetch the top-3 stored commands similar to ``query`` and describe them
           (name/description/type) as ``available_actions``.
        2. Send a JSON decision prompt to ``self._lm_head(..., mode="decision")`` and
           turn its reply into action objects with ``self._decision_to_actions``.
        3. If any action is a retrieval, run each retrieval action that is not
           marked ``sufficient_to_answer`` and collect results per ``action.source``
           (default ``"KnowledgeTree"``).
        4. If any action is a command, ask ``self._lm_tool`` for tool calls and run
           the ones whose name matches a command fetched in step 1.
        5. Always run shortcut matching on the raw ``query``.
        6. De-duplicate the retrieved items within each source.

        Side effects: sets ``self._last_plan`` and ``self._last_route``.

        Returns:
            ``{"commands": ..., "retrieved_data": ...}`` where ``commands`` is the
            space-joined ``str()`` of every matched command (``None`` when nothing
            matched) and ``retrieved_data`` maps source name -> list of results.
        """
        relevant_commands = self.search_commands(query, top_k=3)
        available_actions = [
            {
                "name": c["metadata"].get("command_name", c["id"]),
                "description": c["metadata"].get("description", ""),
                "type": c["metadata"].get("type", "command"),
            }
            for c in relevant_commands
        ]
        decision_prompt = {
            "query": query,
            "available_commands": available_actions,
            "history": history,
            "location": location,
            "time_date": time_date,
            "user_id": user_id,
            "bot_id": bot_id,
            "route_goal": "Use HeadExpert to decide whether retrieval is needed. If a tool/command is needed, choose it with ToolExpert.",
        }
        head_decision = self._lm_head(_safe_json_dumps(decision_prompt), mode="decision")
        actions = self._decision_to_actions(head_decision)
        # Only dict plans are remembered; anything else is recorded as {}.
        self._last_plan = head_decision if isinstance(head_decision, dict) else {}
        # Both flags are computed from the model's actions *before* the placeholder
        # below is substituted, so an empty plan triggers neither branch.
        command_needed = any(a.type == "command" for a in actions)
        retrieval_needed = any(a.type == "retrieval" for a in actions)
        if not actions:
            # NOTE(review): this placeholder is not read again in this method (the
            # only later loop over `actions` is gated on retrieval_needed) — appears vestigial.
            actions = [ActionDecision(type="None", sufficient_to_answer=True)]
        matched_commands: List[Any] = []
        retrieved_data: Dict[str, List[Any]] = {}
        if retrieval_needed:
            for action in actions:
                if action.type != "retrieval":
                    continue
                # The planner said it can already answer; skip the lookup.
                if action.sufficient_to_answer:
                    continue
                result = self.execute_retrieval_action(asdict(action), user_id, bot_id, history=history, location=location, time_date=time_date)
                if result is None:
                    continue
                source = action.source or "KnowledgeTree"
                retrieved_data.setdefault(source, [])
                # List results are flattened into the bucket; scalars are appended.
                if isinstance(result, list):
                    retrieved_data[source].extend(result)
                else:
                    retrieved_data[source].append(result)
        if command_needed:
            # assumes _lm_tool returns a dict with an optional "tool_calls" list — TODO confirm
            tool_calls = self._lm_tool(
                query,
                tools=available_actions,
            )
            for call in tool_calls.get("tool_calls", []):
                if not isinstance(call, dict):
                    continue
                # Accept both "name"/"arguments" and "command_name"/"parameters" spellings.
                command_name = call.get("name") or call.get("command_name")
                parameters = call.get("arguments") or call.get("parameters") or {}
                # Only commands surfaced in step 1 may be executed.
                cmd_meta = next((cmd for cmd in relevant_commands if cmd["metadata"].get("command_name") == command_name), None)
                if cmd_meta:
                    command_type = cmd_meta["metadata"].get("type", "custom")
                    # presumably returns an iterable of matches (extend would fail on None) — verify
                    matched = self.command_registry.process_commands(command_name, command_type, parameters)
                    matched_commands.extend(matched)
        # Shortcut matching runs unconditionally on the raw query text.
        shortcut_matches = self.command_registry.process_commands(query, "shortcut")
        matched_commands.extend(shortcut_matches)
        final_commands = " ".join([str(m) for m in matched_commands]) if matched_commands else None
        for key, values in retrieved_data.items():
            seen = set()
            deduped = []
            for v in values:
                # Dedup key: JSON of the normalized item, or str(v) if that fails.
                try:
                    normalized = self.normalize_for_hash(v)
                    h = _safe_json_dumps(normalized)
                except Exception:
                    h = str(v)
                if h not in seen:
                    seen.add(h)
                    deduped.append(v)
            retrieved_data[key] = deduped
        self._last_route = {"commands": final_commands, "retrieved_data": retrieved_data, "head_plan": head_decision}
        return {"commands": final_commands, "retrieved_data": retrieved_data}
def warmup(self):
try:
_ = self.embed(["__gator_warmup__"], task="retrieval.passage")
except Exception:
pass
def export_snapshot_bytes(self) -> bytes:
payload = {
"version": self.STATE_VERSION,
"config": self.config,
"embedder_pack": self._snapshot_embedder_pack(),
"stores": self._snapshot_store(),
"command_phrases": self._command_phrases,
"custom_commands": self.command_registry.custom_commands,
"commands": self.command_registry.commands,
"last_route": self._last_route,
"last_response": self._last_response,
"last_plan": self._last_plan,
}
buf = io.BytesIO()
torch.save(payload, buf)
return lzma.compress(buf.getvalue(), preset=9)
def import_snapshot_bytes(self, blob: bytes):
payload = torch.load(io.BytesIO(lzma.decompress(blob)), map_location="cpu", weights_only=False)
self._restore_store(payload.get("stores", {}))
self.command_registry.custom_commands = payload.get("custom_commands", {}) or {}
self.command_registry.commands = payload.get("commands", {}) or {}
self._command_phrases = payload.get("command_phrases", {}) or {}
self._last_route = payload.get("last_route", {}) or {}
self._last_response = payload.get("last_response", "") or ""
self._last_plan = payload.get("last_plan", {}) or {}
if payload.get("embedder_pack"):
self.embedder_pack = payload["embedder_pack"]
self.embedder, self.embedder_tokenizer, self.embedder_pack = self._restore_embedder_from_pack(self.embedder_pack)
self.embedder_local_dir = self.embedder_pack.get("local_dir", self.embedder_local_dir)
def save_checkpoint(self, path: str = "GATOR.pt") -> None:
self.warmup()
payload = {
"version": self.STATE_VERSION,
"config": self.config,
"embedder_pack": self._snapshot_embedder_pack(),
"stores": self._snapshot_store(),
"custom_commands": self.command_registry.custom_commands,
"commands": self.command_registry.commands,
"command_phrases": self._command_phrases,
"last_route": self._last_route,
"last_response": self._last_response,
"last_plan": self._last_plan,
}
torch.save(payload, path, pickle_protocol=5)
@classmethod
def load_checkpoint(cls, path: str = "GATOR.pt") -> "GATOR":
if not os.path.exists(path):
raise FileNotFoundError(path)
payload = torch.load(path, map_location="cpu", weights_only=False)
cfg = payload.get("config", {})
obj = cls(
lm_checkpoint_path=cfg.get("lm_checkpoint_path", "LM.pt"),
embedder_name=cfg.get("embedder_name", "second-state/jina-embeddings-v3-GGUF"),
embedder_local_dir=cfg.get("embedder_local_dir", os.path.join("models", "jinaai")),
embedder_filename=cfg.get("embedder_filename", GGUF_EMBED_FILENAME),
device=cfg.get("device", "cpu"),
warm_on_start=False,
compression=cfg.get("compression", "lzma"),
store_dtype=cfg.get("store_dtype", "float16"),
cluster_k=int(cfg.get("cluster_k", 4)),
auto_load_lm=bool(cfg.get("auto_load_lm", True)),
strict_lm=bool(cfg.get("strict_lm", True)),
embedder_pack=payload.get("embedder_pack"),
)
obj._restore_store(payload.get("stores", {}))
obj.command_registry.custom_commands = payload.get("custom_commands", {}) or {}
obj.command_registry.commands = payload.get("commands", {}) or {}
obj._command_phrases = payload.get("command_phrases", {}) or {}
obj._last_route = payload.get("last_route", {}) or {}
obj._last_response = payload.get("last_response", "") or ""
obj._last_plan = payload.get("last_plan", {}) or {}
if payload.get("embedder_pack"):
obj.embedder_pack = payload["embedder_pack"]
obj.embedder = obj._restore_embedder_from_pack(obj.embedder_pack)
return obj
def infer_actions(self, prompt: str) -> Dict[str, Any]:
return self._lm_head(prompt, mode="decision")
def infer_command(self, prompt_payload: Dict[str, Any]) -> Dict[str, Any]:
tools = prompt_payload.get("available_commands", [])
query = prompt_payload.get("input_text", "")
return self._lm_tool(query, tools=tools)
def respond(self, query: str, user_id: str, bot_id: str, history: str = "", location: str = "", time_date: str = ""):
routed = self.process_actions(query, user_id=user_id, bot_id=bot_id, history=history, location=location, time_date=time_date)
return routed
def process_command(self, *args, **kwargs):
return self.process_actions(*args, **kwargs)
def summary(self) -> Dict[str, Any]:
return {
"config": dict(self.config),
"stores": {name: {"count": len(tree.docs), "dim": int(tree.embs.shape[1]) if tree.embs.size else self.embed_dim} for name, tree in self._store.items()},
"commands": len(self.command_registry.commands),
"custom_commands": len(self.command_registry.custom_commands),
"last_route": self._last_route,
"last_plan": self._last_plan,
}
def run_self_test(self) -> Dict[str, Any]:
report = {}
try:
report["embed"] = tuple(self.embed(["hello world"], task="retrieval.passage").shape)
except Exception as e:
report["embed_error"] = repr(e)
try:
self.store_knowledge(["The capital of France is Paris."], tags=["fact"], source="test", importance=1.0)
report["knowledge"] = self.search_knowledge("What is the capital of France?", top_k=1)
except Exception as e:
report["knowledge_error"] = repr(e)
try:
self.store_profile_leaf("user-1", "Likes Japanese food.", importance=0.8, profile_type="user")
report["profile"] = self.search_profile_leaves("user-1", "food preference", profile_type="user", top_k=1)
except Exception as e:
report["profile_error"] = repr(e)
try:
self.store_command("test_cmd", "test command", "A small test command", command_type="custom", actions=[])
report["commands"] = self.search_commands("test command", top_k=1)
except Exception as e:
report["commands_error"] = repr(e)
return {"report": report, "summary": self.summary()}
@property
def profile_tree(self):
return types.SimpleNamespace(search_leaves=lambda profile_id, query, top_k=3, min_score=0.0, use_clusters=True: self.search_profile_leaves(profile_id, query, profile_type="user", top_k=top_k, min_score=min_score))
@property
def conversation_tree(self):
return types.SimpleNamespace(
add_input_leaf=lambda text, id: self._store["conversation"].search(text, top_k=3, hybrid=True, use_clusters=True),
add_output_leaf=lambda text, id: self.store_conversation_leaf(text, id, leaf_type="output"),
)
@property
def knowledge_tree(self):
return types.SimpleNamespace(
retrieve_by_semantics=lambda query, num_clusters=2, top_k_per_cluster=3, min_score=0.0: self._store["knowledge"].retrieve_by_semantics(query, num_clusters=num_clusters, top_k_per_cluster=top_k_per_cluster, min_score=min_score),
search=lambda query, top_k=5, hybrid=True: self._store["knowledge"].search(query, top_k=top_k, hybrid=hybrid),
_embed_text=lambda text: self.embed(text, task="retrieval.passage")[0],
_warm_collection=lambda: self.warmup(),
)
@property
def command_tree(self):
return types.SimpleNamespace(
search_relevant_commands=lambda query, top_k=3: self.search_commands(query, top_k=top_k),
add_command_branch=lambda command_name, command_action, command_type, description: self.store_command(command_action, command_name, description, command_type=command_type),
update_command_description=lambda command_name, new_description: self.command_registry.update_command(command_name, description=new_description),
)
def save(self, path: str = "GATOR.pt"):
return self.save_checkpoint(path)
def __repr__(self) -> str:
return f"GATOR(embedder={self.embedder_name!r}, local_dir={self.embedder_local_dir!r}, stores={list(self._store.keys())}, commands={len(self.command_registry.commands)})"
class MemoryBank:
    """
    Thin owner/wrapper around GATOR.
    - Loads an existing GATOR.pt if present.
    - Builds a new one if missing.
    - Uses AppData/LocalAppData as the default location when no path is passed.
    - Delegates all unknown attributes/methods directly to the underlying GATOR instance.

    The checkpoint file name is always ``DEFAULT_BUNDLE_NAME``. Passing a path
    that ends in ``.pt`` only selects that file's parent directory.
    """
    DEFAULT_APP_FOLDER = "PackedLM"
    DEFAULT_BUNDLE_NAME = "GATOR.pt"
    def __init__(
        self,
        gator_location: Optional[Union[str, os.PathLike]] = None,
        *,
        build_if_missing: bool = True,
        **gator_kwargs: Any,
    ):
        """Load the checkpoint under ``gator_location``, or build and save a new GATOR.

        Args:
            gator_location: directory (or a ``*.pt`` path whose parent is used);
                ``None`` selects the per-user app-data folder.
            build_if_missing: when False, a missing checkpoint raises instead.
            **gator_kwargs: forwarded to ``GATOR(...)`` when building.

        Raises:
            FileNotFoundError: no checkpoint exists and ``build_if_missing`` is False.
        """
        self.root_dir = self._resolve_root_dir(gator_location)
        self.root_dir.mkdir(parents=True, exist_ok=True)
        self.checkpoint_path = self._resolve_checkpoint_path(self.root_dir)
        if self.checkpoint_path.exists():
            self.gator = GATOR.load_checkpoint(str(self.checkpoint_path))
        else:
            if not build_if_missing:
                raise FileNotFoundError(f"No GATOR checkpoint found at: {self.checkpoint_path}")
            self.gator = GATOR(**gator_kwargs)
            self.gator.save_checkpoint(str(self.checkpoint_path))
    @classmethod
    def _default_appdata_dir(cls) -> Path:
        """Return ``$LOCALAPPDATA``, else ``$APPDATA``, else ``~/AppData/Local``, joined with the app folder."""
        local_appdata = os.getenv("LOCALAPPDATA")
        appdata = os.getenv("APPDATA")
        if local_appdata:
            base = Path(local_appdata)
        elif appdata:
            base = Path(appdata)
        else:
            base = Path.home() / "AppData" / "Local"
        return base / cls.DEFAULT_APP_FOLDER
    @classmethod
    def _resolve_root_dir(cls, gator_location: Optional[Union[str, os.PathLike]]) -> Path:
        """Map the user-supplied location to an absolute root directory."""
        if gator_location is None:
            return cls._default_appdata_dir()
        path = Path(gator_location).expanduser().resolve()
        if path.suffix.lower() == ".pt":
            return path.parent
        return path
    @classmethod
    def _resolve_checkpoint_path(cls, root_dir: Path) -> Path:
        """Return the checkpoint file path inside ``root_dir``."""
        return root_dir / cls.DEFAULT_BUNDLE_NAME
    def save(self) -> str:
        """Save the wrapped GATOR to ``checkpoint_path`` and return that path as a string."""
        self.root_dir.mkdir(parents=True, exist_ok=True)
        self.gator.save_checkpoint(str(self.checkpoint_path))
        return str(self.checkpoint_path)
    def reload(self) -> None:
        """Replace the wrapped GATOR with a fresh load from ``checkpoint_path``.

        Raises:
            FileNotFoundError: the checkpoint file does not exist.
        """
        if not self.checkpoint_path.exists():
            raise FileNotFoundError(str(self.checkpoint_path))
        self.gator = GATOR.load_checkpoint(str(self.checkpoint_path))
    def rebuild(self, **gator_kwargs: Any) -> None:
        """Build a brand-new GATOR with ``gator_kwargs`` and overwrite the checkpoint."""
        self.gator = GATOR(**gator_kwargs)
        self.gator.save_checkpoint(str(self.checkpoint_path))
    def __getattr__(self, name: str) -> Any:
        """Delegate attributes not found on the wrapper to the wrapped GATOR.

        Python only calls this after normal lookup fails. ``gator`` is read from
        ``__dict__`` directly because ``self.gator`` on an instance whose ``gator``
        is not set yet would re-enter this method and recurse until
        ``RecursionError``. That happens with a failed ``__init__``, a
        ``__new__``-only instance, or copy/unpickle. In that case a normal
        ``AttributeError`` is raised instead.
        """
        try:
            gator = self.__dict__["gator"]
        except KeyError:
            raise AttributeError(f"{type(self).__name__!r} object has no attribute {name!r}") from None
        return getattr(gator, name)
    def __dir__(self):
        """Merge the wrapper's attributes with the wrapped GATOR's (best-effort)."""
        base = set(super().__dir__())
        try:
            base.update(dir(self.gator))
        except Exception:
            pass
        return sorted(base)
    def __repr__(self) -> str:
        return f"MemoryBank(root_dir={str(self.root_dir)!r}, checkpoint_path={str(self.checkpoint_path)!r})"
class HardwareProbe:
    """Best-effort hardware discovery used for offload planning.

    GPU probes return ``None`` instead of raising when a tool or driver is
    missing or its output cannot be parsed.
    """
    @staticmethod
    def cpu() -> Dict[str, Any]:
        """Return physical/logical core counts and available/total RAM in GiB (via psutil)."""
        physical = psutil.cpu_count(logical=False) or psutil.cpu_count(logical=True) or 1
        logical = psutil.cpu_count(logical=True) or physical
        vm = psutil.virtual_memory()
        return {
            "physical_cores": physical,
            "logical_cores": logical,
            "available_ram_gb": vm.available / (1024 ** 3),
            "total_ram_gb": vm.total / (1024 ** 3),
        }
    @staticmethod
    def _parse_nvidia_smi_csv(out: str) -> Optional[Dict[str, Any]]:
        """Parse ``nvidia-smi --format=csv,noheader,nounits`` output for the first GPU.

        nvidia-smi prints one line per GPU, and only the first line is used.
        Splitting the whole multi-line output on commas breaks on every
        multi-GPU host. Memory columns are MiB and are converted to GiB. A
        non-numeric utilization column (e.g. ``[N/A]``) becomes ``None``.
        Non-numeric memory columns raise ``ValueError``.
        """
        lines = [line.strip() for line in out.splitlines() if line.strip()]
        if not lines:
            return None
        parts = [p.strip() for p in lines[0].split(",")]
        if len(parts) < 5:
            return None
        try:
            utilization: Optional[float] = float(parts[4])
        except ValueError:
            utilization = None
        return {
            "backend": "cuda",
            "device_name": parts[0],
            "total_vram_gb": float(parts[1]) / 1024.0,
            "free_vram_gb": float(parts[2]) / 1024.0,
            "used_vram_gb": float(parts[3]) / 1024.0,
            "utilization_pct": utilization,
        }
    @staticmethod
    def nvidia_gpu() -> Optional[Dict[str, Any]]:
        """Query the first NVIDIA GPU via ``nvidia-smi``; ``None`` if unavailable or unparsable."""
        cmd = [
            "nvidia-smi",
            "--query-gpu=name,memory.total,memory.free,memory.used,utilization.gpu",
            "--format=csv,noheader,nounits",
        ]
        try:
            # Timeout guards against a wedged driver hanging the probe indefinitely.
            raw = subprocess.check_output(cmd, stderr=subprocess.DEVNULL, timeout=10)
            return HardwareProbe._parse_nvidia_smi_csv(raw.decode("utf-8", errors="replace"))
        except Exception:
            return None
    @staticmethod
    def torch_gpu() -> Optional[Dict[str, Any]]:
        """Describe the current CUDA device, else Apple MPS (reported from system RAM), else ``None``."""
        try:
            if torch.cuda.is_available():
                idx = torch.cuda.current_device()
                free_b, total_b = torch.cuda.mem_get_info(idx)
                return {
                    "backend": "cuda",
                    "device_name": torch.cuda.get_device_name(idx),
                    "total_vram_gb": total_b / (1024 ** 3),
                    "free_vram_gb": free_b / (1024 ** 3),
                    "used_vram_gb": (total_b - free_b) / (1024 ** 3),
                    "utilization_pct": None,
                }
        except Exception:
            pass
        try:
            if getattr(torch.backends, "mps", None) is not None and torch.backends.mps.is_available():
                # MPS uses unified memory, so system RAM figures stand in for VRAM.
                vm = psutil.virtual_memory()
                return {
                    "backend": "metal",
                    "device_name": "Apple Silicon (MPS)",
                    "total_vram_gb": vm.total / (1024 ** 3),
                    "free_vram_gb": vm.available / (1024 ** 3),
                    "used_vram_gb": (vm.total - vm.available) / (1024 ** 3),
                    "utilization_pct": None,
                }
        except Exception:
            pass
        return None
    @staticmethod
    def webgpu() -> Optional[Dict[str, Any]]:
        """Estimate a WebGPU budget as half the adapter's max buffer size (1 GB if unknown)."""
        if not _WGPU_AVAILABLE:
            return None
        try:
            request = getattr(wgpu.gpu, "request_adapter_sync", None) or getattr(wgpu.gpu, "request_adapter")
            adapter = request(power_preference="high-performance")
            limits = getattr(adapter, "limits", {}) or {}
            max_buffer_bytes = limits.get("max-buffer-size") or limits.get("maxBufferSize") or 0
            est_gb = (max_buffer_bytes / (1024 ** 3)) * 0.5 if max_buffer_bytes else 1.0
            return {
                "backend": "webgpu",
                "device_name": getattr(adapter, "summary", "WebGPU adapter"),
                "total_vram_gb": est_gb,
                "free_vram_gb": est_gb,
                "used_vram_gb": 0.0,
                "utilization_pct": None,
            }
        except Exception:
            return None
    @classmethod
    def snapshot(cls) -> Dict[str, Any]:
        """Return ``{"cpu", "gpu", "webgpu"}``; WebGPU is only probed when no native GPU was found."""
        gpu = cls.nvidia_gpu() or cls.torch_gpu()
        webgpu = None if gpu is not None else cls.webgpu()
        return {"cpu": cls.cpu(), "gpu": gpu, "webgpu": webgpu}
@dataclass
class RunMetrics:
    """Measurements for one expert invocation, as recorded by ``ExpertHandle.record_run``."""

    # Wall-clock seconds spent in the backend call.
    duration_sec: float
    # Token count passed to record_run for this call.
    tokens: int
    # tokens / duration_sec as computed by record_run (0.0 when duration_sec <= 0).
    tokens_per_sec: float
    # Template identifier passed to record_run, if any.
    template_used: Optional[str]
    # Offload plan that was active for the expert during the run.
    offload_plan: Dict[str, Any]
    # Telemetry differences passed to record_run.
    telemetry_deltas: Dict[str, Any]
    # Creation time in seconds since the epoch, filled from time.time().
    timestamp: float = field(default_factory=time.time)
class ExpertHandle(nn.Module):
    """Per-expert container: spec, lazily loaded backend, and run statistics.

    ``_llama`` holds the loaded backend, or ``None`` while unloaded.
    ``PackedLM._get_llama`` stores a llama.cpp ``Llama`` there. For the
    ``TranslationExpert`` it is a dict with ``tokenizer``, ``model`` and
    ``local_dir`` keys.
    """
    def __init__(self, name: str, spec: Dict[str, Any]):
        super().__init__()
        self.name = name
        self.spec: Dict[str, Any] = dict(spec)
        self._llama = None
        self.last_prompt: str = ""
        self.last_response: str = ""
        self.last_template_used: Optional[str] = None
        self.last_offload_plan: Dict[str, Any] = {}
        self.last_metrics: Dict[str, Any] = {}
        self.metrics_history: List[Dict[str, Any]] = []
        self.call_count: int = 0
        self.total_inference_sec: float = 0.0
        self.avg_inference_sec: float = 0.0
        self.total_tokens_generated: int = 0
        self.avg_tokens_per_sec: float = 0.0
        self.load_time_sec: Optional[float] = None
    def __getstate__(self):
        """Return picklable state without modifying the live instance.

        llama.cpp handles cannot be pickled, so ``_llama`` is dropped for every
        expert except ``TranslationExpert``. That expert's HF model is stored as
        ``state_dict`` + ``config`` for ``__setstate__`` to rebuild. A translation
        dict without a model is dropped too: it cannot be rebuilt, and a dict that
        still contains ``"model"`` would later be treated as a loaded backend.
        """
        state = self.__dict__.copy()
        backend = state.get("_llama")
        if self.name != "TranslationExpert":
            state["_llama"] = None
        elif isinstance(backend, dict):
            model = backend.get("model")
            if model is None:
                state["_llama"] = None
            else:
                state["_llama"] = {
                    "state_dict": model.state_dict(),
                    "config": model.config,
                    "tokenizer": backend.get("tokenizer"),
                    "local_dir": backend.get("local_dir"),
                }
        return state
    def __setstate__(self, state):
        """Restore state and rebuild the TranslationExpert model if one was serialized.

        Delegates to ``nn.Module.__setstate__``, which also fills in module
        attributes missing from older pickles. The rebuilt model is put in eval
        mode: constructing it directly leaves it in training mode with dropout
        active.
        """
        super().__setstate__(state)
        data = self.__dict__.get("_llama")
        if self.name == "TranslationExpert" and isinstance(data, dict) and "state_dict" in data:
            from transformers import MarianMTModel
            model = MarianMTModel(data["config"])
            model.load_state_dict(data["state_dict"])
            model.eval()
            self._llama = {
                "tokenizer": data["tokenizer"],
                "model": model,
                "local_dir": data["local_dir"]
            }
    def is_loaded(self) -> bool:
        """True when a backend object is attached."""
        return self._llama is not None
    def record_run(
        self,
        prompt_repr: str,
        response: str,
        duration_sec: float,
        tokens: int,
        telemetry_deltas: Dict[str, Any],
        template_used: Optional[str] = None,
    ) -> None:
        """Update running totals/averages and append a ``RunMetrics`` entry.

        ``avg_tokens_per_sec`` is the mean of per-call rates, not total tokens
        over total time. Only the 25 most recent entries are kept in
        ``metrics_history``.
        """
        self.last_prompt = prompt_repr
        self.last_response = response
        self.last_template_used = template_used
        self.call_count += 1
        self.total_inference_sec += duration_sec
        self.avg_inference_sec = self.total_inference_sec / self.call_count
        self.total_tokens_generated += tokens
        tps = (tokens / duration_sec) if duration_sec > 0 else 0.0
        self.avg_tokens_per_sec = ((self.avg_tokens_per_sec * (self.call_count - 1)) + tps) / self.call_count
        entry = asdict(
            RunMetrics(
                duration_sec=duration_sec,
                tokens=tokens,
                tokens_per_sec=tps,
                template_used=template_used,
                offload_plan=self.last_offload_plan,
                telemetry_deltas=telemetry_deltas,
            )
        )
        self.last_metrics = entry
        self.metrics_history.append(entry)
        if len(self.metrics_history) > 25:
            self.metrics_history.pop(0)
    def forward(self, *args, **kwargs): # pragma: no cover
        raise RuntimeError(f"ExpertHandle('{self.name}') is not directly callable.")
# ================================================================
# PackedLM
# ================================================================
class PackedLM(nn.ModuleDict):
DEFAULT_LAYER_GUESS = 32
VRAM_SAFETY_MARGIN = 1.15
WEBGPU_SAFETY_MARGIN = 1.40
_R1_USER_TOKEN = "<|User|>"
_R1_ASSISTANT_TOKEN = "<|Assistant|>"
_XLAM_TASK_INSTRUCTION_DEFAULT = (
"Based on the user's query, decide whether a function call is needed and, if so, "
"produce the correct call(s) using only the tools provided."
)
_XLAM_FORMAT_INSTRUCTION_DEFAULT = (
'Generate a JSON object of the form {"tool_calls": [{"name": "func_name", '
'"arguments": {"arg1": "value1"}}, ...]}. If no function call is needed, '
'return {"tool_calls": []}. Output JSON only, nothing else.'
)
def __init__(self, bundle_path: Optional[str] = DEFAULT_BUNDLE_PATH, auto_load_bundle: bool = True):
super().__init__()
self.bundle_path = bundle_path
self.bundle: Optional[Dict[str, Any]] = None
self.last_expert: str = ""
self._embed_tempfiles: Dict[str, str] = {}
self._hf_translation_dir = str(DEFAULT_ZH_EN_DIR)
if auto_load_bundle and bundle_path and os.path.exists(bundle_path):
self.load_bundle(bundle_path)
def __getstate__(self):
state = self.__dict__.copy()
for name, expert in self.items():
if name == "TranslationExpert":
continue
if hasattr(expert, "_llama"):
expert._llama = None
return state
def load_bundle(self, bundle_path: str) -> "PackedLM":
self.bundle_path = bundle_path
self.bundle = torch.load(bundle_path, map_location="cpu", weights_only=False)
models = self.bundle.get("models", {})
for name, spec in models.items():
self[name] = ExpertHandle(name, spec)
# Optional translation module for zh->en support.
if "TranslationExpert" not in self and "zh_en_translator" not in self:
self["TranslationExpert"] = ExpertHandle(
"TranslationExpert",
{
"kind": "hf_seq2seq",
"repo_id": ZH_EN_REPO_ID,
"local_dir": self._hf_translation_dir,
"source_lang": "zh",
"target_lang": "en",
},
)
return self
def reload_expert(self, expert_name: str) -> "PackedLM":
if expert_name not in self:
raise KeyError(f"Unknown expert '{expert_name}'. Loaded experts: {list(self.keys())}")
expert = self[expert_name]
if expert.spec.get("kind") == "hf_seq2seq":
self._load_translation_backend(force_reload=True)
return self
expert._llama = None
self._get_llama(expert, force_reload=True)
return self
def unload_expert(self, expert_name: str) -> None:
if expert_name in self and hasattr(self[expert_name], "_llama"):
self[expert_name]._llama = None
def unload_all(self) -> None:
for name in list(self.keys()):
self.unload_expert(name)
def summary(self) -> Dict[str, Any]:
out: Dict[str, Any] = {}
for name, expert in self.items():
out[name] = {
"loaded": expert.is_loaded(),
"call_count": expert.call_count,
"avg_inference_sec": round(expert.avg_inference_sec, 4),
"avg_tokens_per_sec": round(expert.avg_tokens_per_sec, 2),
"last_offload_plan": expert.last_offload_plan,
"last_response_preview": (expert.last_response[:160] + "...") if len(expert.last_response) > 160 else expert.last_response,
}
out["_last_expert"] = self.last_expert
return out
@staticmethod
def _tensor_to_bytes(t: Any) -> bytes:
if isinstance(t, bytes):
return t
if torch.is_tensor(t):
return bytes(t.detach().cpu().contiguous().numpy().tobytes())
raise TypeError(f"Unsupported embedded asset type: {type(t)}")
def _resolve_model_path(self, expert: ExpertHandle) -> str:
spec = expert.spec
path = spec.get("path", "")
if path and os.path.exists(path):
return path
assets = (self.bundle or {}).get("assets", {}).get("gguf", {})
embedded = assets.get(expert.name)
if embedded is None:
raise FileNotFoundError(f"Model '{expert.name}' has no external path and no embedded GGUF bytes in the bundle.")
cache_key = expert.name
if cache_key in self._embed_tempfiles and os.path.exists(self._embed_tempfiles[cache_key]):
return self._embed_tempfiles[cache_key]
raw = self._tensor_to_bytes(embedded)
tmp = tempfile.NamedTemporaryFile(prefix=f"{expert.name}_", suffix=".gguf", delete=False)
tmp.write(raw)
tmp.flush()
tmp.close()
self._embed_tempfiles[cache_key] = tmp.name
return tmp.name
def _resolve_projector_path(self, expert: ExpertHandle) -> Optional[str]:
spec = expert.spec
path = spec.get("mmproj_path") or spec.get("clip_model_path") or spec.get("clip_path")
if not path:
return None
if os.path.exists(path):
return path
assets = (self.bundle or {}).get("assets", {}).get("gguf", {})
embedded = assets.get(path)
if embedded is None:
return None
cache_key = f"{expert.name}_mmproj"
if cache_key in self._embed_tempfiles and os.path.exists(self._embed_tempfiles[cache_key]):
return self._embed_tempfiles[cache_key]
raw = self._tensor_to_bytes(embedded)
tmp = tempfile.NamedTemporaryFile(prefix=f"{expert.name}_mmproj_", suffix=".gguf", delete=False)
tmp.write(raw)
tmp.flush()
tmp.close()
self._embed_tempfiles[cache_key] = tmp.name
return tmp.name
    def _plan_offload(self, expert: ExpertHandle) -> Dict[str, Any]:
        """Decide GPU layer offload and CPU thread count for ``expert``.

        Uses ``HardwareProbe.snapshot()``: a CUDA/Metal GPU is preferred, then a
        WebGPU adapter, else CPU only. If free GPU memory covers the model file
        size times a safety margin, every layer is offloaded. Otherwise the
        number of layers is scaled by the fraction of the need that fits.
        Resolving the model path may write an embedded GGUF to a temp file.

        Returns:
            dict with ``backend``, ``n_gpu_layers``, ``n_threads``,
            ``model_size_gb``, ``hardware_snapshot`` and a human-readable
            ``rationale``.
        """
        hw = HardwareProbe.snapshot()
        spec = expert.spec
        try:
            model_path = self._resolve_model_path(expert)
            file_size_gb = os.path.getsize(model_path) / (1024 ** 3)
        except Exception:
            # Fall back to the spec's size hint (default 2 GB) when the file can't be resolved/stat'ed.
            file_size_gb = float(spec.get("approx_size_gb", 2.0))
        # Only used to scale partial offload.
        n_layers = int(spec.get("n_layers", self.DEFAULT_LAYER_GUESS))
        reasoning: List[str] = []
        if hw["gpu"] is not None:
            gpu = hw["gpu"]
            needed = file_size_gb * self.VRAM_SAFETY_MARGIN
            if gpu["free_vram_gb"] >= needed:
                # -1 = offload every layer (the "full offload" case).
                n_gpu_layers = -1
                reasoning.append(f"{gpu['backend']} GPU '{gpu['device_name']}' has {gpu['free_vram_gb']:.2f}GB free >= {needed:.2f}GB needed -> full offload")
            else:
                frac = max(0.0, gpu["free_vram_gb"] / needed) if needed > 0 else 0.0
                n_gpu_layers = max(0, int(frac * n_layers))
                reasoning.append(f"{gpu['backend']} GPU has only {gpu['free_vram_gb']:.2f}GB free of {needed:.2f}GB needed -> partial offload of {n_gpu_layers}/{n_layers} layers")
            backend = gpu["backend"]
        elif hw["webgpu"] is not None:
            webgpu = hw["webgpu"]
            # WebGPU budgets are estimates, hence the larger safety margin.
            needed = file_size_gb * self.WEBGPU_SAFETY_MARGIN
            if webgpu["free_vram_gb"] >= needed:
                n_gpu_layers = -1
                reasoning.append("WebGPU adapter's estimated budget covers the full model -> full offload")
            else:
                frac = max(0.0, webgpu["free_vram_gb"] / needed) if needed > 0 else 0.0
                n_gpu_layers = max(0, int(frac * n_layers))
                reasoning.append(f"WebGPU adapter budget covers ~{frac * 100:.0f}% of the model -> partial offload of {n_gpu_layers}/{n_layers} layers")
            backend = "webgpu"
        else:
            n_gpu_layers = 0
            backend = "cpu"
            reasoning.append("No CUDA/Metal GPU or WebGPU adapter detected -> CPU-only")
        # Leave one physical core free, but always use at least one thread.
        n_threads = max(1, hw["cpu"]["physical_cores"] - 1)
        return {
            "backend": backend,
            "n_gpu_layers": n_gpu_layers,
            "n_threads": n_threads,
            "model_size_gb": round(file_size_gb, 3),
            "hardware_snapshot": hw,
            "rationale": " | ".join(reasoning),
        }
def _get_llama(self, expert: ExpertHandle, force_reload: bool = False):
if expert.is_loaded() and not force_reload:
return expert._llama
if not _LLAMA_CPP_AVAILABLE:
raise RuntimeError("llama-cpp-python is not installed.")
model_path = self._resolve_model_path(expert)
plan = self._plan_offload(expert)
chat_handler = None
if expert.spec.get("vision", False):
projector_path = self._resolve_projector_path(expert)
if projector_path:
from llama_cpp.llama_chat_format import Qwen25VLChatHandler
chat_handler = Qwen25VLChatHandler(clip_model_path=projector_path)
t0 = time.perf_counter()
llama = Llama(
model_path=model_path,
n_ctx=expert.spec.get("ctx", 8192),
n_threads=plan["n_threads"],
n_gpu_layers=plan["n_gpu_layers"],
chat_handler=chat_handler,
verbose=False,
)
expert.load_time_sec = time.perf_counter() - t0
expert.last_offload_plan = plan
expert._llama = llama
return llama
def _load_translation_backend(self):
if "TranslationExpert" not in self:
raise RuntimeError("TranslationExpert not found.")
expert = self["TranslationExpert"]
if isinstance(expert._llama, dict) and "model" in expert._llama:
return expert._llama
if self.bundle and "assets" in self.bundle and "translation" in self.bundle["assets"]:
print("Hydrating TranslationExpert from embedded bundle assets...")
from transformers import MarianMTModel, MarianTokenizer, MarianConfig
import tempfile
import shutil
asset_data = self.bundle["assets"]["translation"]
tok_tmp = Path(tempfile.mkdtemp(prefix="zh_en_tok_"))
try:
for filename, filebytes in asset_data["tokenizer_files"].items():
target_path = tok_tmp / filename
target_path.parent.mkdir(parents=True, exist_ok=True)
target_path.write_bytes(filebytes)
tokenizer = MarianTokenizer.from_pretrained(str(tok_tmp))
finally:
shutil.rmtree(tok_tmp, ignore_errors=True)
config = MarianConfig.from_dict(asset_data["config"])
model = MarianMTModel(config)
model.load_state_dict(asset_data["state_dict"])
expert._llama = {
"tokenizer": tokenizer,
"model": model,
"local_dir": self._hf_translation_dir
}
return expert._llama
raise RuntimeError("Translation model data not found or corrupted in bundle assets.")
def _translate_with_internal_model(self, text: str) -> str:
backend = self._load_translation_backend()
tokenizer = backend["tokenizer"]
model = backend["model"]
inputs = tokenizer(text, return_tensors="pt", truncation=True)
inputs = {k: v.to(model.device) for k, v in inputs.items()}
with torch.no_grad():
generated = model.generate(**inputs, max_new_tokens=128, renormalize_logits=True, repetition_penalty=1.1,)
return tokenizer.batch_decode(generated, skip_special_tokens=True)[0].strip()
def translate_zh_en(self, text: str, template: Optional[Union[str, Callable]] = None) -> str:
del template # reserved for future parity with the other experts
if not text:
return ""
if not CHINESE_RE.search(text):
return text
return self._translate_with_internal_model(text)
def _translate_chinese_spans(self, text: str) -> str:
if not CHINESE_RE.search(text):
return text
def repl(match: re.Match) -> str:
segment = match.group(0).strip()
if not segment:
return segment
try:
translated = self.translate_zh_en(segment)
return translated if translated else segment
except Exception:
return segment
return CHINESE_SPAN_RE.sub(repl, text)
@staticmethod
def _is_url(value: str) -> bool:
try:
return urlparse(value).scheme in ("http", "https")
except Exception:
return False
@staticmethod
def _image_to_data_uri(image_path: str, max_pixels: int = 1_000_000) -> str:
if not _PIL_AVAILABLE:
raise RuntimeError("Pillow is required for local image encoding.")
with Image.open(image_path) as img:
if img.mode not in ("RGB", "RGBA"):
img = img.convert("RGB")
width, height = img.size
if width * height > max_pixels:
scale = (max_pixels / (width * height)) ** 0.5
img = img.resize((int(width * scale), int(height * scale)), Image.Resampling.LANCZOS)
buffer = io.BytesIO()
img.save(buffer, format="PNG")
b64 = base64.b64encode(buffer.getvalue()).decode("utf-8")
return f"data:image/png;base64,{b64}"
@staticmethod
def _build_messages(
template: Optional[Union[str, list, Callable]],
prompt: str,
default_builder: Callable[..., List[Dict[str, Any]]],
**fields: Any,
) -> List[Dict[str, Any]]:
if template is None:
return default_builder(prompt, **fields)
if callable(template) and not isinstance(template, (str, list)):
return template(prompt, **fields)
if isinstance(template, list):
return template
if isinstance(template, str):
base = default_builder(prompt, **fields)
content = template.format(prompt=prompt, **fields)
if base and base[0].get("role") == "system":
return [base[0], {"role": "user", "content": content}]
return [{"role": "user", "content": content}]
raise TypeError("template must be None, a list[dict], a callable, or a str")
def _exec_chat(self, expert_name: str, messages: List[Dict[str, Any]], max_tokens: int, temperature: float, **gen_kwargs):
expert = self[expert_name]
llama = self._get_llama(expert)
t_pre = capture_telemetry()
t0 = time.perf_counter()
out = llama.create_chat_completion(messages=messages, max_tokens=max_tokens, temperature=temperature, **gen_kwargs)
dt = time.perf_counter() - t0
t_post = capture_telemetry()
text = out["choices"][0]["message"]["content"]
tokens = out.get("usage", {}).get("completion_tokens", 0)
deltas = calculate_delta(t_pre, t_post)
return text, tokens, dt, deltas
def _exec_completion(self, expert_name: str, raw_prompt: str, max_tokens: int, temperature: float, stop: Optional[List[str]] = None, **gen_kwargs):
    """Run a raw text completion on ``expert_name`` and time it.

    Telemetry is captured before and after the call and diffed with
    ``calculate_delta``.

    Returns:
        ``(text, completion_tokens, duration_sec, telemetry_deltas)``. ``text``
        is ``""`` when the choice carries no text, and a missing or ``None``
        ``usage`` block counts as 0 tokens.
    """
    expert = self[expert_name]
    llama = self._get_llama(expert)
    t_pre = capture_telemetry()
    t0 = time.perf_counter()
    out = llama.create_completion(prompt=raw_prompt, max_tokens=max_tokens, temperature=temperature, stop=stop, **gen_kwargs)
    dt = time.perf_counter() - t0
    t_post = capture_telemetry()
    text = out["choices"][0].get("text") or ""
    tokens = (out.get("usage") or {}).get("completion_tokens", 0) or 0
    deltas = calculate_delta(t_pre, t_post)
    return text, tokens, dt, deltas
def _finalize(self, expert_name: str, prompt_repr: str, final_text: str, duration_sec: float, tokens: int, telemetry_deltas: Dict[str, Any], template_used: Optional[str]) -> str:
    """Log a finished run on its expert, remember it as the last expert, and return the text."""
    run_record = dict(
        prompt_repr=prompt_repr,
        response=final_text,
        duration_sec=duration_sec,
        tokens=tokens,
        telemetry_deltas=telemetry_deltas,
        template_used=template_used,
    )
    self[expert_name].record_run(**run_record)
    self.last_expert = expert_name
    return final_text
def _default_creative_messages(self, prompt: str, tone: Optional[str] = None, length: Optional[str] = None, pov: Optional[str] = None, style: Optional[str] = None) -> List[Dict[str, Any]]:
    """Default system + user messages for the creative expert.

    ``tone`` goes into the system prompt. Whichever of ``length``, ``pov`` and
    ``style`` are set are appended to the user turn as one
    ``Constraints:`` line, comma-separated.
    """
    system = "You are a creative writing assistant." + (f" Match this tone: {tone}." if tone else "")
    labelled = (("length", length), ("POV", pov), ("style", style))
    constraints = [f"{label}: {value}" for label, value in labelled if value]
    user = prompt
    if constraints:
        user += "\n\nConstraints: " + ", ".join(constraints)
    return [{"role": "system", "content": system}, {"role": "user", "content": user}]
def creative_expert(self, prompt: str, tone: Optional[str] = None, length: Optional[str] = None, pov: Optional[str] = None, style: Optional[str] = None, template: Optional[Union[str, list, Callable]] = None, max_tokens: int = 600, temperature: float = 0.9, **gen_kwargs) -> str:
    """Generate creative writing with the ``CreativeExpert`` chat model.

    ``template`` overrides the default prompt (see ``_build_messages``); the
    remaining ``gen_kwargs`` are forwarded to the chat completion.
    """
    expert_name = "CreativeExpert"
    template_label = "default" if template is None else "custom"
    messages = self._build_messages(
        template, prompt, self._default_creative_messages,
        tone=tone, length=length, pov=pov, style=style,
    )
    reply, n_tokens, elapsed, deltas = self._exec_chat(expert_name, messages, max_tokens, temperature, **gen_kwargs)
    return self._finalize(expert_name, prompt, reply, elapsed, n_tokens, deltas, template_label)
def _default_code_messages(self, prompt: str, language: Optional[str] = None, context: Optional[str] = None, constraints: Optional[str] = None) -> List[Dict[str, Any]]:
    """Default single user message for the code expert.

    Always starts with ``Task: <prompt>``. Whichever of ``language``,
    ``context`` and ``constraints`` are set follow, each on its own line(s).
    """
    lines = [f"Task: {prompt}"]
    optional = (("Language: ", language), ("Context:\n", context), ("Constraints:\n", constraints))
    lines.extend(f"{head}{value}" for head, value in optional if value)
    return [{"role": "user", "content": "\n".join(lines)}]
def code_expert(self, prompt: str, language: Optional[str] = None, context: Optional[str] = None, constraints: Optional[str] = None, template: Optional[Union[str, list, Callable]] = None, max_tokens: int = 1200, temperature: float = 0.2, **gen_kwargs) -> str:
    """Write or explain code with the ``CodeExpert`` chat model.

    ``template`` overrides the default prompt (see ``_build_messages``); the
    remaining ``gen_kwargs`` are forwarded to the chat completion.
    """
    label = "custom" if template is not None else "default"
    chat = self._build_messages(
        template, prompt, self._default_code_messages,
        language=language, context=context, constraints=constraints,
    )
    answer, used_tokens, seconds, telemetry = self._exec_chat("CodeExpert", chat, max_tokens, temperature, **gen_kwargs)
    return self._finalize("CodeExpert", prompt, answer, seconds, used_tokens, telemetry, label)
def _logic_default_raw(self, prompt: str, mode: str):
    """Build the default raw completion prompt for a LogicExpert ``mode``.

    The stripped prompt is wrapped in ``self._R1_USER_TOKEN`` /
    ``self._R1_ASSISTANT_TOKEN``:

    * ``deep_then_answer``: adds a step-by-step-then-answer instruction.
    * ``think_only``: adds a reason-only instruction, pre-opens ``<think>``,
      stops at ``</think>`` and re-wraps the output in ``<think>...</think>``.
    * ``skip_reasoning``: pre-fills an empty ``<think>`` block after the turn.

    Returns:
        ``(raw_prompt, stop, wrap)``: the prompt, optional stop strings, and a
        post-processor applied to the generated text.

    Raises:
        ValueError: If ``mode`` is not one of the three above.
    """
    def chat_turn(user_text: str) -> str:
        # Read the token attributes lazily so an unknown mode still reaches
        # the ValueError below without touching them.
        return f"{self._R1_USER_TOKEN}{user_text}{self._R1_ASSISTANT_TOKEN}"

    question = prompt.strip()
    if mode == "skip_reasoning":
        return chat_turn(question) + "<think>\n\n</think>\n\n", None, lambda t: t.strip()
    if mode == "deep_then_answer":
        steps = "\nPlease reason step by step, then provide the final answer succinctly."
        return chat_turn(question + steps), None, lambda t: t.strip()
    if mode == "think_only":
        steps = "\nPlease reason step by step, and do not provide a final answer."
        return (
            chat_turn(question + steps) + "<think>\n",
            ["</think>"],
            lambda t: "<think>\n" + t.strip() + "\n</think>",
        )
    raise ValueError(f"Unknown LogicExpert mode: {mode!r}")
def logic_expert(self, prompt: str, mode: str = "deep_then_answer", template: Optional[Union[str, Callable]] = None, max_tokens: int = 1024, temperature: float = 0.6, **gen_kwargs) -> str:
    """Run the LogicExpert as a raw completion in one of three reasoning modes.

    Args:
        prompt: The question or instruction.
        mode: ``"deep_then_answer"``, ``"skip_reasoning"`` or ``"think_only"``.
        template: Optional callable ``template(prompt, mode)`` or a format
            string (``{prompt}``, ``{mode}``) producing the raw prompt. With a
            template, the mode's default stop strings and output wrapping are
            not applied.
        max_tokens: Generation limit.
        temperature: Sampling temperature.
        **gen_kwargs: Forwarded to the completion call. A ``stop`` entry is
            honoured in both paths. With a template it is used as-is. With the
            default prompt it is appended to the mode's own stop strings.

    Returns:
        The (possibly wrapped) generated text, as recorded by ``_finalize``.

    Raises:
        ValueError: If ``mode`` is unknown.
    """
    if mode not in ("deep_then_answer", "skip_reasoning", "think_only"):
        raise ValueError("mode must be one of: deep_then_answer, skip_reasoning, think_only")
    # Always take ``stop`` out of gen_kwargs: it is passed explicitly below, and
    # leaving it in raises "got multiple values for keyword argument 'stop'".
    user_stop = gen_kwargs.pop("stop", None)
    if template is not None:
        stop = user_stop
        raw_prompt = template(prompt, mode) if callable(template) else str(template).format(prompt=prompt, mode=mode)
        wrap = lambda t: t
    else:
        raw_prompt, stop, wrap = self._logic_default_raw(prompt, mode)
        if user_stop:
            extra = [user_stop] if isinstance(user_stop, str) else list(user_stop)
            existing = list(stop or [])
            stop = existing + [s for s in extra if s not in existing]
    raw_text, tokens, dt, deltas = self._exec_completion("LogicExpert", raw_prompt, max_tokens, temperature, stop=stop, **gen_kwargs)
    final_text = wrap(raw_text)
    return self._finalize("LogicExpert", prompt, final_text, dt, tokens, deltas, "custom" if template is not None else mode)
def _default_role_messages(self, prompt: str, character_card: Optional[str] = None) -> List[Dict[str, Any]]:
    """Default roleplay messages.

    The system turn is the character card (or a generic roleplay persona
    when none is given) followed by the RP formatting rule. The user turn is
    the prompt.
    """
    persona = character_card if character_card else "You are roleplaying a character."
    system_prompt = persona + " Respond using Classic Internet RP formatting: *action* speech *narration*."
    return [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": prompt},
    ]
def role_expert(self, prompt: str, character_card: Optional[str] = None, template: Optional[Union[str, list, Callable]] = None, max_tokens: int = 400, temperature: float = 0.9, **gen_kwargs) -> str:
    """Reply in character with the ``RoleExpert`` chat model.

    ``character_card`` feeds the default system prompt; ``template`` overrides
    the default messages (see ``_build_messages``).
    """
    label = "custom" if template is not None else "default"
    chat = self._build_messages(template, prompt, self._default_role_messages, character_card=character_card)
    reply, used_tokens, seconds, telemetry = self._exec_chat("RoleExpert", chat, max_tokens, temperature, **gen_kwargs)
    return self._finalize("RoleExpert", prompt, reply, seconds, used_tokens, telemetry, label)
def _default_affect_messages(self, text: str) -> List[Dict[str, Any]]:
    """Default messages asking the affect model for a JSON emotion classification of ``text``."""
    schema = '{"emotion": "...", "confidence": 0-1, "evidence": "..."}'
    instruction = "\n".join([
        "Classify the emotional tone of this text:",
        f"{text}",
        "Return: " + schema,
    ])
    return [
        {"role": "system", "content": "You are a compact classifier. Output only valid JSON."},
        {"role": "user", "content": instruction},
    ]
def affect_expert(self, text: str, template: Optional[Union[str, list, Callable]] = None, max_tokens: int = 300, temperature: float = 0.3, **gen_kwargs) -> str:
    """Classify the emotional tone of ``text`` with the ``AffectExpert`` chat model.

    Returns the model's raw reply; the default prompt asks for JSON, but the
    reply is not parsed here.
    """
    label = "default" if template is None else "custom"
    chat = self._build_messages(template, text, self._default_affect_messages)
    verdict, used_tokens, seconds, telemetry = self._exec_chat("AffectExpert", chat, max_tokens, temperature, **gen_kwargs)
    return self._finalize("AffectExpert", text, verdict, seconds, used_tokens, telemetry, label)
def _default_vision_messages(self, prompt: str, image: Optional[str] = None) -> List[Dict[str, Any]]:
    """Single user message, multimodal when an image is given.

    Without ``image`` the content is the plain prompt. With ``image`` the
    content is an ``image_url`` part followed by a ``text`` part. http(s)
    URLs are passed through; anything else is treated as a local path and
    inlined as a base64 data URI.
    """
    if image:
        url = image if self._is_url(image) else self._image_to_data_uri(image)
        content = [
            {"type": "image_url", "image_url": {"url": url}},
            {"type": "text", "text": prompt},
        ]
    else:
        content = prompt
    return [{"role": "user", "content": content}]
def _run_multimodal(self, expert_name: str, prompt: str, image: Optional[str], template: Optional[Union[str, list, Callable]], max_tokens: int, temperature: float, **gen_kwargs) -> str:
    """Shared chat flow for the image-capable experts.

    Messages come from ``_default_vision_messages`` unless ``template``
    overrides them. The chat runs on ``expert_name`` and the run is recorded.
    """
    used_template = "default" if template is None else "custom"
    chat = self._build_messages(template, prompt, self._default_vision_messages, image=image)
    answer, token_count, elapsed, telemetry = self._exec_chat(expert_name, chat, max_tokens, temperature, **gen_kwargs)
    return self._finalize(expert_name, prompt, answer, elapsed, token_count, telemetry, used_template)
def vision_expert(self, prompt: str, image: Optional[str] = None, template: Optional[Union[str, list, Callable]] = None, max_tokens: int = 512, temperature: float = 0.4, **gen_kwargs) -> str:
    """Answer ``prompt`` about an optional image (URL or local path) with the ``VisionExpert``."""
    return self._run_multimodal(
        expert_name="VisionExpert",
        prompt=prompt,
        image=image,
        template=template,
        max_tokens=max_tokens,
        temperature=temperature,
        **gen_kwargs,
    )
def head_expert(self, prompt: str, image: Optional[str] = None, template: Optional[Union[str, list, Callable]] = None, max_tokens: int = 512, temperature: float = 0.4, **gen_kwargs) -> str:
    """Answer ``prompt`` (optionally with an image) using the ``HeadExpert`` via the shared multimodal flow."""
    return self._run_multimodal(
        expert_name="HeadExpert",
        prompt=prompt,
        image=image,
        template=template,
        max_tokens=max_tokens,
        temperature=temperature,
        **gen_kwargs,
    )
def _default_math_messages(self, prompt: str) -> List[Dict[str, Any]]:
    """Default math messages: a precision-focused system turn and a 'solve carefully' user turn."""
    instructions = "Solve the following. Show formulas, compute carefully, and state the final answer clearly."
    return [
        {"role": "system", "content": "You are a precise math and reasoning assistant."},
        {"role": "user", "content": f"{instructions}\n\n{prompt}"},
    ]
def math_expert(self, prompt: str, template: Optional[Union[str, list, Callable]] = None, max_tokens: int = 500, temperature: float = 0.2, **gen_kwargs) -> str:
    """Solve a math/reasoning problem with the ``MathExpert`` chat model.

    Post-processing happens before the run is recorded. If ``CHINESE_RE``
    matches, the text goes through ``_translate_chinese_spans``. It is then
    passed through ``collapse_repeated_lines(..., max_repeat=1)``.

    Args:
        prompt: The problem statement.
        template: Optional message template (see ``_build_messages``).
        max_tokens: Generation limit.
        temperature: Sampling temperature.
        **gen_kwargs: Forwarded to the chat call. ``repeat_penalty`` defaults
            to 1.15 and may be overridden by the caller.
    """
    messages = self._build_messages(template, prompt, self._default_math_messages)
    # setdefault instead of an explicit keyword: passing repeat_penalty=1.15
    # alongside **gen_kwargs raised TypeError whenever the caller supplied it.
    gen_kwargs.setdefault("repeat_penalty", 1.15)
    text, tokens, dt, deltas = self._exec_chat("MathExpert", messages, max_tokens, temperature, **gen_kwargs)
    text = text or ""  # a None message content would break the regex search below
    if CHINESE_RE.search(text):
        text = self._translate_chinese_spans(text)
    text = collapse_repeated_lines(text, max_repeat=1)
    return self._finalize("MathExpert", prompt, text, dt, tokens, deltas, "custom" if template is not None else "default")
def _default_tool_prompt(self, task_instruction: str, tools_json: str, format_instruction: str, query: str) -> str:
    """Assemble the default function-calling prompt.

    Layout:
      1. A fixed preamble, then ``### Instruction:``.
      2. Four ``[BEGIN OF X]`` / ``[END OF X]`` sections: task instruction,
         available tools, format instruction, query.
      3. A trailing ``### Response:`` line.
    """
    preamble = (
        "You are an AI assistant for function calling. For politically sensitive questions, "
        "security and privacy issues, and other non-computer science questions, you will refuse "
        "to answer\n"
        "### Instruction:\n"
    )
    sections = (
        ("TASK INSTRUCTION", task_instruction),
        ("AVAILABLE TOOLS", tools_json),
        ("FORMAT INSTRUCTION", format_instruction),
        ("QUERY", query),
    )
    body = "".join(f"[BEGIN OF {tag}]\n{content}\n[END OF {tag}]\n\n" for tag, content in sections)
    return preamble + body + "### Response:\n"
@staticmethod
def _safe_parse_tool_json(text: str) -> Optional[Dict[str, Any]]:
    """Best-effort extraction of a JSON payload from a tool-calling reply.

    Strategies, in order:
      1. Parse the whole stripped text.
      2. Parse the span from the first ``{`` to the last ``}``.
      3. Decode from each ``{`` position in turn and return the first complete
         JSON object. This handles replies containing several objects, or
         trailing prose with stray braces, which defeat strategy 2.

    Returns:
        The decoded value, or ``None`` when nothing parses. Empty or ``None``
        input also returns ``None``.
    """
    if not text:
        return None
    text = text.strip()
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        pass
    start, end = text.find("{"), text.rfind("}")
    if start == -1 or end <= start:
        return None
    try:
        return json.loads(text[start:end + 1])
    except json.JSONDecodeError:
        pass
    decoder = json.JSONDecoder()
    pos = start
    while pos != -1 and pos < end:
        try:
            obj, _ = decoder.raw_decode(text, pos)
        except json.JSONDecodeError:
            pos = text.find("{", pos + 1)
            continue
        if isinstance(obj, dict):
            return obj
        pos = text.find("{", pos + 1)
    return None
def tool_expert(self, query: str, tools: Optional[List[Dict[str, Any]]] = None, task_instruction: Optional[str] = None, format_instruction: Optional[str] = None, template: Optional[Union[str, Callable]] = None, max_tokens: int = 512, temperature: float = 0.2, **gen_kwargs) -> str:
    """Ask the ``ToolExpert`` which function(s) to call for ``query``.

    Args:
        query: The user request.
        tools: Tool schemas. They are serialised with ``json.dumps(indent=2)``
            for the default and format-string prompts, and passed as-is to a
            callable template.
        task_instruction: Overrides the class default task instruction.
        format_instruction: Overrides the class default format instruction.
        template: Callable ``template(query, tools, task_instruction,
            format_instruction)``, or a format string using ``{query}``,
            ``{tools}``, ``{task_instruction}`` and ``{format_instruction}``.
        max_tokens: Generation limit.
        temperature: Sampling temperature.
        **gen_kwargs: Forwarded to the completion call. ``stop`` defaults to
            ``["### Instruction:"]`` and may be overridden in both paths.

    Returns:
        Pretty-printed JSON when the reply parses, otherwise the stripped raw text.
    """
    tools = tools or []
    task_instruction = task_instruction or self._XLAM_TASK_INSTRUCTION_DEFAULT
    format_instruction = format_instruction or self._XLAM_FORMAT_INSTRUCTION_DEFAULT
    tools_json = json.dumps(tools, indent=2)
    # Pop ``stop`` in both paths: it is passed explicitly below, so leaving a
    # caller-supplied value in gen_kwargs raised "multiple values for 'stop'".
    stop = gen_kwargs.pop("stop", ["### Instruction:"])
    if template is not None:
        raw_prompt = template(query, tools, task_instruction, format_instruction) if callable(template) else str(template).format(query=query, tools=tools_json, task_instruction=task_instruction, format_instruction=format_instruction)
    else:
        raw_prompt = self._default_tool_prompt(task_instruction, tools_json, format_instruction, query)
    raw_text, tokens, dt, deltas = self._exec_completion("ToolExpert", raw_prompt, max_tokens, temperature, stop=stop, **gen_kwargs)
    parsed = self._safe_parse_tool_json(raw_text)
    final_text = json.dumps(parsed, indent=2) if parsed is not None else raw_text.strip()
    return self._finalize("ToolExpert", query, final_text, dt, tokens, deltas, "custom" if template is not None else "default")
def translation_expert(self, text: str) -> str:
    """Translate ``text`` by delegating to ``self.translate_zh_en`` (zh -> en, per its name)."""
    translated = self.translate_zh_en(text)
    return translated
def save_checkpoint(self, path: str = DEFAULT_CHECKPOINT_PATH) -> None:
    """Unload all experts, then pickle this whole object to ``path`` via ``torch.save``.

    The data goes to a sibling temporary file first and is then moved over
    ``path`` with ``os.replace``. An interrupted or failing save therefore
    never leaves a truncated file in place of a previous good checkpoint.
    ``unload_all()`` runs first and nothing is reloaded afterwards.
    """
    self.unload_all()
    target = os.fspath(path)
    tmp_path = f"{target}.tmp-{os.getpid()}"
    try:
        torch.save(self, tmp_path)
        os.replace(tmp_path, target)
    except BaseException:
        # Best-effort removal of the partial file; always re-raise the original error.
        with contextlib.suppress(OSError):
            os.remove(tmp_path)
        raise
@classmethod
def load_checkpoint(cls, path: str = DEFAULT_CHECKPOINT_PATH) -> "PackedLM":
    """Unpickle an object previously written by ``save_checkpoint``, mapping tensors to CPU.

    ``cls`` is not used to validate the result; whatever object the file
    holds is returned.

    Security: ``weights_only=False`` runs full pickle deserialisation, which
    can execute arbitrary code. Only load checkpoints you trust.
    """
    restored = torch.load(path, weights_only=False, map_location="cpu")
    return restored
def run_self_test(self, image_test_source: Optional[str] = None) -> Dict[str, Any]:
    """Send one canned prompt to every known expert and collect the replies.

    Args:
        image_test_source: Optional image for the Head/Vision experts: an
            http(s) URL or a path to an existing local file. Anything else is
            ignored and those experts run text-only.

    Returns:
        ``{"per_expert_responses": {...}, "summary": self.summary()}``.
        Experts without a canned prompt are skipped. A failing expert is
        reported as ``"[ERROR] <Type>: <message>"`` instead of aborting the run.
        LogicExpert contributes one reply per mode.
    """
    prompts = {
        "CreativeExpert": "Write a short fantasy story (150 words max) about a dragon that discovers a computer hidden beneath a mountain.",
        "CodeExpert": "Write a Python implementation of quicksort. Complete runnable function, include comments, briefly explain time complexity.",
        "LogicExpert": "All robots can compute. Some computers are robots. No calculators are robots. What conclusions can be logically inferred?",
        "RoleExpert": "Explain how transformer attention works while staying fully in character.",
        "HeadExpert": "Explain how Mixture-of-Experts (MoE) routing works. Focus on: expert selection, gating, token routing, and efficiency benefits.",
        "MathExpert": "A train travels 120 miles in 2 hours and then 180 miles in 3 hours. What was its average speed for the entire trip?",
        "ToolExpert": "What is the weather in St. Louis right now?",
        "AffectExpert": "Your core navigation router is completely dropping telemetry packets! Fix this or we pull our implementation down tonight!",
        "VisionExpert": "Describe the visual elements, layout, and any text in this image.",
        "TranslationExpert": "你好,世界。这个模型应该把中文翻译成英文。",
    }
    # The vision message builder accepts http(s) URLs directly, so a URL is as
    # valid a test image as an existing local file (os.path.exists alone
    # silently discarded URLs).
    test_image: Optional[str] = None
    if image_test_source and (self._is_url(image_test_source) or os.path.exists(image_test_source)):
        test_image = image_test_source
    tool_specs = [
        {"name": "get_weather", "description": "Get current weather for a city", "parameters": {"city": "string"}},
        {"name": "send_discord_message", "description": "Send a message to a Discord webhook", "parameters": {"webhook_url": "string", "content": "string"}},
    ]
    # Method names (not bound methods) so a missing method is reported per expert
    # inside the try block below rather than aborting the whole self-test.
    plain_methods = {
        "CreativeExpert": "creative_expert",
        "CodeExpert": "code_expert",
        "RoleExpert": "role_expert",
        "AffectExpert": "affect_expert",
        "MathExpert": "math_expert",
    }
    report: Dict[str, Any] = {}
    for name in self.keys():
        if name not in prompts:
            continue
        prompt = prompts[name]
        try:
            if name == "LogicExpert":
                report[name] = {mode: self.logic_expert(prompt, mode=mode) for mode in ("deep_then_answer", "skip_reasoning", "think_only")}
            elif name in ("HeadExpert", "VisionExpert"):
                method = self.head_expert if name == "HeadExpert" else self.vision_expert
                report[name] = method(prompt, image=test_image)
            elif name == "ToolExpert":
                report[name] = self.tool_expert(prompt, tools=tool_specs)
            elif name == "TranslationExpert":
                report[name] = self.translation_expert(prompt)
            else:
                report[name] = getattr(self, plain_methods[name])(prompt)
        except Exception as e:
            report[name] = f"[ERROR] {type(e).__name__}: {e}"
    return {"per_expert_responses": report, "summary": self.summary()}
# Public API for `from <module> import *`.
# NOTE(review): PackedLMCheckpointRuntime, load_packedlm, RouteStep,
# ExecutionContext and PackedLLM are defined after this line but are not
# listed, so star-imports will not export them unless __all__ is extended
# elsewhere in the file. Confirm this is intended.
__all__ = ["PackedLM", "ExpertHandle", "HardwareProbe", "capture_telemetry", "calculate_delta"]
class PackedLMCheckpointRuntime:
    """Load and operate a serialized PackedLM checkpoint.

    The checkpoint is loaded eagerly by ``__init__``. The short methods
    (``creative``, ``code``, ``logic``, ...) forward to the loaded object's
    ``*_expert`` methods. Any other attribute not found on this wrapper is
    delegated to the loaded object through ``__getattr__``.

    Security: the checkpoint is a full pickled object loaded with
    ``torch.load``; with ``weights_only=False`` (the default here) unpickling
    can execute arbitrary code. Only load checkpoints from trusted sources.

    Parameters
    ----------
    checkpoint_path:
        Path to the saved `LM.pt` checkpoint.
    packedlm_module:
        Optional module name to import before loading. Use this when the
        `PackedLM` class lives in a separate Python module.
        Examples: "packedlm", "my_project.packedlm", or None if the class
        is already imported in the current process.
    map_location:
        Passed to `torch.load`. Usually "cpu".
    weights_only:
        Must be False for a full object checkpoint saved with `torch.save(obj, ...)`.
    strict_type_check:
        If True, verifies that the loaded object looks like a PackedLM instance.
    """
    def __init__(
        self,
        checkpoint_path: Union[str, Path] = "LM.pt",
        packedlm_module: Optional[str] = None,
        map_location: str = "cpu",
        weights_only: bool = False,
        strict_type_check: bool = True,
    ):
        # Store configuration, then load immediately so a bad path or an
        # incompatible checkpoint fails at construction time.
        self.checkpoint_path = Path(checkpoint_path)
        self.packedlm_module = packedlm_module
        self.map_location = map_location
        self.weights_only = weights_only
        self.strict_type_check = strict_type_check
        self.model: Any = None
        self.load()
    def _import_checkpoint_module(self) -> None:
        """Import ``packedlm_module`` (if set) so unpickling can resolve the PackedLM class."""
        if not self.packedlm_module:
            return
        importlib.import_module(self.packedlm_module)
    def _validate_model(self) -> None:
        """Check that the loaded object exposes the expected expert/helper methods.

        Raises:
            RuntimeError: If no object is loaded.
            TypeError: If ``strict_type_check`` is on and any required
                attribute is missing. With the check off, missing attributes
                are silently accepted.
        """
        if self.model is None:
            raise RuntimeError("PackedLM checkpoint is not loaded.")
        required_attrs = [
            "creative_expert",
            "code_expert",
            "logic_expert",
            "role_expert",
            "affect_expert",
            "head_expert",
            "vision_expert",
            "math_expert",
            "tool_expert",
            "translation_expert",
            "summary",
            "run_self_test",
        ]
        missing = [name for name in required_attrs if not hasattr(self.model, name)]
        if missing and self.strict_type_check:
            raise TypeError(
                "Loaded object does not look like a PackedLM instance. Missing: "
                + ", ".join(missing)
            )
    def load(self) -> Any:
        """Load the checkpoint from disk into ``self.model``, validate it, and return it.

        Note: the object is assigned to ``self.model`` before validation, so
        if ``_validate_model`` raises, ``self.model`` still holds the rejected
        object.

        Raises:
            FileNotFoundError: If ``checkpoint_path`` does not exist.
        """
        if not self.checkpoint_path.exists():
            raise FileNotFoundError(f"Checkpoint not found: {self.checkpoint_path}")
        self._import_checkpoint_module()
        # The checkpoint is a full serialized object, so weights_only must be False.
        # SECURITY: full unpickling can execute arbitrary code; trusted files only.
        self.model = torch.load(
            self.checkpoint_path,
            map_location=self.map_location,
            weights_only=self.weights_only,
        )
        self._validate_model()
        return self.model
    def reload(self) -> Any:
        """Drop the current object and load the checkpoint again from disk.

        If ``torch.load`` itself fails, ``self.model`` is left as ``None``.
        """
        self.model = None
        return self.load()
    def __getattr__(self, name: str) -> Any:
        """Delegate attributes not found on the wrapper to the loaded object.

        Python calls this only after normal lookup fails.
        """
        # Delegate unknown attributes to the loaded PackedLM object.
        # The wrapper's own fields are never delegated. Without this guard,
        # reading ``self.model`` before it is set would re-enter __getattr__
        # forever.
        if name in {"model", "checkpoint_path", "packedlm_module", "map_location", "weights_only", "strict_type_check"}:
            return super().__getattribute__(name)
        if self.model is not None and hasattr(self.model, name):
            return getattr(self.model, name)
        raise AttributeError(name)
    def __getitem__(self, key: str) -> Any:
        """Return ``self.model[key]`` (an expert entry of the loaded object)."""
        return self.model[key]
    def __contains__(self, key: str) -> bool:
        """Support ``key in runtime`` by testing membership on the loaded object."""
        return key in self.model
    @property
    def last_expert(self) -> str:
        """Name of the last expert the loaded object used, or ``""`` if it does not track one."""
        return getattr(self.model, "last_expert", "")
    def experts(self) -> List[str]:
        """List the keys of the loaded object."""
        return list(self.model.keys())
    def summary(self) -> Dict[str, Any]:
        """Return the loaded object's ``summary()``."""
        return self.model.summary()
    def creative(
        self,
        prompt: str,
        **kwargs: Any,
    ) -> str:
        """Forward to ``model.creative_expert``."""
        return self.model.creative_expert(prompt, **kwargs)
    def code(
        self,
        prompt: str,
        **kwargs: Any,
    ) -> str:
        """Forward to ``model.code_expert``."""
        return self.model.code_expert(prompt, **kwargs)
    def logic(
        self,
        prompt: str,
        mode: str = "deep_then_answer",
        **kwargs: Any,
    ) -> str:
        """Forward to ``model.logic_expert`` with an explicit ``mode``."""
        return self.model.logic_expert(prompt, mode=mode, **kwargs)
    def role(
        self,
        prompt: str,
        **kwargs: Any,
    ) -> str:
        """Forward to ``model.role_expert``."""
        return self.model.role_expert(prompt, **kwargs)
    def affect(
        self,
        text: str,
        **kwargs: Any,
    ) -> str:
        """Forward to ``model.affect_expert``."""
        return self.model.affect_expert(text, **kwargs)
    def head(
        self,
        prompt: str,
        image: Optional[str] = None,
        **kwargs: Any,
    ) -> str:
        """Forward to ``model.head_expert`` with an optional image."""
        return self.model.head_expert(prompt, image=image, **kwargs)
    def vision(
        self,
        prompt: str,
        image: Optional[str] = None,
        **kwargs: Any,
    ) -> str:
        """Forward to ``model.vision_expert`` with an optional image."""
        return self.model.vision_expert(prompt, image=image, **kwargs)
    def math(
        self,
        prompt: str,
        **kwargs: Any,
    ) -> str:
        """Forward to ``model.math_expert``.

        The result is then also passed through the module-level
        ``collapse_repeated_lines`` helper with its default arguments.
        """
        text=self.model.math_expert(prompt, **kwargs)
        text=collapse_repeated_lines(text)
        return text
    def tool(
        self,
        query: str,
        tools: Optional[List[Dict[str, Any]]] = None,
        **kwargs: Any,
    ) -> str:
        """Forward to ``model.tool_expert`` with the given tool schemas."""
        return self.model.tool_expert(query, tools=tools, **kwargs)
    def translation(self, text: str) -> str:
        """Forward to ``model.translation_expert``."""
        return self.model.translation_expert(text)
    def reload_expert(self, expert_name: str) -> Any:
        """Forward to ``model.reload_expert``."""
        return self.model.reload_expert(expert_name)
    def unload_expert(self, expert_name: str) -> None:
        """Forward to ``model.unload_expert``."""
        return self.model.unload_expert(expert_name)
    def unload_all(self) -> None:
        """Forward to ``model.unload_all``."""
        return self.model.unload_all()
    def save_checkpoint(self, path: Union[str, Path] = "LM.pt") -> None:
        """Forward to ``model.save_checkpoint``, passing ``path`` as a ``str``."""
        return self.model.save_checkpoint(str(path))
    def run_self_test(self, image_test_source: Optional[str] = None) -> Dict[str, Any]:
        """Forward to ``model.run_self_test``."""
        return self.model.run_self_test(image_test_source=image_test_source)
    def __repr__(self) -> str:
        """Show the checkpoint path and the loaded object's type name (``<unloaded>`` when none)."""
        model_name = type(self.model).__name__ if self.model is not None else "<unloaded>"
        return f"PackedLMCheckpointRuntime(checkpoint_path={self.checkpoint_path!s}, model={model_name})"
def load_packedlm(
    checkpoint_path: Union[str, Path] = "LM.pt",
    packedlm_module: Optional[str] = None,
    map_location: str = "cpu",
    weights_only: bool = False,
    strict_type_check: bool = True,
) -> PackedLMCheckpointRuntime:
    """Open a PackedLM checkpoint and return it wrapped in a ``PackedLMCheckpointRuntime``.

    Every argument is passed straight through to the runtime constructor,
    which loads the checkpoint immediately.
    """
    runtime = PackedLMCheckpointRuntime(
        checkpoint_path,
        packedlm_module,
        map_location,
        weights_only,
        strict_type_check,
    )
    return runtime
@dataclass
class RouteStep:
    """One planned step of an execution route.

    NOTE(review): field meanings below are inferred from their names. Confirm
    them against the route builder, which is not visible here.
    """
    expert: str  # presumably the expert key to dispatch to (e.g. "code_expert")
    sub_prompt: str  # presumably the prompt text sent to that expert
    goal: str  # presumably what this step is meant to achieve
    kwargs: Dict[str, Any] = field(default_factory=dict)  # presumably extra keyword args for the expert call
@dataclass
class ExecutionContext:
    """Mutable per-request state passed through the ``PackedLLM`` pipeline stages.

    ``PackedLLM.forward`` creates one per call from its arguments; later
    stages read and fill in the remaining fields.
    """
    prompt: str  # the request text passed to forward()
    image: Optional[str] = None  # optional image reference passed to forward() (path or URL, presumably)
    tools: Optional[List[Dict[str, Any]]] = None  # tool schemas passed to forward()
    deep_think: bool = False  # when true, _inject_think_blocks prepends LogicExpert planning blocks
    fast_think: bool = False  # when true, forward() takes the minimal fast path
    think_blocks: Dict[str, str] = field(default_factory=dict)  # cache "<stage>::<expert>::<prompt hash>" -> think text
    response_goal: Dict[str, Any] = field(default_factory=dict)  # presumably filled by the goal-planning stage
    route: List["RouteStep"] = field(default_factory=list)  # planned steps, presumably set by the route builder
    step_results: List[Dict[str, Any]] = field(default_factory=list)  # presumably per-step outputs of route execution
    base_response: str = ""  # presumably the synthesized answer before persona/review
    affective_state: Dict[str, Any] = field(default_factory=dict)  # presumably filled by the affect stage
    final_response: str = ""  # text forward() returns on the full pipeline path
    final_review: Dict[str, Any] = field(default_factory=dict)  # presumably the final review stage's verdict
    command_context: Dict[str, Any] = field(default_factory=dict)  # presumably filled by the command-consultation stage
class PackedLLM(nn.ModuleDict):
MODEL_EXPERTS: List[str] = [
"head_expert", "affect_expert", "role_expert", "creative_expert",
"code_expert", "logic_expert", "math_expert", "vision_expert",
"tool_expert", "translation_expert",
]
PIPELINE_EXPERTS: List[str] = ["action_expert", "web_expert"]
REQUIRED_EXPERTS: List[str] = MODEL_EXPERTS + PIPELINE_EXPERTS
MAX_STEP_RETRIES: int = 3
MAX_ACTION_ATTEMPTS: int = 3
MAX_WEB_ROUNDS: int = 3
_CHECKPOINT_FORMAT_VERSION: int = 3
_STAGE_SETTINGS: Dict[str, Dict[str, Any]] = {
"head_plan_response_goal": {"temperature": 0.2},
"head_build_route": {"temperature": 0.5},
"head_retry_or_reroute": {"temperature": 0.8, "top_p": 0.9},
"head_plan_detour": {"temperature": 1.0, "top_p": 0.95},
"head_synthesize_base": {"temperature": 1.0, "top_p": 0.95},
"head_review_final_response": {"temperature": 0.0},
"head_action_review": {"temperature": 1.0, "top_p": 0.95},
"head_web_queries": {"temperature": 0.5},
"head_web_answer_subquery": {"temperature": 0.5},
"head_web_review": {"temperature": 1.0, "top_p": 0.95},
"head_web_synthesis": {"temperature": 0.8, "top_p": 0.9},
"head_validate_response": {"temperature": 0.8, "top_p": 0.9},
"affect_evaluate_step": {"temperature": 0.2},
"affect_build_affective_state": {"temperature": 0.5},
"role_apply_persona": {"temperature": 1.0, "top_p": 0.95},
"logic_action_planning": {"temperature": 0.0},
"code_action_generation": {"temperature": 0.0},
"logic_action_repair": {"temperature": 0.0},
"deep_think": {"temperature": 0.0},
}
def __init__(
    self,
    bot_id: Optional[str] = None,
    user_id: Optional[str] = None,
    model_dir: str = "models",
    memory_dir: Optional[str] = None,
    web: bool = False,
    hardware_probe: bool = True,
    expert_modules: Optional[Dict[str, nn.Module]] = None,
    packedlm_checkpoint: Optional[str] = "LM.pt",
    packedlm_module: Optional[str] = "PackedLM",
):
    """Create the pipeline and wire up its optional subsystems.

    Expert calls prefer a ``PackedLMCheckpointRuntime`` loaded from
    ``packedlm_checkpoint``. Local expert modules are built only when that
    runtime could not be created.

    Args:
        bot_id: Bot identifier used to fetch the bot profile from the memory bank.
        user_id: User identifier used to fetch the user profile from the memory bank.
        model_dir: Forwarded as ``model_dir`` to locally built expert classes.
        memory_dir: MemoryBank root; when unset, the project root
            (``_project_root()``) is used.
        web: If true, try to attach the optional ``Web`` helper.
        hardware_probe: If true, record a host/GPU snapshot at start-up.
        expert_modules: Ready-made expert modules keyed by expert name. Used
            only when no checkpoint runtime was loaded.
        packedlm_checkpoint: Checkpoint path. A relative path is tried next to
            this file first, then against the CWD. A falsy value disables the
            runtime.
        packedlm_module: Module imported before unpickling the checkpoint.
    """
    super().__init__()
    self.bot_id = bot_id
    self.user_id = user_id
    self.model_dir = model_dir
    self.memory_dir = memory_dir
    self._hardware_probe_enabled = hardware_probe
    self.packedlm_checkpoint = self._resolve_local_path(packedlm_checkpoint)
    self.packedlm_module = packedlm_module
    # Runtime state. The live handles among these (_memory_bank, _web,
    # _codebox, _packedlm_runtime) are blanked by __getstate__.
    self._memory_bank: Any = None
    self._bot_profile: Dict[str, Any] = {}
    self._user_profile: Dict[str, Any] = {}
    self._hardware_state: Dict[str, Any] = {}
    self._web: Any = None
    self._codebox: Any = None
    self._packedlm_runtime: Optional[Any] = None
    self._runtime_expert_names: List[str] = []
    # Order matters: the runtime decides whether local experts are needed,
    # and the memory bank must exist before profiles can be loaded from it.
    self._init_packedlm_runtime()
    if self._packedlm_runtime is None:
        self._build_experts(expert_modules)
    self._init_memory()
    self._load_profiles()
    if hardware_probe:
        self._probe_hardware()
    if web:
        self._attach_web()
# ------------------------------------------------------------------
# Pickle support – live handles must never be serialised
# ------------------------------------------------------------------
def __getstate__(self):
    """Return a picklable copy of ``__dict__`` with live handles replaced by ``None``.

    The PackedLM runtime, web client, code sandbox and memory bank are never
    serialised. The keys are always present in the result, even if they
    were missing from the instance.
    """
    transient = ("_packedlm_runtime", "_web", "_codebox", "_memory_bank")
    snapshot = dict(self.__dict__)
    snapshot.update(dict.fromkeys(transient))
    return snapshot
def __setstate__(self, state):
    """Restore a pickled PackedLLM and try to reconnect the PackedLM runtime.

    The ``__dict__`` restore is delegated to ``nn.Module.__setstate__``.
    PyTorch's implementation also backfills internal hook attributes missing
    from pickles made with older torch versions, which a raw
    ``__dict__.update`` skips.

    Live handles are then cleared and the runtime is reloaded from
    ``packedlm_checkpoint``. Memory, web and code-sandbox handles stay ``None``.
    """
    super().__setstate__(state)
    # Pickles from older versions of this class may lack these fields;
    # nn.Module.__getattr__ would raise when _init_packedlm_runtime reads them.
    self.__dict__.setdefault("packedlm_checkpoint", None)
    self.__dict__.setdefault("packedlm_module", None)
    self._runtime_expert_names = self.__dict__.get("_runtime_expert_names", [])
    self._packedlm_runtime = None
    self._web = None
    self._codebox = None
    self._memory_bank = None
    self._init_packedlm_runtime()
# ------------------------------------------------------------------
# Path helpers
# ------------------------------------------------------------------
def _resolve_local_path(self, path: Optional[str]) -> Optional[str]:
    """Resolve ``path`` to an absolute path string; ``None``/empty gives ``None``.

    Resolution steps:
      1. ``~`` is expanded, consistent with how ``memory_dir`` is handled.
      2. Absolute paths are returned unchanged.
      3. A relative path is used next to this source file when a file exists
         there. Otherwise it is resolved against the current working directory.
    """
    if not path:
        return None
    p = Path(path).expanduser()
    if p.is_absolute():
        return str(p)
    try:
        script_dir: Optional[Path] = Path(__file__).resolve().parent
    except NameError:
        # ``__file__`` is undefined in some embedded/interactive contexts;
        # _project_root already tolerates this, so do the same here.
        script_dir = None
    if script_dir is not None:
        candidate = script_dir / p
        if candidate.exists():
            return str(candidate)
    return str(p.resolve())
def _project_root(self) -> Path:
    """Directory containing this source file, or the CWD when that cannot be determined."""
    try:
        source_file = __file__
    except NameError:
        return Path.cwd()
    try:
        return Path(source_file).resolve().parent
    except Exception:
        return Path.cwd()
# ------------------------------------------------------------------
# Initialisation helpers
# ------------------------------------------------------------------
def _init_packedlm_runtime(self) -> None:
    """Try to open the PackedLM checkpoint runtime, degrading to ``None`` on failure.

    Does nothing when no checkpoint path is configured, the runtime class is
    unavailable, or the file does not exist. On success it stores the runtime
    and its canonicalised expert names. On any exception it prints a warning
    and clears both.
    """
    ckpt = self.packedlm_checkpoint
    if not ckpt or PackedLMCheckpointRuntime is None or not os.path.exists(ckpt):
        return
    try:
        runtime = PackedLMCheckpointRuntime(
            checkpoint_path=ckpt,
            packedlm_module=self.packedlm_module,
            map_location="cpu",
            weights_only=False,
            strict_type_check=True,
        )
        self._packedlm_runtime = runtime
        self._runtime_expert_names = _expert_names_canonical(list(runtime.experts()))
    except Exception as exc:
        print(f"[PackedLLM] Warning: could not load PackedLM runtime: {exc}")
        self._packedlm_runtime = None
        self._runtime_expert_names = []
def _build_experts(self, expert_modules: Optional[Dict[str, nn.Module]]) -> None:
    """Populate this ModuleDict with expert modules.

    If ``expert_modules`` is non-empty, its entries are inserted as-is.
    Otherwise each expert class in the built-in map is looked up by name and
    instantiated with ``model_dir``. Lookup order:
      1. The globals of the frame two levels up (the constructing code when
         called from ``__init__``), as before.
      2. This module's globals, in case the expert classes are defined here.
      3. ``builtins``.
    Classes that cannot be found are skipped. Constructor failures are printed
    as warnings.
    """
    # Local imports: neither name is guaranteed to be imported at module level.
    import builtins
    import inspect

    if expert_modules:
        for key, module in expert_modules.items():
            self[key] = module
        return
    class_names = {
        "head_expert": "HeadExpert",
        "affect_expert": "AffectExpert",
        "role_expert": "RoleExpert",
        "creative_expert": "CreativeExpert",
        "code_expert": "CodeExpert",
        "logic_expert": "LogicExpert",
        "math_expert": "MathExpert",
        "vision_expert": "VisionExpert",
        "tool_expert": "ToolExpert",
        "translation_expert": "TranslationExpert",
        "action_expert": "ActionExpert",
        "web_expert": "WebExpert",
    }
    ctor_kwargs = {"model_dir": self.model_dir}
    # Walk two frames up directly instead of inspect.stack(), which builds
    # FrameInfo (and reads source context) for the entire stack.
    caller_globals: Dict[str, Any] = {}
    frame = inspect.currentframe()
    try:
        outer = frame.f_back.f_back if frame is not None and frame.f_back is not None else None
        if outer is not None:
            caller_globals = outer.f_globals
    finally:
        del frame  # break the self-reference cycle through this frame
    namespaces = (caller_globals, globals(), builtins.__dict__)
    for key, cls_name in class_names.items():
        cls = next((ns.get(cls_name) for ns in namespaces if ns.get(cls_name)), None)
        if cls is None:
            continue
        try:
            self[key] = cls(**ctor_kwargs)
        except Exception as exc:
            print(f"[PackedLLM] Warning: could not instantiate {cls_name}: {exc}")
def _init_memory(self) -> None:
    """Open the MemoryBank, or leave ``self._memory_bank`` as ``None``.

    Behaviour:
      * Skipped entirely when ``MemoryBank`` is unavailable.
      * The bank is rooted at ``memory_dir`` (user-expanded and resolved),
        or at the project root when that is unset.
      * It is opened with ``build_if_missing=False``.
      * If its ``gator`` supports ``set_lazy``, it is switched to lazy mode.
      * Any error downgrades to no memory, with a printed warning.
    """
    if MemoryBank is None:
        return
    try:
        if self.memory_dir:
            location = Path(self.memory_dir).expanduser().resolve()
        else:
            location = self._project_root()
        self._memory_bank = MemoryBank(gator_location=location, build_if_missing=False)
        bank = self._memory_bank
        if hasattr(bank, "gator") and hasattr(bank.gator, "set_lazy"):
            bank.gator.set_lazy(True)
    except Exception as exc:
        print(f"[PackedLLM] Warning: MemoryBank degraded mode: {exc}")
        self._memory_bank = None
def _load_profiles(self) -> None:
    """Fetch bot and user profiles from the memory bank (best effort).

    A profile is fetched only when its id is set. A failed or empty lookup
    stores ``{}``. Nothing happens when no memory bank is attached.
    """
    bank = self._memory_bank
    if bank is None:
        return
    targets = (
        ("bot", self.bot_id, "_bot_profile"),
        ("user", self.user_id, "_user_profile"),
    )
    for kind, ident, attr in targets:
        if not ident:
            continue
        try:
            profile = _safe_call(bank, "get_profile", kind, ident, default={}) or {}
        except Exception:
            profile = {}
        setattr(self, attr, profile)
def _probe_hardware(self) -> None:
    """Record a best-effort host/GPU snapshot in ``self._hardware_state``.

    Always recorded: platform, Python version and CPU count.
    When ``psutil`` is available: RAM totals and availability (decimal GB,
    1e9 bytes) and percent used.
    When CUDA is available: CUDA device 0's name, total VRAM and PyTorch's
    allocated memory. Otherwise ``gpu="none"``.
    Errors from the psutil or CUDA queries are ignored.
    """
    info: Dict[str, Any] = {}
    info["platform"] = platform.system()
    info["python_version"] = platform.python_version()
    info["cpu_count"] = os.cpu_count()
    if psutil is not None:
        try:
            memory = psutil.virtual_memory()
            info["ram_total_gb"] = round(memory.total / 1e9, 1)
            info["ram_available_gb"] = round(memory.available / 1e9, 1)
            info["ram_percent_used"] = memory.percent
        except Exception:
            pass
    if not torch.cuda.is_available():
        info["gpu"] = "none"
    else:
        try:
            info["gpu_name"] = torch.cuda.get_device_name(0)
            device_props = torch.cuda.get_device_properties(0)
            info["gpu_vram_total_gb"] = round(device_props.total_memory / 1e9, 1)
            # NOTE(review): memory_allocated() counts only tensors allocated by
            # PyTorch in this process, not memory used by other runtimes on the
            # same device. Confirm this is the intended "used" figure.
            info["gpu_vram_used_gb"] = round(torch.cuda.memory_allocated(0) / 1e9, 2)
        except Exception:
            pass
    self._hardware_state = info
def _attach_web(self) -> None:
    """Create the optional ``Web`` helper; on any failure, warn and leave ``self._web = None``."""
    try:
        if Web is None:
            raise RuntimeError("CompileWeb.Web unavailable")
        self._web = Web()
        return
    except Exception as exc:
        print(f"[PackedLLM] Warning: Web module unavailable: {exc}")
    self._web = None
def _get_codebox(self) -> Any:
    """Return the cached code sandbox, creating it on first use.

    ``Box`` is preferred, with ``CodeBox`` as the fallback. Each construction
    failure is printed as a warning. Returns ``None`` if neither can be
    created.
    """
    if self._codebox is not None:
        return self._codebox
    if Box is not None:
        try:
            self._codebox = Box()
        except Exception as exc:
            print(f"[PackedLLM] Warning: Box unavailable: {exc}")
        else:
            return self._codebox
    if CodeBox is not None:
        try:
            self._codebox = CodeBox()
        except Exception as exc:
            print(f"[PackedLLM] Warning: CodeBox unavailable: {exc}")
        else:
            return self._codebox
    return None
def _stage_kwargs(self, stage: str) -> Dict[str, Any]:
    """Return a fresh, caller-owned copy of the sampling settings for ``stage`` (empty if unknown)."""
    settings: Dict[str, Any] = {}
    settings.update(self._STAGE_SETTINGS.get(stage, {}))
    return settings
def _inject_think_blocks(
    self,
    ctx: ExecutionContext,
    *,
    stage: str,
    target_expert: str,
    task_prompt: str,
    output_contract: str,
    constraints: Optional[List[str]] = None,
) -> str:
    """Prefix ``task_prompt`` with LogicExpert planning blocks when deep-think is on.

    Returns ``task_prompt`` unchanged when ``ctx.deep_think`` is off, when the
    target is ``translation_expert``, or when the extracted blocks are blank.
    Planning text is generated once per (stage, target expert, prompt hash)
    and cached in ``ctx.think_blocks``.
    """
    if not ctx.deep_think or target_expert == "translation_expert":
        return task_prompt
    cache = ctx.think_blocks
    cache_key = f"{stage}::{target_expert}::{hash(task_prompt) & 0xFFFFFFFF}"
    if cache_key in cache:
        think = cache[cache_key]
    else:
        bullet_list = "\n".join(f"- {c}" for c in (constraints or [])) or "- none"
        planner_prompt = "\n".join([
            "You are LogicExpert. Produce PRIVATE planning blocks only.",
            "Return ONLY <think>...</think> blocks.",
            "Do not answer the task. Do not produce JSON. Do not produce prose.",
            "Make the blocks task-specific, brief, and directly useful to the target expert.",
            "",
            f"Stage: {stage}",
            f"Target expert: {target_expert}",
            "Task prompt:",
            f"{task_prompt}",
            "",
            "Output contract:",
            f"{output_contract}",
            "",
            "Constraints:",
            bullet_list,
            "",
            "Generate 2 to 4 blocks covering: objective, risks, format, and the safest path.",
        ])
        raw = self._call_expert("logic_expert", planner_prompt, **self._stage_kwargs("deep_think"))
        think = self._extract_think_blocks(raw)
        cache[cache_key] = think
    body = think.strip()
    if not body:
        return task_prompt
    return f"<think_blocks>\n{body}\n</think_blocks>\n\n{task_prompt}"
def _extract_think_blocks(self, text: str) -> str:
    """Pull ``<think>...</think>`` blocks out of ``text``.

    Matching is case-insensitive and spans newlines. Found blocks are returned
    stripped and newline-joined. If none are found, the whole stripped text is
    wrapped in a single ``<think>`` block. Empty or blank input gives ``""``.
    """
    if not text:
        return ""
    found = re.findall(r"<think>.*?</think>", text, flags=re.IGNORECASE | re.DOTALL)
    kept = [block.strip() for block in found if block.strip()]
    if found:
        return "\n".join(kept)
    remainder = text.strip()
    return f"<think>{remainder}</think>" if remainder else ""
def _route_allowed(self, ctx: ExecutionContext, expert: str) -> bool:
    """Whether ``expert`` may appear in the route; fast-think mode allows only head/role/translation."""
    if not ctx.fast_think:
        return True
    fast_path_experts = {"head_expert", "role_expert", "translation_expert"}
    return expert in fast_path_experts
def _fast_path_response(self, ctx: ExecutionContext) -> str:
    """Ask the head expert for a direct, tool-free answer to ``ctx.prompt`` and return it stripped."""
    directives = [
        "Answer the user's request directly and as fast as possible.",
        "Do not call tools. Do not browse. Do not write JSON.",
        "Do not add meta commentary.",
        "",
        f"User prompt: {ctx.prompt}",
    ]
    reply = self._call_expert("head_expert", "\n".join(directives), **self._stage_kwargs("head_synthesize_base"))
    return reply.strip()
def _fast_apply_persona(self, ctx: ExecutionContext, base_response: str) -> str:
    """Rewrite ``base_response`` in the bot's character voice (fast path).

    Returns ``base_response`` unchanged when there is no bot profile or when
    the role expert's reply is blank.
    """
    if not self._bot_profile:
        return base_response
    card = self._bot_profile.get("character_card", "")
    recipient = self._user_profile.get("name", "the user")
    persona_prompt = "".join([
        "<instructions>\nRewrite the base response in the character voice while preserving facts.\n",
        "Stay concise and accurate.\n</instructions>\n\n",
        f"<character_card>{card}</character_card>\n",
        f"<recipient>{recipient}</recipient>\n",
        f"<original_prompt>{ctx.prompt}</original_prompt>\n",
        f"<base_response>{base_response}</base_response>",
    ])
    rewritten = self._call_expert("role_expert", persona_prompt, **self._stage_kwargs("role_apply_persona")).strip()
    return rewritten or base_response
# ------------------------------------------------------------------
# Forward pass
# ------------------------------------------------------------------
def forward(
    self,
    prompt: str,
    image: Optional[str] = None,
    tools: Optional[List[Dict[str, Any]]] = None,
    stream: bool = False,
    deep_think: bool = False,
    fast_think: bool = False,
) -> Union[str, Generator[str, None, None]]:
    """Answer *prompt* through the expert pipeline.

    With ``fast_think`` only a direct head_expert answer is produced (plus a
    persona rewrite when a bot profile exists): no planning, routing, memory,
    action or web stages run. Otherwise the full stage sequence runs and
    ``ctx.final_response`` is returned.

    Args:
        prompt: User request.
        image: Optional image reference handed to vision-capable experts.
        tools: Optional tool specifications for tool_expert.
        stream: When True, return the generator from ``_stream_response``.
        deep_think: Enable think-block injection in the stages that support it.
        fast_think: Use the minimal fast path described above.

    Returns:
        The final response text, or a generator of text chunks when *stream*.
    """
    ctx = ExecutionContext(
        prompt=prompt,
        image=image,
        tools=tools,
        deep_think=deep_think,
        fast_think=fast_think,
    )
    if fast_think:
        # Fast path: minimal work, no memory, no action, no web.
        answer = self._fast_path_response(ctx)
        if self._bot_profile:
            answer = self._fast_apply_persona(ctx, answer)
    else:
        pipeline = (
            "_plan_response_goal",
            "_consult_commands",
            "_build_route",
            "_execute_route",
            "_synthesize_base",
            "_build_affective_state",
            "_apply_persona",
            "_review_final_response",
            "_finalize",
        )
        for stage_name in pipeline:
            getattr(self, stage_name)(ctx)
        answer = ctx.final_response
    if not stream:
        return answer
    return self._stream_response(answer)
# ------------------------------------------------------------------
# Expert dispatch
# ------------------------------------------------------------------
def _call_expert(self, key: str, *args: Any, **kwargs: Any) -> str:
    """Invoke the expert named *key* and return its output as a string.

    When a packed runtime is attached, the expert is the runtime attribute
    mapped from the key (``creative_expert`` -> ``runtime.creative``, ...);
    otherwise ``self._get_expert(key)`` supplies the callable.

    Args:
        key: Expert name; normalised with ``_normalise_expert_name`` first.
        *args: Forwarded unchanged to the expert callable.
        **kwargs: Forwarded unchanged to the expert callable.

    Returns:
        ``str(result)``; ``""`` when the expert is unknown or missing, returns
        ``None``, or raises (the error is printed, never propagated).
    """
    # Normalise key to snake_case so callers don't need to worry.
    key = _normalise_expert_name(key)
    runtime = self._packedlm_runtime
    if runtime is not None:
        # Expert key -> attribute name on the packed runtime. Resolved lazily
        # with getattr so a runtime lacking one expert attribute yields "" for
        # that expert instead of an AttributeError on every dispatch.
        runtime_attrs = {
            "creative_expert": "creative",
            "code_expert": "code",
            "logic_expert": "logic",
            "role_expert": "role",
            "affect_expert": "affect",
            "head_expert": "head",
            "vision_expert": "vision",
            "math_expert": "math",
            "tool_expert": "tool",
            "translation_expert": "translation",
            "action_expert": "action",
            "web_expert": "web",
        }
        attr = runtime_attrs.get(key)
        fn = getattr(runtime, attr, None) if attr else None
        label = f"runtime:{key}"
    else:
        fn = self._get_expert(key)
        label = key
    if fn is None:
        return ""
    try:
        result = fn(*args, **kwargs)
        return str(result) if result is not None else ""
    except Exception as exc:
        print(f"[PackedLLM] _call_expert({label}) error: {exc}")
        return ""
# ------------------------------------------------------------------
# Pipeline stages
# ------------------------------------------------------------------
def _plan_response_goal(self, ctx: ExecutionContext) -> None:
    """Ask head_expert for a structured response plan and store it on ``ctx.response_goal``.

    The planner is asked for a JSON object with intent, tone, success,
    constraints and the needs_vision / needs_web / needs_action routing
    flags. When the reply is not a non-empty JSON object, a conservative
    default goal (no vision, web or action) is used instead.
    """
    prompt = (
        "You are a response planner.\n"
        "Return ONLY a JSON object with exactly these keys:\n"
        "intent (string), tone (string), success (string), constraints (array), "
        "needs_vision (boolean), needs_web (boolean), needs_action (boolean).\n"
        "Do not add commentary, markdown, or extra keys.\n"
        "Always respond in English unless the task is explicitly a translation task.\n"
        "TranslationExpert is Chinese→English only; only flag translation-related routing when "
        "the source text is actually Chinese.\n\n"
        f"User prompt: {ctx.prompt}"
    )
    prompt = self._inject_think_blocks(
        ctx,
        stage="head_plan_response_goal",
        target_expert="head_expert",
        task_prompt=prompt,
        output_contract="JSON object with intent/tone/success/constraints/needs_vision/needs_web/needs_action.",
    )
    raw = self._call_expert("head_expert", prompt, image=ctx.image if ctx.image else None,
                            **self._stage_kwargs("head_plan_response_goal"))
    parsed = _parse_json_safe(raw)
    # Only a non-empty JSON object is usable: later stages call .get() on the
    # goal (e.g. _validate_response), so an array or scalar must not leak in.
    if isinstance(parsed, dict) and parsed:
        ctx.response_goal = parsed
    else:
        ctx.response_goal = {
            "intent": ctx.prompt,
            "tone": "helpful",
            "success": "Answer the user helpfully.",
            "constraints": [],
            "needs_vision": False,
            "needs_web": False,
            "needs_action": False,
        }
def _consult_commands(self, ctx: ExecutionContext) -> None:
    """Run memory-bank commands relevant to the prompt and record them on ``ctx.command_context``.

    The gator's ``maybe_execute_commands(prompt, goal)`` is preferred; its dict
    result is merged into the context. Otherwise ``process_actions`` is used
    when available. Errors are recorded under ``results`` instead of raised.
    The ``coverage`` becomes "partial" whenever anything was executed or
    returned. Without a memory bank (or in fast_think mode) an empty context is
    stored.
    """
    def _empty() -> Dict[str, Any]:
        return {"executed": [], "results": [], "coverage": "none"}

    bank = self._memory_bank
    if ctx.fast_think or bank is None:
        ctx.command_context = _empty()
        return
    state = _empty()
    gator = getattr(bank, "gator", None)
    if gator is not None:
        runner = getattr(gator, "maybe_execute_commands", None)
        try:
            if callable(runner):
                outcome = runner(ctx.prompt, ctx.response_goal)
                if isinstance(outcome, dict):
                    state.update(outcome)
            elif hasattr(gator, "process_actions"):
                routed = gator.process_actions(
                    ctx.prompt,
                    user_id=self.user_id or "",
                    bot_id=self.bot_id or "",
                    history="",
                    location="",
                    time_date="",
                )
                if isinstance(routed, dict):
                    state["executed"] = routed.get("commands", [])
                    state["results"] = routed.get("retrieved_data", {})
                    if routed.get("commands") or routed.get("retrieved_data"):
                        state["coverage"] = "partial"
        except Exception as exc:
            state["results"].append({"error": str(exc)})
        if state["coverage"] == "none" and (state.get("executed") or state.get("results")):
            state["coverage"] = "partial"
    ctx.command_context = state
def _build_route(self, ctx: ExecutionContext) -> None:
    """Decide which experts handle the request; store the plan on ``ctx.route``.

    fast_think uses a fixed head_expert step (plus role_expert when a persona
    is configured). Otherwise head_expert is asked for a JSON array of steps.
    Steps whose expert ``_route_allowed`` rejects are dropped. If nothing
    usable comes back, a fallback route is derived from the ``needs_*`` flags
    of ``ctx.response_goal`` and always ends with head_expert. Finally,
    consecutive steps for the same expert are collapsed, except for
    translation_expert.
    """
    if ctx.fast_think:
        ctx.route = [RouteStep(expert="head_expert", sub_prompt=ctx.prompt,
                               goal="Produce the fastest useful answer possible.")]
        if self._bot_profile:
            ctx.route.append(RouteStep(expert="role_expert", sub_prompt=ctx.prompt, goal="Apply persona only."))
        return
    prompt = (
        "You are a response router.\n"
        "Return ONLY a JSON array of step objects.\n"
        "Each step object must include: expert (string), sub_prompt (string), goal (string), "
        "and optional kwargs (object).\n"
        "Use the fewest steps needed. Prefer web_expert for live information, action_expert for "
        "executable tasks, and head_expert for planning/validation.\n"
        "Always write sub_prompt and goal in English.\n"
        "Do not include duplicate or redundant steps.\n\n"
        f"Response goal: {json.dumps(ctx.response_goal, ensure_ascii=False, default=str)}\n"
        f"Command context: {json.dumps(ctx.command_context, ensure_ascii=False, default=str)}\n"
        f"Original prompt: {ctx.prompt}\n"
        f"Available experts: {', '.join(self.REQUIRED_EXPERTS)}"
    )
    prompt = self._inject_think_blocks(
        ctx,
        stage="head_build_route",
        target_expert="head_expert",
        task_prompt=prompt,
        output_contract="JSON array of routing steps.",
    )
    raw = self._call_expert("head_expert", prompt, **self._stage_kwargs("head_build_route"))
    steps_raw = _parse_json_safe(raw)
    route: List[RouteStep] = []
    if isinstance(steps_raw, list) and steps_raw:
        for s in steps_raw:
            if not isinstance(s, dict):
                continue
            # `or` instead of a .get() default: the model may emit explicit
            # nulls / empty strings, which previously became "None"/"" fields.
            expert = _normalise_expert_name(str(s.get("expert") or "head_expert"))
            if not self._route_allowed(ctx, expert):
                continue
            step_kwargs = s.get("kwargs")
            route.append(
                RouteStep(
                    expert=expert,
                    sub_prompt=str(s.get("sub_prompt") or ctx.prompt),
                    goal=str(s.get("goal") or "Complete the sub-task."),
                    kwargs=step_kwargs if isinstance(step_kwargs, dict) else {},
                )
            )
    if not route:
        goal = ctx.response_goal if isinstance(ctx.response_goal, dict) else {}
        if goal.get("needs_web") and self._route_allowed(ctx, "web_expert"):
            route.append(RouteStep(expert="web_expert", sub_prompt=ctx.prompt,
                                   goal="Gather and synthesize fresh external information."))
        if goal.get("needs_action") and self._route_allowed(ctx, "action_expert"):
            route.append(RouteStep(expert="action_expert", sub_prompt=ctx.prompt,
                                   goal="Execute the requested action or code workflow."))
        if goal.get("needs_vision") and ctx.image and self._route_allowed(ctx, "vision_expert"):
            route.append(
                RouteStep(expert="vision_expert", sub_prompt=ctx.prompt, goal="Interpret the provided image.",
                          kwargs={"image": ctx.image}))
        route.append(
            RouteStep(expert="head_expert", sub_prompt=ctx.prompt, goal="Produce a complete, helpful response."))
    # Remove redundant consecutive steps. NOTE(review): this keys on the expert
    # only, so consecutive same-expert steps with different sub-prompts are
    # merged too — confirm that is intended.
    compressed: List[RouteStep] = []
    for step in route:
        if compressed and compressed[-1].expert == step.expert and step.expert != "translation_expert":
            continue
        compressed.append(step)
    ctx.route = compressed
def _execute_route(self, ctx: ExecutionContext) -> None:
    """Run every step of ``ctx.route`` in order, recording outcomes in ``ctx.step_results``.

    action_expert and web_expert steps are delegated once to their pipelines.
    Their result dict supplies ``passed``/``action``. Every other step is
    executed and judged by ``_evaluate_step``. On failure,
    ``_retry_or_reroute`` chooses one of three actions:

    - retry, while fewer than ``MAX_STEP_RETRIES`` failures have occurred;
    - detour, which inserts ``_plan_detour`` steps right after the current step;
    - skip.

    Detour steps can fail and ask for detours of their own. The number of
    detours per route is therefore capped at ``MAX_DETOURS`` (3 when the class
    does not define it); once that budget is spent, a requested detour is
    recorded as a skip. This keeps the route from growing forever.
    """
    max_detours = getattr(self, "MAX_DETOURS", 3)
    detours_taken = 0
    i = 0
    while i < len(ctx.route):
        step = ctx.route[i]
        if step.expert in ("action_expert", "web_expert"):
            if step.expert == "action_expert":
                result = self._run_action_pipeline(ctx, step)
                default_action = "action_pipeline"
            else:
                result = self._run_web_pipeline(ctx, step)
                default_action = "web_pipeline"
            ctx.step_results.append({
                "expert": step.expert,
                "sub_prompt": step.sub_prompt,
                "result": result,
                "passed": bool(result.get("passed", False)),
                "action": result.get("action", default_action),
            })
            i += 1
            continue
        retries = 0
        while True:
            result = self._execute_step(step, ctx)
            if self._evaluate_step(result, step, ctx):
                ctx.step_results.append({
                    "expert": step.expert,
                    "sub_prompt": step.sub_prompt,
                    "result": result,
                    "passed": True,
                })
                break
            retries += 1
            action = self._retry_or_reroute(step, retries, result, ctx)
            if action == "retry" and retries < self.MAX_STEP_RETRIES:
                continue
            if action == "detour" and detours_taken < max_detours:
                detours_taken += 1
                new_steps = self._plan_detour(step, result, ctx)
                ctx.route = ctx.route[:i + 1] + list(new_steps) + ctx.route[i + 1:]
                ctx.step_results.append({
                    "expert": step.expert,
                    "sub_prompt": step.sub_prompt,
                    "result": result,
                    "passed": False,
                    "action": "detour",
                })
                break
            ctx.step_results.append({
                "expert": step.expert,
                "sub_prompt": step.sub_prompt,
                "result": result,
                "passed": False,
                "action": "skip",
            })
            break
        i += 1
def _execute_step(self, step: RouteStep, ctx: ExecutionContext) -> str:
    """Run a single (non-pipeline) route step and return the expert's text.

    The step kwargs are copied. The request image is added for head/vision
    experts and the tool specs for tool_expert, unless the step already set
    them. translation_expert is called without kwargs or think blocks. When
    neither the step prompt nor the original prompt contains Chinese
    (``CHINESE_RE``), its sub_prompt is returned untouched with no model call.
    With ``deep_think``, other experts get think blocks injected around the
    sub_prompt.
    """
    expert = step.expert
    call_kwargs = dict(step.kwargs)
    if expert == "translation_expert":
        # Translation stays direct; no deep-think wrapper.
        if not (CHINESE_RE.search(step.sub_prompt or "") or CHINESE_RE.search(ctx.prompt or "")):
            return step.sub_prompt
        return self._call_expert(expert, step.sub_prompt)
    if expert in ("head_expert", "vision_expert") and ctx.image:
        call_kwargs.setdefault("image", ctx.image)
    if expert == "tool_expert" and ctx.tools:
        call_kwargs.setdefault("tools", ctx.tools)
    text = step.sub_prompt
    if ctx.deep_think:
        text = self._inject_think_blocks(
            ctx,
            stage=f"{expert}:{step.goal}",
            target_expert=expert,
            task_prompt=step.sub_prompt,
            output_contract="Task-specific expert response.",
            constraints=[step.goal],
        )
    return self._call_expert(expert, text, **call_kwargs)
def _evaluate_step(self, result: str, step: RouteStep, ctx: ExecutionContext) -> bool:
    """Ask affect_expert whether *result* meets ``step.goal``.

    Only the first 500 characters of the result are shown to the judge. An
    unparseable or non-object judgement counts as a pass, so a broken judge
    never blocks the pipeline.

    Returns:
        False only when the judge explicitly reports failure.
    """
    meta = (
        "Evaluate whether the following result meets the stated goal. "
        "Return ONLY JSON: {\"pass\": true/false, \"reason\": \"...\"}.\n\n"
        f"Goal: {step.goal}\n"
        f"Result: {result[:500]}"
    )
    raw = self._call_expert("affect_expert", meta)
    parsed = _parse_json_safe(raw)
    if not isinstance(parsed, dict):
        return True
    verdict = parsed.get("pass", True)
    if isinstance(verdict, str):
        # Models sometimes quote booleans; bool("false") would be True.
        return verdict.strip().lower() not in {"", "false", "no", "0", "fail", "failed"}
    return bool(verdict)
def _retry_or_reroute(self, step: RouteStep, retry_count: int, result: str, ctx: ExecutionContext) -> str:
    """Ask head_expert how to handle a step that failed its quality check.

    Args:
        step: The failed step.
        retry_count: Failures so far for this step; "retry" is offered only
            while it is below ``MAX_STEP_RETRIES``.
        result: The failed output (first 300 characters are shown).
        ctx: Execution context (unused here; kept for signature parity).

    Returns:
        One of the offered options ("retry", "detour", "skip"); "skip" when the
        answer is unparseable or not an offered option.
    """
    options = ["detour", "skip"]
    if retry_count < self.MAX_STEP_RETRIES:
        options = ["retry"] + options
    meta = (
        "A pipeline step has failed its quality check. Decide what to do.\n"
        f"Failed expert: {step.expert}\n"
        f"Sub-prompt: {step.sub_prompt}\n"
        f"Step goal: {step.goal}\n"
        f"Result so far: {result[:300]}\n"
        f"Retry count: {retry_count}\n"
        f"Available options: {options}\n"
        "Return ONLY JSON: {\"action\": \"<option>\"}"
    )
    raw = self._call_expert("head_expert", meta)
    parsed = _parse_json_safe(raw)
    if isinstance(parsed, dict):
        # Normalise like the other verdict parsers in this class ("Retry",
        # " detour " are accepted).
        action = str(parsed.get("action", "skip")).strip().lower()
        if action in options:
            return action
    return "skip"
def _plan_detour(self, failed_step: RouteStep, result: str, ctx: ExecutionContext) -> List[RouteStep]:
    """Ask head_expert for replacement steps after *failed_step* failed.

    Returns:
        The parsed replacement steps (non-object entries are ignored; missing,
        null or empty fields fall back to head_expert / the original prompt /
        a generic goal). When the answer is not a JSON array, a single
        head_expert fallback step is returned.
    """
    meta = (
        "A pipeline step failed and a detour is needed. "
        "Return ONLY a JSON array of replacement step objects "
        "(each with: expert, sub_prompt, goal, kwargs optional).\n\n"
        f"Failed step expert: {failed_step.expert}\n"
        f"Failed step goal: {failed_step.goal}\n"
        f"Partial result: {result[:300]}\n"
        f"Original prompt: {ctx.prompt}\n"
        f"Available experts: {', '.join(self.REQUIRED_EXPERTS)}"
    )
    raw = self._call_expert("head_expert", meta)
    steps_raw = _parse_json_safe(raw)
    if isinstance(steps_raw, list):
        detour: List[RouteStep] = []
        for s in steps_raw:
            if not isinstance(s, dict):
                continue
            # `or` instead of a .get() default so explicit nulls / "" from the
            # model do not become "None" or empty prompts.
            step_kwargs = s.get("kwargs")
            detour.append(
                RouteStep(
                    expert=_normalise_expert_name(str(s.get("expert") or "head_expert")),
                    sub_prompt=str(s.get("sub_prompt") or ctx.prompt),
                    goal=str(s.get("goal") or "Complete the sub-task."),
                    kwargs=step_kwargs if isinstance(step_kwargs, dict) else {},
                )
            )
        return detour
    return [
        RouteStep(
            expert="head_expert",
            sub_prompt=ctx.prompt,
            goal="Produce a fallback response.",
        )
    ]
def _synthesize_base(self, ctx: ExecutionContext) -> None:
    """Merge step results into a persona-free answer stored on ``ctx.base_response``.

    Every step result that is not ``None`` is listed as ``[expert]: result``
    and handed to head_expert with the original prompt and response goal. If
    head_expert returns only whitespace, the result listing is used instead,
    or the original prompt when there were no results.
    """
    summary_lines = []
    for entry in ctx.step_results:
        if entry.get("result") is None:
            continue
        summary_lines.append(f"[{entry['expert']}]: {entry['result']}")
    results_summary = "\n".join(summary_lines)
    goal_json = json.dumps(ctx.response_goal, ensure_ascii=False, default=str)
    task_lines = [
        "You are synthesizing the content-only base response.",
        "Use the expert results and original prompt to produce a complete, concise answer.",
        "Do NOT apply any persona or character voice.",
        "Do NOT return JSON.",
        "Always respond in English unless the task is explicitly a translation task.",
        "",
        f"Original prompt: {ctx.prompt}",
        "",
        "Expert results:",
        results_summary,
        "",
        f"Response goal: {goal_json}",
    ]
    task = self._inject_think_blocks(
        ctx,
        stage="head_synthesize_base",
        target_expert="head_expert",
        task_prompt="\n".join(task_lines),
        output_contract="Plain natural-language answer only.",
    )
    answer = self._call_expert("head_expert", task, **self._stage_kwargs("head_synthesize_base")).strip()
    ctx.base_response = answer if answer else (results_summary or ctx.prompt)
def _build_affective_state(self, ctx: ExecutionContext) -> None:
    """Ask affect_expert for the bot's emotional/physical state; store it on ``ctx.affective_state``.

    Skipped (empty dict) on the fast_think path or when neither a bot id nor a
    user id is set. The result is always a dict: later stages call
    ``.get("emotional_state")`` / ``.get("physical_state")`` on it.
    """
    if ctx.fast_think or (not self.bot_id and not self.user_id):
        ctx.affective_state = {}
        return
    prompt = (
        "You are an affective state module.\n"
        "Return ONLY a JSON object with two keys: emotional_state and physical_state.\n"
        "emotional_state must include emotion, mood, sentiment, and disposition_toward_user.\n"
        "physical_state should reflect the provided hardware metrics.\n\n"
        f"Conversation summary: User said: {ctx.prompt}\n"
        f"Base response: {ctx.base_response[:500]}\n"
        f"User profile: {json.dumps(self._user_profile, ensure_ascii=False, default=str)}\n"
        f"Bot profile: {json.dumps(self._bot_profile, ensure_ascii=False, default=str)}\n"
        f"Hardware state: {json.dumps(self._hardware_state, ensure_ascii=False, default=str)}"
    )
    raw = self._call_expert("affect_expert", prompt, **self._stage_kwargs("affect_build_affective_state"))
    parsed = _parse_json_safe(raw)
    # A JSON array/scalar would make downstream .get() calls raise.
    ctx.affective_state = parsed if isinstance(parsed, dict) else {}
def _apply_persona(self, ctx: ExecutionContext) -> None:
    """Rewrite the base response in the bot's voice; store it on ``ctx.final_response``.

    Without a bot profile the base response is used as-is. Otherwise
    role_expert rewrites it using the character card and the current
    emotional/physical state. The stripped rewrite is kept only if it is
    non-empty and ``_validate_response`` accepts it; otherwise the base
    response is kept.
    """
    if not self._bot_profile:
        ctx.final_response = ctx.base_response
        return
    character_card = self._bot_profile.get("character_card", "")
    user_name = self._user_profile.get("name", "the user")
    # Guard against a non-dict affective state (e.g. a JSON array from the model).
    affect = ctx.affective_state if isinstance(ctx.affective_state, dict) else {}
    prompt = (
        f"You are {character_card}\n\n"
        f"You are responding to {user_name}.\n\n"
        f"Original prompt: {ctx.prompt}\n\n"
        f"Base response to rewrite: {ctx.base_response}\n\n"
        f"Your current emotional state: {json.dumps(affect.get('emotional_state', {}), ensure_ascii=False, default=str)}\n"
        f"Your current physical state: {json.dumps(affect.get('physical_state', {}), ensure_ascii=False, default=str)}\n\n"
        "Rewrite the base response in your own voice, naturally, as yourself.\n"
        "Keep all factual content intact.\n"
        "IMPORTANT: Respond in English unless the character card explicitly specifies another language."
    )
    prompt = self._inject_think_blocks(
        ctx,
        stage="role_apply_persona",
        target_expert="role_expert",
        task_prompt=prompt,
        output_contract="Persona-rewritten natural-language answer.",
    )
    rewritten = self._call_expert("role_expert", prompt, character_card=character_card,
                                  **self._stage_kwargs("role_apply_persona"))
    rewritten = rewritten.strip() if isinstance(rewritten, str) else ""
    # _call_expert returns "" when the expert fails; never let that replace a
    # real answer (mirrors the `or base_response` fallback of _fast_apply_persona).
    if rewritten and self._validate_response(rewritten, ctx):
        ctx.final_response = rewritten
    else:
        ctx.final_response = ctx.base_response
def _review_final_response(self, ctx: ExecutionContext) -> None:
    """Let head_expert review the final answer; store the review on ``ctx.final_review``.

    Verdict handling:

    - ``revise`` with a non-blank ``revised_response`` replaces the final answer;
    - ``reject`` falls back to the base response;
    - anything else keeps the current answer.

    The review dict (empty when unparseable) is kept for ``_finalize``, which
    reads memory facts and profile updates from it. On the fast_think path a
    fixed "accept" review is recorded without a model call.
    """
    if ctx.fast_think:
        ctx.final_review = {"verdict": "accept", "reason": "fast_think path", "revised_response": None}
        return

    def _as_json(obj: Any) -> str:
        return json.dumps(obj, ensure_ascii=False, default=str)

    lines = [
        "You are performing the final review.",
        "Return ONLY a JSON object with keys:",
        "verdict (accept|revise|reject), reason (string), revised_response (string|null),",
        "memory_facts (array), user_profile_updates (object), bot_profile_updates (object).",
        "Do not add extra keys or commentary.",
        "The revised_response must be in English unless the task explicitly required another language.",
        "",
        f"Original prompt: {ctx.prompt}",
        f"Response goal: {_as_json(ctx.response_goal)}",
        f"Base response: {ctx.base_response}",
        f"Persona response: {ctx.final_response}",
        f"Affective state: {_as_json(ctx.affective_state)}",
        f"User profile: {_as_json(self._user_profile)}",
        f"Bot profile: {_as_json(self._bot_profile)}",
        f"Command context: {_as_json(ctx.command_context)}",
    ]
    task = self._inject_think_blocks(
        ctx,
        stage="head_review_final_response",
        target_expert="head_expert",
        task_prompt="\n".join(lines),
        output_contract="JSON review object.",
    )
    raw = self._call_expert("head_expert", task, **self._stage_kwargs("head_review_final_response"))
    parsed = _parse_json_safe(raw)
    review = parsed if isinstance(parsed, dict) else {}
    verdict = str(review.get("verdict", "accept")).lower()
    if verdict == "revise":
        revised = review.get("revised_response")
        if isinstance(revised, str) and revised.strip():
            ctx.final_response = revised.strip()
    elif verdict == "reject":
        ctx.final_response = ctx.base_response
    ctx.final_review = review
def _finalize(self, ctx: ExecutionContext) -> None:
    """Persist the turn: memory first, then the user profile, then the bot profile.

    Nothing is persisted on the fast_think path.
    """
    if ctx.fast_think:
        return
    for persist_name in ("_write_memory", "_update_user_profile", "_update_bot_profile"):
        getattr(self, persist_name)(ctx)
# ------------------------------------------------------------------
# Action pipeline
# ------------------------------------------------------------------
def _run_action_pipeline(self, ctx: ExecutionContext, step: RouteStep) -> Dict[str, Any]:
    """Fulfil an action step by planning, generating, running and reviewing a script.

    Up to ``MAX_ACTION_ATTEMPTS`` attempts are made. Each attempt:

    1. asks logic_expert for a JSON plan;
    2. creates a CodeBox venv with the third-party packages the plan imports;
    3. has code_expert write the script;
    4. runs the script (120 s timeout, 4096 MB RAM cap);
    5. lets head_expert review the run with a success / retry / abandon verdict.

    After a failed, non-final attempt, logic_expert is asked for a repair.

    Args:
        ctx: Execution context of the current request.
        step: The action_expert route step; its sub_prompt (or the original
            prompt) is the action request.

    Returns:
        On success, a record with ``passed`` True, the plan, the run result,
        the review and the stdout as ``output``. Otherwise the record of the
        last failed attempt, or an ``"unavailable"`` record when no CodeBox is
        available.
    """
    codebox = self._get_codebox()
    if codebox is None:
        return {
            "passed": False,
            "action": "unavailable",
            "output": "CodeBox is unavailable.",
        }
    action_request = step.sub_prompt or ctx.prompt
    # Import name -> PyPI distribution name where the two differ.
    imports_map = {
        "PIL": "Pillow",
        "bs4": "beautifulsoup4",
        "cv2": "opencv-python",
        "yaml": "PyYAML",
        "sklearn": "scikit-learn",
    }
    # Standard-library modules must never be handed to pip ("os", "json", ...).
    # sys.stdlib_module_names exists on Python 3.10+; older versions skip the filter.
    stdlib_names = frozenset(getattr(sys, "stdlib_module_names", ()))
    last_result: Dict[str, Any] = {"passed": False, "action": "unavailable"}
    for attempt in range(1, self.MAX_ACTION_ATTEMPTS + 1):
        plan_prompt = self._prompt_action_planning(ctx, action_request, step.goal)
        raw_plan = self._call_expert(
            "logic_expert",
            plan_prompt,
            mode="deep_then_answer",
            **self._stage_kwargs("logic_action_planning"),
        )
        plan = _parse_json_safe(raw_plan) or {}
        if not isinstance(plan, dict):
            plan = {}
        architecture = plan.get("architecture", {})
        if not isinstance(architecture, dict) or "main" not in architecture:
            architecture = {
                "main": {
                    "signature": "def main():",
                    "docstring": "Entry point.",
                    "instructions": action_request,
                }
            }
        plan["architecture"] = architecture
        imports = plan.get("imports", [])
        if not isinstance(imports, list):
            imports = []
        required_pkgs: List[str] = []
        for imp in imports:
            if not isinstance(imp, str):
                continue
            spec = imp.strip()
            # Accept bare names ("numpy", "numpy as np") as well as whole
            # statements ("import numpy as np", "from PIL import Image").
            if spec.startswith("from "):
                modules = [spec[len("from "):].split(" import ", 1)[0]]
            else:
                if spec.startswith("import "):
                    spec = spec[len("import "):]
                modules = spec.split(",")
            for module in modules:
                root = module.split(" as ", 1)[0].strip().split(".", 1)[0]
                # Empty roots come from relative imports ("from . import x").
                if not root or root in stdlib_names:
                    continue
                pkg = imports_map.get(root, root)
                if pkg not in required_pkgs:
                    required_pkgs.append(pkg)
        venv_id = f"action_{abs(hash(action_request)) % 100000}_{attempt}"
        try:
            if required_pkgs:
                codebox.create_venv(venv_id, requirements=required_pkgs)
            else:
                codebox.create_venv(venv_id)
            generation_prompt = self._prompt_code_generation(ctx, action_request, step.goal, plan)
            generated_script = self._call_expert(
                "code_expert",
                generation_prompt,
                **self._stage_kwargs("code_action_generation"),
            )
            if not isinstance(generated_script, str) or not generated_script.strip():
                raise RuntimeError("CodeExpert returned an empty script.")
            generated_script = _extract_python_code(generated_script)
            if not generated_script.strip():
                raise RuntimeError("CodeExpert returned a script with no extractable valid Python.")
            if "request_report.txt" not in generated_script:
                # Guarantee the report file the generation prompt asks for.
                generated_script += (
                    "\n\nif __name__ == '__main__':\n"
                    "    with open('request_report.txt', 'w', encoding='utf-8') as f:\n"
                    "        f.write('Action completed.')\n"
                    "    print('Action completed.')\n"
                )
            if hasattr(codebox, "save_script"):
                codebox.save_script(f"action_{attempt}", venv_id, generated_script)
            run_result = codebox.run_code(
                generated_script,
                venv_id=venv_id,
                requirements=None,
                timeout=120,
                max_ram_mb=4096,
            )
            review_prompt = self._prompt_action_review(ctx, action_request, plan, run_result)
            review_raw = self._call_expert(
                "head_expert",
                review_prompt,
                **self._stage_kwargs("head_action_review"),
            )
            review = _parse_json_safe(review_raw)
            # A JSON array/scalar would make .get() raise and discard this run
            # through the except branch; treat it as "no verdict" instead.
            if not isinstance(review, dict):
                review = {}
            verdict = str(review.get("verdict", "retry")).lower()
            feedback = review.get("feedback", "")
            if run_result.get("success") and verdict == "success":
                return {
                    "passed": True,
                    "action": "success",
                    "attempt": attempt,
                    "plan": plan,
                    "result": run_result,
                    "review": review,
                    "output": run_result.get("stdout", "") or "Action completed.",
                }
            print(f"[PackedLLM] action attempt {attempt} did not pass:")
            print(f"  run_result.success={run_result.get('success')} verdict={verdict}")
            print(f"  stdout: {str(run_result.get('stdout', ''))[:600]}")
            print(f"  stderr: {str(run_result.get('stderr', ''))[:600]}")
            print(f"  exit_code: {run_result.get('exit_code')}")
            print(f"  review feedback: {feedback}")
            last_result = {
                "passed": False,
                "action": verdict if verdict in {"retry", "abandon"} else "retry",
                "attempt": attempt,
                "plan": plan,
                "result": run_result,
                "review": review,
                "feedback": feedback,
                "script": generated_script,
            }
            if verdict == "abandon":
                break
        except Exception as exc:
            last_result = {
                "passed": False,
                "action": "retry" if attempt < self.MAX_ACTION_ATTEMPTS else "abandon",
                "attempt": attempt,
                "error": str(exc),
                "plan": plan,
            }
        if attempt < self.MAX_ACTION_ATTEMPTS:
            repair_prompt = self._prompt_action_repair(ctx, action_request, plan, last_result)
            # NOTE(review): the repair answer is discarded; nothing feeds it
            # into the next attempt's planning prompt.
            self._call_expert(
                "logic_expert",
                repair_prompt,
                mode="deep_then_answer",
                **self._stage_kwargs("logic_action_repair"),
            )
    return last_result
# ------------------------------------------------------------------
# Web pipeline
# ------------------------------------------------------------------
def _run_web_pipeline(self, ctx: ExecutionContext, step: RouteStep) -> Dict[str, Any]:
    """Research a web_expert step through iterative search rounds.

    head_expert first proposes subqueries as a JSON array; a single direct
    search of the prompt is the fallback. Each round searches every subquery
    (with ``deep_search`` when requested); evidence over 8000 characters is
    summarised when the web module supports it. head_expert answers each
    subquery from its evidence and then reviews the round:

    - ``more_info`` starts another round with ``new_subqueries``, up to
      ``MAX_WEB_ROUNDS``;
    - ``abandon`` stops;
    - anything else counts as sufficient, and the rounds are synthesised into
      ``output``.

    Returns:
        A dict with ``passed``, ``action``, ``rounds`` and ``output``, or an
        ``"unavailable"`` record when no web module can be attached.
    """
    if self._web is None:
        self._attach_web()
    if self._web is None:
        return {
            "passed": False,
            "action": "unavailable",
            "output": "Web module is unavailable.",
        }
    prompt = step.sub_prompt or ctx.prompt
    overall: Dict[str, Any] = {
        "passed": False,
        "action": "partial",
        "rounds": [],
    }
    queries_prompt = self._prompt_web_queries(ctx, prompt)
    raw_queries = self._call_expert(
        "head_expert",
        queries_prompt,
        **self._stage_kwargs("head_web_queries"),
    )
    subqueries = _parse_json_safe(raw_queries)
    if not isinstance(subqueries, list) or not subqueries:
        subqueries = [{"query": prompt, "description": "Direct search", "deep_search": False}]
    rounds = 0
    remaining_subqueries = subqueries
    while rounds < self.MAX_WEB_ROUNDS and remaining_subqueries:
        round_results = []
        for item in remaining_subqueries:
            # Models sometimes return bare query strings instead of objects;
            # search them as plain queries rather than dropping them.
            if isinstance(item, str) and item.strip():
                item = {"query": item.strip()}
            if not isinstance(item, dict):
                continue
            # `or prompt`: an explicit null/empty query would search "None"/"".
            query = str(item.get("query") or prompt)
            deep_search = bool(item.get("deep_search", False))
            try:
                raw = self._web.search(query, deep_search=deep_search)
            except TypeError:
                # Fallback for search() implementations that reject the
                # deep_search keyword; its own failure must not escape either.
                try:
                    raw = self._web.search(query)
                except Exception as exc:
                    raw = f"WEB_ERROR: {exc}"
            except Exception as exc:
                raw = f"WEB_ERROR: {exc}"
            text = raw if isinstance(raw, str) else _json_dumps(raw)
            if len(text) > 8000 and hasattr(self._web, "summarize"):
                try:
                    text = self._web.summarize(text)
                except Exception:
                    # Best effort: keep the unsummarised evidence.
                    pass
            answer_prompt = self._prompt_web_answer_subquery(ctx, query, text, prompt)
            answer = self._call_expert(
                "head_expert",
                answer_prompt,
                **self._stage_kwargs("head_web_answer_subquery"),
            )
            round_results.append({
                "query": query,
                "description": item.get("description", ""),
                "evidence": text[:4000],
                "answer": answer,
                "deep_search": deep_search,
            })
        overall["rounds"].append(round_results)
        review_prompt = self._prompt_web_review(ctx, prompt, round_results)
        review_raw = self._call_expert(
            "head_expert",
            review_prompt,
            **self._stage_kwargs("head_web_review"),
        )
        review = _parse_json_safe(review_raw)
        # A JSON array/scalar would make .get() raise an uncaught AttributeError.
        if not isinstance(review, dict):
            review = {}
        verdict = str(review.get("verdict", "sufficient")).lower()
        if verdict == "more_info":
            remaining_subqueries = review.get("new_subqueries", [])
            if not isinstance(remaining_subqueries, list):
                remaining_subqueries = []
            rounds += 1
            continue
        if verdict == "abandon":
            break
        overall["passed"] = True
        overall["action"] = "sufficient"
        break
    if overall["passed"]:
        synthesis_prompt = self._prompt_web_synthesis(ctx, prompt, overall["rounds"])
        synthesis = self._call_expert(
            "head_expert",
            synthesis_prompt,
            **self._stage_kwargs("head_web_synthesis"),
        )
        overall["output"] = synthesis
    else:
        overall["output"] = "Partial web research completed."
    return overall
# ------------------------------------------------------------------
# Validation / finalisation helpers
# ------------------------------------------------------------------
def _validate_response(self, response: str, ctx: ExecutionContext) -> bool:
    """Ask head_expert whether *response* meets the success criterion and character card.

    Only the first 600 characters are judged. An unparseable or non-object
    judgement counts as valid, so a broken validator never discards output.

    Returns:
        False only when the validator explicitly reports the response invalid.
    """
    # The goal comes from model JSON; guard against arrays/scalars.
    goal = ctx.response_goal if isinstance(ctx.response_goal, dict) else {}
    meta = (
        "Validate this response against the criteria below. "
        "Return ONLY JSON: {\"valid\": true/false, \"reason\": \"...\"}.\n\n"
        f"Success criterion: {goal.get('success', 'Be helpful.')}\n"
        f"Character card: {self._bot_profile.get('character_card', 'None')}\n"
        f"Response to validate: {response[:600]}"
    )
    raw = self._call_expert("head_expert", meta)
    parsed = _parse_json_safe(raw)
    if not isinstance(parsed, dict):
        return True
    verdict = parsed.get("valid", True)
    if isinstance(verdict, str):
        # Models sometimes quote booleans; bool("false") would be True.
        return verdict.strip().lower() not in {"", "false", "no", "0", "invalid"}
    return bool(verdict)
def _extract_memory_facts(self, ctx: ExecutionContext) -> List[str]:
    """Collect up to 20 distinct strings worth persisting from this turn.

    Sources, in order:

    - the final review's ``memory_facts`` (non-blank strings);
    - the output of every step that passed: the ``output`` of pipeline result
      dicts, or the first 300 characters of plain-text expert results.

    Duplicates are removed case-insensitively, keeping the first spelling.
    """
    facts: List[str] = []
    review = ctx.final_review if isinstance(ctx.final_review, dict) else {}
    review_facts = review.get("memory_facts", [])
    if isinstance(review_facts, list):
        facts.extend(f for f in review_facts if isinstance(f, str) and f.strip())
    for item in ctx.step_results:
        if not isinstance(item, dict):
            continue
        # Failed/skipped steps carry rejected output or placeholders such as
        # "CodeBox is unavailable." or "Partial web research completed.";
        # those are not facts and must not be stored as memories.
        if not item.get("passed", True):
            continue
        result = item.get("result")
        if isinstance(result, dict) and result.get("output"):
            facts.append(str(result["output"]))
        elif isinstance(result, str) and result.strip():
            facts.append(result.strip()[:300])
    deduped: List[str] = []
    seen = set()
    for fact in facts:
        key = fact.strip().lower()
        if key and key not in seen:
            seen.add(key)
            deduped.append(fact.strip())
    return deduped[:20]
def _update_user_profile(self, ctx: ExecutionContext) -> None:
    """Merge the review's ``user_profile_updates`` into the user profile and persist it.

    Requires a memory bank and a user id; non-dict or empty updates are
    ignored. Persistence goes through ``set_profile`` on the memory bank, or
    on its gator when the bank lacks it. Failures are printed as non-fatal.
    """
    bank = self._memory_bank
    if bank is None or not self.user_id:
        return
    updates = ctx.final_review.get("user_profile_updates", {})
    if not (isinstance(updates, dict) and updates):
        return
    try:
        self._user_profile.update(updates)
        if hasattr(bank, "set_profile"):
            target = bank
        elif hasattr(bank.gator, "set_profile"):
            target = bank.gator
        else:
            target = None
        if target is not None:
            _safe_call(target, "set_profile", "user", self.user_id, self._user_profile)
    except Exception as exc:
        print(f"[PackedLLM] _update_user_profile failed (non-fatal): {exc}")
def _update_bot_profile(self, ctx: ExecutionContext) -> None:
    """Update the bot profile from the review and the affective state, then persist it.

    Requires a memory bank and a bot id. ``bot_profile_updates`` (ignored
    unless a dict) is merged in. Non-empty emotional/physical states are saved
    as ``last_emotional_state`` / ``last_physical_state``. Persistence prefers
    the bank's ``set_profile`` over the gator's. Failures are printed as
    non-fatal.
    """
    bank = self._memory_bank
    if bank is None or not self.bot_id:
        return
    updates = ctx.final_review.get("bot_profile_updates", {})
    updates = updates if isinstance(updates, dict) else {}
    affect = ctx.affective_state
    emotional = affect.get("emotional_state", {})
    physical = affect.get("physical_state", {})
    try:
        profile = self._bot_profile
        profile.update(updates)
        for slot, snapshot in (("last_emotional_state", emotional), ("last_physical_state", physical)):
            if snapshot:
                profile[slot] = snapshot
        if hasattr(bank, "set_profile"):
            _safe_call(bank, "set_profile", "bot", self.bot_id, profile)
        elif hasattr(bank.gator, "set_profile"):
            _safe_call(bank.gator, "set_profile", "bot", self.bot_id, profile)
    except Exception as exc:
        print(f"[PackedLLM] _update_bot_profile failed (non-fatal): {exc}")
def _write_memory(self, ctx: ExecutionContext) -> None:
    """Store this turn's memory facts in the memory bank.

    Each fact goes through the bank's ``store`` (tagged with bot id, user id
    and a timestamp) when available, otherwise through ``store_knowledge``.
    Nothing happens without a bank or facts; failures are printed as
    non-fatal.
    """
    bank = self._memory_bank
    if bank is None:
        return
    facts = self._extract_memory_facts(ctx)
    if not facts:
        return
    try:
        has_store = hasattr(bank, "store")
        has_knowledge = hasattr(bank, "store_knowledge")
        for fact in facts:
            if has_store:
                meta = {
                    "bot_id": self.bot_id,
                    "user_id": self.user_id,
                    "timestamp": time.time(),
                }
                _safe_call(bank, "store", text=fact, metadata=meta)
            elif has_knowledge:
                _safe_call(
                    bank,
                    "store_knowledge",
                    [fact],
                    tags=["conversation_memory"],
                    source="PackedLLM",
                    importance=0.7,
                )
    except Exception as exc:
        print(f"[PackedLLM] _write_memory failed (non-fatal): {exc}")
def _stream_response(self, text: str) -> Generator[str, None, None]:
    """Yield *text* as space-delimited chunks for pseudo-streaming.

    Every chunk after the first carries its leading space separator, so
    ``"".join(chunks) == text`` exactly. The previous trailing-space form
    appended a spurious space after the last word. Runs of spaces are
    preserved as space-only chunks. Empty text yields nothing.
    """
    if not text:
        return
    words = text.split(" ")
    yield words[0]
    for word in words[1:]:
        yield " " + word
def _prompt_action_planning(self, ctx: ExecutionContext, action_request: str, step_goal: str) -> str:
    """Build the logic_expert prompt that plans an executable workflow.

    The plan must be JSON with thought, architecture (including ``main``),
    imports and global. The prompt is wrapped by ``_inject_think_blocks``
    under the ``logic_action_planning`` stage.
    """
    lines = [
        "You are LogicExpert planning an executable workflow.",
        "Return ONLY JSON with keys: thought, architecture, imports, global.",
        "architecture must include a main entry point.",
        "Be concise, deterministic, and implementation-oriented.",
        "",
        f"Action request: {action_request}",
        f"Original prompt: {ctx.prompt}",
        f"Goal: {step_goal}",
    ]
    return self._inject_think_blocks(
        ctx,
        stage="logic_action_planning",
        target_expert="logic_expert",
        task_prompt="\n".join(lines),
        output_contract="JSON object with thought, architecture, imports, global.",
    )
def _prompt_code_generation(self, ctx: ExecutionContext, action_request: str, step_goal: str,
                            plan: Dict[str, Any]) -> str:
    """Build the code_expert prompt that turns a plan into a runnable script.

    The script must write its result to ``request_report.txt`` and print it.
    The JSON-serialised plan is appended to the prompt, which is wrapped by
    ``_inject_think_blocks`` under the ``code_action_generation`` stage.
    """
    requirements = [
        "1) Complete the user's request.",
        "2) Create request_report.txt in the working directory.",
        "3) Write the final result into that file and print it.",
        "4) Use only requested imports and stdlib unless absolutely necessary.",
        "5) The script must be self-contained and executable as-is.",
        "6) Output ONLY raw Python source code, no fences, no commentary.",
    ]
    lines = [
        "You are CodeExpert writing a complete, runnable Python script.",
        "Requirements:",
        *requirements,
        "",
        f"Action request: {action_request}",
        f"Original prompt: {ctx.prompt}",
        f"Goal: {step_goal}",
        f"Plan: {json.dumps(plan, ensure_ascii=False, default=str)}",
    ]
    return self._inject_think_blocks(
        ctx,
        stage="code_action_generation",
        target_expert="code_expert",
        task_prompt="\n".join(lines),
        output_contract="Raw Python source code only.",
    )
def _prompt_action_review(self, ctx: ExecutionContext, action_request: str, plan: Dict[str, Any],
                          run_result: Dict[str, Any]) -> str:
    """Build the head_expert prompt that reviews a script run.

    The review must be JSON with a verdict (success|retry|abandon) and
    feedback. The run's stdout, stderr, exit code and success flag are
    included, with empty/False defaults when absent.
    """
    header = [
        "You are HeadExpert reviewing an ActionExpert run.",
        "Return ONLY JSON with keys: verdict (success|retry|abandon) and feedback.",
        "Be strict and prefer retry only for fixable failures.",
        "",
    ]
    fields = [
        ("Original request", action_request),
        ("Plan", json.dumps(plan, ensure_ascii=False, default=str)),
        ("Stdout", run_result.get("stdout", "")),
        ("Stderr", run_result.get("stderr", "")),
        ("Exit code", run_result.get("exit_code", "")),
        ("Success", run_result.get("success", False)),
    ]
    body = header + [f"{label}: {value}" for label, value in fields]
    return self._inject_think_blocks(
        ctx,
        stage="head_action_review",
        target_expert="head_expert",
        task_prompt="\n".join(body),
        output_contract="JSON object with verdict and feedback.",
    )
def _prompt_action_repair(self, ctx: ExecutionContext, action_request: str, previous_plan: Dict[str, Any],
                          failure_details: Dict[str, Any]) -> str:
    """Build the logic_expert prompt asking for a minimal repair of a failed plan.

    The repair must be JSON with changed_functions and notes. The previous
    plan and the failure record are embedded as JSON.
    """
    as_json = json.dumps
    lines = [
        "You are LogicExpert repairing a failed ActionExpert plan.",
        "Return ONLY JSON with keys: changed_functions and notes.",
        "Focus on the smallest targeted fix.",
        "",
        f"Original request: {action_request}",
        f"Previous plan: {as_json(previous_plan, ensure_ascii=False, default=str)}",
        f"Failure details: {as_json(failure_details, ensure_ascii=False, default=str)}",
    ]
    return self._inject_think_blocks(
        ctx,
        stage="logic_action_repair",
        target_expert="logic_expert",
        task_prompt="\n".join(lines),
        output_contract="JSON repair object.",
    )
def _prompt_web_queries(self, ctx: ExecutionContext, prompt: str) -> str:
    """
    Build the HeadExpert prompt that breaks *prompt* into web search subqueries.

    The expert is asked for a JSON array of objects with ``query``,
    ``description`` and ``deep_search`` keys, written in English. The
    context's ``response_goal`` is embedded as JSON. The text is wrapped by
    ``_inject_think_blocks`` for stage ``head_web_queries``.
    """
    goal_json = json.dumps(ctx.response_goal, ensure_ascii=False, default=str)
    task_lines = [
        "Generate a concise list of web search subqueries.",
        "Return ONLY a JSON array of objects with keys: query, description, deep_search.",
        "Use the fewest queries needed and keep them high-signal.",
        "All query and description fields must be in English.",
        "",  # blank separator line between instructions and data
        f"Prompt: {prompt}",
        f"Response goal: {goal_json}",
    ]
    return self._inject_think_blocks(
        ctx,
        stage="head_web_queries",
        target_expert="head_expert",
        task_prompt="\n".join(task_lines),
        output_contract="JSON array of web search queries.",
    )
def _prompt_web_answer_subquery(self, ctx: ExecutionContext, query: str, evidence: str, prompt: str) -> str:
    """
    Build the HeadExpert prompt that answers one web subquery from gathered evidence.

    Only the first 8000 characters of *evidence* are embedded, which keeps the
    prompt bounded. The expected reply is plain text with no JSON, code or
    emotion metadata. The text is wrapped by ``_inject_think_blocks`` for
    stage ``head_web_answer_subquery``.
    """
    evidence_limit = 8000
    task_lines = [
        "Answer the web subquery using only the provided evidence.",
        "Return a concise plain-text answer grounded in the evidence.",
        "Do NOT return JSON, code, or emotion metadata.",
        "",  # blank separator line between instructions and data
        f"Subquery: {query}",
        f"Evidence: {evidence[:evidence_limit]}",
        f"Prompt: {prompt}",
    ]
    return self._inject_think_blocks(
        ctx,
        stage="head_web_answer_subquery",
        target_expert="head_expert",
        task_prompt="\n".join(task_lines),
        output_contract="Concise evidence-grounded answer.",
    )
def _prompt_web_review(self, ctx: ExecutionContext, prompt: str, round_results: List[Dict[str, Any]]) -> str:
    """
    Build the HeadExpert prompt that decides whether another research round is needed.

    The round results and the context's ``response_goal`` are embedded as
    JSON. The expected reply is a JSON object with a ``verdict`` of
    sufficient, more_info or abandon plus a ``new_subqueries`` array. The text
    is wrapped by ``_inject_think_blocks`` for stage ``head_web_review``.
    """
    rounds_json = json.dumps(round_results, ensure_ascii=False, default=str)
    goal_json = json.dumps(ctx.response_goal, ensure_ascii=False, default=str)
    task_lines = [
        "Review the web research result and decide whether more information is needed.",
        "Return ONLY JSON with keys: verdict (sufficient|more_info|abandon) and new_subqueries (array).",
        "Return an empty array when no additional searches are needed.",
        "",  # blank separator line between instructions and data
        f"Prompt: {prompt}",
        f"Round results: {rounds_json}",
        f"Response goal: {goal_json}",
    ]
    return self._inject_think_blocks(
        ctx,
        stage="head_web_review",
        target_expert="head_expert",
        task_prompt="\n".join(task_lines),
        output_contract="JSON review object for web research.",
    )
def _prompt_web_synthesis(self, ctx: ExecutionContext, prompt: str,
        research_rounds: List[List[Dict[str, Any]]]) -> str:
    """
    Build the HeadExpert prompt that writes the final answer from all research rounds.

    Every round's results are embedded as one JSON document. The expected
    reply is a direct English answer. The text is wrapped by
    ``_inject_think_blocks`` for stage ``head_web_synthesis``.
    """
    rounds_json = json.dumps(research_rounds, ensure_ascii=False, default=str)
    task_lines = [
        "Synthesize the final web answer from the research results.",
        "Return a direct, readable answer in English with no extra commentary.",
        "",  # blank separator line between instructions and data
        f"Prompt: {prompt}",
        f"Research rounds: {rounds_json}",
    ]
    return self._inject_think_blocks(
        ctx,
        stage="head_web_synthesis",
        target_expert="head_expert",
        task_prompt="\n".join(task_lines),
        output_contract="Final synthesized web answer.",
    )
# ------------------------------------------------------------------
# Public expert convenience methods
# ------------------------------------------------------------------
def creative_expert(self, prompt: str, **kwargs: Any) -> str:
    """Send *prompt* directly to the creative expert; extra kwargs pass through to ``_call_expert``."""
    expert_key = "creative_expert"
    return self._call_expert(expert_key, prompt, **kwargs)
def code_expert(self, prompt: str, **kwargs: Any) -> str:
    """Send *prompt* directly to the code expert; extra kwargs pass through to ``_call_expert``."""
    expert_key = "code_expert"
    return self._call_expert(expert_key, prompt, **kwargs)
def logic_expert(self, prompt: str, mode: str = "deep_then_answer", **kwargs: Any) -> str:
    """
    Send *prompt* directly to the logic expert.

    *mode* (default ``"deep_then_answer"``) is forwarded as the ``mode``
    keyword together with any extra kwargs.
    """
    expert_key = "logic_expert"
    return self._call_expert(expert_key, prompt, mode=mode, **kwargs)
def role_expert(self, prompt: str, **kwargs: Any) -> str:
    """Send *prompt* directly to the role expert; extra kwargs (e.g. ``character_card``) pass through."""
    expert_key = "role_expert"
    return self._call_expert(expert_key, prompt, **kwargs)
def affect_expert(self, text: str, **kwargs: Any) -> str:
    """Send *text* directly to the affect expert; extra kwargs pass through to ``_call_expert``."""
    expert_key = "affect_expert"
    return self._call_expert(expert_key, text, **kwargs)
def head_expert(self, prompt: str, image: Optional[str] = None, **kwargs: Any) -> str:
    """Send *prompt* (and an optional *image*, forwarded as ``image=``) directly to the head expert."""
    expert_key = "head_expert"
    return self._call_expert(expert_key, prompt, image=image, **kwargs)
def vision_expert(self, prompt: str, image: Optional[str] = None, **kwargs: Any) -> str:
    """Send *prompt* (and an optional *image*, forwarded as ``image=``) directly to the vision expert."""
    expert_key = "vision_expert"
    return self._call_expert(expert_key, prompt, image=image, **kwargs)
def math_expert(self, prompt: str, **kwargs: Any) -> str:
    """Send *prompt* directly to the math expert; extra kwargs pass through to ``_call_expert``."""
    expert_key = "math_expert"
    return self._call_expert(expert_key, prompt, **kwargs)
def tool_expert(self, query: str, tools: Optional[List[Dict[str, Any]]] = None, **kwargs: Any) -> str:
    """Send *query* directly to the tool expert, forwarding the optional tool descriptors as ``tools=``."""
    expert_key = "tool_expert"
    return self._call_expert(expert_key, query, tools=tools, **kwargs)
def translation_expert(self, text: str, **kwargs: Any) -> str:
    """
    Translate *text* with the translation expert when it contains CHINESE_RE matches.

    Text with no match (including ``None``/empty, which is searched as
    ``""``) is returned unchanged without calling any expert. Extra kwargs are
    accepted for signature parity with the other experts but deliberately
    ignored.
    """
    del kwargs  # intentionally unused
    if CHINESE_RE.search(text or "") is None:
        return text
    return self._call_expert("translation_expert", text)
def action_expert(self, prompt: str, **kwargs: Any) -> str:
    """
    Run *prompt* through ``_run_action_pipeline`` as one ``action_expert`` route step.

    ``tools`` seeds the execution context and ``goal`` (default: *prompt*)
    becomes the step goal; every other keyword argument is forwarded as the
    step's ``kwargs``. Returns the result's ``"output"`` as text (``""`` when
    absent).
    """
    ctx = ExecutionContext(prompt=prompt, tools=kwargs.get("tools"))
    step_goal = kwargs.get("goal", prompt)
    passthrough = {name: value for name, value in kwargs.items() if name not in ("tools", "goal")}
    step = RouteStep(
        expert="action_expert",
        sub_prompt=prompt,
        goal=step_goal,
        kwargs=passthrough,
    )
    outcome = self._run_action_pipeline(ctx, step)
    return str(outcome.get("output", ""))
def web_expert(self, prompt: str, **kwargs: Any) -> str:
    """
    Run *prompt* through ``_run_web_pipeline`` as one ``web_expert`` route step.

    ``tools`` seeds the execution context and ``goal`` (default: *prompt*)
    becomes the step goal; every other keyword argument is forwarded as the
    step's ``kwargs``. Returns the result's ``"output"`` as text (``""`` when
    absent).
    """
    ctx = ExecutionContext(prompt=prompt, tools=kwargs.get("tools"))
    step_goal = kwargs.get("goal", prompt)
    passthrough = {name: value for name, value in kwargs.items() if name not in ("tools", "goal")}
    step = RouteStep(
        expert="web_expert",
        sub_prompt=prompt,
        goal=step_goal,
        kwargs=passthrough,
    )
    outcome = self._run_web_pipeline(ctx, step)
    return str(outcome.get("output", ""))
def _get_expert(self, key: str) -> Optional[nn.Module]:
    """
    Resolve *key* to a registered expert module, or return ``None``.

    The exact key is tried first. Only if that lookup fails is the
    ``_normalise_expert_name`` form computed and tried, and only when it
    differs from *key*. Any lookup error counts as "not found".
    """
    def _candidate_keys():
        # Lazy, so normalisation only happens when the exact lookup misses.
        yield key
        alias = _normalise_expert_name(key)
        if alias != key:
            yield alias

    for candidate in _candidate_keys():
        try:
            return self[candidate]
        except Exception:
            continue
    return None
def _review_and_finalize(self, ctx: ExecutionContext) -> None:
    """
    Run the post-response stages on *ctx*, in order.

    The stages are: review the final response, write memory, update the user
    profile, then update the bot profile.
    """
    stages = (
        "_review_final_response",
        "_write_memory",
        "_update_user_profile",
        "_update_bot_profile",
    )
    for stage_name in stages:
        # Looked up one at a time so each stage is resolved right before it runs.
        getattr(self, stage_name)(ctx)
# ------------------------------------------------------------------
# Summary / expert management
# ------------------------------------------------------------------
def summary(self) -> Dict[str, Any]:
    """
    Report which experts and embedded modules are available.

    Model experts are determined as follows:
      - With a PackedLM runtime mounted, they come from the runtime's
        ``summary()["experts"]``, falling back to the cached runtime names,
        then to the registered submodules.
      - Without a runtime, they come from the registered submodules, falling
        back to the cached runtime names.

    Pipeline experts (``action_expert`` backed by CodeBox, ``web_expert``
    backed by Web) are reported as available when their module is mounted.

    Returns:
        dict with keys model_class, bot_id, user_id, experts,
        missing_experts, pipeline_status, memory_mounted, web_mounted,
        hardware, packedlm_runtime_loaded and packedlm_checkpoint. Each name
        appears at most once in ``experts``.
    """
    if self._packedlm_runtime is not None:
        runtime_summary = _safe_call(self._packedlm_runtime, "summary", default={}) or {}
        if not isinstance(runtime_summary, Mapping):
            # A runtime that reports something other than a mapping gives us nothing to read.
            runtime_summary = {}
        raw_loaded = runtime_summary.get("experts") or self._runtime_expert_names or list(self.keys())
    else:
        raw_loaded = list(self.keys()) or self._runtime_expert_names
    loaded = list(_expert_names_canonical(raw_loaded))
    missing_model = [k for k in self.MODEL_EXPERTS if k not in loaded]
    pipeline_status = {
        "action_expert": self._get_codebox() is not None,
        "web_expert": self._web is not None,
    }
    missing_pipeline = [k for k, ok in pipeline_status.items() if not ok]
    # Skip pipeline names the model list already reports, so none is listed twice.
    available_pipeline = [k for k, ok in pipeline_status.items() if ok and k not in loaded]
    return {
        "model_class": "PackedLLM",
        "bot_id": self.bot_id,
        "user_id": self.user_id,
        "experts": loaded + available_pipeline,
        "missing_experts": missing_model + missing_pipeline,
        "pipeline_status": pipeline_status,
        "memory_mounted": self._memory_bank is not None,
        "web_mounted": self._web is not None,
        "hardware": self._hardware_state,
        "packedlm_runtime_loaded": self._packedlm_runtime is not None,
        "packedlm_checkpoint": self.packedlm_checkpoint,
    }
def reload_expert(self, expert_name: str) -> Any:
    """
    Reload one expert and return the runtime's result or the reloaded module.

    With a PackedLM runtime mounted the call is delegated to
    ``runtime.reload_expert``. Otherwise the expert is resolved through
    ``_get_expert`` and its ``reload()`` is called.

    Raises:
        KeyError: no such expert, or it has no ``reload()`` method.
    """
    runtime = self._packedlm_runtime
    if runtime is not None:
        return runtime.reload_expert(expert_name)
    expert = self._get_expert(expert_name)
    if expert is None or not hasattr(expert, "reload"):
        raise KeyError(f"Expert '{expert_name}' not loaded or has no reload() method.")
    expert.reload()
    return expert
def unload_expert(self, expert_name: str) -> None:
    """
    Unload one expert and drop it from the container.

    With a PackedLM runtime mounted the request is delegated to
    ``runtime.unload_expert``. Otherwise the expert is resolved like
    ``_get_expert`` (exact key first, then the normalised name), and its
    optional ``unload()`` hook is called on a best-effort basis. The entry is
    then deleted, trying keys in the same exact-then-normalised order.
    Unknown names are a no-op.
    """
    if self._packedlm_runtime is not None:
        self._packedlm_runtime.unload_expert(expert_name)
        return
    expert = self._get_expert(expert_name)
    if expert is None:
        return
    if hasattr(expert, "unload"):
        try:
            expert.unload()
        except Exception:
            pass  # best-effort: still drop the module even if its unload hook fails
    # Delete in the same order _get_expert resolves names. If *expert_name* is itself
    # a key, we must not remove a different expert registered under its normalised form.
    for key in (expert_name, _normalise_expert_name(expert_name)):
        try:
            del self[key]
            return
        except Exception:
            continue
def unload_all(self) -> None:
    """
    Unload every expert.

    Delegates to the PackedLM runtime when one is mounted. Otherwise it calls
    ``unload_expert`` for each registered submodule.
    """
    runtime = self._packedlm_runtime
    if runtime is None:
        # Snapshot the keys: unload_expert deletes entries while we iterate.
        for registered_name in list(self.keys()):
            self.unload_expert(registered_name)
        return
    runtime.unload_all()
# ------------------------------------------------------------------
# Source collection helpers (used by save_checkpoint)
# ------------------------------------------------------------------
def _collect_project_sources(self) -> Dict[str, str]:
sources: Dict[str, str] = {}
module_names = ["PackedLLM", "GATOR", "CompileWeb", "CodeBox", "PackedLM"]
for name in module_names:
mod = sys.modules.get(name)
if mod is None:
continue
try:
src = inspect.getsource(mod)
except Exception:
src = None
if not src:
file_path = getattr(mod, "__file__", None)
if file_path and os.path.exists(file_path):
try:
with open(file_path, "r", encoding="utf-8") as f:
src = f.read()
except Exception:
src = None
if src:
sources[name] = src
return sources
def _collect_vendor_sources(self) -> Dict[str, Dict[str, str]]:
"""
Collect pure-Python third-party modules that are already loaded.
Conservative: only source-backed modules outside the project and stdlib.
"""
sources: Dict[str, Dict[str, str]] = {}
project_root = Path(__file__).resolve().parent
stdlib_root = Path(sys.base_prefix).resolve()
for name, mod in list(sys.modules.items()):
if mod is None:
continue
if name in {"PackedLLM", "GATOR", "CompileWeb", "CodeBox", "PackedLM"}:
continue
file_path = getattr(mod, "__file__", None)
if not file_path:
continue
try:
p = Path(file_path).resolve()
except Exception:
continue
if not p.exists() or p.suffix.lower() != ".py":
continue
try:
src = inspect.getsource(mod)
except Exception:
try:
src = p.read_text(encoding="utf-8")
except Exception:
continue
try:
if str(p).startswith(str(project_root)):
continue
if str(p).startswith(str(stdlib_root)) and "site-packages" not in str(p):
continue
except Exception:
pass
sources[name] = {"file": str(p), "source": src}
return sources
# ------------------------------------------------------------------
# Checkpoint: save
# ------------------------------------------------------------------
def save_checkpoint(self, path: Union[str, Path] = "LM.pt") -> None:
    """
    Save a self-contained checkpoint as a zip container.

    Binary blobs (PackedLM weights, memory bank, web index, CodeBox) are
    streamed into the archive as raw slices of at most ``_CHUNK_BYTES``
    each, so no blob is ever held fully in memory. The source code of the
    project modules and of already-loaded pure-Python third-party modules is
    embedded as text in the manifest.

    Layout (ZIP_STORED, ZIP64 enabled):
        lm_chunk_N.bin   – PackedLM weight file slices (N = 0, 1, …)
        mem_chunk_N.bin  – MemoryBank checkpoint slices
        web_chunk_N.bin  – Web index slices
        box_chunk_N.bin  – CodeBox .pt slices
        manifest.pt      – metadata, profiles, expert names, source maps and
                           per-blob chunk counts (written last)

    When CodeBox has no on-disk model file but has an existing ``base_dir``,
    the live CodeBox object is pickled to a temp file and embedded instead.

    The archive is first written to a sibling temp file and then moved over
    *path* with ``os.replace``, so an interrupted save never leaves *path*
    missing or half-written. All temp files are removed even if saving fails.

    Args:
        path: destination file; an existing file is replaced.
    """
    path = Path(path)
    # Appending (instead of swapping) the suffix guarantees tmp_path != path.
    tmp_path = path.with_name(path.name + ".tmp_save")
    tmp_box_path = str(tmp_path) + ".codebox.pt"
    try:
        memory_checkpoint_path: Optional[str] = None
        if self._memory_bank is not None:
            cp = getattr(self._memory_bank, "checkpoint_path", None)
            if cp is not None:
                memory_checkpoint_path = str(cp)

        codebox_checkpoint_path: Optional[str] = None
        if self._codebox is not None:
            cp = getattr(self._codebox, "_model_path", None)
            if cp is not None and os.path.exists(str(cp)):
                codebox_checkpoint_path = str(cp)
            else:
                bd = getattr(self._codebox, "base_dir", None)
                if bd and os.path.isdir(str(bd)):
                    try:
                        torch.save(self._codebox, tmp_box_path, pickle_protocol=5)
                        codebox_checkpoint_path = tmp_box_path
                    except Exception:
                        codebox_checkpoint_path = None

        web_checkpoint_path: Optional[str] = None
        if self._web is not None:
            wp = getattr(self._web, "web_path", None)
            if wp and os.path.exists(str(wp)):
                web_checkpoint_path = str(wp)

        manifest: Dict[str, Any] = {
            "format_version": self._CHECKPOINT_FORMAT_VERSION,
            "class_name": self.__class__.__name__,
            "init_kwargs": {
                "bot_id": self.bot_id,
                "user_id": self.user_id,
                "model_dir": self.model_dir,
                "memory_dir": self.memory_dir,
                "web": self._web is not None,
                "hardware_probe": self._hardware_probe_enabled,
                "packedlm_module": self.packedlm_module,
            },
            "state_dict": self.state_dict(),
            "bot_profile": self._bot_profile,
            "user_profile": self._user_profile,
            "hardware_state": self._hardware_state,
            "runtime_expert_names": self._runtime_expert_names,
            "project_sources": self._collect_project_sources(),
            "vendor_sources": self._collect_vendor_sources(),
            "lm_chunk_count": 0,
            "mem_chunk_count": 0,
            "web_chunk_count": 0,
            "box_chunk_count": 0,
        }

        def _write_chunks_to_zip(
            zf: zipfile.ZipFile,
            file_path: Optional[str],
            prefix: str,
            manifest_key: str,
        ) -> None:
            """Stream *file_path* into *zf* as ``{prefix}{i}.bin`` slices and record the count."""
            if not file_path or not os.path.exists(file_path):
                return
            count = 0
            with open(file_path, "rb") as fh:
                while chunk := fh.read(_CHUNK_BYTES):
                    zf.writestr(f"{prefix}{count}.bin", chunk)
                    count += 1
            manifest[manifest_key] = count

        with zipfile.ZipFile(str(tmp_path), "w", compression=zipfile.ZIP_STORED, allowZip64=True) as zf:
            # Large blobs first (streamed, low peak RAM); each call fills in its chunk count.
            _write_chunks_to_zip(zf, self.packedlm_checkpoint, "lm_chunk_", "lm_chunk_count")
            _write_chunks_to_zip(zf, memory_checkpoint_path, "mem_chunk_", "mem_chunk_count")
            _write_chunks_to_zip(zf, web_checkpoint_path, "web_chunk_", "web_chunk_count")
            _write_chunks_to_zip(zf, codebox_checkpoint_path, "box_chunk_", "box_chunk_count")
            # Manifest last, so it carries the final chunk counts.
            buf = io.BytesIO()
            torch.save(manifest, buf, pickle_protocol=5)
            zf.writestr("manifest.pt", buf.getvalue())

        # Atomic on POSIX and Windows; replaces an existing checkpoint in one step.
        os.replace(tmp_path, path)
    finally:
        for leftover in (str(tmp_path), tmp_box_path):
            if os.path.exists(leftover):
                try:
                    os.unlink(leftover)
                except OSError:
                    pass
# ------------------------------------------------------------------
# Checkpoint: load
# ------------------------------------------------------------------
@classmethod
def load_checkpoint(cls, path: Union[str, Path] = "LM.pt", map_location: str = "cpu") -> "PackedLLM":
    """
    Load a checkpoint produced by ``save_checkpoint``.

    Zip files are treated as the v3 container format. Anything else is
    loaded as a legacy flat ``torch.save`` payload (v1/v2): an instance of
    *cls* is returned as-is, and a dict is rebuilt via
    ``_load_from_flat_payload``.

    SECURITY: ``weights_only=False`` unpickles arbitrary objects, which can
    execute code. Only load checkpoints from trusted sources.

    Raises:
        TypeError: the legacy payload is neither a *cls* instance nor a dict.
    """
    checkpoint = Path(path)
    if zipfile.is_zipfile(str(checkpoint)):
        return cls._load_checkpoint_v3(checkpoint, map_location)
    payload = torch.load(str(checkpoint), map_location=map_location, weights_only=False)
    if isinstance(payload, cls):
        return payload
    if isinstance(payload, dict):
        return cls._load_from_flat_payload(payload, map_location)
    raise TypeError(f"Unsupported checkpoint payload: {type(payload).__name__}")
@classmethod
def _load_checkpoint_v3(cls, path: Path, map_location: str) -> "PackedLLM":
    """
    Load a v3 zip-container checkpoint written by ``save_checkpoint``.

    Each chunked blob is streamed from the archive straight into a temp file,
    so no blob is ever held fully in RAM. The memory bank is written as
    ``GATOR.pt`` inside a fresh temp directory. A blob whose chunks cannot be
    extracted is treated as absent (``None``), and its partial temp file or
    directory is removed. The archive is closed before the model is rebuilt.

    SECURITY: the manifest is unpickled with ``weights_only=False``. Only load
    checkpoints from trusted sources.
    """
    def _copy_chunks(zf: zipfile.ZipFile, prefix: str, count: int, out_path: str) -> None:
        """Concatenate entries ``{prefix}0.bin`` … ``{prefix}{count-1}.bin`` into *out_path*."""
        with open(out_path, "wb") as out_fh:
            for i in range(count):
                with zf.open(f"{prefix}{i}.bin") as entry:
                    shutil.copyfileobj(entry, out_fh, _CHUNK_BYTES)

    def _stream_to_temp(zf: zipfile.ZipFile, prefix: str, count: int,
            suffix: str, file_prefix: str) -> Optional[str]:
        """Materialize a chunked blob as a temp file; return its path, or None if absent/failed."""
        if not count:
            return None
        fd, out_path = tempfile.mkstemp(prefix=file_prefix, suffix=suffix)
        os.close(fd)
        try:
            _copy_chunks(zf, prefix, count, out_path)
        except Exception:
            with contextlib.suppress(OSError):
                os.unlink(out_path)
            return None
        return out_path

    def _stream_to_temp_dir(zf: zipfile.ZipFile, prefix: str, count: int,
            file_name: str, dir_prefix: str) -> Optional[str]:
        """Materialize a chunked blob as *file_name* in a new temp dir; return the dir, or None."""
        if not count:
            return None
        root = tempfile.mkdtemp(prefix=dir_prefix)
        try:
            _copy_chunks(zf, prefix, count, os.path.join(root, file_name))
        except Exception:
            # Don't leak a half-written directory; treat the blob as absent.
            shutil.rmtree(root, ignore_errors=True)
            return None
        return root

    with zipfile.ZipFile(str(path), "r") as zf:
        manifest = torch.load(io.BytesIO(zf.read("manifest.pt")), map_location=map_location, weights_only=False)
        temp_lm_path = _stream_to_temp(zf, "lm_chunk_", manifest.get("lm_chunk_count", 0), ".pt", "packedlm_")
        temp_web_path = _stream_to_temp(zf, "web_chunk_", manifest.get("web_chunk_count", 0), ".pt", "websearch_")
        temp_box_path = _stream_to_temp(zf, "box_chunk_", manifest.get("box_chunk_count", 0), ".pt", "codebox_")
        temp_mem_root = _stream_to_temp_dir(
            zf, "mem_chunk_", manifest.get("mem_chunk_count", 0), "GATOR.pt", "packedllm_memory_"
        )
    return cls._reconstruct_from_manifest(
        manifest, map_location,
        lm_path=temp_lm_path,
        mem_root=temp_mem_root,
        web_path=temp_web_path,
        box_path=temp_box_path,
    )
@classmethod
def _load_from_flat_payload(cls, payload: Dict[str, Any], map_location: str) -> "PackedLLM":
    """
    Load a legacy v1/v2 flat ``torch.save`` payload.

    Blobs may be stored as ``bytes``/``bytearray`` or as a list of chunks
    (joined with ``_chunks_to_bytes``). The PackedLM and web blobs are
    written to temp files via ``_write_chunks_to_temp``. The memory bank is
    written as ``GATOR.pt`` in a fresh temp directory, which is removed again
    if that write fails. Legacy payloads carry no CodeBox blob.
    """
    def _get_bytes(key: str) -> Optional[bytes]:
        """Return the blob stored under *key* as bytes, or None if missing/unsupported."""
        val = payload.get(key)
        if isinstance(val, (bytes, bytearray)):
            return bytes(val)
        if isinstance(val, list):
            return _chunks_to_bytes(val)
        return None

    lm_bytes = _get_bytes("packedlm_checkpoint_bytes")
    lm_path = _write_chunks_to_temp([lm_bytes] if lm_bytes else None, suffix=".pt", prefix="packedlm_")
    web_bytes = _get_bytes("web_checkpoint_bytes")
    web_path = _write_chunks_to_temp([web_bytes] if web_bytes else None, suffix=".pt", prefix="websearch_")

    mem_root: Optional[str] = None
    mem_bytes = _get_bytes("memory_checkpoint_bytes")
    if mem_bytes is not None:
        mem_root = tempfile.mkdtemp(prefix="packedllm_memory_")
        try:
            with open(os.path.join(mem_root, "GATOR.pt"), "wb") as f:
                f.write(mem_bytes)
        except Exception:
            # Don't leak a half-written directory; treat the memory bank as absent.
            shutil.rmtree(mem_root, ignore_errors=True)
            mem_root = None
    return cls._reconstruct_from_manifest(
        payload, map_location,
        lm_path=lm_path, mem_root=mem_root, web_path=web_path, box_path=None,
    )
@classmethod
def _reconstruct_from_manifest(
    cls,
    manifest: Dict[str, Any],
    map_location: str,
    lm_path: Optional[str] = None,
    mem_root: Optional[str] = None,
    web_path: Optional[str] = None,
    box_path: Optional[str] = None,
) -> "PackedLLM":
    """
    Rebuild a PackedLLM from a manifest plus paths to already-materialized blobs.

    Steps:
      1. Write the embedded module sources under a fresh temp dir and make
         them importable. Project modules (``project_sources``) are put
         first on ``sys.path``, so the code saved with the weights wins.
         Third-party copies (``vendor_sources``) are only appended as a
         fallback. They hold just the packages' ``.py`` files (no compiled
         extensions), so they must never shadow an installed package.
      2. Construct ``cls(**init_kwargs)``, pointed at the materialized
         PackedLM weights and memory dir when given. Web is disabled during
         construction.
      3. Restore the state dict (best-effort, ``strict=False``), profiles,
         hardware state and runtime expert names. Then re-attach the Web
         index and CodeBox when their blobs were provided.

    Returns:
        The reconstructed instance.
    """
    init_kwargs = dict(manifest.get("init_kwargs") or {})
    source_root = tempfile.mkdtemp(prefix="packedllm_vendor_")
    project_dir = os.path.join(source_root, "project")
    vendor_dir = os.path.join(source_root, "vendor")
    os.makedirs(project_dir, exist_ok=True)
    os.makedirs(vendor_dir, exist_ok=True)

    def _write_module_source(base_dir: str, module_name: str, src: str,
            file_hint: Optional[str] = None) -> None:
        """Write *src* where the import system will find *module_name* under *base_dir*."""
        module_rel = module_name.replace(".", os.sep)
        if file_hint and os.path.basename(file_hint) == "__init__.py":
            target = os.path.join(base_dir, module_rel, "__init__.py")
        else:
            target = os.path.join(base_dir, f"{module_rel}.py")
        os.makedirs(os.path.dirname(target), exist_ok=True)
        with open(target, "w", encoding="utf-8") as f:
            f.write(src)

    for name, src in (manifest.get("project_sources") or {}).items():
        try:
            _write_module_source(project_dir, name, src)
        except Exception:
            pass  # best-effort: skip sources that cannot be written
    for name, meta in (manifest.get("vendor_sources") or {}).items():
        try:
            if isinstance(meta, dict):
                _write_module_source(vendor_dir, name, meta.get("source", ""), meta.get("file"))
            elif isinstance(meta, str):
                _write_module_source(vendor_dir, name, meta)
        except Exception:
            pass  # best-effort: skip sources that cannot be written
    if project_dir not in sys.path:
        sys.path.insert(0, project_dir)
    if vendor_dir not in sys.path:
        sys.path.append(vendor_dir)  # fallback only: installed packages take precedence

    if lm_path:
        init_kwargs["packedlm_checkpoint"] = lm_path
    if mem_root:
        init_kwargs["memory_dir"] = mem_root
    # Web is re-attached below from the extracted index instead of being built in __init__.
    init_kwargs["web"] = False
    obj = cls(**init_kwargs)
    try:
        sd = manifest.get("state_dict")
        if sd:
            obj.load_state_dict(sd, strict=False)
    except Exception:
        pass  # best-effort: a state dict that no longer fits must not block loading
    obj._bot_profile = manifest.get("bot_profile", {}) or {}
    obj._user_profile = manifest.get("user_profile", {}) or {}
    obj._hardware_state = manifest.get("hardware_state", {}) or {}
    obj._runtime_expert_names = _expert_names_canonical(
        manifest.get("runtime_expert_names", []) or []
    )
    if web_path and Web is not None:
        try:
            obj._web = Web(web_location=web_path, auto_create=False)
        except Exception:
            obj._web = None
    if box_path and os.path.exists(box_path):
        try:
            # SECURITY: unpickles arbitrary objects; only load trusted checkpoints.
            codebox_obj = torch.load(box_path, map_location=map_location, weights_only=False)
            obj._codebox = codebox_obj
        except Exception:
            obj._codebox = None
    return obj
# ------------------------------------------------------------------
# Convenience aliases
# ------------------------------------------------------------------
def save(self, path: Union[str, Path] = "LM.pt") -> None:
    """Convenience alias for ``save_checkpoint`` (same default path, returns None)."""
    target = path
    self.save_checkpoint(target)
# ------------------------------------------------------------------
# Representation / containment
# ------------------------------------------------------------------
def __repr__(self) -> str:
    """
    Summarize ids, loaded experts, missing required experts and runtime state.

    Loaded experts are the registered submodules, or the cached runtime
    expert names when no submodule is registered.
    """
    registered = list(self.keys())
    loaded = _expert_names_canonical(registered if registered else self._runtime_expert_names)
    absent = [name for name in self.REQUIRED_EXPERTS if name not in loaded]
    pieces = [
        f"bot_id={self.bot_id!r}",
        f"user_id={self.user_id!r}",
        f"experts=[{', '.join(loaded)}]",
    ]
    if absent:
        pieces.append(f"missing=[{', '.join(absent)}]")
    if self._packedlm_runtime is not None:
        pieces.append("packedlm_runtime=True")
    return "PackedLLM(" + ", ".join(pieces) + ")"
def __contains__(self, key: object) -> bool:  # type: ignore[override]
    """
    Return True if *key*, or its normalised name, is a registered submodule or runtime expert.

    Any error during the check (e.g. an unhashable key) yields False.
    """
    try:
        alias = _normalise_expert_name(str(key))
        if key in self._modules or alias in self._modules:
            return True
        runtime_names = self._runtime_expert_names
        return key in runtime_names or alias in runtime_names
    except Exception:
        return False
# ---------------------------------------------------------------------------
# Factory
# ---------------------------------------------------------------------------
def build_packedlm(
    bot_id: Optional[str] = None,
    user_id: Optional[str] = None,
    model_dir: str = "models",
    memory_dir: Optional[str] = None,
    web: bool = False,
    hardware_probe: bool = True,
    expert_modules: Optional[Dict[str, nn.Module]] = None,
    packedlm_checkpoint: Optional[str] = "LM.pt",
    packedlm_module: Optional[str] = "PackedLM",
) -> PackedLLM:
    """
    Factory for ``PackedLLM`` that forwards every argument unchanged as a keyword.

    Defaults: model_dir ``"models"``, web disabled, hardware probing on,
    PackedLM weights from ``"LM.pt"`` loaded through module ``"PackedLM"``.
    """
    constructor_kwargs: Dict[str, Any] = {
        "bot_id": bot_id,
        "user_id": user_id,
        "model_dir": model_dir,
        "memory_dir": memory_dir,
        "web": web,
        "hardware_probe": hardware_probe,
        "expert_modules": expert_modules,
        "packedlm_checkpoint": packedlm_checkpoint,
        "packedlm_module": packedlm_module,
    }
    return PackedLLM(**constructor_kwargs)
'''
# ---------------------------------------------------------------------------
# __main__ integration test
# ---------------------------------------------------------------------------
if __name__ == "__main__":
import traceback
SEP = "=" * 64
parser = argparse.ArgumentParser(description="PackedLLM integration test")
parser.add_argument("--checkpoint", type=str, default="LM.pt")
parser.add_argument("--image", type=str, default=None)
parser.add_argument("--bot-id", type=str, default="pip")
parser.add_argument("--user-id", type=str, default="test_user")
parser.add_argument("--compile-only", action="store_true")
parser.add_argument("--packedlm-module", type=str, default="PackedLM")
parser.add_argument("--save-path", type=str, default="PackedLLM.pt")
args = parser.parse_args()
print(SEP)
print(" PackedLLM — Expert integration test")
print(SEP)
def fail(msg: str, exc: Optional[BaseException] = None):
print(f"FAILED: {msg}")
if exc is not None:
traceback.print_exc()
raise SystemExit(1)
try:
lm = PackedLLM(
bot_id=args.bot_id,
user_id=args.user_id,
model_dir="models",
memory_dir=None,
web=True,
hardware_probe=True,
expert_modules=None,
packedlm_checkpoint=args.checkpoint,
packedlm_module=args.packedlm_module,
)
print(f"Loaded: {lm}")
except Exception as exc:
fail("could not initialize PackedLLM", exc)
print("\n[1] summary()")
try:
s = lm.summary()
print(f"experts loaded : {s['experts']}")
print(f"missing experts : {s['missing_experts']}")
print(f"memory mounted : {s['memory_mounted']}")
print(f"web mounted : {s['web_mounted']}")
print(f"runtime loaded : {s['packedlm_runtime_loaded']}")
if not s["memory_mounted"]:
fail("Memory bank did not mount")
if not s["web_mounted"]:
fail("Web module did not mount")
if not s["packedlm_runtime_loaded"]:
fail(f"PackedLM runtime was not loaded from {args.checkpoint}")
if s["missing_experts"]:
fail(f"Missing experts after load: {s['missing_experts']}")
print("OK")
except SystemExit:
raise
except Exception as exc:
fail("summary()", exc)
if not args.compile_only:
print("\n[2] direct expert smoke tests")
tests = [
("creative_expert", lambda: lm.creative_expert("Write a one-sentence haiku about rivers.")),
("code_expert", lambda: lm.code_expert("Write a Python one-liner to reverse a string.")),
("logic_expert", lambda: lm.logic_expert("Is 97 a prime number? Explain briefly.", mode="deep_then_answer")),
("math_expert", lambda: lm.math_expert("What is 17 * 23?")),
("translation_expert", lambda: lm.translation_expert("把这句中文翻译成英文:今天天气很好。")),
("affect_expert", lambda: lm.affect_expert("I'm extremely frustrated and need a calm response.")),
("role_expert", lambda: lm.role_expert("Greet the user warmly.", character_card="You are a cheerful assistant named Pip.")),
("head_expert", lambda: lm.head_expert("Plan a two-step response to: What is the capital of France?")),
("tool_expert", lambda: lm.tool_expert(
"Get the weather for ZIP 90210.",
tools=[{
"name": "get_weather",
"description": "Fetch weather by ZIP.",
"parameters": {
"type": "object",
"properties": {"zip": {"type": "string"}},
"required": ["zip"],
},
}],
)),
("web_expert", lambda: lm.web_expert("What is the current prime minister of the UK?")),
("action_expert", lambda: lm.action_expert(
"Create request_report.txt containing exactly: Action pipeline OK"
)),
]
if args.image and os.path.exists(args.image):
tests.append(("vision_expert", lambda: lm.vision_expert("Describe what you see.", image=args.image)))
else:
print("Vision smoke test skipped (pass --image /path/to/image).")
for name, fn in tests:
try:
t0 = time.perf_counter()
result = fn()
elapsed = (time.perf_counter() - t0) * 1000
preview = str(result)[:180].replace("\n", " ")
print(f" ✓ {name:<22} [{elapsed:>7.1f}ms] {preview}")
if not result or not str(result).strip():
fail(f"{name} returned empty output")
except SystemExit:
raise
except Exception as exc:
fail(name, exc)
print("\n[3] pipeline forward() smoke tests")
forward_prompts = [
"Write a Python function to calculate compound interest.",
"把这句中文翻译成英文:今天的天气很好。",
"Explain the main idea of Newton's second law in one paragraph.",
"Search the web and summarize the latest open source AI model releases.",
"Create a small action report file saying the pipeline is working.",
]
for prompt in forward_prompts:
try:
t0 = time.perf_counter()
response = lm.forward(prompt, image=args.image if args.image and os.path.exists(args.image) else None)
elapsed = (time.perf_counter() - t0) * 1000
preview = response[:200].replace("\n", " ")
print(f" ✓ [{elapsed:>6.0f}ms] {prompt[:55]}...")
print(f" → {preview}")
if not response or not str(response).strip():
fail(f"forward() returned empty response for: {prompt}")
except SystemExit:
raise
except Exception as exc:
fail(f"forward() for prompt: {prompt}", exc)
print("\n[4] checkpoint round-trip")
temp_ckpt = os.path.join(tempfile.gettempdir(), f"PackedLLM_roundtrip_{int(time.time())}.pt")
try:
print(f" Saving to {temp_ckpt} ...")
lm.save_checkpoint(temp_ckpt)
print(" Unloading original model before reload (avoids two full models resident at once)...")
try:
lm.unload_all()
except Exception:
pass
lm._packedlm_runtime = None
import gc
gc.collect()
if torch.cuda.is_available():
torch.cuda.empty_cache()
time.sleep(1) # give the OS/driver a moment to actually reclaim memory
print(f" Loading back ...")
lm2 = PackedLLM.load_checkpoint(temp_ckpt)
if not isinstance(lm2, PackedLLM):
fail("Loaded object is not PackedLLM")
s2 = lm2.summary()
if s2["missing_experts"]:
fail(f"Reloaded model missing experts: {s2['missing_experts']}")
print(f" ✓ saved to {temp_ckpt} and reloaded successfully")
print(f" ✓ reloaded repr: {lm2}")
# Quick sanity – run one forward on the reloaded model
r = lm2.head_expert("Say hello.")
if not r or not r.strip():
fail("Reloaded model returned empty head_expert output")
print(f" ✓ reloaded head_expert sanity: {r[:80]}")
except SystemExit:
raise
except Exception as exc:
fail("checkpoint round-trip", exc)
finally:
if os.path.exists(temp_ckpt):
try:
os.unlink(temp_ckpt)
except Exception:
pass
print("\n[5] final save")
try:
lm.save_checkpoint(args.save_path)
sz_mb = os.path.getsize(args.save_path) / 1e6
print(f" ✓ final checkpoint saved to {args.save_path} ({sz_mb:.1f} MB)")
except SystemExit:
raise
except Exception as exc:
fail("final save", exc)
print(f"\n{SEP}")
print(" Test complete.")
print(f"{SEP}\n")
'''
"""
PackedLLMRunner.py
A thin, ergonomic wrapper around a saved PackedLLM checkpoint (e.g. PackedLLM.pt).
Loading a PackedLLM checkpoint directly with PackedLLM.load_checkpoint() gets you
the model, but every expert is lazily warmed on first real use, and the only way
to use the embedded modules (MemoryBank/GATOR, Web, CodeBox) is to route a prompt
through the full plan -> route -> execute -> persona -> review pipeline. This class
sits on top of that and gives you:
- one-call loading + optional warmup (so first-request latency is paid at
startup instead of in front of a real user)
- the full orchestrated pipeline via .chat(...)
- every one of the 12 experts directly, bypassing the planner/router entirely
- direct access to MemoryBank (store/recall facts, read/write profiles),
Web (raw search, no multi-round research pipeline), and CodeBox (run
arbitrary code, no plan/generate/review wrapped around it)
- status reporting, expert reload/unload, and re-saving
Usage:
from PackedLLMRunner import PackedLLMRunner
bot = PackedLLMRunner("PackedLLM.pt", bot_id="pip", user_id="alice")
print(bot.chat("What's a clever way to sort a list in Python?")) # full pipeline
print(bot.code("Write a function that reverses a string.")) # expert directly
print(bot.memory_recall("alice's favorite color")) # embedded module directly
print(bot.run_code("print(2 + 2)")) # CodeBox directly
bot.unload_all()
"""
class PackedLLMRunner:
"""
Loads a saved PackedLLM checkpoint and exposes everything it can do —
the orchestrated pipeline, each individual expert, and the embedded
modules (memory, web, code execution) — through one friendly interface.
"""
def __init__(
self,
checkpoint_path: Union[str, Path] = "PackedLLM.pt",
map_location: str = "cpu",
bot_id: Optional[str] = None,
user_id: Optional[str] = None,
warmup: bool = False,
warmup_web: bool = False,
warmup_vision: bool = False,
warmup_action: bool = False,
verbose: bool = True,
):
self.checkpoint_path = str(checkpoint_path)
self.verbose = verbose
removed = self._cleanup_temp_files()
self._log(f"Loading checkpoint from {self.checkpoint_path} ...")
t0 = time.perf_counter()
self.model: PackedLLM = PackedLLM.load_checkpoint(
self.checkpoint_path, map_location=map_location
)
self._load_seconds = time.perf_counter() - t0
self._log(f"Loaded in {self._load_seconds:.1f}s")
if bot_id:
self.model.bot_id = bot_id
if user_id:
self.model.user_id = user_id
self.warmup_report: Dict[str, Any] = {}
if warmup:
self.warmup_report = self.warmup(
include_web=warmup_web,
include_vision=warmup_vision,
include_action=warmup_action,
)
@staticmethod
def _cleanup_temp_files() -> dict:
import shutil
import tempfile
from pathlib import Path
temp_root = Path(tempfile.gettempdir())
file_patterns = [
"*.gguf",
"*.pt",
]
dir_patterns = [
"packedlm_*",
"packedllm_*",
"codebox_*",
"websearch_*",
]
removed_files = 0
removed_dirs = 0
freed_bytes = 0
errors = []
for pattern in file_patterns:
for path in temp_root.rglob(pattern):
try:
if not path.is_file():
continue
size = path.stat().st_size
path.unlink(missing_ok=True)
removed_files += 1
freed_bytes += size
except Exception as e:
errors.append(f"{path}: {e}")
for pattern in dir_patterns:
for path in temp_root.rglob(pattern):
try:
if not path.is_dir():
continue
dir_size = 0
for f in path.rglob("*"):
try:
if f.is_file():
dir_size += f.stat().st_size
except Exception:
pass
shutil.rmtree(path, ignore_errors=False)
removed_dirs += 1
freed_bytes += dir_size
except Exception as e:
errors.append(f"{path}: {e}")
return {
"removed_files": removed_files,
"removed_dirs": removed_dirs,
"freed_gb": round(freed_bytes / (1024 ** 3), 3),
"errors": len(errors),
"error_details": errors[:20], # keep return size reasonable
}
def __enter__(self) -> "PackedLLMRunner":
return self
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
    """Unload every expert when the ``with`` block ends; exceptions are not suppressed."""
    self.unload_all()
    return None
def _log(self, msg: str) -> None:
if self.verbose:
print(f"[PackedLLMRunner] {msg}")
def warmup(
    self,
    include_web: bool = False,
    include_vision: bool = False,
    include_action: bool = False,
) -> Dict[str, Any]:
    """
    Invoke every expert once so lazy loading happens before real traffic.

    Each expert gets a tiny fixed prompt. Three probes are opt-in:

    * ``vision_expert``, called with ``image=None``, runs only with
      ``include_vision``.
    * ``web_expert`` runs only with ``include_web``.
    * ``action_expert`` runs only with ``include_action``. Its prompt asks
      it to create ``request_report.txt``.

    Returns:
        Mapping of expert name -> elapsed seconds (rounded to 2 decimals)
        on success, or ``"error: <exception>"`` when that probe raised.
        Probe failures are logged via ``_log`` and never propagated.
    """
    noop_tool_spec = {
        "name": "noop",
        "description": "No-op warmup tool.",
        "parameters": {"type": "object", "properties": {}},
    }
    probes = [
        ("head_expert", lambda: self.model.head_expert("Say OK.")),
        ("creative_expert", lambda: self.model.creative_expert("Say OK in one short sentence.")),
        ("code_expert", lambda: self.model.code_expert("Write a one-line Python comment.")),
        ("logic_expert", lambda: self.model.logic_expert("Is 2 a prime number?", mode="deep_then_answer")),
        ("math_expert", lambda: self.model.math_expert("What is 1 + 1?")),
        ("affect_expert", lambda: self.model.affect_expert("I feel fine.")),
        ("role_expert", lambda: self.model.role_expert(
            "Say hi.", character_card="You are a helpful assistant."
        )),
        ("translation_expert", lambda: self.model.translation_expert("你好")),
        ("tool_expert", lambda: self.model.tool_expert("Say hi.", tools=[noop_tool_spec])),
    ]
    opt_in = (
        (include_vision, "vision_expert", lambda: self.model.vision_expert(
            "Warmup probe; no real image provided.", image=None
        )),
        (include_web, "web_expert", lambda: self.model.web_expert("What is today's date?")),
        (include_action, "action_expert", lambda: self.model.action_expert(
            "Create request_report.txt containing exactly: warmup ok"
        )),
    )
    probes += [(label, fn) for enabled, label, fn in opt_in if enabled]

    outcome: Dict[str, Any] = {}
    for label, fn in probes:
        started = time.perf_counter()
        try:
            fn()
            elapsed = round(time.perf_counter() - started, 2)
            outcome[label] = elapsed
            self._log(f"warmed {label} ({elapsed}s)")
        except Exception as exc:
            outcome[label] = f"error: {exc}"
            self._log(f"warmup failed for {label}: {exc}")
    return outcome
def chat(
    self,
    prompt: str,
    image: Optional[str] = None,
    tools: Optional[List[Dict[str, Any]]] = None,
    stream: bool = False,
    deep_think: Optional[bool] = False,
    fast_think: Optional[bool] = False,
):
    """
    Route *prompt* through the model's full ``forward`` pipeline.

    Every option is forwarded unchanged as a keyword argument, and the
    return value is whatever ``forward`` produces.
    """
    options = {
        "image": image,
        "tools": tools,
        "stream": stream,
        "deep_think": deep_think,
        "fast_think": fast_think,
    }
    return self.model.forward(prompt, **options)
def creative(self, prompt: str, **kwargs: Any) -> str:
    """Send *prompt* to the model's ``creative_expert``; extra keyword arguments pass through."""
    expert = self.model.creative_expert
    return expert(prompt, **kwargs)
def code(self, prompt: str, **kwargs: Any) -> str:
    """Send *prompt* to the model's ``code_expert``; extra keyword arguments pass through."""
    expert = self.model.code_expert
    return expert(prompt, **kwargs)
def logic(self, prompt: str, mode: str = "deep_then_answer", **kwargs: Any) -> str:
    """Send *prompt* to the model's ``logic_expert`` using reasoning *mode* plus any extra options."""
    options = {"mode": mode, **kwargs}
    return self.model.logic_expert(prompt, **options)
def math(self, prompt: str, **kwargs: Any) -> str:
    """Send *prompt* to the model's ``math_expert``; extra keyword arguments pass through."""
    expert = self.model.math_expert
    return expert(prompt, **kwargs)
def translate(self, text: str, **kwargs: Any) -> str:
    """Send *text* to the model's ``translation_expert``; extra keyword arguments pass through."""
    expert = self.model.translation_expert
    return expert(text, **kwargs)
def affect(self, text: str, **kwargs: Any) -> str:
    """Send *text* to the model's ``affect_expert``; extra keyword arguments pass through."""
    expert = self.model.affect_expert
    return expert(text, **kwargs)
def role(self, prompt: str, character_card: Optional[str] = None, **kwargs: Any) -> str:
    """
    Send *prompt* to the model's ``role_expert``.

    ``character_card`` is forwarded only when it is not ``None``; otherwise
    the expert's own default applies. Other keyword arguments pass through.
    """
    options = dict(kwargs)
    if character_card is not None:
        options["character_card"] = character_card
    return self.model.role_expert(prompt, **options)
def head(self, prompt: str, image: Optional[str] = None, **kwargs: Any) -> str:
    """Send *prompt* (and optional *image*) to the model's ``head_expert``."""
    expert = self.model.head_expert
    return expert(prompt, image=image, **kwargs)
def vision(self, prompt: str, image: str, **kwargs: Any) -> str:
    """Send *prompt* together with the required *image* to the model's ``vision_expert``."""
    expert = self.model.vision_expert
    return expert(prompt, image=image, **kwargs)
def tool(self, query: str, tools: List[Dict[str, Any]], **kwargs: Any) -> str:
    """Send *query* plus the available *tools* schema list to the model's ``tool_expert``."""
    expert = self.model.tool_expert
    return expert(query, tools=tools, **kwargs)
def web(self, prompt: str, **kwargs: Any) -> str:
    """Send *prompt* to the model's ``web_expert``; extra keyword arguments pass through."""
    expert = self.model.web_expert
    return expert(prompt, **kwargs)
def action(self, prompt: str, **kwargs: Any) -> str:
    """Send *prompt* to the model's ``action_expert``; extra keyword arguments pass through."""
    expert = self.model.action_expert
    return expert(prompt, **kwargs)
@property
def memory(self) -> Any:
    """The memory bank mounted on the wrapped model (``self.model._memory_bank``); may be ``None``."""
    bank = self.model._memory_bank
    return bank
def memory_store(self, text: str, tags: Optional[List[str]] = None, importance: float = 0.7) -> Any:
    """
    Persist *text* in the mounted memory bank.

    Dispatch order:

    1. ``bank.store(text=..., metadata=...)`` when available. The metadata
       carries the model's ``bot_id``, ``user_id`` and the current
       timestamp. *tags* and *importance* are NOT forwarded on this path.
    2. Otherwise ``bank.store_knowledge([text], ...)`` with *tags*
       (default ``["manual"]``), source ``"PackedLLMRunner"`` and
       *importance*.

    Raises:
        RuntimeError: when no bank is mounted, or when the bank offers
            neither method.
    """
    bank = self.memory
    if bank is None:
        raise RuntimeError("Memory bank is not mounted on this model.")
    if not hasattr(bank, "store"):
        if not hasattr(bank, "store_knowledge"):
            raise RuntimeError("Memory bank has neither store() nor store_knowledge().")
        effective_tags = tags if tags else ["manual"]
        return bank.store_knowledge(
            [text], tags=effective_tags, source="PackedLLMRunner", importance=importance
        )
    meta = {
        "bot_id": self.model.bot_id,
        "user_id": self.model.user_id,
        "timestamp": time.time(),
    }
    return bank.store(text=text, metadata=meta)
def memory_recall(self, query: str, top_k: int = 5, **kwargs: Any) -> Any:
    """
    Query the mounted memory bank; returns ``[]`` when nothing is mounted.

    The first callable among ``recall``, ``search``, ``query`` and
    ``retrieve`` on the bank is used:

    * If its signature accepts ``(query, top_k=..., **kwargs)``, it is
      called that way and its result is returned as-is.
    * Otherwise it is called as ``fn(query)``. A list/tuple result is then
      truncated to ``top_k`` items, so the caller's limit is still honoured.

    Compatibility is decided with ``inspect.signature(...).bind`` rather
    than by catching ``TypeError`` from the call. Otherwise a ``TypeError``
    raised *inside* the backend would be mistaken for a signature mismatch.
    The search would then silently re-run without ``top_k``/``kwargs`` and
    the real error would be hidden. Only when the callable cannot be
    introspected does it fall back to call-then-retry.

    Returns:
        The backend's result, or ``[]`` if no bank or no usable method.
    """
    import inspect

    mb = self.memory
    if mb is None:
        return []
    for method_name in ("recall", "search", "query", "retrieve"):
        fn = getattr(mb, method_name, None)
        if not callable(fn):
            continue
        try:
            signature = inspect.signature(fn)
        except (TypeError, ValueError):
            signature = None  # not introspectable (e.g. some C callables)
        if signature is None:
            try:
                return fn(query, top_k=top_k, **kwargs)
            except TypeError:
                result = fn(query)
        else:
            try:
                signature.bind(query, top_k=top_k, **kwargs)
            except TypeError:
                # Backend only understands a bare query.
                result = fn(query)
            else:
                return fn(query, top_k=top_k, **kwargs)
        # The backend ignored top_k, so enforce the limit here for sequences.
        if isinstance(result, (list, tuple)) and isinstance(top_k, int) and top_k >= 0:
            result = result[:top_k]
        return result
    return []
def get_user_profile(self) -> Dict[str, Any]:
    """Return a shallow copy of the model's user profile; top-level edits do not affect the model."""
    return {**self.model._user_profile}
def get_bot_profile(self) -> Dict[str, Any]:
    """Return a shallow copy of the model's bot profile; top-level edits do not affect the model."""
    return {**self.model._bot_profile}
def set_user_profile(self, updates: Dict[str, Any]) -> None:
    """
    Merge *updates* into the model's user profile, then persist it if possible.

    The in-memory profile is always updated. Persistence is skipped when
    no memory bank is mounted or the model has no ``user_id``. The bank's
    own ``set_profile`` is preferred. Otherwise ``bank.gator.set_profile``
    is used when present.
    """
    self.model._user_profile.update(updates)
    bank = self.memory
    if bank is None or not self.model.user_id:
        return
    if hasattr(bank, "set_profile"):
        sink = bank
    else:
        sink = getattr(bank, "gator", None)
        if not hasattr(sink, "set_profile"):
            return
    sink.set_profile("user", self.model.user_id, self.model._user_profile)
def set_bot_profile(self, updates: Dict[str, Any]) -> None:
    """
    Merge *updates* into the model's bot profile, then persist it if possible.

    The in-memory profile is always updated. Persistence is skipped when
    no memory bank is mounted or the model has no ``bot_id``. The bank's
    own ``set_profile`` is preferred. Otherwise ``bank.gator.set_profile``
    is used when present.
    """
    self.model._bot_profile.update(updates)
    bank = self.memory
    if bank is None or not self.model.bot_id:
        return
    if hasattr(bank, "set_profile"):
        sink = bank
    else:
        sink = getattr(bank, "gator", None)
        if not hasattr(sink, "set_profile"):
            return
    sink.set_profile("bot", self.model.bot_id, self.model._bot_profile)
@property
def web_module(self) -> Any:
    """Direct handle to the model's web module (``self.model._web``); may be ``None``."""
    web = self.model._web
    return web
def web_search(self, query: str, deep_search: bool = False, **kwargs: Any) -> Any:
    """
    Run a single raw web search with no LLM planning/synthesis wrapped around it.

    The web module is attached lazily via ``self.model._attach_web()`` if it
    is not present yet.

    ``deep_search`` and any extra keyword arguments are forwarded when the
    module's ``search`` signature accepts them. Otherwise ``search(query)``
    is called. Compatibility is checked with ``inspect.signature(...).bind``.
    A ``TypeError`` raised inside ``search`` is propagated: it no longer
    triggers a second, option-less search that would hide the real error.

    Raises:
        RuntimeError: if no web module can be attached.
    """
    import inspect

    if self.model._web is None:
        self.model._attach_web()
    web = self.model._web
    if web is None:
        raise RuntimeError("Web module is unavailable.")
    search = web.search
    try:
        signature = inspect.signature(search)
    except (TypeError, ValueError):
        signature = None  # not introspectable: fall back to call-then-retry
    if signature is None:
        try:
            return search(query, deep_search=deep_search, **kwargs)
        except TypeError:
            return search(query)
    try:
        signature.bind(query, deep_search=deep_search, **kwargs)
    except TypeError:
        # Backend only understands a bare query.
        return search(query)
    return search(query, deep_search=deep_search, **kwargs)
@property
def codebox(self) -> Any:
    """The model's CodeBox sandbox, as returned by ``self.model._get_codebox()`` (may be ``None``)."""
    getter = self.model._get_codebox
    return getter()
def run_code(
    self,
    code: str,
    venv_id: str = "runner_default",
    requirements: Optional[List[str]] = None,
    timeout: int = 120,
    max_ram_mb: int = 4096,
    ensure_venv: bool = True,
) -> Dict[str, Any]:
    """
    Execute *code* inside a CodeBox virtual environment.

    When ``ensure_venv`` is true, the venv is created first, with
    *requirements* if any. A failing ``create_venv`` is treated as "venv
    likely already exists": it is logged via ``_log`` instead of being
    silently ignored.

    *requirements* go to ``create_venv`` when that call succeeds. If
    creation failed, or was skipped via ``ensure_venv=False``, they are
    forwarded to ``CodeBox.run_code``'s ``requirements`` argument instead.
    Previously they were always replaced by ``None`` there, so they were
    silently dropped.

    Raises:
        RuntimeError: if no CodeBox is available.
    """
    cb = self.codebox
    if cb is None:
        raise RuntimeError("CodeBox is unavailable.")
    # Requirements not yet installed by a successful create_venv call.
    pending_requirements = requirements or None
    if ensure_venv:
        try:
            if requirements:
                cb.create_venv(venv_id, requirements=requirements)
            else:
                cb.create_venv(venv_id)
        except Exception as exc:
            # Best effort: the venv likely already exists.
            self._log(f"create_venv({venv_id!r}) failed, assuming it already exists: {exc}")
        else:
            pending_requirements = None
    return cb.run_code(
        code,
        venv_id=venv_id,
        requirements=pending_requirements,
        timeout=timeout,
        max_ram_mb=max_ram_mb,
    )
def status(self) -> Dict[str, Any]:
    """Return the model's ``summary()`` dict, extended with load time (2 decimals) and the warmup report."""
    report = self.model.summary()
    report.update(
        load_seconds=round(self._load_seconds, 2),
        warmup_report=self.warmup_report,
    )
    return report
def reload_expert(self, expert_name: str) -> Any:
    """Ask the model to reload *expert_name* and hand back whatever it returns."""
    reloader = self.model.reload_expert
    return reloader(expert_name)
def unload_expert(self, expert_name: str) -> None:
    """Ask the model to unload *expert_name*; the model's return value is discarded."""
    unloader = self.model.unload_expert
    unloader(expert_name)
def unload_all(self) -> None:
    """Ask the model to unload every expert; the model's return value is discarded."""
    model = self.model
    model.unload_all()
def save(self, path: Optional[Union[str, Path]] = None) -> None:
    """Save the model checkpoint to *path*, or back to the loaded checkpoint path when *path* is falsy."""
    target = self.checkpoint_path if not path else str(path)
    self.model.save_checkpoint(target)
def __repr__(self) -> str:
    """Debug representation showing the checkpoint path and the wrapped model."""
    return "PackedLLMRunner(checkpoint={!r}, model={!r})".format(self.checkpoint_path, self.model)
if __name__ == "__main__":
    # Manual smoke test: load the packed checkpoint, print its status, then
    # run one demanding planning prompt through chat() under each thinking
    # configuration.
    TEST_PROMPT = """
You are managing a disaster response operation.
A hurricane has hit a coastal city of 850,000 people.
Available resources:
- 120 medical teams
- 80 search-and-rescue teams
- 60 engineering crews
- 40 water purification units
- 25 helicopters
- 15 cargo aircraft
Situation:
- 3 hospitals are operating at 140% capacity.
- 2 hospitals are offline.
- 18 bridges are damaged.
- 120,000 people are without drinking water.
- 75,000 people require evacuation.
- Fuel reserves will last 6 days.
- Weather forecasts predict another storm in 72 hours.
Requirements:
1. Create a 7-day response plan.
2. Prioritize all resource allocation.
3. Identify the three largest risks.
4. Estimate where bottlenecks will occur.
5. Explain tradeoffs between evacuation, medical care, and infrastructure repair.
6. Give a contingency plan if 30% of resources become unavailable.
7. Output a final executive summary under 150 words.
Think carefully and provide detailed reasoning.
"""
    RULE = "=" * 80
    THINK_MODES = [
        ("BASELINE", {}),
        ("FAST_THINK", {"fast_think": True}),
        ("DEEP_THINK", {"deep_think": True}),
        ("FAST+DEEP", {"fast_think": True, "deep_think": True}),
    ]
    bot = PackedLLMRunner("PackedLLM.pt", bot_id="pip", user_id="test_user")
    try:
        for line in (RULE, "STATUS", RULE):
            print(line)
        print(bot.status())
        for label, think_kwargs in THINK_MODES:
            print("\n" + RULE)
            print(label)
            print(RULE)
            print(bot.chat(TEST_PROMPT, **think_kwargs))
    finally:
        # Best-effort teardown; a failure here must not replace an error
        # raised above.
        with contextlib.suppress(Exception):
            bot.unload_all()