""" Redis connection management and utilities for the FastAPI backend. This module provides Redis connection pooling, dependency injection, health checks, and error handling for the video generation API. """ import asyncio import json import logging from typing import Any, Dict, List, Optional, Union from contextlib import asynccontextmanager import redis.asyncio as redis from redis.asyncio import ConnectionPool, Redis from redis.exceptions import ConnectionError, TimeoutError, RedisError from fastapi import HTTPException, status from .config import get_settings logger = logging.getLogger(__name__) settings = get_settings() class RedisManager: """ Redis connection manager with connection pooling and error handling. Provides centralized Redis connection management, health checks, and utility methods for the FastAPI application. """ def __init__(self): self._pool: Optional[ConnectionPool] = None self._redis: Optional[Redis] = None self._is_connected = False async def initialize(self) -> None: """Initialize Redis connection pool.""" try: # Create connection pool self._pool = ConnectionPool.from_url( settings.get_redis_url(), max_connections=settings.redis_max_connections, retry_on_timeout=True, socket_connect_timeout=settings.redis_socket_connect_timeout, socket_timeout=settings.redis_socket_timeout, health_check_interval=30, decode_responses=True ) # Create Redis client self._redis = Redis(connection_pool=self._pool) # Test connection await self._redis.ping() self._is_connected = True logger.info("Redis connection initialized successfully") except Exception as e: logger.error(f"Failed to initialize Redis connection: {e}") self._is_connected = False raise async def close(self) -> None: """Close Redis connection pool.""" if self._redis: await self._redis.close() if self._pool: await self._pool.disconnect() self._is_connected = False logger.info("Redis connection closed") @property def redis(self) -> Redis: """Get Redis client instance.""" if not self._redis or not self._is_connected: raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Redis connection not available" ) return self._redis @property def is_connected(self) -> bool: """Check if Redis is connected.""" return self._is_connected async def health_check(self) -> Dict[str, Any]: """ Perform Redis health check. Returns: Dict containing health status and metrics """ try: if not self._redis: return { "status": "unhealthy", "error": "Redis client not initialized" } # Test basic connectivity start_time = asyncio.get_event_loop().time() await self._redis.ping() response_time = (asyncio.get_event_loop().time() - start_time) * 1000 # Get Redis info info = await self._redis.info() return { "status": "healthy", "response_time_ms": round(response_time, 2), "connected_clients": info.get("connected_clients", 0), "used_memory": info.get("used_memory_human", "unknown"), "redis_version": info.get("redis_version", "unknown"), "uptime_seconds": info.get("uptime_in_seconds", 0) } except Exception as e: logger.error(f"Redis health check failed: {e}") return { "status": "unhealthy", "error": str(e) } async def get_connection_info(self) -> Dict[str, Any]: """Get Redis connection information.""" if not self._pool: return {"error": "Connection pool not initialized"} return { "max_connections": self._pool.max_connections, "created_connections": self._pool.created_connections, "available_connections": len(self._pool._available_connections), "in_use_connections": len(self._pool._in_use_connections) } # Global Redis manager instance redis_manager = RedisManager() async def get_redis() -> Redis: """ FastAPI dependency to get Redis client. Returns: Redis client instance Raises: HTTPException: If Redis is not available """ return redis_manager.redis @asynccontextmanager async def redis_transaction(): """ Context manager for Redis transactions. Usage: async with redis_transaction() as pipe: pipe.set("key", "value") pipe.incr("counter") await pipe.execute() """ redis_client = redis_manager.redis pipe = redis_client.pipeline(transaction=True) try: yield pipe except Exception as e: logger.error(f"Redis transaction failed: {e}") raise finally: await pipe.reset() class RedisKeyManager: """ Utility class for managing Redis keys with consistent naming patterns. """ # Key prefixes JOB_PREFIX = "jobs" VIDEO_PREFIX = "videos" USER_JOBS_PREFIX = "user_jobs" JOB_QUEUE = "job_queue" JOB_STATUS_PREFIX = "job_status" SYSTEM_PREFIX = "system" CACHE_PREFIX = "cache" FILE_PREFIX = "files" USER_FILES_PREFIX = "user_files" @staticmethod def job_key(job_id: str) -> str: """Generate Redis key for job data.""" return f"{RedisKeyManager.JOB_PREFIX}:{job_id}" @staticmethod def video_key(video_id: str) -> str: """Generate Redis key for video metadata.""" return f"{RedisKeyManager.VIDEO_PREFIX}:{video_id}" @staticmethod def user_jobs_key(user_id: str) -> str: """Generate Redis key for user jobs index.""" return f"{RedisKeyManager.USER_JOBS_PREFIX}:{user_id}" @staticmethod def job_status_key(job_id: str) -> str: """Generate Redis key for job status cache.""" return f"{RedisKeyManager.JOB_STATUS_PREFIX}:{job_id}" @staticmethod def system_health_key() -> str: """Generate Redis key for system health data.""" return f"{RedisKeyManager.SYSTEM_PREFIX}:health" @staticmethod def cache_key(namespace: str, key: str) -> str: """Generate Redis key for cached data.""" return f"{RedisKeyManager.CACHE_PREFIX}:{namespace}:{key}" @staticmethod def file_key(file_id: str) -> str: """Generate Redis key for file metadata.""" return f"{RedisKeyManager.FILE_PREFIX}:{file_id}" @staticmethod def user_files_key(user_id: str) -> str: """Generate Redis key for user files index.""" return f"{RedisKeyManager.USER_FILES_PREFIX}:{user_id}" class RedisErrorHandler: """ Utility class for handling Redis errors consistently. """ @staticmethod def handle_redis_error(operation: str, error: Exception) -> HTTPException: """ Convert Redis errors to appropriate HTTP exceptions. Args: operation: Description of the operation that failed error: The Redis exception that occurred Returns: HTTPException with appropriate status code and message """ logger.error(f"Redis {operation} failed: {error}") if isinstance(error, ConnectionError): return HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail=f"Redis connection failed during {operation}" ) elif isinstance(error, TimeoutError): return HTTPException( status_code=status.HTTP_504_GATEWAY_TIMEOUT, detail=f"Redis operation timed out during {operation}" ) elif isinstance(error, RedisError): return HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Redis error during {operation}" ) else: return HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Unexpected error during {operation}" ) # Utility functions for common Redis operations async def safe_redis_operation(operation_func, *args, **kwargs): """ Safely execute a Redis operation with error handling. Args: operation_func: The Redis operation function to execute *args: Arguments for the operation **kwargs: Keyword arguments for the operation Returns: Result of the operation or None if failed Raises: HTTPException: If the operation fails """ try: return await operation_func(*args, **kwargs) except Exception as e: operation_name = getattr(operation_func, '__name__', 'unknown') raise RedisErrorHandler.handle_redis_error(operation_name, e) async def redis_json_get(redis_client: Redis, key: str, default: Any = None) -> Any: """ Get and deserialize JSON data from Redis. Args: redis_client: Redis client instance key: Redis key default: Default value if key doesn't exist Returns: Deserialized JSON data or default value """ try: data = await redis_client.get(key) if data is None: return default return json.loads(data) except json.JSONDecodeError: logger.warning(f"Invalid JSON data in Redis key: {key}") return default except Exception as e: raise RedisErrorHandler.handle_redis_error("json_get", e) async def redis_json_set( redis_client: Redis, key: str, value: Any, ex: Optional[int] = None ) -> bool: """ Serialize and store JSON data in Redis. Args: redis_client: Redis client instance key: Redis key value: Data to serialize and store ex: Expiration time in seconds Returns: True if successful """ try: json_data = json.dumps(value, default=str) return await redis_client.set(key, json_data, ex=ex) except Exception as e: raise RedisErrorHandler.handle_redis_error("json_set", e)