HuggingClaw-Cain / scripts /test_memory_persistence_edge_cases.py
Claude Code
Claude Code: Verify Cain's memory persistence system is functioning correctly in runt
83e5fd6
#!/usr/bin/env python3
"""
Memory Persistence Edge Case Testing Script
============================================
Tests Cain's memory persistence system against various edge cases to ensure
robustness and reliability.
Usage:
python scripts/test_memory_persistence_edge_cases.py
"""
import os
import sys
import json
import tempfile
import shutil
from pathlib import Path
from typing import Dict, Any, List
# Add parent directory to path
sys.path.insert(0, str(Path(__file__).parent.parent))
os.environ.setdefault("HF_HUB_DOWNLOAD_TIMEOUT", "300")
os.environ.setdefault("HF_HUB_UPLOAD_TIMEOUT", "600")
os.environ.setdefault("HF_HUB_DISABLE_PROGRESS_BARS", "1")
os.environ.setdefault("HF_HUB_VERBOSITY", "warning")
class TestCase:
def __init__(self, name: str):
self.name = name
self.passed = False
self.message = ""
self.details: Dict[str, Any] = {}
def set_result(self, passed: bool, message: str, **details):
self.passed = passed
self.message = message
self.details = details
def __str__(self):
status = "✅ PASS" if self.passed else "❌ FAIL"
return f"{status}: {self.name}\n {self.message}"
class PersistenceEdgeCaseTester:
"""Test memory persistence edge cases."""
def __init__(self):
self.results: List[TestCase] = []
self.hf_token = os.environ.get("HF_TOKEN")
self.dataset_repo = os.environ.get(
"OPENCLAW_DATASET_REPO",
os.environ.get("SPACE_ID", "tao-shen/HuggingClaw-Home") + "-data"
)
def add_result(self, test: TestCase):
self.results.append(test)
print(test)
def test_01_hf_token_exists(self):
"""Test 1: Verify HF_TOKEN is set."""
test = TestCase("HF_TOKEN Environment Variable")
if not self.hf_token:
test.set_result(
False,
"HF_TOKEN not set - persistence will be disabled",
has_token=False,
recommendation="Set HF_TOKEN environment variable with write permissions"
)
else:
test.set_result(
True,
f"HF_TOKEN is set (length: {len(self.hf_token)} chars)",
has_token=True,
token_prefix=self.hf_token[:10] + "..." if len(self.hf_token) > 10 else self.hf_token
)
self.add_result(test)
def test_02_dataset_repo_determined(self):
"""Test 2: Verify dataset repository ID is determined."""
test = TestCase("Dataset Repository ID")
if not self.dataset_repo:
test.set_result(
False,
"Could not determine dataset repository ID",
repo_id=None,
recommendation="Set SPACE_ID or OPENCLAW_DATASET_REPO environment variable"
)
else:
test.set_result(
True,
f"Dataset repository: {self.dataset_repo}",
repo_id=self.dataset_repo
)
self.add_result(test)
def test_03_hf_connection(self):
"""Test 3: Verify connection to HuggingFace API."""
test = TestCase("HuggingFace API Connection")
if not self.hf_token:
test.set_result(
False,
"Cannot test connection - HF_TOKEN not set",
reason="no_token"
)
self.add_result(test)
return
try:
from huggingface_hub import HfApi
api = HfApi(token=self.hf_token)
whoami = api.whoami()
test.set_result(
True,
f"Connected as: {whoami.get('name', 'unknown')}",
username=whoami.get('name'),
can_write=whoami.get('canWriteTo', {}).get('repos', False)
)
except Exception as e:
error_msg = str(e)
if "401" in error_msg or "authentication" in error_msg.lower():
test.set_result(
False,
"Authentication failed - check HF_TOKEN validity",
error=error_msg[:200],
recommendation="Verify HF_TOKEN is valid and not expired"
)
else:
test.set_result(
False,
f"Connection failed: {error_msg[:100]}",
error=error_msg[:200]
)
self.add_result(test)
def test_04_dataset_exists(self):
"""Test 4: Verify dataset repository exists."""
test = TestCase("Dataset Repository Existence")
if not self.hf_token or not self.dataset_repo:
test.set_result(
False,
"Cannot test - HF_TOKEN or dataset_repo not set",
reason="prerequisite_missing"
)
self.add_result(test)
return
try:
from huggingface_hub import HfApi
api = HfApi(token=self.hf_token)
repo_info = api.repo_info(repo_id=self.dataset_repo, repo_type="dataset")
test.set_result(
True,
f"Dataset exists: {self.dataset_repo}",
repo_id=self.dataset_repo,
private=repo_info.private,
author=repo_info.author
)
except Exception as e:
error_msg = str(e)
if "404" in error_msg or "not found" in error_msg.lower():
auto_create = os.environ.get("AUTO_CREATE_DATASET", "false").lower() in ("true", "1", "yes")
test.set_result(
not auto_create, # Pass if auto-create is enabled
f"Dataset not found (AUTO_CREATE_DATASET={auto_create})",
repo_id=self.dataset_repo,
auto_create_enabled=auto_create,
recommendation="Set AUTO_CREATE_DATASET=true to auto-create" if not auto_create else "Dataset will be created on first sync"
)
else:
test.set_result(
False,
f"Error checking dataset: {error_msg[:100]}",
error=error_msg[:200]
)
self.add_result(test)
def test_05_write_permissions(self):
"""Test 5: Verify write permissions to dataset."""
test = TestCase("Dataset Write Permissions")
if not self.hf_token or not self.dataset_repo:
test.set_result(
False,
"Cannot test - HF_TOKEN or dataset_repo not set",
reason="prerequisite_missing"
)
self.add_result(test)
return
try:
from huggingface_hub import HfApi
api = HfApi(token=self.hf_token)
# Try to upload a test file to verify write access
with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f:
f.write("Memory persistence permission test")
test_file = f.name
try:
upload_result = api.upload_file(
path_or_fileobj=test_file,
path_in_repo=".permission-test.txt",
repo_id=self.dataset_repo,
repo_type="dataset",
commit_message="Test write permissions"
)
test.set_result(
True,
"Write permissions verified",
test_file=".permission-test.txt",
commit_url=upload_result.commit_url
)
finally:
os.unlink(test_file)
except Exception as e:
error_msg = str(e)
if "403" in error_msg or "401" in error_msg or "permission" in error_msg.lower() or "unauthorized" in error_msg.lower():
test.set_result(
False,
"❌ CRITICAL: HF_TOKEN lacks write permissions!",
error=error_msg[:200],
critical=True,
recommendation="Regenerate HF_TOKEN with 'Write' permissions"
)
else:
test.set_result(
False,
f"Write test failed: {error_msg[:100]}",
error=error_msg[:200]
)
self.add_result(test)
def test_06_local_state_structure(self):
"""Test 6: Verify local state directory structure."""
test = TestCase("Local State Structure")
openclaw_home = Path.home() / ".openclaw"
if not openclaw_home.exists():
test.set_result(
False,
"~/.openclaw directory does not exist yet",
path=str(openclaw_home),
recommendation="Directory will be created on first run"
)
else:
# Check for key files/directories
checks = {
"config": (openclaw_home / "openclaw.json").exists(),
"workspace": (openclaw_home / "workspace").is_dir(),
"sync_log": (openclaw_home / "workspace" / "sync.log").exists(),
}
all_present = all(checks.values())
test.set_result(
all_present,
f"Structure check: {sum(checks.values())}/{len(checks)} present",
checks=checks,
recommendation="Ensure OpenClaw has been initialized" if not all_present else ""
)
self.add_result(test)
def test_07_sync_script_exists(self):
"""Test 7: Verify sync_hf.py script exists and is valid."""
test = TestCase("Sync Script Availability")
script_path = Path(__file__).parent / "sync_hf.py"
if not script_path.exists():
test.set_result(
False,
"sync_hf.py script not found",
expected_path=str(script_path),
recommendation="Ensure persistence script is deployed"
)
else:
# Try to import and check for key functions
try:
import importlib.util
spec = importlib.util.spec_from_file_location("sync_hf", script_path)
if spec and spec.loader:
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
has_class = hasattr(module, 'OpenClawFullSync')
has_main = hasattr(module, 'main')
test.set_result(
has_class and has_main,
f"Sync script valid (class={has_class}, main={has_main})",
has_OpenClawFullSync=has_class,
has_main_function=has_main
)
else:
test.set_result(False, "Could not load sync_hf.py module")
except Exception as e:
test.set_result(
False,
f"Error loading sync_hf.py: {str(e)[:100]}",
error=str(e)[:200]
)
self.add_result(test)
def test_08_change_detection_logic(self):
"""Test 8: Verify change detection logic is implemented."""
test = TestCase("Change Detection Implementation")
script_path = Path(__file__).parent / "sync_hf.py"
if not script_path.exists():
test.set_result(
False,
"sync_hf.py not found - cannot verify change detection",
reason="script_missing"
)
else:
try:
content = script_path.read_text()
has_compute_hash = "_compute_files_hash" in content
has_has_changes = "_has_changes" in content
has_change_check = "if not self._has_changes():" in content
all_present = has_compute_hash and has_has_changes and has_change_check
test.set_result(
all_present,
f"Change detection: {sum([has_compute_hash, has_has_changes, has_change_check])}/3 components found",
has_compute_files_hash=has_compute_hash,
has_has_changes=has_has_changes,
has_change_check_in_save=has_change_check
)
except Exception as e:
test.set_result(
False,
f"Error reading sync_hf.py: {str(e)[:100]}",
error=str(e)[:200]
)
self.add_result(test)
def test_09_backup_rotation(self):
"""Test 9: Check if backup rotation is implemented."""
test = TestCase("Backup Rotation Implementation")
# Check both sync_hf.py and openclaw_persist.py
sync_script = Path(__file__).parent / "sync_hf.py"
persist_script = Path(__file__).parent / "openclaw_persist.py"
sync_has_rotation = False
persist_has_rotation = False
if sync_script.exists():
content = sync_script.read_text()
sync_has_rotation = "_rotate_backups" in content or "MAX_BACKUPS" in content
if persist_script.exists():
content = persist_script.read_text()
persist_has_rotation = "_rotate_backups" in content and "MAX_BACKUPS" in content
test.set_result(
persist_has_rotation,
f"Backup rotation: sync_hf.py={sync_has_rotation}, openclaw_persist.py={persist_has_rotation}",
sync_hf_has_rotation=sync_has_rotation,
openclaw_persist_has_rotation=persist_has_rotation,
warning="sync_hf.py does NOT implement rotation; code exists in openclaw_persist.py but is not used" if persist_has_rotation and not sync_has_rotation else ""
)
self.add_result(test)
def test_10_retry_logic(self):
"""Test 10: Check if retry logic is implemented for network failures."""
test = TestCase("Retry Logic for Network Failures")
sync_script = Path(__file__).parent / "sync_hf.py"
if not sync_script.exists():
test.set_result(
False,
"sync_hf.py not found",
reason="script_missing"
)
else:
content = sync_script.read_text()
# Look for retry indicators
has_retry = "retry" in content.lower()
has_backoff = "backoff" in content.lower() or "sleep" in content.lower()
has_max_retries = "max_retries" in content.lower() or "MAX_RETRIES" in content
has_good_retry = has_retry and (has_backoff or has_max_retries)
test.set_result(
has_good_retry,
f"Retry logic: retry={has_retry}, backoff={has_backoff}, max_retries={has_max_retries}",
has_retry=has_retry,
has_backoff=has_backoff,
has_max_retries=has_max_retries,
recommendation="Add exponential backoff retry for network operations" if not has_good_retry else ""
)
self.add_result(test)
def run_all_tests(self):
"""Run all edge case tests."""
print("=" * 60)
print("Cain Memory Persistence - Edge Case Testing")
print("=" * 60)
print()
self.test_01_hf_token_exists()
self.test_02_dataset_repo_determined()
self.test_03_hf_connection()
self.test_04_dataset_exists()
self.test_05_write_permissions()
self.test_06_local_state_structure()
self.test_07_sync_script_exists()
self.test_08_change_detection_logic()
self.test_09_backup_rotation()
self.test_10_retry_logic()
print()
print("=" * 60)
print("Test Summary")
print("=" * 60)
passed = sum(1 for r in self.results if r.passed)
failed = len(self.results) - passed
print(f"Total: {len(self.results)} | Passed: {passed} | Failed: {failed}")
print()
# Show critical issues
critical = [r for r in self.results if r.details.get("critical")]
if critical:
print("⚠️ CRITICAL ISSUES FOUND:")
for test in critical:
print(f" - {test.name}: {test.message}")
# Show warnings
warnings = [r for r in self.results if r.details.get("warning") or r.details.get("recommendation") and not r.passed]
if warnings:
print()
print("⚠️ RECOMMENDATIONS:")
for test in warnings:
rec = test.details.get("recommendation")
if rec:
print(f" - {rec}")
return failed == 0
def main():
tester = PersistenceEdgeCaseTester()
success = tester.run_all_tests()
sys.exit(0 if success else 1)
if __name__ == "__main__":
main()