File size: 6,939 Bytes
0769ff3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
"""
ENGRAM Protocol — Local Disk Storage Backend


Directory layout:
    {data_dir}/{model_family}/{agent_id}/{date}/{cache_id}.eng

Phase 1 production backend. Zero infrastructure dependencies.
Uses safetensors header-only read for metadata operations.
D7: One safetensors file per 256-token block.
"""

from __future__ import annotations

import logging
import shutil
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path

logger = logging.getLogger(__name__)

from kvcos.core.serializer import EngramSerializer
from kvcos.core.types import ENG_FILE_EXTENSION, CacheStats, EngramMetadata
from kvcos.storage.backends import StorageBackend


class LocalStorageBackend(StorageBackend):
    """Local filesystem storage for .eng files.

    Files are organized by model family, agent ID, and UTC date:
        {data_dir}/{model_family}/{agent_id}/{YYYY-MM-DD}/{cache_id}.eng

    An in-memory ``cache_id -> Path`` index is rebuilt on construction and
    kept current by store/delete operations. Writes are atomic: payloads go
    to a sibling ``.tmp`` file first and are then moved into place.
    """

    def __init__(self, data_dir: Path):
        """Create the backend rooted at *data_dir*, creating it if needed."""
        self.data_dir = data_dir
        self.data_dir.mkdir(parents=True, exist_ok=True)
        self._serializer = EngramSerializer()
        self._index: dict[str, Path] = {}  # cache_id → file path
        self._rebuild_index()

    def _rebuild_index(self) -> None:
        """Scan the data directory and rebuild the in-memory path index.

        Prefers the cache_id embedded in the file's metadata header; falls
        back to the filename stem when the header cannot be read (best
        effort — an unreadable header still gets indexed).
        """
        self._index.clear()
        for eng_file in self.data_dir.rglob(f"*{ENG_FILE_EXTENSION}"):
            cache_id = eng_file.stem
            try:
                meta = self._serializer.read_metadata_only(eng_file)
                if "cache_id" in meta:
                    cache_id = meta["cache_id"]
            except Exception as e:
                logger.debug("Skipping metadata for %s: %s", eng_file.name, e)
            self._index[cache_id] = eng_file

    def _resolve_path(self, metadata: EngramMetadata) -> Path:
        """Determine the storage path from *metadata* and create its parent dir."""
        model_family = metadata.get("model_family", "unknown")
        agent_id = metadata.get("agent_id", "default")
        date_str = datetime.now(timezone.utc).strftime("%Y-%m-%d")
        cache_id = metadata.get("cache_id", "unknown")
        path = self.data_dir / model_family / agent_id / date_str / f"{cache_id}{ENG_FILE_EXTENSION}"
        path.parent.mkdir(parents=True, exist_ok=True)
        return path

    @staticmethod
    def _tmp_path(path: Path) -> Path:
        """Sibling temp-file path used for the atomic write of *path*.

        Appends ``.tmp`` with ``with_name`` rather than ``with_suffix`` so a
        cache_id containing dots cannot shift the suffix boundary.
        """
        return path.with_name(f"{path.name}.tmp")

    def store(self, cache_id: str, data: bytes, metadata: EngramMetadata) -> str:
        """Write *data* under *cache_id* atomically and return the final path.

        The payload is written to a sibling ``.tmp`` file, then moved into
        place with ``Path.replace`` so readers never observe a partial file.
        ``replace`` (unlike ``rename``) also overwrites an existing entry
        atomically on Windows instead of raising FileExistsError.

        Raises whatever the underlying filesystem operations raise; the temp
        file is cleaned up on failure.
        """
        metadata_copy = dict(metadata)
        metadata_copy["cache_id"] = cache_id
        path = self._resolve_path(metadata_copy)  # type: ignore[arg-type]

        tmp_path = self._tmp_path(path)
        try:
            tmp_path.write_bytes(data)
            tmp_path.replace(path)  # atomic, overwrite-safe on all platforms
        except Exception:
            tmp_path.unlink(missing_ok=True)
            raise

        self._index[cache_id] = path
        return str(path)

    def store_file(self, cache_id: str, source_path: Path, metadata: EngramMetadata) -> str:
        """Copy an existing file into storage under *cache_id* atomically.

        Same atomic copy-then-replace protocol as :meth:`store`. If the
        source already is the resolved destination, only the index is
        refreshed. Returns the final path as a string.
        """
        metadata_copy = dict(metadata)
        metadata_copy["cache_id"] = cache_id
        dest_path = self._resolve_path(metadata_copy)  # type: ignore[arg-type]

        if source_path == dest_path:
            self._index[cache_id] = dest_path
            return str(dest_path)

        tmp_path = self._tmp_path(dest_path)
        try:
            # copy2 preserves mtime/metadata; replace makes the move atomic
            # and overwrite-safe (rename would fail on Windows if dest exists).
            shutil.copy2(str(source_path), str(tmp_path))
            tmp_path.replace(dest_path)
        except Exception:
            tmp_path.unlink(missing_ok=True)
            raise

        self._index[cache_id] = dest_path
        return str(dest_path)

    def get(self, cache_id: str) -> bytes | None:
        """Return the raw file bytes for *cache_id*, or None if absent."""
        path = self._index.get(cache_id)
        if path is None or not path.exists():
            return None
        return path.read_bytes()

    def get_path(self, cache_id: str) -> Path | None:
        """Return the on-disk path for *cache_id*, or None if absent."""
        path = self._index.get(cache_id)
        if path is None or not path.exists():
            return None
        return path

    def get_metadata(self, cache_id: str) -> EngramMetadata | None:
        """Return metadata for *cache_id* via a header-only read, or None.

        Returns None both for missing entries and for unreadable headers
        (the latter is logged at WARNING).
        """
        path = self._index.get(cache_id)
        if path is None or not path.exists():
            return None
        try:
            return self._serializer.read_metadata_only(path)
        except Exception as e:
            logger.warning("Failed to read metadata for %s: %s", cache_id, e)
            return None

    def delete(self, cache_id: str) -> bool:
        """Delete the entry for *cache_id*; return True if a file was removed.

        Also prunes now-empty parent directories up to (but not including)
        ``data_dir``; pruning failures are ignored as best-effort cleanup.
        """
        path = self._index.pop(cache_id, None)
        if path is None or not path.exists():
            return False

        path.unlink()

        parent = path.parent
        try:
            while parent != self.data_dir:
                if not any(parent.iterdir()):
                    parent.rmdir()
                    parent = parent.parent
                else:
                    break
        except OSError:
            pass  # best-effort: a racing writer may repopulate the dir

        return True

    def list_entries(
        self,
        agent_id: str | None = None,
        model_family: str | None = None,
        limit: int = 100,
    ) -> list[EngramMetadata]:
        """Return up to *limit* entry metadata dicts, newest first.

        Filters by *agent_id* and *model_family* when given. The full
        matching set is collected and sorted by ``created_at`` (descending)
        BEFORE truncating to *limit*, so the result is genuinely the most
        recent entries rather than an arbitrary index-order subset.
        Unreadable entries are skipped.
        """
        matching: list[EngramMetadata] = []

        for cache_id, path in self._index.items():
            if not path.exists():
                continue
            try:
                meta = self._serializer.read_metadata_only(path)
            except Exception as e:
                logger.debug("Skipping %s in list_entries: %s", cache_id, e)
                continue
            if agent_id and meta.get("agent_id") != agent_id:
                continue
            if model_family and meta.get("model_family") != model_family:
                continue
            matching.append(meta)

        matching.sort(key=lambda m: m.get("created_at", ""), reverse=True)
        return matching[:limit]

    def exists(self, cache_id: str) -> bool:
        """Return True if *cache_id* is indexed and its file is on disk."""
        path = self._index.get(cache_id)
        return path is not None and path.exists()

    def stats(self) -> CacheStats:
        """Aggregate entry count, total size, and per-model-family counts.

        Entries whose metadata cannot be read are counted under "unknown".
        ``avg_compression_ratio`` is not tracked by this backend and is
        reported as 0.0.
        """
        total_entries = 0
        total_size = 0
        model_counts: dict[str, int] = defaultdict(int)

        for cache_id, path in self._index.items():
            if not path.exists():
                continue
            total_entries += 1
            total_size += path.stat().st_size
            try:
                meta = self._serializer.read_metadata_only(path)
                model_counts[meta.get("model_family", "unknown")] += 1
            except Exception as e:
                logger.debug("Metadata read failed for %s: %s", cache_id, e)
                model_counts["unknown"] += 1

        return CacheStats(
            total_entries=total_entries,
            total_size_bytes=total_size,
            avg_compression_ratio=0.0,
            model_breakdown=dict(model_counts),
        )

    def vacuum(self) -> int:
        """Drop index entries whose files no longer exist; return the count."""
        stale = [cid for cid, path in self._index.items() if not path.exists()]
        for cid in stale:
            del self._index[cid]
        return len(stale)