Spaces:
Sleeping
Sleeping
| import time | |
| from typing import Dict, Any, Optional | |
| from backend.credentials import setup_google_credentials | |
| setup_google_credentials() | |
# Simple in-memory caches.
# Every entry in either cache is a dict that stores the cached value next to
# the time.time() 'timestamp' at which it was written.
# user_cache:    "user_data:<user_id>"        -> {'data': ..., 'timestamp': float}
# routing_cache: md5 hex digest of a message  -> {'route': str, 'timestamp': float}
user_cache: Dict[str, Dict[str, Any]] = {}
routing_cache: Dict[str, Dict[str, Any]] = {}
CACHE_DURATION = 30  # seconds a user-data entry stays valid
ROUTING_CACHE_DURATION = 1800  # seconds (30 minutes) a routing entry stays valid
def get_cache_key(user_id: str) -> str:
    """Build the key under which a user's data is stored in the user cache.

    Args:
        user_id: Identifier of the user.

    Returns:
        The text "user_data:" followed by ``user_id``.
    """
    return "user_data:{}".format(user_id)
def is_cache_valid(cache_entry: Dict[str, Any], duration: int = CACHE_DURATION) -> bool:
    """Tell whether a cache entry is younger than ``duration`` seconds.

    Args:
        cache_entry: Entry dict; its 'timestamp' value is compared with the
            current time.time().
        duration: Maximum allowed age in seconds (defaults to CACHE_DURATION).

    Returns:
        True if the entry's age is strictly less than ``duration``.
    """
    age_seconds = time.time() - cache_entry['timestamp']
    return age_seconds < duration
def get_cached_user_data(user_id: str) -> Optional[Dict[str, Any]]:
    """Look up a user's cached data, discarding the entry if it has expired.

    Args:
        user_id: Identifier of the user whose data is requested.

    Returns:
        The cached 'data' value when a still-valid entry exists, otherwise
        None (no entry, or the entry was expired and has just been removed).
    """
    key = get_cache_key(user_id)
    if key not in user_cache:
        return None

    entry = user_cache[key]
    if is_cache_valid(entry):
        return entry['data']

    # Entry outlived CACHE_DURATION: evict it so it is not checked again.
    del user_cache[key]
    return None
def cache_user_data(user_id: str, data: Dict[str, Any]) -> None:
    """Store ``data`` for a user in the user cache, stamped with the current time.

    Any existing entry for the same user is overwritten. A "[CACHE]" line is
    printed to stdout after the entry is stored.

    Args:
        user_id: Identifier of the user the data belongs to.
        data: Payload to cache.
    """
    key = get_cache_key(user_id)
    entry = {'data': data, 'timestamp': time.time()}
    user_cache[key] = entry
    print(f"[CACHE] Cached user data for user: {user_id}")
def get_cached_route(user_message: str) -> Optional[str]:
    """Return the cached routing decision for a message, if one is still valid.

    The lookup key is the MD5 hex digest of the lower-cased, whitespace-stripped
    message, so messages that differ only in case or surrounding whitespace
    share one entry.

    Args:
        user_message: Raw message text.

    Returns:
        The cached route, or None when there is no entry or the entry has
        exceeded ROUTING_CACHE_DURATION (in which case it is removed).
    """
    import hashlib

    normalized = user_message.lower().strip()
    digest = hashlib.md5(normalized.encode()).hexdigest()

    if digest not in routing_cache:
        return None

    entry = routing_cache[digest]
    if is_cache_valid(entry, ROUTING_CACHE_DURATION):
        return entry['route']

    # Expired: drop the stale routing decision.
    del routing_cache[digest]
    return None
def cache_route(user_message: str, route: str) -> None:
    """Remember the routing decision made for a message.

    The entry is keyed by the MD5 hex digest of the lower-cased,
    whitespace-stripped message and stamped with the current time. A "[CACHE]"
    line showing the first eight digest characters is printed to stdout.

    Args:
        user_message: Raw message text.
        route: Routing decision to cache for that message.
    """
    import hashlib

    normalized = user_message.lower().strip()
    digest = hashlib.md5(normalized.encode()).hexdigest()
    entry = {'route': route, 'timestamp': time.time()}
    routing_cache[digest] = entry
    print(f"[CACHE] Cached route '{route}' for message hash: {digest[:8]}...")
def clear_user_cache(user_id: Optional[str] = None) -> None:
    """Clear the cached data of one user, or of all users.

    A "[CACHE]" line describing what was cleared is printed to stdout.

    Args:
        user_id: Identifier of the user whose entry should be dropped. When
            omitted (None), every entry in the user cache is removed.
    """
    # Compare against None rather than relying on truthiness: an empty-string
    # user_id must only drop that one key, not wipe every user's entry.
    if user_id is not None:
        # pop() with a default makes clearing an uncached user a silent no-op.
        user_cache.pop(get_cache_key(user_id), None)
        print(f"[CACHE] Cleared cache for user: {user_id}")
    else:
        # clear() mutates the dict in place, so no `global` rebinding is needed.
        user_cache.clear()
        print("[CACHE] Cleared all user cache")
def get_cache_stats() -> Dict[str, Any]:
    """Summarize how many entries each cache holds and how many are still valid.

    Entries are only counted, never removed. An entry is valid when its age,
    measured against a single time.time() reading, is strictly below the
    duration configured for its cache.

    Returns:
        A dict with "user_cache" and "routing_cache" sections, each holding
        "total_entries", "valid_entries", "expired_entries" and
        "cache_duration_seconds".
    """
    now = time.time()

    # User cache stats
    user_total = len(user_cache)
    user_valid = sum(
        1 for item in user_cache.values()
        if now - item['timestamp'] < CACHE_DURATION
    )
    user_expired = user_total - user_valid

    # Routing cache stats
    routing_total = len(routing_cache)
    routing_valid = sum(
        1 for item in routing_cache.values()
        if now - item['timestamp'] < ROUTING_CACHE_DURATION
    )
    routing_expired = routing_total - routing_valid

    user_section = {
        "total_entries": user_total,
        "valid_entries": user_valid,
        "expired_entries": user_expired,
        "cache_duration_seconds": CACHE_DURATION,
    }
    routing_section = {
        "total_entries": routing_total,
        "valid_entries": routing_valid,
        "expired_entries": routing_expired,
        "cache_duration_seconds": ROUTING_CACHE_DURATION,
    }
    return {"user_cache": user_section, "routing_cache": routing_section}
def cleanup_expired_cache() -> Dict[str, int]:
    """Evict every expired entry from both caches.

    An entry is expired when its age, measured against a single time.time()
    reading, is at least the duration configured for its cache. A "[CACHE]"
    summary line is printed to stdout only when something was removed.

    Returns:
        A dict with the number of entries removed from each cache under
        "user_entries_removed" and "routing_entries_removed".
    """
    now = time.time()

    # Collect keys first; deleting while iterating the dict would raise.
    stale_user_keys = [
        key for key, item in user_cache.items()
        if now - item['timestamp'] >= CACHE_DURATION
    ]
    for key in stale_user_keys:
        del user_cache[key]

    stale_routing_keys = [
        key for key, item in routing_cache.items()
        if now - item['timestamp'] >= ROUTING_CACHE_DURATION
    ]
    for key in stale_routing_keys:
        del routing_cache[key]

    users_removed = len(stale_user_keys)
    routes_removed = len(stale_routing_keys)
    if users_removed or routes_removed:
        print(f"[CACHE] Cleaned up {users_removed} user entries and {routes_removed} routing entries")

    return {
        "user_entries_removed": users_removed,
        "routing_entries_removed": routes_removed,
    }