Spaces:
Running
Running
| """ | |
| Redis connection and settings for arq queue. | |
| """ | |
| import logging | |
| from typing import Optional | |
| from arq.connections import RedisSettings as ArqRedisSettings | |
| from redis.asyncio import Redis | |
| logger = logging.getLogger(__name__) | |
| # Module-level pool cache | |
| _redis_pool: Optional[Redis] = None | |
| def get_redis_settings() -> ArqRedisSettings: | |
| """ | |
| Get Redis settings for arq worker. | |
| Reads from environment: | |
| REDIS_URL: Full Redis URL (redis://host:port/db) | |
| Falls back to localhost:6379 for development. | |
| """ | |
| import os | |
| redis_url = os.getenv("REDIS_URL", "redis://localhost:6379/0") | |
| # Parse URL for arq settings | |
| # Format: redis://[user:password@]host:port/db | |
| from urllib.parse import urlparse | |
| parsed = urlparse(redis_url) | |
| return ArqRedisSettings( | |
| host=parsed.hostname or "localhost", | |
| port=parsed.port or 6379, | |
| database=int(parsed.path.lstrip("/") or 0), | |
| password=parsed.password, | |
| ) | |
| async def get_redis_pool(max_retries: int = 5, retry_delay: float = 1.0) -> Redis: | |
| """ | |
| Get async Redis connection pool. | |
| Lazy initialization, cached at module level. | |
| Includes retry logic for HF Spaces where Redis might start | |
| slightly after API/Worker due to supervisord startup order. | |
| """ | |
| global _redis_pool | |
| if _redis_pool is None: | |
| import os | |
| import asyncio | |
| redis_url = os.getenv("REDIS_URL", "redis://localhost:6379/0") | |
| for attempt in range(max_retries): | |
| try: | |
| pool = Redis.from_url( | |
| redis_url, | |
| decode_responses=True, | |
| socket_connect_timeout=5.0, | |
| socket_timeout=5.0, | |
| ) | |
| # Test connection | |
| await pool.ping() | |
| _redis_pool = pool | |
| logger.info(f"Redis pool created: {redis_url.split('@')[-1]}") | |
| break | |
| except Exception as e: | |
| if attempt < max_retries - 1: | |
| logger.warning(f"Redis connection attempt {attempt + 1} failed, retrying in {retry_delay}s: {e}") | |
| await asyncio.sleep(retry_delay) | |
| else: | |
| logger.error(f"Redis connection failed after {max_retries} attempts: {e}") | |
| raise | |
| return _redis_pool | |
| async def close_redis_pool(): | |
| """Close Redis pool on shutdown.""" | |
| global _redis_pool | |
| if _redis_pool is not None: | |
| await _redis_pool.close() | |
| _redis_pool = None | |
| logger.info("Redis pool closed") | |
| async def redis_healthcheck() -> dict: | |
| """ | |
| Check Redis connectivity. | |
| Returns: | |
| dict with 'ok' bool and 'latency_ms' float | |
| """ | |
| import time | |
| try: | |
| pool = await get_redis_pool() | |
| start = time.monotonic() | |
| await pool.ping() | |
| latency = (time.monotonic() - start) * 1000 | |
| return {"ok": True, "latency_ms": round(latency, 2)} | |
| except Exception as e: | |
| logger.warning(f"Redis healthcheck failed: {e}") | |
| return {"ok": False, "error": str(e)} | |
| # Re-export for convenience | |
| RedisSettings = ArqRedisSettings | |