Spaces:
Running
Running
| """ | |
| Redis Cache Adapter β Smart Layered Caching | |
| Cache layers with different TTLs: | |
| Layer 1 β Intent cache : 1 hour (same query = same intent) | |
| Layer 2 β Live search cache : 10 min (DuckDuckGo/NewsAPI results) | |
| Layer 3 β Translation cache : 1 hour (LLM translation is expensive) | |
| Layer 4 β Full response cache: 5 min (complete RAG answer) | |
| Key naming convention: | |
| intent_v2:{query_hash} β IntentResult dict | |
| live_search:{query_hash} β list of live results | |
| translation:{query_hash} β translation + expanded query dict | |
| rag_response:{query_hash} β full RAG response dict | |
| All keys use SHA-256 of the normalized query (lowercase, stripped). | |
| """ | |
| import json | |
| import logging | |
| import hashlib | |
| import time | |
| from typing import Optional, Dict, Any, List | |
| import redis | |
| from src.core.ports.cache_port import CachePort | |
| from src.core.config import settings | |
| logger = logging.getLogger(__name__) | |
# ── TTL constants (seconds) ───────────────────────────────────────────────────
TTL_INTENT = 3600               # 1 hour  — intent rarely changes for same query
TTL_LIVE_SEARCH = 600           # 10 min  — live news stays fresh enough
TTL_TRANSLATION = 3600          # 1 hour  — translations don't change
TTL_RESPONSE = 300              # 5 min   — full RAG response (temporal queries need freshness)
TTL_RESPONSE_HISTORICAL = 1800  # 30 min  — historical answers change less often
class RedisAdapter(CachePort):
    """
    Redis cache adapter with smart layered caching.

    Falls back gracefully when Redis is unavailable — every public method
    returns None/False instead of raising, so callers never need a
    try/except around cache access.
    """

    def __init__(self):
        # self.client stays None when the connection fails; every method
        # below checks it and degrades to a no-op.
        self.client = None
        self._connect()

    def _connect(self) -> None:
        """Open the Redis connection from settings, preferring REDIS_URL.

        Upstash URLs are upgraded from redis:// to rediss:// because
        Upstash requires TLS. On any failure the adapter is left
        disconnected (self.client = None) and a warning is logged.
        """
        try:
            if hasattr(settings, "REDIS_URL") and settings.REDIS_URL:
                url = settings.REDIS_URL
                # Upstash requires TLS
                if url.startswith("redis://") and "upstash.io" in url:
                    url = "rediss://" + url[len("redis://"):]
                self.client = redis.from_url(url, decode_responses=True)
            else:
                pool = redis.ConnectionPool(
                    host=settings.REDIS_HOST,
                    port=settings.REDIS_PORT,
                    db=settings.REDIS_DB,
                    password=settings.REDIS_PASSWORD or None,
                    decode_responses=True,
                )
                self.client = redis.Redis(connection_pool=pool)
            # Fail fast: verify the connection actually works before use.
            self.client.ping()
            logger.info("Connected to Redis cache.")  # fixed: mojibake prefix removed
        except Exception as e:
            # Broad catch is deliberate: any connection problem must degrade
            # to no-op caching rather than break application startup.
            logger.warning("Redis unavailable: %s. All cache operations will be no-ops.", e)
            self.client = None

    # ── CachePort interface ───────────────────────────────────────────────────

    def get(self, key: str) -> Optional[Any]:
        """Return the JSON-decoded value stored at *key*, or None on miss/error."""
        if not self.client:
            return None
        try:
            data = self.client.get(key)
            return json.loads(data) if data else None
        except Exception as e:
            logger.debug("Redis get error for key '%s': %s", key, e)
            return None

    def set(self, key: str, value: Any, expiration: int = 3600) -> bool:
        """JSON-encode *value* and store it at *key* with a TTL in seconds.

        Non-JSON-serializable objects are stringified via default=str
        (deliberate: cache payloads may carry datetimes etc.).
        Returns True on success, False when disconnected or on error.
        """
        if not self.client:
            return False
        try:
            self.client.setex(key, expiration, json.dumps(value, default=str))
            return True
        except Exception as e:
            logger.debug("Redis set error for key '%s': %s", key, e)
            return False

    def delete(self, key: str) -> bool:
        """Delete *key*. Returns True unless disconnected or an error occurs."""
        if not self.client:
            return False
        try:
            self.client.delete(key)
            return True
        except Exception as e:
            logger.debug("Redis delete error for key '%s': %s", key, e)
            return False

    def search_similar(self, query_vector: list, threshold: float = 0.95) -> Optional[Dict[str, Any]]:
        """Vector similarity search — not implemented (requires RedisSearch module)."""
        return None

    # ── Key generation ────────────────────────────────────────────────────────

    def generate_exact_hash(self, text: str) -> str:
        """SHA-256 hash of normalized text (lowercased, stripped) for exact-match keys."""
        normalized = text.lower().strip()
        return hashlib.sha256(normalized.encode("utf-8")).hexdigest()

    def _make_key(self, prefix: str, query: str) -> str:
        """Build a namespaced cache key: '{prefix}:{sha256(normalized query)}'."""
        return f"{prefix}:{self.generate_exact_hash(query)}"

    # ── Layer 1: Intent cache ─────────────────────────────────────────────────

    def get_intent(self, query: str) -> Optional[Dict[str, Any]]:
        """
        Retrieve cached intent result for a query.

        Returns dict with keys: intent, confidence, method — or None on miss.
        """
        key = self._make_key("intent_v2", query)
        result = self.get(key)
        if result:
            logger.debug("[Cache] Intent HIT for '%s'", query[:50])
            return result
        return None  # explicit miss (was an implicit fall-through)

    def set_intent(self, query: str, intent_data: Dict[str, Any]) -> bool:
        """Cache intent result for 1 hour."""
        key = self._make_key("intent_v2", query)
        success = self.set(key, intent_data, expiration=TTL_INTENT)
        if success:
            logger.debug("[Cache] Intent SET for '%s' (TTL=%ds)", query[:50], TTL_INTENT)
        return success

    # ── Layer 2: Live search cache ────────────────────────────────────────────

    def get_live_search(self, query: str) -> Optional[List[Dict[str, Any]]]:
        """
        Retrieve cached live search results (DuckDuckGo + NewsAPI).

        Returns list of result dicts or None if not cached.
        """
        key = self._make_key("live_search", query)
        result = self.get(key)
        # Guard against non-dict payloads (e.g. a legacy entry that cached the
        # raw list) — treat them as a miss instead of raising AttributeError.
        if isinstance(result, dict) and result:
            cached_at = result.get("_cached_at", 0)
            elapsed = int(time.time()) - cached_at if cached_at else 0
            logger.info("[Cache] Live search HIT for '%s' (age=%ds)", query[:50], elapsed)
            return result.get("results", [])
        return None

    def set_live_search(self, query: str, results: List[Dict[str, Any]]) -> bool:
        """Cache live search results for 10 minutes, with freshness metadata."""
        key = self._make_key("live_search", query)
        payload = {
            "results": results,
            "_cached_at": int(time.time()),  # lets get_live_search report age
            "_query": query[:100],           # for debugging cache entries
        }
        success = self.set(key, payload, expiration=TTL_LIVE_SEARCH)
        if success:
            logger.info(
                "[Cache] Live search SET for '%s' (%d results, TTL=%ds)",
                query[:50], len(results), TTL_LIVE_SEARCH,
            )
        return success

    # ── Layer 3: Translation cache ────────────────────────────────────────────

    def get_translation(self, query: str) -> Optional[Dict[str, Any]]:
        """
        Retrieve cached translation + query expansion result.

        Returns dict with keys: expanded_query, translations, days_back, etc.
        — or None on miss.
        """
        key = self._make_key("translation", query)
        result = self.get(key)
        if result:
            logger.debug("[Cache] Translation HIT for '%s'", query[:50])
            return result
        return None  # explicit miss (was an implicit fall-through)

    def set_translation(self, query: str, translation_data: Dict[str, Any]) -> bool:
        """Cache translation result for 1 hour."""
        key = self._make_key("translation", query)
        success = self.set(key, translation_data, expiration=TTL_TRANSLATION)
        if success:
            logger.debug("[Cache] Translation SET for '%s' (TTL=%ds)", query[:50], TTL_TRANSLATION)
        return success

    # ── Layer 4: Full response cache ──────────────────────────────────────────

    def get_response(self, query: str) -> Optional[Dict[str, Any]]:
        """
        Retrieve cached full RAG response.

        Returns complete response dict or None if not cached.
        """
        key = self._make_key("rag_response", query)
        result = self.get(key)
        # Same non-dict guard as get_live_search: a malformed entry is a miss.
        if isinstance(result, dict) and result:
            cached_at = result.get("_cached_at", 0)
            elapsed = int(time.time()) - cached_at if cached_at else 0
            logger.info("[Cache] Response HIT for '%s' (age=%ds)", query[:50], elapsed)
            return result
        return None  # explicit miss (was an implicit fall-through)

    def set_response(
        self,
        query: str,
        response: Dict[str, Any],
        intent: str = "NEWS_GENERAL"
    ) -> bool:
        """
        Cache full RAG response.

        TTL depends on intent:
            - NEWS_TEMPORAL   → 5 min  (fresh news changes fast)
            - NEWS_HISTORICAL → 30 min (historical facts are stable)
            - NEWS_GENERAL    → 5 min  (default)
        """
        key = self._make_key("rag_response", query)
        ttl = TTL_RESPONSE_HISTORICAL if intent == "NEWS_HISTORICAL" else TTL_RESPONSE
        payload = {
            **response,
            "_cached_at": int(time.time()),  # freshness metadata for get_response
            "_intent": intent,
        }
        success = self.set(key, payload, expiration=ttl)
        if success:
            logger.info(
                "[Cache] Response SET for '%s' (intent=%s, TTL=%ds)",
                query[:50], intent, ttl,
            )
        return success

    # ── Cache stats ───────────────────────────────────────────────────────────

    def get_stats(self) -> Dict[str, Any]:
        """Return cache statistics (hits, misses, hit rate %, key count, memory)."""
        if not self.client:
            return {"status": "disconnected"}
        try:
            info = self.client.info("stats")
            keyspace = self.client.info("keyspace")
            hits = info.get("keyspace_hits", 0)
            misses = info.get("keyspace_misses", 0)
            return {
                "status": "connected",
                "hits": hits,
                "misses": misses,
                # max(1, ...) avoids ZeroDivisionError on a fresh instance
                "hit_rate": round(hits / max(1, hits + misses) * 100, 1),
                "total_keys": sum(
                    v.get("keys", 0) for v in keyspace.values()
                    if isinstance(v, dict)
                ),
                "memory_used": self.client.info("memory").get("used_memory_human", "?"),
            }
        except Exception as e:
            return {"status": "error", "error": str(e)}

    def is_available(self) -> bool:
        """Check if Redis is connected and answering PING."""
        if not self.client:
            return False
        try:
            self.client.ping()
            return True
        except Exception:
            return False