import json
import hashlib
from typing import Dict, Any, Optional
from pathlib import Path
import time
import logging
import aiofiles
import asyncio
from functools import wraps

class CacheManager:
    """
    Manages caching of analysis results with TTL support and a size limit.
    """

    def __init__(self, cache_dir: str = "cache", ttl: int = 3600, max_size_mb: int = 500):
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self.ttl = ttl
        self.max_size_mb = max_size_mb
        self.logger = logging.getLogger(__name__)
        self.metadata_file = self.cache_dir / "metadata.json"
        self._metadata = None
        self._initialized = False

    def _serialize(self, value: Any) -> Any:
        """
        Converts an object into a JSON-serializable form (e.g. a dict).
        """
        if hasattr(value, 'to_dict'):
            return value.to_dict()  # Use the object's to_dict method if it exists
        elif isinstance(value, (list, tuple)):
            return [self._serialize(item) for item in value]  # Recursively serialize lists
        elif isinstance(value, dict):
            return {k: self._serialize(v) for k, v in value.items()}  # Recursively serialize dicts
        else:
            return value  # Leave primitive values (str, int, float, bool) unchanged

    def _deserialize(self, value: Any) -> Any:
        """
        Converts a JSON-serializable form (e.g. a dict) back into an object.
        """
        if isinstance(value, dict) and 'name' in value and 'weight' in value:  # Heuristic: dicts with these keys are criteria
            from criteria_analyzer import EvaluationCriterion  # Import locally to avoid circular imports
            return EvaluationCriterion.from_dict(value)  # Rebuild the object via its from_dict factory
        elif isinstance(value, (list, tuple)):
            return [self._deserialize(item) for item in value]  # Recursively deserialize lists
        elif isinstance(value, dict):
            return {k: self._deserialize(v) for k, v in value.items()}  # Recursively deserialize dicts
        else:
            return value  # Leave primitive values (str, int, float, bool) unchanged

    async def initialize(self):
        """Initializes the cache if it has not been initialized yet."""
        if not self._initialized:
            try:
                if not self.metadata_file.exists():
                    self._metadata = {
                        'entries': {},
                        'total_size': 0,
                        'last_cleanup': time.time()
                    }
                    await self._save_metadata(self._metadata)
                else:
                    async with aiofiles.open(self.metadata_file, 'r') as f:
                        self._metadata = json.loads(await f.read())
                self._initialized = True
            except Exception as e:
                self.logger.error(f"Error initializing cache: {str(e)}")
                raise

    async def _save_metadata(self, metadata: Dict):
        """Persists the cache metadata to disk."""
        async with aiofiles.open(self.metadata_file, 'w') as f:
            await f.write(json.dumps(metadata, indent=2))
        self._metadata = metadata

    async def get(self, key: str) -> Optional[Dict]:
        """Returns a cached value if it exists and has not expired."""
        await self.initialize()
        try:
            entry_meta = self._metadata['entries'].get(key)
            if not entry_meta:
                return None
            if time.time() - entry_meta['timestamp'] > self.ttl:
                await self.invalidate(key)
                return None
            cache_file = self.cache_dir / f"{key}.json"
            if not cache_file.exists():
                await self.invalidate(key)
                return None
            async with aiofiles.open(cache_file, 'r') as f:
                data = json.loads(await f.read())
            return self._deserialize(data)  # Deserialize the data before returning it
        except Exception as e:
            self.logger.error(f"Error reading from cache: {str(e)}")
            return None

    async def set(self, key: str, value: Dict):
        """Stores a value in the cache."""
        await self.initialize()
        try:
            cache_file = self.cache_dir / f"{key}.json"
            serialized_value = self._serialize(value)  # Serialize the value before writing
            data_str = json.dumps(serialized_value)
            size = len(data_str.encode())
            self._metadata['entries'][key] = {
                'timestamp': time.time(),
                'size': size
            }
            self._metadata['total_size'] = sum(
                entry['size'] for entry in self._metadata['entries'].values()
            )
            if self._metadata['total_size'] > self.max_size_mb * 1024 * 1024:
                await self._cleanup_old_entries()
            async with aiofiles.open(cache_file, 'w') as f:
                await f.write(data_str)
            await self._save_metadata(self._metadata)
        except Exception as e:
            self.logger.error(f"Error writing to cache: {str(e)}")

    async def invalidate(self, key: str):
        """Removes a single entry from the cache."""
        await self.initialize()
        try:
            if key in self._metadata['entries']:
                cache_file = self.cache_dir / f"{key}.json"
                if cache_file.exists():
                    cache_file.unlink()
                # Keep total_size in sync so size-based cleanup can terminate
                self._metadata['total_size'] -= self._metadata['entries'][key]['size']
                del self._metadata['entries'][key]
                await self._save_metadata(self._metadata)
        except Exception as e:
            self.logger.error(f"Error removing cache entry: {str(e)}")

    async def invalidate_all(self):
        """Removes all entries from the cache."""
        await self.initialize()
        try:
            for key in list(self._metadata['entries'].keys()):
                await self.invalidate(key)
        except Exception as e:
            self.logger.error(f"Error clearing cache: {str(e)}")

    async def _cleanup_old_entries(self):
        """Evicts the oldest entries once the cache exceeds its size limit."""
        try:
            while self._metadata['total_size'] > self.max_size_mb * 1024 * 1024:
                oldest_key = min(
                    self._metadata['entries'].keys(),
                    key=lambda k: self._metadata['entries'][k]['timestamp']
                )
                await self.invalidate(oldest_key)
        except Exception as e:
            self.logger.error(f"Error cleaning up old entries: {str(e)}")

    def _compute_key(self, *args, **kwargs) -> str:
        """Builds a cache key from the given arguments."""
        key_parts = []
        # Include the function name if it is passed as the first argument
        if args and isinstance(args[0], str):
            key_parts.append(args[0])
            args = args[1:]
        # Include the remaining positional arguments
        key_parts.extend(str(arg) for arg in args)
        # Include the keyword arguments in sorted order
        key_parts.extend(f"{k}:{v}" for k, v in sorted(kwargs.items()))
        # Hash the joined parts into a stable key
        return hashlib.sha256(''.join(key_parts).encode()).hexdigest()
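
# A minimal usage sketch of the manager on its own, assuming an async context
# (run it with asyncio.run(_cache_manager_demo())). The key and payload below
# are illustrative only, not part of the real API.
async def _cache_manager_demo():
    cache = CacheManager(cache_dir="cache_demo", ttl=60, max_size_mb=10)
    await cache.set("analysis_example", {"score": 0.87, "labels": ["ok"]})
    print(await cache.get("analysis_example"))  # Dict on a hit, None if missing or expired
    await cache.invalidate("analysis_example")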

# Decorator for caching function results.
# Note: the ttl argument is currently unused; expiry is governed by the
# CacheManager's own ttl setting.
def cache_result(ttl: int = 3600):
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # Fall back to an undecorated call when the instance has no cache
            if not args or not hasattr(args[0], 'cache'):
                return await func(*args, **kwargs)
            cache = args[0].cache
            cache_key = cache._compute_key(
                func.__name__,
                *args[1:],
                **kwargs
            )
            result = await cache.get(cache_key)
            if result is not None:
                return result
            result = await func(*args, **kwargs)
            await cache.set(cache_key, result)
            return result
        return wrapper
    return decorator
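
# A hedged sketch of how the decorator is meant to be used: any object that
# exposes a `cache` attribute holding a CacheManager gets transparent caching
# on its decorated async methods. The _DemoAnalyzer class is hypothetical,
# purely for illustration.
class _DemoAnalyzer:
    def __init__(self):
        self.cache = CacheManager(cache_dir="cache_demo")

    @cache_result(ttl=3600)
    async def analyze(self, text: str) -> Dict:
        # Expensive work would happen here; the result must be JSON-serializable
        return {"length": len(text)}

if __name__ == "__main__":
    async def _main():
        analyzer = _DemoAnalyzer()
        print(await analyzer.analyze("hello"))  # Computed, then written to the cache
        print(await analyzer.analyze("hello"))  # Served from the cache
    asyncio.run(_main())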