# Source: AFML / afml/cache/selective_cleaner.py
# Uploaded by akshayboora ("Upload 940 files"), revision 669d6a1 (verified)
"""
Cache cleanup utility for orphaned function versions.
This module is now focused on MAINTENANCE rather than active cache invalidation:
- Identifies orphaned cache entries from old function versions
- Provides size-based and age-based cleanup
- Works as a scheduled/manual maintenance tool
It NO LONGER does active change detection (that's handled by auto_versioning).
"""
import hashlib
import inspect
import json
import os
import time
from collections import defaultdict
from pathlib import Path
from typing import Any, Dict, List, Optional, Union

from loguru import logger
class FunctionVersionTracker:
    """
    Tracks function versions to identify orphaned caches.

    This is a PASSIVE tracker: it records the current version of every
    function it is shown and persists those records to a JSON file, so
    that old versions can later be identified and cleaned up.

    Attributes:
        tracker_file: Path of the JSON file holding the version records.
        current_versions: Mapping of "module.qualname" -> version info dict.
    """

    def __init__(self):
        # Import CACHE_DIRS at runtime to avoid circular import
        from . import CACHE_DIRS

        self.tracker_file = CACHE_DIRS["base"] / "function_versions.json"
        self.current_versions: Dict[str, Dict] = {}
        self._load_version_data()

    def _load_version_data(self):
        """
        Load existing version tracking data from ``tracker_file``.

        A missing file leaves ``current_versions`` untouched. An unreadable
        file, invalid JSON, or JSON whose top level is not an object resets
        ``current_versions`` to an empty dict and logs a warning.
        """
        if not self.tracker_file.exists():
            return
        try:
            with open(self.tracker_file, "r") as f:
                loaded = json.load(f)
        except Exception as e:
            logger.warning("Failed to load version tracker: {}", e)
            self.current_versions = {}
            return
        if not isinstance(loaded, dict):
            # Every other method indexes current_versions by function name,
            # so anything but a JSON object would break them later.
            logger.warning(
                "Failed to load version tracker: {}",
                f"expected a JSON object, got {type(loaded).__name__}",
            )
            self.current_versions = {}
            return
        self.current_versions = loaded
        logger.debug(
            "Loaded version data for {} functions", len(self.current_versions)
        )

    def _save_version_data(self):
        """Write ``current_versions`` to ``tracker_file`` (best effort)."""
        try:
            with open(self.tracker_file, "w") as f:
                json.dump(self.current_versions, f, indent=2)
        except Exception as e:
            logger.warning("Failed to save version tracker: {}", e)

    def _get_function_hash(self, func) -> Optional[str]:
        """
        Return a short fingerprint of the function's source code.

        The fingerprint is the first 12 hex characters of the MD5 digest of
        the source text. Returns None when the source cannot be retrieved.
        """
        try:
            source = inspect.getsource(func)
            return hashlib.md5(source.encode()).hexdigest()[:12]
        except (OSError, TypeError):
            return None

    def _get_file_mtime(self, func) -> Optional[float]:
        """
        Return the modification time of the function's source file.

        Returns None when the file cannot be determined or stat'ed.
        """
        try:
            file_path = inspect.getfile(func)
            return Path(file_path).stat().st_mtime
        except (OSError, TypeError):
            return None

    def _update_version(self, func) -> Dict[str, Any]:
        """Record ``func`` in memory only (no disk write) and return its info."""
        func_name = f"{func.__module__}.{func.__qualname__}"
        version_info = {
            "hash": self._get_function_hash(func),
            "mtime": self._get_file_mtime(func),
            "module": func.__module__,
            "last_seen": time.time(),
        }
        self.current_versions[func_name] = version_info
        return version_info

    def record_current_version(self, func) -> Dict[str, Any]:
        """
        Record the current version of a function and persist the tracker.

        Args:
            func: Function to record; keyed as "<module>.<qualname>".

        Returns:
            Dict with version info: {hash, mtime, module, last_seen}
        """
        version_info = self._update_version(func)
        self._save_version_data()
        return version_info

    def get_all_current_versions(self) -> Dict[str, Dict]:
        """Get a shallow copy of all currently tracked function versions."""
        return self.current_versions.copy()

    def scan_cacheable_functions(self) -> Dict[str, Dict]:
        """
        Scan for all @cacheable functions and record their versions.

        Walks every object known to the garbage collector and records each
        plain function that carries a truthy ``_afml_cacheable`` attribute.
        The tracker file is written once after the walk rather than once
        per function.

        Returns:
            Dict of function_name -> version_info
        """
        import gc
        import types

        scanned = {}
        try:
            for obj in gc.get_objects():
                try:
                    if (
                        isinstance(obj, types.FunctionType)
                        and hasattr(obj, "_afml_cacheable")
                        and obj._afml_cacheable
                    ):
                        func_name = f"{obj.__module__}.{obj.__qualname__}"
                        scanned[func_name] = self._update_version(obj)
                except (ReferenceError, AttributeError):
                    continue
        except Exception as e:
            logger.debug("Error during function scan: {}", e)

        # Persist whatever was collected, even if the walk stopped early.
        if scanned:
            self._save_version_data()

        logger.info("Scanned {} cacheable functions", len(scanned))
        return scanned
# Module-wide tracker instance; stays None until first requested.
_version_tracker: Optional[FunctionVersionTracker] = None


def get_version_tracker() -> FunctionVersionTracker:
    """Return the shared tracker, constructing it on the first call."""
    global _version_tracker
    tracker = _version_tracker
    if tracker is None:
        tracker = FunctionVersionTracker()
        _version_tracker = tracker
    return tracker
# =============================================================================
# Cache Cleanup Utilities
# =============================================================================
def find_orphaned_caches(
    modules: Optional[Union[str, List[str]]] = None, dry_run: bool = True
) -> Dict[str, Any]:
    """
    Find cache entries for old function versions.

    A cache file is considered versioned when its path contains ``_v_``.
    It is reported as orphaned when its path contains no ``v_<hash>``
    marker for ANY function found by the current scan. The comparison
    always uses the hashes of every scanned function, so a ``modules``
    filter can never cause live caches of other modules to be reported.

    Note: the scan only sees functions that exist in the running process,
    so caches of functions that have not been imported look orphaned.

    Args:
        modules: Module name(s) used to narrow the reported
            ``current_functions`` (None = all modules). A name matches a
            module exactly or as a dotted prefix.
        dry_run: Accepted for interface compatibility; this function only
            reports and never deletes files.

    Returns:
        Dict with:
            'orphaned_files': list of {'path', 'size_mb', 'mtime'} dicts
            'current_functions': names of the (filtered) scanned functions
            'total_size_mb': combined size of orphaned files, rounded to 2 dp
            'orphaned_count': number of orphaned files
    """
    if isinstance(modules, str):
        modules = [modules]

    from . import memory

    cache_dir = Path(memory.location)
    if not cache_dir.exists():
        return {
            "orphaned_files": [],
            "current_functions": [],
            "total_size_mb": 0.0,
            "orphaned_count": 0,
        }

    # Scan current function versions
    tracker = get_version_tracker()
    all_versions = tracker.scan_cacheable_functions()

    # Liveness is decided against every scanned function, filtered or not.
    live_markers = [
        f"v_{info['hash']}" for info in all_versions.values() if info["hash"]
    ]

    # Filter by modules if specified
    if modules:
        # "afml.features" must not match "afml.features_extra.func".
        dotted_prefixes = tuple(f"{mod}." for mod in modules)
        current_versions = {
            name: info
            for name, info in all_versions.items()
            if name in modules or name.startswith(dotted_prefixes)
        }
    else:
        current_versions = all_versions

    logger.info(
        "Checking for orphaned caches across {} functions", len(current_versions)
    )

    # Scan cache directory for versioned cache files
    orphaned_files = []
    total_size = 0.0
    for cache_file in cache_dir.rglob("*"):
        if not cache_file.is_file():
            continue
        cache_path_str = str(cache_file)
        # Only versioned caches (paths containing "_v_") are candidates.
        if "_v_" not in cache_path_str:
            continue
        if any(marker in cache_path_str for marker in live_markers):
            continue  # Belongs to a current version
        try:
            stat = cache_file.stat()
        except OSError:
            # File disappeared (or became unreadable) mid-scan; skip it.
            continue
        file_size = stat.st_size / (1024 * 1024)
        orphaned_files.append(
            {
                "path": cache_path_str,
                "size_mb": file_size,
                "mtime": stat.st_mtime,
            }
        )
        total_size += file_size

    result = {
        "orphaned_files": orphaned_files,
        "current_functions": list(current_versions.keys()),
        "total_size_mb": round(total_size, 2),
        "orphaned_count": len(orphaned_files),
    }
    if orphaned_files:
        logger.info(
            "Found {} orphaned cache files ({:.2f} MB)", len(orphaned_files), total_size
        )
    else:
        logger.info("No orphaned caches found")
    return result
def clean_orphaned_caches(
    modules: Optional[Union[str, List[str]]] = None,
    min_age_hours: int = 24,
) -> Dict[str, Any]:
    """
    Remove orphaned cache entries from old function versions.

    Args:
        modules: Module name(s) forwarded to find_orphaned_caches (None = all)
        min_age_hours: Only remove orphaned caches whose modification time
            is older than this many hours

    Returns:
        Dict with cleanup results:
            'removed_count': number of files deleted
            'removed_size_mb': combined size of deleted files, rounded to 2 dp
            'kept_count': orphaned files that were not deleted
            'errors': list of "<path>: <error>" strings for failed deletions
    """
    # Find orphaned caches
    orphaned = find_orphaned_caches(modules=modules, dry_run=True)
    if not orphaned["orphaned_files"]:
        logger.info("No orphaned caches to clean")
        return {
            "removed_count": 0,
            "removed_size_mb": 0.0,
            "kept_count": 0,
            "errors": [],
        }

    # Filter by age
    cutoff_time = time.time() - (min_age_hours * 3600)
    to_remove = [f for f in orphaned["orphaned_files"] if f["mtime"] < cutoff_time]

    # Remove files
    removed_count = 0
    removed_size = 0.0
    errors = []
    for file_info in to_remove:
        try:
            Path(file_info["path"]).unlink()
        except FileNotFoundError:
            # Already gone (e.g. removed by another process); not counted.
            continue
        except Exception as e:
            errors.append(f"{file_info['path']}: {e}")
            logger.debug("Failed to remove {}: {}", file_info["path"], e)
        else:
            removed_count += 1
            removed_size += file_info["size_mb"]

    # Everything not deleted: too recent, already missing, or failed.
    kept_count = len(orphaned["orphaned_files"]) - removed_count
    result = {
        "removed_count": removed_count,
        "removed_size_mb": round(removed_size, 2),
        "kept_count": kept_count,
        "errors": errors,
    }
    logger.info(
        "Cleaned {} orphaned caches ({:.2f} MB), kept {} recent",
        removed_count,
        removed_size,
        kept_count,
    )
    return result
def cleanup_by_size(max_size_mb: int) -> float:
    """
    Remove oldest cache files if total size exceeds limit.

    Every regular file under the cache directory is considered. Files are
    deleted in order of modification time (oldest first) until the amount
    removed covers the excess over ``max_size_mb``.

    Args:
        max_size_mb: Maximum allowed total cache size in megabytes.

    Returns:
        Size in megabytes of the files that were removed (0.0 when the
        cache directory is missing or already within the limit).
    """
    from . import memory

    cache_dir = Path(memory.location)
    if not cache_dir.exists():
        return 0.0

    # Get all cache files with sizes and modification times
    cache_files = []
    total_size = 0.0
    for file_path in cache_dir.rglob("*"):
        if not file_path.is_file():
            continue
        try:
            stat = file_path.stat()
        except OSError as e:
            # A file may vanish between listing and stat; skip it instead
            # of aborting the whole cleanup.
            logger.debug("Failed to stat {}: {}", file_path, e)
            continue
        size_mb = stat.st_size / (1024 * 1024)
        cache_files.append((file_path, size_mb, stat.st_mtime))
        total_size += size_mb

    if total_size <= max_size_mb:
        logger.info(
            "Cache size {:.1f} MB is under limit {:.1f} MB", total_size, max_size_mb
        )
        return 0.0

    # Sort by modification time (oldest first)
    cache_files.sort(key=lambda x: x[2])

    # Remove oldest files until under size limit
    size_to_remove = total_size - max_size_mb
    removed_size = 0.0
    for file_path, size_mb, _ in cache_files:
        if removed_size >= size_to_remove:
            break
        try:
            file_path.unlink()
            removed_size += size_mb
        except Exception as e:
            logger.debug("Failed to remove {}: {}", file_path, e)

    logger.info(
        "Removed {:.1f} MB of old caches (limit: {:.1f} MB)", removed_size, max_size_mb
    )
    return removed_size
def cleanup_by_age(max_age_days: int) -> int:
    """
    Delete every cache file whose modification time is older than the limit.

    Args:
        max_age_days: Age threshold in days.

    Returns:
        Number of files deleted (0 when the cache directory is missing).
    """
    from . import memory

    root = Path(memory.location)
    if not root.exists():
        return 0

    oldest_allowed = time.time() - (max_age_days * 24 * 3600)
    deleted = 0
    regular_files = (entry for entry in root.rglob("*") if entry.is_file())
    for entry in regular_files:
        try:
            expired = entry.stat().st_mtime < oldest_allowed
            if expired:
                entry.unlink()
                deleted += 1
        except Exception as e:
            # Best effort: a failure on one file must not stop the sweep.
            logger.debug("Failed to remove {}: {}", entry, e)

    logger.info("Removed {} caches older than {} days", deleted, max_age_days)
    return deleted
# =============================================================================
# Main Cache Maintenance Function
# =============================================================================
def cache_maintenance(
    clean_orphaned: bool = True,
    max_cache_size_mb: Optional[int] = None,
    max_age_days: Optional[int] = None,
    min_orphan_age_hours: int = 24,
) -> Dict[str, Union[int, float, List[str]]]:
    """
    Run the cache cleanup steps in sequence and collect a report.

    Steps, each isolated so that one failure does not stop the others:
    1. Scan the currently cacheable functions.
    2. Optionally remove orphaned caches from old function versions.
    3. Optionally enforce a total size limit.
    4. Optionally remove caches older than a given age.

    Args:
        clean_orphaned: Clean orphaned caches from old versions
        max_cache_size_mb: Clear oldest caches if total size exceeds this;
            skipped when None or 0
        max_age_days: Clear caches older than this many days; skipped when
            None or 0
        min_orphan_age_hours: Only remove orphaned caches older than this

    Returns:
        Maintenance report with the keys 'orphaned_removed',
        'orphaned_size_mb', 'size_cleared_mb', 'old_files_removed' and
        'functions_scanned', plus 'error' if the run as a whole failed.
    """
    report = dict(
        orphaned_removed=0,
        orphaned_size_mb=0.0,
        size_cleared_mb=0.0,
        old_files_removed=0,
        functions_scanned=0,
    )
    try:
        # Step 1: scan current functions
        scanned = get_version_tracker().scan_cacheable_functions()
        report["functions_scanned"] = len(scanned)

        # Step 2: orphaned caches
        if clean_orphaned:
            try:
                outcome = clean_orphaned_caches(min_age_hours=min_orphan_age_hours)
                report["orphaned_removed"] = outcome["removed_count"]
                report["orphaned_size_mb"] = outcome["removed_size_mb"]
            except Exception as e:
                logger.warning("Orphaned cache cleanup failed: {}", e)

        # Step 3: size limit
        if max_cache_size_mb:
            try:
                report["size_cleared_mb"] = cleanup_by_size(max_cache_size_mb)
            except Exception as e:
                logger.warning("Size-based cleanup failed: {}", e)

        # Step 4: age limit
        if max_age_days:
            try:
                report["old_files_removed"] = cleanup_by_age(max_age_days)
            except Exception as e:
                logger.warning("Age-based cleanup failed: {}", e)

        summary = _format_maintenance_report(report)
        logger.info("Cache maintenance completed: {}", summary)
    except Exception as e:
        logger.error("Cache maintenance failed: {}", e)
        report["error"] = str(e)
    return report
def _format_maintenance_report(report: Dict) -> str:
"""Format maintenance report for logging."""
parts = []
if report["functions_scanned"]:
parts.append(f"{report['functions_scanned']} functions scanned")
if report["orphaned_removed"]:
parts.append(
f"{report['orphaned_removed']} orphaned removed ({report['orphaned_size_mb']:.1f}MB)"
)
if report["size_cleared_mb"]:
parts.append(f"{report['size_cleared_mb']:.1f}MB size-cleared")
if report["old_files_removed"]:
parts.append(f"{report['old_files_removed']} old files removed")
return ", ".join(parts) if parts else "no cleanup needed"
# =============================================================================
# Analysis Functions
# =============================================================================
def analyze_cache_versions() -> Dict[str, Any]:
    """
    Analyze cache fragmentation by function versions.

    For every file under the cache directory, the first path component
    (relative to the cache directory) that contains ``_v_`` is taken as the
    version marker; the component before it is used as the function name
    ("unknown" if there is none). Each file is counted exactly once.

    Returns:
        Dict of function_name -> {'version_count', 'total_size_mb',
        'avg_size_per_version_mb'}; empty when the cache directory is
        missing or holds no versioned files.
    """
    from . import memory

    cache_dir = Path(memory.location)
    if not cache_dir.exists():
        return {}

    # Count versions per function
    version_counts = defaultdict(lambda: {"versions": set(), "total_size_mb": 0.0})
    for cache_file in cache_dir.rglob("*"):
        if not cache_file.is_file():
            continue
        # Look only below the cache directory so that a "_v_" in the
        # directory's own location cannot be mistaken for a version marker.
        parts = cache_file.relative_to(cache_dir).parts
        for i, part in enumerate(parts):
            if "_v_" not in part:
                continue
            func_name = parts[i - 1] if i > 0 else "unknown"
            version_hash = part.split("_v_")[1].split("_")[0]
            try:
                file_size = cache_file.stat().st_size / (1024 * 1024)
            except OSError:
                # File vanished mid-scan; leave it out of the analysis.
                break
            entry = version_counts[func_name]
            entry["versions"].add(version_hash)
            entry["total_size_mb"] += file_size
            # Stop at the first marker so the file's size is counted once.
            break

    # Format results
    analysis = {}
    for func_name, data in version_counts.items():
        n_versions = len(data["versions"])
        analysis[func_name] = {
            "version_count": n_versions,
            "total_size_mb": round(data["total_size_mb"], 2),
            "avg_size_per_version_mb": (
                round(data["total_size_mb"] / n_versions, 2) if n_versions else 0.0
            ),
        }
    return analysis
def print_version_analysis():
    """Print a summary of cache versions and the most fragmented functions."""
    report = analyze_cache_versions()
    if not report:
        print("\nNo versioned caches found.")
        return

    banner = "=" * 70
    print("\n" + banner)
    print("CACHE VERSION ANALYSIS")
    print(banner)

    # Most versions first
    ranked = sorted(
        report.items(), key=lambda item: item[1]["version_count"], reverse=True
    )
    version_total = sum(stats["version_count"] for _, stats in ranked)
    size_total = sum(stats["total_size_mb"] for _, stats in ranked)

    print("\nOverall:")
    print(f" Functions with versions: {len(ranked)}")
    print(f" Total versions: {version_total}")
    print(f" Total size: {size_total:.2f} MB")

    print("\nTop fragmented functions:")
    for rank, (name, stats) in enumerate(ranked[:10], 1):
        # Functions with a single version are not fragmented.
        if not stats["version_count"] > 1:
            continue
        print(f" {rank}. {name}")
        print(f" Versions: {stats['version_count']}")
        print(f" Size: {stats['total_size_mb']:.2f} MB")
        print(f" Avg per version: {stats['avg_size_per_version_mb']:.2f} MB")

    print(banner + "\n")
# =============================================================================
# Convenience functions for common use cases
# =============================================================================
def clear_orphaned_ml_caches():
    """Run orphaned-cache cleanup scoped to the ML-related modules."""
    ml_modules = [
        f"afml.{submodule}"
        for submodule in (
            "ensemble",
            "clustering",
            "feature_importance",
            "cross_validation",
            "backtester",
        )
    ]
    return clean_orphaned_caches(modules=ml_modules)
def clear_orphaned_labeling_caches():
    """Run orphaned-cache cleanup scoped to the labeling module."""
    labeling_modules = ["afml.labeling"]
    return clean_orphaned_caches(modules=labeling_modules)
def clear_orphaned_features_caches():
    """Run orphaned-cache cleanup scoped to the feature and strategy modules."""
    feature_modules = ["afml.features", "afml.strategies"]
    return clean_orphaned_caches(modules=feature_modules)
# Public API of this module; names not listed here are internal helpers.
__all__ = [
    # Core tracker: version bookkeeping and its shared instance accessor
    "FunctionVersionTracker",
    "get_version_tracker",
    # Main maintenance function: runs the cleanup steps and returns a report
    "cache_maintenance",
    # Cleanup functions: orphan detection/removal and size/age limits
    "find_orphaned_caches",
    "clean_orphaned_caches",
    "cleanup_by_size",
    "cleanup_by_age",
    # Analysis: per-function version counts and sizes
    "analyze_cache_versions",
    "print_version_analysis",
    # Convenience functions: orphan cleanup with preset module lists
    "clear_orphaned_ml_caches",
    "clear_orphaned_labeling_caches",
    "clear_orphaned_features_caches",
]