"""
ENGRAM Protocol — Demo Agent Session

End-to-end demonstration:
1. Load model via llama-cpp-python (D1)
2. Generate with a prompt -> measure cold TTFT
3. Extract KV cache -> compress -> serialize to .eng
4. Index in EGR manifold index
5. Reset model -> restore from .eng -> measure cached TTFT
6. Print speedup ratio

D6: Target >10x TTFT reduction at 16K context on Llama 3.1 8B.
Cold baseline: ~1,500-5,000ms. Cached target: <500ms.
Anything below 4x at 16K is a failure.
"""
from __future__ import annotations
import argparse
import sys
import time
from pathlib import Path
def _run_dry_run(args: argparse.Namespace) -> int:
    """Run the full pipeline with synthetic tensors — no model file needed.

    Builds random fp16 KV tensors shaped for Llama 3.1 8B, round-trips them
    through the .eng serializer, exercises the EGR manifold index
    (index + retrieve) with granular timings, and prints a report.

    Args:
        args: Parsed CLI namespace; only ``args.context`` (synthetic cache
            length in tokens) is read here.

    Returns:
        0 when the simulated speedup clears the D6 >10x target, else 1.
        NOTE(review): this path gates on >10x while the real-model path in
        main() exits successfully at >=4x; the stricter gate is kept here
        because the "cold" number below is simulated, not measured.
    """
    import os
    import tempfile
    from datetime import datetime, timezone

    import torch

    from kvcos.core.cache_spec import LLAMA_3_1_8B
    from kvcos.core.serializer import EngramSerializer
    from kvcos.core.types import CompressionMethod, StateExtractionMode
    from kvcos.core.manifold_index import IndexEntry, ManifoldIndex
    from kvcos.core.state_extractor import MARStateExtractor
    from kvcos.storage.local import LocalStorageBackend

    spec = LLAMA_3_1_8B
    ctx_len = args.context
    model_name = spec["model_id"]

    # -- Synthetic KV tensors -------------------------------------------
    torch.manual_seed(42)  # fixed seed so repeated runs are comparable
    shape = (spec["n_layers"], spec["n_kv_heads"], ctx_len, spec["head_dim"])
    keys = torch.randn(shape, dtype=torch.float16)
    values = torch.randn(shape, dtype=torch.float16)
    tensor_mb = keys.numel() * keys.element_size() / 1024 / 1024

    with tempfile.TemporaryDirectory() as tmp:
        tmp_dir = Path(tmp)

        # -- Serialize to .eng ------------------------------------------
        serializer = EngramSerializer()
        eng_path = tmp_dir / "dry_run.eng"
        t0 = time.perf_counter()
        serializer.serialize(
            keys=keys, values=values,
            agent_id="dry-run-agent",
            task_description="dry run benchmark",
            model_id=model_name,
            output_path=eng_path,
            compression=CompressionMethod.Q8_0,
        )
        serialize_ms = (time.perf_counter() - t0) * 1000

        # -- Load back --------------------------------------------------
        t0 = time.perf_counter()
        k_out, _v_out, _meta = serializer.deserialize(eng_path)
        deserialize_ms = (time.perf_counter() - t0) * 1000
        # Explicit check (not assert) so it still fires under `python -O`.
        if k_out.shape != keys.shape:
            raise RuntimeError(f"Shape mismatch: {k_out.shape} vs {keys.shape}")

        # -- EGR granular timing ----------------------------------------
        extractor = MARStateExtractor(
            mode=StateExtractionMode.SVD_PROJECT,
            rank=min(160, spec["head_dim"]),
        )
        dim = extractor.output_dim(spec)
        index = ManifoldIndex(dim=dim)
        storage = LocalStorageBackend(data_dir=tmp_dir)

        # Index path: extract + serialize + store + add
        t0 = time.perf_counter()
        extraction = extractor.extract(keys, spec)
        t_extract = time.perf_counter()
        eng2 = tmp_dir / "indexed.eng"
        serializer.serialize(
            keys=keys, values=values,
            agent_id="dry-run-agent",
            task_description="dry run benchmark",
            model_id=model_name,
            output_path=eng2,
            compression=CompressionMethod.Q8_0,
            cache_id="dry-run-001",
        )
        t_serialize = time.perf_counter()
        idx_meta = serializer.read_metadata_only(eng2)
        storage.store_file("dry-run-001", eng2, idx_meta)
        t_store = time.perf_counter()
        entry = IndexEntry(
            cache_id="dry-run-001",
            task_description="dry run benchmark",
            model_id=model_name,
            created_at=datetime.now(timezone.utc).isoformat(),
            context_len=ctx_len,
            l2_norm=extraction.l2_norm,
        )
        index.add(extraction.state_vec, entry)
        t_add = time.perf_counter()
        extract_ms = (t_extract - t0) * 1000
        ser_ms = (t_serialize - t_extract) * 1000
        store_ms = (t_store - t_serialize) * 1000
        add_ms = (t_add - t_store) * 1000
        index_ms = (t_add - t0) * 1000

        # Retrieve path: extract query + search + load
        torch.manual_seed(99)  # different seed -> a distinct query tensor
        query_keys = torch.randn(shape, dtype=torch.float16)
        t0 = time.perf_counter()
        q_ext = extractor.extract(query_keys, spec)
        t_qext = time.perf_counter()
        results = index.search(q_ext.state_vec, top_k=1)
        t_search = time.perf_counter()
        # Load the matched engram (timed; the tensors aren't needed again).
        stored_path = storage.get_path("dry-run-001")
        serializer.deserialize(stored_path)
        t_load = time.perf_counter()
        q_extract_ms = (t_qext - t0) * 1000
        search_ms = (t_search - t_qext) * 1000
        load_ms = (t_load - t_search) * 1000
        retrieve_ms = (t_load - t0) * 1000

        # -- Simulated TTFT estimates -----------------------------------
        # No model is loaded: cold prefill is approximated at 0.1 ms/token,
        # while the "cached" path uses the real deserialize time measured
        # above. The resulting speedup is therefore only indicative.
        cold_ms = ctx_len * 0.1
        cached_ms = deserialize_ms
        egr_overhead = extract_ms + search_ms  # overhead added to warm path
        speedup = cold_ms / cached_ms if cached_ms > 0 else float("inf")
        eng_size_mb = os.path.getsize(eng_path) / 1024 / 1024

        # -- Output -----------------------------------------------------
        sep = "=" * 35
        print(sep)
        print("ENGRAM Protocol \u2014 EGR Demo")
        print(f"Model: {model_name}")
        print(f"Context: {ctx_len} tokens")
        print(sep)
        print(f"Cold TTFT: {cold_ms:.1f}ms (simulated)")
        print(f"Cached TTFT: {cached_ms:.1f}ms (deserialize)")
        print(f"Serialize: {serialize_ms:.1f}ms")
        print(f"Speedup: {speedup:.1f}x")
        print("D6 target: >10x at 16K tokens")
        status = "PASS" if speedup > 10 else "FAIL"
        print(f"Status: {status}")
        print(f"EGR overhead: {egr_overhead:.1f}ms (extract+search)")
        print(f".eng file: {eng_path.name} ({eng_size_mb:.1f}MB)")
        print(f"Tensor shape: {list(shape)} ({tensor_mb:.0f}MB per K/V)")
        print(sep)
        print()
        print("Index breakdown:")
        print(f"  SVD extract: {extract_ms:8.1f}ms")
        print(f"  Serialize .eng: {ser_ms:8.1f}ms")
        print(f"  Store backend: {store_ms:8.1f}ms")
        print(f"  FAISS add(): {add_ms:8.1f}ms")
        print(f"  TOTAL: {index_ms:8.1f}ms")
        print()
        print("Retrieve breakdown:")
        print(f"  SVD extract: {q_extract_ms:8.1f}ms")
        print(f"  FAISS search(): {search_ms:8.1f}ms")
        print(f"  Load+deser: {load_ms:8.1f}ms")
        print(f"  TOTAL: {retrieve_ms:8.1f}ms")
        print()
        print("Verification:")
        print(f"  Round-trip shape: {'OK' if k_out.shape == keys.shape else 'FAIL'}")
        print(f"  Retrieval result: {'OK' if len(results) >= 1 else 'FAIL'}")
        print(f"  .eng valid: {'OK' if eng_path.exists() else 'FAIL'}")
        return 0 if speedup > 10 else 1
def main():
    """CLI entry point: real-model demo session (or --dry-run pipeline).

    Returns:
        Process exit code: 0 when speedup >= 4x (the failure threshold
        from the module docstring), 1 otherwise.
    """
    parser = argparse.ArgumentParser(
        description="ENGRAM Protocol \u2014 Demo Agent Session",
        epilog="D6: >10x TTFT reduction at 16K context on Llama 3.1 8B",
    )
    parser.add_argument(
        "--model", "-m", default=None,
        help="Path to GGUF model file (required unless --dry-run)",
    )
    parser.add_argument(
        "--context", "-c", type=int, default=4096,
        help="Context length to fill (tokens). Default: 4096",
    )
    parser.add_argument(
        "--n-ctx", type=int, default=16384,
        help="Max context window for model. Default: 16384",
    )
    parser.add_argument(
        "--data-dir", type=str, default=None,
        help="ENGRAM data directory. Default: ~/.engram/data",
    )
    parser.add_argument(
        "--dry-run", action="store_true",
        help="Run full pipeline with synthetic tensors (no model needed)",
    )
    parser.add_argument(
        "--verbose", "-v", action="store_true",
        help="Enable verbose output",
    )
    args = parser.parse_args()

    if args.dry_run:
        return _run_dry_run(args)
    if not args.model:
        parser.error("--model is required unless --dry-run is specified")

    print("=" * 70)
    print("ENGRAM Protocol \u2014 Demo Agent Session")
    print("KV cache fingerprinting for persistent semantic retrieval")
    print("=" * 70)
    print()

    # -- Setup ----------------------------------------------------------
    # Deferred imports: --help and --dry-run shouldn't pay for the heavy
    # model-integration dependencies.
    from kvcos.core.config import get_config
    from kvcos.core.serializer import EngramSerializer
    from kvcos.core.types import CompressionMethod, StateExtractionMode
    from kvcos.core.manifold_index import ManifoldIndex
    from kvcos.core.retriever import EGRRetriever
    from kvcos.core.state_extractor import MARStateExtractor
    from kvcos.storage.local import LocalStorageBackend
    from integrations.llama_cpp_bridge import LlamaCppBridge

    config = get_config()
    data_dir = Path(args.data_dir) if args.data_dir else config.data_dir

    # -- Step 1: Load model ---------------------------------------------
    print(f"[1/6] Loading model: {args.model}")
    bridge = LlamaCppBridge(
        model_path=args.model,
        n_ctx=args.n_ctx,
        n_gpu_layers=0,  # CPU-only per D1 — TODO confirm against project docs
        verbose=args.verbose,
    )
    spec = bridge.load_model()
    print(f"  Model: {spec['model_id']}")
    print(f"  Architecture: {spec['n_layers']}L / {spec['n_heads']}H / {spec['n_kv_heads']}KV / {spec['head_dim']}D")
    print(f"  Context window: {args.n_ctx}")
    print()

    # -- Step 2: Generate + cold TTFT -----------------------------------
    # Build a filler prompt long enough to actually reach the requested
    # context (~4 chars/token heuristic). The previous fixed "* 100"
    # filler was ~4,500 chars and silently capped large contexts.
    base = "The quick brown fox jumps over the lazy dog. "
    target_tokens = args.context
    needed_chars = target_tokens * 4
    reps = -(-needed_chars // len(base))  # ceiling division
    prompt = (base * reps)[:needed_chars]
    print(f"[2/6] Cold prefill ({target_tokens} target tokens)...")
    cold = bridge.measure_cold_ttft(prompt)
    print(f"  Cold TTFT: {cold.ttft_ms:.1f}ms ({cold.context_len} tokens)")
    print()

    # -- Step 3: Extract + serialize ------------------------------------
    print("[3/6] Extracting KV cache...")
    try:
        parsed = bridge.extract_kv_cache()
        print(f"  Keys shape: {list(parsed.keys.shape)}")
        print(f"  Values shape: {list(parsed.values.shape)}")
        print(f"  Cells: {parsed.n_cells}")
    except Exception as e:  # best-effort: the KV blob format may not match
        print(f"  KV extraction failed: {e}")
        print("  This is expected if the blob format doesn't match.")
        print("  Falling back to save_state/load_state raw blob path.")
        parsed = None
    print()

    print("[3b/6] Saving raw state blob...")
    raw_state = bridge.llm.save_state()
    raw_blob = bytes(raw_state.llama_state)
    print(f"  Raw state size: {len(raw_blob) / 1024 / 1024:.1f} MB")

    result = None  # sentinel: set only when .eng serialization ran
    if parsed is not None:
        print("[3c/6] Serializing to .eng format...")
        serializer = EngramSerializer()
        eng_path = data_dir / "demo" / "session_001.eng"
        result = serializer.serialize(
            keys=parsed.keys,
            values=parsed.values,
            agent_id="demo-agent",
            task_description="demo session - cold prefill benchmark",
            model_id=spec["model_id"],
            output_path=eng_path,
            compression=CompressionMethod.Q8_0,
        )
        print(f"  .eng file: {result['path']}")
        print(f"  Size: {result['size_bytes'] / 1024 / 1024:.1f} MB")
        print(f"  Compression ratio: {result['compression_ratio']:.2f}x")
    print()

    # -- Step 4: Index in EGR -------------------------------------------
    if parsed is not None:
        print("[4/6] Indexing in EGR manifold index...")
        storage = LocalStorageBackend(data_dir=data_dir)
        extractor = MARStateExtractor(
            mode=StateExtractionMode.SVD_PROJECT,
            rank=min(160, spec["head_dim"]),
        )
        dim = extractor.output_dim(spec)
        index = ManifoldIndex(dim=dim)
        retriever = EGRRetriever(extractor, index, storage)
        cache_id = retriever.index_engram(
            keys=parsed.keys,
            values=parsed.values,
            spec=spec,
            agent_id="demo-agent",
            task_description="demo session - cold prefill benchmark",
            model_id=spec["model_id"],
        )
        print(f"  Indexed: {cache_id}")
        print(f"  State vector dim: {dim}")
        print(f"  Index entries: {index.n_entries}")
    else:
        print("[4/6] Skipped (KV extraction failed)")
    print()

    # -- Step 5: Restore + cached TTFT ----------------------------------
    print("[5/6] Restoring from cached state...")
    cached = bridge.measure_cached_ttft(raw_blob)
    print(f"  Cached TTFT: {cached.ttft_ms:.1f}ms")
    print()

    # -- Step 6: Results ------------------------------------------------
    cold_ms = cold.ttft_ms
    cached_ms = cached.ttft_ms
    speedup = cold_ms / cached_ms if cached_ms > 0 else float("inf")
    eng_path_str = result["path"] if result is not None else "N/A"
    eng_size_kb = result["size_bytes"] / 1024 if result is not None else 0
    sep = "=" * 35
    print(sep)
    print("ENGRAM Protocol \u2014 EGR Demo")
    print(f"Model: {spec['model_id']}")
    print(f"Context: {cold.context_len} tokens")
    print(sep)
    print(f"Cold TTFT: {cold_ms:.1f}ms")
    print(f"Cached TTFT: {cached_ms:.1f}ms")
    print(f"Speedup: {speedup:.1f}x")
    print("D6 target: >10x at 16K tokens")
    status = "PASS" if speedup > 10 else "FAIL"
    print(f"Status: {status}")
    print(f".eng file: {eng_path_str} ({eng_size_kb:.1f}KB)")
    print(sep)
    # Exit success at >=4x: per the module docstring, anything below 4x
    # at 16K is a failure (PASS/FAIL above reports the stricter D6 goal).
    return 0 if speedup >= 4 else 1
if __name__ == "__main__":
    # Propagate main()'s exit code (0 = pass, 1 = fail) to the shell.
    sys.exit(main())
|