import json import hashlib from typing import Dict, Any, Optional from pathlib import Path import time import logging import aiofiles import asyncio from functools import wraps class CacheManager: """ Manager cache'owania wyników analiz z obsługą TTL i limitem rozmiaru. """ def __init__(self, cache_dir: str = "cache", ttl: int = 3600, max_size_mb: int = 500): self.cache_dir = Path(cache_dir) self.cache_dir.mkdir(parents=True, exist_ok=True) self.ttl = ttl self.max_size_mb = max_size_mb self.logger = logging.getLogger(__name__) self.metadata_file = self.cache_dir / "metadata.json" self._metadata = None self._initialized = False def _serialize(self, value: Any) -> Any: """ Konwertuje obiekt na format serializowalny (np. słownik). """ if hasattr(value, 'to_dict'): return value.to_dict() # Użyj metody to_dict, jeśli istnieje elif isinstance(value, (list, tuple)): return [self._serialize(item) for item in value] # Rekurencyjnie serializuj listy elif isinstance(value, dict): return {k: self._serialize(v) for k, v in value.items()} # Rekurencyjnie serializuj słowniki else: return value # Pozostaw wartości podstawowe (str, int, float, bool) bez zmian def _deserialize(self, value: Any) -> Any: """ Konwertuje format serializowalny (np. słownik) z powrotem na obiekt. """ if isinstance(value, dict) and 'name' in value and 'weight' in value: # Sprawdź, czy to kryterium from criteria_analyzer import EvaluationCriterion # Importuj klasę lokalnie, aby uniknąć cyklicznych importów return EvaluationCriterion.from_dict(value) # Użyj metody from_dict, jeśli istnieje elif isinstance(value, (list, tuple)): return [self._deserialize(item) for item in value] # Rekurencyjnie deserializuj listy elif isinstance(value, dict): return {k: self._deserialize(v) for k, v in value.items()} # Rekurencyjnie deserializuj słowniki else: return value # Pozostaw wartości podstawowe (str, int, float, bool) bez zmian async def initialize(self): """Inicjalizuje cache jeśli jeszcze nie jest zainicjalizowany""" if not self._initialized: try: if not self.metadata_file.exists(): self._metadata = { 'entries': {}, 'total_size': 0, 'last_cleanup': time.time() } await self._save_metadata(self._metadata) else: async with aiofiles.open(self.metadata_file, 'r') as f: self._metadata = json.loads(await f.read()) self._initialized = True except Exception as e: self.logger.error(f"Błąd podczas inicjalizacji cache: {str(e)}") raise async def _save_metadata(self, metadata: Dict): """Zapisuje metadane cache""" async with aiofiles.open(self.metadata_file, 'w') as f: await f.write(json.dumps(metadata, indent=2)) self._metadata = metadata async def get(self, key: str) -> Optional[Dict]: """Pobiera wartość z cache jeśli istnieje i nie wygasła""" await self.initialize() try: entry_meta = self._metadata['entries'].get(key) if not entry_meta: return None if time.time() - entry_meta['timestamp'] > self.ttl: await self.invalidate(key) return None cache_file = self.cache_dir / f"{key}.json" if not cache_file.exists(): await self.invalidate(key) return None async with aiofiles.open(cache_file, 'r') as f: data = json.loads(await f.read()) return self._deserialize(data) # Deserializuj dane przed zwróceniem except Exception as e: self.logger.error(f"Błąd podczas pobierania z cache: {str(e)}") return None async def set(self, key: str, value: Dict): """Zapisuje wartość do cache""" await self.initialize() try: cache_file = self.cache_dir / f"{key}.json" serialized_value = self._serialize(value) # Serializuj wartość przed zapisem data_str = json.dumps(serialized_value) size = len(data_str.encode()) self._metadata['entries'][key] = { 'timestamp': time.time(), 'size': size } self._metadata['total_size'] = sum( entry['size'] for entry in self._metadata['entries'].values() ) if self._metadata['total_size'] > self.max_size_mb * 1024 * 1024: await self._cleanup_old_entries() async with aiofiles.open(cache_file, 'w') as f: await f.write(data_str) await self._save_metadata(self._metadata) except Exception as e: self.logger.error(f"Błąd podczas zapisywania do cache: {str(e)}") async def invalidate(self, key: str): """Usuwa wpis z cache""" await self.initialize() try: if key in self._metadata['entries']: cache_file = self.cache_dir / f"{key}.json" if cache_file.exists(): cache_file.unlink() del self._metadata['entries'][key] await self._save_metadata(self._metadata) except Exception as e: self.logger.error(f"Błąd podczas usuwania z cache: {str(e)}") async def invalidate_all(self): """Usuwa wszystkie wpisy z cache""" await self.initialize() try: for key in list(self._metadata['entries'].keys()): await self.invalidate(key) except Exception as e: self.logger.error(f"Błąd podczas czyszczenia cache: {str(e)}") async def _cleanup_old_entries(self): """Usuwa najstarsze wpisy z cache, jeśli przekroczono limit rozmiaru""" try: while self._metadata['total_size'] > self.max_size_mb * 1024 * 1024: oldest_key = min( self._metadata['entries'].keys(), key=lambda k: self._metadata['entries'][k]['timestamp'] ) await self.invalidate(oldest_key) except Exception as e: self.logger.error(f"Błąd podczas czyszczenia starych wpisów: {str(e)}") def _compute_key(self, *args, **kwargs) -> str: """Generuje klucz cache na podstawie argumentów""" key_parts = [] # Dodaj nazwę funkcji jeśli jest pierwszym argumentem if args and isinstance(args[0], str): key_parts.append(args[0]) args = args[1:] # Dodaj pozostałe argumenty key_parts.extend(str(arg) for arg in args) # Dodaj posortowane kwargs key_parts.extend(f"{k}:{v}" for k, v in sorted(kwargs.items())) # Utwórz hash return hashlib.sha256(''.join(key_parts).encode()).hexdigest() # Dekorator do cachowania wyników funkcji def cache_result(ttl: int = 3600): def decorator(func): @wraps(func) async def wrapper(*args, **kwargs): if not args or not hasattr(args[0], 'cache'): return await func(*args, **kwargs) cache = args[0].cache cache_key = cache._compute_key( func.__name__, *args[1:], **kwargs ) result = await cache.get(cache_key) if result is not None: return result result = await func(*args, **kwargs) await cache.set(cache_key, result) return result return wrapper return decorator