File size: 5,684 Bytes
896b3f4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
"""
Cache manager for Redis operations
"""
import json
from typing import Any, Optional, List
from datetime import timedelta
import redis.asyncio as redis
from app.config import settings
from app.utils.logging import get_logger

logger = get_logger("cache")


class CacheManager:
    """Redis cache manager with async support"""
    
    _instance = None
    
    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._redis = None
        return cls._instance
    
    async def connect(self):
        """Initialize Redis connection"""
        if self._redis is None:
            try:
                self._redis = await redis.from_url(
                    settings.redis_url,
                    encoding="utf-8",
                    decode_responses=True
                )
                await self._redis.ping()
                logger.info("Redis connection established")
            except Exception as e:
                logger.error("Failed to connect to Redis", error=str(e))
                raise
    
    async def disconnect(self):
        """Close Redis connection"""
        if self._redis:
            await self._redis.close()
            self._redis = None
            logger.info("Redis connection closed")
    
    async def get(self, key: str) -> Optional[Any]:
        """Get value from cache"""
        try:
            value = await self._redis.get(key)
            if value:
                return json.loads(value)
            return None
        except Exception as e:
            logger.error("Cache get error", key=key, error=str(e))
            return None
    
    async def set(
        self,
        key: str,
        value: Any,
        ttl: Optional[int] = None
    ) -> bool:
        """Set value in cache with optional TTL"""
        try:
            serialized = json.dumps(value, default=str)
            if ttl:
                await self._redis.setex(key, ttl, serialized)
            else:
                await self._redis.set(key, serialized)
            return True
        except Exception as e:
            logger.error("Cache set error", key=key, error=str(e))
            return False
    
    async def delete(self, key: str) -> bool:
        """Delete key from cache"""
        try:
            await self._redis.delete(key)
            return True
        except Exception as e:
            logger.error("Cache delete error", key=key, error=str(e))
            return False
    
    async def exists(self, key: str) -> bool:
        """Check if key exists"""
        try:
            return await self._redis.exists(key) > 0
        except Exception as e:
            logger.error("Cache exists error", key=key, error=str(e))
            return False
    
    async def ttl(self, key: str) -> int:
        """Get TTL of key"""
        try:
            return await self._redis.ttl(key)
        except Exception as e:
            logger.error("Cache ttl error", key=key, error=str(e))
            return -1
    
    async def publish(self, channel: str, message: Any) -> bool:
        """Publish message to Redis channel"""
        try:
            serialized = json.dumps(message, default=str) if not isinstance(message, str) else message
            await self._redis.publish(channel, serialized)
            return True
        except Exception as e:
            logger.error("Cache publish error", channel=channel, error=str(e))
            return False
    
    async def subscribe(self, channel: str):
        """Subscribe to Redis channel"""
        try:
            pubsub = self._redis.pubsub()
            await pubsub.subscribe(channel)
            return pubsub
        except Exception as e:
            logger.error("Cache subscribe error", channel=channel, error=str(e))
            return None
    
    async def lpush(self, key: str, value: Any) -> bool:
        """Push value to list head"""
        try:
            serialized = json.dumps(value, default=str)
            await self._redis.lpush(key, serialized)
            return True
        except Exception as e:
            logger.error("Cache lpush error", key=key, error=str(e))
            return False
    
    async def rpop(self, key: str) -> Optional[Any]:
        """Pop value from list tail"""
        try:
            value = await self._redis.rpop(key)
            if value:
                return json.loads(value)
            return None
        except Exception as e:
            logger.error("Cache rpop error", key=key, error=str(e))
            return None
    
    async def llen(self, key: str) -> int:
        """Get list length"""
        try:
            return await self._redis.llen(key)
        except Exception as e:
            logger.error("Cache llen error", key=key, error=str(e))
            return 0
    
    async def zadd(self, key: str, score: float, member: Any) -> bool:
        """Add member to sorted set"""
        try:
            serialized = json.dumps(member, default=str) if not isinstance(member, str) else member
            await self._redis.zadd(key, {serialized: score})
            return True
        except Exception as e:
            logger.error("Cache zadd error", key=key, error=str(e))
            return False
    
    async def zrange(self, key: str, start: int, end: int) -> List[Any]:
        """Get range from sorted set"""
        try:
            values = await self._redis.zrange(key, start, end)
            return [json.loads(v) for v in values]
        except Exception as e:
            logger.error("Cache zrange error", key=key, error=str(e))
            return []


# Global cache instance
cache = CacheManager()