File size: 3,235 Bytes
b3b36f7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
"""
Redis connection and settings for arq queue.
"""

import logging
from typing import Optional

from arq.connections import RedisSettings as ArqRedisSettings
from redis.asyncio import Redis

logger = logging.getLogger(__name__)

# Module-level cache for the shared async Redis client. It stays None until
# get_redis_pool() connects successfully, and close_redis_pool() resets it
# back to None.
_redis_pool: Optional[Redis] = None


def get_redis_settings() -> ArqRedisSettings:
    """
    Build the Redis settings used by the arq worker.

    Reads from environment:
        REDIS_URL: Full Redis URL
            (redis://[user:password@]host:port/db, or rediss:// for TLS)

    Falls back to redis://localhost:6379/0 for development.

    Returns:
        ArqRedisSettings carrying the host, port, database index, password
        and TLS flag derived from the URL. Missing host/port default to
        localhost:6379; a missing or malformed database index defaults to 0.
    """
    import os
    from urllib.parse import unquote, urlparse

    redis_url = os.getenv("REDIS_URL", "redis://localhost:6379/0")
    parsed = urlparse(redis_url)

    # The URL path carries the database index ("/0"). A malformed path falls
    # back to database 0 with a warning instead of crashing worker startup.
    db_text = parsed.path.lstrip("/")
    try:
        database = int(db_text) if db_text else 0
    except ValueError:
        logger.warning(
            "Invalid Redis database %r in REDIS_URL, falling back to 0", db_text
        )
        database = 0

    # urlparse() leaves percent-escapes in the password untouched, so decode
    # them to recover the literal secret (e.g. "p%40ss" -> "p@ss").
    password = parsed.password
    if password:
        password = unquote(password)

    return ArqRedisSettings(
        host=parsed.hostname or "localhost",
        port=parsed.port or 6379,
        database=database,
        password=password,
        # The "rediss" scheme requests TLS; without this flag the worker
        # would attempt a plaintext connection to a TLS endpoint.
        ssl=parsed.scheme == "rediss",
    )


async def get_redis_pool(max_retries: int = 5, retry_delay: float = 1.0) -> Redis:
    """
    Get the shared async Redis client, creating it on first use.

    Lazy initialization, cached at module level. Includes retry logic for
    HF Spaces where Redis might start slightly after API/Worker due to
    supervisord startup order.

    Args:
        max_retries: Maximum number of connection attempts. Values below 1
            are treated as 1 so the function never returns without trying.
        retry_delay: Seconds to sleep between failed attempts.

    Returns:
        The cached, ping-verified Redis client.

    Raises:
        Exception: Whatever the final connection attempt raised, once all
            attempts are exhausted.
    """
    global _redis_pool

    if _redis_pool is not None:
        return _redis_pool

    import asyncio
    import os

    redis_url = os.getenv("REDIS_URL", "redis://localhost:6379/0")
    # Guarantee at least one attempt; otherwise a non-positive max_retries
    # would skip the loop and return None despite the return annotation.
    attempts = max(1, max_retries)

    for attempt in range(1, attempts + 1):
        client = None
        try:
            client = Redis.from_url(
                redis_url,
                decode_responses=True,
                socket_connect_timeout=5.0,
                socket_timeout=5.0,
            )
            # Test connection before publishing the client in the cache.
            await client.ping()
        except Exception as e:
            if client is not None:
                # Release the failed client so retries do not pile up
                # unused connection pools. Cleanup is best-effort.
                try:
                    closer = getattr(client, "aclose", None) or client.close
                    await closer()
                except Exception:
                    logger.debug("Failed to close unusable Redis client", exc_info=True)
            if attempt < attempts:
                logger.warning(f"Redis connection attempt {attempt} failed, retrying in {retry_delay}s: {e}")
                await asyncio.sleep(retry_delay)
            else:
                logger.error(f"Redis connection failed after {attempts} attempts: {e}")
                raise
        else:
            _redis_pool = client
            # Log only the part after '@' so credentials stay out of the logs.
            logger.info(f"Redis pool created: {redis_url.split('@')[-1]}")
            break

    return _redis_pool


async def close_redis_pool() -> None:
    """
    Close the cached Redis client on shutdown.

    Does nothing when no client has been created. The module-level cache is
    cleared before the client is closed, so a failure while closing cannot
    leave a half-closed client behind for later get_redis_pool() calls.

    Raises:
        Exception: Whatever the client's close call raises; the cache is
            already reset in that case.
    """
    global _redis_pool
    if _redis_pool is None:
        return

    # Detach first: no other coroutine can pick up the client while the
    # close is awaited, and the cache is clean even if closing fails.
    pool, _redis_pool = _redis_pool, None

    # Prefer aclose() where the installed redis-py provides it; fall back to
    # close() for older releases.
    closer = getattr(pool, "aclose", None) or pool.close
    await closer()
    logger.info("Redis pool closed")


async def redis_healthcheck() -> dict:
    """
    Check Redis connectivity.

    Returns:
        On success: {"ok": True, "latency_ms": <float, rounded to 2 places>}
        On failure: {"ok": False, "error": <exception message>}

    Never raises for ordinary errors; any Exception is logged as a warning
    and reported in the returned dict.
    """
    import time

    try:
        # A health probe should answer quickly: make a single connection
        # attempt instead of the default retry loop, which sleeps between
        # attempts while Redis is down.
        pool = await get_redis_pool(max_retries=1)
        # Time only the PING round trip, not the client creation.
        start = time.monotonic()
        await pool.ping()
        latency = (time.monotonic() - start) * 1000

        return {"ok": True, "latency_ms": round(latency, 2)}
    except Exception as e:
        logger.warning(f"Redis healthcheck failed: {e}")
        return {"ok": False, "error": str(e)}


# Re-export arq's RedisSettings under its short name for convenience, so
# callers can import it from this module.
RedisSettings = ArqRedisSettings