File size: 11,156 Bytes
daf250b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a63c61f
 
 
daf250b
 
 
 
a63c61f
 
 
 
 
 
daf250b
 
 
 
 
 
 
 
a63c61f
daf250b
 
 
 
 
 
 
a63c61f
daf250b
 
 
 
a63c61f
daf250b
a63c61f
daf250b
a63c61f
 
 
 
 
 
 
 
 
daf250b
a63c61f
 
 
daf250b
a63c61f
daf250b
a63c61f
 
daf250b
a63c61f
 
daf250b
 
a63c61f
 
 
 
daf250b
a63c61f
 
 
daf250b
 
 
 
 
 
 
 
 
 
 
 
a63c61f
daf250b
a63c61f
 
daf250b
a63c61f
 
 
daf250b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a63c61f
daf250b
 
a63c61f
daf250b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
"""
Redis Cache Adapter — Smart Layered Caching

Cache layers with different TTLs:
  Layer 1 — Intent cache       : 1 hour   (same query = same intent)
  Layer 2 — Live search cache  : 10 min   (DuckDuckGo/NewsAPI results)
  Layer 3 — Translation cache  : 1 hour   (LLM translation is expensive)
  Layer 4 — Full response cache: 5 min    (complete RAG answer; 30 min for historical intents)

Key naming convention:
  intent_v2:{query_hash}          → IntentResult dict
  live_search:{query_hash}        → list of live results (wrapped with cache metadata)
  translation:{query_hash}        → translation + expanded query dict
  rag_response:{query_hash}       → full RAG response dict

All keys use SHA-256 of the normalized query (lowercase, stripped).
"""

import json
import logging
import hashlib
import time
from typing import Optional, Dict, Any, List

import redis

from src.core.ports.cache_port import CachePort
from src.core.config import settings

logger = logging.getLogger(__name__)

# ── TTL constants (seconds) ───────────────────────────────────────────────────
# Every duration is written as <minutes> * 60 so the intended lifetime is
# readable at a glance; the resulting values are plain ints in seconds.

# Intent classification: 1 hour (the same query is expected to keep its intent).
TTL_INTENT = 60 * 60

# Live search results: 10 minutes (fresh enough for news lookups).
TTL_LIVE_SEARCH = 10 * 60

# Translation / query expansion: 1 hour (translations do not change).
TTL_TRANSLATION = 60 * 60

# Full RAG response: 5 minutes (temporal queries need freshness).
TTL_RESPONSE = 5 * 60

# Full RAG response for historical intents: 30 minutes (answers change less often).
TTL_RESPONSE_HISTORICAL = 30 * 60


class RedisAdapter(CachePort):
    """
    Redis cache adapter with smart layered caching.

    Four query-keyed layers sit on top of the generic ``get``/``set``/``delete``
    primitives. Every layer key has the form ``{prefix}:{sha256(query)}`` where
    the query is lowercased and stripped before hashing:

      - intent        : prefix ``intent_v2``,    TTL ``TTL_INTENT``
      - live search   : prefix ``live_search``,  TTL ``TTL_LIVE_SEARCH``
      - translation   : prefix ``translation``,  TTL ``TTL_TRANSLATION``
      - full response : prefix ``rag_response``, TTL ``TTL_RESPONSE`` or
                        ``TTL_RESPONSE_HISTORICAL``

    Values are stored as JSON strings. If Redis cannot be reached when the
    adapter is constructed, ``self.client`` stays ``None`` and every cache
    operation becomes a no-op returning ``None``/``False``. Redis errors
    raised during an operation are caught and reported the same way.
    """

    def __init__(self) -> None:
        """Create the adapter and attempt to connect immediately."""
        # ``None`` means "no usable Redis connection"; all methods check this.
        self.client = None
        self._connect()

    def _connect(self) -> None:
        """
        Build the Redis client from settings and verify it with a PING.

        ``settings.REDIS_URL`` takes precedence when it is set and non-empty;
        otherwise a connection pool is built from ``REDIS_HOST``,
        ``REDIS_PORT``, ``REDIS_DB`` and ``REDIS_PASSWORD``. Any failure
        (bad settings, connection error, failed PING) is logged as a warning
        and leaves ``self.client`` as ``None``.
        """
        try:
            url = getattr(settings, "REDIS_URL", None)
            if url:
                # Upstash requires TLS: upgrade a plain redis:// URL to rediss://.
                if url.startswith("redis://") and "upstash.io" in url:
                    url = "rediss://" + url[len("redis://"):]
                self.client = redis.from_url(url, decode_responses=True)
            else:
                pool = redis.ConnectionPool(
                    host=settings.REDIS_HOST,
                    port=settings.REDIS_PORT,
                    db=settings.REDIS_DB,
                    # An empty password string is passed as None.
                    password=settings.REDIS_PASSWORD or None,
                    decode_responses=True,
                )
                self.client = redis.Redis(connection_pool=pool)
            # Client construction alone does not prove the server is reachable.
            self.client.ping()
            logger.info("✅ Connected to Redis cache.")
        except Exception as e:
            logger.warning(
                "Redis unavailable: %s. All cache operations will be no-ops.", e
            )
            self.client = None

    # ── CachePort interface ───────────────────────────────────────────────────

    def get(self, key: str) -> Optional[Any]:
        """
        Fetch ``key`` and decode its JSON value.

        Returns:
            The decoded value, or ``None`` when the adapter is disconnected,
            the key is missing/empty, or the lookup or JSON decoding fails.
        """
        if not self.client:
            return None
        try:
            raw = self.client.get(key)
            return json.loads(raw) if raw else None
        except Exception as e:
            logger.debug("Redis get error for key '%s': %s", key, e)
            return None

    def set(self, key: str, value: Any, expiration: int = 3600) -> bool:
        """
        Store ``value`` as JSON under ``key`` with a TTL of ``expiration`` seconds.

        Objects that ``json`` cannot serialize natively are converted with
        ``str`` (``default=str``).

        Returns:
            ``True`` on success; ``False`` when disconnected or on any error.
        """
        if not self.client:
            return False
        try:
            self.client.setex(key, expiration, json.dumps(value, default=str))
            return True
        except Exception as e:
            logger.debug("Redis set error for key '%s': %s", key, e)
            return False

    def delete(self, key: str) -> bool:
        """
        Delete ``key``.

        Returns:
            ``True`` when the DELETE command ran without error (whether or not
            the key existed); ``False`` when disconnected or on any error.
        """
        if not self.client:
            return False
        try:
            self.client.delete(key)
            return True
        except Exception as e:
            logger.debug("Redis delete error for key '%s': %s", key, e)
            return False

    def search_similar(self, query_vector: list, threshold: float = 0.95) -> Optional[Dict[str, Any]]:
        """Vector similarity search — not implemented (requires RedisSearch module)."""
        return None

    # ── Key generation ────────────────────────────────────────────────────────

    def generate_exact_hash(self, text: str) -> str:
        """Return the SHA-256 hex digest of ``text`` after lowercasing and stripping."""
        normalized = text.lower().strip()
        return hashlib.sha256(normalized.encode("utf-8")).hexdigest()

    def _make_key(self, prefix: str, query: str) -> str:
        """Build a namespaced cache key: ``{prefix}:{hash of query}``."""
        return f"{prefix}:{self.generate_exact_hash(query)}"

    @staticmethod
    def _entry_age(entry: Dict[str, Any]) -> int:
        """
        Return the seconds elapsed since ``entry["_cached_at"]``.

        Returns 0 when the timestamp is missing, zero, or not a number, so a
        malformed entry can never make a cache read raise.
        """
        stamped = entry.get("_cached_at", 0)
        if isinstance(stamped, bool) or not isinstance(stamped, (int, float)):
            return 0
        if not stamped:
            return 0
        return int(time.time()) - int(stamped)

    # ── Layer 1: Intent cache ─────────────────────────────────────────────────

    def get_intent(self, query: str) -> Optional[Dict[str, Any]]:
        """
        Retrieve the cached intent result for ``query``.

        Returns:
            Whatever was stored by ``set_intent`` (per the original contract a
            dict with keys ``intent``, ``confidence``, ``method``), or ``None``
            on a miss.
        """
        cached = self.get(self._make_key("intent_v2", query))
        if cached:
            logger.debug("[Cache] Intent HIT for '%s'", query[:50])
        return cached

    def set_intent(self, query: str, intent_data: Dict[str, Any]) -> bool:
        """Cache ``intent_data`` for ``query`` with a TTL of ``TTL_INTENT`` seconds."""
        stored = self.set(
            self._make_key("intent_v2", query), intent_data, expiration=TTL_INTENT
        )
        if stored:
            logger.debug(
                "[Cache] Intent SET for '%s' (TTL=%ss)", query[:50], TTL_INTENT
            )
        return stored

    # ── Layer 2: Live search cache ────────────────────────────────────────────

    def get_live_search(self, query: str) -> Optional[List[Dict[str, Any]]]:
        """
        Retrieve cached live search results for ``query``.

        The stored entry is the wrapper dict written by ``set_live_search``;
        only its ``"results"`` list is returned (``[]`` if that key is absent).

        Returns:
            The cached result list, or ``None`` on a miss. An entry that is
            not a dict is treated as a miss instead of raising.
        """
        entry = self.get(self._make_key("live_search", query))
        if not entry:
            return None
        if not isinstance(entry, dict):
            # Unexpected shape under this key: ignore it and let the TTL expire it.
            logger.debug(
                "[Cache] Live search entry for '%s' has unexpected type %s; ignoring",
                query[:50],
                type(entry).__name__,
            )
            return None
        logger.info(
            "[Cache] Live search HIT for '%s' (age=%ss)",
            query[:50],
            self._entry_age(entry),
        )
        return entry.get("results", [])

    def set_live_search(self, query: str, results: List[Dict[str, Any]]) -> bool:
        """
        Cache live search results with a TTL of ``TTL_LIVE_SEARCH`` seconds.

        The results are wrapped together with the write timestamp
        (``_cached_at``) and the first 100 characters of the query (``_query``).
        """
        entry = {
            "results": results,
            "_cached_at": int(time.time()),
            "_query": query[:100],
        }
        stored = self.set(
            self._make_key("live_search", query), entry, expiration=TTL_LIVE_SEARCH
        )
        if stored:
            logger.info(
                "[Cache] Live search SET for '%s' (%s results, TTL=%ss)",
                query[:50],
                len(results),
                TTL_LIVE_SEARCH,
            )
        return stored

    # ── Layer 3: Translation cache ────────────────────────────────────────────

    def get_translation(self, query: str) -> Optional[Dict[str, Any]]:
        """
        Retrieve the cached translation + query expansion result for ``query``.

        Returns:
            Whatever was stored by ``set_translation`` (per the original
            contract a dict with keys such as ``expanded_query``,
            ``translations``, ``days_back``), or ``None`` on a miss.
        """
        cached = self.get(self._make_key("translation", query))
        if cached:
            logger.debug("[Cache] Translation HIT for '%s'", query[:50])
        return cached

    def set_translation(self, query: str, translation_data: Dict[str, Any]) -> bool:
        """Cache ``translation_data`` with a TTL of ``TTL_TRANSLATION`` seconds."""
        stored = self.set(
            self._make_key("translation", query),
            translation_data,
            expiration=TTL_TRANSLATION,
        )
        if stored:
            logger.debug(
                "[Cache] Translation SET for '%s' (TTL=%ss)",
                query[:50],
                TTL_TRANSLATION,
            )
        return stored

    # ── Layer 4: Full response cache ──────────────────────────────────────────

    def get_response(self, query: str) -> Optional[Dict[str, Any]]:
        """
        Retrieve the cached full RAG response for ``query``.

        The returned dict is the stored payload, i.e. it still contains the
        ``_cached_at`` and ``_intent`` metadata added by ``set_response``.

        Returns:
            The cached response dict, or ``None`` on a miss. An entry that is
            not a dict is treated as a miss instead of raising.
        """
        entry = self.get(self._make_key("rag_response", query))
        if entry is not None and not isinstance(entry, dict):
            # Unexpected shape under this key: ignore it and let the TTL expire it.
            logger.debug(
                "[Cache] Response entry for '%s' has unexpected type %s; ignoring",
                query[:50],
                type(entry).__name__,
            )
            return None
        if entry:
            logger.info(
                "[Cache] Response HIT for '%s' (age=%ss)",
                query[:50],
                self._entry_age(entry),
            )
        return entry

    def set_response(
        self,
        query: str,
        response: Dict[str, Any],
        intent: str = "NEWS_GENERAL"
    ) -> bool:
        """
        Cache the full RAG response for ``query``.

        TTL depends on ``intent``:
          - ``"NEWS_HISTORICAL"`` → ``TTL_RESPONSE_HISTORICAL``
          - anything else (e.g. ``NEWS_TEMPORAL``, ``NEWS_GENERAL``) → ``TTL_RESPONSE``

        The stored payload is ``response`` plus ``_cached_at`` (write time) and
        ``_intent``; these two keys overwrite same-named keys in ``response``.
        """
        ttl = TTL_RESPONSE_HISTORICAL if intent == "NEWS_HISTORICAL" else TTL_RESPONSE
        entry = {
            **response,
            "_cached_at": int(time.time()),
            "_intent": intent,
        }
        stored = self.set(self._make_key("rag_response", query), entry, expiration=ttl)
        if stored:
            logger.info(
                "[Cache] Response SET for '%s' (intent=%s, TTL=%ss)",
                query[:50],
                intent,
                ttl,
            )
        return stored

    # ── Cache stats ───────────────────────────────────────────────────────────

    def get_stats(self) -> Dict[str, Any]:
        """
        Return cache statistics gathered from the Redis ``INFO`` command.

        Returns:
            ``{"status": "disconnected"}`` when there is no client,
            ``{"status": "error", "error": <message>}`` when a query fails,
            otherwise a dict with ``status``, ``hits``, ``misses``,
            ``hit_rate`` (percentage rounded to one decimal), ``total_keys``
            (summed over all keyspace sections) and ``memory_used``.
        """
        if not self.client:
            return {"status": "disconnected"}
        try:
            stats = self.client.info("stats")
            keyspace = self.client.info("keyspace")
            memory = self.client.info("memory")

            hits = stats.get("keyspace_hits", 0)
            misses = stats.get("keyspace_misses", 0)
            # max(1, ...) avoids division by zero on a server with no lookups yet.
            hit_rate = round(hits / max(1, hits + misses) * 100, 1)
            total_keys = sum(
                section.get("keys", 0)
                for section in keyspace.values()
                if isinstance(section, dict)
            )
            return {
                "status": "connected",
                "hits": hits,
                "misses": misses,
                "hit_rate": hit_rate,
                "total_keys": total_keys,
                "memory_used": memory.get("used_memory_human", "?"),
            }
        except Exception as e:
            return {"status": "error", "error": str(e)}

    def is_available(self) -> bool:
        """Return ``True`` if a client exists and the server answers a PING."""
        if not self.client:
            return False
        try:
            self.client.ping()
            return True
        except Exception:
            return False