# segmentopulse-backend/app/services/upstash_cache.py
# Commit 82bd507 (SHAFI): upgraded the full Pulse ingestion pipeline; worked on
# flaws and rectified them, solving over 10 major ones.
"""
Upstash Redis Cache Service (REST API)
======================================
Optimized for Upstash Free Tier:
- Data Size: 256 MB (we target max 200 MB)
- Bandwidth: 50 GB/month
- Commands: 10,000/sec
- Request Size: 10 MB
- Record Size: 100 MB
Uses HTTP REST API instead of redis-py for serverless compatibility.
"""
import httpx
import json
import logging
from typing import Any, Optional
from datetime import datetime
logger = logging.getLogger(__name__)
class UpstashCache:
    """
    REST-based Redis caching service for Upstash.

    Features:
    - HTTP REST client (no redis-py needed)
    - Automatic TTL management
    - Memory-efficient serialization
    - Graceful error handling
    """

    def __init__(
        self,
        rest_url: str,
        rest_token: str,
        enabled: bool = True,
        default_ttl: int = 300  # 5 minutes
    ):
        """
        Initialize Upstash cache.

        Args:
            rest_url: Upstash REST API URL
            rest_token: Upstash REST API token
            enabled: Whether caching is enabled
            default_ttl: Default TTL in seconds
        """
        self.rest_url = rest_url.rstrip('/')
        # Upstash consoles sometimes show the host without a scheme; normalize.
        if self.rest_url and not self.rest_url.startswith("http"):
            self.rest_url = f"https://{self.rest_url}"

        # Auto-disable if URL is missing
        if not self.rest_url:
            enabled = False
            logger.warning("⚠️ Upstash URL missing. Disabling cache.")

        self.rest_token = rest_token
        self.enabled = enabled
        self.default_ttl = default_ttl

        # Created lazily in _get_client() to avoid asyncio loop issues at init time.
        self._client: Optional["httpx.AsyncClient"] = None

        # Stats tracking
        self.stats = {
            'hits': 0,
            'misses': 0,
            'sets': 0,
            'errors': 0
        }

        # BUGFIX: this startup logging (and a duplicate stats dict) previously sat
        # after the `return` statement inside _get_client() and was unreachable
        # dead code; it belongs at the end of __init__.
        if not self.enabled:
            logger.info("ℹ️ Upstash cache disabled (ENABLE_UPSTASH_CACHE=False)")
        else:
            logger.info("=" * 70)
            logger.info("🚀 [UPSTASH] Redis cache initialized")
            logger.info(f"   URL: {rest_url}")
            logger.info(f"   Default TTL: {default_ttl}s")
            logger.info(f"   Free Tier: 256 MB data, 50 GB/month bandwidth")
            logger.info("=" * 70)

    def _get_client(self) -> "httpx.AsyncClient":
        """Lazy initialization of httpx client to avoid asyncio loop issues on Windows."""
        if getattr(self, '_client', None) is None:
            self._client = httpx.AsyncClient(
                timeout=5.0,  # 5 second timeout: fail fast so cache errors don't stall requests
                headers={
                    "Authorization": f"Bearer {self.rest_token}",
                    "Content-Type": "application/json"
                }
            )
        return self._client

    async def _execute_command(self, command: list) -> Optional[Any]:
        """
        Execute Redis command via REST API.

        Args:
            command: Redis command as list, e.g. ["GET", "key"]

        Returns:
            Command result or None on error (or when the cache is disabled)
        """
        if not self.enabled:
            return None
        try:
            client = self._get_client()
            # Upstash REST accepts the raw command array as the JSON body.
            response = await client.post(f"{self.rest_url}", json=command)
            if response.status_code == 200:
                return response.json().get("result")
            logger.warning(f"⚠️ Upstash error: {response.status_code} - {response.text}")
            self.stats['errors'] += 1
            return None
        except Exception as e:
            # Cache failures are non-fatal by design: log, count, and degrade.
            logger.error(f"❌ Upstash request failed: {e}")
            self.stats['errors'] += 1
            return None

    async def get(self, key: str) -> Optional[Any]:
        """
        Get value from cache.

        Args:
            key: Cache key

        Returns:
            Cached value (deserialized) or None if not found
        """
        if not self.enabled:
            return None
        try:
            result = await self._execute_command(["GET", key])
            if result is None:
                self.stats['misses'] += 1
                logger.debug(f"❌ Cache MISS: {key}")
                return None
            # Values are stored as JSON strings; deserialize on the way out.
            value = json.loads(result)
            self.stats['hits'] += 1
            logger.debug(f"✅ Cache HIT: {key}")
            return value
        except Exception as e:
            logger.error(f"❌ Cache get error for {key}: {e}")
            self.stats['errors'] += 1
            return None

    async def set(
        self,
        key: str,
        value: Any,
        ttl: Optional[int] = None
    ) -> bool:
        """
        Set value in cache with TTL.

        Args:
            key: Cache key
            value: Value to cache (will be JSON serialized)
            ttl: Time-to-live in seconds (uses default if not specified)

        Returns:
            True if successful, False otherwise
        """
        if not self.enabled:
            return False
        try:
            # Serialize to JSON
            serialized = json.dumps(value)
            # Warn on entries >1MB — large values eat the free-tier bandwidth budget.
            size_kb = len(serialized) / 1024
            if size_kb > 1024:
                logger.warning(f"⚠️ Large cache entry: {key} ({size_kb:.1f} KB)")
            ttl_seconds = ttl if ttl is not None else self.default_ttl
            # SETEX = atomic set-with-expiration; Upstash replies "OK" on success.
            result = await self._execute_command(["SETEX", key, ttl_seconds, serialized])
            # _execute_command returns None on any failure, so non-None means success
            # (the old `result == "OK" or result is not None` was redundant).
            if result is not None:
                self.stats['sets'] += 1
                logger.debug(f"💾 Cache SET: {key} (TTL: {ttl_seconds}s, Size: {size_kb:.1f} KB)")
                return True
            return False
        except Exception as e:
            logger.error(f"❌ Cache set error for {key}: {e}")
            self.stats['errors'] += 1
            return False

    async def delete(self, key: str) -> bool:
        """
        Delete key from cache.

        Args:
            key: Cache key to delete

        Returns:
            True if deleted, False otherwise
        """
        if not self.enabled:
            return False
        try:
            # DEL returns the number of keys removed (1 if the key existed).
            result = await self._execute_command(["DEL", key])
            deleted = result == 1
            if deleted:
                logger.debug(f"🗑️ Cache DELETE: {key}")
            return deleted
        except Exception as e:
            logger.error(f"❌ Cache delete error for {key}: {e}")
            self.stats['errors'] += 1  # count errors, consistent with get/set
            return False

    async def invalidate_pattern(self, pattern: str) -> int:
        """
        Invalidate all keys matching pattern.

        Args:
            pattern: Redis pattern (e.g., "news:*")

        Returns:
            Number of keys deleted
        """
        if not self.enabled:
            return 0
        try:
            # NOTE: KEYS is O(n) over the keyspace — acceptable at free-tier sizes.
            keys = await self._execute_command(["KEYS", pattern])
            if not keys:
                return 0
            # DEL is variadic: delete everything in ONE request instead of one
            # round trip per key (saves commands and bandwidth on the free tier).
            await self._execute_command(["DEL", *keys])
            logger.info(f"🗑️ Invalidated {len(keys)} keys matching '{pattern}'")
            return len(keys)
        except Exception as e:
            logger.error(f"❌ Cache invalidation error for {pattern}: {e}")
            return 0

    def get_stats(self) -> dict:
        """Get cache statistics (hit/miss counts, hit rate percent, enabled flag)."""
        total_requests = self.stats['hits'] + self.stats['misses']
        hit_rate = (
            self.stats['hits'] / total_requests * 100
            if total_requests > 0 else 0
        )
        return {
            **self.stats,
            'total_requests': total_requests,
            'hit_rate_percent': round(hit_rate, 2),
            'enabled': self.enabled
        }

    def print_stats(self):
        """Print cache statistics to the log."""
        stats = self.get_stats()
        logger.info("")
        logger.info("=" * 70)
        logger.info("📊 [UPSTASH] Cache Statistics")
        logger.info("=" * 70)
        logger.info(f"   🔹 Total Requests: {stats['total_requests']:,}")
        logger.info(f"   🔹 Cache Hits: {stats['hits']:,}")
        logger.info(f"   🔹 Cache Misses: {stats['misses']:,}")
        logger.info(f"   🔹 Hit Rate: {stats['hit_rate_percent']}%")
        logger.info(f"   🔹 Sets: {stats['sets']:,}")
        logger.info(f"   🔹 Errors: {stats['errors']:,}")
        logger.info("=" * 70)
        logger.info("")

    async def health_check(self) -> bool:
        """
        Check if Upstash is reachable.

        Returns:
            True if healthy, False otherwise (always False when disabled)
        """
        if not self.enabled:
            # Disabled is not a failure; skip the noisy warning below.
            return False
        try:
            result = await self._execute_command(["PING"])
            healthy = result == "PONG"
            if healthy:
                logger.info("✅ Upstash health check: OK")
            else:
                logger.warning("⚠️ Upstash health check: FAILED")
            return healthy
        except Exception as e:
            logger.error(f"❌ Upstash health check error: {e}")
            return False

    async def close(self):
        """Close the HTTP client and allow a fresh one to be created later."""
        if getattr(self, '_client', None) is not None:
            await self._client.aclose()
            # Reset so a subsequent call lazily builds a new (open) client.
            self._client = None
# Global singleton instance
_upstash_cache: Optional[UpstashCache] = None


def get_upstash_cache() -> UpstashCache:
    """
    Return the process-wide Upstash cache, creating it on first use.

    Returns:
        UpstashCache: Singleton cache instance
    """
    global _upstash_cache
    if _upstash_cache is not None:
        return _upstash_cache

    # Deferred import: keeps this module importable without pulling in settings.
    from app.config import settings

    _upstash_cache = UpstashCache(
        rest_url=settings.UPSTASH_REDIS_REST_URL,
        rest_token=settings.UPSTASH_REDIS_REST_TOKEN,
        enabled=settings.ENABLE_UPSTASH_CACHE,
        default_ttl=300,  # five-minute default
    )
    return _upstash_cache