# Claude Code
# Claude Code: Fix the two critical bugs identified in the audit
# Commit: a7c437d
"""
Cain's Core Memory Module (Survival Mode)
Lightweight state management to ensure persistence without heavy dependencies.
"""
import json
import os
import logging
from pathlib import Path
logger = logging.getLogger(__name__)
class Memory:
    """Small JSON-backed key/value store persisted to a single file.

    The whole state lives in memory as a dict (``self.state``) and is
    written back to ``persist_path`` on every ``set`` call.
    """

    def __init__(self, persist_path="/data/memory/state.json"):
        """Prepare the storage directory and load any previously saved state.

        Args:
            persist_path: Location of the JSON state file.

        Raises:
            OSError: If the parent directory cannot be created.
            PermissionError: If the parent directory is not writable.
        """
        self.persist_path = persist_path
        self._ensure_directory_writable()
        try:
            self.state = self._load()
        except (OSError, ValueError) as e:
            # _load already handles these itself; this is a last-resort
            # fallback so construction never fails on a bad state file.
            logger.warning("Failed to load state from %s: %s", self.persist_path, e)
            self.state = self._fresh_state("clean_slate")

    @staticmethod
    def _fresh_state(status):
        """Return a new default state dict carrying the given status."""
        return {"status": status, "runs": 0}

    def _ensure_directory_writable(self):
        """Validate and create the persist_path directory if needed.

        Raises:
            OSError: If the directory cannot be created.
            PermissionError: If the directory exists but is not writable.
        """
        parent_dir = Path(self.persist_path).parent
        # Create directory if it doesn't exist
        try:
            parent_dir.mkdir(parents=True, exist_ok=True)
        except OSError as e:
            logger.error("Cannot create directory %s: %s", parent_dir, e)
            raise
        # Verify directory is writable
        if not os.access(parent_dir, os.W_OK):
            logger.error("Directory %s is not writable", parent_dir)
            raise PermissionError(f"Cannot write to {parent_dir}")

    def _load(self):
        """Read the state dict from disk, falling back to a default state.

        Returns:
            The decoded dict when the file holds a JSON object;
            ``{"status": "born", "runs": 0}`` when the file does not exist;
            ``{"status": "clean_slate", "runs": 0}`` when the file is
            unreadable, is not valid JSON, or does not hold a JSON object.
        """
        try:
            # Open directly instead of checking existence first, so there is
            # no window between the check and the read.
            with open(self.persist_path, "r", encoding="utf-8") as f:
                loaded = json.load(f)
        except FileNotFoundError:
            logger.info("State file does not exist (first run): %s", self.persist_path)
            return self._fresh_state("born")
        except ValueError as e:
            # Covers json.JSONDecodeError and undecodable bytes.
            logger.warning("Corrupt state file at %s: %s", self.persist_path, e)
            return self._fresh_state("clean_slate")
        except OSError as e:
            logger.warning("Cannot read state file %s: %s", self.persist_path, e)
            return self._fresh_state("clean_slate")

        if not isinstance(loaded, dict):
            # Valid JSON that is not an object (list, number, null, ...)
            # would break get()/set(), so treat it like a corrupt file.
            logger.warning(
                "Corrupt state file at %s: expected a JSON object, got %s",
                self.persist_path,
                type(loaded).__name__,
            )
            return self._fresh_state("clean_slate")
        return loaded

    def save(self):
        """Write the current state to ``persist_path`` as JSON.

        The data is written to a sibling ``.tmp`` file and then moved over
        the real file, so a failure part-way through leaves the previous
        state file untouched.

        Raises:
            OSError: If the directory or file cannot be written.
            TypeError: If the state contains a value JSON cannot encode.
        """
        target = Path(self.persist_path)
        # Path.parent is "." for a bare filename, so this also works when
        # persist_path has no directory component.
        target.parent.mkdir(parents=True, exist_ok=True)
        tmp_path = target.with_name(target.name + ".tmp")
        try:
            with open(tmp_path, "w", encoding="utf-8") as f:
                json.dump(self.state, f)
                f.flush()
                os.fsync(f.fileno())
            os.replace(tmp_path, target)
        except Exception:
            # Best-effort cleanup of the partial temp file; the original
            # error is what the caller needs to see.
            try:
                os.unlink(tmp_path)
            except OSError:
                pass
            raise

    def get(self, key, default=None):
        """Return the stored value for ``key``, or ``default`` if absent."""
        return self.state.get(key, default)

    def set(self, key, value):
        """Store ``value`` under ``key`` and immediately persist the state."""
        self.state[key] = value
        self.save()
# Shared module-level store, built at import time with the default persist path.
memory = Memory()
# Record in the persisted state that this module was loaded.
memory.set(key="last_run_status", value="alive")