File size: 4,275 Bytes
ef4f590
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
"""
Predictive Cache — anticipates future requests and pre-caches results.

Ultra-lightweight: uses compressed execution signatures and access
pattern tracking. Predicts what will be needed next and pre-computes.
"""
import os
import json
import time
import hashlib
from collections import defaultdict
from typing import Optional, Any

_CACHE_PATH = os.getenv("ADAM_PREDICTIVE_CACHE_PATH", "/tmp/adam_pcache.json")
_MAX_ENTRIES = int(os.getenv("ADAM_PREDICTIVE_CACHE_MAX", "200"))
_TTL_SECONDS = int(os.getenv("ADAM_PREDICTIVE_CACHE_TTL", "600"))


class PredictiveCache:
    """
    Predicts future cache needs and pre-populates.

    Tracks access patterns to predict what will be needed next.
    Uses markov-chain-like prediction over access sequences.
    """

    def __init__(self):
        self._cache: dict[str, dict] = {}
        self._access_patterns: dict[str, list[str]] = defaultdict(list)
        self._access_sequence: list[str] = []
        self._load()

    def _load(self):
        """Load cache from disk."""
        try:
            if os.path.exists(_CACHE_PATH):
                with open(_CACHE_PATH, "r") as f:
                    data = json.load(f)
                    self._cache = {k: v for k, v in data.get("cache", {}).items()
                                   if v.get("ts", 0) + _TTL_SECONDS > time.time()}
                    self._access_patterns = defaultdict(list, data.get("patterns", {}))
        except Exception:
            pass

    def _save(self):
        """Persist cache to disk."""
        try:
            os.makedirs(os.path.dirname(_CACHE_PATH) or ".", exist_ok=True)
            # Evict expired
            now = time.time()
            self._cache = {k: v for k, v in self._cache.items()
                           if v.get("ts", 0) + _TTL_SECONDS > now}
            with open(_CACHE_PATH, "w") as f:
                json.dump({
                    "cache": dict(list(self._cache.items())[:_MAX_ENTRIES]),
                    "patterns": dict(self._access_patterns),
                }, f)
        except Exception:
            pass

    async def get(self, key: str) -> Optional[Any]:
        """Get a cached value."""
        entry = self._cache.get(key)
        if entry and entry.get("ts", 0) + _TTL_SECONDS > time.time():
            self._record_access(key)
            return entry.get("value")
        return None

    async def set(self, key: str, value: Any, ttl: int = None):
        """Set a cached value."""
        self._cache[key] = {
            "value": value,
            "ts": time.time(),
            "ttl": ttl or _TTL_SECONDS,
        }
        if len(self._cache) > _MAX_ENTRIES:
            oldest = min(self._cache.keys(), key=lambda k: self._cache[k].get("ts", 0))
            del self._cache[oldest]
        self._save()

    def _record_access(self, key: str):
        """Record cache access for pattern learning."""
        self._access_sequence.append(key)
        if len(self._access_sequence) > 100:
            self._access_sequence = self._access_sequence[-50:]

        if len(self._access_sequence) >= 2:
            prev = self._access_sequence[-2]
            if key not in self._access_patterns[prev]:
                self._access_patterns[prev].append(key)
                if len(self._access_patterns[prev]) > 10:
                    self._access_patterns[prev] = self._access_patterns[prev][-5:]

    def predict_next(self) -> Optional[str]:
        """Predict the most likely next cache key."""
        if not self._access_sequence:
            return None

        last_key = self._access_sequence[-1]
        if last_key in self._access_patterns:
            candidates = self._access_patterns[last_key]
            if candidates:
                return candidates[0]
        return None

    async def clear(self):
        """Clear all cached entries."""
        self._cache.clear()
        self._access_patterns.clear()
        self._access_sequence.clear()
        self._save()

    def get_stats(self) -> dict:
        """Get cache statistics."""
        return {
            "size": len(self._cache),
            "patterns": sum(len(v) for v in self._access_patterns.values()),
            "ttl_seconds": _TTL_SECONDS,
            "max_entries": _MAX_ENTRIES,
        }