WhizTenderBot1.0 / cache_manager.py
Marek4321's picture
Update cache_manager.py
1829618 verified
import json
import hashlib
from typing import Dict, Any, Optional
from pathlib import Path
import time
import logging
import aiofiles
import asyncio
from functools import wraps
class CacheManager:
"""
Manager cache'owania wynik贸w analiz z obs艂ug膮 TTL i limitem rozmiaru.
"""
def __init__(self, cache_dir: str = "cache", ttl: int = 3600, max_size_mb: int = 500):
self.cache_dir = Path(cache_dir)
self.cache_dir.mkdir(parents=True, exist_ok=True)
self.ttl = ttl
self.max_size_mb = max_size_mb
self.logger = logging.getLogger(__name__)
self.metadata_file = self.cache_dir / "metadata.json"
self._metadata = None
self._initialized = False
def _serialize(self, value: Any) -> Any:
"""
Konwertuje obiekt na format serializowalny (np. s艂ownik).
"""
if hasattr(value, 'to_dict'):
return value.to_dict() # U偶yj metody to_dict, je艣li istnieje
elif isinstance(value, (list, tuple)):
return [self._serialize(item) for item in value] # Rekurencyjnie serializuj listy
elif isinstance(value, dict):
return {k: self._serialize(v) for k, v in value.items()} # Rekurencyjnie serializuj s艂owniki
else:
return value # Pozostaw warto艣ci podstawowe (str, int, float, bool) bez zmian
def _deserialize(self, value: Any) -> Any:
"""
Konwertuje format serializowalny (np. s艂ownik) z powrotem na obiekt.
"""
if isinstance(value, dict) and 'name' in value and 'weight' in value: # Sprawd藕, czy to kryterium
from criteria_analyzer import EvaluationCriterion # Importuj klas臋 lokalnie, aby unikn膮膰 cyklicznych import贸w
return EvaluationCriterion.from_dict(value) # U偶yj metody from_dict, je艣li istnieje
elif isinstance(value, (list, tuple)):
return [self._deserialize(item) for item in value] # Rekurencyjnie deserializuj listy
elif isinstance(value, dict):
return {k: self._deserialize(v) for k, v in value.items()} # Rekurencyjnie deserializuj s艂owniki
else:
return value # Pozostaw warto艣ci podstawowe (str, int, float, bool) bez zmian
async def initialize(self):
"""Inicjalizuje cache je艣li jeszcze nie jest zainicjalizowany"""
if not self._initialized:
try:
if not self.metadata_file.exists():
self._metadata = {
'entries': {},
'total_size': 0,
'last_cleanup': time.time()
}
await self._save_metadata(self._metadata)
else:
async with aiofiles.open(self.metadata_file, 'r') as f:
self._metadata = json.loads(await f.read())
self._initialized = True
except Exception as e:
self.logger.error(f"B艂膮d podczas inicjalizacji cache: {str(e)}")
raise
async def _save_metadata(self, metadata: Dict):
"""Zapisuje metadane cache"""
async with aiofiles.open(self.metadata_file, 'w') as f:
await f.write(json.dumps(metadata, indent=2))
self._metadata = metadata
async def get(self, key: str) -> Optional[Dict]:
"""Pobiera warto艣膰 z cache je艣li istnieje i nie wygas艂a"""
await self.initialize()
try:
entry_meta = self._metadata['entries'].get(key)
if not entry_meta:
return None
if time.time() - entry_meta['timestamp'] > self.ttl:
await self.invalidate(key)
return None
cache_file = self.cache_dir / f"{key}.json"
if not cache_file.exists():
await self.invalidate(key)
return None
async with aiofiles.open(cache_file, 'r') as f:
data = json.loads(await f.read())
return self._deserialize(data) # Deserializuj dane przed zwr贸ceniem
except Exception as e:
self.logger.error(f"B艂膮d podczas pobierania z cache: {str(e)}")
return None
async def set(self, key: str, value: Dict):
"""Zapisuje warto艣膰 do cache"""
await self.initialize()
try:
cache_file = self.cache_dir / f"{key}.json"
serialized_value = self._serialize(value) # Serializuj warto艣膰 przed zapisem
data_str = json.dumps(serialized_value)
size = len(data_str.encode())
self._metadata['entries'][key] = {
'timestamp': time.time(),
'size': size
}
self._metadata['total_size'] = sum(
entry['size'] for entry in self._metadata['entries'].values()
)
if self._metadata['total_size'] > self.max_size_mb * 1024 * 1024:
await self._cleanup_old_entries()
async with aiofiles.open(cache_file, 'w') as f:
await f.write(data_str)
await self._save_metadata(self._metadata)
except Exception as e:
self.logger.error(f"B艂膮d podczas zapisywania do cache: {str(e)}")
async def invalidate(self, key: str):
"""Usuwa wpis z cache"""
await self.initialize()
try:
if key in self._metadata['entries']:
cache_file = self.cache_dir / f"{key}.json"
if cache_file.exists():
cache_file.unlink()
del self._metadata['entries'][key]
await self._save_metadata(self._metadata)
except Exception as e:
self.logger.error(f"B艂膮d podczas usuwania z cache: {str(e)}")
async def invalidate_all(self):
"""Usuwa wszystkie wpisy z cache"""
await self.initialize()
try:
for key in list(self._metadata['entries'].keys()):
await self.invalidate(key)
except Exception as e:
self.logger.error(f"B艂膮d podczas czyszczenia cache: {str(e)}")
async def _cleanup_old_entries(self):
"""Usuwa najstarsze wpisy z cache, je艣li przekroczono limit rozmiaru"""
try:
while self._metadata['total_size'] > self.max_size_mb * 1024 * 1024:
oldest_key = min(
self._metadata['entries'].keys(),
key=lambda k: self._metadata['entries'][k]['timestamp']
)
await self.invalidate(oldest_key)
except Exception as e:
self.logger.error(f"B艂膮d podczas czyszczenia starych wpis贸w: {str(e)}")
def _compute_key(self, *args, **kwargs) -> str:
"""Generuje klucz cache na podstawie argument贸w"""
key_parts = []
# Dodaj nazw臋 funkcji je艣li jest pierwszym argumentem
if args and isinstance(args[0], str):
key_parts.append(args[0])
args = args[1:]
# Dodaj pozosta艂e argumenty
key_parts.extend(str(arg) for arg in args)
# Dodaj posortowane kwargs
key_parts.extend(f"{k}:{v}" for k, v in sorted(kwargs.items()))
# Utw贸rz hash
return hashlib.sha256(''.join(key_parts).encode()).hexdigest()
# Dekorator do cachowania wynik贸w funkcji
def cache_result(ttl: int = 3600):
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
if not args or not hasattr(args[0], 'cache'):
return await func(*args, **kwargs)
cache = args[0].cache
cache_key = cache._compute_key(
func.__name__,
*args[1:],
**kwargs
)
result = await cache.get(cache_key)
if result is not None:
return result
result = await func(*args, **kwargs)
await cache.set(cache_key, result)
return result
return wrapper
return decorator