""" Predictive Cache — anticipates future requests and pre-caches results. Ultra-lightweight: uses compressed execution signatures and access pattern tracking. Predicts what will be needed next and pre-computes. """ import os import json import time import hashlib from collections import defaultdict from typing import Optional, Any _CACHE_PATH = os.getenv("ADAM_PREDICTIVE_CACHE_PATH", "/tmp/adam_pcache.json") _MAX_ENTRIES = int(os.getenv("ADAM_PREDICTIVE_CACHE_MAX", "200")) _TTL_SECONDS = int(os.getenv("ADAM_PREDICTIVE_CACHE_TTL", "600")) class PredictiveCache: """ Predicts future cache needs and pre-populates. Tracks access patterns to predict what will be needed next. Uses markov-chain-like prediction over access sequences. """ def __init__(self): self._cache: dict[str, dict] = {} self._access_patterns: dict[str, list[str]] = defaultdict(list) self._access_sequence: list[str] = [] self._load() def _load(self): """Load cache from disk.""" try: if os.path.exists(_CACHE_PATH): with open(_CACHE_PATH, "r") as f: data = json.load(f) self._cache = {k: v for k, v in data.get("cache", {}).items() if v.get("ts", 0) + _TTL_SECONDS > time.time()} self._access_patterns = defaultdict(list, data.get("patterns", {})) except Exception: pass def _save(self): """Persist cache to disk.""" try: os.makedirs(os.path.dirname(_CACHE_PATH) or ".", exist_ok=True) # Evict expired now = time.time() self._cache = {k: v for k, v in self._cache.items() if v.get("ts", 0) + _TTL_SECONDS > now} with open(_CACHE_PATH, "w") as f: json.dump({ "cache": dict(list(self._cache.items())[:_MAX_ENTRIES]), "patterns": dict(self._access_patterns), }, f) except Exception: pass async def get(self, key: str) -> Optional[Any]: """Get a cached value.""" entry = self._cache.get(key) if entry and entry.get("ts", 0) + _TTL_SECONDS > time.time(): self._record_access(key) return entry.get("value") return None async def set(self, key: str, value: Any, ttl: int = None): """Set a cached value.""" self._cache[key] = { "value": value, "ts": time.time(), "ttl": ttl or _TTL_SECONDS, } if len(self._cache) > _MAX_ENTRIES: oldest = min(self._cache.keys(), key=lambda k: self._cache[k].get("ts", 0)) del self._cache[oldest] self._save() def _record_access(self, key: str): """Record cache access for pattern learning.""" self._access_sequence.append(key) if len(self._access_sequence) > 100: self._access_sequence = self._access_sequence[-50:] if len(self._access_sequence) >= 2: prev = self._access_sequence[-2] if key not in self._access_patterns[prev]: self._access_patterns[prev].append(key) if len(self._access_patterns[prev]) > 10: self._access_patterns[prev] = self._access_patterns[prev][-5:] def predict_next(self) -> Optional[str]: """Predict the most likely next cache key.""" if not self._access_sequence: return None last_key = self._access_sequence[-1] if last_key in self._access_patterns: candidates = self._access_patterns[last_key] if candidates: return candidates[0] return None async def clear(self): """Clear all cached entries.""" self._cache.clear() self._access_patterns.clear() self._access_sequence.clear() self._save() def get_stats(self) -> dict: """Get cache statistics.""" return { "size": len(self._cache), "patterns": sum(len(v) for v in self._access_patterns.values()), "ttl_seconds": _TTL_SECONDS, "max_entries": _MAX_ENTRIES, }