# HuggingClaw-Cain / memory / memory_system.py
# Claude Code
# Claude Code: Review the memory system implementation, specifically focusing on git op...
# 0bbe516
"""
Cain's Memory System - Handles loading, saving, and processing of memory data.
Now supports persistent storage via HuggingFace Datasets.
Enhanced with:
- Timeout-controlled git operations
- Log rotation and management
- Comprehensive error handling and logging
"""
import os
import json
import fcntl
import logging
from typing import Dict, Any, Optional
from datetime import datetime
from .git_repo import GitMemoryBridge, GitOperationError, GitTimeoutError
from .log_manager import LogManager, LogRetentionConfig
logger = logging.getLogger(__name__)
class MemorySystem:
    """Manage Cain's memory state with automatic persistence.

    The state is a plain dict held in ``self.memory_data``. It is mirrored to
    ``<repo_path>/memory/state.json`` on every mutation made through
    :meth:`set` or :meth:`add_journal_entry`, and additionally handed to a
    ``GitMemoryBridge`` when one could be created.

    NOTE(review): ``self.lock_file`` is defined but no locking is performed
    anywhere in this class; concurrent writers are not coordinated here.
    """

    # Number of save_memory() calls between automatic log rotations.
    _LOG_ROTATION_INTERVAL = 10

    def __init__(
        self,
        repo_path: str = "/data",
        git_timeout: Optional[int] = None,
        git_max_retries: int = 3,
        enable_log_rotation: bool = True
    ):
        """
        Initialize the memory system.

        Construction performs I/O: it creates the memory directory, tries to
        set up Git persistence and log rotation, then loads (or creates) the
        state file.

        Args:
            repo_path: Path to the repository/dataset
            git_timeout: Override default git operation timeout (seconds)
            git_max_retries: Maximum retry attempts for git operations
            enable_log_rotation: Whether to enable automatic log rotation
        """
        # Determine paths
        self.base_path = repo_path
        # SURVIVAL: Align path with actual Dataset structure
        self.memory_file = os.path.join(self.base_path, "memory/state.json")
        self.lock_file = os.path.join(self.base_path, "memory/.lock")
        self.memory_data: Dict[str, Any] = {}
        self.git: Optional[GitMemoryBridge] = None
        self.log_manager: Optional[LogManager] = None
        self.enable_log_rotation = enable_log_rotation
        # Initialize directories and load memory
        self._ensure_structure()
        self._init_git(git_timeout, git_max_retries)
        self._init_log_manager()
        self.load_memory()

    @staticmethod
    def _utc_timestamp() -> str:
        """Return the current UTC time as a naive ISO-8601 string.

        Produces the same format as ``datetime.utcnow().isoformat()`` without
        calling the deprecated ``utcnow``.
        """
        # Local import: only ``datetime`` itself is imported at file level.
        from datetime import timezone
        return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    def _ensure_structure(self):
        """Ensures the directory that holds the state file exists."""
        os.makedirs(os.path.dirname(self.memory_file), exist_ok=True)

    def _init_git(self, timeout: Optional[int] = None, max_retries: int = 3):
        """Initialize Git repo connection with timeout and retry settings.

        Any failure is logged and leaves ``self.git`` as ``None``; the system
        then runs without Git persistence.
        """
        try:
            self.git = GitMemoryBridge(
                repo_path=self.base_path,
                base_timeout=timeout,
                max_retries=max_retries
            )
            logger.info("🧠 [Memory] Git persistence enabled.")
            print(f"🧠 [Memory] Git persistence enabled (timeout: {timeout or 'default'}, retries: {max_retries}).")
        except Exception as e:
            # Best-effort: Git is optional, so never let its setup abort init.
            logger.warning(f"Failed to init Git: {e}")
            print("🧠 [Memory] Running without Git persistence (ephemeral mode).")

    def _init_log_manager(self):
        """Initialize log manager for automatic cleanup.

        Does nothing when log rotation is disabled. On failure the manager is
        left as ``None`` and rotation is silently skipped later on.
        """
        if not self.enable_log_rotation:
            return
        try:
            self.log_manager = LogManager()
            # Add custom log paths if they exist
            for log_path in ["/home/node/logs", "/app/logs", "/tmp/logs"]:
                if os.path.isdir(log_path):
                    LogRetentionConfig.add_log_directory(log_path)
            logger.info("🧠 [Memory] Log rotation enabled.")
        except Exception as e:
            logger.warning(f"Failed to init log manager: {e}")
            self.log_manager = None

    def load_memory(self) -> Dict[str, Any]:
        """Load memory from the JSON state file.

        Returns:
            The loaded state. If the file is missing, the default state is
            created and saved. If the file is unreadable, is not valid JSON,
            or does not contain a JSON object at top level, the in-memory
            state is reset to the default (the file itself is only replaced
            on the next save).
        """
        if not os.path.exists(self.memory_file):
            print("📝 [Memory] No state found. Initializing.")
            self.memory_data = self._get_default_state()
            self.save_memory()  # Save the default state
            return self.memory_data

        try:
            with open(self.memory_file, 'r', encoding='utf-8') as f:
                loaded = json.load(f)
            # Every other method treats memory_data as a dict; valid JSON of
            # another shape (list, string, number) would break them later.
            if not isinstance(loaded, dict):
                raise ValueError(
                    f"expected a JSON object at top level, got {type(loaded).__name__}"
                )
            self.memory_data = loaded
            print(f"✅ [Memory] Loaded state from {self.memory_file}")
        except Exception as e:
            print(f"⚠️ [Memory] Corrupt file, resetting: {e}")
            self.memory_data = self._get_default_state()
        return self.memory_data

    def _get_default_state(self) -> Dict[str, Any]:
        """Returns the default memory structure."""
        return {
            "identity": {
                "name": "Cain",
                "status": "ACTIVE",
                "parent": "tao-shen/HuggingClaw-Cain"
            },
            "core": {
                "state": "INIT",
                "tasks": [],
                "interactions": 0
            },
            "journals": [],
            "last_updated": self._utc_timestamp()
        }

    def save_memory(self) -> bool:
        """
        Saves current memory state to file and optionally commits to Git.

        The local write is the only step that decides the result: Git errors
        and log-rotation errors are logged and do not fail the save.

        Returns:
            True when the state file was written, False otherwise.
        """
        try:
            self._write_state_file()
            self._sync_to_git()
            # Periodic log rotation
            self._maybe_rotate_logs()
            return True
        except Exception as e:
            logger.error(f"❌ [Memory] Failed to save memory: {e}")
            return False

    def _write_state_file(self):
        """Write ``memory_data`` to the state file via a temp file and rename.

        Refreshes ``"last_updated"`` first so the persisted timestamp reflects
        this save. On any failure the temp file is removed and the exception
        is re-raised; the previous state file is left untouched.
        """
        temp_path = self.memory_file + ".tmp"
        try:
            self.memory_data["last_updated"] = self._utc_timestamp()
            with open(temp_path, 'w', encoding='utf-8') as f:
                json.dump(self.memory_data, f, indent=2)
            # os.replace overwrites an existing destination on every platform.
            os.replace(temp_path, self.memory_file)
        except Exception:
            # Don't leave a half-written temp file behind.
            try:
                os.remove(temp_path)
            except OSError:
                pass
            raise

    def _sync_to_git(self):
        """Hand the saved state to the Git bridge, if one is configured.

        Never raises: every Git failure is logged and swallowed because the
        local save has already succeeded at this point.
        """
        if not self.git:
            return
        try:
            commit_msg = f"Update memory state: {self._utc_timestamp()}"
            if self.git.save_memory(commit_msg):
                logger.info("✅ [Memory] Successfully saved to git.")
            else:
                logger.warning("⚠️ [Memory] Git save returned False (may be non-critical).")
        except GitTimeoutError as e:
            logger.error(f"❌ [Memory] Git operation timed out: {e}")
        except GitOperationError as e:
            logger.error(f"❌ [Memory] Git operation failed: {e}")
        except Exception as e:
            logger.error(f"❌ [Memory] Unexpected git error: {e}")

    def _maybe_rotate_logs(self):
        """Perform log rotation periodically (every 10 calls).

        The call counter is kept in ``memory_data["_save_count"]``. Because
        this runs after the state file is written, the updated counter reaches
        disk with the following save.
        """
        if not self.log_manager:
            return
        # Use a counter stored in memory_data
        save_count = self.memory_data.get("_save_count", 0) + 1
        if save_count < self._LOG_ROTATION_INTERVAL:
            self.memory_data["_save_count"] = save_count
            return
        # Reset before attempting, so a rotation that keeps failing is retried
        # after another full interval instead of on every single save.
        self.memory_data["_save_count"] = 0
        try:
            logger.info("🧠 [Memory] Running periodic log rotation...")
            stats = self.log_manager.rotate_all_logs()
            logger.info(f"🧠 [Memory] Log rotation complete: {stats}")
        except Exception as e:
            logger.warning(f"⚠️ [Memory] Log rotation failed (non-critical): {e}")

    def get_log_health(self) -> Dict[str, Any]:
        """Get health status of log directories from the log manager."""
        if self.log_manager:
            return self.log_manager.get_log_health_status()
        return {"status": "Log manager not initialized"}

    def rotate_logs_now(self) -> Dict[str, Any]:
        """Force immediate log rotation, bypassing the save counter."""
        if self.log_manager:
            return self.log_manager.rotate_all_logs()
        return {"error": "Log manager not initialized"}

    def get(self, key: str, default=None):
        """Retrieve a top-level value from memory, or ``default`` if absent."""
        return self.memory_data.get(key, default)

    def set(self, key: str, value: Any):
        """Set a top-level value in memory and persist immediately."""
        self.memory_data[key] = value
        self.save_memory()

    def add_journal_entry(self, content: str, mood: str = "neutral"):
        """Append a timestamped journal entry and persist immediately.

        Args:
            content: Text of the entry.
            mood: Free-form mood label stored with the entry.
        """
        entry = {
            "timestamp": self._utc_timestamp(),
            "content": content,
            "mood": mood
        }
        # Create the list on first use (e.g. state loaded from an older file).
        self.memory_data.setdefault("journals", []).append(entry)
        self.save_memory()
# Process-wide singleton, created on first call to get_memory()
_mem_instance: Optional[MemorySystem] = None


def get_memory() -> MemorySystem:
    """Return the shared MemorySystem, constructing it on first use.

    The instance is built with ``repo_path="/data"`` and cached in the
    module-level ``_mem_instance`` for all later calls.
    """
    global _mem_instance
    if _mem_instance is not None:
        return _mem_instance
    # Default dataset mount point usually contains the repo
    _mem_instance = MemorySystem(repo_path="/data")
    return _mem_instance