"""
Cain's Memory System - Handles loading, saving, and processing of memory data.
Now supports persistent storage via HuggingFace Datasets.
Enhanced with:
- Timeout-controlled git operations
- Log rotation and management
- Comprehensive error handling and logging
"""
import os
import json
import fcntl
import logging
from typing import Dict, Any, Optional
from datetime import datetime
from .git_repo import GitMemoryBridge, GitOperationError, GitTimeoutError
from .log_manager import LogManager, LogRetentionConfig
logger = logging.getLogger(__name__)
class MemorySystem:
    """Manages Cain's memory with automatic persistence to Dataset.

    The state is a JSON object kept in ``memory_data`` and mirrored to
    ``<repo_path>/memory/state.json``. Every save writes the file locally
    and, when a ``GitMemoryBridge`` could be created, also asks it to commit.
    Git failures never fail a save: the local file is the source of truth.
    """

    # Number of saves between two automatic log rotations.
    _ROTATE_EVERY_N_SAVES = 10

    def __init__(
        self,
        repo_path: str = "/data",
        git_timeout: Optional[int] = None,
        git_max_retries: int = 3,
        enable_log_rotation: bool = True
    ):
        """
        Initialize the memory system.

        Construction performs I/O: it creates the ``memory`` directory,
        tries to set up git and log rotation, then loads the state file
        (or creates it with the default state when it does not exist).

        Args:
            repo_path: Path to the repository/dataset
            git_timeout: Override default git operation timeout (seconds);
                ``None`` is passed through to ``GitMemoryBridge`` unchanged
            git_max_retries: Maximum retry attempts for git operations
            enable_log_rotation: Whether to enable automatic log rotation
        """
        # Determine paths
        self.base_path = repo_path
        # SURVIVAL: Align path with actual Dataset structure
        self.memory_file = os.path.join(self.base_path, "memory/state.json")
        # NOTE(review): lock_file is defined but nothing in this class
        # acquires it — confirm whether cross-process locking is intended.
        self.lock_file = os.path.join(self.base_path, "memory/.lock")
        self.memory_data: Dict[str, Any] = {}
        self.git: Optional[GitMemoryBridge] = None
        self.log_manager: Optional[LogManager] = None
        self.enable_log_rotation = enable_log_rotation

        # Initialize directories and load memory
        self._ensure_structure()
        self._init_git(git_timeout, git_max_retries)
        self._init_log_manager()
        self.load_memory()

    def _ensure_structure(self):
        """Ensures the directory structure exists."""
        os.makedirs(os.path.dirname(self.memory_file), exist_ok=True)

    def _init_git(self, timeout: Optional[int] = None, max_retries: int = 3):
        """Initialize Git repo connection with timeout and retry settings.

        Any failure is logged and leaves ``self.git`` as ``None``, so the
        instance keeps working with local-file persistence only.
        """
        try:
            self.git = GitMemoryBridge(
                repo_path=self.base_path,
                base_timeout=timeout,
                max_retries=max_retries
            )
            logger.info("🧠 [Memory] Git persistence enabled.")
            print(f"🧠 [Memory] Git persistence enabled (timeout: {timeout or 'default'}, retries: {max_retries}).")
        except Exception as e:
            # Make the fallback explicit even if this is called after __init__.
            self.git = None
            logger.warning(f"Failed to init Git: {e}")
            print("🧠 [Memory] Running without Git persistence (ephemeral mode).")

    def _init_log_manager(self):
        """Initialize log manager for automatic cleanup.

        Does nothing when log rotation is disabled. On failure the manager
        is left as ``None`` and rotation is silently skipped later on.
        """
        if not self.enable_log_rotation:
            return
        try:
            self.log_manager = LogManager()
            # Add custom log paths if they exist
            for log_path in ["/home/node/logs", "/app/logs", "/tmp/logs"]:
                if os.path.isdir(log_path):
                    LogRetentionConfig.add_log_directory(log_path)
            logger.info("🧠 [Memory] Log rotation enabled.")
        except Exception as e:
            logger.warning(f"Failed to init log manager: {e}")
            self.log_manager = None

    def load_memory(self) -> Dict[str, Any]:
        """Loads memory from the JSON file.

        When the file is missing, the default state is created and saved.
        When the file cannot be read, is not valid JSON, or does not hold a
        JSON object, the in-memory state is reset to the default (the file
        itself is left untouched until the next save).

        Returns:
            The loaded (or default) state, which is also ``self.memory_data``.
        """
        if not os.path.exists(self.memory_file):
            print("📝 [Memory] No state found. Initializing.")
            self.memory_data = self._get_default_state()
            self.save_memory()  # Save the default state
            return self.memory_data

        try:
            with open(self.memory_file, 'r', encoding='utf-8') as f:
                loaded = json.load(f)
            # Valid JSON is not enough: get()/set() and the journal helpers
            # all require a dict, so a list or scalar counts as corruption.
            if not isinstance(loaded, dict):
                raise ValueError(
                    f"expected a JSON object, got {type(loaded).__name__}"
                )
            self.memory_data = loaded
            print(f"✅ [Memory] Loaded state from {self.memory_file}")
        except Exception as e:
            print(f"⚠️ [Memory] Corrupt file, resetting: {e}")
            self.memory_data = self._get_default_state()
        return self.memory_data

    def _get_default_state(self) -> Dict[str, Any]:
        """Returns the default memory structure."""
        return {
            "identity": {
                "name": "Cain",
                "status": "ACTIVE",
                "parent": "tao-shen/HuggingClaw-Cain"
            },
            "core": {
                "state": "INIT",
                "tasks": [],
                "interactions": 0
            },
            "journals": [],
            "last_updated": datetime.utcnow().isoformat()
        }

    def save_memory(self) -> bool:
        """
        Saves current memory state to file and optionally commits to Git.

        The state is serialized in memory first and then written to a
        temporary file that replaces the real one, so a failed save never
        leaves a truncated state file or a stray ``.tmp`` behind.
        ``last_updated`` is refreshed on every save.

        Git errors are logged and ignored because the local save already
        succeeded. Log rotation is attempted periodically after the save.

        Returns:
            True when the local file was written, False otherwise.
        """
        temp_path = self.memory_file + ".tmp"
        try:
            self.memory_data["last_updated"] = datetime.utcnow().isoformat()
            # Serialize before touching the disk: a non-serializable value
            # must fail here, not halfway through writing the temp file.
            payload = json.dumps(self.memory_data, indent=2)
            with open(temp_path, 'w', encoding='utf-8') as f:
                f.write(payload)
            # os.replace overwrites the destination on every platform.
            os.replace(temp_path, self.memory_file)
        except Exception as e:
            logger.error(f"❌ [Memory] Failed to save memory: {e}")
            try:
                os.remove(temp_path)
            except OSError:
                # Best-effort cleanup; the temp file may never have existed.
                pass
            return False

        self._sync_to_git()
        # Periodic log rotation (every 10 saves)
        self._maybe_rotate_logs()
        return True

    def _sync_to_git(self):
        """Commit the saved state through the git bridge, never raising."""
        if not self.git:
            return
        try:
            commit_msg = f"Update memory state: {datetime.utcnow().isoformat()}"
            if self.git.save_memory(commit_msg):
                logger.info("✅ [Memory] Successfully saved to git.")
            else:
                logger.warning("⚠️ [Memory] Git save returned False (may be non-critical).")
        except GitTimeoutError as e:
            logger.error(f"❌ [Memory] Git operation timed out: {e}")
        except GitOperationError as e:
            logger.error(f"❌ [Memory] Git operation failed: {e}")
        except Exception as e:
            logger.error(f"❌ [Memory] Unexpected git error: {e}")

    def _maybe_rotate_logs(self):
        """Perform log rotation periodically (every 10 calls).

        The call counter lives in ``memory_data["_save_count"]``, so it is
        written to disk by the following save. When rotation fails the
        counter is not reset, which makes the next call try again.
        """
        if not self.log_manager:
            return

        # Use a counter stored in memory_data
        previous = self.memory_data.get("_save_count", 0)
        if not isinstance(previous, int):
            # A hand-edited or corrupted counter must not break saving.
            previous = 0
        save_count = previous + 1
        self.memory_data["_save_count"] = save_count

        if save_count < self._ROTATE_EVERY_N_SAVES:
            return
        try:
            logger.info("🧠 [Memory] Running periodic log rotation...")
            stats = self.log_manager.rotate_all_logs()
            logger.info(f"🧠 [Memory] Log rotation complete: {stats}")
            self.memory_data["_save_count"] = 0
        except Exception as e:
            logger.warning(f"⚠️ [Memory] Log rotation failed (non-critical): {e}")

    def get_log_health(self) -> Dict[str, Any]:
        """Get health status of log directories."""
        if self.log_manager:
            return self.log_manager.get_log_health_status()
        return {"status": "Log manager not initialized"}

    def rotate_logs_now(self) -> Dict[str, Any]:
        """Force immediate log rotation."""
        if self.log_manager:
            return self.log_manager.rotate_all_logs()
        return {"error": "Log manager not initialized"}

    def get(self, key: str, default=None):
        """Retrieve a top-level value from memory, or ``default`` if absent."""
        return self.memory_data.get(key, default)

    def set(self, key: str, value: Any):
        """Set a top-level value in memory and persist."""
        self.memory_data[key] = value
        self.save_memory()

    def add_journal_entry(self, content: str, mood: str = "neutral"):
        """Adds a timestamped journal entry and persists the state.

        Args:
            content: Text of the entry
            mood: Free-form mood label stored with the entry
        """
        entry = {
            "timestamp": datetime.utcnow().isoformat(),
            "content": content,
            "mood": mood
        }
        self.memory_data.setdefault("journals", []).append(entry)
        self.save_memory()
# Global instance: module-wide MemorySystem slot, None until get_memory() fills it.
_mem_instance: Optional[MemorySystem] = None
def get_memory() -> MemorySystem:
    """Return the shared MemorySystem, building it on the first call."""
    global _mem_instance
    if _mem_instance is not None:
        return _mem_instance
    # Default dataset mount point usually contains the repo
    _mem_instance = MemorySystem(repo_path="/data")
    return _mem_instance