"""
Enhanced caching service for frequently accessed data and responses.

This module provides intelligent caching for chat responses, language contexts,
and other frequently accessed data to improve performance.
"""

import hashlib
import json
import logging
import time
from dataclasses import asdict, dataclass, field
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Union

try:
    import redis
    from redis.exceptions import RedisError
except ImportError:
    # Every CacheService method already degrades to a no-op when no client is
    # supplied, so the module stays importable without the redis package.
    redis = None

    class RedisError(Exception):
        """Stand-in so ``except RedisError`` clauses remain valid without redis-py."""


logger = logging.getLogger(__name__)


@dataclass
class CacheEntry:
    """Represents a cached entry with metadata."""

    key: str
    value: Any
    created_at: datetime
    expires_at: Optional[datetime] = None
    hit_count: int = 0
    last_accessed: Optional[datetime] = None
    tags: List[str] = field(default_factory=list)

    def __post_init__(self):
        # An explicitly passed ``tags=None`` is still normalised to an empty list.
        if self.tags is None:
            self.tags = []


class CacheService:
    """Enhanced caching service with intelligent cache management."""

    # Suffixes of the per-entry access-metadata keys written on every cache hit.
    _HITS_SUFFIX = ":hits"
    _LAST_ACCESS_SUFFIX = ":last_access"
    # Tag sets outlive the entries they index by this many seconds.
    _TAG_TTL_GRACE = 300
    # Lifetime of the per-namespace operation counters (24 hours).
    _STATS_TTL = 86400
    # Number of keys fetched/deleted per round trip when clearing a namespace.
    _SCAN_BATCH = 500

    def __init__(self, redis_client: Optional["redis.Redis"] = None):
        """
        Initialize cache service.

        Args:
            redis_client: Redis client instance (optional). Without a client
                every cache operation is a no-op returning False/None/0.
        """
        self.redis_client = redis_client
        self.cache_prefix = "chat_cache:"
        self.stats_prefix = "cache_stats:"
        self.tag_prefix = "cache_tags:"

        # Cache configuration
        self.default_ttl = 3600  # 1 hour
        self.max_response_cache_size = 1000  # Max cached responses
        self.response_cache_ttl = 1800  # 30 minutes for responses
        self.language_context_ttl = 86400  # 24 hours for language contexts

        # Performance tracking (in-process counters, not shared across workers)
        self.cache_hits = 0
        self.cache_misses = 0
        self.cache_errors = 0

        logger.info("Cache service initialized", extra={
            'redis_enabled': bool(redis_client),
            'default_ttl': self.default_ttl
        })

    def _generate_cache_key(self, namespace: str, identifier: str) -> str:
        """Generate cache key with namespace."""
        return f"{self.cache_prefix}{namespace}:{identifier}"

    def _access_metadata_keys(self, cache_key: Union[str, bytes]) -> List[str]:
        """Return the access-metadata key names belonging to ``cache_key``.

        Accepts bytes as well as str because set members read back from Redis
        are bytes unless the client decodes responses.
        """
        if isinstance(cache_key, (bytes, bytearray)):
            cache_key = cache_key.decode('utf-8')
        return [
            f"{cache_key}{self._HITS_SUFFIX}",
            f"{cache_key}{self._LAST_ACCESS_SUFFIX}",
        ]

    def _serialize_value(self, value: Any) -> str:
        """
        Serialize value for caching.

        Dicts, lists and JSON-native scalars (str, bool, int, float, None) are
        JSON-encoded so that they come back with the same type; a plain
        ``str()`` would turn the string ``"123"`` into the integer ``123`` and
        ``True`` into the string ``"True"`` on the way back. Datetimes are
        stored in ISO-8601 form; anything else falls back to ``str()``.
        """
        if isinstance(value, datetime):
            return value.isoformat()
        if value is None or isinstance(value, (dict, list, str, bool, int, float)):
            return json.dumps(value, default=str)
        return str(value)

    def _deserialize_value(self, value: str, value_type: type = None) -> Any:
        """
        Deserialize cached value.

        Args:
            value: Raw string read from the cache
            value_type: Pass ``datetime`` to parse an ISO-8601 timestamp

        Returns:
            The decoded JSON value, or the raw string when it is not valid
            JSON (this also keeps entries written as plain strings readable).
        """
        if value_type == datetime:
            return datetime.fromisoformat(value)

        try:
            return json.loads(value)
        except (json.JSONDecodeError, TypeError):
            return value

    def _hash_content(self, content: str) -> str:
        """Generate hash for content-based caching."""
        # MD5 is used only to derive a compact cache key, not for security.
        return hashlib.md5(content.encode('utf-8')).hexdigest()

    def set(self, namespace: str, key: str, value: Any,
            ttl: Optional[int] = None, tags: Optional[List[str]] = None) -> bool:
        """
        Set a value in cache.

        Args:
            namespace: Cache namespace
            key: Cache key
            value: Value to cache
            ttl: Time to live in seconds (falsy values use ``default_ttl``)
            tags: Tags for cache invalidation

        Returns:
            True if successful, False otherwise
        """
        if not self.redis_client:
            return False

        try:
            cache_key = self._generate_cache_key(namespace, key)
            serialized_value = self._serialize_value(value)

            ttl = ttl or self.default_ttl
            success = self.redis_client.setex(cache_key, ttl, serialized_value)

            # Register the key under each tag so it can be invalidated later
            if success and tags:
                self._store_cache_metadata(cache_key, tags, ttl)

            self._update_cache_stats('set', namespace)

            logger.debug("Cached value for key: %s", cache_key)
            return bool(success)

        except RedisError as e:
            logger.warning("Failed to set cache value: %s", e)
            self.cache_errors += 1
            return False

    def get(self, namespace: str, key: str, value_type: type = None) -> Optional[Any]:
        """
        Get a value from cache.

        Args:
            namespace: Cache namespace
            key: Cache key
            value_type: Expected value type for deserialization

        Returns:
            Cached value or None if not found
        """
        if not self.redis_client:
            return None

        try:
            cache_key = self._generate_cache_key(namespace, key)
            cached_value = self.redis_client.get(cache_key)

            if cached_value is None:
                self.cache_misses += 1
                self._update_cache_stats('miss', namespace)
                return None

            self.cache_hits += 1
            self._update_cache_stats('hit', namespace)
            self._update_access_metadata(cache_key)

            # Clients created with decode_responses=True already return str;
            # only raw bytes need decoding.
            if isinstance(cached_value, (bytes, bytearray)):
                cached_value = cached_value.decode('utf-8')
            return self._deserialize_value(cached_value, value_type)

        except RedisError as e:
            logger.warning("Failed to get cache value: %s", e)
            self.cache_errors += 1
            self.cache_misses += 1
            return None

    def delete(self, namespace: str, key: str) -> bool:
        """
        Delete a value from cache.

        Args:
            namespace: Cache namespace
            key: Cache key

        Returns:
            True if the entry existed and was removed, False otherwise
        """
        if not self.redis_client:
            return False

        try:
            cache_key = self._generate_cache_key(namespace, key)
            deleted = self.redis_client.delete(cache_key)

            self._cleanup_cache_metadata(cache_key)
            self._update_cache_stats('delete', namespace)

            return bool(deleted)

        except RedisError as e:
            logger.warning("Failed to delete cache value: %s", e)
            self.cache_errors += 1
            return False

    def invalidate_by_tags(self, tags: List[str]) -> int:
        """
        Invalidate all cache entries with specified tags.

        Args:
            tags: List of tags to invalidate

        Returns:
            Number of entries invalidated (access-metadata keys are removed
            too but are not counted)
        """
        if not self.redis_client or not tags:
            return 0

        try:
            invalidated_count = 0

            for tag in tags:
                tag_key = f"{self.tag_prefix}{tag}"
                cache_keys = self.redis_client.smembers(tag_key)
                if not cache_keys:
                    continue

                # Delete all keys with this tag
                invalidated_count += self.redis_client.delete(*cache_keys)

                # Drop the hit counters / last-access stamps of those keys
                metadata_keys = [
                    meta_key
                    for cache_key in cache_keys
                    for meta_key in self._access_metadata_keys(cache_key)
                ]
                self.redis_client.delete(*metadata_keys)

                # Clean up tag set
                self.redis_client.delete(tag_key)

            logger.info("Invalidated %s cache entries for tags: %s", invalidated_count, tags)
            return invalidated_count

        except RedisError as e:
            logger.warning("Failed to invalidate cache by tags: %s", e)
            self.cache_errors += 1
            return 0

    def cache_response(self, prompt: str, language: str, response: str,
                       metadata: Optional[Dict] = None) -> bool:
        """
        Cache a chat response for similar prompts.

        Args:
            prompt: User prompt
            language: Programming language
            response: Generated response
            metadata: Additional metadata

        Returns:
            True if cached successfully
        """
        # Key is derived from the exact prompt text plus the language
        prompt_hash = self._hash_content(f"{prompt}:{language}")
        cache_key = f"response:{prompt_hash}"

        cache_data = {
            'prompt': prompt,
            'language': language,
            'response': response,
            'metadata': metadata or {},
            'cached_at': datetime.utcnow().isoformat()
        }

        tags = [f"language:{language}", "responses"]

        return self.set("responses", cache_key, cache_data,
                        ttl=self.response_cache_ttl, tags=tags)

    def get_cached_response(self, prompt: str, language: str) -> Optional[Dict]:
        """
        Get cached response for similar prompt.

        Args:
            prompt: User prompt
            language: Programming language

        Returns:
            Cached response data or None
        """
        prompt_hash = self._hash_content(f"{prompt}:{language}")
        cache_key = f"response:{prompt_hash}"

        return self.get("responses", cache_key)

    def cache_language_context(self, session_id: str, language: str,
                               context_data: Dict) -> bool:
        """
        Cache language context for a session.

        Args:
            session_id: Session identifier
            language: Programming language
            context_data: Language context data

        Returns:
            True if cached successfully
        """
        cache_key = f"context:{session_id}"

        cache_data = {
            'session_id': session_id,
            'language': language,
            'context_data': context_data,
            'cached_at': datetime.utcnow().isoformat()
        }

        tags = [f"session:{session_id}", f"language:{language}", "contexts"]

        return self.set("language_contexts", cache_key, cache_data,
                        ttl=self.language_context_ttl, tags=tags)

    def get_cached_language_context(self, session_id: str) -> Optional[Dict]:
        """
        Get cached language context for a session.

        Args:
            session_id: Session identifier

        Returns:
            Cached context data or None
        """
        cache_key = f"context:{session_id}"
        return self.get("language_contexts", cache_key)

    def cache_session_data(self, session_id: str, session_data: Dict,
                           ttl: Optional[int] = None) -> bool:
        """
        Cache session data for quick access.

        Args:
            session_id: Session identifier
            session_data: Session data to cache
            ttl: Time to live (defaults to 1 hour)

        Returns:
            True if cached successfully
        """
        cache_key = f"session:{session_id}"

        cache_data = {
            'session_data': session_data,
            'cached_at': datetime.utcnow().isoformat()
        }

        tags = [f"session:{session_id}", "sessions"]

        return self.set("sessions", cache_key, cache_data,
                        ttl=ttl or 3600, tags=tags)

    def get_cached_session_data(self, session_id: str) -> Optional[Dict]:
        """
        Get cached session data.

        Args:
            session_id: Session identifier

        Returns:
            Cached session data or None
        """
        cache_key = f"session:{session_id}"
        cached = self.get("sessions", cache_key)

        # Guard against a non-dict payload stored under the same key
        if isinstance(cached, dict):
            return cached.get('session_data')

        return None

    def _store_cache_metadata(self, cache_key: str, tags: List[str], ttl: int):
        """
        Store cache metadata for tag-based invalidation.

        Each tag maps to a Redis set of cache keys. The set is kept alive for
        ``ttl`` plus a grace period, and its lifetime is only ever extended:
        a tag shared by long-lived and short-lived entries must not expire
        before the long-lived entries do, or they could no longer be
        invalidated by tag.
        """
        try:
            tag_ttl = ttl + self._TAG_TTL_GRACE
            for tag in tags:
                tag_key = f"{self.tag_prefix}{tag}"
                self.redis_client.sadd(tag_key, cache_key)

                # TTL reports a negative value for a key without an expiry
                # (e.g. a set that was just created), which also compares
                # below tag_ttl and therefore gets an expiry assigned.
                remaining = self.redis_client.ttl(tag_key)
                if remaining is None or remaining < tag_ttl:
                    self.redis_client.expire(tag_key, tag_ttl)

        except RedisError as e:
            logger.warning("Failed to store cache metadata: %s", e)

    def _cleanup_cache_metadata(self, cache_key: str):
        """
        Clean up metadata for deleted cache key.

        Removes the hit counter and last-access stamp written by
        ``_update_access_metadata``. Tag-set membership is left to expire
        with the tag set, since there is no reverse index from key to tags.
        """
        try:
            self.redis_client.delete(*self._access_metadata_keys(cache_key))
        except RedisError as e:
            logger.warning("Failed to cleanup cache metadata: %s", e)

    def _update_access_metadata(self, cache_key: str):
        """Update access metadata (hit count, last access time) for cache key."""
        try:
            hit_key = f"{cache_key}{self._HITS_SUFFIX}"
            self.redis_client.incr(hit_key)
            self.redis_client.expire(hit_key, self.default_ttl)

            access_key = f"{cache_key}{self._LAST_ACCESS_SUFFIX}"
            self.redis_client.set(access_key, datetime.utcnow().isoformat(),
                                  ex=self.default_ttl)

        except RedisError as e:
            logger.warning("Failed to update access metadata: %s", e)

    def _update_cache_stats(self, operation: str, namespace: str):
        """Increment the Redis-side counter for ``operation`` in ``namespace``."""
        if not self.redis_client:
            return

        try:
            stats_key = f"{self.stats_prefix}{namespace}:{operation}"
            self.redis_client.incr(stats_key)
            self.redis_client.expire(stats_key, self._STATS_TTL)  # Keep stats for 24 hours

        except RedisError as e:
            logger.warning("Failed to update cache stats: %s", e)

    def get_cache_stats(self) -> Dict[str, Any]:
        """
        Get cache performance statistics.

        Returns:
            Dictionary with this instance's hit/miss/error counters and, when
            a client is configured and reachable, selected Redis INFO fields
        """
        total_requests = self.cache_hits + self.cache_misses
        hit_rate = (self.cache_hits / total_requests * 100) if total_requests > 0 else 0

        stats = {
            'cache_hits': self.cache_hits,
            'cache_misses': self.cache_misses,
            'cache_errors': self.cache_errors,
            'hit_rate_percent': round(hit_rate, 2),
            'total_requests': total_requests,
            'redis_enabled': bool(self.redis_client)
        }

        # Get Redis-specific stats if available
        if self.redis_client:
            try:
                info = self.redis_client.info()
                stats.update({
                    'redis_memory_used': info.get('used_memory_human', 'N/A'),
                    'redis_connected_clients': info.get('connected_clients', 0),
                    'redis_keyspace_hits': info.get('keyspace_hits', 0),
                    'redis_keyspace_misses': info.get('keyspace_misses', 0)
                })
            except RedisError as e:
                logger.warning("Failed to get Redis stats: %s", e)

        return stats

    def clear_namespace(self, namespace: str) -> int:
        """
        Clear all cache entries in a namespace.

        Keys are discovered incrementally with SCAN and deleted in batches
        rather than with a single KEYS call, which would block the server
        while it walks the whole keyspace.

        Args:
            namespace: Namespace to clear

        Returns:
            Number of entries cleared
        """
        if not self.redis_client:
            return 0

        try:
            pattern = f"{self.cache_prefix}{namespace}:*"
            deleted = 0
            batch = []

            for found_key in self.redis_client.scan_iter(match=pattern,
                                                         count=self._SCAN_BATCH):
                batch.append(found_key)
                if len(batch) >= self._SCAN_BATCH:
                    deleted += self.redis_client.delete(*batch)
                    batch = []

            if batch:
                deleted += self.redis_client.delete(*batch)

            if deleted:
                logger.info("Cleared %s cache entries from namespace: %s", deleted, namespace)

            return deleted

        except RedisError as e:
            logger.warning("Failed to clear namespace %s: %s", namespace, e)
            self.cache_errors += 1
            return 0

    def warm_cache(self, data_loader_func, namespace: str, keys: List[str],
                   ttl: Optional[int] = None):
        """
        Warm cache with data from a loader function.

        Args:
            data_loader_func: Callable taking a key and returning the data to
                cache (None results are skipped)
            namespace: Cache namespace
            keys: List of keys to warm
            ttl: Time to live for cached entries
        """
        if not self.redis_client:
            return

        logger.info("Warming cache for namespace: %s", namespace)

        for key in keys:
            # Best effort: a failing loader for one key must not stop the rest
            try:
                # Check if already cached
                if self.get(namespace, key) is not None:
                    continue

                # Load data and cache it
                data = data_loader_func(key)
                if data is not None:
                    self.set(namespace, key, data, ttl=ttl)

            except Exception as e:
                logger.warning("Failed to warm cache for key %s: %s", key, e)

        logger.info("Cache warming completed for namespace: %s", namespace)


# Global cache service instance
_cache_service: Optional[CacheService] = None


def initialize_cache_service(redis_client: Optional["redis.Redis"] = None) -> CacheService:
    """
    Initialize global cache service.

    The instance is created on the first call only; later calls return the
    existing instance and ignore ``redis_client``.

    Args:
        redis_client: Redis client instance

    Returns:
        CacheService instance
    """
    global _cache_service

    if _cache_service is None:
        _cache_service = CacheService(redis_client)

    return _cache_service


def get_cache_service() -> Optional[CacheService]:
    """
    Get the global cache service.

    Returns:
        CacheService instance or None if not initialized
    """
    return _cache_service