Spaces:
Runtime error
Runtime error
| """ | |
| Enhanced Caching System for Predictions | |
| Provides intelligent caching with: | |
| - Redis-compatible in-memory cache | |
| - TTL-based expiration | |
| - Automatic cache invalidation | |
| - Performance metrics | |
| """ | |
| import time | |
| import hashlib | |
| import json | |
| from datetime import datetime, timedelta | |
| from typing import Any, Optional, Dict, Callable | |
| from functools import wraps | |
| import threading | |
| class PredictionCache: | |
| """High-performance prediction cache with TTL support""" | |
| def __init__(self, default_ttl: int = 300): # 5 minutes default | |
| self._cache: Dict[str, Dict] = {} | |
| self._lock = threading.RLock() | |
| self.default_ttl = default_ttl | |
| self.hits = 0 | |
| self.misses = 0 | |
| def _generate_key(self, *args, **kwargs) -> str: | |
| """Generate cache key from args""" | |
| key_data = json.dumps({'args': args, 'kwargs': kwargs}, sort_keys=True) | |
| return hashlib.md5(key_data.encode()).hexdigest() | |
| def get(self, key: str) -> Optional[Any]: | |
| """Get value from cache""" | |
| with self._lock: | |
| if key in self._cache: | |
| entry = self._cache[key] | |
| if entry['expires'] > time.time(): | |
| self.hits += 1 | |
| return entry['value'] | |
| else: | |
| del self._cache[key] | |
| self.misses += 1 | |
| return None | |
| def set(self, key: str, value: Any, ttl: Optional[int] = None) -> None: | |
| """Set value in cache with TTL""" | |
| with self._lock: | |
| self._cache[key] = { | |
| 'value': value, | |
| 'expires': time.time() + (ttl or self.default_ttl), | |
| 'created': time.time() | |
| } | |
| def delete(self, key: str) -> bool: | |
| """Delete key from cache""" | |
| with self._lock: | |
| if key in self._cache: | |
| del self._cache[key] | |
| return True | |
| return False | |
| def clear(self) -> int: | |
| """Clear all cache entries""" | |
| with self._lock: | |
| count = len(self._cache) | |
| self._cache.clear() | |
| return count | |
| def cleanup_expired(self) -> int: | |
| """Remove expired entries""" | |
| with self._lock: | |
| now = time.time() | |
| expired = [k for k, v in self._cache.items() if v['expires'] <= now] | |
| for key in expired: | |
| del self._cache[key] | |
| return len(expired) | |
| def get_stats(self) -> Dict: | |
| """Get cache statistics""" | |
| with self._lock: | |
| total_requests = self.hits + self.misses | |
| hit_rate = (self.hits / total_requests * 100) if total_requests > 0 else 0 | |
| return { | |
| 'entries': len(self._cache), | |
| 'hits': self.hits, | |
| 'misses': self.misses, | |
| 'hit_rate': round(hit_rate, 2), | |
| 'memory_usage': self._estimate_memory() | |
| } | |
| def _estimate_memory(self) -> str: | |
| """Estimate memory usage""" | |
| try: | |
| import sys | |
| size = sys.getsizeof(self._cache) | |
| for v in self._cache.values(): | |
| size += sys.getsizeof(v) | |
| if size < 1024: | |
| return f"{size} B" | |
| elif size < 1024 * 1024: | |
| return f"{size / 1024:.1f} KB" | |
| else: | |
| return f"{size / (1024 * 1024):.1f} MB" | |
| except: | |
| return "Unknown" | |
| # Global cache instances | |
| prediction_cache = PredictionCache(default_ttl=300) # 5 min for predictions | |
| fixtures_cache = PredictionCache(default_ttl=600) # 10 min for fixtures | |
| odds_cache = PredictionCache(default_ttl=60) # 1 min for odds | |
| def cache_prediction(ttl: int = 300): | |
| """Decorator to cache prediction results""" | |
| def decorator(func: Callable): | |
| def wrapper(*args, **kwargs): | |
| # Generate cache key | |
| key = prediction_cache._generate_key(func.__name__, *args, **kwargs) | |
| # Check cache | |
| cached = prediction_cache.get(key) | |
| if cached is not None: | |
| cached['from_cache'] = True | |
| return cached | |
| # Call function | |
| result = func(*args, **kwargs) | |
| # Cache result | |
| if result: | |
| prediction_cache.set(key, result, ttl) | |
| return result | |
| return wrapper | |
| return decorator | |
| def cache_fixtures(ttl: int = 600): | |
| """Decorator to cache fixture results""" | |
| def decorator(func: Callable): | |
| def wrapper(*args, **kwargs): | |
| key = fixtures_cache._generate_key(func.__name__, *args, **kwargs) | |
| cached = fixtures_cache.get(key) | |
| if cached is not None: | |
| return cached | |
| result = func(*args, **kwargs) | |
| if result: | |
| fixtures_cache.set(key, result, ttl) | |
| return result | |
| return wrapper | |
| return decorator | |
| def invalidate_prediction_cache(home: str = None, away: str = None, league: str = None): | |
| """Invalidate cache entries matching criteria""" | |
| # For simplicity, clear all if specific criteria | |
| if home or away or league: | |
| prediction_cache.clear() | |
| return True | |
| class RealTimeUpdater: | |
| """Real-time update manager for live data""" | |
| def __init__(self): | |
| self.subscribers: Dict[str, list] = {} | |
| self.last_updates: Dict[str, float] = {} | |
| def subscribe(self, channel: str, callback: Callable): | |
| """Subscribe to a channel""" | |
| if channel not in self.subscribers: | |
| self.subscribers[channel] = [] | |
| self.subscribers[channel].append(callback) | |
| def publish(self, channel: str, data: Any): | |
| """Publish data to channel subscribers""" | |
| self.last_updates[channel] = time.time() | |
| if channel in self.subscribers: | |
| for callback in self.subscribers[channel]: | |
| try: | |
| callback(data) | |
| except Exception as e: | |
| print(f"Subscriber error: {e}") | |
| def get_channels(self) -> list: | |
| """Get list of active channels""" | |
| return list(self.subscribers.keys()) | |
| # Global real-time updater | |
| realtime = RealTimeUpdater() | |
| def get_cache_stats() -> Dict: | |
| """Get combined cache statistics""" | |
| return { | |
| 'predictions': prediction_cache.get_stats(), | |
| 'fixtures': fixtures_cache.get_stats(), | |
| 'odds': odds_cache.get_stats(), | |
| 'total_entries': ( | |
| len(prediction_cache._cache) + | |
| len(fixtures_cache._cache) + | |
| len(odds_cache._cache) | |
| ) | |
| } | |