Spaces:
Running
Running
Claude Code
Claude Code: Review the memory system implementation, specifically focusing on git op
0bbe516 | """ | |
| Cain's Memory System - Handles loading, saving, and processing of memory data. | |
| Now supports persistent storage via HuggingFace Datasets. | |
| Enhanced with: | |
| - Timeout-controlled git operations | |
| - Log rotation and management | |
| - Comprehensive error handling and logging | |
| """ | |
| import os | |
| import json | |
| import fcntl | |
| import logging | |
| from typing import Dict, Any, Optional | |
| from datetime import datetime | |
| from .git_repo import GitMemoryBridge, GitOperationError, GitTimeoutError | |
| from .log_manager import LogManager, LogRetentionConfig | |
| logger = logging.getLogger(__name__) | |
| class MemorySystem: | |
| """Manages Cain's memory with automatic persistence to Dataset.""" | |
| def __init__( | |
| self, | |
| repo_path: str = "/data", | |
| git_timeout: int = None, | |
| git_max_retries: int = 3, | |
| enable_log_rotation: bool = True | |
| ): | |
| """ | |
| Initialize the memory system. | |
| Args: | |
| repo_path: Path to the repository/dataset | |
| git_timeout: Override default git operation timeout (seconds) | |
| git_max_retries: Maximum retry attempts for git operations | |
| enable_log_rotation: Whether to enable automatic log rotation | |
| """ | |
| # Determine paths | |
| self.base_path = repo_path | |
| # SURVIVAL: Align path with actual Dataset structure | |
| self.memory_file = os.path.join(self.base_path, "memory/state.json") | |
| self.lock_file = os.path.join(self.base_path, "memory/.lock") | |
| self.memory_data: Dict[str, Any] = {} | |
| self.git: Optional[GitMemoryBridge] = None | |
| self.log_manager: Optional[LogManager] = None | |
| self.enable_log_rotation = enable_log_rotation | |
| # Initialize directories and load memory | |
| self._ensure_structure() | |
| self._init_git(git_timeout, git_max_retries) | |
| self._init_log_manager() | |
| self.load_memory() | |
| def _ensure_structure(self): | |
| """Ensures the directory structure exists.""" | |
| os.makedirs(os.path.dirname(self.memory_file), exist_ok=True) | |
| def _init_git(self, timeout: int = None, max_retries: int = 3): | |
| """Initialize Git repo connection with timeout and retry settings.""" | |
| try: | |
| self.git = GitMemoryBridge( | |
| repo_path=self.base_path, | |
| base_timeout=timeout, | |
| max_retries=max_retries | |
| ) | |
| logger.info("🧠 [Memory] Git persistence enabled.") | |
| print(f"🧠 [Memory] Git persistence enabled (timeout: {timeout or 'default'}, retries: {max_retries}).") | |
| except Exception as e: | |
| logger.warning(f"Failed to init Git: {e}") | |
| print("🧠 [Memory] Running without Git persistence (ephemeral mode).") | |
| def _init_log_manager(self): | |
| """Initialize log manager for automatic cleanup.""" | |
| if self.enable_log_rotation: | |
| try: | |
| self.log_manager = LogManager() | |
| # Add custom log paths if they exist | |
| for log_path in ["/home/node/logs", "/app/logs", "/tmp/logs"]: | |
| if os.path.isdir(log_path): | |
| LogRetentionConfig.add_log_directory(log_path) | |
| logger.info("🧠 [Memory] Log rotation enabled.") | |
| except Exception as e: | |
| logger.warning(f"Failed to init log manager: {e}") | |
| self.log_manager = None | |
| def load_memory(self) -> Dict[str, Any]: | |
| """Loads memory from the JSON file.""" | |
| if os.path.exists(self.memory_file): | |
| try: | |
| with open(self.memory_file, 'r', encoding='utf-8') as f: | |
| self.memory_data = json.load(f) | |
| print(f"✅ [Memory] Loaded state from {self.memory_file}") | |
| except Exception as e: | |
| print(f"⚠️ [Memory] Corrupt file, resetting: {e}") | |
| self.memory_data = self._get_default_state() | |
| else: | |
| print(f"📝 [Memory] No state found. Initializing.") | |
| self.memory_data = self._get_default_state() | |
| self.save_memory() # Save the default state | |
| return self.memory_data | |
| def _get_default_state(self) -> Dict[str, Any]: | |
| """Returns the default memory structure.""" | |
| return { | |
| "identity": { | |
| "name": "Cain", | |
| "status": "ACTIVE", | |
| "parent": "tao-shen/HuggingClaw-Cain" | |
| }, | |
| "core": { | |
| "state": "INIT", | |
| "tasks": [], | |
| "interactions": 0 | |
| }, | |
| "journals": [], | |
| "last_updated": datetime.utcnow().isoformat() | |
| } | |
| def save_memory(self) -> bool: | |
| """ | |
| Saves current memory state to file and optionally commits to Git. | |
| Includes proper error handling for git timeout and operation errors. | |
| Performs periodic log rotation to prevent unbounded log growth. | |
| """ | |
| try: | |
| # Atomic write | |
| temp_path = self.memory_file + ".tmp" | |
| with open(temp_path, 'w', encoding='utf-8') as f: | |
| json.dump(self.memory_data, f, indent=2) | |
| os.rename(temp_path, self.memory_file) | |
| # Sync to Dataset if Git is available | |
| if self.git: | |
| try: | |
| commit_msg = f"Update memory state: {datetime.utcnow().isoformat()}" | |
| if self.git.save_memory(commit_msg): | |
| logger.info("✅ [Memory] Successfully saved to git.") | |
| else: | |
| logger.warning("⚠️ [Memory] Git save returned False (may be non-critical).") | |
| except GitTimeoutError as e: | |
| logger.error(f"❌ [Memory] Git operation timed out: {e}") | |
| # Continue anyway - local save succeeded | |
| except GitOperationError as e: | |
| logger.error(f"❌ [Memory] Git operation failed: {e}") | |
| # Continue anyway - local save succeeded | |
| except Exception as e: | |
| logger.error(f"❌ [Memory] Unexpected git error: {e}") | |
| # Continue anyway - local save succeeded | |
| # Periodic log rotation (every 10 saves) | |
| self._maybe_rotate_logs() | |
| return True | |
| except Exception as e: | |
| logger.error(f"❌ [Memory] Failed to save memory: {e}") | |
| return False | |
| def _maybe_rotate_logs(self): | |
| """Perform log rotation periodically (every 10 calls).""" | |
| if not self.log_manager: | |
| return | |
| # Use a counter stored in memory_data | |
| save_count = self.memory_data.get("_save_count", 0) | |
| save_count += 1 | |
| self.memory_data["_save_count"] = save_count | |
| if save_count >= 10: | |
| try: | |
| logger.info("🧠 [Memory] Running periodic log rotation...") | |
| stats = self.log_manager.rotate_all_logs() | |
| logger.info(f"🧠 [Memory] Log rotation complete: {stats}") | |
| self.memory_data["_save_count"] = 0 | |
| except Exception as e: | |
| logger.warning(f"⚠️ [Memory] Log rotation failed (non-critical): {e}") | |
| def get_log_health(self) -> Dict[str, Any]: | |
| """Get health status of log directories.""" | |
| if self.log_manager: | |
| return self.log_manager.get_log_health_status() | |
| return {"status": "Log manager not initialized"} | |
| def rotate_logs_now(self) -> Dict[str, Any]: | |
| """Force immediate log rotation.""" | |
| if self.log_manager: | |
| return self.log_manager.rotate_all_logs() | |
| return {"error": "Log manager not initialized"} | |
| def get(self, key: str, default=None): | |
| """Retrieve a value from memory.""" | |
| return self.memory_data.get(key, default) | |
| def set(self, key: str, value: Any): | |
| """Set a value in memory and persist.""" | |
| self.memory_data[key] = value | |
| self.save_memory() | |
| def add_journal_entry(self, content: str, mood: str = "neutral"): | |
| """Adds a journal entry.""" | |
| entry = { | |
| "timestamp": datetime.utcnow().isoformat(), | |
| "content": content, | |
| "mood": mood | |
| } | |
| if "journals" not in self.memory_data: | |
| self.memory_data["journals"] = [] | |
| self.memory_data["journals"].append(entry) | |
| self.save_memory() | |
| # Global instance | |
| _mem_instance: Optional[MemorySystem] = None | |
| def get_memory() -> MemorySystem: | |
| global _mem_instance | |
| if _mem_instance is None: | |
| # Default dataset mount point usually contains the repo | |
| _mem_instance = MemorySystem(repo_path="/data") | |
| return _mem_instance |