tillu-daemon / app /utils /cache.py
tillu-AI's picture
upload app/utils/cache.py
896b3f4 verified
"""
Cache manager for Redis operations
"""
import json
from typing import Any, Optional, List
from datetime import timedelta
import redis.asyncio as redis
from app.config import settings
from app.utils.logging import get_logger
logger = get_logger("cache")
class CacheManager:
"""Redis cache manager with async support"""
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._redis = None
return cls._instance
async def connect(self):
"""Initialize Redis connection"""
if self._redis is None:
try:
self._redis = await redis.from_url(
settings.redis_url,
encoding="utf-8",
decode_responses=True
)
await self._redis.ping()
logger.info("Redis connection established")
except Exception as e:
logger.error("Failed to connect to Redis", error=str(e))
raise
async def disconnect(self):
"""Close Redis connection"""
if self._redis:
await self._redis.close()
self._redis = None
logger.info("Redis connection closed")
async def get(self, key: str) -> Optional[Any]:
"""Get value from cache"""
try:
value = await self._redis.get(key)
if value:
return json.loads(value)
return None
except Exception as e:
logger.error("Cache get error", key=key, error=str(e))
return None
async def set(
self,
key: str,
value: Any,
ttl: Optional[int] = None
) -> bool:
"""Set value in cache with optional TTL"""
try:
serialized = json.dumps(value, default=str)
if ttl:
await self._redis.setex(key, ttl, serialized)
else:
await self._redis.set(key, serialized)
return True
except Exception as e:
logger.error("Cache set error", key=key, error=str(e))
return False
async def delete(self, key: str) -> bool:
"""Delete key from cache"""
try:
await self._redis.delete(key)
return True
except Exception as e:
logger.error("Cache delete error", key=key, error=str(e))
return False
async def exists(self, key: str) -> bool:
"""Check if key exists"""
try:
return await self._redis.exists(key) > 0
except Exception as e:
logger.error("Cache exists error", key=key, error=str(e))
return False
async def ttl(self, key: str) -> int:
"""Get TTL of key"""
try:
return await self._redis.ttl(key)
except Exception as e:
logger.error("Cache ttl error", key=key, error=str(e))
return -1
async def publish(self, channel: str, message: Any) -> bool:
"""Publish message to Redis channel"""
try:
serialized = json.dumps(message, default=str) if not isinstance(message, str) else message
await self._redis.publish(channel, serialized)
return True
except Exception as e:
logger.error("Cache publish error", channel=channel, error=str(e))
return False
async def subscribe(self, channel: str):
"""Subscribe to Redis channel"""
try:
pubsub = self._redis.pubsub()
await pubsub.subscribe(channel)
return pubsub
except Exception as e:
logger.error("Cache subscribe error", channel=channel, error=str(e))
return None
async def lpush(self, key: str, value: Any) -> bool:
"""Push value to list head"""
try:
serialized = json.dumps(value, default=str)
await self._redis.lpush(key, serialized)
return True
except Exception as e:
logger.error("Cache lpush error", key=key, error=str(e))
return False
async def rpop(self, key: str) -> Optional[Any]:
"""Pop value from list tail"""
try:
value = await self._redis.rpop(key)
if value:
return json.loads(value)
return None
except Exception as e:
logger.error("Cache rpop error", key=key, error=str(e))
return None
async def llen(self, key: str) -> int:
"""Get list length"""
try:
return await self._redis.llen(key)
except Exception as e:
logger.error("Cache llen error", key=key, error=str(e))
return 0
async def zadd(self, key: str, score: float, member: Any) -> bool:
"""Add member to sorted set"""
try:
serialized = json.dumps(member, default=str) if not isinstance(member, str) else member
await self._redis.zadd(key, {serialized: score})
return True
except Exception as e:
logger.error("Cache zadd error", key=key, error=str(e))
return False
async def zrange(self, key: str, start: int, end: int) -> List[Any]:
"""Get range from sorted set"""
try:
values = await self._redis.zrange(key, start, end)
return [json.loads(v) for v in values]
except Exception as e:
logger.error("Cache zrange error", key=key, error=str(e))
return []
# Global cache instance
cache = CacheManager()