Spaces:
Running
Running
| """ | |
| Cache manager for Redis operations | |
| """ | |
| import json | |
| from typing import Any, Optional, List | |
| from datetime import timedelta | |
| import redis.asyncio as redis | |
| from app.config import settings | |
| from app.utils.logging import get_logger | |
| logger = get_logger("cache") | |
| class CacheManager: | |
| """Redis cache manager with async support""" | |
| _instance = None | |
| def __new__(cls): | |
| if cls._instance is None: | |
| cls._instance = super().__new__(cls) | |
| cls._instance._redis = None | |
| return cls._instance | |
| async def connect(self): | |
| """Initialize Redis connection""" | |
| if self._redis is None: | |
| try: | |
| self._redis = await redis.from_url( | |
| settings.redis_url, | |
| encoding="utf-8", | |
| decode_responses=True | |
| ) | |
| await self._redis.ping() | |
| logger.info("Redis connection established") | |
| except Exception as e: | |
| logger.error("Failed to connect to Redis", error=str(e)) | |
| raise | |
| async def disconnect(self): | |
| """Close Redis connection""" | |
| if self._redis: | |
| await self._redis.close() | |
| self._redis = None | |
| logger.info("Redis connection closed") | |
| async def get(self, key: str) -> Optional[Any]: | |
| """Get value from cache""" | |
| try: | |
| value = await self._redis.get(key) | |
| if value: | |
| return json.loads(value) | |
| return None | |
| except Exception as e: | |
| logger.error("Cache get error", key=key, error=str(e)) | |
| return None | |
| async def set( | |
| self, | |
| key: str, | |
| value: Any, | |
| ttl: Optional[int] = None | |
| ) -> bool: | |
| """Set value in cache with optional TTL""" | |
| try: | |
| serialized = json.dumps(value, default=str) | |
| if ttl: | |
| await self._redis.setex(key, ttl, serialized) | |
| else: | |
| await self._redis.set(key, serialized) | |
| return True | |
| except Exception as e: | |
| logger.error("Cache set error", key=key, error=str(e)) | |
| return False | |
| async def delete(self, key: str) -> bool: | |
| """Delete key from cache""" | |
| try: | |
| await self._redis.delete(key) | |
| return True | |
| except Exception as e: | |
| logger.error("Cache delete error", key=key, error=str(e)) | |
| return False | |
| async def exists(self, key: str) -> bool: | |
| """Check if key exists""" | |
| try: | |
| return await self._redis.exists(key) > 0 | |
| except Exception as e: | |
| logger.error("Cache exists error", key=key, error=str(e)) | |
| return False | |
| async def ttl(self, key: str) -> int: | |
| """Get TTL of key""" | |
| try: | |
| return await self._redis.ttl(key) | |
| except Exception as e: | |
| logger.error("Cache ttl error", key=key, error=str(e)) | |
| return -1 | |
| async def publish(self, channel: str, message: Any) -> bool: | |
| """Publish message to Redis channel""" | |
| try: | |
| serialized = json.dumps(message, default=str) if not isinstance(message, str) else message | |
| await self._redis.publish(channel, serialized) | |
| return True | |
| except Exception as e: | |
| logger.error("Cache publish error", channel=channel, error=str(e)) | |
| return False | |
| async def subscribe(self, channel: str): | |
| """Subscribe to Redis channel""" | |
| try: | |
| pubsub = self._redis.pubsub() | |
| await pubsub.subscribe(channel) | |
| return pubsub | |
| except Exception as e: | |
| logger.error("Cache subscribe error", channel=channel, error=str(e)) | |
| return None | |
| async def lpush(self, key: str, value: Any) -> bool: | |
| """Push value to list head""" | |
| try: | |
| serialized = json.dumps(value, default=str) | |
| await self._redis.lpush(key, serialized) | |
| return True | |
| except Exception as e: | |
| logger.error("Cache lpush error", key=key, error=str(e)) | |
| return False | |
| async def rpop(self, key: str) -> Optional[Any]: | |
| """Pop value from list tail""" | |
| try: | |
| value = await self._redis.rpop(key) | |
| if value: | |
| return json.loads(value) | |
| return None | |
| except Exception as e: | |
| logger.error("Cache rpop error", key=key, error=str(e)) | |
| return None | |
| async def llen(self, key: str) -> int: | |
| """Get list length""" | |
| try: | |
| return await self._redis.llen(key) | |
| except Exception as e: | |
| logger.error("Cache llen error", key=key, error=str(e)) | |
| return 0 | |
| async def zadd(self, key: str, score: float, member: Any) -> bool: | |
| """Add member to sorted set""" | |
| try: | |
| serialized = json.dumps(member, default=str) if not isinstance(member, str) else member | |
| await self._redis.zadd(key, {serialized: score}) | |
| return True | |
| except Exception as e: | |
| logger.error("Cache zadd error", key=key, error=str(e)) | |
| return False | |
| async def zrange(self, key: str, start: int, end: int) -> List[Any]: | |
| """Get range from sorted set""" | |
| try: | |
| values = await self._redis.zrange(key, start, end) | |
| return [json.loads(v) for v in values] | |
| except Exception as e: | |
| logger.error("Cache zrange error", key=key, error=str(e)) | |
| return [] | |
| # Global cache instance | |
| cache = CacheManager() | |