""" Test workflow persistence functionality """ import os import shutil import time from workflows import WorkflowTask, WorkflowDefinition, WorkflowStore def test_persistence(): """Test WorkflowStore save, load, list, and delete operations""" # Use a test-specific cache directory test_cache = "./test_workflow_cache" # Clean up any existing test cache if os.path.exists(test_cache): shutil.rmtree(test_cache) print("=" * 60) print("Testing WorkflowStore Persistence") print("=" * 60) # Step 1: Create WorkflowStore instance print("\n[1] Creating WorkflowStore...") store = WorkflowStore(store_path=test_cache) assert os.path.exists(test_cache), "Cache directory not created" print(f"[PASS] Store created at: {test_cache}") # Step 2: Create sample workflow print("\n[2] Creating sample workflow...") workflow = WorkflowDefinition( name="test_workflow", description="Test workflow for persistence", tasks=[ WorkflowTask( id="task_1", tool="research", args={"query": "test query"}, depends_on=[] ), WorkflowTask( id="task_2", tool="analyze", args={"data": "${task_1}"}, depends_on=["task_1"] ) ], final_task="task_2", max_parallel=2, timeout_seconds=300 ) print(f"[PASS] Workflow created: {workflow.name}") print(f" - Tasks: {len(workflow.tasks)}") print(f" - Final task: {workflow.final_task}") # Step 3: Create sample result print("\n[3] Creating sample execution result...") result = { "success": True, "result": {"output": "test output"}, "execution_time": 1.23, "trace": [ {"task_id": "task_1", "status": "completed"}, {"task_id": "task_2", "status": "completed"} ], "all_results": { "task_1": {"data": "result 1"}, "task_2": {"data": "result 2"} } } print(f"[PASS] Result created") print(f" - Success: {result['success']}") print(f" - Execution time: {result['execution_time']}s") # Step 4: Save workflow print("\n[4] Saving workflow...") workflow_id = f"test_workflow_{int(time.time())}" store.save_workflow(workflow_id, workflow, result) # Verify file exists expected_path = os.path.join(test_cache, f"{workflow_id}.json") assert os.path.exists(expected_path), "Workflow file not created" print(f"[PASS] Workflow saved: {workflow_id}") print(f" - File: {expected_path}") # Step 5: List workflows print("\n[5] Listing workflows...") workflow_ids = store.list_workflows() assert len(workflow_ids) == 1, f"Expected 1 workflow, found {len(workflow_ids)}" assert workflow_id in workflow_ids, "Saved workflow not in list" print(f"[PASS] Found {len(workflow_ids)} workflow(s):") for wf_id in workflow_ids: print(f" - {wf_id}") # Step 6: Load workflow print("\n[6] Loading workflow...") loaded_data = store.load_workflow(workflow_id) assert loaded_data is not None, "Failed to load workflow" print(f"[PASS] Workflow loaded: {loaded_data['workflow_id']}") # Step 7: Verify loaded data matches original print("\n[7] Verifying loaded data...") assert loaded_data["workflow_id"] == workflow_id, "Workflow ID mismatch" assert loaded_data["workflow"]["name"] == workflow.name, "Workflow name mismatch" assert loaded_data["result"]["success"] == result["success"], "Result success mismatch" assert loaded_data["result"]["execution_time"] == result["execution_time"], "Execution time mismatch" print("[PASS] Data verification passed:") print(f" - Workflow ID: {loaded_data['workflow_id']}") print(f" - Workflow name: {loaded_data['workflow']['name']}") print(f" - Tasks: {len(loaded_data['workflow']['tasks'])}") print(f" - Result success: {loaded_data['result']['success']}") # Step 8: Test loading non-existent workflow print("\n[8] Testing non-existent workflow...") missing = store.load_workflow("non_existent_workflow") assert missing is None, "Should return None for non-existent workflow" print("[PASS] Correctly returns None for non-existent workflow") # Step 9: Test delete workflow print("\n[9] Testing workflow deletion...") deleted = store.delete_workflow(workflow_id) assert deleted is True, "Delete should return True" assert not os.path.exists(expected_path), "File should be deleted" workflow_ids = store.list_workflows() assert len(workflow_ids) == 0, "Workflow list should be empty after deletion" print(f"[PASS] Workflow deleted successfully") # Step 10: Test delete non-existent workflow print("\n[10] Testing deletion of non-existent workflow...") deleted = store.delete_workflow("non_existent_workflow") assert deleted is False, "Delete should return False for non-existent workflow" print("[PASS] Correctly returns False for non-existent workflow") # Cleanup print("\n[Cleanup] Removing test cache...") shutil.rmtree(test_cache) print("[PASS] Test cache removed") print("\n" + "=" * 60) print("[SUCCESS] ALL TESTS PASSED") print("=" * 60) if __name__ == "__main__": test_persistence()