Spaces:
Sleeping
Sleeping
| """ | |
| Test workflow persistence functionality | |
| """ | |
| import os | |
| import shutil | |
| import time | |
| from workflows import WorkflowTask, WorkflowDefinition, WorkflowStore | |
def test_persistence() -> None:
    """Exercise WorkflowStore save, load, list, and delete operations.

    The test works inside a dedicated ``./test_workflow_cache`` directory.
    The directory is wiped before the run and removed again afterwards,
    including when one of the assertions fails, so a failed run does not
    leave stale files behind.

    Raises:
        AssertionError: If any of the persistence checks does not hold.
    """
    # Use a test-specific cache directory
    test_cache = "./test_workflow_cache"

    # Clean up any existing test cache so the "exactly one workflow" check
    # in step 5 is not affected by leftovers from an earlier run.
    if os.path.exists(test_cache):
        shutil.rmtree(test_cache)

    print("=" * 60)
    print("Testing WorkflowStore Persistence")
    print("=" * 60)

    try:
        # Step 1: Create WorkflowStore instance
        print("\n[1] Creating WorkflowStore...")
        store = WorkflowStore(store_path=test_cache)
        assert os.path.exists(test_cache), "Cache directory not created"
        print(f"[PASS] Store created at: {test_cache}")

        # Step 2: Create sample workflow (task_2 depends on task_1)
        print("\n[2] Creating sample workflow...")
        workflow = WorkflowDefinition(
            name="test_workflow",
            description="Test workflow for persistence",
            tasks=[
                WorkflowTask(
                    id="task_1",
                    tool="research",
                    args={"query": "test query"},
                    depends_on=[],
                ),
                WorkflowTask(
                    id="task_2",
                    tool="analyze",
                    args={"data": "${task_1}"},
                    depends_on=["task_1"],
                ),
            ],
            final_task="task_2",
            max_parallel=2,
            timeout_seconds=300,
        )
        print(f"[PASS] Workflow created: {workflow.name}")
        print(f" - Tasks: {len(workflow.tasks)}")
        print(f" - Final task: {workflow.final_task}")

        # Step 3: Create sample result
        print("\n[3] Creating sample execution result...")
        result = {
            "success": True,
            "result": {"output": "test output"},
            "execution_time": 1.23,
            "trace": [
                {"task_id": "task_1", "status": "completed"},
                {"task_id": "task_2", "status": "completed"},
            ],
            "all_results": {
                "task_1": {"data": "result 1"},
                "task_2": {"data": "result 2"},
            },
        }
        print("[PASS] Result created")
        print(f" - Success: {result['success']}")
        print(f" - Execution time: {result['execution_time']}s")

        # Step 4: Save workflow under a timestamp-based identifier
        print("\n[4] Saving workflow...")
        workflow_id = f"test_workflow_{int(time.time())}"
        store.save_workflow(workflow_id, workflow, result)

        # Verify file exists
        expected_path = os.path.join(test_cache, f"{workflow_id}.json")
        assert os.path.exists(expected_path), "Workflow file not created"
        print(f"[PASS] Workflow saved: {workflow_id}")
        print(f" - File: {expected_path}")

        # Step 5: List workflows
        print("\n[5] Listing workflows...")
        workflow_ids = store.list_workflows()
        assert len(workflow_ids) == 1, f"Expected 1 workflow, found {len(workflow_ids)}"
        assert workflow_id in workflow_ids, "Saved workflow not in list"
        print(f"[PASS] Found {len(workflow_ids)} workflow(s):")
        for wf_id in workflow_ids:
            print(f" - {wf_id}")

        # Step 6: Load workflow
        print("\n[6] Loading workflow...")
        loaded_data = store.load_workflow(workflow_id)
        assert loaded_data is not None, "Failed to load workflow"
        print(f"[PASS] Workflow loaded: {loaded_data['workflow_id']}")

        # Step 7: Verify loaded data matches original
        print("\n[7] Verifying loaded data...")
        assert loaded_data["workflow_id"] == workflow_id, "Workflow ID mismatch"
        assert loaded_data["workflow"]["name"] == workflow.name, "Workflow name mismatch"
        assert loaded_data["result"]["success"] == result["success"], "Result success mismatch"
        assert loaded_data["result"]["execution_time"] == result["execution_time"], "Execution time mismatch"
        print("[PASS] Data verification passed:")
        print(f" - Workflow ID: {loaded_data['workflow_id']}")
        print(f" - Workflow name: {loaded_data['workflow']['name']}")
        print(f" - Tasks: {len(loaded_data['workflow']['tasks'])}")
        print(f" - Result success: {loaded_data['result']['success']}")

        # Step 8: Test loading non-existent workflow
        print("\n[8] Testing non-existent workflow...")
        missing = store.load_workflow("non_existent_workflow")
        assert missing is None, "Should return None for non-existent workflow"
        print("[PASS] Correctly returns None for non-existent workflow")

        # Step 9: Test delete workflow
        print("\n[9] Testing workflow deletion...")
        deleted = store.delete_workflow(workflow_id)
        assert deleted is True, "Delete should return True"
        assert not os.path.exists(expected_path), "File should be deleted"
        workflow_ids = store.list_workflows()
        assert len(workflow_ids) == 0, "Workflow list should be empty after deletion"
        print("[PASS] Workflow deleted successfully")

        # Step 10: Test delete non-existent workflow
        print("\n[10] Testing deletion of non-existent workflow...")
        deleted = store.delete_workflow("non_existent_workflow")
        assert deleted is False, "Delete should return False for non-existent workflow"
        print("[PASS] Correctly returns False for non-existent workflow")
    finally:
        # Cleanup runs on success and on failure alike. The existence check
        # keeps a missing directory (e.g. the store never created it) from
        # raising here and hiding the assertion that actually failed.
        print("\n[Cleanup] Removing test cache...")
        if os.path.exists(test_cache):
            shutil.rmtree(test_cache)
        print("[PASS] Test cache removed")

    print("\n" + "=" * 60)
    print("[SUCCESS] ALL TESTS PASSED")
    print("=" * 60)
# Script entry point: run the persistence checks only when this file is
# executed directly, not when it is imported by a test runner.
if __name__ == "__main__":
    test_persistence()