Spaces:
Paused
Paused
File size: 5,621 Bytes
12ed0dc 20adca1 12ed0dc 20adca1 1f8ac0c 12ed0dc 1f8ac0c 20adca1 4d3ce85 20adca1 1f8ac0c 20adca1 12ed0dc 1f8ac0c 12ed0dc 20adca1 1f8ac0c 20adca1 4d3ce85 12ed0dc 20adca1 1f8ac0c 12ed0dc 20adca1 4d3ce85 12ed0dc 20adca1 1f8ac0c 20adca1 12ed0dc 20adca1 12ed0dc 20adca1 4d3ce85 20adca1 12ed0dc 1f8ac0c 12ed0dc 1f8ac0c 12ed0dc 1f8ac0c 12ed0dc 1f8ac0c 20adca1 b70ff07 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 |
from typing import Optional, Any, Dict, Callable, TypeVar
import json
import inspect
import redis.asyncio as redis
from ..core.config import settings
from ..utils.logger import logger
T = TypeVar('T')
class RedisCache:
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super(RedisCache, cls).__new__(cls)
cls._instance.is_connected = False
cls._instance.fallback_cache = {}
cls._instance.redis = redis.Redis(
host=settings.REDIS_HOST,
port=settings.REDIS_PORT,
password=settings.REDIS_PASSWORD,
username=settings.REDIS_USERNAME,
decode_responses=True
)
return cls._instance
async def initialize(self):
"""Initialize Redis connection with fallback to dummy cache"""
try:
await self.redis.ping()
self.is_connected = True
logger.info("Redis cache initialized successfully")
except Exception as e:
self.is_connected = False
self.fallback_cache = {}
logger.warning(f"Redis connection failed, using in-memory fallback: {str(e)}")
async def set_cache(self, key: str, value: Any, expire: int = 3600):
"""Set a cache entry with optional expiration time (default 1 hour)"""
try:
if not self.is_connected:
self.fallback_cache[key] = value
return
await self.redis.set(key, json.dumps(value), ex=expire)
except Exception as e:
logger.error(f"Cache set error: {str(e)}")
self.fallback_cache[key] = value
async def get_cache(self, key: str) -> Optional[Any]:
"""Get a cached value by key"""
try:
if not self.is_connected:
return self.fallback_cache.get(key)
value = await self.redis.get(key)
if value:
return json.loads(value)
except Exception as e:
logger.error(f"Cache get error: {str(e)}")
return self.fallback_cache.get(key)
return None
async def delete_cache(self, key: str) -> bool:
"""Delete a cache entry by key"""
try:
if not self.is_connected:
return bool(self.fallback_cache.pop(key, None))
return bool(await self.redis.delete(key))
except Exception as e:
logger.error(f"Cache delete error: {str(e)}")
return False
async def clear_cache_pattern(self, pattern: str) -> bool:
"""Clear all cache entries matching a pattern"""
try:
if not self.is_connected:
# Basic pattern matching for fallback cache
removed = 0
for key in list(self.fallback_cache.keys()):
if pattern in key:
del self.fallback_cache[key]
removed += 1
return removed > 0
# Get all keys matching pattern
keys = [key async for key in self.redis.scan_iter(pattern)]
if keys:
await self.redis.delete(*keys)
return bool(keys)
except Exception as e:
logger.error(f"Cache pattern clear error: {str(e)}")
return False
async def check_connection(self) -> bool:
"""Check if Redis connection is alive"""
try:
await self.redis.ping()
self.is_connected = True
return True
except Exception:
self.is_connected = False
return False
async def cleanup_expired(self):
"""Clean up expired cache entries"""
# Redis automatically handles expiration, only need to clean fallback
if not self.is_connected:
self.fallback_cache = {}
def cached(ttl_seconds: int):
"""Decorator to cache function results"""
def decorator(func: Callable[..., T]) -> Callable[..., T]:
async def async_wrapper(*args, **kwargs) -> T:
# Create cache key from function name and arguments
key = f"{func.__name__}:{str(args)}:{str(kwargs)}"
# Try to get from cache first
cached_value = await cache.get_cache(key)
if cached_value is not None:
return cached_value
# If not in cache, execute function
result = await func(*args, **kwargs)
# Cache the result
await cache.set_cache(key, result, expire=ttl_seconds)
return result
def sync_wrapper(*args, **kwargs) -> T:
# Create cache key from function name and arguments
key = f"{func.__name__}:{str(args)}:{str(kwargs)}"
# For sync functions, we can't use async cache directly
# So we use the fallback cache
if key in cache.fallback_cache:
return cache.fallback_cache[key]
# If not in cache, execute function
result = func(*args, **kwargs)
# Cache the result in fallback
cache.fallback_cache[key] = result
return result
return async_wrapper if inspect.iscoroutinefunction(func) else sync_wrapper
return decorator
cache = RedisCache() |