"""Redis caching layer for VoxDoc.
Phase 8: Optional Redis-backed cache for RAG embeddings, model outputs,
and rate limit windows. Falls back to in-memory LRU when Redis is unavailable.
Set ``REDIS_URL`` env var or ``redis_url`` in config to enable.
"""
import hashlib
import json
import logging
import time
from collections import OrderedDict
from typing import Any, Optional
from app.config import settings
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# In-memory fallback (bounded LRU)
# ---------------------------------------------------------------------------
class _LRUCache:
"""Simple thread-safe-ish LRU cache for single-process fallback."""
def __init__(self, maxsize: int = 512):
self._cache: OrderedDict[str, tuple[Any, float]] = OrderedDict()
self._maxsize = maxsize
def get(self, key: str) -> Optional[Any]:
entry = self._cache.get(key)
if entry is None:
return None
value, expires_at = entry
if expires_at and time.time() > expires_at:
self._cache.pop(key, None)
return None
self._cache.move_to_end(key)
return value
def set(self, key: str, value: Any, ttl: int = 0) -> None:
expires_at = (time.time() + ttl) if ttl > 0 else 0.0
if key in self._cache:
self._cache.move_to_end(key)
self._cache[key] = (value, expires_at)
while len(self._cache) > self._maxsize:
self._cache.popitem(last=False)
def delete(self, key: str) -> None:
self._cache.pop(key, None)
def clear(self) -> None:
self._cache.clear()
def size(self) -> int:
return len(self._cache)
# ---------------------------------------------------------------------------
# Redis-backed cache
# ---------------------------------------------------------------------------
class _RedisCache:
    """Async wrapper around ``redis.asyncio`` with lazy connect and soft failures.

    Every operation first attempts to (re)connect. When the connection cannot
    be established, operations degrade to no-ops / ``None`` so the caller can
    fall back to the in-memory cache. Values are stored as JSON strings.
    """

    def __init__(self, url: str, default_ttl: int = 300):
        self._url = url
        self._default_ttl = default_ttl
        # Lazily created client; stays None while Redis is unreachable.
        self._redis = None

    async def _connect(self):
        """Create and ping the client on first use; leave ``_redis`` None on failure."""
        if self._redis is not None:
            return
        try:
            import redis.asyncio as aioredis

            self._redis = aioredis.from_url(
                self._url,
                decode_responses=True,
                socket_connect_timeout=5,
            )
            # Ping up front so a dead server is detected immediately.
            await self._redis.ping()
            # Log only the host portion to avoid leaking credentials in the URL.
            logger.info("Redis cache connected: %s", self._url.split("@")[-1])
        except Exception as e:
            logger.warning("Redis connection failed, falling back to memory cache: %s", e)
            self._redis = None

    async def get(self, key: str) -> Optional[Any]:
        """Fetch and JSON-decode *key*; returns None on miss or any Redis error."""
        await self._connect()
        if self._redis is None:
            return None
        try:
            payload = await self._redis.get(key)
            return None if payload is None else json.loads(payload)
        except Exception as e:
            logger.debug("Redis GET error for key %s: %s", key, e)
            return None

    async def set(self, key: str, value: Any, ttl: int = 0) -> None:
        """JSON-encode and store *value*; ttl of 0 falls back to the default TTL."""
        await self._connect()
        if self._redis is None:
            return
        try:
            effective_ttl = ttl or self._default_ttl
            # default=str stringifies non-JSON-native values rather than raising.
            await self._redis.setex(key, effective_ttl, json.dumps(value, default=str))
        except Exception as e:
            logger.debug("Redis SET error for key %s: %s", key, e)

    async def delete(self, key: str) -> None:
        """Remove *key*; errors are logged at debug level and swallowed."""
        await self._connect()
        if self._redis is None:
            return
        try:
            await self._redis.delete(key)
        except Exception as e:
            logger.debug("Redis DELETE error for key %s: %s", key, e)

    async def clear(self) -> None:
        """Flush the whole Redis database asynchronously.

        NOTE(review): FLUSHDB wipes every key in the selected DB, not only
        voxdoc-namespaced ones — confirm the DB is dedicated to this app.
        """
        await self._connect()
        if self._redis is None:
            return
        try:
            await self._redis.flushdb(asynchronous=True)
        except Exception as e:
            logger.debug("Redis CLEAR error: %s", e)
# ---------------------------------------------------------------------------
# Unified cache interface
# ---------------------------------------------------------------------------
class CacheService:
    """Two-tier cache: Redis primary (when configured) over an in-process LRU."""

    def __init__(self):
        self._memory = _LRUCache(maxsize=512)
        self._redis: Optional[_RedisCache] = None
        # The Redis layer is only attached when a URL is present in settings.
        if settings.redis_url:
            self._redis = _RedisCache(
                url=settings.redis_url,
                default_ttl=settings.redis_cache_ttl_seconds,
            )

    @staticmethod
    def make_key(*parts: str) -> str:
        """Build a namespaced, fixed-length cache key from *parts*."""
        joined = ":".join(str(part) for part in parts)
        digest = hashlib.sha256(joined.encode()).hexdigest()
        return f"voxdoc:{digest[:16]}"

    async def get(self, key: str) -> Optional[Any]:
        """Look up *key* in Redis first, falling back to the memory tier."""
        if self._redis:
            hit = await self._redis.get(key)
            if hit is not None:
                return hit
        return self._memory.get(key)

    async def set(self, key: str, value: Any, ttl: int = 0) -> None:
        """Write-through: populate both the memory and Redis tiers."""
        self._memory.set(key, value, ttl)
        if self._redis:
            await self._redis.set(key, value, ttl)

    async def delete(self, key: str) -> None:
        """Evict *key* from both tiers."""
        self._memory.delete(key)
        if self._redis:
            await self._redis.delete(key)

    async def clear(self) -> None:
        """Empty both tiers."""
        self._memory.clear()
        if self._redis:
            await self._redis.clear()

    @property
    def redis_enabled(self) -> bool:
        """True when a Redis backend was configured at construction time."""
        return self._redis is not None
# Process-wide singleton, created lazily by get_cache_service().
_cache_service: Optional[CacheService] = None


def get_cache_service() -> CacheService:
    """Return the shared CacheService, constructing it on first call."""
    global _cache_service
    if _cache_service is None:
        _cache_service = CacheService()
    return _cache_service