import json
import hashlib
from typing import Dict, Any, Optional
from pathlib import Path
import time
import logging
import aiofiles
import asyncio
from functools import wraps


class CacheManager:
    """
    Manages caching of analysis results with TTL handling and a size limit.
    """

    def __init__(self, cache_dir: str = "cache", ttl: int = 3600, max_size_mb: int = 500):
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self.ttl = ttl
        self.max_size_mb = max_size_mb
        self.logger = logging.getLogger(__name__)
        self.metadata_file = self.cache_dir / "metadata.json"
        self._metadata = None
        self._initialized = False

    def _serialize(self, value: Any) -> Any:
        """
        Converts an object into a JSON-serializable form (e.g. a dict).
        """
        if hasattr(value, 'to_dict'):
            return value.to_dict()  # Use the object's to_dict method if it exists
        elif isinstance(value, (list, tuple)):
            return [self._serialize(item) for item in value]  # Recursively serialize lists
        elif isinstance(value, dict):
            return {k: self._serialize(v) for k, v in value.items()}  # Recursively serialize dicts
        else:
            return value  # Leave primitive values (str, int, float, bool) unchanged

    def _deserialize(self, value: Any) -> Any:
        """
        Converts a serialized form (e.g. a dict) back into an object.
        """
        if isinstance(value, dict) and 'name' in value and 'weight' in value:  # Check whether this looks like a criterion
            from criteria_analyzer import EvaluationCriterion  # Import locally to avoid circular imports
            return EvaluationCriterion.from_dict(value)  # Rebuild the object via its from_dict factory
        elif isinstance(value, (list, tuple)):
            return [self._deserialize(item) for item in value]  # Recursively deserialize lists
        elif isinstance(value, dict):
            return {k: self._deserialize(v) for k, v in value.items()}  # Recursively deserialize dicts
        else:
            return value  # Leave primitive values (str, int, float, bool) unchanged

    async def initialize(self):
        """Initializes the cache if it has not been initialized yet."""
        if not self._initialized:
            try:
                if not self.metadata_file.exists():
                    self._metadata = {
                        'entries': {},
                        'total_size': 0,
                        'last_cleanup': time.time()
                    }
                    await self._save_metadata(self._metadata)
                else:
                    async with aiofiles.open(self.metadata_file, 'r') as f:
                        self._metadata = json.loads(await f.read())
                self._initialized = True
            except Exception as e:
                self.logger.error(f"Error while initializing the cache: {str(e)}")
                raise

    async def _save_metadata(self, metadata: Dict):
        """Persists the cache metadata to disk."""
        async with aiofiles.open(self.metadata_file, 'w') as f:
            await f.write(json.dumps(metadata, indent=2))
        self._metadata = metadata

    async def get(self, key: str) -> Optional[Dict]:
        """Returns a cached value if it exists and has not expired."""
        await self.initialize()
        try:
            entry_meta = self._metadata['entries'].get(key)
            if not entry_meta:
                return None
            if time.time() - entry_meta['timestamp'] > self.ttl:
                await self.invalidate(key)
                return None
            cache_file = self.cache_dir / f"{key}.json"
            if not cache_file.exists():
                await self.invalidate(key)
                return None
            async with aiofiles.open(cache_file, 'r') as f:
                data = json.loads(await f.read())
            return self._deserialize(data)  # Deserialize the data before returning it
        except Exception as e:
            self.logger.error(f"Error while reading from the cache: {str(e)}")
            return None

    async def set(self, key: str, value: Dict):
        """Stores a value in the cache."""
        await self.initialize()
        try:
            cache_file = self.cache_dir / f"{key}.json"
            serialized_value = self._serialize(value)  # Serialize the value before writing it
            data_str = json.dumps(serialized_value)
            size = len(data_str.encode())
            self._metadata['entries'][key] = {
                'timestamp': time.time(),
                'size': size
            }
            self._metadata['total_size'] = sum(
                entry['size'] for entry in self._metadata['entries'].values()
            )
            if self._metadata['total_size'] > self.max_size_mb * 1024 * 1024:
                await self._cleanup_old_entries()
            async with aiofiles.open(cache_file, 'w') as f:
                await f.write(data_str)
            await self._save_metadata(self._metadata)
        except Exception as e:
            self.logger.error(f"Error while writing to the cache: {str(e)}")

    async def invalidate(self, key: str):
        """Removes a single entry from the cache."""
        await self.initialize()
        try:
            if key in self._metadata['entries']:
                cache_file = self.cache_dir / f"{key}.json"
                if cache_file.exists():
                    cache_file.unlink()
                # Keep the running total in sync so the size-based cleanup loop terminates.
                self._metadata['total_size'] -= self._metadata['entries'][key].get('size', 0)
                del self._metadata['entries'][key]
                await self._save_metadata(self._metadata)
        except Exception as e:
            self.logger.error(f"Error while removing a cache entry: {str(e)}")

    async def invalidate_all(self):
        """Removes all entries from the cache."""
        await self.initialize()
        try:
            for key in list(self._metadata['entries'].keys()):
                await self.invalidate(key)
        except Exception as e:
            self.logger.error(f"Error while clearing the cache: {str(e)}")

    async def _cleanup_old_entries(self):
        """Removes the oldest entries while the cache exceeds its size limit."""
        try:
            while (self._metadata['entries']
                   and self._metadata['total_size'] > self.max_size_mb * 1024 * 1024):
                oldest_key = min(
                    self._metadata['entries'].keys(),
                    key=lambda k: self._metadata['entries'][k]['timestamp']
                )
                await self.invalidate(oldest_key)
        except Exception as e:
            self.logger.error(f"Error while cleaning up old entries: {str(e)}")

    def _compute_key(self, *args, **kwargs) -> str:
        """Builds a cache key from the given arguments."""
        key_parts = []
        # Include the function name if it is passed as the first argument
        if args and isinstance(args[0], str):
            key_parts.append(args[0])
            args = args[1:]
        # Include the remaining positional arguments
        key_parts.extend(str(arg) for arg in args)
        # Include the keyword arguments in sorted order
        key_parts.extend(f"{k}:{v}" for k, v in sorted(kwargs.items()))
        # Hash the combined parts into a stable key
        return hashlib.sha256(''.join(key_parts).encode()).hexdigest()


# Decorator that caches the results of async methods
def cache_result(ttl: int = 3600):
    # Note: the `ttl` parameter is currently unused; expiry is governed by the
    # TTL configured on the CacheManager instance itself.
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # Call the function directly when the owning object has no `cache` attribute.
            if not args or not hasattr(args[0], 'cache'):
                return await func(*args, **kwargs)
            cache = args[0].cache
            cache_key = cache._compute_key(
                func.__name__,
                *args[1:],
                **kwargs
            )
            result = await cache.get(cache_key)
            if result is not None:
                return result
            result = await func(*args, **kwargs)
            await cache.set(cache_key, result)
            return result
        return wrapper
    return decorator
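

# --- Illustrative usage sketch (not part of the original module) ---
# A minimal example of how CacheManager and cache_result are meant to work
# together: the decorator looks for a `cache` attribute on the first
# positional argument (typically `self`). `AnalysisService` is a hypothetical
# class invented purely for this sketch.
if __name__ == "__main__":
    class AnalysisService:
        def __init__(self):
            self.cache = CacheManager(cache_dir="cache", ttl=3600, max_size_mb=500)

        @cache_result()
        async def analyze(self, document_id: str) -> dict:
            # Stand-in for an expensive analysis; the decorator stores the
            # result under a key derived from the method name and arguments.
            return {"document_id": document_id, "score": 0.9}

    async def main():
        service = AnalysisService()
        first = await service.analyze("doc-1")   # computed and written to the cache
        second = await service.analyze("doc-1")  # served from the cache
        assert first == second

    asyncio.run(main())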