# Cache utility functions for API endpoints
import json
import zlib
import base64
import binascii
import time
from services.config.valkey_config import get_redis_client, is_connection_available
redis_client = get_redis_client()
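
# The client is created once at import time. get_redis_client() is assumed to
# return a synchronous client (or None when Valkey is unreachable), which is
# why every call site below also guards with is_connection_available().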
# Cache metrics
cache_hits = 0
cache_misses = 0
start_time = time.time()
cached_data_size = 0
compressed_data_size = 0
# Compression threshold in bytes (10KB)
COMPRESSION_THRESHOLD = 10 * 1024
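
# Rationale (approximate): base64 encoding alone expands data by ~33%, and
# zlib costs CPU on every read and write, so compressing small payloads can
# cost more than it saves; values under the threshold stay plain JSON.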
async def get_cached_data(key, db_fetch_func, ttl=3600, use_compression=False):
"""
Get data from Valkey cache or database with optional compression
Args:
key: Valkey key
db_fetch_func: Function to fetch data from database
ttl: Time-to-live in seconds
use_compression: Whether to use compression for large objects
"""
global cache_hits, cache_misses, cached_data_size, compressed_data_size
# If Valkey is not available, fetch directly from database
if not is_connection_available() or not redis_client:
print(f"Cache DISABLED: {key} - fetching from database")
cache_misses += 1
return await db_fetch_func()
try:
# Check if data is in cache
cached_data = redis_client.get(key)
if cached_data:
# Increment hit counter
cache_hits += 1
print(f"Cache HIT: {key}")
            try:
                # Compressed entries carry a 'COMPRESSED:' prefix. The client
                # may return str or bytes depending on its decode_responses
                # setting, so handle both forms.
                if ((isinstance(cached_data, bytes) and cached_data.startswith(b'COMPRESSED:')) or
                        (isinstance(cached_data, str) and cached_data.startswith('COMPRESSED:'))):
                    # Strip the 11-character prefix and reverse the encoding;
                    # base64.b64decode accepts str and bytes alike.
                    compressed_data = base64.b64decode(cached_data[11:])
                    decompressed_data = zlib.decompress(compressed_data)
                    return json.loads(decompressed_data.decode('utf-8'))
                else:
                    # Regular, uncompressed JSON payload
                    return json.loads(cached_data)
except (json.JSONDecodeError, zlib.error, binascii.Error) as e:
print(f"Cache ERROR for {key}: {e}")
print("Falling back to database")
# Delete corrupted cache entry
redis_client.delete(key)
cache_misses += 1
return await db_fetch_func()
# Increment miss counter
cache_misses += 1
print(f"Cache MISS: {key}")
# Fetch data from database
data = await db_fetch_func()
# Serialize the data
serialized_data = json.dumps(data)
serialized_bytes = serialized_data.encode('utf-8')
# Track original size
original_size = len(serialized_bytes)
cached_data_size += original_size
# Decide whether to compress based on size and flag
if use_compression and original_size > COMPRESSION_THRESHOLD:
# Compress data
compressed_data = zlib.compress(serialized_bytes)
            # base64-encode so the stored value stays text-safe even when the
            # client decodes responses to str; this adds ~33% overhead, which
            # the zlib savings must outweigh for compression to pay off.
            encoded_data = base64.b64encode(compressed_data)
            # Prefix marks the entry as compressed for the read path above
            redis_value = b'COMPRESSED:' + encoded_data
# Track compressed size
compressed_size = len(redis_value)
compressed_data_size += compressed_size
# Calculate compression ratio
ratio = (compressed_size / original_size) * 100
print(f"Compressed {key}: {original_size} -> {compressed_size} bytes ({ratio:.2f}%)")
# Store compressed data in Valkey
redis_client.setex(key, ttl, redis_value)
else:
# Store regular data
redis_client.setex(key, ttl, serialized_data)
return data
except Exception as e:
print(f"Cache ERROR for {key}: {e}")
print("Falling back to database")
cache_misses += 1
return await db_fetch_func()
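
# A minimal usage sketch (illustrative only; fetch_user_profile is a
# hypothetical async database helper, not part of this module):
#
#     async def get_profile(user_id: int):
#         return await get_cached_data(
#             key=f"user:profile:{user_id}",
#             db_fetch_func=lambda: fetch_user_profile(user_id),
#             ttl=600,
#             use_compression=True,  # profiles may exceed the 10 KB threshold
#         )
#
# The lambda defers the database call so it only runs on a cache miss.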
def invalidate_cache(keys):
"""Delete multiple cache keys"""
if not is_connection_available() or not redis_client:
print("Cache DISABLED: Cannot invalidate cache keys")
return
try:
if keys:
redis_client.delete(*keys)
except Exception as e:
print(f"Cache invalidation ERROR: {e}")
def invalidate_cache_pattern(pattern):
"""
Delete all cache keys matching a pattern
For example: 'user:profile:*' to delete all user profiles
"""
if not is_connection_available() or not redis_client:
print("Cache DISABLED: Cannot invalidate cache pattern")
return
try:
        # Iterate with cursor-based SCAN rather than KEYS, which would block
        # the server while it walks the entire keyspace.
        cursor = 0
        while True:
            cursor, keys = redis_client.scan(cursor, match=pattern, count=100)
if keys:
redis_client.delete(*keys)
if cursor == 0:
break
except Exception as e:
print(f"Cache pattern invalidation ERROR: {e}")
def get_cache_metrics():
"""Get cache hit/miss metrics and efficiency statistics"""
global cache_hits, cache_misses, start_time, cached_data_size, compressed_data_size
total_requests = cache_hits + cache_misses
hit_ratio = 0
if total_requests > 0:
hit_ratio = (cache_hits / total_requests) * 100
uptime = time.time() - start_time
compression_savings = 0
if cached_data_size > 0 and compressed_data_size > 0:
compression_savings = 100 - ((compressed_data_size / cached_data_size) * 100)
return {
"hits": cache_hits,
"misses": cache_misses,
"total_requests": total_requests,
"hit_ratio": f"{hit_ratio:.2f}%",
"uptime_seconds": uptime,
"cached_data_size_kb": cached_data_size / 1024,
"compressed_data_size_kb": compressed_data_size / 1024,
"compression_savings": f"{compression_savings:.2f}%"
}
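
# Note: these metrics are process-local, in-memory counters; they reset on
# restart and are not shared between workers. A minimal sketch of exposing
# them over HTTP, assuming a FastAPI app object named `app` (hypothetical):
#
#     @app.get("/cache/metrics")
#     async def cache_metrics():
#         return get_cache_metrics()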
def reset_cache_metrics():
"""Reset all cache metrics counters"""
global cache_hits, cache_misses, start_time, cached_data_size, compressed_data_size
cache_hits = 0
cache_misses = 0
start_time = time.time()
cached_data_size = 0
compressed_data_size = 0
return {"message": "Cache metrics reset"}