Spaces:
Sleeping
Sleeping
Claude Code
Claude Code: Review the memory system implementation, specifically focusing on git op
0bbe516 | """ | |
| Log Management Module for Cain Memory System. | |
| Provides log rotation, compression, and cleanup functionality to prevent | |
| unbounded log growth and maintain system health. | |
| Features: | |
| - Retains logs for 7 days | |
| - Compresses logs older than 3 days using gzip | |
| - Cleans up .bak files, keeping only the 3 most recent | |
| - Provides health statistics about log storage | |
| """ | |
import gzip
import logging
import os
import shutil
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Tuple
# Module-level logger; its name is taken from this module's ``__name__``.
logger = logging.getLogger(__name__)
class LogRetentionConfig:
    """Configuration for log retention policy.

    All settings are class attributes, so they are shared by every instance
    unless an instance (or subclass) overrides them.
    """

    # Retention periods
    MAX_LOG_AGE_DAYS = 7  # Delete logs older than 7 days
    COMPRESS_AGE_DAYS = 3  # Compress logs older than 3 days
    MAX_BAK_FILES = 3  # Keep only 3 most recent .bak files

    # Log directories to manage
    LOG_DIRECTORIES = [
        "/home/node/logs",
        "/tmp/claude-workspace/.openclaw/logs",
        "/data/logs",
        "/app/logs",
    ]

    # File patterns to consider as logs
    LOG_PATTERNS = ["*.log", "*.txt", "*.json"]

    @classmethod
    def add_log_directory(cls, path: str) -> None:
        """Add a custom log directory to manage.

        The directory is appended to the class-level ``LOG_DIRECTORIES`` list,
        so it becomes visible to every user of this configuration class.
        Adding a path that is already registered is a no-op.

        Args:
            path: Directory path to register.
        """
        # Declared as a classmethod so the method can be called on the class
        # itself (LogRetentionConfig.add_log_directory(...)) as well as on an
        # instance; the first parameter is the class in both cases.
        if path not in cls.LOG_DIRECTORIES:
            cls.LOG_DIRECTORIES.append(path)
class LogManager:
    """Manages log rotation, compression, and cleanup.

    For every configured directory the manager deletes log files older than
    ``MAX_LOG_AGE_DAYS``, gzip-compresses log files older than
    ``COMPRESS_AGE_DAYS`` and prunes ``*.bak`` files down to the
    ``MAX_BAK_FILES`` most recently modified ones. Counters for the last run
    are kept in ``self.stats``.
    """

    _SECONDS_PER_DAY = 86400.0

    def __init__(self, config: "Optional[LogRetentionConfig]" = None):
        """
        Initialize the log manager.

        Args:
            config: Log retention configuration. Uses defaults if None.
        """
        self.config = config if config is not None else LogRetentionConfig()
        self.stats = self._new_stats()

    @staticmethod
    def _new_stats() -> Dict[str, Any]:
        """Return a fresh, zeroed statistics dictionary."""
        return {
            "processed_dirs": 0,
            "logs_compressed": 0,
            "logs_deleted": 0,
            "bak_files_deleted": 0,
            "space_freed_bytes": 0,
            "errors": [],
        }

    @classmethod
    def _age_in_days(cls, mtime: float, now_ts: Optional[float] = None) -> float:
        """Return the fractional age in days of a POSIX modification time.

        Fractional days are used on purpose: truncating to whole days would
        make a "> 7 days" check fire only once a file is 8 days old.
        """
        if now_ts is None:
            now_ts = datetime.now().timestamp()
        return (now_ts - mtime) / cls._SECONDS_PER_DAY

    @staticmethod
    def _stat_regular_files(paths: Iterable[Path]) -> List[Tuple[Path, os.stat_result]]:
        """Pair each regular file in *paths* with its stat result.

        Directories are skipped, and so are entries that disappear or cannot
        be stat'ed between listing and inspection.
        """
        entries = []
        for path in paths:
            try:
                if path.is_file():
                    entries.append((path, path.stat()))
            except OSError:
                # The entry vanished or is unreadable; there is nothing to
                # rotate or measure for it.
                continue
        return entries

    @staticmethod
    def _compressed_path(log_file: Path) -> Path:
        """Return the path of the gzip archive for *log_file* (name + '.gz')."""
        return log_file.with_name(log_file.name + ".gz")

    def rotate_all_logs(self) -> Dict[str, Any]:
        """
        Run log rotation on all configured directories.

        Statistics are reset at the start of every call. A failure in one
        directory is logged and recorded in ``stats["errors"]`` without
        stopping the remaining directories.

        Returns:
            Dictionary with rotation statistics.
        """
        self.stats = self._new_stats()
        logger.info("Starting log rotation process...")

        for log_dir in self.config.LOG_DIRECTORIES:
            if not os.path.isdir(log_dir):
                logger.debug("Log directory not found: %s", log_dir)
                continue
            try:
                self._rotate_directory(log_dir)
                self.stats["processed_dirs"] += 1
            except Exception as e:
                # Top-level boundary: keep going with the other directories.
                error_msg = f"Failed to rotate {log_dir}: {e}"
                logger.error(error_msg)
                self.stats["errors"].append(error_msg)

        # Log summary
        logger.info(
            "Log rotation complete. "
            "Dirs: %s, Compressed: %s, Deleted: %s, BAK cleaned: %s, "
            "Space freed: %s",
            self.stats["processed_dirs"],
            self.stats["logs_compressed"],
            self.stats["logs_deleted"],
            self.stats["bak_files_deleted"],
            self._format_bytes(self.stats["space_freed_bytes"]),
        )
        return self.stats

    def _rotate_directory(self, log_dir: str) -> None:
        """Rotate logs in a single directory, then prune its .bak files."""
        logger.debug("Processing log directory: %s", log_dir)

        for log_file in self._get_log_files(log_dir):
            try:
                self._process_log_file(log_file)
            except Exception as e:
                # One bad file must not prevent the rest from being rotated.
                error_msg = f"Failed to process {log_file}: {e}"
                logger.warning(error_msg)
                self.stats["errors"].append(error_msg)

        # Clean up .bak files
        self._cleanup_bak_files(log_dir)

    def _get_log_files(self, log_dir: str) -> List[Path]:
        """Get all log files in the directory, oldest modification first.

        Matches every configured pattern plus its ``.gz`` counterpart.
        Directories are excluded and each file is returned once even when
        several patterns match it.
        """
        log_path = Path(log_dir)
        # A dict is used as an insertion-ordered set to drop duplicates.
        candidates: Dict[Path, None] = {}
        for pattern in self.config.LOG_PATTERNS:
            for match in log_path.glob(pattern):
                candidates[match] = None
            # Also check for already compressed files
            for match in log_path.glob(f"{pattern}.gz"):
                candidates[match] = None

        entries = self._stat_regular_files(candidates)
        entries.sort(key=lambda entry: entry[1].st_mtime)
        return [path for path, _ in entries]

    def _process_log_file(self, log_file: Path) -> None:
        """Process a single log file (compress or delete).

        Files older than ``MAX_LOG_AGE_DAYS`` are deleted whether or not they
        are compressed. Uncompressed files older than ``COMPRESS_AGE_DAYS``
        are gzip-compressed. Everything else is left alone.
        """
        file_stat = log_file.stat()
        age_days = self._age_in_days(file_stat.st_mtime)
        size_before = file_stat.st_size
        already_compressed = log_file.suffix == ".gz"

        # Delete old logs
        if age_days > self.config.MAX_LOG_AGE_DAYS:
            if already_compressed:
                logger.debug(
                    "Deleting old compressed log: %s (%d days old)",
                    log_file, int(age_days),
                )
            else:
                logger.debug(
                    "Deleting old log: %s (%d days old)", log_file, int(age_days)
                )
            log_file.unlink()
            self.stats["logs_deleted"] += 1
            self.stats["space_freed_bytes"] += size_before
            return

        # Compressed files that are still within retention need no work.
        if already_compressed:
            return

        # Compress logs older than threshold. Only count the file when an
        # archive was actually produced by this call.
        if age_days > self.config.COMPRESS_AGE_DAYS and self._compress_log(log_file):
            self.stats["logs_compressed"] += 1
            size_after = self._compressed_path(log_file).stat().st_size
            self.stats["space_freed_bytes"] += size_before - size_after

    def _compress_log(self, log_file: Path) -> bool:
        """Compress a log file using gzip and remove the original.

        The archive is written to a temporary sibling file and moved into
        place only after it is complete, so an interrupted run never leaves a
        truncated ``.gz`` that would later be mistaken for a finished one.
        The archive keeps the original's access/modification times so the
        retention age keeps counting from the log's last write.

        Returns:
            True if the file was compressed, False if an archive with the
            target name already existed and nothing was done.
        """
        compressed_path = self._compressed_path(log_file)
        if compressed_path.exists():
            logger.debug("Compressed file already exists: %s", compressed_path)
            return False

        logger.debug("Compressing: %s", log_file)
        source_stat = log_file.stat()
        # The ".tmp" name does not match any "<pattern>" / "<pattern>.gz" glob
        # built from names ending in a log extension.
        tmp_path = compressed_path.with_name(compressed_path.name + ".tmp")
        try:
            with open(log_file, "rb") as f_in, gzip.open(tmp_path, "wb") as f_out:
                shutil.copyfileobj(f_in, f_out)
            os.utime(tmp_path, (source_stat.st_atime, source_stat.st_mtime))
            os.replace(tmp_path, compressed_path)
        except Exception:
            # Do not leave a partial archive behind; the caller reports the error.
            if tmp_path.exists():
                tmp_path.unlink()
            raise

        # Remove original after successful compression
        log_file.unlink()
        logger.debug("Compressed: %s -> %s", log_file, compressed_path)
        return True

    def _cleanup_bak_files(self, log_dir: str) -> None:
        """Clean up .bak files, keeping only the most recent ones.

        Only regular files are considered. A file that cannot be deleted is
        recorded in ``stats["errors"]`` and the remaining files are still
        processed.
        """
        entries = self._stat_regular_files(Path(log_dir).glob("*.bak"))
        entries.sort(key=lambda entry: entry[1].st_mtime, reverse=True)  # Newest first

        # Keep the N most recent, delete the rest. A negative limit is treated
        # as zero rather than as a slice offset from the end of the list.
        keep = max(0, self.config.MAX_BAK_FILES)
        for bak_file, bak_stat in entries[keep:]:
            logger.debug("Deleting old .bak file: %s", bak_file)
            try:
                bak_file.unlink()
            except OSError as e:
                error_msg = f"Failed to delete {bak_file}: {e}"
                logger.warning(error_msg)
                self.stats["errors"].append(error_msg)
                continue
            self.stats["bak_files_deleted"] += 1
            self.stats["space_freed_bytes"] += bak_stat.st_size

    def get_log_health_status(self) -> Dict[str, Any]:
        """
        Get health status of log directories.

        Configured directories that do not exist are skipped.

        Returns:
            Dictionary with log health information: per-directory details
            under ``"directories"``, the totals ``"total_size_bytes"``,
            ``"total_size_human"`` and ``"total_files"``, and
            ``"needs_rotation"``, which is True when any directory holds a
            file older than the compression threshold.
        """
        status: Dict[str, Any] = {
            "directories": {},
            "total_size_bytes": 0,
            "total_files": 0,
            "needs_rotation": False,
        }
        for log_dir in self.config.LOG_DIRECTORIES:
            if not os.path.isdir(log_dir):
                continue
            dir_info = self._get_directory_info(log_dir)
            status["directories"][log_dir] = dir_info
            status["total_size_bytes"] += dir_info["size_bytes"]
            status["total_files"] += dir_info["file_count"]
            if dir_info.get("has_old_logs", False):
                status["needs_rotation"] = True

        status["total_size_human"] = self._format_bytes(status["total_size_bytes"])
        return status

    def _get_directory_info(self, log_dir: str) -> Dict[str, Any]:
        """Get information about a log directory.

        Every regular file directly inside the directory is inspected, not
        only those matching the log patterns. Sub-directories are excluded
        from both the size and the file count.
        """
        entries = self._stat_regular_files(Path(log_dir).glob("*"))
        total_size = sum(file_stat.st_size for _, file_stat in entries)

        # Check for old logs
        now_ts = datetime.now().timestamp()
        oldest_age = 0.0
        for _, file_stat in entries:
            oldest_age = max(oldest_age, self._age_in_days(file_stat.st_mtime, now_ts))

        return {
            "size_bytes": total_size,
            "size_human": self._format_bytes(total_size),
            "file_count": len(entries),
            "has_old_logs": oldest_age > self.config.COMPRESS_AGE_DAYS,
            "oldest_age_days": int(oldest_age),
        }

    @staticmethod
    def _format_bytes(size_bytes: int) -> str:
        """Format byte count to human-readable string.

        Uses 1024-based steps through B, KB, MB and GB; anything larger is
        reported in TB. One decimal place is always shown (e.g. ``"512.0B"``).
        """
        size = float(size_bytes)
        for unit in ("B", "KB", "MB", "GB"):
            if size < 1024.0:
                return f"{size:.1f}{unit}"
            size /= 1024.0
        return f"{size:.1f}TB"
def rotate_logs() -> Dict[str, Any]:
    """
    Convenience function to rotate logs with default configuration.

    Creates a ``LogManager`` with no explicit configuration and runs one
    rotation pass over its configured directories.

    Returns:
        Dictionary with rotation statistics.
    """
    return LogManager().rotate_all_logs()
def get_log_health() -> Dict[str, Any]:
    """
    Convenience function to get log health status.

    Creates a ``LogManager`` with no explicit configuration and asks it for
    the health status of its configured directories.

    Returns:
        Dictionary with log health information.
    """
    return LogManager().get_log_health_status()
if __name__ == "__main__":
    # Script entry point: set up console logging, run one rotation pass with
    # the default configuration, then print a plain-text report.
    logging.basicConfig(
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        level=logging.INFO,
    )

    summary = rotate_logs()
    freed = LogManager._format_bytes(summary['space_freed_bytes'])
    report = [
        "\n=== Log Rotation Summary ===",
        f"Directories processed: {summary['processed_dirs']}",
        f"Logs compressed: {summary['logs_compressed']}",
        f"Logs deleted: {summary['logs_deleted']}",
        f"BAK files deleted: {summary['bak_files_deleted']}",
        f"Space freed: {freed}",
    ]
    print(*report, sep="\n")

    # List each recorded failure only when there is at least one.
    failures = summary['errors']
    if failures:
        print(f"\nErrors encountered: {len(failures)}")
        for failure in failures:
            print(f" - {failure}")