engram / kvcos /api /routes.py
eigengram's picture
feat: upload core kvcos library
0769ff3 verified
"""
ENGRAM Protocol β€” API Routes
FastAPI route handlers for the ENGRAM REST API.
All endpoints under /v1/ prefix.
"""
from __future__ import annotations
from fastapi import APIRouter, HTTPException, UploadFile, File
from kvcos.api.schemas import (
DeleteResponse,
HealthResponse,
SearchRequest,
SearchResponse,
SearchResultItem,
StatsResponse,
StoreResponse,
)
from kvcos.core.types import ENGRAM_VERSION
router = APIRouter(prefix="/v1")
# ── Dependency stubs ──────────────────────────────────────────────────────────
# These are replaced by real instances in server.py lifespan.
# Using module-level state that the server sets during startup.
_retriever = None
_storage = None
_index = None
def _get_retriever():
if _retriever is None:
raise HTTPException(503, "ENGRAM not initialized. Server starting up.")
return _retriever
def _get_storage():
if _storage is None:
raise HTTPException(503, "ENGRAM not initialized. Server starting up.")
return _storage
def _get_index():
if _index is None:
raise HTTPException(503, "ENGRAM not initialized. Server starting up.")
return _index
# ── Health ────────────────────────────────────────────────────────────────────
@router.get("/health", response_model=HealthResponse)
async def health():
"""Health check endpoint."""
index = _get_index()
storage = _get_storage()
return HealthResponse(
status="ok",
version=ENGRAM_VERSION,
index_entries=index.n_entries,
storage_backend="local",
)
# ── Stats (must come before /cache/{cache_id} to avoid route shadowing) ──────
@router.get("/cache/stats", response_model=StatsResponse)
async def cache_stats():
"""Get aggregate statistics for the engram store."""
storage = _get_storage()
stats = storage.stats()
return StatsResponse(
total_entries=stats["total_entries"],
total_size_bytes=stats["total_size_bytes"],
total_size_mb=round(stats["total_size_bytes"] / (1024 * 1024), 2),
avg_compression_ratio=stats["avg_compression_ratio"],
model_breakdown=stats["model_breakdown"],
)
# ── Store ─────────────────────────────────────────────────────────────────────
@router.post("/cache", response_model=StoreResponse)
async def store_cache(
agent_id: str,
task_description: str,
model_id: str,
file: UploadFile = File(...),
compression: str = "q8_0",
):
"""Store a .eng file in the engram store.
Accepts a pre-serialized .eng file upload.
The file is stored and its metadata indexed for EGR retrieval.
"""
storage = _get_storage()
data = await file.read()
if len(data) == 0:
raise HTTPException(400, "Empty file upload")
import uuid
cache_id = str(uuid.uuid4())
from kvcos.core.types import EngramMetadata
from datetime import datetime, timezone
metadata: EngramMetadata = {
"engram_version": ENGRAM_VERSION,
"cache_id": cache_id,
"compression": compression,
"model_id": model_id,
"model_family": "",
"n_layers": "0",
"n_heads": "0",
"n_kv_heads": "0",
"head_dim": "0",
"context_len": "0",
"agent_id": agent_id,
"task_description": task_description,
"created_at": datetime.now(timezone.utc).isoformat(),
}
path = storage.store(cache_id, data, metadata)
return StoreResponse(
cache_id=cache_id,
size_bytes=len(data),
compression_ratio=1.0,
path=path,
)
# ── Retrieve by ID ────────────────────────────────────────────────────────────
@router.get("/cache/{cache_id}")
async def get_cache(cache_id: str):
"""Retrieve a .eng file by cache ID.
Returns the raw .eng file bytes (application/octet-stream).
"""
storage = _get_storage()
data = storage.get(cache_id)
if data is None:
raise HTTPException(404, f"Cache entry not found: {cache_id}")
from fastapi.responses import Response
return Response(
content=data,
media_type="application/octet-stream",
headers={"Content-Disposition": f'attachment; filename="{cache_id}.eng"'},
)
# ── Search ────────────────────────────────────────────────────────────────────
@router.post("/cache/search", response_model=SearchResponse)
async def search_cache(req: SearchRequest):
"""Search for similar engram states via EGR manifold search.
Uses inner product similarity (MIPS) in the model's pre-RoPE
key manifold. D2: K→K retrieval only.
"""
index = _get_index()
# For text-only search without a KV query vector, we need the
# retriever to extract a state vector first. This endpoint
# currently returns index entries matching by metadata filter.
# Full EGR vector search requires a query KV cache (via /egr/retrieve).
# Metadata-based listing with optional filters
storage = _get_storage()
entries = storage.list_entries(model_family=None, limit=req.top_k)
results = [
SearchResultItem(
cache_id=e.get("cache_id", ""),
similarity=0.0,
task_description=e.get("task_description", ""),
model_id=e.get("model_id", ""),
created_at=e.get("created_at", ""),
context_len=int(e.get("context_len", "0")),
)
for e in entries
if (req.model_id is None or e.get("model_id") == req.model_id)
]
return SearchResponse(results=results[:req.top_k], n_searched=index.n_entries)
# ── Delete ────────────────────────────────────────────────────────────────────
@router.delete("/cache/{cache_id}", response_model=DeleteResponse)
async def delete_cache(cache_id: str):
"""Delete an engram from storage and index."""
retriever = _get_retriever()
deleted = retriever.delete_engram(cache_id)
return DeleteResponse(deleted=deleted, cache_id=cache_id)