""" Redis Cache Adapter — Smart Layered Caching Cache layers with different TTLs: Layer 1 — Intent cache : 1 hour (same query = same intent) Layer 2 — Live search cache : 10 min (DuckDuckGo/NewsAPI results) Layer 3 — Translation cache : 1 hour (LLM translation is expensive) Layer 4 — Full response cache: 5 min (complete RAG answer) Key naming convention: intent_v2:{query_hash} → IntentResult dict live_search:{query_hash} → list of live results translation:{query_hash} → translation + expanded query dict rag_response:{query_hash} → full RAG response dict All keys use SHA-256 of the normalized query (lowercase, stripped). """ import json import logging import hashlib import time from typing import Optional, Dict, Any, List import redis from src.core.ports.cache_port import CachePort from src.core.config import settings logger = logging.getLogger(__name__) # ── TTL constants (seconds) ─────────────────────────────────────────────────── TTL_INTENT = 3600 # 1 hour — intent rarely changes for same query TTL_LIVE_SEARCH = 600 # 10 min — live news stays fresh enough TTL_TRANSLATION = 3600 # 1 hour — translations don't change TTL_RESPONSE = 300 # 5 min — full RAG response (temporal queries need freshness) TTL_RESPONSE_HISTORICAL = 1800 # 30 min — historical answers change less often class RedisAdapter(CachePort): """ Redis cache adapter with smart layered caching. Falls back gracefully when Redis is unavailable — all methods return None/False instead of raising exceptions. """ def __init__(self): self.client = None self._connect() def _connect(self): try: if hasattr(settings, "REDIS_URL") and settings.REDIS_URL: url = settings.REDIS_URL # Upstash requires TLS if url.startswith("redis://") and "upstash.io" in url: url = "rediss://" + url[len("redis://"):] self.client = redis.from_url(url, decode_responses=True) else: pool = redis.ConnectionPool( host=settings.REDIS_HOST, port=settings.REDIS_PORT, db=settings.REDIS_DB, password=settings.REDIS_PASSWORD or None, decode_responses=True, ) self.client = redis.Redis(connection_pool=pool) self.client.ping() logger.info("✅ Connected to Redis cache.") except Exception as e: logger.warning(f"Redis unavailable: {e}. All cache operations will be no-ops.") self.client = None # ── CachePort interface ─────────────────────────────────────────────────── def get(self, key: str) -> Optional[Any]: if not self.client: return None try: data = self.client.get(key) return json.loads(data) if data else None except Exception as e: logger.debug(f"Redis get error for key '{key}': {e}") return None def set(self, key: str, value: Any, expiration: int = 3600) -> bool: if not self.client: return False try: self.client.setex(key, expiration, json.dumps(value, default=str)) return True except Exception as e: logger.debug(f"Redis set error for key '{key}': {e}") return False def delete(self, key: str) -> bool: if not self.client: return False try: self.client.delete(key) return True except Exception as e: logger.debug(f"Redis delete error for key '{key}': {e}") return False def search_similar(self, query_vector: list, threshold: float = 0.95) -> Optional[Dict[str, Any]]: """Vector similarity search — not implemented (requires RedisSearch module).""" return None # ── Key generation ──────────────────────────────────────────────────────── def generate_exact_hash(self, text: str) -> str: """SHA-256 hash of normalized text for exact-match cache keys.""" normalized = text.lower().strip() return hashlib.sha256(normalized.encode("utf-8")).hexdigest() def _make_key(self, prefix: str, query: str) -> str: """Build a namespaced cache key from query text.""" return f"{prefix}:{self.generate_exact_hash(query)}" # ── Layer 1: Intent cache ───────────────────────────────────────────────── def get_intent(self, query: str) -> Optional[Dict[str, Any]]: """ Retrieve cached intent result for a query. Returns dict with keys: intent, confidence, method """ key = self._make_key("intent_v2", query) result = self.get(key) if result: logger.debug(f"[Cache] Intent HIT for '{query[:50]}'") return result def set_intent(self, query: str, intent_data: Dict[str, Any]) -> bool: """Cache intent result for 1 hour.""" key = self._make_key("intent_v2", query) success = self.set(key, intent_data, expiration=TTL_INTENT) if success: logger.debug(f"[Cache] Intent SET for '{query[:50]}' (TTL={TTL_INTENT}s)") return success # ── Layer 2: Live search cache ──────────────────────────────────────────── def get_live_search(self, query: str) -> Optional[List[Dict[str, Any]]]: """ Retrieve cached live search results (DuckDuckGo + NewsAPI). Returns list of result dicts or None if not cached. """ key = self._make_key("live_search", query) result = self.get(key) if result: age = result.get("_cached_at", 0) elapsed = int(time.time()) - age if age else 0 logger.info(f"[Cache] Live search HIT for '{query[:50]}' (age={elapsed}s)") return result.get("results", []) return None def set_live_search(self, query: str, results: List[Dict[str, Any]]) -> bool: """Cache live search results for 10 minutes.""" key = self._make_key("live_search", query) payload = { "results": results, "_cached_at": int(time.time()), "_query": query[:100], } success = self.set(key, payload, expiration=TTL_LIVE_SEARCH) if success: logger.info(f"[Cache] Live search SET for '{query[:50]}' ({len(results)} results, TTL={TTL_LIVE_SEARCH}s)") return success # ── Layer 3: Translation cache ──────────────────────────────────────────── def get_translation(self, query: str) -> Optional[Dict[str, Any]]: """ Retrieve cached translation + query expansion result. Returns dict with keys: expanded_query, translations, days_back, etc. """ key = self._make_key("translation", query) result = self.get(key) if result: logger.debug(f"[Cache] Translation HIT for '{query[:50]}'") return result def set_translation(self, query: str, translation_data: Dict[str, Any]) -> bool: """Cache translation result for 1 hour.""" key = self._make_key("translation", query) success = self.set(key, translation_data, expiration=TTL_TRANSLATION) if success: logger.debug(f"[Cache] Translation SET for '{query[:50]}' (TTL={TTL_TRANSLATION}s)") return success # ── Layer 4: Full response cache ────────────────────────────────────────── def get_response(self, query: str) -> Optional[Dict[str, Any]]: """ Retrieve cached full RAG response. Returns complete response dict or None if not cached. """ key = self._make_key("rag_response", query) result = self.get(key) if result: age = result.get("_cached_at", 0) elapsed = int(time.time()) - age if age else 0 logger.info(f"[Cache] Response HIT for '{query[:50]}' (age={elapsed}s)") return result def set_response( self, query: str, response: Dict[str, Any], intent: str = "NEWS_GENERAL" ) -> bool: """ Cache full RAG response. TTL depends on intent: - NEWS_TEMPORAL → 5 min (fresh news changes fast) - NEWS_HISTORICAL → 30 min (historical facts are stable) - NEWS_GENERAL → 5 min (default) """ key = self._make_key("rag_response", query) ttl = TTL_RESPONSE_HISTORICAL if intent == "NEWS_HISTORICAL" else TTL_RESPONSE payload = { **response, "_cached_at": int(time.time()), "_intent": intent, } success = self.set(key, payload, expiration=ttl) if success: logger.info(f"[Cache] Response SET for '{query[:50]}' (intent={intent}, TTL={ttl}s)") return success # ── Cache stats ─────────────────────────────────────────────────────────── def get_stats(self) -> Dict[str, Any]: """Return cache statistics.""" if not self.client: return {"status": "disconnected"} try: info = self.client.info("stats") keyspace = self.client.info("keyspace") return { "status": "connected", "hits": info.get("keyspace_hits", 0), "misses": info.get("keyspace_misses", 0), "hit_rate": round( info.get("keyspace_hits", 0) / max(1, info.get("keyspace_hits", 0) + info.get("keyspace_misses", 0)) * 100, 1 ), "total_keys": sum( v.get("keys", 0) for v in keyspace.values() if isinstance(v, dict) ), "memory_used": self.client.info("memory").get("used_memory_human", "?"), } except Exception as e: return {"status": "error", "error": str(e)} def is_available(self) -> bool: """Check if Redis is connected.""" if not self.client: return False try: self.client.ping() return True except Exception: return False