"""
Predictive Cache — anticipates future requests and pre-caches results.

Ultra-lightweight: uses compressed execution signatures and access
pattern tracking. Predicts what will be needed next and pre-computes.
"""
import hashlib
import json
import logging
import os
import time
from collections import defaultdict
from typing import Any, Optional


def _env_int(name: str, default: int) -> int:
    """Return environment variable *name* as a non-negative int.

    Falls back to *default* when the variable is unset, is not a valid
    integer, or is negative, so a bad environment value cannot make the
    module fail at import time.
    """
    raw = os.getenv(name)
    if raw is None:
        return default
    try:
        parsed = int(raw)
    except ValueError:
        return default
    return parsed if parsed >= 0 else default


# Path of the JSON file the cache is persisted to.
_CACHE_PATH = os.getenv("ADAM_PREDICTIVE_CACHE_PATH", "/tmp/adam_pcache.json")
# Maximum number of entries kept in the cache.
_MAX_ENTRIES = _env_int("ADAM_PREDICTIVE_CACHE_MAX", 200)
# Default lifetime of an entry, in seconds.
_TTL_SECONDS = _env_int("ADAM_PREDICTIVE_CACHE_TTL", 600)
class PredictiveCache:
    """
    Small persistent key/value cache that also tracks key access order.

    Entries are held in memory and mirrored to the JSON file at
    ``_CACHE_PATH``. Every successful ``get`` is recorded so that
    ``predict_next`` can suggest a key that previously followed the most
    recently read key (a markov-chain-like successor table without
    frequency counts).
    """

    _log = logging.getLogger(__name__)

    def __init__(self):
        # key -> {"value": ..., "ts": insertion time, "ttl": lifetime in seconds}
        self._cache: dict[str, dict] = {}
        # key -> keys that were read immediately after it
        self._access_patterns: dict[str, list[str]] = defaultdict(list)
        # recent successful reads, oldest first
        self._access_sequence: list[str] = []
        self._load()

    @staticmethod
    def _is_fresh(entry: Any, now: float) -> bool:
        """Return True if *entry* is a well-formed record that has not expired.

        The entry's own ``ttl`` is used when present; otherwise the module
        default ``_TTL_SECONDS`` applies. Malformed records count as expired.
        """
        if not isinstance(entry, dict):
            return False
        stamp = entry.get("ts", 0)
        lifetime = entry.get("ttl") or _TTL_SECONDS
        if not isinstance(stamp, (int, float)) or not isinstance(lifetime, (int, float)):
            return False
        return stamp + lifetime > now

    def _prune(self, now: float) -> None:
        """Drop expired entries, then evict the oldest until within capacity."""
        live = {k: v for k, v in self._cache.items() if self._is_fresh(v, now)}
        overflow = len(live) - _MAX_ENTRIES
        if overflow > 0:
            # sorted() is stable, so ties on "ts" are evicted in insertion order.
            oldest_first = sorted(live, key=lambda k: live[k].get("ts", 0))
            for stale_key in oldest_first[:overflow]:
                del live[stale_key]
        self._cache = live

    def _load(self) -> None:
        """Load cache and access patterns from disk (best effort).

        A missing, unreadable or malformed file leaves the instance empty
        instead of raising.
        """
        try:
            with open(_CACHE_PATH, "r", encoding="utf-8") as f:
                data = json.load(f)
        except FileNotFoundError:
            return
        except (OSError, ValueError, RecursionError) as exc:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            self._log.debug("predictive cache: could not load %s: %s", _CACHE_PATH, exc)
            return

        if not isinstance(data, dict):
            self._log.debug("predictive cache: unexpected content in %s", _CACHE_PATH)
            return

        stored = data.get("cache")
        if isinstance(stored, dict):
            self._cache = dict(stored)
            self._prune(time.time())

        patterns = data.get("patterns")
        if isinstance(patterns, dict):
            self._access_patterns = defaultdict(
                list,
                {k: list(v) for k, v in patterns.items() if isinstance(v, list)},
            )

    def _save(self) -> None:
        """Persist cache and access patterns to disk (best effort).

        Expired and over-capacity entries are removed from memory first.
        The payload is serialized before any file is opened and written via
        a temporary file plus ``os.replace``, so a failure never truncates
        or corrupts the previously saved file.
        """
        self._prune(time.time())
        payload = {
            "cache": self._cache,
            "patterns": dict(self._access_patterns),
        }
        try:
            text = json.dumps(payload)
        except (TypeError, ValueError, RecursionError) as exc:
            self._log.debug("predictive cache: state is not JSON-serializable: %s", exc)
            return

        tmp_path = f"{_CACHE_PATH}.{os.getpid()}.tmp"
        try:
            os.makedirs(os.path.dirname(_CACHE_PATH) or ".", exist_ok=True)
            with open(tmp_path, "w", encoding="utf-8") as f:
                f.write(text)
            os.replace(tmp_path, _CACHE_PATH)
        except OSError as exc:
            self._log.debug("predictive cache: could not write %s: %s", _CACHE_PATH, exc)
            try:
                os.remove(tmp_path)
            except OSError:
                pass  # temp file was never created or is already gone

    async def get(self, key: str) -> Optional[Any]:
        """Return the cached value for *key*, or None if absent or expired.

        A hit is recorded for access-pattern learning; an expired entry is
        removed from memory.
        """
        entry = self._cache.get(key)
        if entry is None:
            return None
        if not self._is_fresh(entry, time.time()):
            self._cache.pop(key, None)
            return None
        self._record_access(key)
        return entry.get("value")

    async def set(self, key: str, value: Any, ttl: Optional[int] = None):
        """Store *value* under *key* and persist the cache.

        Args:
            key: Cache key.
            value: Value to store; it must be JSON-serializable for the
                cache to be persisted.
            ttl: Lifetime of this entry in seconds. ``None`` or ``0`` falls
                back to the module default ``_TTL_SECONDS``.
        """
        self._cache[key] = {
            "value": value,
            "ts": time.time(),
            "ttl": ttl or _TTL_SECONDS,
        }
        # _save() also enforces expiry and the _MAX_ENTRIES capacity limit.
        self._save()

    def _record_access(self, key: str):
        """Record a successful read of *key* for pattern learning."""
        self._access_sequence.append(key)
        # Bound the history: once it exceeds 100 reads keep the newest 50.
        if len(self._access_sequence) > 100:
            self._access_sequence = self._access_sequence[-50:]

        if len(self._access_sequence) >= 2:
            prev = self._access_sequence[-2]
            successors = self._access_patterns[prev]
            if key not in successors:
                successors.append(key)
                # Bound the successor list: past 10 keep the newest 5.
                if len(successors) > 10:
                    self._access_patterns[prev] = successors[-5:]

    def predict_next(self) -> Optional[str]:
        """Suggest the key likely to be read next, or None if unknown.

        Returns the earliest-recorded successor still retained for the most
        recently read key.
        """
        if not self._access_sequence:
            return None
        # .get() avoids creating an empty defaultdict entry as a side effect.
        candidates = self._access_patterns.get(self._access_sequence[-1])
        if candidates:
            return candidates[0]
        return None

    async def clear(self):
        """Remove all entries and learned patterns, then persist the empty state."""
        self._cache.clear()
        self._access_patterns.clear()
        self._access_sequence.clear()
        self._save()

    def get_stats(self) -> dict:
        """Return entry count, total recorded successors and the configured limits."""
        return {
            "size": len(self._cache),
            "patterns": sum(len(v) for v in self._access_patterns.values()),
            "ttl_seconds": _TTL_SECONDS,
            "max_entries": _MAX_ENTRIES,
        }