Spaces:
Sleeping
Sleeping
Claude Code
Claude Code: Verify Cain's memory persistence system is functioning correctly in runt
83e5fd6 | #!/usr/bin/env python3 | |
| """ | |
| Memory Persistence Edge Case Testing Script | |
| ============================================ | |
| Tests Cain's memory persistence system against various edge cases to ensure | |
| robustness and reliability. | |
| Usage: | |
| python scripts/test_memory_persistence_edge_cases.py | |
| """ | |
| import os | |
| import sys | |
| import json | |
| import tempfile | |
| import shutil | |
| from pathlib import Path | |
| from typing import Dict, Any, List | |
| # Add parent directory to path | |
| sys.path.insert(0, str(Path(__file__).parent.parent)) | |
| os.environ.setdefault("HF_HUB_DOWNLOAD_TIMEOUT", "300") | |
| os.environ.setdefault("HF_HUB_UPLOAD_TIMEOUT", "600") | |
| os.environ.setdefault("HF_HUB_DISABLE_PROGRESS_BARS", "1") | |
| os.environ.setdefault("HF_HUB_VERBOSITY", "warning") | |
| class TestCase: | |
| def __init__(self, name: str): | |
| self.name = name | |
| self.passed = False | |
| self.message = "" | |
| self.details: Dict[str, Any] = {} | |
| def set_result(self, passed: bool, message: str, **details): | |
| self.passed = passed | |
| self.message = message | |
| self.details = details | |
| def __str__(self): | |
| status = "✅ PASS" if self.passed else "❌ FAIL" | |
| return f"{status}: {self.name}\n {self.message}" | |
| class PersistenceEdgeCaseTester: | |
| """Test memory persistence edge cases.""" | |
| def __init__(self): | |
| self.results: List[TestCase] = [] | |
| self.hf_token = os.environ.get("HF_TOKEN") | |
| self.dataset_repo = os.environ.get( | |
| "OPENCLAW_DATASET_REPO", | |
| os.environ.get("SPACE_ID", "tao-shen/HuggingClaw-Home") + "-data" | |
| ) | |
| def add_result(self, test: TestCase): | |
| self.results.append(test) | |
| print(test) | |
| def test_01_hf_token_exists(self): | |
| """Test 1: Verify HF_TOKEN is set.""" | |
| test = TestCase("HF_TOKEN Environment Variable") | |
| if not self.hf_token: | |
| test.set_result( | |
| False, | |
| "HF_TOKEN not set - persistence will be disabled", | |
| has_token=False, | |
| recommendation="Set HF_TOKEN environment variable with write permissions" | |
| ) | |
| else: | |
| test.set_result( | |
| True, | |
| f"HF_TOKEN is set (length: {len(self.hf_token)} chars)", | |
| has_token=True, | |
| token_prefix=self.hf_token[:10] + "..." if len(self.hf_token) > 10 else self.hf_token | |
| ) | |
| self.add_result(test) | |
| def test_02_dataset_repo_determined(self): | |
| """Test 2: Verify dataset repository ID is determined.""" | |
| test = TestCase("Dataset Repository ID") | |
| if not self.dataset_repo: | |
| test.set_result( | |
| False, | |
| "Could not determine dataset repository ID", | |
| repo_id=None, | |
| recommendation="Set SPACE_ID or OPENCLAW_DATASET_REPO environment variable" | |
| ) | |
| else: | |
| test.set_result( | |
| True, | |
| f"Dataset repository: {self.dataset_repo}", | |
| repo_id=self.dataset_repo | |
| ) | |
| self.add_result(test) | |
| def test_03_hf_connection(self): | |
| """Test 3: Verify connection to HuggingFace API.""" | |
| test = TestCase("HuggingFace API Connection") | |
| if not self.hf_token: | |
| test.set_result( | |
| False, | |
| "Cannot test connection - HF_TOKEN not set", | |
| reason="no_token" | |
| ) | |
| self.add_result(test) | |
| return | |
| try: | |
| from huggingface_hub import HfApi | |
| api = HfApi(token=self.hf_token) | |
| whoami = api.whoami() | |
| test.set_result( | |
| True, | |
| f"Connected as: {whoami.get('name', 'unknown')}", | |
| username=whoami.get('name'), | |
| can_write=whoami.get('canWriteTo', {}).get('repos', False) | |
| ) | |
| except Exception as e: | |
| error_msg = str(e) | |
| if "401" in error_msg or "authentication" in error_msg.lower(): | |
| test.set_result( | |
| False, | |
| "Authentication failed - check HF_TOKEN validity", | |
| error=error_msg[:200], | |
| recommendation="Verify HF_TOKEN is valid and not expired" | |
| ) | |
| else: | |
| test.set_result( | |
| False, | |
| f"Connection failed: {error_msg[:100]}", | |
| error=error_msg[:200] | |
| ) | |
| self.add_result(test) | |
| def test_04_dataset_exists(self): | |
| """Test 4: Verify dataset repository exists.""" | |
| test = TestCase("Dataset Repository Existence") | |
| if not self.hf_token or not self.dataset_repo: | |
| test.set_result( | |
| False, | |
| "Cannot test - HF_TOKEN or dataset_repo not set", | |
| reason="prerequisite_missing" | |
| ) | |
| self.add_result(test) | |
| return | |
| try: | |
| from huggingface_hub import HfApi | |
| api = HfApi(token=self.hf_token) | |
| repo_info = api.repo_info(repo_id=self.dataset_repo, repo_type="dataset") | |
| test.set_result( | |
| True, | |
| f"Dataset exists: {self.dataset_repo}", | |
| repo_id=self.dataset_repo, | |
| private=repo_info.private, | |
| author=repo_info.author | |
| ) | |
| except Exception as e: | |
| error_msg = str(e) | |
| if "404" in error_msg or "not found" in error_msg.lower(): | |
| auto_create = os.environ.get("AUTO_CREATE_DATASET", "false").lower() in ("true", "1", "yes") | |
| test.set_result( | |
| not auto_create, # Pass if auto-create is enabled | |
| f"Dataset not found (AUTO_CREATE_DATASET={auto_create})", | |
| repo_id=self.dataset_repo, | |
| auto_create_enabled=auto_create, | |
| recommendation="Set AUTO_CREATE_DATASET=true to auto-create" if not auto_create else "Dataset will be created on first sync" | |
| ) | |
| else: | |
| test.set_result( | |
| False, | |
| f"Error checking dataset: {error_msg[:100]}", | |
| error=error_msg[:200] | |
| ) | |
| self.add_result(test) | |
| def test_05_write_permissions(self): | |
| """Test 5: Verify write permissions to dataset.""" | |
| test = TestCase("Dataset Write Permissions") | |
| if not self.hf_token or not self.dataset_repo: | |
| test.set_result( | |
| False, | |
| "Cannot test - HF_TOKEN or dataset_repo not set", | |
| reason="prerequisite_missing" | |
| ) | |
| self.add_result(test) | |
| return | |
| try: | |
| from huggingface_hub import HfApi | |
| api = HfApi(token=self.hf_token) | |
| # Try to upload a test file to verify write access | |
| with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f: | |
| f.write("Memory persistence permission test") | |
| test_file = f.name | |
| try: | |
| upload_result = api.upload_file( | |
| path_or_fileobj=test_file, | |
| path_in_repo=".permission-test.txt", | |
| repo_id=self.dataset_repo, | |
| repo_type="dataset", | |
| commit_message="Test write permissions" | |
| ) | |
| test.set_result( | |
| True, | |
| "Write permissions verified", | |
| test_file=".permission-test.txt", | |
| commit_url=upload_result.commit_url | |
| ) | |
| finally: | |
| os.unlink(test_file) | |
| except Exception as e: | |
| error_msg = str(e) | |
| if "403" in error_msg or "401" in error_msg or "permission" in error_msg.lower() or "unauthorized" in error_msg.lower(): | |
| test.set_result( | |
| False, | |
| "❌ CRITICAL: HF_TOKEN lacks write permissions!", | |
| error=error_msg[:200], | |
| critical=True, | |
| recommendation="Regenerate HF_TOKEN with 'Write' permissions" | |
| ) | |
| else: | |
| test.set_result( | |
| False, | |
| f"Write test failed: {error_msg[:100]}", | |
| error=error_msg[:200] | |
| ) | |
| self.add_result(test) | |
| def test_06_local_state_structure(self): | |
| """Test 6: Verify local state directory structure.""" | |
| test = TestCase("Local State Structure") | |
| openclaw_home = Path.home() / ".openclaw" | |
| if not openclaw_home.exists(): | |
| test.set_result( | |
| False, | |
| "~/.openclaw directory does not exist yet", | |
| path=str(openclaw_home), | |
| recommendation="Directory will be created on first run" | |
| ) | |
| else: | |
| # Check for key files/directories | |
| checks = { | |
| "config": (openclaw_home / "openclaw.json").exists(), | |
| "workspace": (openclaw_home / "workspace").is_dir(), | |
| "sync_log": (openclaw_home / "workspace" / "sync.log").exists(), | |
| } | |
| all_present = all(checks.values()) | |
| test.set_result( | |
| all_present, | |
| f"Structure check: {sum(checks.values())}/{len(checks)} present", | |
| checks=checks, | |
| recommendation="Ensure OpenClaw has been initialized" if not all_present else "" | |
| ) | |
| self.add_result(test) | |
| def test_07_sync_script_exists(self): | |
| """Test 7: Verify sync_hf.py script exists and is valid.""" | |
| test = TestCase("Sync Script Availability") | |
| script_path = Path(__file__).parent / "sync_hf.py" | |
| if not script_path.exists(): | |
| test.set_result( | |
| False, | |
| "sync_hf.py script not found", | |
| expected_path=str(script_path), | |
| recommendation="Ensure persistence script is deployed" | |
| ) | |
| else: | |
| # Try to import and check for key functions | |
| try: | |
| import importlib.util | |
| spec = importlib.util.spec_from_file_location("sync_hf", script_path) | |
| if spec and spec.loader: | |
| module = importlib.util.module_from_spec(spec) | |
| spec.loader.exec_module(module) | |
| has_class = hasattr(module, 'OpenClawFullSync') | |
| has_main = hasattr(module, 'main') | |
| test.set_result( | |
| has_class and has_main, | |
| f"Sync script valid (class={has_class}, main={has_main})", | |
| has_OpenClawFullSync=has_class, | |
| has_main_function=has_main | |
| ) | |
| else: | |
| test.set_result(False, "Could not load sync_hf.py module") | |
| except Exception as e: | |
| test.set_result( | |
| False, | |
| f"Error loading sync_hf.py: {str(e)[:100]}", | |
| error=str(e)[:200] | |
| ) | |
| self.add_result(test) | |
| def test_08_change_detection_logic(self): | |
| """Test 8: Verify change detection logic is implemented.""" | |
| test = TestCase("Change Detection Implementation") | |
| script_path = Path(__file__).parent / "sync_hf.py" | |
| if not script_path.exists(): | |
| test.set_result( | |
| False, | |
| "sync_hf.py not found - cannot verify change detection", | |
| reason="script_missing" | |
| ) | |
| else: | |
| try: | |
| content = script_path.read_text() | |
| has_compute_hash = "_compute_files_hash" in content | |
| has_has_changes = "_has_changes" in content | |
| has_change_check = "if not self._has_changes():" in content | |
| all_present = has_compute_hash and has_has_changes and has_change_check | |
| test.set_result( | |
| all_present, | |
| f"Change detection: {sum([has_compute_hash, has_has_changes, has_change_check])}/3 components found", | |
| has_compute_files_hash=has_compute_hash, | |
| has_has_changes=has_has_changes, | |
| has_change_check_in_save=has_change_check | |
| ) | |
| except Exception as e: | |
| test.set_result( | |
| False, | |
| f"Error reading sync_hf.py: {str(e)[:100]}", | |
| error=str(e)[:200] | |
| ) | |
| self.add_result(test) | |
| def test_09_backup_rotation(self): | |
| """Test 9: Check if backup rotation is implemented.""" | |
| test = TestCase("Backup Rotation Implementation") | |
| # Check both sync_hf.py and openclaw_persist.py | |
| sync_script = Path(__file__).parent / "sync_hf.py" | |
| persist_script = Path(__file__).parent / "openclaw_persist.py" | |
| sync_has_rotation = False | |
| persist_has_rotation = False | |
| if sync_script.exists(): | |
| content = sync_script.read_text() | |
| sync_has_rotation = "_rotate_backups" in content or "MAX_BACKUPS" in content | |
| if persist_script.exists(): | |
| content = persist_script.read_text() | |
| persist_has_rotation = "_rotate_backups" in content and "MAX_BACKUPS" in content | |
| test.set_result( | |
| persist_has_rotation, | |
| f"Backup rotation: sync_hf.py={sync_has_rotation}, openclaw_persist.py={persist_has_rotation}", | |
| sync_hf_has_rotation=sync_has_rotation, | |
| openclaw_persist_has_rotation=persist_has_rotation, | |
| warning="sync_hf.py does NOT implement rotation; code exists in openclaw_persist.py but is not used" if persist_has_rotation and not sync_has_rotation else "" | |
| ) | |
| self.add_result(test) | |
| def test_10_retry_logic(self): | |
| """Test 10: Check if retry logic is implemented for network failures.""" | |
| test = TestCase("Retry Logic for Network Failures") | |
| sync_script = Path(__file__).parent / "sync_hf.py" | |
| if not sync_script.exists(): | |
| test.set_result( | |
| False, | |
| "sync_hf.py not found", | |
| reason="script_missing" | |
| ) | |
| else: | |
| content = sync_script.read_text() | |
| # Look for retry indicators | |
| has_retry = "retry" in content.lower() | |
| has_backoff = "backoff" in content.lower() or "sleep" in content.lower() | |
| has_max_retries = "max_retries" in content.lower() or "MAX_RETRIES" in content | |
| has_good_retry = has_retry and (has_backoff or has_max_retries) | |
| test.set_result( | |
| has_good_retry, | |
| f"Retry logic: retry={has_retry}, backoff={has_backoff}, max_retries={has_max_retries}", | |
| has_retry=has_retry, | |
| has_backoff=has_backoff, | |
| has_max_retries=has_max_retries, | |
| recommendation="Add exponential backoff retry for network operations" if not has_good_retry else "" | |
| ) | |
| self.add_result(test) | |
| def run_all_tests(self): | |
| """Run all edge case tests.""" | |
| print("=" * 60) | |
| print("Cain Memory Persistence - Edge Case Testing") | |
| print("=" * 60) | |
| print() | |
| self.test_01_hf_token_exists() | |
| self.test_02_dataset_repo_determined() | |
| self.test_03_hf_connection() | |
| self.test_04_dataset_exists() | |
| self.test_05_write_permissions() | |
| self.test_06_local_state_structure() | |
| self.test_07_sync_script_exists() | |
| self.test_08_change_detection_logic() | |
| self.test_09_backup_rotation() | |
| self.test_10_retry_logic() | |
| print() | |
| print("=" * 60) | |
| print("Test Summary") | |
| print("=" * 60) | |
| passed = sum(1 for r in self.results if r.passed) | |
| failed = len(self.results) - passed | |
| print(f"Total: {len(self.results)} | Passed: {passed} | Failed: {failed}") | |
| print() | |
| # Show critical issues | |
| critical = [r for r in self.results if r.details.get("critical")] | |
| if critical: | |
| print("⚠️ CRITICAL ISSUES FOUND:") | |
| for test in critical: | |
| print(f" - {test.name}: {test.message}") | |
| # Show warnings | |
| warnings = [r for r in self.results if r.details.get("warning") or r.details.get("recommendation") and not r.passed] | |
| if warnings: | |
| print() | |
| print("⚠️ RECOMMENDATIONS:") | |
| for test in warnings: | |
| rec = test.details.get("recommendation") | |
| if rec: | |
| print(f" - {rec}") | |
| return failed == 0 | |
| def main(): | |
| tester = PersistenceEdgeCaseTester() | |
| success = tester.run_all_tests() | |
| sys.exit(0 if success else 1) | |
| if __name__ == "__main__": | |
| main() | |