""" Redis Cache Manager using Upstash Handles session caching and user data caching """ import os import json from typing import Optional, Dict, Any from dotenv import load_dotenv from upstash_redis import Redis # Load environment variables load_dotenv() class CacheManager: def __init__(self): """Initialize Upstash Redis client""" redis_url = os.getenv("UPSTASH_REDIS_REST_URL") redis_token = os.getenv("UPSTASH_REDIS_REST_TOKEN") if redis_url and redis_token: self.redis = Redis(url=redis_url, token=redis_token) self.enabled = True print("✅ Redis cache enabled (Upstash)") else: self.redis = None self.enabled = False print("⚠️ Redis cache disabled (no credentials)") def get(self, key: str) -> Optional[Any]: """Get value from cache""" if not self.enabled: return None try: data = self.redis.get(key) if data: return json.loads(data) if isinstance(data, str) else data return None except Exception as e: print(f"Cache get error: {e}") return None def set(self, key: str, value: Any, ttl: int = 3600): """Set value in cache with TTL (default 1 hour)""" if not self.enabled: return False try: serialized = json.dumps(value) if not isinstance(value, str) else value self.redis.setex(key, ttl, serialized) return True except Exception as e: print(f"Cache set error: {e}") return False def delete(self, key: str): """Delete key from cache""" if not self.enabled: return False try: self.redis.delete(key) return True except Exception as e: print(f"Cache delete error: {e}") return False def get_or_fetch(self, key: str, fetch_fn, ttl: int = 3600): """Get from cache or fetch and cache""" # Try cache first cached = self.get(key) if cached is not None: print(f"✅ Cache HIT: {key}") return cached # Cache miss - fetch print(f"❌ Cache MISS: {key}") data = fetch_fn() # Cache for next time if data is not None: self.set(key, data, ttl) return data # Global cache instance cache = CacheManager()