File size: 5,363 Bytes
f373e2b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
"""
Cache manager for RAG system optimization.

Based on rag-service skill pattern for improved performance.
Caches embeddings and question-answer pairs with TTL.
"""
import hashlib
import time
from typing import Any, Optional, Dict
from functools import lru_cache
from pathlib import Path
import json


class CacheManager:
    """
    Multi-level caching for RAG system components.
    
    Features:
    - Embedding cache (LRU in-memory)
    - Q&A response cache (with TTL)
    - Query cache for deduplication
    """
    
    def __init__(
        self,
        cache_dir: str = "data/cache",
        ttl_seconds: int = 3600,  # 1 hour default
        max_memory_items: int = 1000
    ):
        """
        Initialize cache manager.
        
        Args:
            cache_dir: Directory for persistent cache
            ttl_seconds: Time-to-live for cached items
            max_memory_items: Max items in memory cache
        """
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self.ttl_seconds = ttl_seconds
        self.max_memory_items = max_memory_items
        
        # In-memory caches
        self._query_cache: Dict[str, Dict] = {}
        self._embedding_cache: Dict[str, Any] = {}
    
    def _hash_key(self, key: str) -> str:
        """Generate hash for cache key."""
        return hashlib.md5(key.encode()).hexdigest()
    
    def get_cached_response(self, query: str) -> Optional[Dict]:
        """
        Get cached Q&A response for query.
        
        Args:
            query: User query
            
        Returns:
            Cached response dict or None
        """
        key = self._hash_key(query.lower().strip())
        
        # Check memory cache first
        if key in self._query_cache:
            entry = self._query_cache[key]
            if time.time() - entry['timestamp'] < self.ttl_seconds:
                return entry['response']
            else:
                # Expired
                del self._query_cache[key]
        
        # Check disk cache
        cache_file = self.cache_dir / f"qa_{key}.json"
        if cache_file.exists():
            try:
                with open(cache_file, 'r') as f:
                    entry = json.load(f)
                if time.time() - entry['timestamp'] < self.ttl_seconds:
                    # Store in memory for fast access
                    self._query_cache[key] = entry
                    return entry['response']
                else:
                    # Expired
                    cache_file.unlink()
            except Exception:
                pass
        
        return None
    
    def cache_response(self, query: str, response: Dict) -> None:
        """
        Cache Q&A response.
        
        Args:
            query: User query
            response: Response dict to cache
        """
        key = self._hash_key(query.lower().strip())
        entry = {
            'query': query,
            'response': response,
            'timestamp': time.time()
        }
        
        # Store in memory
        if len(self._query_cache) >= self.max_memory_items:
            # Remove oldest entry
            oldest_key = min(
                self._query_cache.keys(),
                key=lambda k: self._query_cache[k]['timestamp']
            )
            del self._query_cache[oldest_key]
        
        self._query_cache[key] = entry
        
        # Store on disk
        cache_file = self.cache_dir / f"qa_{key}.json"
        try:
            with open(cache_file, 'w') as f:
                json.dump(entry, f)
        except Exception:
            pass  # Fail silently for cache writes
    
    def get_cached_embedding(self, text: str) -> Optional[Any]:
        """
        Get cached embedding for text.
        
        Args:
            text: Text to get embedding for
            
        Returns:
            Cached embedding or None
        """
        key = self._hash_key(text)
        return self._embedding_cache.get(key)
    
    def cache_embedding(self, text: str, embedding: Any) -> None:
        """
        Cache text embedding.
        
        Args:
            text: Original text
            embedding: Generated embedding
        """
        key = self._hash_key(text)
        
        if len(self._embedding_cache) >= self.max_memory_items:
            # Clear half the cache (simple LRU approximation)
            keys_to_remove = list(self._embedding_cache.keys())[:self.max_memory_items // 2]
            for k in keys_to_remove:
                del self._embedding_cache[k]
        
        self._embedding_cache[key] = embedding
    
    def invalidate_cache(self) -> None:
        """Clear all caches (call when knowledge base updates)."""
        self._query_cache.clear()
        self._embedding_cache.clear()
        
        # Clear disk cache
        for cache_file in self.cache_dir.glob("qa_*.json"):
            try:
                cache_file.unlink()
            except Exception:
                pass
    
    def get_cache_stats(self) -> Dict:
        """Get cache statistics."""
        return {
            'query_cache_size': len(self._query_cache),
            'embedding_cache_size': len(self._embedding_cache),
            'cache_dir': str(self.cache_dir),
            'ttl_seconds': self.ttl_seconds
        }