File size: 6,522 Bytes
f202266
 
 
 
0a9b204
f202266
ab7ee83
f202266
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ccbdc68
f202266
 
ccbdc68
f202266
 
 
 
 
 
ab7ee83
 
 
 
 
f202266
ab7ee83
 
 
 
 
 
 
 
0a9b204
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ab7ee83
 
 
 
 
 
 
 
 
 
 
f202266
ab7ee83
 
 
f202266
ab7ee83
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f202266
 
 
ab7ee83
 
 
 
 
 
 
 
 
f202266
 
 
 
 
 
ab7ee83
 
 
 
 
 
 
 
 
 
 
 
 
 
f202266
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
# Cache utility functions for API endpoints
import json
import zlib
import base64
import binascii
import time
from services.config.valkey_config import get_redis_client, is_connection_available

redis_client = get_redis_client()

# Cache metrics
cache_hits = 0
cache_misses = 0
start_time = time.time()
cached_data_size = 0
compressed_data_size = 0

# Compression threshold in bytes (10KB)
COMPRESSION_THRESHOLD = 10 * 1024

async def get_cached_data(key, db_fetch_func, ttl=3600, use_compression=False):
    """
    Get data from Valkey cache or database with optional compression
    
    Args:
        key: Valkey key
        db_fetch_func: Function to fetch data from database
        ttl: Time-to-live in seconds
        use_compression: Whether to use compression for large objects
    """
    global cache_hits, cache_misses, cached_data_size, compressed_data_size
    
    # If Valkey is not available, fetch directly from database
    if not is_connection_available() or not redis_client:
        print(f"Cache DISABLED: {key} - fetching from database")
        cache_misses += 1
        return await db_fetch_func()
    
    try:
        # Check if data is in cache
        cached_data = redis_client.get(key)
        if cached_data:
            # Increment hit counter
            cache_hits += 1
            print(f"Cache HIT: {key}")
            
            try:
                # Check if data is compressed (starts with special prefix)
                # Handle both string and bytes (due to decode_responses setting)
                if ((isinstance(cached_data, bytes) and cached_data.startswith(b'COMPRESSED:')) or
                    (isinstance(cached_data, str) and cached_data.startswith('COMPRESSED:'))):
                    # Remove prefix and decompress
                    if isinstance(cached_data, str):
                        compressed_data = base64.b64decode(cached_data[11:])
                    else:
                        compressed_data = base64.b64decode(cached_data[11:])
                    decompressed_data = zlib.decompress(compressed_data)
                    return json.loads(decompressed_data.decode('utf-8'))
                else:
                    # Regular non-compressed data
                    return json.loads(cached_data)
            except (json.JSONDecodeError, zlib.error, binascii.Error) as e:
                print(f"Cache ERROR for {key}: {e}")
                print("Falling back to database")
                # Delete corrupted cache entry
                redis_client.delete(key)
                cache_misses += 1
                return await db_fetch_func()
        
        # Increment miss counter
        cache_misses += 1
        print(f"Cache MISS: {key}")
        
        # Fetch data from database
        data = await db_fetch_func()
        
        # Serialize the data
        serialized_data = json.dumps(data)
        serialized_bytes = serialized_data.encode('utf-8')
        
        # Track original size
        original_size = len(serialized_bytes)
        cached_data_size += original_size
        
        # Decide whether to compress based on size and flag
        if use_compression and original_size > COMPRESSION_THRESHOLD:
            # Compress data
            compressed_data = zlib.compress(serialized_bytes)
            encoded_data = base64.b64encode(compressed_data)
            
            # Store with a prefix to indicate compression
            redis_value = b'COMPRESSED:' + encoded_data
            
            # Track compressed size
            compressed_size = len(redis_value)
            compressed_data_size += compressed_size
            
            # Calculate compression ratio
            ratio = (compressed_size / original_size) * 100
            print(f"Compressed {key}: {original_size} -> {compressed_size} bytes ({ratio:.2f}%)")
            
            # Store compressed data in Valkey
            redis_client.setex(key, ttl, redis_value)
        else:
            # Store regular data
            redis_client.setex(key, ttl, serialized_data)
        
        return data
    
    except Exception as e:
        print(f"Cache ERROR for {key}: {e}")
        print("Falling back to database")
        cache_misses += 1
        return await db_fetch_func()

def invalidate_cache(keys):
    """Delete multiple cache keys"""
    if not is_connection_available() or not redis_client:
        print("Cache DISABLED: Cannot invalidate cache keys")
        return
    
    try:
        if keys:
            redis_client.delete(*keys)
    except Exception as e:
        print(f"Cache invalidation ERROR: {e}")

def invalidate_cache_pattern(pattern):
    """
    Delete all cache keys matching a pattern
    For example: 'user:profile:*' to delete all user profiles
    """
    if not is_connection_available() or not redis_client:
        print("Cache DISABLED: Cannot invalidate cache pattern")
        return
    
    try:
        cursor = 0
        while True:
            cursor, keys = redis_client.scan(cursor, match=pattern, count=100)
            if keys:
                redis_client.delete(*keys)
            if cursor == 0:
                break
    except Exception as e:
        print(f"Cache pattern invalidation ERROR: {e}")

def get_cache_metrics():
    """Get cache hit/miss metrics and efficiency statistics"""
    global cache_hits, cache_misses, start_time, cached_data_size, compressed_data_size
    
    total_requests = cache_hits + cache_misses
    hit_ratio = 0
    if total_requests > 0:
        hit_ratio = (cache_hits / total_requests) * 100
    
    uptime = time.time() - start_time
    
    compression_savings = 0
    if cached_data_size > 0 and compressed_data_size > 0:
        compression_savings = 100 - ((compressed_data_size / cached_data_size) * 100)
    
    return {
        "hits": cache_hits,
        "misses": cache_misses,
        "total_requests": total_requests,
        "hit_ratio": f"{hit_ratio:.2f}%",
        "uptime_seconds": uptime,
        "cached_data_size_kb": cached_data_size / 1024,
        "compressed_data_size_kb": compressed_data_size / 1024,
        "compression_savings": f"{compression_savings:.2f}%"
    }

def reset_cache_metrics():
    """Reset all cache metrics counters"""
    global cache_hits, cache_misses, start_time, cached_data_size, compressed_data_size
    cache_hits = 0
    cache_misses = 0
    start_time = time.time()
    cached_data_size = 0
    compressed_data_size = 0
    return {"message": "Cache metrics reset"}