# =============================================================
# File: backend/api/services/query_cache.py
# =============================================================
"""
Query caching service for repeated queries.
Uses in-memory cache with TTL for fast responses.
"""
| import time | |
| import hashlib | |
| from typing import Optional, Dict, Any | |
| from collections import OrderedDict | |
class QueryCache:
    """In-memory LRU cache for query responses with per-entry TTL."""

    def __init__(self, max_size: int = 100, ttl_seconds: int = 300):
        """
        Initialize cache.

        Args:
            max_size: Maximum number of cached entries
            ttl_seconds: Time-to-live in seconds (default 5 minutes)
        """
        self.max_size = max_size
        self.ttl_seconds = ttl_seconds
        # Maps key -> {'response', 'timestamp', 'tenant_id'}; insertion
        # order doubles as LRU order (oldest entry first).
        self.cache: OrderedDict[str, Dict[str, Any]] = OrderedDict()

    def _generate_key(self, query: str, tenant_id: str) -> str:
        """Generate cache key from query and tenant.

        The query is lower-cased and stripped so trivially different
        spellings share one entry. md5 is used purely as a fast,
        non-cryptographic fingerprint.
        """
        key_string = f"{tenant_id}:{query.lower().strip()}"
        return hashlib.md5(key_string.encode()).hexdigest()

    def get(self, query: str, tenant_id: str) -> Optional[Dict[str, Any]]:
        """
        Get cached response if available and not expired.

        Returns:
            Cached response dict or None if not found/expired
        """
        key = self._generate_key(query, tenant_id)
        if key not in self.cache:
            return None
        entry = self.cache[key]
        # Lazily evict an expired entry on access.
        if time.time() - entry['timestamp'] > self.ttl_seconds:
            del self.cache[key]
            return None
        # Move to end (most recently used) so LRU eviction is accurate.
        self.cache.move_to_end(key)
        return entry['response']

    def set(self, query: str, tenant_id: str, response: Dict[str, Any]):
        """
        Cache a response.

        Args:
            query: Original query
            tenant_id: Tenant ID
            response: Response to cache
        """
        key = self._generate_key(query, tenant_id)
        # Remove any stale entry so reinsertion lands at the MRU end.
        if key in self.cache:
            del self.cache[key]
        # Record tenant_id alongside the payload so clear(tenant_id)
        # works even when the response itself carries no tenant marker.
        self.cache[key] = {
            'response': response,
            'timestamp': time.time(),
            'tenant_id': tenant_id,
        }
        # Enforce max size by evicting the least recently used entry.
        if len(self.cache) > self.max_size:
            self.cache.popitem(last=False)

    def clear(self, tenant_id: Optional[str] = None):
        """Clear cache for tenant or all if tenant_id is None."""
        if tenant_id is None:
            self.cache.clear()
            return
        # BUG FIX: previously this filtered on response.get('tenant_id'),
        # silently skipping every entry whose payload did not embed a
        # 'tenant_id' key — those entries were never cleared. Filter on
        # the tenant recorded at set() time instead (falling back to the
        # payload for entries written before that field existed).
        keys_to_remove = [
            key for key, entry in self.cache.items()
            if entry.get('tenant_id', entry['response'].get('tenant_id')) == tenant_id
        ]
        for key in keys_to_remove:
            del self.cache[key]

    def stats(self) -> Dict[str, Any]:
        """Get cache statistics."""
        return {
            'size': len(self.cache),
            'max_size': self.max_size,
            'ttl_seconds': self.ttl_seconds,
        }
# Shared process-wide cache instance, sized for a modest workload.
_global_cache: QueryCache = QueryCache(max_size=200, ttl_seconds=300)


def get_cache() -> QueryCache:
    """Return the shared module-level QueryCache singleton."""
    return _global_cache