""" Cain's Memory System - Handles loading, saving, and processing of memory data. Now supports persistent storage via HuggingFace Datasets. Enhanced with: - Timeout-controlled git operations - Log rotation and management - Comprehensive error handling and logging """ import os import json import fcntl import logging from typing import Dict, Any, Optional from datetime import datetime from .git_repo import GitMemoryBridge, GitOperationError, GitTimeoutError from .log_manager import LogManager, LogRetentionConfig logger = logging.getLogger(__name__) class MemorySystem: """Manages Cain's memory with automatic persistence to Dataset.""" def __init__( self, repo_path: str = "/data", git_timeout: int = None, git_max_retries: int = 3, enable_log_rotation: bool = True ): """ Initialize the memory system. Args: repo_path: Path to the repository/dataset git_timeout: Override default git operation timeout (seconds) git_max_retries: Maximum retry attempts for git operations enable_log_rotation: Whether to enable automatic log rotation """ # Determine paths self.base_path = repo_path # SURVIVAL: Align path with actual Dataset structure self.memory_file = os.path.join(self.base_path, "memory/state.json") self.lock_file = os.path.join(self.base_path, "memory/.lock") self.memory_data: Dict[str, Any] = {} self.git: Optional[GitMemoryBridge] = None self.log_manager: Optional[LogManager] = None self.enable_log_rotation = enable_log_rotation # Initialize directories and load memory self._ensure_structure() self._init_git(git_timeout, git_max_retries) self._init_log_manager() self.load_memory() def _ensure_structure(self): """Ensures the directory structure exists.""" os.makedirs(os.path.dirname(self.memory_file), exist_ok=True) def _init_git(self, timeout: int = None, max_retries: int = 3): """Initialize Git repo connection with timeout and retry settings.""" try: self.git = GitMemoryBridge( repo_path=self.base_path, base_timeout=timeout, max_retries=max_retries ) logger.info("🧠 [Memory] Git persistence enabled.") print(f"🧠 [Memory] Git persistence enabled (timeout: {timeout or 'default'}, retries: {max_retries}).") except Exception as e: logger.warning(f"Failed to init Git: {e}") print("🧠 [Memory] Running without Git persistence (ephemeral mode).") def _init_log_manager(self): """Initialize log manager for automatic cleanup.""" if self.enable_log_rotation: try: self.log_manager = LogManager() # Add custom log paths if they exist for log_path in ["/home/node/logs", "/app/logs", "/tmp/logs"]: if os.path.isdir(log_path): LogRetentionConfig.add_log_directory(log_path) logger.info("🧠 [Memory] Log rotation enabled.") except Exception as e: logger.warning(f"Failed to init log manager: {e}") self.log_manager = None def load_memory(self) -> Dict[str, Any]: """Loads memory from the JSON file.""" if os.path.exists(self.memory_file): try: with open(self.memory_file, 'r', encoding='utf-8') as f: self.memory_data = json.load(f) print(f"✅ [Memory] Loaded state from {self.memory_file}") except Exception as e: print(f"⚠️ [Memory] Corrupt file, resetting: {e}") self.memory_data = self._get_default_state() else: print(f"📝 [Memory] No state found. Initializing.") self.memory_data = self._get_default_state() self.save_memory() # Save the default state return self.memory_data def _get_default_state(self) -> Dict[str, Any]: """Returns the default memory structure.""" return { "identity": { "name": "Cain", "status": "ACTIVE", "parent": "tao-shen/HuggingClaw-Cain" }, "core": { "state": "INIT", "tasks": [], "interactions": 0 }, "journals": [], "last_updated": datetime.utcnow().isoformat() } def save_memory(self) -> bool: """ Saves current memory state to file and optionally commits to Git. Includes proper error handling for git timeout and operation errors. Performs periodic log rotation to prevent unbounded log growth. """ try: # Atomic write temp_path = self.memory_file + ".tmp" with open(temp_path, 'w', encoding='utf-8') as f: json.dump(self.memory_data, f, indent=2) os.rename(temp_path, self.memory_file) # Sync to Dataset if Git is available if self.git: try: commit_msg = f"Update memory state: {datetime.utcnow().isoformat()}" if self.git.save_memory(commit_msg): logger.info("✅ [Memory] Successfully saved to git.") else: logger.warning("⚠️ [Memory] Git save returned False (may be non-critical).") except GitTimeoutError as e: logger.error(f"❌ [Memory] Git operation timed out: {e}") # Continue anyway - local save succeeded except GitOperationError as e: logger.error(f"❌ [Memory] Git operation failed: {e}") # Continue anyway - local save succeeded except Exception as e: logger.error(f"❌ [Memory] Unexpected git error: {e}") # Continue anyway - local save succeeded # Periodic log rotation (every 10 saves) self._maybe_rotate_logs() return True except Exception as e: logger.error(f"❌ [Memory] Failed to save memory: {e}") return False def _maybe_rotate_logs(self): """Perform log rotation periodically (every 10 calls).""" if not self.log_manager: return # Use a counter stored in memory_data save_count = self.memory_data.get("_save_count", 0) save_count += 1 self.memory_data["_save_count"] = save_count if save_count >= 10: try: logger.info("🧠 [Memory] Running periodic log rotation...") stats = self.log_manager.rotate_all_logs() logger.info(f"🧠 [Memory] Log rotation complete: {stats}") self.memory_data["_save_count"] = 0 except Exception as e: logger.warning(f"⚠️ [Memory] Log rotation failed (non-critical): {e}") def get_log_health(self) -> Dict[str, Any]: """Get health status of log directories.""" if self.log_manager: return self.log_manager.get_log_health_status() return {"status": "Log manager not initialized"} def rotate_logs_now(self) -> Dict[str, Any]: """Force immediate log rotation.""" if self.log_manager: return self.log_manager.rotate_all_logs() return {"error": "Log manager not initialized"} def get(self, key: str, default=None): """Retrieve a value from memory.""" return self.memory_data.get(key, default) def set(self, key: str, value: Any): """Set a value in memory and persist.""" self.memory_data[key] = value self.save_memory() def add_journal_entry(self, content: str, mood: str = "neutral"): """Adds a journal entry.""" entry = { "timestamp": datetime.utcnow().isoformat(), "content": content, "mood": mood } if "journals" not in self.memory_data: self.memory_data["journals"] = [] self.memory_data["journals"].append(entry) self.save_memory() # Global instance _mem_instance: Optional[MemorySystem] = None def get_memory() -> MemorySystem: global _mem_instance if _mem_instance is None: # Default dataset mount point usually contains the repo _mem_instance = MemorySystem(repo_path="/data") return _mem_instance