""" Cache cleanup utility for orphaned function versions. This module is now focused on MAINTENANCE rather than active cache invalidation: - Identifies orphaned cache entries from old function versions - Provides size-based and age-based cleanup - Works as a scheduled/manual maintenance tool It NO LONGER does active change detection (that's handled by auto_versioning). """ import hashlib import inspect import json import os import time from collections import defaultdict from pathlib import Path from typing import Dict, List, Optional, Union from loguru import logger class FunctionVersionTracker: """ Tracks function versions to identify orphaned caches. This is now a PASSIVE tracker - it records current versions and helps identify old versions that can be cleaned up. """ def __init__(self): # Import CACHE_DIRS at runtime to avoid circular import from . import CACHE_DIRS self.tracker_file = CACHE_DIRS["base"] / "function_versions.json" self.current_versions: Dict[str, Dict] = {} self._load_version_data() def _load_version_data(self): """Load existing version tracking data.""" if self.tracker_file.exists(): try: with open(self.tracker_file, "r") as f: self.current_versions = json.load(f) logger.debug( "Loaded version data for {} functions", len(self.current_versions) ) except Exception as e: logger.warning("Failed to load version tracker: {}", e) self.current_versions = {} def _save_version_data(self): """Save version tracking data.""" try: with open(self.tracker_file, "w") as f: json.dump(self.current_versions, f, indent=2) except Exception as e: logger.warning("Failed to save version tracker: {}", e) def _get_function_hash(self, func) -> Optional[str]: """Get hash of function source code.""" try: source = inspect.getsource(func) return hashlib.md5(source.encode()).hexdigest()[:12] except (OSError, TypeError): return None def _get_file_mtime(self, func) -> Optional[float]: """Get modification time of function's source file.""" try: file_path = inspect.getfile(func) return Path(file_path).stat().st_mtime except (OSError, TypeError): return None def record_current_version(self, func) -> Dict[str, any]: """ Record the current version of a function. Returns: Dict with version info: {hash, mtime, module, timestamp} """ func_name = f"{func.__module__}.{func.__qualname__}" version_info = { "hash": self._get_function_hash(func), "mtime": self._get_file_mtime(func), "module": func.__module__, "last_seen": time.time(), } self.current_versions[func_name] = version_info self._save_version_data() return version_info def get_all_current_versions(self) -> Dict[str, Dict]: """Get all currently tracked function versions.""" return self.current_versions.copy() def scan_cacheable_functions(self) -> Dict[str, Dict]: """ Scan for all @cacheable functions and record their versions. Returns: Dict of function_name -> version_info """ import gc import types scanned = {} try: for obj in gc.get_objects(): try: if ( isinstance(obj, types.FunctionType) and hasattr(obj, "_afml_cacheable") and obj._afml_cacheable ): func_name = f"{obj.__module__}.{obj.__qualname__}" version_info = self.record_current_version(obj) scanned[func_name] = version_info except (ReferenceError, AttributeError): continue except Exception as e: logger.debug("Error during function scan: {}", e) logger.info("Scanned {} cacheable functions", len(scanned)) return scanned # Global version tracker _version_tracker: Optional[FunctionVersionTracker] = None def get_version_tracker() -> FunctionVersionTracker: """Get global version tracker instance.""" global _version_tracker if _version_tracker is None: _version_tracker = FunctionVersionTracker() return _version_tracker # ============================================================================= # Cache Cleanup Utilities # ============================================================================= def find_orphaned_caches( modules: Optional[Union[str, List[str]]] = None, dry_run: bool = True ) -> Dict[str, List[str]]: """ Find cache entries for old function versions. This identifies caches that were created by previous versions of functions (when auto_versioning is enabled). Args: modules: Module name(s) to check (None = all modules) dry_run: If True, only report (don't delete) Returns: Dict with 'orphaned_files', 'current_functions', 'total_size_mb' """ if isinstance(modules, str): modules = [modules] from . import memory cache_dir = Path(memory.location) if not cache_dir.exists(): return {"orphaned_files": [], "current_functions": [], "total_size_mb": 0.0} # Scan current function versions tracker = get_version_tracker() current_versions = tracker.scan_cacheable_functions() # Filter by modules if specified if modules: current_versions = { name: info for name, info in current_versions.items() if any(name.startswith(mod) for mod in modules) } logger.info( "Checking for orphaned caches across {} functions", len(current_versions) ) # Build list of current version hashes current_hashes = { info["hash"] for info in current_versions.values() if info["hash"] } # Scan cache directory for versioned cache files orphaned_files = [] total_size = 0.0 for cache_file in cache_dir.rglob("*"): if not cache_file.is_file(): continue # Look for version markers in cache filenames/paths cache_path_str = str(cache_file) # Check if this is a versioned cache (contains v_) if "_v_" in cache_path_str: # Extract version hash from path for current_hash in current_hashes: if f"v_{current_hash}" in cache_path_str: break # Found current version else: # No current version found - this is orphaned file_size = cache_file.stat().st_size / (1024 * 1024) orphaned_files.append( { "path": str(cache_file), "size_mb": file_size, "mtime": cache_file.stat().st_mtime, } ) total_size += file_size result = { "orphaned_files": orphaned_files, "current_functions": list(current_versions.keys()), "total_size_mb": round(total_size, 2), "orphaned_count": len(orphaned_files), } if orphaned_files: logger.info( "Found {} orphaned cache files ({:.2f} MB)", len(orphaned_files), total_size ) else: logger.info("No orphaned caches found") return result def clean_orphaned_caches( modules: Optional[Union[str, List[str]]] = None, min_age_hours: int = 24, ) -> Dict[str, any]: """ Remove orphaned cache entries from old function versions. Args: modules: Module name(s) to clean (None = all) min_age_hours: Only remove orphaned caches older than this Returns: Dict with cleanup results """ # Find orphaned caches orphaned = find_orphaned_caches(modules=modules, dry_run=True) if not orphaned["orphaned_files"]: logger.info("No orphaned caches to clean") return { "removed_count": 0, "removed_size_mb": 0.0, "kept_count": 0, } # Filter by age cutoff_time = time.time() - (min_age_hours * 3600) to_remove = [f for f in orphaned["orphaned_files"] if f["mtime"] < cutoff_time] # Remove files removed_count = 0 removed_size = 0.0 errors = [] for file_info in to_remove: try: file_path = Path(file_info["path"]) if file_path.exists(): file_path.unlink() removed_count += 1 removed_size += file_info["size_mb"] except Exception as e: errors.append(f"{file_info['path']}: {e}") logger.debug("Failed to remove {}: {}", file_info["path"], e) kept_count = len(orphaned["orphaned_files"]) - removed_count result = { "removed_count": removed_count, "removed_size_mb": round(removed_size, 2), "kept_count": kept_count, # Too recent to remove "errors": errors, } logger.info( "Cleaned {} orphaned caches ({:.2f} MB), kept {} recent", removed_count, removed_size, kept_count, ) return result def cleanup_by_size(max_size_mb: int) -> float: """Remove oldest cache files if total size exceeds limit.""" from . import memory cache_dir = Path(memory.location) if not cache_dir.exists(): return 0.0 # Get all cache files with sizes and modification times cache_files = [] total_size = 0 for file_path in cache_dir.rglob("*"): if file_path.is_file(): stat = file_path.stat() size_mb = stat.st_size / (1024 * 1024) cache_files.append((file_path, size_mb, stat.st_mtime)) total_size += size_mb if total_size <= max_size_mb: logger.info( "Cache size {:.1f} MB is under limit {:.1f} MB", total_size, max_size_mb ) return 0.0 # Sort by modification time (oldest first) cache_files.sort(key=lambda x: x[2]) # Remove oldest files until under size limit size_to_remove = total_size - max_size_mb removed_size = 0.0 for file_path, size_mb, _ in cache_files: if removed_size >= size_to_remove: break try: file_path.unlink() removed_size += size_mb except Exception as e: logger.debug("Failed to remove {}: {}", file_path, e) logger.info( "Removed {:.1f} MB of old caches (limit: {:.1f} MB)", removed_size, max_size_mb ) return removed_size def cleanup_by_age(max_age_days: int) -> int: """Remove cache files older than specified age.""" from . import memory cache_dir = Path(memory.location) if not cache_dir.exists(): return 0 cutoff_time = time.time() - (max_age_days * 24 * 3600) removed_count = 0 for file_path in cache_dir.rglob("*"): if file_path.is_file(): try: if file_path.stat().st_mtime < cutoff_time: file_path.unlink() removed_count += 1 except Exception as e: logger.debug("Failed to remove {}: {}", file_path, e) logger.info("Removed {} caches older than {} days", removed_count, max_age_days) return removed_count # ============================================================================= # Main Cache Maintenance Function # ============================================================================= def cache_maintenance( clean_orphaned: bool = True, max_cache_size_mb: Optional[int] = None, max_age_days: Optional[int] = None, min_orphan_age_hours: int = 24, ) -> Dict[str, Union[int, float, List[str]]]: """ Perform comprehensive cache maintenance. This is now focused on CLEANUP rather than change detection: - Removes orphaned caches from old function versions - Enforces size limits - Removes stale old caches Args: clean_orphaned: Clean orphaned caches from old versions max_cache_size_mb: Clear oldest caches if total size exceeds this max_age_days: Clear caches older than this many days min_orphan_age_hours: Only remove orphaned caches older than this Returns: Maintenance report """ report = { "orphaned_removed": 0, "orphaned_size_mb": 0.0, "size_cleared_mb": 0.0, "old_files_removed": 0, "functions_scanned": 0, } try: # Scan current functions tracker = get_version_tracker() versions = tracker.scan_cacheable_functions() report["functions_scanned"] = len(versions) # Clean orphaned caches if clean_orphaned: try: result = clean_orphaned_caches(min_age_hours=min_orphan_age_hours) report["orphaned_removed"] = result["removed_count"] report["orphaned_size_mb"] = result["removed_size_mb"] except Exception as e: logger.warning("Orphaned cache cleanup failed: {}", e) # Size-based cleanup if max_cache_size_mb: try: size_cleared = cleanup_by_size(max_cache_size_mb) report["size_cleared_mb"] = size_cleared except Exception as e: logger.warning("Size-based cleanup failed: {}", e) # Age-based cleanup if max_age_days: try: files_removed = cleanup_by_age(max_age_days) report["old_files_removed"] = files_removed except Exception as e: logger.warning("Age-based cleanup failed: {}", e) logger.info( "Cache maintenance completed: {}", _format_maintenance_report(report) ) except Exception as e: logger.error("Cache maintenance failed: {}", e) report["error"] = str(e) return report def _format_maintenance_report(report: Dict) -> str: """Format maintenance report for logging.""" parts = [] if report["functions_scanned"]: parts.append(f"{report['functions_scanned']} functions scanned") if report["orphaned_removed"]: parts.append( f"{report['orphaned_removed']} orphaned removed ({report['orphaned_size_mb']:.1f}MB)" ) if report["size_cleared_mb"]: parts.append(f"{report['size_cleared_mb']:.1f}MB size-cleared") if report["old_files_removed"]: parts.append(f"{report['old_files_removed']} old files removed") return ", ".join(parts) if parts else "no cleanup needed" # ============================================================================= # Analysis Functions # ============================================================================= def analyze_cache_versions() -> Dict[str, any]: """ Analyze cache fragmentation by function versions. Returns info about how many versions exist for each function. """ from . import memory cache_dir = Path(memory.location) if not cache_dir.exists(): return {} # Count versions per function version_counts = defaultdict(lambda: {"versions": set(), "total_size_mb": 0.0}) for cache_file in cache_dir.rglob("*"): if not cache_file.is_file(): continue cache_path = str(cache_file) # Extract function name and version from path # Path typically contains: module_name/function_name/hash/... parts = cache_path.split(os.sep) for i, part in enumerate(parts): if "_v_" in part: # Found a versioned cache func_name = parts[i - 1] if i > 0 else "unknown" version_hash = part.split("_v_")[1].split("_")[0] version_counts[func_name]["versions"].add(version_hash) file_size = cache_file.stat().st_size / (1024 * 1024) version_counts[func_name]["total_size_mb"] += file_size # Format results analysis = {} for func_name, data in version_counts.items(): analysis[func_name] = { "version_count": len(data["versions"]), "total_size_mb": round(data["total_size_mb"], 2), "avg_size_per_version_mb": ( round(data["total_size_mb"] / len(data["versions"]), 2) if data["versions"] else 0.0 ), } return analysis def print_version_analysis(): """Print analysis of cache versions.""" analysis = analyze_cache_versions() if not analysis: print("\nNo versioned caches found.") return print("\n" + "=" * 70) print("CACHE VERSION ANALYSIS") print("=" * 70) # Sort by version count sorted_funcs = sorted( analysis.items(), key=lambda x: x[1]["version_count"], reverse=True ) total_versions = sum(info["version_count"] for _, info in sorted_funcs) total_size = sum(info["total_size_mb"] for _, info in sorted_funcs) print(f"\nOverall:") print(f" Functions with versions: {len(sorted_funcs)}") print(f" Total versions: {total_versions}") print(f" Total size: {total_size:.2f} MB") print(f"\nTop fragmented functions:") for i, (func_name, info) in enumerate(sorted_funcs[:10], 1): if info["version_count"] > 1: print(f" {i}. {func_name}") print(f" Versions: {info['version_count']}") print(f" Size: {info['total_size_mb']:.2f} MB") print(f" Avg per version: {info['avg_size_per_version_mb']:.2f} MB") print("=" * 70 + "\n") # ============================================================================= # Convenience functions for common use cases # ============================================================================= def clear_orphaned_ml_caches(): """Clear orphaned caches for ML-related functions.""" ml_modules = [ "afml.ensemble", "afml.clustering", "afml.feature_importance", "afml.cross_validation", "afml.backtester", ] return clean_orphaned_caches(modules=ml_modules) def clear_orphaned_labeling_caches(): """Clear orphaned caches for labeling functions.""" return clean_orphaned_caches(modules=["afml.labeling"]) def clear_orphaned_features_caches(): """Clear orphaned caches for feature functions.""" return clean_orphaned_caches(modules=["afml.features", "afml.strategies"]) __all__ = [ # Core tracker "FunctionVersionTracker", "get_version_tracker", # Main maintenance function "cache_maintenance", # Cleanup functions "find_orphaned_caches", "clean_orphaned_caches", "cleanup_by_size", "cleanup_by_age", # Analysis "analyze_cache_versions", "print_version_analysis", # Convenience functions "clear_orphaned_ml_caches", "clear_orphaned_labeling_caches", "clear_orphaned_features_caches", ]