File size: 5,406 Bytes
0769ff3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
"""
ENGRAM Protocol — ENGRAM Python Client


Synchronous HTTP client wrapping all ENGRAM API endpoints.
This is what agents import to interact with the engram store.
"""

from __future__ import annotations

from pathlib import Path
from typing import Any
from urllib.parse import quote

import httpx


class EngramClient:
    """Synchronous Python client for the ENGRAM REST API.

    Wraps a blocking ``httpx.Client`` rooted at ``<base_url>/v1``. Every
    request method calls ``raise_for_status()`` on the response, so a 4xx/5xx
    reply surfaces as an ``httpx.HTTPStatusError``.

    Usage:
        with EngramClient("http://localhost:8080") as client:
            result = client.store_file(
                path,
                agent_id="worker",
                task_description="analyze code",
                model_id="llama-3.1-8b",
            )
            matches = client.search(task_description="debug auth error", top_k=3)
            data = client.get(matches[0]["cache_id"])
    """

    def __init__(self, base_url: str = "http://localhost:8080", timeout: float = 30.0):
        """Create the underlying HTTP session.

        Args:
            base_url: Server root; a trailing ``/`` is stripped before ``/v1``
                is appended.
            timeout: Timeout in seconds handed to ``httpx.Client``.
        """
        self.base_url = base_url.rstrip("/")
        self._client = httpx.Client(base_url=f"{self.base_url}/v1", timeout=timeout)

    def close(self) -> None:
        """Close the underlying HTTP session."""
        self._client.close()

    def __enter__(self) -> EngramClient:
        return self

    def __exit__(self, *exc_info: object) -> None:
        # Returns None, so exceptions raised inside the ``with`` body propagate.
        self.close()

    # ── Internal helpers ──────────────────────────────────────

    @staticmethod
    def _json(resp: Any) -> Any:
        """Raise on an HTTP error status, otherwise return the decoded JSON body."""
        resp.raise_for_status()
        return resp.json()

    @staticmethod
    def _cache_path(cache_id: str) -> str:
        """Build the ``/cache/<id>`` path with the ID percent-encoded.

        Encoding with ``safe=""`` keeps characters such as ``/``, ``?`` and
        ``#`` inside the ID segment instead of letting them alter the route
        or start a query string/fragment.
        """
        return f"/cache/{quote(str(cache_id), safe='')}"

    def _upload(
        self,
        filename: str,
        payload: Any,
        *,
        agent_id: str,
        task_description: str,
        model_id: str,
        compression: str,
    ) -> dict[str, Any]:
        """POST a multipart upload to ``/cache`` and return the JSON reply.

        ``payload`` is either raw bytes or an open binary file object; the
        metadata travels as query parameters.
        """
        resp = self._client.post(
            "/cache",
            params={
                "agent_id": agent_id,
                "task_description": task_description,
                "model_id": model_id,
                "compression": compression,
            },
            files={"file": (filename, payload, "application/octet-stream")},
        )
        return self._json(resp)

    # ── Health ────────────────────────────────────────────────

    def health(self) -> dict[str, Any]:
        """Check ENGRAM server health (``GET /health``)."""
        return self._json(self._client.get("/health"))

    # ── Store ─────────────────────────────────────────────────

    def store_file(
        self,
        file_path: Path | str,
        agent_id: str,
        task_description: str,
        model_id: str,
        compression: str = "q8_0",
    ) -> dict[str, Any]:
        """Upload a .eng file to the engram store.

        Args:
            file_path: Path to the .eng file (``Path`` or plain string)
            agent_id: Agent identifier
            task_description: Human-readable description
            model_id: Model identifier
            compression: Compression method used

        Returns:
            Dict with cache_id, size_bytes, compression_ratio, path

        Raises:
            OSError: If the file cannot be opened for reading.
            httpx.HTTPStatusError: If the server answers with an error status.
        """
        # Normalise so string paths work too; ``.name`` is the upload filename.
        path = Path(file_path)
        with path.open("rb") as fh:
            return self._upload(
                path.name,
                fh,
                agent_id=agent_id,
                task_description=task_description,
                model_id=model_id,
                compression=compression,
            )

    def store_bytes(
        self,
        data: bytes,
        agent_id: str,
        task_description: str,
        model_id: str,
        compression: str = "q8_0",
        filename: str = "cache.eng",
    ) -> dict[str, Any]:
        """Upload raw .eng bytes to the engram store.

        Same request as :meth:`store_file`, but the payload is already in
        memory and ``filename`` is the name reported in the multipart part.
        """
        return self._upload(
            filename,
            data,
            agent_id=agent_id,
            task_description=task_description,
            model_id=model_id,
            compression=compression,
        )

    # ── Retrieve ──────────────────────────────────────────────

    def get(self, cache_id: str) -> bytes:
        """Retrieve a .eng file by cache ID.

        Returns raw bytes of the .eng file.
        """
        resp = self._client.get(self._cache_path(cache_id))
        resp.raise_for_status()
        return resp.content

    # ── Search ────────────────────────────────────────────────

    def search(
        self,
        task_description: str,
        model_id: str | None = None,
        top_k: int = 5,
        min_similarity: float | None = None,
    ) -> list[dict[str, Any]]:
        """Search for similar engram states.

        Args:
            task_description: Query text sent to the server.
            model_id: Optional model filter; omitted from the request when
                ``None`` or empty.
            top_k: Maximum number of results requested.
            min_similarity: Optional similarity floor; omitted only when
                ``None`` (so ``0.0`` is sent).

        Returns:
            The ``"results"`` list from the response: search result dicts
            with cache_id, similarity, etc.
        """
        body: dict[str, Any] = {
            "task_description": task_description,
            "top_k": top_k,
        }
        # Truthiness check on purpose: an empty string means "no filter".
        if model_id:
            body["model_id"] = model_id
        if min_similarity is not None:
            body["min_similarity"] = min_similarity

        return self._json(self._client.post("/cache/search", json=body))["results"]

    # ── Delete ────────────────────────────────────────────────

    def delete(self, cache_id: str) -> bool:
        """Delete an engram from storage and index.

        Returns the ``"deleted"`` field of the server's JSON reply.
        """
        return self._json(self._client.delete(self._cache_path(cache_id)))["deleted"]

    # ── Stats ─────────────────────────────────────────────────

    def stats(self) -> dict[str, Any]:
        """Get aggregate engram store statistics (``GET /cache/stats``)."""
        return self._json(self._client.get("/cache/stats"))