File size: 6,779 Bytes
0769ff3 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 | """
ENGRAM Protocol — API Routes
FastAPI route handlers for the ENGRAM REST API.
All endpoints are mounted under the /v1 prefix.
"""
from __future__ import annotations
from fastapi import APIRouter, HTTPException, UploadFile, File
from kvcos.api.schemas import (
DeleteResponse,
HealthResponse,
SearchRequest,
SearchResponse,
SearchResultItem,
StatsResponse,
StoreResponse,
)
from kvcos.core.types import ENGRAM_VERSION
router = APIRouter(prefix="/v1")

# ── Dependency stubs ─────────────────────────────────────────────────────────
# Module-level handles to the live ENGRAM components. All three start out as
# None; server.py is expected to assign real instances during its lifespan
# startup. Until that happens, the _get_* accessors answer with HTTP 503.
_retriever = _storage = _index = None
def _require_ready(component):
    """Return *component* unchanged, or raise HTTP 503 if it is still None.

    Shared guard for the accessors below, so route handlers never operate on
    an unset module-level handle during startup.
    """
    if component is not None:
        return component
    raise HTTPException(503, "ENGRAM not initialized. Server starting up.")


def _get_retriever():
    """Return the module-level retriever, or raise HTTP 503 if it is unset."""
    # The global is read at call time, so assignments made after import
    # (e.g. during server startup) are picked up.
    return _require_ready(_retriever)


def _get_storage():
    """Return the module-level storage backend, or raise HTTP 503 if unset."""
    return _require_ready(_storage)


def _get_index():
    """Return the module-level index, or raise HTTP 503 if it is unset."""
    return _require_ready(_index)
# ── Health ───────────────────────────────────────────────────────────────────
@router.get("/health", response_model=HealthResponse)
async def health():
    """Health check endpoint.

    Answers 503 (raised by the accessors) until both the index and the
    storage backend have been set; otherwise reports the protocol version
    and the index's current entry count.
    """
    ready_index = _get_index()
    # Readiness check only: the storage object itself is not used here.
    _get_storage()
    entry_count = ready_index.n_entries
    # NOTE(review): the backend name is a hard-coded label, not read from
    # the storage object.
    return HealthResponse(
        status="ok",
        version=ENGRAM_VERSION,
        index_entries=entry_count,
        storage_backend="local",
    )
# ── Stats (must come before /cache/{cache_id} to avoid route shadowing) ──────
@router.get("/cache/stats", response_model=StatsResponse)
async def cache_stats():
    """Get aggregate statistics for the engram store.

    Re-exposes the mapping returned by the storage backend's ``stats()``,
    adding a derived size in mebibytes rounded to two decimal places.
    """
    summary = _get_storage().stats()
    entry_count = summary["total_entries"]
    size_bytes = summary["total_size_bytes"]
    bytes_per_mb = 1024 * 1024
    return StatsResponse(
        total_entries=entry_count,
        total_size_bytes=size_bytes,
        total_size_mb=round(size_bytes / bytes_per_mb, 2),
        avg_compression_ratio=summary["avg_compression_ratio"],
        model_breakdown=summary["model_breakdown"],
    )
# ── Store ────────────────────────────────────────────────────────────────────
@router.post("/cache", response_model=StoreResponse)
async def store_cache(
    agent_id: str,
    task_description: str,
    model_id: str,
    file: UploadFile = File(...),
    compression: str = "q8_0",
):
    """Store a pre-serialized .eng file upload in the engram store.

    The uploaded bytes are passed to ``storage.store`` unchanged under a
    freshly generated UUID4 cache ID. A metadata record built from the
    request parameters is passed alongside them. Model-geometry fields this
    handler cannot derive are recorded as placeholders: ``model_family`` is
    "" and the layer/head/length fields are "0".

    NOTE(review): this handler does not touch the EGR index itself; whether
    ``storage.store`` makes the entry searchable is not visible here.

    Raises:
        HTTPException: 503 if storage is not initialized; 400 if the
            uploaded file is empty.
    """
    storage = _get_storage()
    payload = await file.read()
    if not payload:
        raise HTTPException(400, "Empty file upload")

    # Deferred imports, kept local to this handler.
    import uuid
    from kvcos.core.types import EngramMetadata
    from datetime import datetime, timezone

    new_id = str(uuid.uuid4())
    metadata: EngramMetadata = {
        "engram_version": ENGRAM_VERSION,
        "cache_id": new_id,
        # Client-declared value; not checked against the uploaded bytes here.
        "compression": compression,
        "model_id": model_id,
        "model_family": "",
        "n_layers": "0",
        "n_heads": "0",
        "n_kv_heads": "0",
        "head_dim": "0",
        "context_len": "0",
        "agent_id": agent_id,
        "task_description": task_description,
        "created_at": datetime.now(timezone.utc).isoformat(),
    }
    stored_at = storage.store(new_id, payload, metadata)

    # This handler does not re-compress the upload, so it reports a 1:1 ratio.
    return StoreResponse(
        cache_id=new_id,
        size_bytes=len(payload),
        compression_ratio=1.0,
        path=stored_at,
    )
# ── Retrieve by ID ───────────────────────────────────────────────────────────
@router.get("/cache/{cache_id}")
async def get_cache(cache_id: str):
    """Retrieve a .eng file by cache ID.

    Returns the stored bytes as an ``application/octet-stream`` attachment
    named ``<cache_id>.eng``.

    Raises:
        HTTPException: 503 if storage is not initialized; 404 if the storage
            backend returns None for ``cache_id``.
    """
    blob = _get_storage().get(cache_id)
    if blob is None:
        raise HTTPException(404, f"Cache entry not found: {cache_id}")

    from fastapi.responses import Response

    disposition = f'attachment; filename="{cache_id}.eng"'
    return Response(
        content=blob,
        media_type="application/octet-stream",
        headers={"Content-Disposition": disposition},
    )
# ── Search ───────────────────────────────────────────────────────────────────
@router.post("/cache/search", response_model=SearchResponse)
async def search_cache(req: SearchRequest):
    """Search for similar engram states via EGR manifold search.

    Intended to use inner product similarity (MIPS) in the model's pre-RoPE
    key manifold. D2: K→K retrieval only.

    NOTE: this endpoint does not yet perform vector search. Text-only search
    without a KV query vector would need the retriever to extract a state
    vector first. Full EGR vector search requires a query KV cache (via
    /egr/retrieve). For now the endpoint lists stored entries, optionally
    filtered by ``req.model_id``, returns at most ``req.top_k`` of them, and
    reports ``similarity`` as 0.0.

    Raises:
        HTTPException: 503 if the index or storage is not initialized.
    """
    index = _get_index()
    storage = _get_storage()

    if req.model_id is None:
        fetch_limit = req.top_k
    else:
        # The model_id filter is applied below, after listing. Capping the
        # listing at top_k first could drop matching entries and return
        # fewer results than exist. So list every stored entry (sized from
        # the backend's own count), then filter and truncate.
        fetch_limit = max(req.top_k, storage.stats()["total_entries"])
    entries = storage.list_entries(model_family=None, limit=fetch_limit)

    matching = [
        e
        for e in entries
        if req.model_id is None or e.get("model_id") == req.model_id
    ][: req.top_k]

    results = [
        SearchResultItem(
            cache_id=e.get("cache_id", ""),
            similarity=0.0,  # metadata listing, not a similarity ranking
            task_description=e.get("task_description", ""),
            model_id=e.get("model_id", ""),
            created_at=e.get("created_at", ""),
            context_len=int(e.get("context_len", "0")),
        )
        for e in matching
    ]
    # NOTE(review): n_searched reports the EGR index size, not the number of
    # storage entries examined above; kept as-is for API compatibility.
    return SearchResponse(results=results, n_searched=index.n_entries)
# ── Delete ───────────────────────────────────────────────────────────────────
@router.delete("/cache/{cache_id}", response_model=DeleteResponse)
async def delete_cache(cache_id: str):
    """Delete an engram from storage and index via the retriever.

    Echoes the retriever's ``delete_engram`` result as ``deleted``. This
    handler does not itself turn an unknown ``cache_id`` into a 404.

    Raises:
        HTTPException: 503 if the retriever is not initialized.
    """
    was_deleted = _get_retriever().delete_engram(cache_id)
    return DeleteResponse(cache_id=cache_id, deleted=was_deleted)
|