Spaces:
Running
Running
| from __future__ import annotations | |
| import logging | |
| from typing import Optional | |
| from redis.asyncio import Redis | |
| from app.config import get_settings | |
| logger = logging.getLogger(__name__) | |
def create_redis_client(redis_url: str) -> Optional[Redis]:
    """Build an async Redis client from ``redis_url``.

    Args:
        redis_url: Connection URL handed to ``Redis.from_url``. An empty
            value means Redis is not configured.

    Returns:
        The client produced by ``Redis.from_url``, or ``None`` (after logging
        a warning) when ``redis_url`` is empty.
    """
    if not redis_url:
        logger.warning("REDIS_URL not configured, Redis features disabled")
        return None

    # Log only the text after the last "@" so anything placed before it
    # (e.g. user:password) stays out of the logs; a URL without "@" is
    # logged unchanged.
    loggable_target = redis_url.rpartition("@")[2]
    logger.info("Connecting to Redis at %s", loggable_target)

    client_options = {
        "max_connections": 10,
        "socket_connect_timeout": 5,
        "socket_timeout": 5,
        "retry_on_timeout": True,
        "decode_responses": True,
    }
    return Redis.from_url(redis_url, **client_options)
async def close_redis(redis: Optional[Redis]) -> None:
    """Close ``redis`` if a client was supplied; otherwise do nothing.

    Args:
        redis: The client to shut down, or ``None`` (any falsy value is
            treated as "no client").
    """
    if not redis:
        # No client was created, so there is nothing to close or log.
        return

    await redis.aclose()
    logger.info("Redis connection closed")