general-reasoning-agent / test_persistence.py
chmielvu's picture
feat: add production refinements (Phase 1-3)
4454066 verified
"""
Test workflow persistence functionality
"""
import os
import shutil
import time
from workflows import WorkflowTask, WorkflowDefinition, WorkflowStore
def test_persistence() -> None:
    """Exercise WorkflowStore save, list, load, and delete operations end to end.

    The test works inside a dedicated ``./test_workflow_cache`` directory
    (relative to the current working directory). Any directory left over from
    a previous run is removed first, and the directory is removed again when
    the test finishes -- whether it passed or failed -- so that an aborted run
    does not leave stale workflow files behind.

    Progress is reported with ``print`` so the function is also readable when
    executed directly as a script.

    Raises:
        AssertionError: If any of the persistence checks fails.
    """
    # Use a test-specific cache directory
    test_cache = "./test_workflow_cache"

    # Clean up any existing test cache
    if os.path.exists(test_cache):
        shutil.rmtree(test_cache)

    print("=" * 60)
    print("Testing WorkflowStore Persistence")
    print("=" * 60)

    try:
        # Step 1: Create WorkflowStore instance
        print("\n[1] Creating WorkflowStore...")
        store = WorkflowStore(store_path=test_cache)
        assert os.path.exists(test_cache), "Cache directory not created"
        print(f"[PASS] Store created at: {test_cache}")

        # Step 2: Create sample workflow
        print("\n[2] Creating sample workflow...")
        workflow = WorkflowDefinition(
            name="test_workflow",
            description="Test workflow for persistence",
            tasks=[
                WorkflowTask(
                    id="task_1",
                    tool="research",
                    args={"query": "test query"},
                    depends_on=[],
                ),
                WorkflowTask(
                    id="task_2",
                    tool="analyze",
                    args={"data": "${task_1}"},
                    depends_on=["task_1"],
                ),
            ],
            final_task="task_2",
            max_parallel=2,
            timeout_seconds=300,
        )
        print(f"[PASS] Workflow created: {workflow.name}")
        print(f" - Tasks: {len(workflow.tasks)}")
        print(f" - Final task: {workflow.final_task}")

        # Step 3: Create sample result
        print("\n[3] Creating sample execution result...")
        result = {
            "success": True,
            "result": {"output": "test output"},
            "execution_time": 1.23,
            "trace": [
                {"task_id": "task_1", "status": "completed"},
                {"task_id": "task_2", "status": "completed"},
            ],
            "all_results": {
                "task_1": {"data": "result 1"},
                "task_2": {"data": "result 2"},
            },
        }
        print("[PASS] Result created")
        print(f" - Success: {result['success']}")
        print(f" - Execution time: {result['execution_time']}s")

        # Step 4: Save workflow
        print("\n[4] Saving workflow...")
        # A timestamp suffix keeps the id distinct between runs.
        workflow_id = f"test_workflow_{int(time.time())}"
        store.save_workflow(workflow_id, workflow, result)

        # Verify file exists: the store is expected to write <workflow_id>.json
        # directly inside the cache directory.
        expected_path = os.path.join(test_cache, f"{workflow_id}.json")
        assert os.path.exists(expected_path), "Workflow file not created"
        print(f"[PASS] Workflow saved: {workflow_id}")
        print(f" - File: {expected_path}")

        # Step 5: List workflows
        print("\n[5] Listing workflows...")
        workflow_ids = store.list_workflows()
        assert len(workflow_ids) == 1, f"Expected 1 workflow, found {len(workflow_ids)}"
        assert workflow_id in workflow_ids, "Saved workflow not in list"
        print(f"[PASS] Found {len(workflow_ids)} workflow(s):")
        for wf_id in workflow_ids:
            print(f" - {wf_id}")

        # Step 6: Load workflow
        print("\n[6] Loading workflow...")
        loaded_data = store.load_workflow(workflow_id)
        assert loaded_data is not None, "Failed to load workflow"
        print(f"[PASS] Workflow loaded: {loaded_data['workflow_id']}")

        # Step 7: Verify loaded data matches original
        print("\n[7] Verifying loaded data...")
        assert loaded_data["workflow_id"] == workflow_id, "Workflow ID mismatch"
        assert loaded_data["workflow"]["name"] == workflow.name, "Workflow name mismatch"
        assert loaded_data["result"]["success"] == result["success"], "Result success mismatch"
        assert loaded_data["result"]["execution_time"] == result["execution_time"], "Execution time mismatch"
        print("[PASS] Data verification passed:")
        print(f" - Workflow ID: {loaded_data['workflow_id']}")
        print(f" - Workflow name: {loaded_data['workflow']['name']}")
        print(f" - Tasks: {len(loaded_data['workflow']['tasks'])}")
        print(f" - Result success: {loaded_data['result']['success']}")

        # Step 8: Test loading non-existent workflow
        print("\n[8] Testing non-existent workflow...")
        missing = store.load_workflow("non_existent_workflow")
        assert missing is None, "Should return None for non-existent workflow"
        print("[PASS] Correctly returns None for non-existent workflow")

        # Step 9: Test delete workflow
        print("\n[9] Testing workflow deletion...")
        deleted = store.delete_workflow(workflow_id)
        assert deleted is True, "Delete should return True"
        assert not os.path.exists(expected_path), "File should be deleted"
        workflow_ids = store.list_workflows()
        assert len(workflow_ids) == 0, "Workflow list should be empty after deletion"
        print("[PASS] Workflow deleted successfully")

        # Step 10: Test delete non-existent workflow
        print("\n[10] Testing deletion of non-existent workflow...")
        deleted = store.delete_workflow("non_existent_workflow")
        assert deleted is False, "Delete should return False for non-existent workflow"
        print("[PASS] Correctly returns False for non-existent workflow")
    finally:
        # Cleanup runs on failure as well as success so a broken run never
        # leaves the cache directory behind. The existence check avoids
        # masking the original error when the directory was never created.
        if os.path.exists(test_cache):
            print("\n[Cleanup] Removing test cache...")
            shutil.rmtree(test_cache)
            print("[PASS] Test cache removed")

    print("\n" + "=" * 60)
    print("[SUCCESS] ALL TESTS PASSED")
    print("=" * 60)
# Script entry point: run the persistence test directly, without a test
# runner, when this file is executed rather than imported.
if __name__ == "__main__":
    test_persistence()