"""
Redis service for rate limiting and task queue.
"""
import os
import json
import logging
from typing import Optional, Dict, Any
from datetime import datetime, timezone

import redis
from redis.exceptions import RedisError

logger = logging.getLogger(__name__)

# Redis list holding serialized task payloads (RPUSH to enqueue, BLPOP to dequeue).
_TASK_QUEUE_KEY = "absa_tasks"

# Lifetime of the initial PENDING status written by enqueue_task; equals the
# default ttl_seconds of set_task_status.
_PENDING_STATUS_TTL_SECONDS = 3600


class RedisService:
    """Singleton Redis connection for rate limiting and task queue.

    Every ``RedisService()`` call returns the same instance. ``__init__`` runs
    on each call and attempts to connect only while no client is held, so a
    failed initial connection is retried the next time the class is
    instantiated. While no client is held, every operation degrades gracefully
    (rate limiting and the IP gate fail open; queue operations report failure).
    """

    _instance = None
    _client: Optional[redis.Redis] = None

    def __new__(cls):
        """Ensure singleton pattern."""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        """Initialize Redis connection if not already done."""
        if self._client is None:
            self._connect()

    def _connect(self):
        """Establish Redis connection.

        Reads REDIS_HOST (default "localhost"), REDIS_PORT (default 6379) and
        REDIS_PASSWORD from the environment, builds the client and pings the
        server. On any failure the error is logged and ``_client`` is left as
        None instead of raising.
        """
        try:
            redis_host = os.getenv("REDIS_HOST", "localhost")
            redis_port = int(os.getenv("REDIS_PORT", "6379"))
            redis_password = os.getenv("REDIS_PASSWORD")

            self._client = redis.Redis(
                host=redis_host,
                port=redis_port,
                # An empty password string is treated as "no password".
                password=redis_password or None,
                decode_responses=True,
                socket_connect_timeout=5,
                socket_timeout=5,
            )

            # Test connection
            self._client.ping()
            logger.info("Redis connection established successfully")
        except Exception as e:
            # Deliberately broad: a bad REDIS_PORT or an unreachable server
            # must not prevent the application from starting.
            logger.error("Failed to connect to Redis: %s", e)
            self._client = None

    def is_connected(self) -> bool:
        """Check if Redis is connected.

        This only reports whether a client object is held (i.e. the last
        connection attempt succeeded and ``close`` was not called); it does
        not ping the server.
        """
        return self._client is not None

    @staticmethod
    def _status_key(job_id: str) -> str:
        """Return the Redis key that stores the status of ``job_id``."""
        return f"job:{job_id}:status"

    @staticmethod
    def _result_key(job_id: str) -> str:
        """Return the Redis key that stores the result of ``job_id``."""
        return f"job:{job_id}:result"

    # ==================== RATE LIMITING ====================

    def check_rate_limit(
        self,
        identifier: str,
        max_requests: int = 2,
        window_seconds: int = 60
    ) -> tuple[bool, int]:
        """
        Check if request should be rate limited.

        Uses a fixed-window counter stored under ``ratelimit:<identifier>``.

        Args:
            identifier: User/device/IP identifier
            max_requests: Maximum requests allowed
            window_seconds: Time window in seconds

        Returns:
            Tuple of (is_allowed, current_count). When Redis is unavailable
            or errors, the request is allowed and the count is 0 (fail open).
        """
        if self._client is None:
            # If Redis is down, allow request (fail open)
            logger.warning("Redis unavailable, bypassing rate limit")
            return True, 0

        key = f"ratelimit:{identifier}"
        try:
            # Increment and read the TTL in one MULTI/EXEC round trip.
            pipe = self._client.pipeline(transaction=True)
            pipe.incr(key)
            pipe.ttl(key)
            count, ttl = pipe.execute()

            # Arm the window on the first request, and re-arm it whenever the
            # counter exists without an expiry (e.g. an earlier EXPIRE was
            # lost); otherwise the identifier would stay blocked forever.
            # A missing expiry is reported as a negative TTL (None on very
            # old redis-py versions).
            if count == 1 or ttl is None or ttl < 0:
                self._client.expire(key, window_seconds)
        except RedisError as e:
            logger.error("Rate limit check failed: %s", e)
            # Fail open - allow request if Redis errors
            return True, 0

        return count <= max_requests, count

    # ==================== IP LOGGING GATE ====================

    def should_log_ip(self, device_id: str, ttl_seconds: int = 300) -> bool:
        """
        Check if IP should be logged for this device (gated by TTL).

        Args:
            device_id: Device identifier
            ttl_seconds: Time-to-live for gate (default 5 minutes)

        Returns:
            True if IP should be logged, False if within TTL window.
            Returns True when Redis is unavailable or errors.
        """
        if self._client is None:
            # If Redis is down, log IP
            return True

        try:
            # SET ... NX EX creates the gate only if it does not exist yet;
            # a falsy reply means the gate is still active.
            created = self._client.set(
                f"iplog:{device_id}", "1", nx=True, ex=ttl_seconds
            )
        except RedisError as e:
            logger.error("IP log gate check failed: %s", e)
            # Fail safe - log IP if Redis errors
            return True

        return bool(created)

    # ==================== TASK QUEUE ====================

    def enqueue_task(self, job_id: str, task_data: Dict[str, Any]) -> bool:
        """
        Add task to Redis queue.

        The payload is serialized to JSON with ``job_id``, ``data`` and a UTC
        ``enqueued_at`` timestamp. The PENDING status and the queue entry are
        written in a single transaction so a worker can never pick the task
        up before its status exists (and later have its own status update
        overwritten by a late PENDING write).

        Args:
            job_id: Unique job identifier
            task_data: Task payload (must be JSON-serializable)

        Returns:
            True if enqueued successfully, False if Redis is unavailable,
            the payload cannot be serialized, or Redis reports an error.
        """
        if self._client is None:
            logger.error("Redis unavailable, cannot enqueue task")
            return False

        try:
            payload = json.dumps({
                "job_id": job_id,
                "data": task_data,
                "enqueued_at": datetime.now(timezone.utc).isoformat(),
            })
        except (TypeError, ValueError) as e:
            # Non-serializable task data is a caller error, not a crash.
            logger.error("Failed to enqueue task: %s", e)
            return False

        try:
            pipe = self._client.pipeline(transaction=True)
            pipe.set(
                self._status_key(job_id),
                "PENDING",
                ex=_PENDING_STATUS_TTL_SECONDS,
            )
            pipe.rpush(_TASK_QUEUE_KEY, payload)
            pipe.execute()
        except RedisError as e:
            logger.error("Failed to enqueue task: %s", e)
            return False

        logger.info("Task %s enqueued successfully", job_id)
        return True

    def dequeue_task(self, timeout: int = 1) -> Optional[Dict[str, Any]]:
        """
        Get next task from queue (blocking).

        Args:
            timeout: Block timeout in seconds. NOTE(review): the client is
                created with a 5-second socket timeout, so values at or above
                that presumably surface as a Redis timeout error (logged,
                returns None) rather than a quiet empty result - confirm.

        Returns:
            Task payload or None. None is returned when the queue is empty,
            Redis is unavailable or errors, or the popped entry is not a JSON
            object containing ``job_id`` (such entries are logged and dropped).
        """
        if self._client is None:
            return None

        try:
            popped = self._client.blpop(_TASK_QUEUE_KEY, timeout=timeout)
        except RedisError as e:
            logger.error("Failed to dequeue task: %s", e)
            return None

        if not popped:
            return None

        # BLPOP replies with (queue name, value).
        _, task_json = popped
        try:
            task = json.loads(task_json)
        except (TypeError, ValueError) as e:
            # A corrupt entry must not take the worker loop down.
            logger.error("Discarding malformed task payload: %s", e)
            return None

        if not isinstance(task, dict) or "job_id" not in task:
            logger.error("Discarding task payload without job_id")
            return None

        logger.info("Dequeued task: %s", task["job_id"])
        return task

    def set_task_status(
        self,
        job_id: str,
        status: str,
        ttl_seconds: int = 3600
    ) -> bool:
        """
        Set task status in Redis.

        Args:
            job_id: Job identifier
            status: Status (PENDING, RUNNING, DONE, FAILED)
            ttl_seconds: Time-to-live for status key

        Returns:
            True if successful, False if Redis is unavailable or errors
        """
        if self._client is None:
            return False

        try:
            self._client.set(self._status_key(job_id), status, ex=ttl_seconds)
        except RedisError as e:
            logger.error("Failed to set task status: %s", e)
            return False
        return True

    def get_task_status(self, job_id: str) -> Optional[str]:
        """
        Get task status from Redis.

        Args:
            job_id: Job identifier

        Returns:
            Status string or None (key missing, Redis unavailable or error)
        """
        if self._client is None:
            return None

        try:
            return self._client.get(self._status_key(job_id))
        except RedisError as e:
            logger.error("Failed to get task status: %s", e)
            return None

    def set_task_result(
        self,
        job_id: str,
        result: Dict[str, Any],
        ttl_seconds: int = 3600
    ) -> bool:
        """
        Store task result in Redis.

        Args:
            job_id: Job identifier
            result: Task result data (must be JSON-serializable)
            ttl_seconds: Time-to-live for result

        Returns:
            True if successful, False if Redis is unavailable, the result
            cannot be serialized, or Redis reports an error
        """
        if self._client is None:
            return False

        try:
            serialized = json.dumps(result)
        except (TypeError, ValueError) as e:
            logger.error("Failed to set task result: %s", e)
            return False

        try:
            self._client.set(
                self._result_key(job_id), serialized, ex=ttl_seconds
            )
        except RedisError as e:
            logger.error("Failed to set task result: %s", e)
            return False
        return True

    def get_task_result(self, job_id: str) -> Optional[Dict[str, Any]]:
        """
        Get task result from Redis.

        Args:
            job_id: Job identifier

        Returns:
            Result data or None (key missing or empty, stored value is not
            valid JSON, Redis unavailable or error)
        """
        if self._client is None:
            return None

        try:
            result_json = self._client.get(self._result_key(job_id))
        except RedisError as e:
            logger.error("Failed to get task result: %s", e)
            return None

        if not result_json:
            return None

        try:
            return json.loads(result_json)
        except (TypeError, ValueError) as e:
            logger.error("Failed to get task result: %s", e)
            return None

    def close(self):
        """Close Redis connection.

        The client reference is dropped even if closing raises, so the
        service never keeps a half-closed client around.
        """
        if self._client is None:
            return

        try:
            self._client.close()
        finally:
            self._client = None
        logger.info("Redis connection closed")


# Singleton instance
_redis_service = None


def get_redis_service() -> RedisService:
    """Get Redis service singleton."""
    global _redis_service
    if _redis_service is None:
        _redis_service = RedisService()
    return _redis_service