Spaces:
Sleeping
Sleeping
| """ | |
| Upstash Redis Cache Service for Observability Dashboard. | |
| Provides caching layer for LangSmith metrics with TTL support. | |
| Uses HTTP-based Redis client optimized for serverless environments. | |
| """ | |
| import json | |
| import logging | |
| import os | |
| from dataclasses import dataclass | |
| from datetime import UTC, datetime | |
| from functools import wraps | |
| from typing import Any | |
| from dotenv import load_dotenv | |
# Pull UPSTASH_* credentials from a local .env file (if present) into the
# process environment before UpstashRedisCache reads them via os.getenv.
load_dotenv()

# Module-level logger; cache operations log hits/misses at DEBUG level
# and degrade-to-no-op conditions at WARNING level.
logger = logging.getLogger(__name__)
@dataclass
class CacheConfig:
    """Cache configuration with sensible defaults.

    Declared as a dataclass (the module already imports ``dataclass`` for
    this purpose) so individual TTLs or the key prefix can be overridden
    per instance, e.g. ``CacheConfig(summary_ttl=60)``. The zero-argument
    form keeps the original defaults, so existing callers are unaffected.
    """
    # TTL values in seconds
    summary_ttl: int = 300   # 5 minutes for summary stats
    trends_ttl: int = 600    # 10 minutes for trend data
    projects_ttl: int = 300  # 5 minutes for project list
    runs_ttl: int = 120      # 2 minutes for run details
    default_ttl: int = 300   # 5 minutes default
    # Cache key prefixes
    prefix: str = "specsbeforecode:observability"
| class UpstashRedisCache: | |
| """ | |
| Async-compatible Redis cache using Upstash REST API. | |
| Designed for serverless environments with HTTP-based connections. | |
| Supports both sync and async operations. | |
| """ | |
| def __init__(self, config: CacheConfig | None = None): | |
| self.config = config or CacheConfig() | |
| self._sync_client = None | |
| self._async_client = None | |
| self._initialized = False | |
| # Load credentials from environment | |
| self.url = os.getenv("UPSTASH_REDIS_REST_URL") | |
| self.token = os.getenv("UPSTASH_REDIS_REST_TOKEN") | |
| if not self.url or not self.token: | |
| logger.warning( | |
| "Upstash Redis credentials not found. " | |
| "Set UPSTASH_REDIS_REST_URL and UPSTASH_REDIS_REST_TOKEN. " | |
| "Cache operations will be no-ops." | |
| ) | |
| def is_configured(self) -> bool: | |
| """Check if Redis is properly configured.""" | |
| return bool(self.url and self.token) | |
| def _get_sync_client(self): | |
| """Get or create synchronous Redis client.""" | |
| if not self.is_configured: | |
| return None | |
| if self._sync_client is None: | |
| try: | |
| from upstash_redis import Redis | |
| self._sync_client = Redis(url=self.url, token=self.token) | |
| logger.info("Upstash Redis sync client initialized") | |
| except Exception as e: | |
| logger.error(f"Failed to initialize Upstash Redis sync client: {e}") | |
| return None | |
| return self._sync_client | |
| def _get_async_client(self): | |
| """Get or create asynchronous Redis client.""" | |
| if not self.is_configured: | |
| return None | |
| if self._async_client is None: | |
| try: | |
| from upstash_redis.asyncio import Redis | |
| self._async_client = Redis(url=self.url, token=self.token) | |
| logger.info("Upstash Redis async client initialized") | |
| except Exception as e: | |
| logger.error(f"Failed to initialize Upstash Redis async client: {e}") | |
| return None | |
| return self._async_client | |
| def _make_key(self, key: str) -> str: | |
| """Generate full cache key with prefix.""" | |
| return f"{self.config.prefix}:{key}" | |
| def _serialize(self, value: Any) -> str: | |
| """Serialize value to JSON string.""" | |
| if isinstance(value, (dict, list)): | |
| return json.dumps(value, default=str) | |
| return json.dumps({"value": value}, default=str) | |
| def _deserialize(self, value: str | None) -> Any: | |
| """Deserialize JSON string to Python object.""" | |
| if value is None: | |
| return None | |
| try: | |
| data = json.loads(value) | |
| # Unwrap simple values | |
| if isinstance(data, dict) and "value" in data and len(data) == 1: | |
| return data["value"] | |
| return data | |
| except (json.JSONDecodeError, TypeError): | |
| return value | |
| def get(self, key: str) -> Any | None: | |
| """Get value from cache (sync).""" | |
| client = self._get_sync_client() | |
| if not client: | |
| return None | |
| try: | |
| full_key = self._make_key(key) | |
| value = client.get(full_key) | |
| if value: | |
| logger.debug(f"Cache HIT: {key}") | |
| return self._deserialize(value) | |
| logger.debug(f"Cache MISS: {key}") | |
| return None | |
| except Exception as e: | |
| logger.warning(f"Cache get error for {key}: {e}") | |
| return None | |
| def set( | |
| self, | |
| key: str, | |
| value: Any, | |
| ttl: int | None = None | |
| ) -> bool: | |
| """Set value in cache with TTL (sync).""" | |
| client = self._get_sync_client() | |
| if not client: | |
| return False | |
| try: | |
| full_key = self._make_key(key) | |
| serialized = self._serialize(value) | |
| ex = ttl or self.config.default_ttl | |
| result = client.set(full_key, serialized, ex=ex) | |
| logger.debug(f"Cache SET: {key} (TTL: {ex}s)") | |
| return bool(result) | |
| except Exception as e: | |
| logger.warning(f"Cache set error for {key}: {e}") | |
| return False | |
| def delete(self, key: str) -> bool: | |
| """Delete key from cache (sync).""" | |
| client = self._get_sync_client() | |
| if not client: | |
| return False | |
| try: | |
| full_key = self._make_key(key) | |
| result = client.delete(full_key) | |
| logger.debug(f"Cache DELETE: {key}") | |
| return bool(result) | |
| except Exception as e: | |
| logger.warning(f"Cache delete error for {key}: {e}") | |
| return False | |
| def exists(self, key: str) -> bool: | |
| """Check if key exists in cache (sync).""" | |
| client = self._get_sync_client() | |
| if not client: | |
| return False | |
| try: | |
| full_key = self._make_key(key) | |
| return bool(client.exists(full_key)) | |
| except Exception as e: | |
| logger.warning(f"Cache exists error for {key}: {e}") | |
| return False | |
| async def aget(self, key: str) -> Any | None: | |
| """Get value from cache (async).""" | |
| client = self._get_async_client() | |
| if not client: | |
| return None | |
| try: | |
| full_key = self._make_key(key) | |
| value = await client.get(full_key) | |
| if value: | |
| logger.debug(f"Cache HIT: {key}") | |
| return self._deserialize(value) | |
| logger.debug(f"Cache MISS: {key}") | |
| return None | |
| except Exception as e: | |
| logger.warning(f"Async cache get error for {key}: {e}") | |
| return None | |
| async def aset( | |
| self, | |
| key: str, | |
| value: Any, | |
| ttl: int | None = None | |
| ) -> bool: | |
| """Set value in cache with TTL (async).""" | |
| client = self._get_async_client() | |
| if not client: | |
| return False | |
| try: | |
| full_key = self._make_key(key) | |
| serialized = self._serialize(value) | |
| ex = ttl or self.config.default_ttl | |
| result = await client.set(full_key, serialized, ex=ex) | |
| logger.debug(f"Async cache SET: {key} (TTL: {ex}s)") | |
| return bool(result) | |
| except Exception as e: | |
| logger.warning(f"Async cache set error for {key}: {e}") | |
| return False | |
| async def adelete(self, key: str) -> bool: | |
| """Delete key from cache (async).""" | |
| client = self._get_async_client() | |
| if not client: | |
| return False | |
| try: | |
| full_key = self._make_key(key) | |
| result = await client.delete(full_key) | |
| logger.debug(f"Async cache DELETE: {key}") | |
| return bool(result) | |
| except Exception as e: | |
| logger.warning(f"Async cache delete error for {key}: {e}") | |
| return False | |
| async def aexists(self, key: str) -> bool: | |
| """Check if key exists in cache (async).""" | |
| client = self._get_async_client() | |
| if not client: | |
| return False | |
| try: | |
| full_key = self._make_key(key) | |
| return bool(await client.exists(full_key)) | |
| except Exception as e: | |
| logger.warning(f"Async cache exists error for {key}: {e}") | |
| return False | |
| def get_summary_key(self, user_id: str, start_date: str, end_date: str, project_id: str | None = None) -> str: | |
| """Generate cache key for summary stats.""" | |
| parts = ["summary", user_id, start_date, end_date] | |
| if project_id: | |
| parts.append(project_id) | |
| return ":".join(parts) | |
| def get_trends_key(self, user_id: str, period: str, project_id: str | None = None) -> str: | |
| """Generate cache key for trends data.""" | |
| parts = ["trends", user_id, period] | |
| if project_id: | |
| parts.append(project_id) | |
| return ":".join(parts) | |
| def get_projects_key(self, user_id: str) -> str: | |
| """Generate cache key for projects list.""" | |
| return f"projects:{user_id}" | |
| def get_runs_key(self, user_id: str, project_id: str, page: int = 1) -> str: | |
| """Generate cache key for runs list.""" | |
| return f"runs:{user_id}:{project_id}:page{page}" | |
| async def cache_summary( | |
| self, | |
| user_id: str, | |
| start_date: str, | |
| end_date: str, | |
| data: dict[str, Any], | |
| project_id: str | None = None | |
| ) -> bool: | |
| """Cache summary statistics.""" | |
| key = self.get_summary_key(user_id, start_date, end_date, project_id) | |
| return await self.aset(key, data, ttl=self.config.summary_ttl) | |
| async def get_cached_summary( | |
| self, | |
| user_id: str, | |
| start_date: str, | |
| end_date: str, | |
| project_id: str | None = None | |
| ) -> dict[str, Any] | None: | |
| """Get cached summary statistics.""" | |
| key = self.get_summary_key(user_id, start_date, end_date, project_id) | |
| return await self.aget(key) | |
| async def cache_trends( | |
| self, | |
| user_id: str, | |
| period: str, | |
| data: dict[str, Any], | |
| project_id: str | None = None | |
| ) -> bool: | |
| """Cache trends data.""" | |
| key = self.get_trends_key(user_id, period, project_id) | |
| return await self.aset(key, data, ttl=self.config.trends_ttl) | |
| async def get_cached_trends( | |
| self, | |
| user_id: str, | |
| period: str, | |
| project_id: str | None = None | |
| ) -> dict[str, Any] | None: | |
| """Get cached trends data.""" | |
| key = self.get_trends_key(user_id, period, project_id) | |
| return await self.aget(key) | |
| async def cache_projects(self, user_id: str, data: dict[str, Any]) -> bool: | |
| """Cache projects list.""" | |
| key = self.get_projects_key(user_id) | |
| return await self.aset(key, data, ttl=self.config.projects_ttl) | |
| async def get_cached_projects(self, user_id: str) -> dict[str, Any] | None: | |
| """Get cached projects list.""" | |
| key = self.get_projects_key(user_id) | |
| return await self.aget(key) | |
| async def invalidate_user_cache(self, user_id: str) -> None: | |
| """Invalidate all cache entries for a user.""" | |
| # Note: Upstash doesn't support SCAN easily via REST, | |
| # so we invalidate known keys | |
| logger.info(f"Cache invalidation requested for user {user_id}") | |
| # In practice, rely on TTL expiration for now | |
| # For manual invalidation, track specific keys | |
| def cached( | |
| key_func: callable, | |
| ttl: int | None = None, | |
| cache_instance: UpstashRedisCache | None = None | |
| ): | |
| """ | |
| Decorator for caching async function results. | |
| Args: | |
| key_func: Function that generates cache key from function arguments | |
| ttl: Time-to-live in seconds (uses default if None) | |
| cache_instance: Cache instance to use (creates default if None) | |
| Example: | |
| @cached( | |
| key_func=lambda user_id, period: f"trends:{user_id}:{period}", | |
| ttl=600 | |
| ) | |
| async def get_trends(user_id: str, period: str): | |
| # expensive operation | |
| return await fetch_from_langsmith(...) | |
| """ | |
| def decorator(func): | |
| async def wrapper(*args, **kwargs): | |
| cache = cache_instance or get_cache() | |
| # Generate cache key | |
| """ | |
| Decorator for caching async function results. | |
| Args: | |
| key_func: Function that generates cache key from function arguments | |
| ttl: Time-to-live in seconds (uses default if None) | |
| cache_instance: Cache instance to use (creates default if None) | |
| Example: | |
| @cached( | |
| key_func=lambda *a, **kw: f"user:{a[0]}", | |
| ttl=60, | |
| ) | |
| """ | |
| # Assume 'result' is the return value of func | |
| result = await func(*args, **kwargs) | |
| cache_key = key_func(*args, **kwargs) if 'key_func' in locals() else None | |
| if result is not None and cache_key: | |
| await cache.aset(cache_key, result, ttl=ttl) | |
| return result | |
| return wrapper | |
| return decorator | |
# Process-wide singleton; populated lazily by get_cache() or explicitly
# by init_cache().
_cache_instance: UpstashRedisCache | None = None


def get_cache() -> UpstashRedisCache:
    """Return the shared cache singleton, constructing it on first access."""
    global _cache_instance
    if _cache_instance is not None:
        return _cache_instance
    _cache_instance = UpstashRedisCache()
    return _cache_instance
def init_cache(config: CacheConfig | None = None) -> UpstashRedisCache:
    """Rebuild the global cache singleton from *config* and return it."""
    global _cache_instance
    instance = UpstashRedisCache(config)
    _cache_instance = instance
    return instance
async def check_cache_health() -> dict[str, Any]:
    """
    Check Redis cache health and connectivity.

    Performs a set/get/delete round trip against a short-lived key and
    reports latency.

    BUG FIX: the original read ``cache.is_configured`` without calling it.
    A bound method is always truthy, so "configured" was a method object
    (not JSON-serializable) and the "not_configured" branch was unreachable.

    Returns:
        Dict with status ("healthy" | "degraded" | "unhealthy" |
        "not_configured"), latency_ms, and configuration info.
    """
    cache = get_cache()
    configured = cache.is_configured()
    result = {
        "service": "upstash_redis",
        "configured": configured,
        "status": "unknown",
        "latency_ms": None,
        "error": None
    }
    if not configured:
        result["status"] = "not_configured"
        result["error"] = "Missing UPSTASH_REDIS_REST_URL or UPSTASH_REDIS_REST_TOKEN"
        return result
    try:
        import time
        start = time.time()
        # Ping test with a simple set/get; 10 s TTL so a crashed check
        # can't leave the probe key behind.
        test_key = "_health_check"
        await cache.aset(test_key, {"timestamp": datetime.now(UTC).isoformat()}, ttl=10)
        value = await cache.aget(test_key)
        await cache.adelete(test_key)
        latency = (time.time() - start) * 1000
        # A write that succeeds but reads back empty means TTL/storage issues.
        result["status"] = "healthy" if value else "degraded"
        result["latency_ms"] = round(latency, 2)
    except Exception as e:
        result["status"] = "unhealthy"
        result["error"] = str(e)
    return result