tlong-ds committed on
Commit
ab7ee83
·
1 Parent(s): 86f0645
services/config/valkey_config.py CHANGED
@@ -7,15 +7,59 @@ load_dotenv()
7
  # Valkey Configuration
8
  VALKEY_HOST = os.getenv('VALKEY_HOST', os.getenv('REDIS_HOST', 'localhost'))
9
  VALKEY_PORT = int(os.getenv('VALKEY_PORT', os.getenv('REDIS_PORT', 6379)))
10
- VALKEY_USER = os.getenv('VALKEY_USER', os.getenv('REDIS_USER', 'Default'))
11
  VALKEY_PASSWORD = os.getenv('VALKEY_PASSWORD', os.getenv('REDIS_PASSWORD', None))
12
  VALKEY_DB = int(os.getenv('VALKEY_DB', os.getenv('REDIS_DB', 0)))
13
 
14
- VALKEY_URL = f"rediss://{VALKEY_USER}:{VALKEY_PASSWORD}@{VALKEY_HOST}:{VALKEY_PORT}"
 
 
15
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
16
 
17
  # Maintain backward compatibility - alias for existing code
18
- valkey_client = valkey.from_url(VALKEY_URL)
19
  redis_client = valkey_client
20
 
21
  def get_redis_client():
@@ -26,12 +70,22 @@ def get_valkey_client():
26
  """Returns the Valkey client instance"""
27
  return valkey_client
28
 
 
 
 
 
29
  # Test Valkey connection
30
  def test_connection():
 
 
 
 
31
  try:
32
  valkey_client.ping()
 
33
  print("Successfully connected to Valkey")
34
  return True
35
- except valkey.ConnectionError:
36
- print("Failed to connect to Valkey")
 
37
  return False
 
7
# Valkey Configuration
# Each setting honours the VALKEY_* variable first and falls back to the
# legacy REDIS_* name so older deployments keep working unchanged.
def _env(name, legacy, default=None):
    """Read *name* from the environment, falling back to *legacy*, then *default*."""
    return os.getenv(name, os.getenv(legacy, default))

VALKEY_HOST = _env('VALKEY_HOST', 'REDIS_HOST', 'localhost')
VALKEY_PORT = int(_env('VALKEY_PORT', 'REDIS_PORT', 6379))
VALKEY_USER = _env('VALKEY_USER', 'REDIS_USER', 'default')
VALKEY_PASSWORD = _env('VALKEY_PASSWORD', 'REDIS_PASSWORD', None)
VALKEY_DB = int(_env('VALKEY_DB', 'REDIS_DB', 0))
13
 
14
# Try to create Valkey client with fallback options
valkey_client = None
connection_available = False

def create_valkey_client():
    """Create Valkey client with fallback connection options.

    Builds a TLS, authenticated client when a password is configured,
    otherwise a plain local-development client. On any failure the module
    degrades gracefully: `valkey_client` stays None and
    `connection_available` stays False so callers skip caching.
    """
    global valkey_client, connection_available

    try:
        # Settings common to both connection modes; 5 s timeouts keep
        # application startup from hanging on an unreachable server.
        options = {
            'host': VALKEY_HOST,
            'port': VALKEY_PORT,
            'db': VALKEY_DB,
            'decode_responses': True,
            'socket_timeout': 5,
            'socket_connect_timeout': 5,
        }
        if VALKEY_PASSWORD:
            # A password implies a managed deployment exposing TLS.
            options.update(ssl=True, username=VALKEY_USER, password=VALKEY_PASSWORD)
        else:
            # Local development: plain, unauthenticated connection.
            options['ssl'] = False

        valkey_client = valkey.Valkey(**options)

        # Ping immediately so availability is known at import time.
        valkey_client.ping()
        connection_available = True
        print("✅ Successfully connected to Valkey")

    except Exception as e:
        print(f"⚠️ Valkey connection failed: {e}")
        print("🔄 Falling back to non-cached operations")
        connection_available = False
        valkey_client = None

# Initialize the client
create_valkey_client()
61
 
62
  # Maintain backward compatibility - alias for existing code
 
63
  redis_client = valkey_client
64
 
65
  def get_redis_client():
 
70
  """Returns the Valkey client instance"""
71
  return valkey_client
72
 
73
def is_connection_available():
    """Report whether the module-level Valkey connection is currently usable."""
    return connection_available
76
+
77
# Test Valkey connection
def test_connection():
    """Ping Valkey and refresh the module-level availability flag.

    Returns True when the server answers the ping, False otherwise —
    including when no client was ever created.
    """
    global connection_available

    if not valkey_client:
        return False

    try:
        valkey_client.ping()
    except Exception as e:
        connection_available = False
        print(f"Failed to connect to Valkey: {e}")
        return False

    connection_available = True
    print("Successfully connected to Valkey")
    return True
services/utils/api_cache.py CHANGED
@@ -3,7 +3,7 @@ import json
3
  import zlib
4
  import base64
5
  import time
6
- from services.config.valkey_config import get_redis_client
7
 
8
  redis_client = get_redis_client()
9
 
@@ -29,49 +29,75 @@ async def get_cached_data(key, db_fetch_func, ttl=3600, use_compression=False):
29
  """
30
  global cache_hits, cache_misses, cached_data_size, compressed_data_size
31
 
32
- # Check if data is in cache
33
- cached_data = redis_client.get(key)
34
- if cached_data:
35
- # Increment hit counter
36
- cache_hits += 1
37
- print(f"Cache HIT: {key}")
38
-
39
- # Check if data is compressed (starts with special prefix)
40
- if isinstance(cached_data, bytes) and cached_data.startswith(b'COMPRESSED:'):
41
- # Remove prefix and decompress
42
- compressed_data = base64.b64decode(cached_data[11:])
43
- decompressed_data = zlib.decompress(compressed_data)
44
- return json.loads(decompressed_data.decode('utf-8'))
45
- else:
46
- # Regular non-compressed data
47
- return json.loads(cached_data)
48
-
49
- # Increment miss counter
50
- cache_misses += 1
51
- print(f"Cache MISS: {key}")
52
-
53
- # Fetch data from database
54
- data = await db_fetch_func()
55
-
56
- # Serialize the data
57
- serialized_data = json.dumps(data)
58
- serialized_bytes = serialized_data.encode('utf-8')
59
 
60
- # Track original size
61
- original_size = len(serialized_bytes)
62
- cached_data_size += original_size
63
-
64
- # Decide whether to compress based on size and flag
65
- if use_compression and original_size > COMPRESSION_THRESHOLD:
66
- # Compress data
67
- compressed_data = zlib.compress(serialized_bytes)
68
- encoded_data = base64.b64encode(compressed_data)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
69
 
70
- # Store with a prefix to indicate compression
71
- redis_value = b'COMPRESSED:' + encoded_data
 
72
 
73
- # Track compressed size
74
- compressed_size = len(redis_value)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
75
  compressed_data_size += compressed_size
76
 
77
  # Calculate compression ratio
@@ -88,21 +114,35 @@ async def get_cached_data(key, db_fetch_func, ttl=3600, use_compression=False):
88
 
89
  def invalidate_cache(keys):
90
  """Delete multiple cache keys"""
91
- if keys:
92
- redis_client.delete(*keys)
 
 
 
 
 
 
 
93
 
94
  def invalidate_cache_pattern(pattern):
95
  """
96
  Delete all cache keys matching a pattern
97
  For example: 'user:profile:*' to delete all user profiles
98
  """
99
- cursor = 0
100
- while True:
101
- cursor, keys = redis_client.scan(cursor, match=pattern, count=100)
102
- if keys:
103
- redis_client.delete(*keys)
104
- if cursor == 0:
105
- break
 
 
 
 
 
 
 
106
 
107
  def get_cache_metrics():
108
  """Get cache hit/miss metrics and efficiency statistics"""
 
3
  import zlib
4
  import base64
5
  import time
6
+ from services.config.valkey_config import get_redis_client, is_connection_available
7
 
8
  redis_client = get_redis_client()
9
 
 
29
async def get_cached_data(key, db_fetch_func, ttl=3600, use_compression=False):
    """Return JSON-serializable data for *key*, preferring the Valkey cache.

    Args:
        key: Cache key to look up / store under.
        db_fetch_func: Async zero-argument callable that fetches fresh data
            on a cache miss.
        ttl: Cache expiry in seconds.
        use_compression: When True, zlib-compress payloads larger than
            COMPRESSION_THRESHOLD before storing.

    Falls back to *db_fetch_func* whenever the cache is unavailable or any
    cache operation raises. Fix: if the database fetch already succeeded
    before a cache-write error, the fetched data is returned directly
    instead of querying the database a second time (and the miss counter
    is no longer double-incremented).

    NOTE(review): the client is created with decode_responses=True, which
    returns str, while compressed entries are written as bytes with a
    b'COMPRESSED:' prefix — confirm compressed round-trips actually work.
    """
    global cache_hits, cache_misses, cached_data_size, compressed_data_size

    # If Valkey is not available, fetch directly from database
    if not is_connection_available() or not redis_client:
        print(f"Cache DISABLED: {key} - fetching from database")
        cache_misses += 1
        return await db_fetch_func()

    data = None
    fetched = False  # True once db_fetch_func has run successfully
    try:
        # Check if data is in cache
        cached_data = redis_client.get(key)
        if cached_data:
            cache_hits += 1
            print(f"Cache HIT: {key}")

            # Compressed payloads are stored with a b'COMPRESSED:' prefix.
            if isinstance(cached_data, bytes) and cached_data.startswith(b'COMPRESSED:'):
                compressed_data = base64.b64decode(cached_data[11:])
                decompressed_data = zlib.decompress(compressed_data)
                return json.loads(decompressed_data.decode('utf-8'))
            # Regular non-compressed data
            return json.loads(cached_data)

        cache_misses += 1
        print(f"Cache MISS: {key}")

        # Fetch data from database
        data = await db_fetch_func()
        fetched = True

        # Serialize and track the uncompressed size for the metrics report.
        serialized_data = json.dumps(data)
        serialized_bytes = serialized_data.encode('utf-8')
        original_size = len(serialized_bytes)
        cached_data_size += original_size

        # Decide whether to compress based on size and flag
        if use_compression and original_size > COMPRESSION_THRESHOLD:
            compressed_data = zlib.compress(serialized_bytes)
            encoded_data = base64.b64encode(compressed_data)

            # Store with a prefix to indicate compression
            redis_value = b'COMPRESSED:' + encoded_data

            compressed_size = len(redis_value)
            compressed_data_size += compressed_size

            ratio = (compressed_size / original_size) * 100
            print(f"Compressed {key}: {original_size} -> {compressed_size} bytes ({ratio:.2f}%)")

            redis_client.setex(key, ttl, redis_value)
        else:
            # Store regular data
            redis_client.setex(key, ttl, serialized_data)

        return data

    except Exception as e:
        print(f"Cache ERROR for {key}: {e}")
        print("Falling back to database")
        if fetched:
            # Bug fix: the database was already queried successfully; return
            # that result instead of issuing the same (possibly expensive)
            # fetch twice and double-counting the miss.
            return data
        cache_misses += 1
        return await db_fetch_func()
 
114
 
115
def invalidate_cache(keys):
    """Delete multiple cache keys, silently skipping when caching is off."""
    if not (is_connection_available() and redis_client):
        print("Cache DISABLED: Cannot invalidate cache keys")
        return

    try:
        if keys:
            redis_client.delete(*keys)
    except Exception as e:
        print(f"Cache invalidation ERROR: {e}")
126
 
127
def invalidate_cache_pattern(pattern):
    """
    Delete all cache keys matching a pattern
    For example: 'user:profile:*' to delete all user profiles
    """
    if not (is_connection_available() and redis_client):
        print("Cache DISABLED: Cannot invalidate cache pattern")
        return

    try:
        # Cursor-based SCAN; the server returns cursor 0 once iteration
        # is complete. None marks "not started yet".
        cursor = None
        while cursor != 0:
            cursor, keys = redis_client.scan(cursor or 0, match=pattern, count=100)
            if keys:
                redis_client.delete(*keys)
    except Exception as e:
        print(f"Cache pattern invalidation ERROR: {e}")
146
 
147
  def get_cache_metrics():
148
  """Get cache hit/miss metrics and efficiency statistics"""
services/utils/cache_utils.py CHANGED
@@ -1,6 +1,6 @@
1
  import json
2
  from functools import wraps
3
- from services.config.valkey_config import get_redis_client
4
  import time
5
 
6
  redis_client = get_redis_client()
@@ -16,25 +16,37 @@ def cache_data(key_prefix, expire_time=3600):
16
  def decorator(func):
17
  @wraps(func)
18
  async def wrapper(*args, **kwargs):
19
- # Create a unique key based on the function arguments
20
- cache_key = f"{key_prefix}:{str(args)}:{str(kwargs)}"
 
 
21
 
22
- # Try to get the cached result
23
- cached_result = redis_client.get(cache_key)
24
- if cached_result:
25
- return json.loads(cached_result)
26
-
27
- # If not cached, execute the function
28
- result = await func(*args, **kwargs)
29
-
30
- # Cache the result
31
- redis_client.setex(
32
- cache_key,
33
- expire_time,
34
- json.dumps(result)
35
- )
36
-
37
- return result
 
 
 
 
 
 
 
 
 
 
38
  return wrapper
39
  return decorator
40
 
@@ -42,13 +54,21 @@ def clear_cache(pattern="*"):
42
  """
43
  Clear cache entries matching the given pattern
44
  """
45
- cursor = 0
46
- while True:
47
- cursor, keys = redis_client.scan(cursor, match=pattern)
48
- if keys:
49
- redis_client.delete(*keys)
50
- if cursor == 0:
51
- break
 
 
 
 
 
 
 
 
52
 
53
  def cache_with_fallback(func):
54
  """
@@ -57,14 +77,21 @@ def cache_with_fallback(func):
57
  """
58
  @wraps(func)
59
  async def wrapper(*args, **kwargs):
 
 
 
 
 
60
  cache_key = f"{func.__name__}:{str(args)}:{str(kwargs)}"
61
  try:
62
  # Try to get from cache
63
  cached_result = redis_client.get(cache_key)
64
  if cached_result:
 
65
  return json.loads(cached_result)
66
 
67
  # If not in cache, execute function
 
68
  result = await func(*args, **kwargs)
69
 
70
  # Cache the result
@@ -76,6 +103,6 @@ def cache_with_fallback(func):
76
  return result
77
  except Exception as e:
78
  # If Valkey fails, execute function directly
79
- print(f"Cache error: {str(e)}, executing function directly")
80
  return await func(*args, **kwargs)
81
  return wrapper
 
1
  import json
2
  from functools import wraps
3
+ from services.config.valkey_config import get_redis_client, is_connection_available
4
  import time
5
 
6
  redis_client = get_redis_client()
 
16
  def decorator(func):
17
  @wraps(func)
18
  async def wrapper(*args, **kwargs):
19
+ # If Valkey is not available, execute function directly
20
+ if not is_connection_available() or not redis_client:
21
+ print(f"Cache DISABLED for {key_prefix} - executing function directly")
22
+ return await func(*args, **kwargs)
23
 
24
+ try:
25
+ # Create a unique key based on the function arguments
26
+ cache_key = f"{key_prefix}:{str(args)}:{str(kwargs)}"
27
+
28
+ # Try to get the cached result
29
+ cached_result = redis_client.get(cache_key)
30
+ if cached_result:
31
+ print(f"Cache HIT: {cache_key}")
32
+ return json.loads(cached_result)
33
+
34
+ # If not cached, execute the function
35
+ print(f"Cache MISS: {cache_key}")
36
+ result = await func(*args, **kwargs)
37
+
38
+ # Cache the result
39
+ redis_client.setex(
40
+ cache_key,
41
+ expire_time,
42
+ json.dumps(result)
43
+ )
44
+
45
+ return result
46
+ except Exception as e:
47
+ print(f"Cache ERROR for {key_prefix}: {e}")
48
+ print("Falling back to direct function execution")
49
+ return await func(*args, **kwargs)
50
  return wrapper
51
  return decorator
52
 
 
54
  """
55
  Clear cache entries matching the given pattern
56
  """
57
+ if not is_connection_available() or not redis_client:
58
+ print("Cache DISABLED: Cannot clear cache")
59
+ return
60
+
61
+ try:
62
+ cursor = 0
63
+ while True:
64
+ cursor, keys = redis_client.scan(cursor, match=pattern)
65
+ if keys:
66
+ redis_client.delete(*keys)
67
+ if cursor == 0:
68
+ break
69
+ print(f"Cache cleared for pattern: {pattern}")
70
+ except Exception as e:
71
+ print(f"Cache clear ERROR: {e}")
72
 
73
  def cache_with_fallback(func):
74
  """
 
77
  """
78
  @wraps(func)
79
  async def wrapper(*args, **kwargs):
80
+ # If Valkey is not available, execute function directly
81
+ if not is_connection_available() or not redis_client:
82
+ print(f"Cache DISABLED for {func.__name__} - executing function directly")
83
+ return await func(*args, **kwargs)
84
+
85
  cache_key = f"{func.__name__}:{str(args)}:{str(kwargs)}"
86
  try:
87
  # Try to get from cache
88
  cached_result = redis_client.get(cache_key)
89
  if cached_result:
90
+ print(f"Cache HIT: {cache_key}")
91
  return json.loads(cached_result)
92
 
93
  # If not in cache, execute function
94
+ print(f"Cache MISS: {cache_key}")
95
  result = await func(*args, **kwargs)
96
 
97
  # Cache the result
 
103
  return result
104
  except Exception as e:
105
  # If Valkey fails, execute function directly
106
+ print(f"Cache error for {func.__name__}: {str(e)}, executing function directly")
107
  return await func(*args, **kwargs)
108
  return wrapper
services/utils/chat_cache.py CHANGED
@@ -1,4 +1,4 @@
1
- from services.config.valkey_config import get_redis_client
2
  import json
3
 
4
  redis_client = get_redis_client()
@@ -6,29 +6,52 @@ CHAT_HISTORY_TTL = 3600 # 1 hour session expiry
6
 
7
  def get_chat_history(username: str) -> list:
8
  """Get cached chat history for a user"""
9
- key = f"chat:history:{username}"
10
- history = redis_client.get(key)
11
- if history:
12
- try:
13
- parsed = json.loads(history)
14
- print(f"Retrieved chat history for {username}: {len(parsed)} messages")
15
- return parsed
16
- except Exception as e:
17
- print(f"Error parsing chat history: {str(e)}")
18
- return []
19
- return []
 
 
 
 
 
 
 
 
20
 
21
  def append_chat_message(username: str, message: str, is_user: bool):
22
  """Add a new message to user's chat history"""
23
- key = f"chat:history:{username}"
24
- history = get_chat_history(username)
25
- history.append({
26
- "content": message,
27
- "is_user": is_user
28
- })
29
- redis_client.setex(key, CHAT_HISTORY_TTL, json.dumps(history))
 
 
 
 
 
 
 
30
 
31
  def clear_user_chat_history(username: str):
32
  """Clear chat history for a user"""
33
- key = f"chat:history:{username}"
34
- redis_client.delete(key)
 
 
 
 
 
 
 
 
 
1
+ from services.config.valkey_config import get_redis_client, is_connection_available
2
  import json
3
 
4
  redis_client = get_redis_client()
 
6
 
7
def get_chat_history(username: str) -> list:
    """Get cached chat history for a user.

    Returns an empty list when caching is disabled, the key is absent, or
    any cache/parse error occurs — callers never see an exception.
    """
    if not is_connection_available() or not redis_client:
        print(f"Chat cache DISABLED for {username} - returning empty history")
        return []

    try:
        raw = redis_client.get(f"chat:history:{username}")
    except Exception as e:
        print(f"Chat cache ERROR for {username}: {e}")
        return []

    if not raw:
        return []

    try:
        parsed = json.loads(raw)
        print(f"Retrieved chat history for {username}: {len(parsed)} messages")
        return parsed
    except Exception as e:
        print(f"Error parsing chat history: {str(e)}")
        return []
 
29
def append_chat_message(username: str, message: str, is_user: bool):
    """Append one message to the user's cached history and refresh its TTL."""
    if not is_connection_available() or not redis_client:
        print(f"Chat cache DISABLED for {username} - cannot save message")
        return

    try:
        entry = {
            "content": message,
            "is_user": is_user
        }
        # Read-modify-write: re-store the full history under the session TTL.
        history = get_chat_history(username)
        history.append(entry)
        redis_client.setex(f"chat:history:{username}", CHAT_HISTORY_TTL, json.dumps(history))
    except Exception as e:
        print(f"Chat cache ERROR saving message for {username}: {e}")
 
46
def clear_user_chat_history(username: str):
    """Drop the cached chat history for *username* (best-effort)."""
    if not (is_connection_available() and redis_client):
        print(f"Chat cache DISABLED for {username} - cannot clear history")
        return

    try:
        redis_client.delete(f"chat:history:{username}")
        print(f"Cleared chat history for {username}")
    except Exception as e:
        print(f"Chat cache ERROR clearing history for {username}: {e}")