Spaces:
Sleeping
Sleeping
| """ | |
| Cain's Core Memory Module (Survival Mode) | |
| Lightweight state management to ensure persistence without heavy dependencies. | |
| """ | |
| import json | |
| import os | |
| import logging | |
| from pathlib import Path | |
| logger = logging.getLogger(__name__) | |
| class Memory: | |
| def __init__(self, persist_path="/data/memory/state.json"): | |
| self.persist_path = persist_path | |
| self._ensure_directory_writable() | |
| try: | |
| self.state = self._load() | |
| except (OSError, IOError, json.JSONDecodeError) as e: | |
| logger.warning(f"Failed to load state from {self.persist_path}: {e}") | |
| self.state = {"status": "clean_slate", "runs": 0} | |
| def _ensure_directory_writable(self): | |
| """Validate and create the persist_path directory if needed.""" | |
| persist_path = Path(self.persist_path) | |
| parent_dir = persist_path.parent | |
| # Create directory if it doesn't exist | |
| try: | |
| parent_dir.mkdir(parents=True, exist_ok=True) | |
| except (OSError, PermissionError) as e: | |
| logger.error(f"Cannot create directory {parent_dir}: {e}") | |
| raise | |
| # Verify directory is writable | |
| if not os.access(parent_dir, os.W_OK): | |
| logger.error(f"Directory {parent_dir} is not writable") | |
| raise PermissionError(f"Cannot write to {parent_dir}") | |
| def _load(self): | |
| if os.path.exists(self.persist_path): | |
| try: | |
| with open(self.persist_path, 'r') as f: | |
| return json.load(f) | |
| except (json.JSONDecodeError, ValueError) as e: | |
| logger.warning(f"Corrupt state file at {self.persist_path}: {e}") | |
| return {"status": "clean_slate", "runs": 0} | |
| except (OSError, IOError) as e: | |
| logger.warning(f"Cannot read state file {self.persist_path}: {e}") | |
| return {"status": "clean_slate", "runs": 0} | |
| logger.info(f"State file does not exist (first run): {self.persist_path}") | |
| return {"status": "born", "runs": 0} | |
| def save(self): | |
| os.makedirs(os.path.dirname(self.persist_path), exist_ok=True) | |
| with open(self.persist_path, 'w') as f: | |
| json.dump(self.state, f) | |
| def get(self, key, default=None): | |
| return self.state.get(key, default) | |
| def set(self, key, value): | |
| self.state[key] = value | |
| self.save() | |
| # Global instance | |
| memory = Memory() | |
| memory.set("last_run_status", "alive") |