Spaces:
Running
Running
File size: 3,235 Bytes
b3b36f7 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 | """
Redis connection and settings for arq queue.
"""
import logging
from typing import Optional
from arq.connections import RedisSettings as ArqRedisSettings
from redis.asyncio import Redis
logger = logging.getLogger(__name__)
# Module-level pool cache
_redis_pool: Optional[Redis] = None
def get_redis_settings() -> ArqRedisSettings:
"""
Get Redis settings for arq worker.
Reads from environment:
REDIS_URL: Full Redis URL (redis://host:port/db)
Falls back to localhost:6379 for development.
"""
import os
redis_url = os.getenv("REDIS_URL", "redis://localhost:6379/0")
# Parse URL for arq settings
# Format: redis://[user:password@]host:port/db
from urllib.parse import urlparse
parsed = urlparse(redis_url)
return ArqRedisSettings(
host=parsed.hostname or "localhost",
port=parsed.port or 6379,
database=int(parsed.path.lstrip("/") or 0),
password=parsed.password,
)
async def get_redis_pool(max_retries: int = 5, retry_delay: float = 1.0) -> Redis:
"""
Get async Redis connection pool.
Lazy initialization, cached at module level.
Includes retry logic for HF Spaces where Redis might start
slightly after API/Worker due to supervisord startup order.
"""
global _redis_pool
if _redis_pool is None:
import os
import asyncio
redis_url = os.getenv("REDIS_URL", "redis://localhost:6379/0")
for attempt in range(max_retries):
try:
pool = Redis.from_url(
redis_url,
decode_responses=True,
socket_connect_timeout=5.0,
socket_timeout=5.0,
)
# Test connection
await pool.ping()
_redis_pool = pool
logger.info(f"Redis pool created: {redis_url.split('@')[-1]}")
break
except Exception as e:
if attempt < max_retries - 1:
logger.warning(f"Redis connection attempt {attempt + 1} failed, retrying in {retry_delay}s: {e}")
await asyncio.sleep(retry_delay)
else:
logger.error(f"Redis connection failed after {max_retries} attempts: {e}")
raise
return _redis_pool
async def close_redis_pool():
"""Close Redis pool on shutdown."""
global _redis_pool
if _redis_pool is not None:
await _redis_pool.close()
_redis_pool = None
logger.info("Redis pool closed")
async def redis_healthcheck() -> dict:
"""
Check Redis connectivity.
Returns:
dict with 'ok' bool and 'latency_ms' float
"""
import time
try:
pool = await get_redis_pool()
start = time.monotonic()
await pool.ping()
latency = (time.monotonic() - start) * 1000
return {"ok": True, "latency_ms": round(latency, 2)}
except Exception as e:
logger.warning(f"Redis healthcheck failed: {e}")
return {"ok": False, "error": str(e)}
# Re-export for convenience
RedisSettings = ArqRedisSettings
|