""" Cache Manager for Similarity Engine Handles JSON caching of analysis results for improved performance """ import json import os import hashlib import logging from datetime import datetime, timedelta from typing import Dict, Any, Optional, List from pathlib import Path # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class SimilarityCacheManager: """Manages caching of similarity analysis results """ def __init__(self, cache_base_dir: str = "cache"): """ Initialize cache manager Args: cache_base_dir: Base directory for cache files """ self.cache_base_dir = Path(cache_base_dir) self.cache_dirs = { 'duplicates': self.cache_base_dir / 'duplicates', 'promo_matches': self.cache_base_dir / 'promo_matches', 'comparisons': self.cache_base_dir / 'comparisons' } # Ensure cache directories exist for cache_dir in self.cache_dirs.values(): cache_dir.mkdir(parents=True, exist_ok=True) logger.info(f"๐Ÿ“ Cache manager initialized with base dir: {self.cache_base_dir}") def generate_cache_key( self, analysis_type: str, products_count: int, threshold: float, algorithm: str = "hybrid", additional_params: Dict = None ) -> str: """ Generate unique cache key for analysis parameters Args: analysis_type: Type of analysis ('duplicates', 'promo', 'comparison') products_count: Number of products in analysis threshold: Similarity threshold used algorithm: Algorithm used additional_params: Any additional parameters to include in key Returns: Unique cache key string """ # Base parameters key_data = { 'type': analysis_type, 'count': products_count, 'threshold': round(threshold, 2), 'algorithm': algorithm, 'date': datetime.now().strftime("%Y%m%d") } # Add additional parameters if provided if additional_params: key_data.update(additional_params) # Create hash from parameters for uniqueness key_string = json.dumps(key_data, sort_keys=True) key_hash = hashlib.md5(key_string.encode()).hexdigest()[:8] # Create readable cache key cache_key = f"{analysis_type}_{products_count}_{int(threshold*100)}_{algorithm}_{key_hash}" logger.debug(f"๐Ÿ”‘ Generated cache key: {cache_key}") return cache_key def get_cache_file_path(self, analysis_type: str, cache_key: str) -> Path: """Get full path for cache file""" cache_dir = self.cache_dirs.get(analysis_type, self.cache_dirs['comparisons']) return cache_dir / f"{cache_key}.json" def save_cache( self, analysis_type: str, cache_key: str, results: Dict[str, Any], parameters: Dict[str, Any], expiry_hours: int = 24 ) -> bool: """ Save analysis results to cache Args: analysis_type: Type of analysis cache_key: Unique cache key results: Analysis results to cache parameters: Parameters used for analysis expiry_hours: Hours until cache expires Returns: True if saved successfully, False otherwise """ try: cache_file = self.get_cache_file_path(analysis_type, cache_key) cache_data = { 'cache_id': cache_key, 'analysis_type': analysis_type, 'created_at': datetime.now().isoformat(), 'expires_at': (datetime.now() + timedelta(hours=expiry_hours)).isoformat(), 'parameters': parameters, 'results': results, 'version': '1.0' } with open(cache_file, 'w', encoding='utf-8') as f: json.dump(cache_data, f, indent=2, ensure_ascii=False) file_size = cache_file.stat().st_size / 1024 # KB logger.info(f"๐Ÿ’พ Saved cache: {cache_key} ({file_size:.1f} KB)") return True except Exception as e: logger.error(f"โŒ Failed to save cache {cache_key}: {e}") return False def load_cache(self, analysis_type: str, cache_key: str) -> Optional[Dict[str, Any]]: """ Load cached analysis 
    def load_cache(self, analysis_type: str, cache_key: str) -> Optional[Dict[str, Any]]:
        """
        Load cached analysis results.

        Args:
            analysis_type: Type of analysis
            cache_key: Cache key to load

        Returns:
            Cached results if valid, None otherwise
        """
        try:
            cache_file = self.get_cache_file_path(analysis_type, cache_key)

            if not cache_file.exists():
                logger.debug(f"📭 Cache miss: {cache_key}")
                return None

            with open(cache_file, 'r', encoding='utf-8') as f:
                cache_data = json.load(f)

            # Check whether the cache entry has expired
            expiry_time = datetime.fromisoformat(cache_data['expires_at'])
            if datetime.now() > expiry_time:
                logger.info(f"⏰ Cache expired: {cache_key}")
                cache_file.unlink()  # Remove the expired entry
                return None

            logger.info(f"✅ Cache hit: {cache_key}")
            return cache_data['results']

        except Exception as e:
            logger.error(f"❌ Failed to load cache {cache_key}: {e}")
            return None

    def is_cache_valid(self, analysis_type: str, cache_key: str) -> bool:
        """Check whether a cache entry exists and has not expired."""
        try:
            cache_file = self.get_cache_file_path(analysis_type, cache_key)

            if not cache_file.exists():
                return False

            with open(cache_file, 'r', encoding='utf-8') as f:
                cache_data = json.load(f)

            expiry_time = datetime.fromisoformat(cache_data['expires_at'])
            return datetime.now() <= expiry_time

        except Exception:
            return False

    def clear_cache(
        self,
        analysis_type: Optional[str] = None,
        older_than_hours: Optional[int] = None,
        expired_only: bool = False
    ) -> int:
        """
        Clear cached results.

        Args:
            analysis_type: Specific analysis type to clear, or None for all
            older_than_hours: Only clear entries older than this many hours;
                None clears regardless of age
            expired_only: If True, only clear entries whose 'expires_at' has
                passed (takes precedence over older_than_hours)

        Returns:
            Number of files removed
        """
        removed_count = 0

        # Determine which directories to clear
        dirs_to_clear = (
            [self.cache_dirs[analysis_type]] if analysis_type
            else list(self.cache_dirs.values())
        )

        for cache_dir in dirs_to_clear:
            if not cache_dir.exists():
                continue

            for cache_file in cache_dir.glob("*.json"):
                should_remove = False
                try:
                    if expired_only:
                        # Remove only entries past their 'expires_at' timestamp
                        with open(cache_file, 'r', encoding='utf-8') as f:
                            cache_data = json.load(f)
                        expiry_time = datetime.fromisoformat(cache_data['expires_at'])
                        should_remove = datetime.now() > expiry_time
                    elif older_than_hours is None:
                        should_remove = True
                    else:
                        # Check file age against the threshold
                        with open(cache_file, 'r', encoding='utf-8') as f:
                            cache_data = json.load(f)
                        created_time = datetime.fromisoformat(cache_data['created_at'])
                        age_hours = (datetime.now() - created_time).total_seconds() / 3600
                        if age_hours > older_than_hours:
                            should_remove = True

                    if should_remove:
                        cache_file.unlink()
                        removed_count += 1
                        logger.info(f"🗑️ Removed cache: {cache_file.name}")

                except Exception as e:
                    logger.warning(f"⚠️ Failed to process cache file {cache_file}: {e}")

        logger.info(f"🧹 Cache cleanup complete: {removed_count} files removed")
        return removed_count

    def get_cache_stats(self) -> Dict[str, Any]:
        """Get cache statistics."""
        stats = {
            'total_files': 0,
            'total_size_mb': 0,
            'by_type': {},
            'cache_dirs': {}
        }

        for analysis_type, cache_dir in self.cache_dirs.items():
            if not cache_dir.exists():
                continue

            type_stats = {
                'files': 0,
                'size_mb': 0,
                'valid_files': 0,
                'expired_files': 0
            }

            for cache_file in cache_dir.glob("*.json"):
                try:
                    file_size = cache_file.stat().st_size / (1024 * 1024)  # MB
                    type_stats['files'] += 1
                    type_stats['size_mb'] += file_size

                    # Count valid vs. expired entries
                    with open(cache_file, 'r', encoding='utf-8') as f:
                        cache_data = json.load(f)

                    expiry_time = datetime.fromisoformat(cache_data['expires_at'])
                    if datetime.now() <= expiry_time:
                        type_stats['valid_files'] += 1
                    else:
                        type_stats['expired_files'] += 1

                except Exception:
                    # Unreadable entries are counted as expired
                    type_stats['expired_files'] += 1

            stats['by_type'][analysis_type] = type_stats
            stats['total_files'] += type_stats['files']
            stats['total_size_mb'] += type_stats['size_mb']
            stats['cache_dirs'][analysis_type] = str(cache_dir)

        return stats

    def cleanup_expired_cache(self) -> int:
        """Remove only cache files past their 'expires_at' timestamp."""
        # Note: clear_cache(older_than_hours=0) would remove *every* file,
        # since any entry is older than zero hours; expired_only checks the
        # stored expiry timestamp instead.
        return self.clear_cache(expired_only=True)
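# Illustrative sketch (not part of the module): cache keys are deterministic
# for identical parameters within the same calendar day, so a save/load round
# trip only needs the same inputs. The values below are made-up placeholders.
#
#   mgr = SimilarityCacheManager(cache_base_dir="cache")
#   key = mgr.generate_cache_key('duplicates', 500, 0.85)
#   mgr.save_cache('duplicates', key, {'pairs': []}, {'threshold': 0.85})
#   assert mgr.is_cache_valid('duplicates', key)
#   assert mgr.load_cache('duplicates', key) == {'pairs': []}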
# Global cache manager instance
_cache_manager: Optional[SimilarityCacheManager] = None


def get_cache_manager() -> SimilarityCacheManager:
    """Get the singleton cache manager instance."""
    global _cache_manager
    if _cache_manager is None:
        _cache_manager = SimilarityCacheManager()
    return _cache_manager


# Convenience functions

def cache_duplicate_analysis(
    products_count: int,
    threshold: float,
    results: Dict[str, Any],
    parameters: Dict[str, Any]
) -> str:
    """Cache duplicate analysis results."""
    cache_mgr = get_cache_manager()
    cache_key = cache_mgr.generate_cache_key('duplicates', products_count, threshold)
    cache_mgr.save_cache('duplicates', cache_key, results, parameters)
    return cache_key


def load_duplicate_analysis(
    products_count: int,
    threshold: float
) -> Optional[Dict[str, Any]]:
    """Load cached duplicate analysis results."""
    cache_mgr = get_cache_manager()
    cache_key = cache_mgr.generate_cache_key('duplicates', products_count, threshold)
    return cache_mgr.load_cache('duplicates', cache_key)


def cache_promo_analysis(
    promo_count: int,
    db_count: int,
    threshold: float,
    results: Dict[str, Any],
    parameters: Dict[str, Any]
) -> str:
    """Cache promo analysis results."""
    cache_mgr = get_cache_manager()
    cache_key = cache_mgr.generate_cache_key(
        'promo_matches',
        promo_count + db_count,
        threshold,
        additional_params={'promo_count': promo_count, 'db_count': db_count}
    )
    cache_mgr.save_cache('promo_matches', cache_key, results, parameters)
    return cache_key


def load_promo_analysis(
    promo_count: int,
    db_count: int,
    threshold: float
) -> Optional[Dict[str, Any]]:
    """Load cached promo analysis results."""
    cache_mgr = get_cache_manager()
    cache_key = cache_mgr.generate_cache_key(
        'promo_matches',
        promo_count + db_count,
        threshold,
        additional_params={'promo_count': promo_count, 'db_count': db_count}
    )
    return cache_mgr.load_cache('promo_matches', cache_key)
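if __name__ == "__main__":
    # Minimal self-test sketch: the payloads below are made-up placeholders,
    # not real analysis output. Run the module directly to exercise a cache
    # round trip through the convenience functions.
    demo_results = {'duplicate_groups': [], 'pairs_checked': 0}
    demo_params = {'threshold': 0.85, 'algorithm': 'hybrid'}

    demo_key = cache_duplicate_analysis(100, 0.85, demo_results, demo_params)
    logger.info(f"Saved demo cache under key: {demo_key}")

    cached = load_duplicate_analysis(100, 0.85)
    assert cached == demo_results, "cache round trip failed"

    stats = get_cache_manager().get_cache_stats()
    logger.info(f"Cache stats after demo: {stats}")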