Admin-Desk / app /utils /cache.py
Fred808's picture
Upload 54 files
a3b84cc verified
import redis
import json
from ..core.config import settings
from typing import Any, Optional
class RedisCache:
def __init__(self):
self.redis_client = redis.Redis(
host=settings.REDIS_HOST,
port=settings.REDIS_PORT,
decode_responses=True
)
async def set_cache(self, key: str, value: Any, expire: int = 3600):
"""Set a cache entry with optional expiration time (default 1 hour)"""
try:
self.redis_client.setex(
key,
expire,
json.dumps(value)
)
return True
except Exception as e:
print(f"Cache set error: {str(e)}")
return False
async def get_cache(self, key: str) -> Optional[Any]:
"""Get a cached value by key"""
try:
value = self.redis_client.get(key)
return json.loads(value) if value else None
except Exception as e:
print(f"Cache get error: {str(e)}")
return None
async def delete_cache(self, key: str) -> bool:
"""Delete a cache entry by key"""
try:
return bool(self.redis_client.delete(key))
except Exception as e:
print(f"Cache delete error: {str(e)}")
return False
async def clear_cache_pattern(self, pattern: str) -> bool:
"""Clear all cache entries matching a pattern"""
try:
keys = self.redis_client.keys(pattern)
if keys:
return bool(self.redis_client.delete(*keys))
return True
except Exception as e:
print(f"Cache clear error: {str(e)}")
return False
cache = RedisCache()