""" Redis connection and cache instance. Provides Redis client for caching and session management. """ import redis.asyncio as redis from insightfy_utils.logging import get_logger from app.core.config import settings logger = get_logger(__name__) # Redis client instance redis_client: redis.Redis = None # Settings mapping CACHE_URL = settings.REDIS_URL CACHE_URI = f"{settings.REDIS_HOST}:{settings.REDIS_PORT}" CACHE_K = settings.REDIS_PASSWORD CACHE_DB = settings.REDIS_DB def create_redis_connection(**kwargs) -> redis.Redis: """Create a Redis client with the provided kwargs.""" return redis.Redis(**kwargs) def _redis_from_settings() -> redis.Redis: """Create Redis client from settings.""" cache_url = (CACHE_URL or "").strip() if cache_url: if cache_url.startswith(("redis://", "rediss://", "unix://")): return redis.from_url( cache_url, decode_responses=True, socket_timeout=5, socket_connect_timeout=5, retry_on_timeout=True, health_check_interval=30, ) logger.warning( "Invalid REDIS_URL scheme; falling back to REDIS_HOST/REDIS_PORT", extra={"redis_url": cache_url}, ) try: host, port_str = CACHE_URI.split(":") port = int(port_str) except Exception: raise ValueError("Invalid CACHE_URI format. Expected 'host:port'.") return create_redis_connection( host=host, port=port, password=CACHE_K or None, db=CACHE_DB, decode_responses=True, socket_timeout=5, socket_connect_timeout=5, retry_on_timeout=True, health_check_interval=30, ) async def connect_to_redis(): """ Establish connection to Redis. Called during application startup. """ global redis_client try: redis_client = _redis_from_settings() logger.info("Redis client initialized", extra={ "service": "cuatrolabs-spa-ms" }) # Test the connection await redis_client.ping() logger.info("Successfully connected to Redis") except Exception as e: logger.error("Failed to initialize Redis client", exc_info=e) raise async def close_redis_connection(): """ Close Redis connection. Called during application shutdown. """ global redis_client if redis_client: logger.info("Closing Redis connection") await redis_client.close() logger.info("Redis connection closed") def get_redis() -> redis.Redis: """ Get Redis client instance. Returns: redis.Redis: Redis client instance """ return redis_client