Spaces:
Sleeping
Sleeping
| """ | |
| Redis service for rate limiting, IP-log gating, and a task queue. | |
| """ | |
| import os | |
| import json | |
| import logging | |
| from typing import Optional, Dict, Any | |
| from datetime import datetime, timezone | |
| import redis | |
| from redis.exceptions import RedisError | |
| logger = logging.getLogger(__name__) | |
class RedisService:
    """Singleton wrapper around one Redis client.

    Provides a fixed-window rate limiter, a TTL gate for IP logging and a
    list-backed task queue with per-job status/result keys.  If the initial
    connection fails the service keeps working in a degraded mode: every
    method checks for a missing client and returns its documented fallback
    instead of raising.
    """

    # Redis list that backs the task queue (shared by enqueue/dequeue).
    QUEUE_KEY = "absa_tasks"
    # Default lifetime, in seconds, of job status and result keys.
    DEFAULT_TTL_SECONDS = 3600

    _instance = None
    # Forward reference so the annotation is not evaluated at class creation.
    _client: Optional["redis.Redis"] = None

    def __new__(cls):
        """Return the single shared instance, creating it on first use."""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        """Connect on construction unless a client is already held.

        ``__new__`` always returns the same object, so this runs on every
        ``RedisService()`` call; the guard avoids reconnecting each time.
        """
        if self._client is None:
            self._connect()

    @staticmethod
    def _status_key(job_id: str) -> str:
        """Return the key under which a job's status is stored."""
        return f"job:{job_id}:status"

    @staticmethod
    def _result_key(job_id: str) -> str:
        """Return the key under which a job's result is stored."""
        return f"job:{job_id}:result"

    def _connect(self):
        """Create the client from REDIS_HOST/REDIS_PORT/REDIS_PASSWORD and ping it.

        On any failure (including a non-numeric REDIS_PORT) the error is
        logged and the service is left without a client.
        """
        try:
            client = redis.Redis(
                host=os.getenv("REDIS_HOST", "localhost"),
                port=int(os.getenv("REDIS_PORT", "6379")),
                # An empty password is treated the same as no password.
                password=os.getenv("REDIS_PASSWORD") or None,
                decode_responses=True,
                socket_connect_timeout=5,
                socket_timeout=5,
            )
            # Verify the server is reachable before publishing the client.
            client.ping()
        except Exception as exc:
            # Deliberately broad: a failed connection must not crash start-up.
            logger.error("Failed to connect to Redis: %s", exc)
            self._client = None
        else:
            self._client = client
            logger.info("Redis connection established successfully")

    def is_connected(self) -> bool:
        """Return True when a client object is held.

        This does not ping the server; it only reports whether the initial
        connection succeeded and ``close()`` has not been called since.
        """
        return self._client is not None

    # ==================== RATE LIMITING ====================

    def check_rate_limit(
        self,
        identifier: str,
        max_requests: int = 2,
        window_seconds: int = 60
    ) -> tuple[bool, int]:
        """
        Count a request against a fixed window and report whether it is allowed.

        Args:
            identifier: User/device/IP identifier
            max_requests: Maximum requests allowed per window
            window_seconds: Window length in seconds

        Returns:
            Tuple of (is_allowed, current_count). When Redis is unavailable
            or errors, the request is allowed and the count is 0 (fail open).
        """
        if not self._client:
            # If Redis is down, allow request (fail open)
            logger.warning("Redis unavailable, bypassing rate limit")
            return True, 0

        key = f"ratelimit:{identifier}"
        try:
            # Read the new count and the remaining TTL in one round trip.
            with self._client.pipeline() as pipe:
                pipe.incr(key)
                pipe.ttl(key)
                count, remaining = pipe.execute()

            # TTL is -1 when the key exists without an expiry. That is the
            # case for a brand-new counter, and also for a counter whose
            # earlier EXPIRE never ran; re-arming here stops such a key from
            # blocking the identifier forever.
            if remaining == -1:
                self._client.expire(key, window_seconds)
        except RedisError as exc:
            logger.error("Rate limit check failed: %s", exc)
            # Fail open - allow request if Redis errors
            return True, 0

        return count <= max_requests, count

    # ==================== IP LOGGING GATE ====================

    def should_log_ip(self, device_id: str, ttl_seconds: int = 300) -> bool:
        """
        Decide whether the IP should be logged for this device (gated by TTL).

        Args:
            device_id: Device identifier
            ttl_seconds: Time-to-live for gate (default 5 minutes)

        Returns:
            True if IP should be logged, False if within TTL window.
            Also True when Redis is unavailable or errors.
        """
        if not self._client:
            # If Redis is down, log IP
            return True

        try:
            # SET ... NX only succeeds when no gate key exists yet, so the
            # first caller in each TTL window wins.
            created = self._client.set(
                f"iplog:{device_id}", "1", nx=True, ex=ttl_seconds
            )
        except RedisError as exc:
            logger.error("IP log gate check failed: %s", exc)
            # Fail safe - log IP if Redis errors
            return True

        return created is not None

    # ==================== TASK QUEUE ====================

    def enqueue_task(self, job_id: str, task_data: Dict[str, Any]) -> bool:
        """
        Add a task to the Redis queue and mark it PENDING.

        Args:
            job_id: Unique job identifier
            task_data: Task payload (must be JSON-serialisable)

        Returns:
            True if enqueued successfully, False otherwise
        """
        if not self._client:
            logger.error("Redis unavailable, cannot enqueue task")
            return False

        try:
            payload = json.dumps({
                "job_id": job_id,
                "data": task_data,
                "enqueued_at": datetime.now(timezone.utc).isoformat(),
            })
        except (TypeError, ValueError) as exc:
            logger.error("Failed to enqueue task: %s", exc)
            return False

        try:
            # Write the status and the queue entry in one transaction so a
            # worker cannot pop the task and set RUNNING before PENDING is
            # written (which would then overwrite RUNNING).
            with self._client.pipeline() as pipe:
                pipe.set(
                    self._status_key(job_id),
                    "PENDING",
                    ex=self.DEFAULT_TTL_SECONDS,
                )
                pipe.rpush(self.QUEUE_KEY, payload)
                pipe.execute()
        except RedisError as exc:
            logger.error("Failed to enqueue task: %s", exc)
            return False

        logger.info("Task %s enqueued successfully", job_id)
        return True

    def dequeue_task(self, timeout: int = 1) -> Optional[Dict[str, Any]]:
        """
        Get next task from queue (blocking).

        Args:
            timeout: Block timeout in seconds.
                NOTE(review): the client is created with socket_timeout=5,
                so values of 5 or more presumably surface as a RedisError
                rather than a clean timeout - confirm before raising this.

        Returns:
            Task payload, or None when the queue is empty, Redis is
            unavailable/errors, or the popped entry is not a valid task.
        """
        if not self._client:
            return None

        try:
            popped = self._client.blpop(self.QUEUE_KEY, timeout=timeout)
        except RedisError as exc:
            logger.error("Failed to dequeue task: %s", exc)
            return None

        if not popped:
            return None

        _, raw_task = popped
        try:
            task = json.loads(raw_task)
            job_id = task["job_id"]
        except (ValueError, KeyError, TypeError) as exc:
            # The entry is already removed from the queue; drop it instead
            # of letting the exception escape into the caller.
            logger.error("Discarding malformed task payload: %s", exc)
            return None

        logger.info("Dequeued task: %s", job_id)
        return task

    def set_task_status(
        self,
        job_id: str,
        status: str,
        ttl_seconds: int = DEFAULT_TTL_SECONDS
    ) -> bool:
        """
        Set task status in Redis.

        Args:
            job_id: Job identifier
            status: Status (PENDING, RUNNING, DONE, FAILED)
            ttl_seconds: Time-to-live for status key

        Returns:
            True if successful
        """
        if not self._client:
            return False

        try:
            self._client.set(self._status_key(job_id), status, ex=ttl_seconds)
        except RedisError as exc:
            logger.error("Failed to set task status: %s", exc)
            return False
        return True

    def get_task_status(self, job_id: str) -> Optional[str]:
        """
        Get task status from Redis.

        Args:
            job_id: Job identifier

        Returns:
            Status string, or None if missing or Redis is unavailable/errors
        """
        if not self._client:
            return None

        try:
            return self._client.get(self._status_key(job_id))
        except RedisError as exc:
            logger.error("Failed to get task status: %s", exc)
            return None

    def set_task_result(
        self,
        job_id: str,
        result: Dict[str, Any],
        ttl_seconds: int = DEFAULT_TTL_SECONDS
    ) -> bool:
        """
        Store task result in Redis as JSON.

        Args:
            job_id: Job identifier
            result: Task result data (must be JSON-serialisable)
            ttl_seconds: Time-to-live for result

        Returns:
            True if successful
        """
        if not self._client:
            return False

        try:
            payload = json.dumps(result)
        except (TypeError, ValueError) as exc:
            logger.error("Failed to set task result: %s", exc)
            return False

        try:
            self._client.set(self._result_key(job_id), payload, ex=ttl_seconds)
        except RedisError as exc:
            logger.error("Failed to set task result: %s", exc)
            return False
        return True

    def get_task_result(self, job_id: str) -> Optional[Dict[str, Any]]:
        """
        Get task result from Redis.

        Args:
            job_id: Job identifier

        Returns:
            Result data, or None if missing, undecodable, or Redis is
            unavailable/errors
        """
        if not self._client:
            return None

        try:
            raw_result = self._client.get(self._result_key(job_id))
        except RedisError as exc:
            logger.error("Failed to get task result: %s", exc)
            return None

        if not raw_result:
            return None

        try:
            return json.loads(raw_result)
        except ValueError as exc:
            logger.error("Failed to get task result: %s", exc)
            return None

    def close(self):
        """Close the Redis connection and drop the client reference."""
        if not self._client:
            return
        try:
            self._client.close()
        finally:
            # Drop the reference even if close() raised, so later calls
            # take the "Redis unavailable" path instead of a dead client.
            self._client = None
        logger.info("Redis connection closed")
# Module-level cache holding the shared service object once it is built
_redis_service = None


def get_redis_service() -> RedisService:
    """Return the shared RedisService, building it on the first call."""
    global _redis_service
    if _redis_service is not None:
        return _redis_service
    _redis_service = RedisService()
    return _redis_service