Spaces:
Runtime error
Runtime error
| """ | |
| Enhanced caching service for frequently accessed data and responses. | |
| This module provides intelligent caching for chat responses, language contexts, | |
| and other frequently accessed data to improve performance. | |
| """ | |
| import json | |
| import logging | |
| import hashlib | |
| import time | |
| from typing import Any, Optional, Dict, List, Union | |
| from datetime import datetime, timedelta | |
| from dataclasses import dataclass, asdict | |
| import redis | |
| from redis.exceptions import RedisError | |
| logger = logging.getLogger(__name__) | |
@dataclass
class CacheEntry:
    """Represents a cached entry with metadata.

    Attributes:
        key: Cache key the entry is stored under.
        value: The cached payload (any serializable object).
        created_at: When the entry was created.
        expires_at: Optional absolute expiry time; None means no expiry.
        hit_count: Number of times the entry has been read.
        last_accessed: Timestamp of the most recent read, if any.
        tags: Tags used for group invalidation; never None after init.
    """
    key: str
    value: Any
    created_at: datetime
    expires_at: Optional[datetime] = None
    hit_count: int = 0
    last_accessed: Optional[datetime] = None
    # Optional + None default avoids a shared mutable default; normalized below.
    tags: Optional[List[str]] = None

    def __post_init__(self):
        # Guarantee tags is always a list so callers can append/iterate safely.
        if self.tags is None:
            self.tags = []
class CacheService:
    """Enhanced caching service with intelligent cache management.

    Provides namespaced get/set/delete over an optional Redis client, plus
    tag-based invalidation, convenience caches for chat responses, language
    contexts and session data, in-process hit/miss statistics, and cache
    warming. Every public method degrades gracefully (returns False / None /
    0, or no-ops) when no Redis client is configured or when Redis raises,
    so callers never need their own error handling around caching.
    """

    def __init__(self, redis_client: Optional[redis.Redis] = None):
        """
        Initialize cache service.

        Args:
            redis_client: Redis client instance (optional). When None,
                all cache operations become graceful no-ops.
        """
        self.redis_client = redis_client
        self.cache_prefix = "chat_cache:"
        self.stats_prefix = "cache_stats:"
        self.tag_prefix = "cache_tags:"

        # Cache configuration
        self.default_ttl = 3600  # 1 hour
        self.max_response_cache_size = 1000  # Max cached responses
        self.response_cache_ttl = 1800  # 30 minutes for responses
        self.language_context_ttl = 86400  # 24 hours for language contexts

        # In-process performance counters (per-instance; Redis-side stats
        # are kept separately via _update_cache_stats).
        self.cache_hits = 0
        self.cache_misses = 0
        self.cache_errors = 0

        logger.info("Cache service initialized", extra={
            'redis_enabled': bool(redis_client),
            'default_ttl': self.default_ttl
        })

    def _generate_cache_key(self, namespace: str, identifier: str) -> str:
        """Generate cache key with namespace."""
        return f"{self.cache_prefix}{namespace}:{identifier}"

    def _serialize_value(self, value: Any) -> str:
        """Serialize value for caching.

        dicts/lists become JSON (nested non-JSON types are stringified via
        default=str), datetimes become ISO-8601, everything else str().
        """
        if isinstance(value, (dict, list)):
            return json.dumps(value, default=str)
        elif isinstance(value, datetime):
            return value.isoformat()
        else:
            return str(value)

    def _deserialize_value(self, value: str, value_type: type = None) -> Any:
        """Deserialize cached value.

        Args:
            value: Raw string pulled from Redis.
            value_type: Expected type; only datetime triggers special
                handling, everything else falls through to JSON/str.
        """
        if value_type == datetime:
            return datetime.fromisoformat(value)
        try:
            # Try to parse as JSON first
            return json.loads(value)
        except (json.JSONDecodeError, TypeError):
            # Return as string if JSON parsing fails
            return value

    def _hash_content(self, content: str) -> str:
        """Generate hash for content-based caching.

        MD5 is used purely for key derivation, not for security.
        """
        return hashlib.md5(content.encode('utf-8')).hexdigest()

    def set(self, namespace: str, key: str, value: Any, ttl: Optional[int] = None,
            tags: Optional[List[str]] = None) -> bool:
        """
        Set a value in cache.

        Args:
            namespace: Cache namespace
            key: Cache key
            value: Value to cache
            ttl: Time to live in seconds (defaults to self.default_ttl)
            tags: Tags for cache invalidation

        Returns:
            True if successful, False otherwise
        """
        if not self.redis_client:
            return False
        try:
            cache_key = self._generate_cache_key(namespace, key)
            serialized_value = self._serialize_value(value)

            # Set value with TTL
            ttl = ttl or self.default_ttl
            success = self.redis_client.setex(cache_key, ttl, serialized_value)

            # Store tag membership only on success so tags never point at
            # keys that were never written.
            if success and tags:
                self._store_cache_metadata(cache_key, tags, ttl)

            # Update stats
            self._update_cache_stats('set', namespace)
            logger.debug(f"Cached value for key: {cache_key}")
            return bool(success)
        except RedisError as e:
            logger.warning(f"Failed to set cache value: {e}")
            self.cache_errors += 1
            return False

    def get(self, namespace: str, key: str, value_type: type = None) -> Optional[Any]:
        """
        Get a value from cache.

        Args:
            namespace: Cache namespace
            key: Cache key
            value_type: Expected value type for deserialization

        Returns:
            Cached value or None if not found
        """
        if not self.redis_client:
            return None
        try:
            cache_key = self._generate_cache_key(namespace, key)
            cached_value = self.redis_client.get(cache_key)
            if cached_value is not None:
                self.cache_hits += 1
                self._update_cache_stats('hit', namespace)
                # Update access metadata
                self._update_access_metadata(cache_key)
                # Redis returns bytes by default but str when the client is
                # created with decode_responses=True; handle both instead of
                # unconditionally calling .decode() (which would raise
                # AttributeError on str).
                if isinstance(cached_value, bytes):
                    cached_value = cached_value.decode('utf-8')
                return self._deserialize_value(cached_value, value_type)
            else:
                self.cache_misses += 1
                self._update_cache_stats('miss', namespace)
                return None
        except RedisError as e:
            logger.warning(f"Failed to get cache value: {e}")
            self.cache_errors += 1
            self.cache_misses += 1
            return None

    def delete(self, namespace: str, key: str) -> bool:
        """
        Delete a value from cache.

        Args:
            namespace: Cache namespace
            key: Cache key

        Returns:
            True if successful, False otherwise
        """
        if not self.redis_client:
            return False
        try:
            cache_key = self._generate_cache_key(namespace, key)
            deleted = self.redis_client.delete(cache_key)
            # Clean up metadata
            self._cleanup_cache_metadata(cache_key)
            self._update_cache_stats('delete', namespace)
            return bool(deleted)
        except RedisError as e:
            logger.warning(f"Failed to delete cache value: {e}")
            self.cache_errors += 1
            return False

    def invalidate_by_tags(self, tags: List[str]) -> int:
        """
        Invalidate all cache entries with specified tags.

        Args:
            tags: List of tags to invalidate

        Returns:
            Number of entries invalidated
        """
        if not self.redis_client or not tags:
            return 0
        try:
            invalidated_count = 0
            for tag in tags:
                tag_key = f"{self.tag_prefix}{tag}"
                cache_keys = self.redis_client.smembers(tag_key)
                if cache_keys:
                    # Delete all keys with this tag
                    deleted = self.redis_client.delete(*cache_keys)
                    invalidated_count += deleted
                    # Clean up tag set
                    self.redis_client.delete(tag_key)
            logger.info(f"Invalidated {invalidated_count} cache entries for tags: {tags}")
            return invalidated_count
        except RedisError as e:
            logger.warning(f"Failed to invalidate cache by tags: {e}")
            return 0

    def cache_response(self, prompt: str, language: str, response: str,
                       metadata: Optional[Dict] = None) -> bool:
        """
        Cache a chat response for similar prompts.

        Args:
            prompt: User prompt
            language: Programming language
            response: Generated response
            metadata: Additional metadata

        Returns:
            True if cached successfully
        """
        # Key is content-addressed: identical (prompt, language) pairs hit
        # the same entry.
        prompt_hash = self._hash_content(f"{prompt}:{language}")
        cache_key = f"response:{prompt_hash}"
        cache_data = {
            'prompt': prompt,
            'language': language,
            'response': response,
            'metadata': metadata or {},
            'cached_at': datetime.utcnow().isoformat()
        }
        tags = [f"language:{language}", "responses"]
        return self.set("responses", cache_key, cache_data,
                        ttl=self.response_cache_ttl, tags=tags)

    def get_cached_response(self, prompt: str, language: str) -> Optional[Dict]:
        """
        Get cached response for similar prompt.

        Args:
            prompt: User prompt
            language: Programming language

        Returns:
            Cached response data or None
        """
        prompt_hash = self._hash_content(f"{prompt}:{language}")
        cache_key = f"response:{prompt_hash}"
        return self.get("responses", cache_key)

    def cache_language_context(self, session_id: str, language: str,
                               context_data: Dict) -> bool:
        """
        Cache language context for a session.

        Args:
            session_id: Session identifier
            language: Programming language
            context_data: Language context data

        Returns:
            True if cached successfully
        """
        cache_key = f"context:{session_id}"
        cache_data = {
            'session_id': session_id,
            'language': language,
            'context_data': context_data,
            'cached_at': datetime.utcnow().isoformat()
        }
        tags = [f"session:{session_id}", f"language:{language}", "contexts"]
        return self.set("language_contexts", cache_key, cache_data,
                        ttl=self.language_context_ttl, tags=tags)

    def get_cached_language_context(self, session_id: str) -> Optional[Dict]:
        """
        Get cached language context for a session.

        Args:
            session_id: Session identifier

        Returns:
            Cached context data or None
        """
        cache_key = f"context:{session_id}"
        return self.get("language_contexts", cache_key)

    def cache_session_data(self, session_id: str, session_data: Dict,
                           ttl: Optional[int] = None) -> bool:
        """
        Cache session data for quick access.

        Args:
            session_id: Session identifier
            session_data: Session data to cache
            ttl: Time to live (defaults to 1 hour)

        Returns:
            True if cached successfully
        """
        cache_key = f"session:{session_id}"
        cache_data = {
            'session_data': session_data,
            'cached_at': datetime.utcnow().isoformat()
        }
        tags = [f"session:{session_id}", "sessions"]
        return self.set("sessions", cache_key, cache_data,
                        ttl=ttl or 3600, tags=tags)

    def get_cached_session_data(self, session_id: str) -> Optional[Dict]:
        """
        Get cached session data.

        Args:
            session_id: Session identifier

        Returns:
            Cached session data or None
        """
        cache_key = f"session:{session_id}"
        cached = self.get("sessions", cache_key)
        if cached:
            # Unwrap the envelope written by cache_session_data.
            return cached.get('session_data')
        return None

    def _store_cache_metadata(self, cache_key: str, tags: List[str], ttl: int):
        """Store cache metadata for tag-based invalidation."""
        try:
            # Store tags for this cache key
            for tag in tags:
                tag_key = f"{self.tag_prefix}{tag}"
                self.redis_client.sadd(tag_key, cache_key)
                self.redis_client.expire(tag_key, ttl + 300)  # Expire tags 5 minutes after cache
        except RedisError as e:
            logger.warning(f"Failed to store cache metadata: {e}")

    def _cleanup_cache_metadata(self, cache_key: str):
        """Clean up metadata for deleted cache key.

        Intentionally a no-op: removing a single key from every tag set
        would require a reverse index; tag sets expire on their own TTL
        (see _store_cache_metadata).
        """
        try:
            # This is a simplified cleanup - in production, you might want
            # to maintain a reverse index for more efficient cleanup
            pass
        except RedisError as e:
            logger.warning(f"Failed to cleanup cache metadata: {e}")

    def _update_access_metadata(self, cache_key: str):
        """Update access metadata (hit count, last-access time) for cache key."""
        try:
            # Increment hit count
            hit_key = f"{cache_key}:hits"
            self.redis_client.incr(hit_key)
            self.redis_client.expire(hit_key, self.default_ttl)
            # Update last accessed time
            access_key = f"{cache_key}:last_access"
            self.redis_client.set(access_key, datetime.utcnow().isoformat(), ex=self.default_ttl)
        except RedisError as e:
            logger.warning(f"Failed to update access metadata: {e}")

    def _update_cache_stats(self, operation: str, namespace: str):
        """Update per-namespace operation counters in Redis."""
        if not self.redis_client:
            return
        try:
            stats_key = f"{self.stats_prefix}{namespace}:{operation}"
            self.redis_client.incr(stats_key)
            self.redis_client.expire(stats_key, 86400)  # Keep stats for 24 hours
        except RedisError as e:
            logger.warning(f"Failed to update cache stats: {e}")

    def get_cache_stats(self) -> Dict[str, Any]:
        """
        Get cache performance statistics.

        Returns:
            Dictionary with cache statistics (in-process counters plus
            Redis server info when a client is available).
        """
        total_requests = self.cache_hits + self.cache_misses
        hit_rate = (self.cache_hits / total_requests * 100) if total_requests > 0 else 0
        stats = {
            'cache_hits': self.cache_hits,
            'cache_misses': self.cache_misses,
            'cache_errors': self.cache_errors,
            'hit_rate_percent': round(hit_rate, 2),
            'total_requests': total_requests,
            'redis_enabled': bool(self.redis_client)
        }
        # Get Redis-specific stats if available
        if self.redis_client:
            try:
                info = self.redis_client.info()
                stats.update({
                    'redis_memory_used': info.get('used_memory_human', 'N/A'),
                    'redis_connected_clients': info.get('connected_clients', 0),
                    'redis_keyspace_hits': info.get('keyspace_hits', 0),
                    'redis_keyspace_misses': info.get('keyspace_misses', 0)
                })
            except RedisError as e:
                logger.warning(f"Failed to get Redis stats: {e}")
        return stats

    def clear_namespace(self, namespace: str) -> int:
        """
        Clear all cache entries in a namespace.

        Args:
            namespace: Namespace to clear

        Returns:
            Number of entries cleared
        """
        if not self.redis_client:
            return 0
        try:
            pattern = f"{self.cache_prefix}{namespace}:*"
            # SCAN instead of KEYS: KEYS is O(N) over the whole keyspace and
            # blocks the Redis server; scan_iter walks it incrementally.
            keys = list(self.redis_client.scan_iter(match=pattern))
            if keys:
                deleted = self.redis_client.delete(*keys)
                logger.info(f"Cleared {deleted} cache entries from namespace: {namespace}")
                return deleted
            return 0
        except RedisError as e:
            logger.warning(f"Failed to clear namespace {namespace}: {e}")
            return 0

    def warm_cache(self, data_loader_func, namespace: str, keys: List[str],
                   ttl: Optional[int] = None):
        """
        Warm cache with data from a loader function.

        Args:
            data_loader_func: Function to load data for cache warming;
                called as data_loader_func(key) for each missing key.
            namespace: Cache namespace
            keys: List of keys to warm
            ttl: Time to live for cached entries
        """
        if not self.redis_client:
            return
        logger.info(f"Warming cache for namespace: {namespace}")
        for key in keys:
            try:
                # Check if already cached
                if self.get(namespace, key) is not None:
                    continue
                # Load data and cache it; loader failures are logged and
                # skipped so one bad key cannot abort the whole warm-up.
                data = data_loader_func(key)
                if data is not None:
                    self.set(namespace, key, data, ttl=ttl)
            except Exception as e:
                logger.warning(f"Failed to warm cache for key {key}: {e}")
        logger.info(f"Cache warming completed for namespace: {namespace}")
# Module-level singleton holding the shared cache service.
_cache_service: Optional[CacheService] = None


def initialize_cache_service(redis_client: Optional[redis.Redis] = None) -> CacheService:
    """
    Initialize global cache service.

    Idempotent: the first call creates the singleton; later calls return
    the existing instance and ignore their argument.

    Args:
        redis_client: Redis client instance

    Returns:
        CacheService instance
    """
    global _cache_service
    if _cache_service is not None:
        return _cache_service
    _cache_service = CacheService(redis_client)
    return _cache_service
def get_cache_service() -> Optional[CacheService]:
    """
    Get the global cache service.

    Returns:
        CacheService instance or None if initialize_cache_service has
        not been called yet.
    """
    return _cache_service