Spaces:
Sleeping
Sleeping
# Cache utility functions for API endpoints
import json
import zlib
import base64
import binascii
import time

from services.config.valkey_config import get_redis_client, is_connection_available

# Shared client handle; may be falsy when the backend is unreachable.
redis_client = get_redis_client()

# Module-level cache metrics, reported by get_cache_metrics() and
# cleared by reset_cache_metrics().
cache_hits = 0
cache_misses = 0
start_time = time.time()
cached_data_size = 0
compressed_data_size = 0

# Payloads larger than this many bytes (10 KB) are zlib-compressed
# before caching when compression is enabled.
COMPRESSION_THRESHOLD = 10 * 1024
async def get_cached_data(key, db_fetch_func, ttl=3600, use_compression=False):
    """
    Get data from the Valkey cache, falling back to the database on a miss.

    Large payloads can optionally be zlib-compressed before caching; such
    entries are stored as ``COMPRESSED:<base64 zlib data>`` so they can be
    recognized and decoded on read regardless of the client's
    ``decode_responses`` setting.

    Args:
        key: Valkey cache key.
        db_fetch_func: Async callable that fetches the data from the database.
        ttl: Time-to-live of the cache entry, in seconds.
        use_compression: Compress payloads larger than COMPRESSION_THRESHOLD.

    Returns:
        The cached (or freshly fetched) JSON-serializable data.
    """
    global cache_hits, cache_misses, cached_data_size, compressed_data_size

    # If Valkey is not available, fetch directly from the database.
    if not is_connection_available() or not redis_client:
        print(f"Cache DISABLED: {key} - fetching from database")
        cache_misses += 1
        return await db_fetch_func()

    try:
        cached_data = redis_client.get(key)
        if cached_data:
            cache_hits += 1
            print(f"Cache HIT: {key}")
            try:
                # The value may arrive as str or bytes depending on the
                # client's decode_responses setting, so check the marker
                # against the matching type.
                is_compressed = (
                    cached_data.startswith(b'COMPRESSED:')
                    if isinstance(cached_data, bytes)
                    else cached_data.startswith('COMPRESSED:')
                )
                if is_compressed:
                    # Strip the 11-char 'COMPRESSED:' prefix and decode.
                    # base64.b64decode accepts both str and bytes input,
                    # so a single call covers both cases.
                    compressed_payload = base64.b64decode(cached_data[11:])
                    decompressed = zlib.decompress(compressed_payload)
                    return json.loads(decompressed.decode('utf-8'))
                # Regular non-compressed JSON payload.
                return json.loads(cached_data)
            except (json.JSONDecodeError, zlib.error, binascii.Error) as e:
                print(f"Cache ERROR for {key}: {e}")
                print("Falling back to database")
                # Drop the corrupted entry so the next read repopulates it.
                redis_client.delete(key)
                cache_misses += 1
                return await db_fetch_func()

        cache_misses += 1
        print(f"Cache MISS: {key}")

        # Fetch from the database and serialize for caching.
        data = await db_fetch_func()
        serialized_data = json.dumps(data)
        serialized_bytes = serialized_data.encode('utf-8')

        original_size = len(serialized_bytes)
        cached_data_size += original_size

        if use_compression and original_size > COMPRESSION_THRESHOLD:
            # Compress and prefix so readers can recognize the encoding.
            compressed = zlib.compress(serialized_bytes)
            redis_value = b'COMPRESSED:' + base64.b64encode(compressed)

            compressed_size = len(redis_value)
            compressed_data_size += compressed_size
            ratio = (compressed_size / original_size) * 100
            print(f"Compressed {key}: {original_size} -> {compressed_size} bytes ({ratio:.2f}%)")

            redis_client.setex(key, ttl, redis_value)
        else:
            redis_client.setex(key, ttl, serialized_data)
        return data
    except Exception as e:
        # Any backend failure degrades gracefully to a direct DB fetch.
        print(f"Cache ERROR for {key}: {e}")
        print("Falling back to database")
        cache_misses += 1
        return await db_fetch_func()
def invalidate_cache(keys):
    """Delete the given cache keys in one call, swallowing backend errors.

    No-op when the cache backend is unavailable or ``keys`` is empty.
    """
    if not is_connection_available() or not redis_client:
        print("Cache DISABLED: Cannot invalidate cache keys")
        return
    try:
        if keys:
            redis_client.delete(*keys)
    except Exception as err:
        # Invalidation is best-effort; a failure only means stale entries
        # linger until their TTL expires.
        print(f"Cache invalidation ERROR: {err}")
def invalidate_cache_pattern(pattern):
    """Delete every cache key matching ``pattern``.

    Example: ``'user:profile:*'`` removes all cached user profiles.
    Walks the keyspace incrementally with SCAN (batches of 100) instead
    of a blocking KEYS call; errors are logged and swallowed.
    """
    if not is_connection_available() or not redis_client:
        print("Cache DISABLED: Cannot invalidate cache pattern")
        return
    try:
        cursor = 0
        while True:
            cursor, matched = redis_client.scan(cursor, match=pattern, count=100)
            if matched:
                redis_client.delete(*matched)
            # SCAN reports completion by returning cursor 0.
            if cursor == 0:
                break
    except Exception as err:
        print(f"Cache pattern invalidation ERROR: {err}")
def get_cache_metrics():
    """Return a snapshot of cache hit/miss counters and derived statistics.

    Reads the module-level metric counters; percentages are formatted as
    strings (e.g. ``"42.00%"``), sizes reported in KB.
    """
    # Pure reads of module globals — no `global` declaration needed.
    total_requests = cache_hits + cache_misses
    hit_ratio = (cache_hits / total_requests) * 100 if total_requests > 0 else 0

    savings = 0
    if cached_data_size > 0 and compressed_data_size > 0:
        savings = 100 - ((compressed_data_size / cached_data_size) * 100)

    return {
        "hits": cache_hits,
        "misses": cache_misses,
        "total_requests": total_requests,
        "hit_ratio": f"{hit_ratio:.2f}%",
        "uptime_seconds": time.time() - start_time,
        "cached_data_size_kb": cached_data_size / 1024,
        "compressed_data_size_kb": compressed_data_size / 1024,
        "compression_savings": f"{savings:.2f}%",
    }
def reset_cache_metrics():
    """Zero all cache metric counters and restart the uptime clock."""
    global cache_hits, cache_misses, start_time, cached_data_size, compressed_data_size
    cache_hits = cache_misses = 0
    cached_data_size = compressed_data_size = 0
    start_time = time.time()
    return {"message": "Cache metrics reset"}