granite-code-3b / shared /agent /memory /predictive_cache.py
AjinkyaPagare's picture
ADAM v2.0: Advanced Agentic Mesh — DAG orchestrator, cognition, knowledge web, forge tools, runtime optimization
ba2ada2
"""
Predictive Cache — anticipates future requests and pre-caches results.
Ultra-lightweight: uses compressed execution signatures and access
pattern tracking. Predicts what will be needed next and pre-computes.
"""
import os
import json
import time
import hashlib
from collections import defaultdict
from typing import Optional, Any
_CACHE_PATH = os.getenv("ADAM_PREDICTIVE_CACHE_PATH", "/tmp/adam_pcache.json")
_MAX_ENTRIES = int(os.getenv("ADAM_PREDICTIVE_CACHE_MAX", "200"))
_TTL_SECONDS = int(os.getenv("ADAM_PREDICTIVE_CACHE_TTL", "600"))
class PredictiveCache:
"""
Predicts future cache needs and pre-populates.
Tracks access patterns to predict what will be needed next.
Uses markov-chain-like prediction over access sequences.
"""
def __init__(self):
self._cache: dict[str, dict] = {}
self._access_patterns: dict[str, list[str]] = defaultdict(list)
self._access_sequence: list[str] = []
self._load()
def _load(self):
"""Load cache from disk."""
try:
if os.path.exists(_CACHE_PATH):
with open(_CACHE_PATH, "r") as f:
data = json.load(f)
self._cache = {k: v for k, v in data.get("cache", {}).items()
if v.get("ts", 0) + _TTL_SECONDS > time.time()}
self._access_patterns = defaultdict(list, data.get("patterns", {}))
except Exception:
pass
def _save(self):
"""Persist cache to disk."""
try:
os.makedirs(os.path.dirname(_CACHE_PATH) or ".", exist_ok=True)
# Evict expired
now = time.time()
self._cache = {k: v for k, v in self._cache.items()
if v.get("ts", 0) + _TTL_SECONDS > now}
with open(_CACHE_PATH, "w") as f:
json.dump({
"cache": dict(list(self._cache.items())[:_MAX_ENTRIES]),
"patterns": dict(self._access_patterns),
}, f)
except Exception:
pass
async def get(self, key: str) -> Optional[Any]:
"""Get a cached value."""
entry = self._cache.get(key)
if entry and entry.get("ts", 0) + _TTL_SECONDS > time.time():
self._record_access(key)
return entry.get("value")
return None
async def set(self, key: str, value: Any, ttl: int = None):
"""Set a cached value."""
self._cache[key] = {
"value": value,
"ts": time.time(),
"ttl": ttl or _TTL_SECONDS,
}
if len(self._cache) > _MAX_ENTRIES:
oldest = min(self._cache.keys(), key=lambda k: self._cache[k].get("ts", 0))
del self._cache[oldest]
self._save()
def _record_access(self, key: str):
"""Record cache access for pattern learning."""
self._access_sequence.append(key)
if len(self._access_sequence) > 100:
self._access_sequence = self._access_sequence[-50:]
if len(self._access_sequence) >= 2:
prev = self._access_sequence[-2]
if key not in self._access_patterns[prev]:
self._access_patterns[prev].append(key)
if len(self._access_patterns[prev]) > 10:
self._access_patterns[prev] = self._access_patterns[prev][-5:]
def predict_next(self) -> Optional[str]:
"""Predict the most likely next cache key."""
if not self._access_sequence:
return None
last_key = self._access_sequence[-1]
if last_key in self._access_patterns:
candidates = self._access_patterns[last_key]
if candidates:
return candidates[0]
return None
async def clear(self):
"""Clear all cached entries."""
self._cache.clear()
self._access_patterns.clear()
self._access_sequence.clear()
self._save()
def get_stats(self) -> dict:
"""Get cache statistics."""
return {
"size": len(self._cache),
"patterns": sum(len(v) for v in self._access_patterns.values()),
"ttl_seconds": _TTL_SECONDS,
"max_entries": _MAX_ENTRIES,
}