mkkbms
feat: initial foundation — workforce profiles + live tracking & route replay
3818b61
"""
Redis connection management for Workforce Microservice.
"""
import redis.asyncio as redis
from app.core.logging import get_logger
from app.core.config import settings
logger = get_logger(__name__)
redis_client: redis.Redis = None
async def connect_to_redis():
global redis_client
try:
pool_params = {
"host": settings.REDIS_HOST,
"port": settings.REDIS_PORT,
"db": settings.REDIS_DB,
"decode_responses": True,
"max_connections": 5,
"socket_keepalive": True,
"socket_connect_timeout": 5,
"socket_timeout": 5,
"retry_on_timeout": False,
"health_check_interval": 30,
}
if settings.REDIS_PASSWORD:
pool_params["password"] = settings.REDIS_PASSWORD
pool = redis.ConnectionPool(**pool_params)
redis_client = redis.Redis(connection_pool=pool, decode_responses=True)
await redis_client.ping()
logger.info("Connected to Redis", extra={"host": settings.REDIS_HOST})
except Exception as e:
logger.warning("Redis connection failed (non-critical) - caching disabled", exc_info=e)
redis_client = None
async def close_redis_connection():
global redis_client
if redis_client:
try:
await redis_client.aclose()
logger.info("Redis connection closed")
except Exception as e:
logger.debug("Error closing Redis connection", exc_info=e)
finally:
redis_client = None
def get_redis() -> redis.Redis:
return redis_client