footypredict-pro / src /cache_system.py
NetBoss
V3.0 Ultimate Enhancement - Complete production system
6f7e932
"""
Enhanced Caching System for Predictions
Provides intelligent caching with:
- Redis-compatible in-memory cache
- TTL-based expiration
- Automatic cache invalidation
- Performance metrics
"""
import time
import hashlib
import json
from datetime import datetime, timedelta
from typing import Any, Optional, Dict, Callable
from functools import wraps
import threading
class PredictionCache:
"""High-performance prediction cache with TTL support"""
def __init__(self, default_ttl: int = 300): # 5 minutes default
self._cache: Dict[str, Dict] = {}
self._lock = threading.RLock()
self.default_ttl = default_ttl
self.hits = 0
self.misses = 0
def _generate_key(self, *args, **kwargs) -> str:
"""Generate cache key from args"""
key_data = json.dumps({'args': args, 'kwargs': kwargs}, sort_keys=True)
return hashlib.md5(key_data.encode()).hexdigest()
def get(self, key: str) -> Optional[Any]:
"""Get value from cache"""
with self._lock:
if key in self._cache:
entry = self._cache[key]
if entry['expires'] > time.time():
self.hits += 1
return entry['value']
else:
del self._cache[key]
self.misses += 1
return None
def set(self, key: str, value: Any, ttl: Optional[int] = None) -> None:
"""Set value in cache with TTL"""
with self._lock:
self._cache[key] = {
'value': value,
'expires': time.time() + (ttl or self.default_ttl),
'created': time.time()
}
def delete(self, key: str) -> bool:
"""Delete key from cache"""
with self._lock:
if key in self._cache:
del self._cache[key]
return True
return False
def clear(self) -> int:
"""Clear all cache entries"""
with self._lock:
count = len(self._cache)
self._cache.clear()
return count
def cleanup_expired(self) -> int:
"""Remove expired entries"""
with self._lock:
now = time.time()
expired = [k for k, v in self._cache.items() if v['expires'] <= now]
for key in expired:
del self._cache[key]
return len(expired)
def get_stats(self) -> Dict:
"""Get cache statistics"""
with self._lock:
total_requests = self.hits + self.misses
hit_rate = (self.hits / total_requests * 100) if total_requests > 0 else 0
return {
'entries': len(self._cache),
'hits': self.hits,
'misses': self.misses,
'hit_rate': round(hit_rate, 2),
'memory_usage': self._estimate_memory()
}
def _estimate_memory(self) -> str:
"""Estimate memory usage"""
try:
import sys
size = sys.getsizeof(self._cache)
for v in self._cache.values():
size += sys.getsizeof(v)
if size < 1024:
return f"{size} B"
elif size < 1024 * 1024:
return f"{size / 1024:.1f} KB"
else:
return f"{size / (1024 * 1024):.1f} MB"
except:
return "Unknown"
# Global cache instances
prediction_cache = PredictionCache(default_ttl=300) # 5 min for predictions
fixtures_cache = PredictionCache(default_ttl=600) # 10 min for fixtures
odds_cache = PredictionCache(default_ttl=60) # 1 min for odds
def cache_prediction(ttl: int = 300):
"""Decorator to cache prediction results"""
def decorator(func: Callable):
@wraps(func)
def wrapper(*args, **kwargs):
# Generate cache key
key = prediction_cache._generate_key(func.__name__, *args, **kwargs)
# Check cache
cached = prediction_cache.get(key)
if cached is not None:
cached['from_cache'] = True
return cached
# Call function
result = func(*args, **kwargs)
# Cache result
if result:
prediction_cache.set(key, result, ttl)
return result
return wrapper
return decorator
def cache_fixtures(ttl: int = 600):
"""Decorator to cache fixture results"""
def decorator(func: Callable):
@wraps(func)
def wrapper(*args, **kwargs):
key = fixtures_cache._generate_key(func.__name__, *args, **kwargs)
cached = fixtures_cache.get(key)
if cached is not None:
return cached
result = func(*args, **kwargs)
if result:
fixtures_cache.set(key, result, ttl)
return result
return wrapper
return decorator
def invalidate_prediction_cache(home: str = None, away: str = None, league: str = None):
"""Invalidate cache entries matching criteria"""
# For simplicity, clear all if specific criteria
if home or away or league:
prediction_cache.clear()
return True
class RealTimeUpdater:
"""Real-time update manager for live data"""
def __init__(self):
self.subscribers: Dict[str, list] = {}
self.last_updates: Dict[str, float] = {}
def subscribe(self, channel: str, callback: Callable):
"""Subscribe to a channel"""
if channel not in self.subscribers:
self.subscribers[channel] = []
self.subscribers[channel].append(callback)
def publish(self, channel: str, data: Any):
"""Publish data to channel subscribers"""
self.last_updates[channel] = time.time()
if channel in self.subscribers:
for callback in self.subscribers[channel]:
try:
callback(data)
except Exception as e:
print(f"Subscriber error: {e}")
def get_channels(self) -> list:
"""Get list of active channels"""
return list(self.subscribers.keys())
# Global real-time updater
realtime = RealTimeUpdater()
def get_cache_stats() -> Dict:
"""Get combined cache statistics"""
return {
'predictions': prediction_cache.get_stats(),
'fixtures': fixtures_cache.get_stats(),
'odds': odds_cache.get_stats(),
'total_entries': (
len(prediction_cache._cache) +
len(fixtures_cache._cache) +
len(odds_cache._cache)
)
}