| """ | |
| Cache Manager for Similarity Engine | |
| Handles JSON caching of analysis results for improved performance | |
| """ | |
| import json | |
| import os | |
| import hashlib | |
| import logging | |
| from datetime import datetime, timedelta | |
| from typing import Dict, Any, Optional, List | |
| from pathlib import Path | |
| # Configure logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |


class SimilarityCacheManager:
    """Manages caching of similarity analysis results"""

    def __init__(self, cache_base_dir: str = "cache"):
        """
        Initialize cache manager

        Args:
            cache_base_dir: Base directory for cache files
        """
        self.cache_base_dir = Path(cache_base_dir)
        self.cache_dirs = {
            'duplicates': self.cache_base_dir / 'duplicates',
            'promo_matches': self.cache_base_dir / 'promo_matches',
            'comparisons': self.cache_base_dir / 'comparisons'
        }
        # Ensure cache directories exist
        for cache_dir in self.cache_dirs.values():
            cache_dir.mkdir(parents=True, exist_ok=True)
        logger.info(f"📁 Cache manager initialized with base dir: {self.cache_base_dir}")

    def generate_cache_key(
        self,
        analysis_type: str,
        products_count: int,
        threshold: float,
        algorithm: str = "hybrid",
        additional_params: Optional[Dict[str, Any]] = None
    ) -> str:
        """
        Generate a unique cache key for a set of analysis parameters

        Args:
            analysis_type: Type of analysis ('duplicates', 'promo', 'comparison')
            products_count: Number of products in the analysis
            threshold: Similarity threshold used
            algorithm: Algorithm used
            additional_params: Any additional parameters to include in the key

        Returns:
            Unique cache key string
        """
        # Base parameters
        key_data = {
            'type': analysis_type,
            'count': products_count,
            'threshold': round(threshold, 2),
            'algorithm': algorithm,
            'date': datetime.now().strftime("%Y%m%d")
        }
        # Add additional parameters if provided
        if additional_params:
            key_data.update(additional_params)
        # Hash the parameters for uniqueness
        key_string = json.dumps(key_data, sort_keys=True)
        key_hash = hashlib.md5(key_string.encode()).hexdigest()[:8]
        # Build a readable cache key
        cache_key = f"{analysis_type}_{products_count}_{int(threshold * 100)}_{algorithm}_{key_hash}"
        logger.debug(f"🔑 Generated cache key: {cache_key}")
        return cache_key
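
    # Example (hash suffix illustrative): generate_cache_key('duplicates', 150, 0.85)
    # yields a key shaped like "duplicates_150_85_hybrid_1a2b3c4d". Note that the
    # current date is part of the hashed data, so keys (and thus cache entries)
    # roll over daily even if expiry_hours has not yet elapsed.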

    def get_cache_file_path(self, analysis_type: str, cache_key: str) -> Path:
        """Get the full path for a cache file"""
        cache_dir = self.cache_dirs.get(analysis_type, self.cache_dirs['comparisons'])
        return cache_dir / f"{cache_key}.json"

    def save_cache(
        self,
        analysis_type: str,
        cache_key: str,
        results: Dict[str, Any],
        parameters: Dict[str, Any],
        expiry_hours: int = 24
    ) -> bool:
        """
        Save analysis results to cache

        Args:
            analysis_type: Type of analysis
            cache_key: Unique cache key
            results: Analysis results to cache
            parameters: Parameters used for the analysis
            expiry_hours: Hours until the cache entry expires

        Returns:
            True if saved successfully, False otherwise
        """
        try:
            cache_file = self.get_cache_file_path(analysis_type, cache_key)
            cache_data = {
                'cache_id': cache_key,
                'analysis_type': analysis_type,
                'created_at': datetime.now().isoformat(),
                'expires_at': (datetime.now() + timedelta(hours=expiry_hours)).isoformat(),
                'parameters': parameters,
                'results': results,
                'version': '1.0'
            }
            with open(cache_file, 'w', encoding='utf-8') as f:
                json.dump(cache_data, f, indent=2, ensure_ascii=False)
            file_size = cache_file.stat().st_size / 1024  # KB
            logger.info(f"💾 Saved cache: {cache_key} ({file_size:.1f} KB)")
            return True
        except Exception as e:
            logger.error(f"❌ Failed to save cache {cache_key}: {e}")
            return False

    def load_cache(self, analysis_type: str, cache_key: str) -> Optional[Dict[str, Any]]:
        """
        Load cached analysis results

        Args:
            analysis_type: Type of analysis
            cache_key: Cache key to load

        Returns:
            Cached results if valid, None otherwise
        """
        try:
            cache_file = self.get_cache_file_path(analysis_type, cache_key)
            if not cache_file.exists():
                logger.debug(f"📭 Cache miss: {cache_key}")
                return None
            with open(cache_file, 'r', encoding='utf-8') as f:
                cache_data = json.load(f)
            # Check whether the cache entry has expired
            expiry_time = datetime.fromisoformat(cache_data['expires_at'])
            if datetime.now() > expiry_time:
                logger.info(f"⏰ Cache expired: {cache_key}")
                cache_file.unlink()  # Remove the expired file
                return None
            logger.info(f"✅ Cache hit: {cache_key}")
            return cache_data['results']
        except Exception as e:
            logger.error(f"❌ Failed to load cache {cache_key}: {e}")
            return None

    def is_cache_valid(self, analysis_type: str, cache_key: str) -> bool:
        """Check whether a cache entry exists and has not expired"""
        try:
            cache_file = self.get_cache_file_path(analysis_type, cache_key)
            if not cache_file.exists():
                return False
            with open(cache_file, 'r', encoding='utf-8') as f:
                cache_data = json.load(f)
            expiry_time = datetime.fromisoformat(cache_data['expires_at'])
            return datetime.now() <= expiry_time
        except Exception:
            return False

    def clear_cache(
        self,
        analysis_type: Optional[str] = None,
        older_than_hours: Optional[int] = None,
        expired_only: bool = False
    ) -> int:
        """
        Clear cached results

        Args:
            analysis_type: Specific analysis type to clear, or None for all
            older_than_hours: Clear cache older than X hours, or None for all
            expired_only: Remove only files whose 'expires_at' timestamp has passed

        Returns:
            Number of files removed
        """
        removed_count = 0
        # Determine which directories to clear
        dirs_to_clear = [self.cache_dirs[analysis_type]] if analysis_type else list(self.cache_dirs.values())
        for cache_dir in dirs_to_clear:
            if not cache_dir.exists():
                continue
            for cache_file in cache_dir.glob("*.json"):
                should_remove = False
                try:
                    if expired_only:
                        # Remove only entries past their stored expiry timestamp
                        with open(cache_file, 'r', encoding='utf-8') as f:
                            cache_data = json.load(f)
                        expiry_time = datetime.fromisoformat(cache_data['expires_at'])
                        should_remove = datetime.now() > expiry_time
                    elif older_than_hours is None:
                        should_remove = True
                    else:
                        # Check file age against the stored creation timestamp
                        with open(cache_file, 'r', encoding='utf-8') as f:
                            cache_data = json.load(f)
                        created_time = datetime.fromisoformat(cache_data['created_at'])
                        age_hours = (datetime.now() - created_time).total_seconds() / 3600
                        if age_hours > older_than_hours:
                            should_remove = True
                    if should_remove:
                        cache_file.unlink()
                        removed_count += 1
                        logger.info(f"🗑️ Removed cache: {cache_file.name}")
                except Exception as e:
                    logger.warning(f"⚠️ Failed to process cache file {cache_file}: {e}")
        logger.info(f"🧹 Cache cleanup complete: {removed_count} files removed")
        return removed_count

    def get_cache_stats(self) -> Dict[str, Any]:
        """Get cache statistics"""
        stats = {
            'total_files': 0,
            'total_size_mb': 0,
            'by_type': {},
            'cache_dirs': {}
        }
        for analysis_type, cache_dir in self.cache_dirs.items():
            if not cache_dir.exists():
                continue
            type_stats = {
                'files': 0,
                'size_mb': 0,
                'valid_files': 0,
                'expired_files': 0
            }
            for cache_file in cache_dir.glob("*.json"):
                try:
                    file_size = cache_file.stat().st_size / (1024 * 1024)  # MB
                    type_stats['files'] += 1
                    type_stats['size_mb'] += file_size
                    # Check validity against the stored expiry timestamp
                    with open(cache_file, 'r', encoding='utf-8') as f:
                        cache_data = json.load(f)
                    expiry_time = datetime.fromisoformat(cache_data['expires_at'])
                    if datetime.now() <= expiry_time:
                        type_stats['valid_files'] += 1
                    else:
                        type_stats['expired_files'] += 1
                except Exception:
                    # Unreadable or malformed files are counted as expired
                    type_stats['expired_files'] += 1
            stats['by_type'][analysis_type] = type_stats
            stats['total_files'] += type_stats['files']
            stats['total_size_mb'] += type_stats['size_mb']
            stats['cache_dirs'][analysis_type] = str(cache_dir)
        return stats
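
    # Illustrative shape of get_cache_stats() output (all numbers are placeholders):
    # {'total_files': 3, 'total_size_mb': 0.42,
    #  'by_type': {'duplicates': {'files': 2, 'size_mb': 0.3,
    #                             'valid_files': 1, 'expired_files': 1}, ...},
    #  'cache_dirs': {'duplicates': 'cache/duplicates', ...}}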

    def cleanup_expired_cache(self) -> int:
        """Remove all expired cache files"""
        # Only entries whose 'expires_at' has passed are removed; valid entries are kept
        return self.clear_cache(expired_only=True)


# Global cache manager instance
_cache_manager = None


def get_cache_manager() -> SimilarityCacheManager:
    """Get the singleton cache manager instance"""
    global _cache_manager
    if _cache_manager is None:
        _cache_manager = SimilarityCacheManager()
    return _cache_manager
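
# Note: this module-level singleton is process-local and unsynchronized; two
# threads racing on the first call could each construct an instance, which is
# harmless here because both would point at the same cache directories.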


# Convenience functions
def cache_duplicate_analysis(
    products_count: int,
    threshold: float,
    results: Dict[str, Any],
    parameters: Dict[str, Any]
) -> str:
    """Cache duplicate analysis results"""
    cache_mgr = get_cache_manager()
    cache_key = cache_mgr.generate_cache_key('duplicates', products_count, threshold)
    cache_mgr.save_cache('duplicates', cache_key, results, parameters)
    return cache_key


def load_duplicate_analysis(
    products_count: int,
    threshold: float
) -> Optional[Dict[str, Any]]:
    """Load cached duplicate analysis results"""
    cache_mgr = get_cache_manager()
    cache_key = cache_mgr.generate_cache_key('duplicates', products_count, threshold)
    return cache_mgr.load_cache('duplicates', cache_key)


def cache_promo_analysis(
    promo_count: int,
    db_count: int,
    threshold: float,
    results: Dict[str, Any],
    parameters: Dict[str, Any]
) -> str:
    """Cache promo analysis results"""
    cache_mgr = get_cache_manager()
    cache_key = cache_mgr.generate_cache_key(
        'promo_matches',
        promo_count + db_count,
        threshold,
        additional_params={'promo_count': promo_count, 'db_count': db_count}
    )
    cache_mgr.save_cache('promo_matches', cache_key, results, parameters)
    return cache_key


def load_promo_analysis(
    promo_count: int,
    db_count: int,
    threshold: float
) -> Optional[Dict[str, Any]]:
    """Load cached promo analysis results"""
    cache_mgr = get_cache_manager()
    cache_key = cache_mgr.generate_cache_key(
        'promo_matches',
        promo_count + db_count,
        threshold,
        additional_params={'promo_count': promo_count, 'db_count': db_count}
    )
    return cache_mgr.load_cache('promo_matches', cache_key)
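

# Minimal usage sketch (illustrative only: the counts, threshold, and result
# payloads below are placeholder values, not real analysis output).
if __name__ == "__main__":
    params = {'threshold': 0.85, 'algorithm': 'hybrid'}
    results = {'duplicate_groups': [], 'pairs_checked': 0}
    # Save through the convenience wrapper, then read the same entry back
    key = cache_duplicate_analysis(products_count=150, threshold=0.85,
                                   results=results, parameters=params)
    cached = load_duplicate_analysis(products_count=150, threshold=0.85)
    assert cached == results
    # Inspect cache state, then drop anything past its 'expires_at' timestamp
    mgr = get_cache_manager()
    print(json.dumps(mgr.get_cache_stats(), indent=2))
    mgr.cleanup_expired_cache()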