Spaces:
Runtime error
Runtime error
| """ | |
| Redis connection for Notification microservice. | |
| Provides both sync cache and async pubsub for the worker queue. | |
| """ | |
| import redis | |
| from typing import Optional, Any | |
| import json | |
| from app.core.config import settings | |
| from app.core.logging import get_logger | |
# Module-level logger obtained from the project's logging helper.
logger = get_logger(__name__)
# Cached Redis client for this module; stays None until get_redis() builds it.
_redis_client: Optional[redis.Redis] = None
def get_redis() -> Optional[redis.Redis]:
    """Return the module-wide Redis client, building it on the first call.

    The first call assembles connection-pool options from ``settings``
    (host, port, db, and the password only when one is configured), wraps
    the pool in a ``redis.Redis`` client, stores it in the module global
    ``_redis_client`` and logs a ``redis_connected`` event. Later calls
    return the stored client without rebuilding or logging.

    NOTE(review): this function issues no command (no ping) itself, so the
    "Redis connected" log records that the client object was created, not
    that the server was verified to be reachable.

    Returns:
        The cached client. The annotation is ``Optional``, but every path
        through this function returns a client object.
    """
    global _redis_client

    # Fast path: an earlier call already built the client.
    if _redis_client is not None:
        return _redis_client

    connection_kwargs = dict(
        host=settings.REDIS_HOST,
        port=settings.REDIS_PORT,
        db=settings.REDIS_DB,
        decode_responses=True,
        max_connections=20,
        socket_keepalive=True,
        socket_connect_timeout=5,
        retry_on_timeout=True,
        health_check_interval=30,
    )
    # Only pass a password when one is configured.
    if settings.REDIS_PASSWORD:
        connection_kwargs["password"] = settings.REDIS_PASSWORD

    _redis_client = redis.Redis(
        connection_pool=redis.ConnectionPool(**connection_kwargs)
    )

    log_fields = {
        "event": "redis_connected",
        "redis_host": settings.REDIS_HOST,
        "redis_port": settings.REDIS_PORT,
        "redis_db": settings.REDIS_DB,
    }
    logger.info("Redis connected", extra=log_fields)
    return _redis_client
class CacheService:
    """JSON-aware cache helper on top of the shared Redis client.

    Non-string values are serialised with ``json.dumps`` on write, and reads
    attempt ``json.loads`` before falling back to the raw stored value.
    Every operation is best-effort: any exception is logged and turned into
    a falsy result (``False`` / ``None``) instead of propagating.

    NOTE(review): the methods are declared ``async`` but call the client
    without ``await``; with the synchronous ``redis.Redis`` client the call
    runs inline in the coroutine.
    """

    def __init__(self):
        # Resolved lazily by the ``client`` property, so constructing the
        # service does not build a Redis client.
        self._client: Optional[redis.Redis] = None

    @property
    def client(self) -> redis.Redis:
        """Return the Redis client, fetching it from ``get_redis()`` on first use.

        This must be a property: ``set``/``get``/``delete`` access it as an
        attribute (``self.client.setex(...)``). As a plain method, every
        cache call raised ``AttributeError`` and was silently reported as a
        failure by the broad ``except`` blocks.
        """
        if self._client is None:
            self._client = get_redis()
        return self._client

    async def set(self, key: str, value: Any, ttl: int = 300) -> bool:
        """Store ``value`` under ``key`` with an expiry of ``ttl`` seconds.

        Args:
            key: Cache key.
            value: A ``str`` is stored as-is; anything else is JSON-encoded.
            ttl: Expiry passed to ``setex`` (default 300).

        Returns:
            True when the write succeeded, False if serialisation or the
            Redis call raised (the error is logged).
        """
        try:
            payload = value if isinstance(value, str) else json.dumps(value)
            return bool(self.client.setex(key, ttl, payload))
        except Exception:
            logger.error("Cache set failed", exc_info=True, extra={"event": "cache_set_failure", "key": key})
            return False

    async def get(self, key: str) -> Optional[Any]:
        """Fetch the value stored under ``key``.

        The stored value is decoded as JSON when possible; otherwise the raw
        value is returned. Note that a plain string which happens to be
        valid JSON (e.g. ``"123"``) therefore comes back as the decoded JSON
        value rather than the original string.

        Returns:
            The decoded or raw value, or None when the key is absent or the
            lookup raised (the error is logged).
        """
        try:
            raw = self.client.get(key)
            # Compare against None rather than truthiness so that a cached
            # empty string is returned instead of being treated as a miss.
            if raw is None:
                return None
            try:
                return json.loads(raw)
            except json.JSONDecodeError:
                return raw
        except Exception:
            logger.error("Cache get failed", exc_info=True, extra={"event": "cache_get_failure", "key": key})
            return None

    async def delete(self, key: str) -> bool:
        """Delete ``key``.

        Returns:
            True when the client reports at least one key removed, False
            when nothing was removed or the call raised (the error is
            logged).
        """
        try:
            return self.client.delete(key) > 0
        except Exception:
            logger.error(
                "Cache delete failed",
                exc_info=True,
                extra={"event": "cache_delete_failure", "key": key},
            )
            return False
# Shared module-level instance. Constructing it only sets ``_client`` to None;
# no Redis client is built at this point.
cache_service = CacheService()