# rag-api-node-1 / src/infrastructure/adapters/redis_adapter.py
# Author: Peterase — feat: hybrid RAG pipeline upgrade (commit daf250b)
"""
Redis Cache Adapter β€” Smart Layered Caching
Cache layers with different TTLs:
Layer 1 β€” Intent cache : 1 hour (same query = same intent)
Layer 2 β€” Live search cache : 10 min (DuckDuckGo/NewsAPI results)
Layer 3 β€” Translation cache : 1 hour (LLM translation is expensive)
Layer 4 β€” Full response cache: 5 min (complete RAG answer)
Key naming convention:
intent_v2:{query_hash} β†’ IntentResult dict
live_search:{query_hash} β†’ list of live results
translation:{query_hash} β†’ translation + expanded query dict
rag_response:{query_hash} β†’ full RAG response dict
All keys use SHA-256 of the normalized query (lowercase, stripped).
"""
import json
import logging
import hashlib
import time
from typing import Optional, Dict, Any, List
import redis
from src.core.ports.cache_port import CachePort
from src.core.config import settings
logger = logging.getLogger(__name__)
# ── TTL constants (seconds) ───────────────────────────────────────────────────
TTL_INTENT = 3600 # 1 hour β€” intent rarely changes for same query
TTL_LIVE_SEARCH = 600 # 10 min β€” live news stays fresh enough
TTL_TRANSLATION = 3600 # 1 hour β€” translations don't change
TTL_RESPONSE = 300 # 5 min β€” full RAG response (temporal queries need freshness)
TTL_RESPONSE_HISTORICAL = 1800 # 30 min β€” historical answers change less often
class RedisAdapter(CachePort):
    """
    Redis cache adapter with smart layered caching.

    Falls back gracefully when Redis is unavailable — all methods
    return None/False instead of raising exceptions. Cached payloads are
    JSON documents; layered helpers (intent / live search / translation /
    response) namespace their keys via SHA-256 of the normalized query.
    """

    def __init__(self) -> None:
        # redis.Redis instance when connected, None when Redis is unreachable.
        self.client: Optional["redis.Redis"] = None
        self._connect()

    def _connect(self) -> None:
        """
        Establish the Redis connection.

        Prefers ``settings.REDIS_URL`` when present (upgrading Upstash URLs
        to TLS), otherwise builds a connection pool from discrete host
        settings. On any failure ``self.client`` stays None so every cache
        operation degrades to a no-op.
        """
        try:
            if hasattr(settings, "REDIS_URL") and settings.REDIS_URL:
                url = settings.REDIS_URL
                # Upstash requires TLS — rewrite the scheme to rediss://.
                if url.startswith("redis://") and "upstash.io" in url:
                    url = "rediss://" + url[len("redis://"):]
                self.client = redis.from_url(url, decode_responses=True)
            else:
                pool = redis.ConnectionPool(
                    host=settings.REDIS_HOST,
                    port=settings.REDIS_PORT,
                    db=settings.REDIS_DB,
                    password=settings.REDIS_PASSWORD or None,
                    decode_responses=True,
                )
                self.client = redis.Redis(connection_pool=pool)
            # Fail fast: PING verifies the server is actually reachable.
            self.client.ping()
            logger.info("✅ Connected to Redis cache.")
        except Exception as e:
            logger.warning(f"Redis unavailable: {e}. All cache operations will be no-ops.")
            self.client = None

    # ── CachePort interface ───────────────────────────────────────────────────
    def get(self, key: str) -> Optional[Any]:
        """Fetch and JSON-decode a value, or None on miss/disconnect/error."""
        if not self.client:
            return None
        try:
            data = self.client.get(key)
            return json.loads(data) if data else None
        except Exception as e:
            logger.debug(f"Redis get error for key '{key}': {e}")
            return None

    def set(self, key: str, value: Any, expiration: int = 3600) -> bool:
        """
        JSON-encode and store a value with a TTL (seconds).

        ``default=str`` stringifies non-JSON types (datetimes etc.) on
        purpose — cache payloads are best-effort, not canonical storage.
        """
        if not self.client:
            return False
        try:
            self.client.setex(key, expiration, json.dumps(value, default=str))
            return True
        except Exception as e:
            logger.debug(f"Redis set error for key '{key}': {e}")
            return False

    def delete(self, key: str) -> bool:
        """Delete a key. Returns True unless Redis is down or errored."""
        if not self.client:
            return False
        try:
            self.client.delete(key)
            return True
        except Exception as e:
            logger.debug(f"Redis delete error for key '{key}': {e}")
            return False

    def search_similar(self, query_vector: list, threshold: float = 0.95) -> Optional[Dict[str, Any]]:
        """Vector similarity search — not implemented (requires RedisSearch module)."""
        return None

    # ── Key generation ────────────────────────────────────────────────────────
    def generate_exact_hash(self, text: str) -> str:
        """SHA-256 hash of normalized text for exact-match cache keys."""
        normalized = text.lower().strip()
        return hashlib.sha256(normalized.encode("utf-8")).hexdigest()

    def _make_key(self, prefix: str, query: str) -> str:
        """Build a namespaced cache key from query text."""
        return f"{prefix}:{self.generate_exact_hash(query)}"

    @staticmethod
    def _age_seconds(payload: Dict[str, Any]) -> int:
        """Seconds since the payload's ``_cached_at`` timestamp (0 if absent)."""
        cached_at = payload.get("_cached_at", 0)
        return int(time.time()) - cached_at if cached_at else 0

    # ── Layer 1: Intent cache ─────────────────────────────────────────────────
    def get_intent(self, query: str) -> Optional[Dict[str, Any]]:
        """
        Retrieve cached intent result for a query.

        Returns dict with keys: intent, confidence, method — or None on miss.
        """
        key = self._make_key("intent_v2", query)
        result = self.get(key)
        if result:
            logger.debug(f"[Cache] Intent HIT for '{query[:50]}'")
            return result
        return None

    def set_intent(self, query: str, intent_data: Dict[str, Any]) -> bool:
        """Cache intent result for 1 hour."""
        key = self._make_key("intent_v2", query)
        success = self.set(key, intent_data, expiration=TTL_INTENT)
        if success:
            logger.debug(f"[Cache] Intent SET for '{query[:50]}' (TTL={TTL_INTENT}s)")
        return success

    # ── Layer 2: Live search cache ────────────────────────────────────────────
    def get_live_search(self, query: str) -> Optional[List[Dict[str, Any]]]:
        """
        Retrieve cached live search results (DuckDuckGo + NewsAPI).

        Returns list of result dicts or None if not cached. Entries that are
        not the expected ``{"results": [...], "_cached_at": ...}`` envelope
        (legacy/corrupt data) are treated as misses instead of raising.
        """
        key = self._make_key("live_search", query)
        result = self.get(key)
        # Guard: a non-dict cached value would make .get() raise AttributeError
        # outside any try block — treat it as a cache miss instead.
        if isinstance(result, dict) and result:
            elapsed = self._age_seconds(result)
            logger.info(f"[Cache] Live search HIT for '{query[:50]}' (age={elapsed}s)")
            return result.get("results", [])
        return None

    def set_live_search(self, query: str, results: List[Dict[str, Any]]) -> bool:
        """Cache live search results for 10 minutes."""
        key = self._make_key("live_search", query)
        payload = {
            "results": results,
            "_cached_at": int(time.time()),
            "_query": query[:100],  # truncated copy kept for debugging only
        }
        success = self.set(key, payload, expiration=TTL_LIVE_SEARCH)
        if success:
            logger.info(f"[Cache] Live search SET for '{query[:50]}' ({len(results)} results, TTL={TTL_LIVE_SEARCH}s)")
        return success

    # ── Layer 3: Translation cache ────────────────────────────────────────────
    def get_translation(self, query: str) -> Optional[Dict[str, Any]]:
        """
        Retrieve cached translation + query expansion result.

        Returns dict with keys: expanded_query, translations, days_back, etc.
        — or None on miss.
        """
        key = self._make_key("translation", query)
        result = self.get(key)
        if result:
            logger.debug(f"[Cache] Translation HIT for '{query[:50]}'")
            return result
        return None

    def set_translation(self, query: str, translation_data: Dict[str, Any]) -> bool:
        """Cache translation result for 1 hour."""
        key = self._make_key("translation", query)
        success = self.set(key, translation_data, expiration=TTL_TRANSLATION)
        if success:
            logger.debug(f"[Cache] Translation SET for '{query[:50]}' (TTL={TTL_TRANSLATION}s)")
        return success

    # ── Layer 4: Full response cache ──────────────────────────────────────────
    def get_response(self, query: str) -> Optional[Dict[str, Any]]:
        """
        Retrieve cached full RAG response.

        Returns complete response dict (including ``_cached_at`` / ``_intent``
        metadata) or None if not cached. Non-dict entries are treated as
        misses so a corrupt value cannot raise.
        """
        key = self._make_key("rag_response", query)
        result = self.get(key)
        if isinstance(result, dict) and result:
            elapsed = self._age_seconds(result)
            logger.info(f"[Cache] Response HIT for '{query[:50]}' (age={elapsed}s)")
            return result
        return None

    def set_response(
        self,
        query: str,
        response: Dict[str, Any],
        intent: str = "NEWS_GENERAL"
    ) -> bool:
        """
        Cache full RAG response.

        TTL depends on intent:
            - NEWS_TEMPORAL   → 5 min  (fresh news changes fast)
            - NEWS_HISTORICAL → 30 min (historical facts are stable)
            - NEWS_GENERAL    → 5 min  (default)
        """
        key = self._make_key("rag_response", query)
        ttl = TTL_RESPONSE_HISTORICAL if intent == "NEWS_HISTORICAL" else TTL_RESPONSE
        payload = {
            **response,
            "_cached_at": int(time.time()),
            "_intent": intent,
        }
        success = self.set(key, payload, expiration=ttl)
        if success:
            logger.info(f"[Cache] Response SET for '{query[:50]}' (intent={intent}, TTL={ttl}s)")
        return success

    # ── Cache stats ───────────────────────────────────────────────────────────
    def get_stats(self) -> Dict[str, Any]:
        """Return cache statistics (hit rate, key count, memory) or an error dict."""
        if not self.client:
            return {"status": "disconnected"}
        try:
            info = self.client.info("stats")
            keyspace = self.client.info("keyspace")
            # Read hit/miss counters once rather than three dict lookups each.
            hits = info.get("keyspace_hits", 0)
            misses = info.get("keyspace_misses", 0)
            return {
                "status": "connected",
                "hits": hits,
                "misses": misses,
                "hit_rate": round(hits / max(1, hits + misses) * 100, 1),
                "total_keys": sum(
                    v.get("keys", 0) for v in keyspace.values()
                    if isinstance(v, dict)
                ),
                "memory_used": self.client.info("memory").get("used_memory_human", "?"),
            }
        except Exception as e:
            return {"status": "error", "error": str(e)}

    def is_available(self) -> bool:
        """Check if Redis is connected (live PING, not just a cached handle)."""
        if not self.client:
            return False
        try:
            self.client.ping()
            return True
        except Exception:
            return False