MukeshKapoor25's picture
refactor(notification-ms): enhance structured logging across all channels
47fd0cd
"""
Redis connection for Notification microservice.
Provides both sync cache and async pubsub for the worker queue.
"""
import redis
from typing import Optional, Any
import json
from app.core.config import settings
from app.core.logging import get_logger
logger = get_logger(__name__)
_redis_client: Optional[redis.Redis] = None
def get_redis() -> Optional[redis.Redis]:
global _redis_client
if _redis_client is None:
pool_params = {
"host": settings.REDIS_HOST,
"port": settings.REDIS_PORT,
"db": settings.REDIS_DB,
"decode_responses": True,
"max_connections": 20,
"socket_keepalive": True,
"socket_connect_timeout": 5,
"retry_on_timeout": True,
"health_check_interval": 30,
}
if settings.REDIS_PASSWORD:
pool_params["password"] = settings.REDIS_PASSWORD
pool = redis.ConnectionPool(**pool_params)
_redis_client = redis.Redis(connection_pool=pool)
logger.info(
"Redis connected",
extra={
"event": "redis_connected",
"redis_host": settings.REDIS_HOST,
"redis_port": settings.REDIS_PORT,
"redis_db": settings.REDIS_DB,
},
)
return _redis_client
class CacheService:
def __init__(self):
self._client: Optional[redis.Redis] = None
@property
def client(self) -> redis.Redis:
if self._client is None:
self._client = get_redis()
return self._client
async def set(self, key: str, value: Any, ttl: int = 300) -> bool:
try:
v = json.dumps(value) if not isinstance(value, str) else value
return self.client.setex(key, ttl, v)
except Exception:
logger.error("Cache set failed", exc_info=True, extra={"event": "cache_set_failure", "key": key})
return False
async def get(self, key: str) -> Optional[Any]:
try:
value = self.client.get(key)
if value:
try:
return json.loads(value)
except json.JSONDecodeError:
return value
return None
except Exception:
logger.error("Cache get failed", exc_info=True, extra={"event": "cache_get_failure", "key": key})
return None
async def delete(self, key: str) -> bool:
try:
return self.client.delete(key) > 0
except Exception:
logger.error(
"Cache delete failed",
exc_info=True,
extra={"event": "cache_delete_failure", "key": key},
)
return False
cache_service = CacheService()