File size: 5,406 Bytes
0769ff3 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 | """
ENGRAM Protocol β ENGRAM Python Client
Async HTTP client wrapping all ENGRAM API endpoints.
This is what agents import to interact with the engram store.
"""
from __future__ import annotations
from pathlib import Path
from typing import Any
import httpx
class EngramClient:
"""Python client for the ENGRAM REST API.
Usage:
client = EngramClient("http://localhost:8080")
result = client.store_file(path, agent_id="worker", task="analyze code", model_id="llama-3.1-8b")
matches = client.search(task_description="debug auth error", top_k=3)
data = client.get(matches[0]["cache_id"])
"""
def __init__(self, base_url: str = "http://localhost:8080", timeout: float = 30.0):
self.base_url = base_url.rstrip("/")
self._client = httpx.Client(base_url=f"{self.base_url}/v1", timeout=timeout)
def close(self) -> None:
self._client.close()
def __enter__(self):
return self
def __exit__(self, *args):
self.close()
# ββ Health ββββββββββββββββββββββββββββββββββββββββββββββββ
def health(self) -> dict[str, Any]:
"""Check ENGRAM server health."""
resp = self._client.get("/health")
resp.raise_for_status()
return resp.json()
# ββ Store βββββββββββββββββββββββββββββββββββββββββββββββββ
def store_file(
self,
file_path: Path,
agent_id: str,
task_description: str,
model_id: str,
compression: str = "q8_0",
) -> dict[str, Any]:
"""Upload a .eng file to the engram store.
Args:
file_path: Path to the .eng file
agent_id: Agent identifier
task_description: Human-readable description
model_id: Model identifier
compression: Compression method used
Returns:
Dict with cache_id, size_bytes, compression_ratio, path
"""
with open(file_path, "rb") as f:
resp = self._client.post(
"/cache",
params={
"agent_id": agent_id,
"task_description": task_description,
"model_id": model_id,
"compression": compression,
},
files={"file": (file_path.name, f, "application/octet-stream")},
)
resp.raise_for_status()
return resp.json()
def store_bytes(
self,
data: bytes,
agent_id: str,
task_description: str,
model_id: str,
compression: str = "q8_0",
filename: str = "cache.eng",
) -> dict[str, Any]:
"""Upload raw .eng bytes to the engram store."""
resp = self._client.post(
"/cache",
params={
"agent_id": agent_id,
"task_description": task_description,
"model_id": model_id,
"compression": compression,
},
files={"file": (filename, data, "application/octet-stream")},
)
resp.raise_for_status()
return resp.json()
# ββ Retrieve ββββββββββββββββββββββββββββββββββββββββββββββ
def get(self, cache_id: str) -> bytes:
"""Retrieve a .eng file by cache ID.
Returns raw bytes of the .eng file.
"""
resp = self._client.get(f"/cache/{cache_id}")
resp.raise_for_status()
return resp.content
# ββ Search ββββββββββββββββββββββββββββββββββββββββββββββββ
def search(
self,
task_description: str,
model_id: str | None = None,
top_k: int = 5,
min_similarity: float | None = None,
) -> list[dict[str, Any]]:
"""Search for similar engram states.
Returns list of search result dicts with cache_id, similarity, etc.
"""
body: dict[str, Any] = {
"task_description": task_description,
"top_k": top_k,
}
if model_id:
body["model_id"] = model_id
if min_similarity is not None:
body["min_similarity"] = min_similarity
resp = self._client.post("/cache/search", json=body)
resp.raise_for_status()
return resp.json()["results"]
# ββ Delete ββββββββββββββββββββββββββββββββββββββββββββββββ
def delete(self, cache_id: str) -> bool:
"""Delete an engram from storage and index."""
resp = self._client.delete(f"/cache/{cache_id}")
resp.raise_for_status()
return resp.json()["deleted"]
# ββ Stats βββββββββββββββββββββββββββββββββββββββββββββββββ
def stats(self) -> dict[str, Any]:
"""Get aggregate engram store statistics."""
resp = self._client.get("/cache/stats")
resp.raise_for_status()
return resp.json()
|