Spaces:
Runtime error
Runtime error
File size: 2,590 Bytes
0e805d4 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
"""Result caching system for GPU quota savings."""
import hashlib
import time
from pathlib import Path
from typing import Optional, Tuple
import shutil
from core.config import CACHE_EXPIRY_HOURS
class CacheManager:
"""Manages result caching to save GPU quota."""
def __init__(self, cache_dir: str = "cache"):
self.cache_dir = Path(cache_dir)
self.cache_dir.mkdir(exist_ok=True)
self.expiry_seconds = CACHE_EXPIRY_HOURS * 3600
def get_cache_key(self, prompt: str, quality: str, user_id: str = "default") -> str:
"""Generate unique cache key from parameters."""
# Include hour timestamp for cache invalidation
timestamp = int(time.time() / 3600)
key_string = f"{user_id}_{prompt}_{quality}_{timestamp}"
return hashlib.sha256(key_string.encode()).hexdigest()
def get_cached_result(self, prompt: str, quality: str) -> Optional[Path]:
"""Check if result exists in cache."""
cache_key = self.get_cache_key(prompt, quality)
cache_path = self.cache_dir / f"{cache_key}.glb"
if cache_path.exists():
file_age = time.time() - cache_path.stat().st_mtime
if file_age < self.expiry_seconds:
print(f"[Cache] Hit: {cache_path.name}")
return cache_path
else:
# Expired, remove it
cache_path.unlink()
print(f"[Cache] Expired: {cache_path.name}")
print(f"[Cache] Miss: {cache_key}")
return None
def save_to_cache(self, prompt: str, quality: str, result_path: Path) -> None:
"""Save generation result to cache."""
cache_key = self.get_cache_key(prompt, quality)
cache_path = self.cache_dir / f"{cache_key}.glb"
shutil.copy(result_path, cache_path)
print(f"[Cache] Saved: {cache_path.name}")
def cleanup_old_cache(self) -> Tuple[int, float]:
"""Remove expired cache files."""
removed_count = 0
removed_size_mb = 0.0
for cache_file in self.cache_dir.glob("*.glb"):
file_age = time.time() - cache_file.stat().st_mtime
if file_age > self.expiry_seconds:
size_mb = cache_file.stat().st_size / 1e6
cache_file.unlink()
removed_count += 1
removed_size_mb += size_mb
if removed_count > 0:
print(f"[Cache] Cleaned up {removed_count} files ({removed_size_mb:.2f} MB)")
return removed_count, removed_size_mb
|