""" Redis Caching Layer - Performance optimization - Session caching - API response caching """ import redis import json import os from typing import Optional, Any from functools import wraps import hashlib # Redis connection REDIS_URL = os.environ.get('REDIS_URL', 'redis://localhost:6379/0') try: redis_client = redis.from_url(REDIS_URL, decode_responses=True) redis_client.ping() REDIS_AVAILABLE = True except Exception as e: print(f"⚠️ Redis not available: {e}. Using in-memory cache fallback.") REDIS_AVAILABLE = False redis_client = None # In-memory fallback cache _memory_cache = {} class CacheManager: """Unified cache manager with Redis fallback""" def __init__(self): self.use_redis = REDIS_AVAILABLE def get(self, key: str) -> Optional[Any]: """Get value from cache""" try: if self.use_redis: val = redis_client.get(key) return json.loads(val) if val else None else: return _memory_cache.get(key) except Exception as e: print(f"Cache get error: {e}") return None def set(self, key: str, value: Any, ttl: int = 3600): """Set value in cache with TTL""" try: serialized = json.dumps(value) if self.use_redis: redis_client.setex(key, ttl, serialized) else: _memory_cache[key] = value except Exception as e: print(f"Cache set error: {e}") def delete(self, key: str): """Delete value from cache""" try: if self.use_redis: redis_client.delete(key) else: _memory_cache.pop(key, None) except Exception as e: print(f"Cache delete error: {e}") def clear(self): """Clear all cache""" try: if self.use_redis: redis_client.flushdb() else: _memory_cache.clear() except Exception as e: print(f"Cache clear error: {e}") def get_many(self, keys: list) -> dict: """Get multiple values""" result = {} try: if self.use_redis: vals = redis_client.mget(keys) for k, v in zip(keys, vals): result[k] = json.loads(v) if v else None else: for k in keys: result[k] = _memory_cache.get(k) except Exception as e: print(f"Cache get_many error: {e}") return result def set_many(self, data: dict, ttl: int = 3600): """Set multiple values""" try: if self.use_redis: pipe = redis_client.pipeline() for k, v in data.items(): pipe.setex(k, ttl, json.dumps(v)) pipe.execute() else: _memory_cache.update(data) except Exception as e: print(f"Cache set_many error: {e}") def increment(self, key: str, amount: int = 1) -> int: """Increment counter""" try: if self.use_redis: return redis_client.incr(key, amount) else: _memory_cache[key] = _memory_cache.get(key, 0) + amount return _memory_cache[key] except Exception as e: print(f"Cache increment error: {e}") return 0 def decrement(self, key: str, amount: int = 1) -> int: """Decrement counter""" try: if self.use_redis: return redis_client.decr(key, amount) else: _memory_cache[key] = _memory_cache.get(key, 0) - amount return _memory_cache[key] except Exception as e: print(f"Cache decrement error: {e}") return 0 cache = CacheManager() def cache_key(*args, **kwargs) -> str: """Generate cache key from arguments""" key_str = json.dumps({'args': args, 'kwargs': kwargs}, sort_keys=True, default=str) return hashlib.md5(key_str.encode()).hexdigest() def cached(ttl: int = 3600, key_prefix: str = ''): """Decorator for caching function results""" def decorator(func): @wraps(func) def wrapper(*args, **kwargs): # Generate cache key func_key = f"{key_prefix or func.__name__}:{cache_key(*args, **kwargs)}" # Try to get from cache cached_val = cache.get(func_key) if cached_val is not None: return cached_val # Call function and cache result result = func(*args, **kwargs) cache.set(func_key, result, ttl) return result return wrapper return decorator def cache_invalidate(pattern: str = ''): """Invalidate cache by pattern""" try: if cache.use_redis: keys = redis_client.keys(pattern or '*') if keys: redis_client.delete(*keys) else: if pattern: to_delete = [k for k in _memory_cache.keys() if pattern in k] for k in to_delete: del _memory_cache[k] else: _memory_cache.clear() except Exception as e: print(f"Cache invalidate error: {e}") # Session management class SessionManager: """Session management with Redis""" SESSION_PREFIX = 'session:' SESSION_TTL = 86400 # 24 hours @staticmethod def create(user_id: int, data: dict) -> str: """Create session""" import secrets session_id = secrets.token_urlsafe(32) key = f"{SessionManager.SESSION_PREFIX}{session_id}" cache.set(key, {'user_id': user_id, **data}, SessionManager.SESSION_TTL) return session_id @staticmethod def get(session_id: str) -> Optional[dict]: """Get session""" key = f"{SessionManager.SESSION_PREFIX}{session_id}" return cache.get(key) @staticmethod def update(session_id: str, data: dict): """Update session""" key = f"{SessionManager.SESSION_PREFIX}{session_id}" session = cache.get(key) if session: session.update(data) cache.set(key, session, SessionManager.SESSION_TTL) @staticmethod def delete(session_id: str): """Delete session""" key = f"{SessionManager.SESSION_PREFIX}{session_id}" cache.delete(key) # Rate limiting with Redis class RateLimiter: """Rate limiter using Redis""" def __init__(self, max_requests: int = 100, window_seconds: int = 60): self.max_requests = max_requests self.window_seconds = window_seconds def is_allowed(self, identifier: str) -> bool: """Check if request is allowed""" key = f"rate_limit:{identifier}" try: count = cache.increment(key) if count == 1: # Set expiry on first request if cache.use_redis: redis_client.expire(key, self.window_seconds) return count <= self.max_requests except Exception as e: print(f"Rate limit error: {e}") return True def get_remaining(self, identifier: str) -> int: """Get remaining requests""" key = f"rate_limit:{identifier}" try: count = cache.get(key) or 0 return max(0, self.max_requests - count) except Exception as e: print(f"Rate limit get_remaining error: {e}") return self.max_requests def reset(self, identifier: str): """Reset rate limit""" key = f"rate_limit:{identifier}" cache.delete(key) # Query result caching def cache_query_result(query_key: str, ttl: int = 3600): """Cache database query results""" def decorator(func): @wraps(func) def wrapper(*args, **kwargs): cache_k = f"query:{query_key}:{cache_key(*args, **kwargs)}" cached_val = cache.get(cache_k) if cached_val is not None: return cached_val result = func(*args, **kwargs) cache.set(cache_k, result, ttl) return result return wrapper return decorator # Cache statistics def get_cache_stats() -> dict: """Get cache statistics""" try: if cache.use_redis: info = redis_client.info() return { 'backend': 'redis', 'used_memory': info.get('used_memory_human', 'N/A'), 'connected_clients': info.get('connected_clients', 0), 'total_commands': info.get('total_commands_processed', 0) } else: return { 'backend': 'memory', 'items': len(_memory_cache), 'memory_usage': 'N/A' } except Exception as e: return {'error': str(e)} # Job-specific cache invalidation def invalidate_job_cache(job_id: int): """Invalidate all cache entries related to a specific job""" patterns = [ f"job:{job_id}:*", f"results:job_{job_id}", f"analysis:job_{job_id}", f"audit:job_{job_id}", f"keywords:job_{job_id}", f"recommendations:job_{job_id}" ] for pattern in patterns: cache_invalidate(pattern) def invalidate_results_cache(): """Invalidate all results and analysis cache""" patterns = [ "results:*", "analysis:*", "audit:*", "query:*" ] for pattern in patterns: cache_invalidate(pattern) def invalidate_url_cache(url: str): """Invalidate cache for a specific URL""" import hashlib url_hash = hashlib.md5(url.encode()).hexdigest() patterns = [ f"url:{url_hash}:*", f"results:{url_hash}", f"analysis:{url_hash}" ] for pattern in patterns: cache_invalidate(pattern)