Spaces:
Sleeping
Sleeping
| import logging | |
| from redis.asyncio import Redis | |
| from redis.exceptions import RedisError | |
| from app.core.config import settings | |
logger = logging.getLogger(__name__)

# CACHE_URI is read as "host:port". Split on the LAST colon only, so a host
# that itself contains colons (e.g. an IPv6 literal) still yields exactly two
# parts instead of breaking tuple unpacking.
_cache_uri_parts = settings.CACHE_URI.rsplit(":", 1)
if len(_cache_uri_parts) != 2:
    raise ValueError("settings.CACHE_URI must have the form 'host:port'")
CACHE_HOST = _cache_uri_parts[0]
CACHE_PORT = int(_cache_uri_parts[1])  # raises ValueError on a non-numeric port

# Shared module-level asyncio Redis client, handed out by get_redis().
# NOTE: constructing the client only configures it; redis-py opens the actual
# network connection on the first command, so a successful construction does
# not prove the server is reachable.
try:
    redis_client = Redis(
        host=CACHE_HOST,
        port=CACHE_PORT,
        username="default",
        password=settings.CACHE_K,
        decode_responses=True,  # return str instead of bytes
    )
except RedisError as e:
    logger.error("Failed to create Redis client: %s", e)
    raise
else:
    logger.info("Redis client created; connection opens on first command.")
async def get_redis() -> Redis:
    """Return the shared module-level Redis client.

    The same ``redis_client`` instance is returned on every call; nothing is
    awaited and no new client is created here.
    NOTE(review): presumably async so it can serve as an async dependency
    provider -- confirm against callers.
    """
    shared_client: Redis = redis_client
    return shared_client